1#![no_std]
7#![cfg_attr(target_arch = "aarch64", allow(unsafe_code))]
11
12extern crate alloc;
13
14pub mod bloom;
15pub mod halfvec;
16pub mod persistent;
17pub mod persistent_btree;
18pub mod quantize;
19pub mod row_locator;
20pub mod segment;
21
22pub use self::bloom::{BloomError, BloomFilter};
23pub use self::row_locator::{RowLocator, RowLocatorError};
24pub use self::segment::{
25 BRIN_SIDECAR_MAGIC, BrinSummary, OwnedSegment, SEGMENT_COMPRESS_ALGO_LZSS,
26 SEGMENT_COMPRESS_ALGO_NONE, SEGMENT_MAGIC, SEGMENT_MAGIC_V2, SEGMENT_PAGE_BYTES, SegmentError,
27 SegmentMeta, SegmentReader, derive_brin_summaries, encode_segment, wrap_v2_envelope,
28 wrap_v2_envelope_with_brin,
29};
30
31use alloc::collections::{BTreeMap, BTreeSet};
32use alloc::format;
33use alloc::string::String;
34use alloc::sync::Arc;
35use alloc::vec::Vec;
36use core::fmt;
37
38use self::persistent::PersistentVec;
39use self::persistent_btree::PersistentBTreeMap;
40
/// Storage encoding of the cells of a `VECTOR` column.
///
/// `Value::data_type` maps `Value::Vector` to `F32`, `Value::Sq8Vector` to
/// `Sq8` and `Value::HalfVector` to `F16`. The `Display` form (`F32`, `SQ8`,
/// `HALF`) matches the keyword used in `VECTOR(n) USING …`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum VecEncoding {
    /// Full-precision `f32` components (`Value::Vector`); the default.
    #[default]
    F32,
    /// 8-bit quantized components, one byte per dimension (`Value::Sq8Vector`).
    Sq8,
    /// Half-precision components (`Value::HalfVector`), spelled `HALF` in SQL.
    F16,
}
59
60impl fmt::Display for VecEncoding {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 Self::F32 => f.write_str("F32"),
64 Self::Sq8 => f.write_str("SQ8"),
65 Self::F16 => f.write_str("HALF"),
66 }
67 }
68}
69
/// Declared SQL type of a column.
///
/// The `Display` impl renders the SQL spelling (e.g. `VARCHAR(32)`,
/// `VECTOR(3) USING SQ8`, `TEXT[]`). `Table::insert` / `Table::update_row`
/// compare each value's `Value::data_type` against this. They also accept a
/// few cross-type pairs (TEXT into VARCHAR/CHAR/JSON/JSONB, JSON/JSONB into
/// TEXT, JSON<->JSONB, TIMESTAMP<->TIMESTAMPTZ, NUMERIC with equal scale).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    /// 16-bit integer (`Value::SmallInt`).
    SmallInt,
    /// 32-bit integer (`Value::Int`).
    Int,
    /// 64-bit integer (`Value::BigInt`).
    BigInt,
    /// 64-bit float (`Value::Float`).
    Float,
    /// Unbounded text (`Value::Text`).
    Text,
    /// Text with a declared length; accepts `Value::Text`.
    /// NOTE(review): the length is not enforced by `Table::insert` — confirm where it is checked.
    Varchar(u32),
    /// Fixed-length text; accepts `Value::Text` (same caveat as `Varchar`).
    Char(u32),
    /// Boolean (`Value::Bool`).
    Bool,
    /// Fixed-dimension vector.
    Vector {
        /// Component count; a value's dimension must match exactly on insert.
        dim: u32,
        /// Storage encoding of every cell in the column.
        encoding: VecEncoding,
    },
    /// Fixed-point decimal.
    Numeric {
        /// Declared precision. `Value::data_type` always reports 0 here, and
        /// the insert/update compatibility check ignores it.
        precision: u8,
        /// Digits after the decimal point; must match between value and column.
        scale: u8,
    },
    /// Calendar date (`Value::Date`).
    Date,
    /// Timestamp without time zone (`Value::Timestamp`).
    Timestamp,
    /// Timestamp with time zone; values are stored as `Value::Timestamp`.
    Timestamptz,
    /// Time interval (`Value::Interval`).
    Interval,
    /// JSON text (`Value::Json`); also accepts `Value::Text`.
    Json,
    /// JSONB column; accepts `Value::Json` and `Value::Text`.
    Jsonb,
    /// Raw bytes (`Value::Bytes`), displayed as `BYTEA`.
    Bytes,
    /// Array of nullable text elements (`Value::TextArray`), displayed as `TEXT[]`.
    TextArray,
}
156
157impl fmt::Display for DataType {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 match self {
160 Self::SmallInt => f.write_str("SMALLINT"),
161 Self::Int => f.write_str("INT"),
162 Self::BigInt => f.write_str("BIGINT"),
163 Self::Float => f.write_str("FLOAT"),
164 Self::Text => f.write_str("TEXT"),
165 Self::Varchar(n) => write!(f, "VARCHAR({n})"),
166 Self::Char(n) => write!(f, "CHAR({n})"),
167 Self::Bool => f.write_str("BOOL"),
168 Self::Vector { dim, encoding } => match encoding {
169 VecEncoding::F32 => write!(f, "VECTOR({dim})"),
170 VecEncoding::Sq8 => write!(f, "VECTOR({dim}) USING SQ8"),
171 VecEncoding::F16 => write!(f, "VECTOR({dim}) USING HALF"),
172 },
173 Self::Numeric { precision, scale } => {
174 if *scale == 0 {
175 write!(f, "NUMERIC({precision})")
176 } else {
177 write!(f, "NUMERIC({precision}, {scale})")
178 }
179 }
180 Self::Date => f.write_str("DATE"),
181 Self::Timestamp => f.write_str("TIMESTAMP"),
182 Self::Timestamptz => f.write_str("TIMESTAMPTZ"),
183 Self::Interval => f.write_str("INTERVAL"),
184 Self::Json => f.write_str("JSON"),
185 Self::Jsonb => f.write_str("JSONB"),
186 Self::Bytes => f.write_str("BYTEA"),
187 Self::TextArray => f.write_str("TEXT[]"),
188 }
189 }
190}
191
/// A single cell value.
///
/// `Null` is the only variant with no [`DataType`] (see [`Value::data_type`]).
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum Value {
    /// `SMALLINT` value.
    SmallInt(i16),
    /// `INT` value.
    Int(i32),
    /// `BIGINT` value.
    BigInt(i64),
    /// `FLOAT` value.
    Float(f64),
    /// Text; also accepted by VARCHAR, CHAR, JSON and JSONB columns.
    Text(String),
    /// `BOOL` value.
    Bool(bool),
    /// Full-precision vector (`VecEncoding::F32`).
    Vector(Vec<f32>),
    /// 8-bit quantized vector (`VecEncoding::Sq8`); its byte count is its dimension.
    Sq8Vector(crate::quantize::Sq8Vector),
    /// Half-precision vector (`VecEncoding::F16`).
    HalfVector(crate::halfvec::HalfVector),
    /// Fixed-point decimal held as a scaled integer.
    /// NOTE(review): presumably the numeric value is `scaled / 10^scale` — confirm.
    Numeric {
        scaled: i128,
        scale: u8,
    },
    /// Date as an integer day number.
    /// NOTE(review): epoch not visible here (presumably 1970-01-01) — confirm.
    Date(i32),
    /// Timestamp as an `i64` count; also stored in TIMESTAMPTZ columns.
    /// NOTE(review): unit presumably microseconds, matching `Interval::micros` — confirm.
    Timestamp(i64),
    /// Interval split into a month component and a microsecond component.
    Interval {
        months: i32,
        micros: i64,
    },
    /// JSON document text; used for both JSON and JSONB columns.
    Json(String),
    /// Raw bytes (`BYTEA`).
    Bytes(Vec<u8>),
    /// Text array whose elements may individually be NULL.
    TextArray(Vec<Option<String>>),
    /// SQL NULL.
    Null,
}
253
254impl Value {
255 pub fn data_type(&self) -> Option<DataType> {
257 match self {
258 Self::SmallInt(_) => Some(DataType::SmallInt),
259 Self::Int(_) => Some(DataType::Int),
260 Self::BigInt(_) => Some(DataType::BigInt),
261 Self::Float(_) => Some(DataType::Float),
262 Self::Text(_) => Some(DataType::Text),
265 Self::Bool(_) => Some(DataType::Bool),
266 Self::Vector(v) => Some(DataType::Vector {
267 dim: u32::try_from(v.len()).expect("vector dim ≤ u32"),
268 encoding: VecEncoding::F32,
269 }),
270 Self::Sq8Vector(q) => Some(DataType::Vector {
271 dim: u32::try_from(q.bytes.len()).expect("vector dim ≤ u32"),
272 encoding: VecEncoding::Sq8,
273 }),
274 Self::HalfVector(h) => Some(DataType::Vector {
275 dim: u32::try_from(h.dim()).expect("vector dim ≤ u32"),
276 encoding: VecEncoding::F16,
277 }),
278 Self::Numeric { scale, .. } => Some(DataType::Numeric {
283 precision: 0,
284 scale: *scale,
285 }),
286 Self::Date(_) => Some(DataType::Date),
287 Self::Timestamp(_) => Some(DataType::Timestamp),
288 Self::Interval { .. } => Some(DataType::Interval),
289 Self::Json(_) => Some(DataType::Json),
290 Self::Bytes(_) => Some(DataType::Bytes),
291 Self::TextArray(_) => Some(DataType::TextArray),
292 Self::Null => None,
293 }
294 }
295
296 pub const fn is_null(&self) -> bool {
297 matches!(self, Self::Null)
298 }
299}
300
/// One row of cell values, in the same order as `TableSchema::columns`.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Cell values; `values[i]` belongs to column `i`.
    pub values: Vec<Value>,
}
307
308impl Row {
309 pub const fn new(values: Vec<Value>) -> Self {
310 Self { values }
311 }
312
313 pub fn len(&self) -> usize {
314 self.values.len()
315 }
316
317 pub fn is_empty(&self) -> bool {
318 self.values.is_empty()
319 }
320}
321
/// Definition of one column in a [`TableSchema`].
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
    /// Column name; matched exactly (case-sensitive) by `TableSchema::column_position`.
    pub name: String,
    /// Declared type, checked against each value's `Value::data_type` on insert/update.
    pub ty: DataType,
    /// Whether `Value::Null` is accepted on insert/update.
    pub nullable: bool,
    /// Constant default value, if any.
    /// NOTE(review): not applied by `Table::insert`; presumably filled in by the caller — confirm.
    pub default: Option<Value>,
    /// Default given as text to evaluate at insert time.
    /// NOTE(review): presumably an expression such as `now()`; evaluation is outside this file — confirm.
    pub runtime_default: Option<String>,
    /// Auto-increment column; `Table::next_auto_value` proposes the next value.
    pub auto_increment: bool,
}
345
/// Schema of one table: name, ordered columns, and constraints.
#[derive(Debug, Clone, PartialEq)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Columns in row order; `Row::values[i]` belongs to `columns[i]`.
    pub columns: Vec<ColumnSchema>,
    /// NOTE(review): presumably the hot-tier size budget in bytes (compare
    /// `Table::hot_bytes`), `None` meaning no explicit budget — confirm.
    pub hot_tier_bytes: Option<u64>,
    /// Foreign keys declared on this table.
    pub foreign_keys: Vec<ForeignKeyConstraint>,
    /// PRIMARY KEY / UNIQUE constraints declared on this table.
    pub uniqueness_constraints: Vec<UniquenessConstraint>,
}
371
/// A PRIMARY KEY or UNIQUE constraint over one or more columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UniquenessConstraint {
    /// `true` for PRIMARY KEY, `false` for a plain UNIQUE constraint.
    pub is_primary_key: bool,
    /// Constrained columns.
    /// NOTE(review): presumably positions into `TableSchema::columns` — confirm.
    pub columns: Vec<usize>,
}
388
/// A FOREIGN KEY constraint from this table to `parent_table`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForeignKeyConstraint {
    /// Optional constraint name.
    pub name: Option<String>,
    /// Referencing columns in this table.
    /// NOTE(review): presumably positions into this table's `columns` — confirm.
    pub local_columns: Vec<usize>,
    /// Name of the referenced table.
    pub parent_table: String,
    /// Referenced columns.
    /// NOTE(review): presumably positions into the parent table's `columns` — confirm.
    pub parent_columns: Vec<usize>,
    /// Action taken when a referenced parent row is deleted.
    pub on_delete: FkAction,
    /// Action taken when a referenced parent key is updated.
    pub on_update: FkAction,
}
415
/// Referential action for `ON DELETE` / `ON UPDATE`.
///
/// Encoded as a single byte by `FkAction::tag` and decoded by `FkAction::from_tag`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FkAction {
    /// `RESTRICT` (tag 0).
    Restrict,
    /// `CASCADE` (tag 1).
    Cascade,
    /// `SET NULL` (tag 2).
    SetNull,
    /// `SET DEFAULT` (tag 3).
    SetDefault,
    /// `NO ACTION` (tag 4).
    NoAction,
}
425
426impl FkAction {
427 pub const fn tag(self) -> u8 {
429 match self {
430 Self::Restrict => 0,
431 Self::Cascade => 1,
432 Self::SetNull => 2,
433 Self::SetDefault => 3,
434 Self::NoAction => 4,
435 }
436 }
437 pub const fn from_tag(b: u8) -> Option<Self> {
438 Some(match b {
439 0 => Self::Restrict,
440 1 => Self::Cascade,
441 2 => Self::SetNull,
442 3 => Self::SetDefault,
443 4 => Self::NoAction,
444 _ => return None,
445 })
446 }
447}
448
449impl TableSchema {
450 pub fn column_position(&self, name: &str) -> Option<usize> {
451 self.columns.iter().position(|c| c.name == name)
452 }
453}
454
/// Key stored in BTree indices.
///
/// `IndexKey::from_value` maps every integer-like value (SMALLINT, INT,
/// BIGINT, DATE, TIMESTAMP) to `Int`. Variant order is significant: the
/// derived `Ord` sorts every `Int` key before every `Text` key, and those
/// before every `Bool` key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexKey {
    /// Any integer-like value widened to `i64`.
    Int(i64),
    /// Text value (compared lexicographically by `String`'s `Ord`).
    Text(String),
    /// Boolean value.
    Bool(bool),
}
465
466impl IndexKey {
467 pub fn from_value(v: &Value) -> Option<Self> {
468 match v {
469 Value::SmallInt(n) => Some(Self::Int(i64::from(*n))),
470 Value::Int(n) => Some(Self::Int(i64::from(*n))),
471 Value::BigInt(n) => Some(Self::Int(*n)),
472 Value::Text(s) => Some(Self::Text(s.clone())),
473 Value::Bool(b) => Some(Self::Bool(*b)),
474 Value::Date(d) => Some(Self::Int(i64::from(*d))),
477 Value::Timestamp(t) => Some(Self::Int(*t)),
478 Value::Null
483 | Value::Float(_)
484 | Value::Vector(_)
485 | Value::Sq8Vector(_)
486 | Value::HalfVector(_)
487 | Value::Numeric { .. }
488 | Value::Interval { .. }
489 | Value::Json(_)
490 | Value::Bytes(_)
491 | Value::TextArray(_) => None,
492 }
493 }
494}
495
/// A named secondary index over one table column.
#[derive(Debug, Clone)]
pub struct Index {
    /// Index name; unique within its table (checked by the `Table::add_*` and
    /// `Table::restore_*` methods).
    pub name: String,
    /// Position of the indexed column in `TableSchema::columns`. BTree keys are
    /// derived from `row.values[column_position]`.
    pub column_position: usize,
    /// Index structure.
    pub kind: IndexKind,
    /// NOTE(review): presumably covering-index payload column positions; not
    /// read anywhere in this chunk — confirm.
    pub included_columns: Vec<usize>,
    /// Partial-index predicate as source text, if any.
    /// NOTE(review): `Table::insert` does not evaluate it — confirm where it is applied.
    pub partial_predicate: Option<String>,
    /// Expression-index expression as source text, if any.
    /// NOTE(review): `Table::insert` keys on the raw column value regardless — confirm.
    pub expression: Option<String>,
    /// Whether the index was declared UNIQUE.
    /// NOTE(review): uniqueness is not enforced by `Table::insert` in this chunk.
    pub is_unique: bool,
    /// Additional key column positions for a multi-column index.
    /// NOTE(review): BTree keys here use only `column_position` — confirm.
    pub extra_column_positions: Vec<usize>,
}
544
/// Default NSW neighbour parameter `m`: the link cap on layers above 0 (layer 0
/// allows `2 * m`, see `NswGraph::new` / `NswGraph::cap_for_layer`).
/// NOTE(review): presumably used when CREATE INDEX omits `m`; the caller is not
/// visible here — confirm.
pub const NSW_DEFAULT_M: usize = 16;
548
/// Outcome of freezing hot rows into a segment.
///
/// NOTE(review): the field descriptions below are inferred from names; the
/// freeze implementation is not visible in this chunk — confirm.
#[derive(Debug, Clone)]
pub struct FreezeReport {
    /// Id of the segment that was produced.
    pub segment_id: u32,
    /// Number of rows moved out of the hot tier.
    pub frozen_rows: usize,
    /// Hot-tier bytes released by the freeze.
    pub bytes_freed: u64,
    /// Encoded segment image.
    pub segment_bytes: Vec<u8>,
}
574
/// A contiguous slice of hot rows prepared for freezing.
///
/// NOTE(review): field semantics are inferred from names and types; the code
/// that builds this is not visible here — confirm.
#[derive(Debug, Clone)]
pub struct FreezeSlice {
    /// Hot row positions covered by this slice.
    pub row_range: core::ops::Range<usize>,
    /// Per-row entries, presumably `(row id, encoded row body, index key)` — confirm.
    pub rows: Vec<(u64, Vec<u8>, IndexKey)>,
}
597
/// Outcome of compacting several segments into one.
///
/// NOTE(review): the field descriptions below are inferred from names; the
/// compaction code is not visible in this chunk — confirm.
#[derive(Debug, Clone)]
pub struct CompactReport {
    /// Ids of the segments that were merged.
    pub sources: Vec<u32>,
    /// Id of the merged segment, if one was written.
    pub merged_segment_id: Option<u32>,
    /// Encoded image of the merged segment.
    pub merged_segment_bytes: Vec<u8>,
    /// Rows carried into the merged segment.
    pub merged_rows: usize,
    /// Deleted rows dropped during the merge.
    pub deleted_rows_pruned: usize,
    /// Approximate bytes reclaimed (an estimate, per the field name).
    pub bytes_reclaimed_estimate: u64,
}
633
/// Storage structure behind an [`Index`].
#[derive(Debug, Clone)]
pub enum IndexKind {
    /// Ordered map from key to every locator (hot row position or cold
    /// location) carrying that key. New locators are appended, so each list
    /// keeps insertion order.
    BTree(PersistentBTreeMap<IndexKey, Vec<RowLocator>>),
    /// Vector similarity graph. `Index::lookup_eq` never returns rows from it.
    Nsw(NswGraph),
    /// Block-range index. Only the column type is stored here, and
    /// `Index::lookup_eq` returns nothing for it.
    /// NOTE(review): the summaries presumably live with segments
    /// (`BrinSummary`, `derive_brin_summaries`) — confirm.
    Brin {
        column_type: DataType,
    },
}
668
/// Layered NSW graph over one vector column; nodes are hot row positions.
#[derive(Debug, Clone)]
pub struct NswGraph {
    /// Neighbour cap on layers above 0 (see `cap_for_layer`).
    pub m: usize,
    /// Neighbour cap on layer 0; `NswGraph::new` sets it to `2 * m` (saturating).
    pub m_max_0: usize,
    /// Row position searches start from; `None` until the first non-empty
    /// vector is linked.
    pub entry: Option<usize>,
    /// Level of the entry node.
    pub entry_level: u8,
    /// Level assigned to each row position (see `nsw_assign_level`).
    pub levels: PersistentVec<u8>,
    /// One entry per layer, starting at layer 0.
    /// NOTE(review): presumably adjacency lists indexed by row position with
    /// `u32` neighbour ids — confirm against `connect_at_layer`.
    pub layers: Vec<PersistentVec<Vec<u32>>>,
}
713
714impl NswGraph {
715 fn new(m: usize) -> Self {
716 Self {
717 m,
718 m_max_0: m.saturating_mul(2),
719 entry: None,
720 entry_level: 0,
721 levels: PersistentVec::new(),
722 layers: alloc::vec![PersistentVec::new()],
723 }
724 }
725
726 pub const fn cap_for_layer(&self, layer: u8) -> usize {
728 if layer == 0 { self.m_max_0 } else { self.m }
729 }
730}
731
/// Deterministic, geometrically distributed graph level for the node at `row_idx`.
///
/// The position is scrambled with a SplitMix64-style mixer. The level is the
/// number of trailing all-zero nibbles in the result, capped at 7, so each
/// higher level is roughly 16x rarer than the one below.
pub fn nsw_assign_level(row_idx: usize) -> u8 {
    const MAX_LEVEL: u8 = 7;
    let mixed = {
        let mut z = (row_idx as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    };
    // Each zero low nibble is one more level; `trailing_zeros` of 0 is 64,
    // which the clamp turns into MAX_LEVEL.
    let zero_nibbles = mixed.trailing_zeros() / 4;
    // Clamped to MAX_LEVEL (≤ 7), so narrowing to u8 is lossless.
    zero_nibbles.min(u32::from(MAX_LEVEL)) as u8
}
758
759impl Index {
760 fn new_btree(name: String, column_position: usize) -> Self {
761 Self {
762 name,
763 column_position,
764 kind: IndexKind::BTree(PersistentBTreeMap::new()),
765 included_columns: Vec::new(),
766 partial_predicate: None,
767 expression: None,
768 is_unique: false,
769 extra_column_positions: Vec::new(),
770 }
771 }
772
773 fn new_nsw(name: String, column_position: usize, m: usize) -> Self {
774 Self {
775 name,
776 column_position,
777 kind: IndexKind::Nsw(NswGraph::new(m)),
778 included_columns: Vec::new(),
779 partial_predicate: None,
780 expression: None,
781 is_unique: false,
782 extra_column_positions: Vec::new(),
783 }
784 }
785
786 fn new_brin(name: String, column_position: usize, column_type: DataType) -> Self {
790 Self {
791 name,
792 column_position,
793 kind: IndexKind::Brin { column_type },
794 included_columns: Vec::new(),
795 partial_predicate: None,
796 expression: None,
797 is_unique: false,
798 extra_column_positions: Vec::new(),
799 }
800 }
801
802 pub fn lookup_eq(&self, key: &IndexKey) -> &[RowLocator] {
811 match &self.kind {
812 IndexKind::BTree(m) => m.get(key).map_or(&[][..], Vec::as_slice),
813 IndexKind::Nsw(_) | IndexKind::Brin { .. } => &[][..],
815 }
816 }
817
818 pub const fn nsw(&self) -> Option<&NswGraph> {
821 match &self.kind {
822 IndexKind::Nsw(g) => Some(g),
823 IndexKind::BTree(_) | IndexKind::Brin { .. } => None,
824 }
825 }
826
827 pub const fn is_brin(&self) -> bool {
832 matches!(self.kind, IndexKind::Brin { .. })
833 }
834}
835
/// An in-memory table: schema, hot rows, and secondary indices.
#[derive(Debug, Clone)]
pub struct Table {
    /// Column definitions and constraints.
    schema: TableSchema,
    /// Hot rows in position order; `RowLocator::Hot(i)` refers to `rows[i]`.
    rows: PersistentVec<Row>,
    /// Secondary indices; names are unique within the table.
    indices: Vec<Index>,
    /// Running total of `row_body_encoded_len` over the hot rows (saturating).
    hot_bytes: u64,
    /// Cold row count as last recorded by `set_cold_row_count`.
    cold_row_count: u64,
    /// Set by `mark_cold_row_count_stale`, cleared by `set_cold_row_count`.
    cold_row_count_stale: bool,
}
877
878impl Table {
879 pub fn new(schema: TableSchema) -> Self {
880 Self {
881 schema,
882 rows: PersistentVec::new(),
883 indices: Vec::new(),
884 hot_bytes: 0,
885 cold_row_count: 0,
886 cold_row_count_stale: false,
887 }
888 }
889
890 #[must_use]
894 pub const fn hot_bytes(&self) -> u64 {
895 self.hot_bytes
896 }
897
898 #[must_use]
901 pub const fn cold_row_count(&self) -> u64 {
902 self.cold_row_count
903 }
904
905 pub fn set_cold_row_count(&mut self, n: u64) {
908 self.cold_row_count = n;
909 self.cold_row_count_stale = false;
910 }
911
912 pub fn mark_cold_row_count_stale(&mut self) {
917 self.cold_row_count_stale = true;
918 }
919
920 #[must_use]
924 pub const fn cold_row_count_stale(&self) -> bool {
925 self.cold_row_count_stale
926 }
927
928 #[must_use]
939 pub fn count_cold_locators(&self) -> u64 {
940 let mut best: u64 = 0;
941 for idx in &self.indices {
942 if let IndexKind::BTree(map) = &idx.kind {
943 let n: u64 = map
944 .iter()
945 .map(|(_, locs)| locs.iter().filter(|l| l.is_cold()).count() as u64)
946 .sum();
947 if n > best {
948 best = n;
949 }
950 }
951 }
952 best
953 }
954
955 pub const fn schema(&self) -> &TableSchema {
956 &self.schema
957 }
958
959 pub const fn schema_mut(&mut self) -> &mut TableSchema {
963 &mut self.schema
964 }
965
966 pub const fn rows(&self) -> &PersistentVec<Row> {
970 &self.rows
971 }
972
973 pub const fn row_count(&self) -> usize {
974 self.rows.len()
975 }
976
977 pub fn indices_mut(&mut self) -> &mut [Index] {
982 &mut self.indices
983 }
984
985 pub fn indices(&self) -> &[Index] {
986 &self.indices
987 }
988
989 pub fn next_auto_value(&self, col_pos: usize) -> Option<i64> {
995 let ty = self.schema.columns.get(col_pos)?.ty;
996 if !matches!(ty, DataType::SmallInt | DataType::Int | DataType::BigInt) {
997 return None;
998 }
999 let mut max: Option<i64> = None;
1000 for row in &self.rows {
1001 match row.values.get(col_pos) {
1002 Some(Value::SmallInt(n)) => {
1003 let v = i64::from(*n);
1004 max = Some(max.map_or(v, |m| m.max(v)));
1005 }
1006 Some(Value::Int(n)) => {
1007 let v = i64::from(*n);
1008 max = Some(max.map_or(v, |m| m.max(v)));
1009 }
1010 Some(Value::BigInt(n)) => {
1011 max = Some(max.map_or(*n, |m| m.max(*n)));
1012 }
1013 _ => {}
1014 }
1015 }
1016 Some(max.map_or(1, |m| m + 1))
1017 }
1018
1019 pub fn index_on(&self, column_position: usize) -> Option<&Index> {
1023 self.indices
1030 .iter()
1031 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::BTree(_)))
1032 .or_else(|| {
1033 self.indices
1034 .iter()
1035 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
1036 })
1037 }
1038
1039 pub fn insert(&mut self, row: Row) -> Result<(), StorageError> {
1043 if row.len() != self.schema.columns.len() {
1044 return Err(StorageError::ArityMismatch {
1045 expected: self.schema.columns.len(),
1046 actual: row.len(),
1047 });
1048 }
1049 for (i, (val, col)) in row.values.iter().zip(&self.schema.columns).enumerate() {
1050 if val.is_null() {
1051 if !col.nullable {
1052 return Err(StorageError::NullInNotNull {
1053 column: col.name.clone(),
1054 });
1055 }
1056 continue;
1057 }
1058 let actual = val.data_type().expect("non-null");
1059 let compatible = actual == col.ty
1073 || matches!(
1074 (actual, col.ty),
1075 (
1076 DataType::Text,
1077 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1078 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1079 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1080 | (DataType::Timestamp, DataType::Timestamptz)
1081 | (DataType::Timestamptz, DataType::Timestamp)
1082 )
1083 || matches!(
1084 (actual, col.ty),
1085 (
1086 DataType::Numeric { scale: a, .. },
1087 DataType::Numeric { scale: b, .. },
1088 ) if a == b
1089 );
1090 if !compatible {
1091 return Err(StorageError::TypeMismatch {
1092 column: col.name.clone(),
1093 expected: col.ty,
1094 actual,
1095 position: i,
1096 });
1097 }
1098 }
1099 let new_row_idx = self.rows.len();
1100 for idx in &mut self.indices {
1104 if let IndexKind::BTree(map) = &mut idx.kind
1105 && let Some(key) = IndexKey::from_value(&row.values[idx.column_position])
1106 {
1107 let mut entries = map.get(&key).cloned().unwrap_or_default();
1113 entries.push(RowLocator::Hot(new_row_idx));
1114 map.insert_mut(key, entries);
1115 }
1116 }
1117 self.hot_bytes = self
1120 .hot_bytes
1121 .saturating_add(row_body_encoded_len(&row, &self.schema) as u64);
1122 self.rows.push_mut(row);
1127 let new_row_idx = self.rows.len() - 1;
1130 let nsw_targets: Vec<usize> = self
1131 .indices
1132 .iter()
1133 .enumerate()
1134 .filter_map(|(i, idx)| {
1135 if matches!(idx.kind, IndexKind::Nsw(_)) {
1136 Some(i)
1137 } else {
1138 None
1139 }
1140 })
1141 .collect();
1142 for idx_pos in nsw_targets {
1143 nsw_insert_at(self, idx_pos, new_row_idx);
1144 }
1145 Ok(())
1146 }
1147
1148 pub fn add_index(&mut self, name: String, column_name: &str) -> Result<(), StorageError> {
1152 if self.indices.iter().any(|i| i.name == name) {
1153 return Err(StorageError::DuplicateIndex { name });
1154 }
1155 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1156 StorageError::ColumnNotFound {
1157 column: column_name.into(),
1158 }
1159 })?;
1160 let mut idx = Index::new_btree(name, column_position);
1161 if let IndexKind::BTree(map) = &mut idx.kind {
1162 for (i, row) in self.rows.iter().enumerate() {
1163 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1164 let mut entries = map.get(&key).cloned().unwrap_or_default();
1165 entries.push(RowLocator::Hot(i));
1166 map.insert_mut(key, entries);
1167 }
1168 }
1169 }
1170 self.indices.push(idx);
1171 Ok(())
1172 }
1173
1174 pub fn add_nsw_index(
1179 &mut self,
1180 name: String,
1181 column_name: &str,
1182 m: usize,
1183 ) -> Result<(), StorageError> {
1184 self.add_nsw_index_inner(name, column_name, m, None)
1185 }
1186
1187 pub fn rebuild_nsw_index(
1199 &mut self,
1200 name: &str,
1201 new_encoding: Option<VecEncoding>,
1202 ) -> Result<(), StorageError> {
1203 let idx_pos = self
1204 .indices
1205 .iter()
1206 .position(|i| i.name == name)
1207 .ok_or_else(|| StorageError::IndexNotFound {
1208 name: String::from(name),
1209 })?;
1210 let col_pos = self.indices[idx_pos].column_position;
1211 let m = match &self.indices[idx_pos].kind {
1212 IndexKind::Nsw(g) => g.m,
1213 IndexKind::BTree(_) | IndexKind::Brin { .. } => {
1214 return Err(StorageError::Unsupported(format!(
1215 "ALTER INDEX REBUILD on non-NSW index {name:?} — only NSW indexes can rebuild"
1216 )));
1217 }
1218 };
1219 let col_name = self.schema.columns[col_pos].name.clone();
1220 if let Some(target) = new_encoding {
1223 let current = match self.schema.columns[col_pos].ty {
1224 DataType::Vector { encoding, .. } => encoding,
1225 ref other => {
1226 return Err(StorageError::Unsupported(format!(
1227 "ALTER INDEX REBUILD WITH (encoding=…) on non-vector column type {other:?}"
1228 )));
1229 }
1230 };
1231 if target != current {
1232 let DataType::Vector { dim, .. } = self.schema.columns[col_pos].ty else {
1233 unreachable!("checked above")
1234 };
1235 let n = self.rows.len();
1236 for i in 0..n {
1237 let row = self
1238 .rows
1239 .get_mut(i)
1240 .expect("row index in bounds (we iterated up to len())");
1241 let cell = core::mem::replace(&mut row.values[col_pos], Value::Null);
1242 let recoded = recode_vector_cell(cell, target)?;
1243 row.values[col_pos] = recoded;
1244 }
1245 self.schema.columns[col_pos].ty = DataType::Vector {
1246 dim,
1247 encoding: target,
1248 };
1249 }
1250 }
1251 self.indices.remove(idx_pos);
1253 self.add_nsw_index_inner(String::from(name), &col_name, m, None)?;
1254 Ok(())
1255 }
1256
1257 pub fn restore_nsw_index(
1262 &mut self,
1263 name: String,
1264 column_name: &str,
1265 graph: NswGraph,
1266 ) -> Result<(), StorageError> {
1267 self.add_nsw_index_inner(name, column_name, graph.m, Some(graph))
1268 }
1269
1270 pub fn restore_btree_index(
1277 &mut self,
1278 name: String,
1279 column_name: &str,
1280 map: PersistentBTreeMap<IndexKey, Vec<RowLocator>>,
1281 ) -> Result<(), StorageError> {
1282 if self.indices.iter().any(|i| i.name == name) {
1283 return Err(StorageError::DuplicateIndex { name });
1284 }
1285 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1286 StorageError::ColumnNotFound {
1287 column: column_name.into(),
1288 }
1289 })?;
1290 self.indices.push(Index {
1291 name,
1292 column_position,
1293 kind: IndexKind::BTree(map),
1294 included_columns: Vec::new(),
1295 partial_predicate: None,
1296 expression: None,
1297 is_unique: false,
1298 extra_column_positions: Vec::new(),
1299 });
1300 Ok(())
1301 }
1302
1303 pub fn restore_brin_index(
1308 &mut self,
1309 name: String,
1310 column_name: &str,
1311 column_type: DataType,
1312 ) -> Result<(), StorageError> {
1313 if self.indices.iter().any(|i| i.name == name) {
1314 return Err(StorageError::DuplicateIndex { name });
1315 }
1316 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1317 StorageError::ColumnNotFound {
1318 column: column_name.into(),
1319 }
1320 })?;
1321 self.indices.push(Index::new_brin(name, column_position, column_type));
1322 Ok(())
1323 }
1324
1325 pub fn add_brin_index(
1329 &mut self,
1330 name: String,
1331 column_name: &str,
1332 ) -> Result<(), StorageError> {
1333 if self.indices.iter().any(|i| i.name == name) {
1334 return Err(StorageError::DuplicateIndex { name });
1335 }
1336 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1337 StorageError::ColumnNotFound {
1338 column: column_name.into(),
1339 }
1340 })?;
1341 let column_type = self.schema.columns[column_position].ty;
1342 self.indices.push(Index::new_brin(name, column_position, column_type));
1343 Ok(())
1344 }
1345
1346 pub fn register_cold_locators<I>(
1363 &mut self,
1364 index_name: &str,
1365 locators: I,
1366 ) -> Result<usize, StorageError>
1367 where
1368 I: IntoIterator<Item = (IndexKey, RowLocator)>,
1369 {
1370 let idx = self
1371 .indices
1372 .iter_mut()
1373 .find(|i| i.name == index_name)
1374 .ok_or_else(|| StorageError::Corrupt(format!("index {index_name:?} not found")))?;
1375 let map = match &mut idx.kind {
1376 IndexKind::BTree(map) => map,
1377 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1378 return Err(StorageError::Corrupt(format!(
1379 "index {index_name:?} is not BTree; cold locators apply only to BTree indices"
1380 )));
1381 }
1382 };
1383 let mut count = 0usize;
1384 for (key, locator) in locators {
1385 let mut entries = map.get(&key).cloned().unwrap_or_default();
1386 entries.push(locator);
1387 map.insert_mut(key, entries);
1388 count += 1;
1389 }
1390 Ok(count)
1391 }
1392
1393 pub fn remove_cold_locators_for_key(
1403 &mut self,
1404 index_name: &str,
1405 key: &IndexKey,
1406 ) -> Result<usize, StorageError> {
1407 let idx = self
1408 .indices
1409 .iter_mut()
1410 .find(|i| i.name == index_name)
1411 .ok_or_else(|| {
1412 StorageError::Corrupt(format!(
1413 "remove_cold_locators_for_key: index {index_name:?} not found"
1414 ))
1415 })?;
1416 let map = match &mut idx.kind {
1417 IndexKind::BTree(map) => map,
1418 IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
1419 return Err(StorageError::Corrupt(format!(
1420 "remove_cold_locators_for_key: index {index_name:?} is not BTree; \
1421 cold locators apply only to BTree indices"
1422 )));
1423 }
1424 };
1425 let Some(entries) = map.get(key) else {
1426 return Ok(0);
1427 };
1428 let mut kept: Vec<RowLocator> =
1429 entries.iter().copied().filter(RowLocator::is_hot).collect();
1430 let removed = entries.len() - kept.len();
1431 if removed == 0 {
1432 return Ok(0);
1433 }
1434 kept.shrink_to_fit();
1435 map.insert_mut(key.clone(), kept);
1443 Ok(removed)
1444 }
1445
1446 pub fn delete_rows(&mut self, positions: &[usize]) -> usize {
1452 if positions.is_empty() {
1453 return 0;
1454 }
1455 let mut to_remove = alloc::vec![false; self.rows.len()];
1459 let mut removed = 0;
1460 for &p in positions {
1461 if p < to_remove.len() && !to_remove[p] {
1462 to_remove[p] = true;
1463 removed += 1;
1464 }
1465 }
1466 let mut new_rows: PersistentVec<Row> = PersistentVec::new();
1467 let mut removed_bytes: u64 = 0;
1468 for (i, row) in self.rows.iter().enumerate() {
1469 if to_remove[i] {
1470 removed_bytes =
1471 removed_bytes.saturating_add(row_body_encoded_len(row, &self.schema) as u64);
1472 } else {
1473 new_rows.push_mut(row.clone());
1474 }
1475 }
1476 self.rows = new_rows;
1477 self.hot_bytes = self.hot_bytes.saturating_sub(removed_bytes);
1478 self.rebuild_indices();
1479 removed
1480 }
1481
1482 pub fn update_row(
1488 &mut self,
1489 position: usize,
1490 new_values: Vec<Value>,
1491 ) -> Result<(), StorageError> {
1492 if position >= self.rows.len() {
1493 return Err(StorageError::Corrupt(alloc::format!(
1494 "update_row: position {position} out of bounds (rows={})",
1495 self.rows.len()
1496 )));
1497 }
1498 if new_values.len() != self.schema.columns.len() {
1499 return Err(StorageError::ArityMismatch {
1500 expected: self.schema.columns.len(),
1501 actual: new_values.len(),
1502 });
1503 }
1504 for (i, (val, col)) in new_values.iter().zip(&self.schema.columns).enumerate() {
1508 if val.is_null() {
1509 if !col.nullable {
1510 return Err(StorageError::NullInNotNull {
1511 column: col.name.clone(),
1512 });
1513 }
1514 continue;
1515 }
1516 let actual = val.data_type().expect("non-null");
1517 let compatible = actual == col.ty
1518 || matches!(
1519 (actual, col.ty),
1520 (
1521 DataType::Text,
1522 DataType::Varchar(_) | DataType::Char(_) | DataType::Json | DataType::Jsonb
1523 ) | (DataType::Json | DataType::Jsonb, DataType::Text)
1524 | (DataType::Json, DataType::Jsonb) | (DataType::Jsonb, DataType::Json)
1525 | (DataType::Timestamp, DataType::Timestamptz)
1526 | (DataType::Timestamptz, DataType::Timestamp)
1527 )
1528 || matches!(
1529 (actual, col.ty),
1530 (
1531 DataType::Numeric { scale: a, .. },
1532 DataType::Numeric { scale: b, .. },
1533 ) if a == b
1534 );
1535 if !compatible {
1536 return Err(StorageError::TypeMismatch {
1537 column: col.name.clone(),
1538 expected: col.ty,
1539 actual,
1540 position: i,
1541 });
1542 }
1543 }
1544 let old_row = self
1545 .rows
1546 .get(position)
1547 .expect("position bounds-checked above");
1548 let old_bytes = row_body_encoded_len(old_row, &self.schema) as u64;
1549 let new_row = Row::new(new_values);
1550 let new_bytes = row_body_encoded_len(&new_row, &self.schema) as u64;
1551 self.rows = self
1552 .rows
1553 .set(position, new_row)
1554 .expect("position bounds-checked above");
1555 self.hot_bytes = self
1556 .hot_bytes
1557 .saturating_sub(old_bytes)
1558 .saturating_add(new_bytes);
1559 self.rebuild_indices();
1560 Ok(())
1561 }
1562
1563 fn rebuild_indices(&mut self) {
1570 let preserved_cold: Vec<(String, Vec<(IndexKey, RowLocator)>)> = self
1579 .indices
1580 .iter()
1581 .filter_map(|idx| match &idx.kind {
1582 IndexKind::BTree(map) => {
1583 let cold: Vec<(IndexKey, RowLocator)> = map
1584 .iter()
1585 .flat_map(|(k, locs)| {
1586 locs.iter()
1587 .filter(|l| l.is_cold())
1588 .copied()
1589 .map(move |l| (k.clone(), l))
1590 })
1591 .collect();
1592 if cold.is_empty() {
1593 None
1594 } else {
1595 Some((idx.name.clone(), cold))
1596 }
1597 }
1598 IndexKind::Nsw(_) | IndexKind::Brin { .. } => None,
1600 })
1601 .collect();
1602
1603 #[derive(Clone)]
1608 enum RebuildKind {
1609 BTree,
1610 Nsw(usize),
1611 Brin(DataType),
1612 }
1613 let descriptors: Vec<(String, usize, RebuildKind)> = self
1614 .indices
1615 .iter()
1616 .map(|idx| {
1617 let kind = match &idx.kind {
1618 IndexKind::Nsw(g) => RebuildKind::Nsw(g.m),
1619 IndexKind::Brin { column_type } => RebuildKind::Brin(*column_type),
1620 IndexKind::BTree(_) => RebuildKind::BTree,
1621 };
1622 (idx.name.clone(), idx.column_position, kind)
1623 })
1624 .collect();
1625 self.indices.clear();
1626 for (name, column_position, rebuild_kind) in descriptors {
1627 match rebuild_kind {
1628 RebuildKind::Nsw(m) => {
1629 let idx = Index::new_nsw(name, column_position, m);
1630 self.indices.push(idx);
1631 let idx_pos = self.indices.len() - 1;
1632 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1633 for row_idx in row_indices {
1634 nsw_insert_at(self, idx_pos, row_idx);
1635 }
1636 }
1637 RebuildKind::Brin(column_type) => {
1638 self.indices.push(Index::new_brin(name, column_position, column_type));
1641 }
1642 RebuildKind::BTree => {
1643 let mut idx = Index::new_btree(name, column_position);
1644 if let IndexKind::BTree(map) = &mut idx.kind {
1645 for (i, row) in self.rows.iter().enumerate() {
1646 if let Some(key) = IndexKey::from_value(&row.values[column_position]) {
1647 let mut entries = map.get(&key).cloned().unwrap_or_default();
1648 entries.push(RowLocator::Hot(i));
1649 map.insert_mut(key, entries);
1650 }
1651 }
1652 }
1653 self.indices.push(idx);
1654 }
1655 }
1656 }
1657
1658 for (idx_name, locators) in preserved_cold {
1663 let _ = self.register_cold_locators(&idx_name, locators);
1667 }
1668 }
1669
1670 fn add_nsw_index_inner(
1671 &mut self,
1672 name: String,
1673 column_name: &str,
1674 m: usize,
1675 restore: Option<NswGraph>,
1676 ) -> Result<(), StorageError> {
1677 if self.indices.iter().any(|i| i.name == name) {
1678 return Err(StorageError::DuplicateIndex { name });
1679 }
1680 let column_position = self.schema.column_position(column_name).ok_or_else(|| {
1681 StorageError::ColumnNotFound {
1682 column: column_name.into(),
1683 }
1684 })?;
1685 if !matches!(
1686 self.schema.columns[column_position].ty,
1687 DataType::Vector { .. }
1688 ) {
1689 return Err(StorageError::TypeMismatch {
1690 column: column_name.into(),
1691 expected: DataType::Vector {
1692 dim: 0,
1693 encoding: VecEncoding::F32,
1694 },
1695 actual: self.schema.columns[column_position].ty,
1696 position: column_position,
1697 });
1698 }
1699 if let Some(graph) = restore {
1700 self.indices.push(Index {
1701 name,
1702 column_position,
1703 kind: IndexKind::Nsw(graph),
1704 included_columns: Vec::new(),
1705 partial_predicate: None,
1706 expression: None,
1707 is_unique: false,
1708 extra_column_positions: Vec::new(),
1709 });
1710 return Ok(());
1711 }
1712 let idx = Index::new_nsw(name, column_position, m);
1713 self.indices.push(idx);
1714 let idx_pos = self.indices.len() - 1;
1715 let row_indices: Vec<usize> = (0..self.rows.len()).collect();
1718 for row_idx in row_indices {
1719 nsw_insert_at(self, idx_pos, row_idx);
1720 }
1721 Ok(())
1722 }
1723}
1724
1725fn recode_vector_cell(cell: Value, target: VecEncoding) -> Result<Value, StorageError> {
1732 if matches!(cell, Value::Null) {
1733 return Ok(cell);
1734 }
1735 let as_f32: Vec<f32> = match &cell {
1737 Value::Vector(v) => v.clone(),
1738 Value::Sq8Vector(q) => quantize::dequantize(q),
1739 Value::HalfVector(h) => h.to_f32_vec(),
1740 other => {
1741 return Err(StorageError::Unsupported(format!(
1742 "ALTER INDEX REBUILD: cannot recode non-vector cell {:?}",
1743 other.data_type()
1744 )));
1745 }
1746 };
1747 Ok(match target {
1752 VecEncoding::F32 => Value::Vector(as_f32),
1753 VecEncoding::Sq8 => Value::Sq8Vector(quantize::quantize(&as_f32)),
1754 VecEncoding::F16 => Value::HalfVector(halfvec::HalfVector::from_f32_slice(&as_f32)),
1755 })
1756}
1757
/// Insert row `new_row_idx` into the NSW graph of index `idx_pos`.
///
/// Outline (HNSW-style insertion, always using L2 distance):
/// 1. Rows whose indexed cell is not a vector, or is a zero-length vector, get
///    an empty level-0 slot and are never linked.
/// 2. The node's top layer comes from `nsw_assign_level` (defined elsewhere).
///    Storage is padded via `ensure_node_slot`.
/// 3. On an empty graph the node becomes the entry point and nothing is linked.
/// 4. Otherwise the search descends greedily from the entry point down to the
///    layer just above the node's top layer. It then runs a beam search on each
///    layer from `min(level, entry_level)` down to 0 and links the node to a
///    heuristically pruned neighbour set on every layer.
/// 5. If the new node sits higher than the current entry point, it becomes the
///    new entry point.
///
/// # Panics
/// If `idx_pos` is not an NSW index, or `new_row_idx`/`col_pos` index past the
/// table's rows/values.
fn nsw_insert_at(table: &mut Table, idx_pos: usize, new_row_idx: usize) {
    let col_pos = table.indices[idx_pos].column_position;
    // Dimension of the stored vector; `None` for NULL / non-vector cells.
    let cell_dim: Option<usize> = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => Some(v.len()),
        Value::Sq8Vector(q) => Some(q.bytes.len()),
        Value::HalfVector(h) => Some(h.dim()),
        _ => None,
    };
    // Non-vector rows still need a (neighbour-less) slot so row indices and
    // graph slots stay aligned.
    let Some(dim) = cell_dim else {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    };
    if dim == 0 {
        ensure_node_slot(table, idx_pos, new_row_idx, 0);
        return;
    }
    let level = nsw_assign_level(new_row_idx);
    ensure_node_slot(table, idx_pos, new_row_idx, level);
    // Snapshot the graph header before mutating anything else.
    let (entry, entry_level, m) = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => (g.entry, g.entry_level, g.m),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => {
            unreachable!("nsw_insert_at on a non-NSW index")
        }
    };
    if entry.is_none() {
        // First vector in the graph: it becomes the entry point, with no edges.
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            g.entry = Some(new_row_idx);
            g.entry_level = level;
            *g.levels
                .get_mut(new_row_idx)
                .expect("levels slot padded by ensure_node_slot") = level;
        }
        return;
    }
    // Record this node's top layer (ensure_node_slot pads new slots with 0).
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        *g.levels
            .get_mut(new_row_idx)
            .expect("levels slot padded by ensure_node_slot") = level;
    }
    // The insertion query is the node's own vector, decoded to f32.
    let query = match &table.rows[new_row_idx].values[col_pos] {
        Value::Vector(v) => v.clone(),
        Value::Sq8Vector(q) => quantize::dequantize(q),
        Value::HalfVector(h) => h.to_f32_vec(),
        _ => return,
    };
    let mut current = entry.expect("entry was Some above");
    let mut current_d = vec_l2_sq(table, col_pos, current, &query);
    // Greedy descent through the layers above the new node's top layer.
    if entry_level > level {
        for layer in (level + 1..=entry_level).rev() {
            (current, current_d) =
                greedy_layer_walk(table, idx_pos, layer, current, current_d, &query);
        }
    }
    let top = level.min(entry_level);
    // Beam width used while building; at least 8 even for tiny `m`.
    let ef = (m * 2).max(8);
    for layer in (0..=top).rev() {
        // Layer 0 keeps twice as many neighbours as the upper layers.
        let cap = if layer == 0 { m * 2 } else { m };
        let mut candidates = layer_beam_search(
            table,
            idx_pos,
            layer,
            current,
            current_d,
            &query,
            ef,
            NswMetric::L2,
        );
        // Never link a node to itself.
        candidates.retain(|&(_, n)| n != new_row_idx);
        // Seed the next (lower) layer's search from the closest hit on this one.
        if let Some(&(d, n)) = candidates.first() {
            current = n;
            current_d = d;
        }
        let peers = select_neighbours_heuristic(&candidates, cap, table, col_pos);
        connect_at_layer(table, idx_pos, layer, new_row_idx, &peers);
    }
    // A node taller than the current entry point takes over as entry.
    if level > entry_level
        && let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
    {
        g.entry = Some(new_row_idx);
        g.entry_level = level;
    }
}
1866
1867fn ensure_node_slot(table: &mut Table, idx_pos: usize, new_row_idx: usize, level: u8) {
1871 let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind else {
1872 unreachable!("ensure_node_slot on a BTree index");
1873 };
1874 while g.layers.len() <= level as usize {
1875 g.layers.push(PersistentVec::new());
1876 }
1877 while g.levels.len() <= new_row_idx {
1878 g.levels.push_mut(0);
1879 }
1880 for layer_vec in &mut g.layers {
1881 while layer_vec.len() <= new_row_idx {
1882 layer_vec.push_mut(Vec::new());
1883 }
1884 }
1885}
1886
1887fn greedy_layer_walk(
1893 table: &Table,
1894 idx_pos: usize,
1895 layer: u8,
1896 mut current: usize,
1897 mut current_d: f32,
1898 query: &[f32],
1899) -> (usize, f32) {
1900 let g = match &table.indices[idx_pos].kind {
1901 IndexKind::Nsw(g) => g,
1902 IndexKind::BTree(_) | IndexKind::Brin { .. } => return (current, current_d),
1903 };
1904 let col_pos = table.indices[idx_pos].column_position;
1905 loop {
1906 let neighbours: &[u32] = g
1907 .layers
1908 .get(layer as usize)
1909 .and_then(|layer_v| layer_v.get(current))
1910 .map_or(&[][..], Vec::as_slice);
1911 let mut best = current;
1912 let mut best_d = current_d;
1913 for &n in neighbours {
1914 let n = n as usize;
1915 let d = vec_l2_sq(table, col_pos, n, query);
1916 if d < best_d {
1917 best = n;
1918 best_d = d;
1919 }
1920 }
1921 if best == current {
1922 return (current, current_d);
1923 }
1924 current = best;
1925 current_d = best_d;
1926 }
1927}
1928
1929#[allow(clippy::too_many_arguments)] fn layer_beam_search(
1942 table: &Table,
1943 idx_pos: usize,
1944 layer: u8,
1945 entry_node: usize,
1946 entry_d: f32,
1947 query: &[f32],
1948 ef: usize,
1949 metric: NswMetric,
1950) -> Vec<(f32, usize)> {
1951 let g = match &table.indices[idx_pos].kind {
1952 IndexKind::Nsw(g) => g,
1953 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
1954 };
1955 let col_pos = table.indices[idx_pos].column_position;
1956 let d0 = if matches!(metric, NswMetric::L2) {
1957 entry_d
1958 } else {
1959 cell_to_query_metric_distance(table, col_pos, entry_node, query, metric)
1960 };
1961 let row_count = table.rows.len();
1962 let mut visited: Vec<bool> = alloc::vec![false; row_count];
1963 if entry_node < row_count {
1964 visited[entry_node] = true;
1965 }
1966 let mut candidates: alloc::collections::BinaryHeap<NodeClosest> =
1969 alloc::collections::BinaryHeap::with_capacity(ef);
1970 let mut results: alloc::collections::BinaryHeap<NodeFurthest> =
1971 alloc::collections::BinaryHeap::with_capacity(ef);
1972 candidates.push(NodeClosest {
1973 dist: d0,
1974 node: entry_node,
1975 });
1976 results.push(NodeFurthest {
1977 dist: d0,
1978 node: entry_node,
1979 });
1980 while let Some(cur) = candidates.pop() {
1981 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
1982 if cur.dist > worst && results.len() >= ef {
1983 break;
1984 }
1985 let neighbours: &[u32] = g
1986 .layers
1987 .get(layer as usize)
1988 .and_then(|layer_v| layer_v.get(cur.node))
1989 .map_or(&[][..], Vec::as_slice);
1990 for &n in neighbours {
1991 let n = n as usize;
1992 if n >= row_count || visited[n] {
1993 continue;
1994 }
1995 visited[n] = true;
1996 let dn = cell_to_query_metric_distance(table, col_pos, n, query, metric);
2000 if !dn.is_finite() {
2001 continue;
2002 }
2003 let worst = results.peek().map_or(f32::INFINITY, |c| c.dist);
2004 if results.len() < ef || dn < worst {
2005 results.push(NodeFurthest { dist: dn, node: n });
2006 if results.len() > ef {
2007 results.pop();
2008 }
2009 candidates.push(NodeClosest { dist: dn, node: n });
2010 }
2011 }
2012 }
2013 let mut out: Vec<(f32, usize)> = results.into_iter().map(|c| (c.dist, c.node)).collect();
2016 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2017 out
2018}
2019
/// Beam-search candidate ordered so that `BinaryHeap` (a max-heap) pops the
/// *smallest* distance first.
///
/// Ties on distance are broken by row index (smaller index pops first). This
/// keeps `Ord` consistent with `Eq` and makes heap order deterministic.
/// Incomparable (NaN) distances compare as equal on the distance component.
#[derive(Debug, Clone, Copy)]
struct NodeClosest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeClosest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so `==` and `Ord` always agree (and `eq` stays
        // reflexive even for NaN distances).
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeClosest {}
impl PartialOrd for NodeClosest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeClosest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        // Both comparisons are reversed: the closer (and then lower-index)
        // node is the "greater" heap element.
        other
            .dist
            .partial_cmp(&self.dist)
            .unwrap_or(core::cmp::Ordering::Equal)
            .then_with(|| other.node.cmp(&self.node))
    }
}
2048
/// Beam-search result ordered so that `BinaryHeap` (a max-heap) keeps the
/// *largest* distance on top, ready to be evicted when the beam is full.
///
/// Ties on distance are broken by row index (larger index on top). This keeps
/// `Ord` consistent with `Eq` and makes eviction deterministic. Incomparable
/// (NaN) distances compare as equal on the distance component.
#[derive(Debug, Clone, Copy)]
struct NodeFurthest {
    dist: f32,
    node: usize,
}
impl PartialEq for NodeFurthest {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so `==` and `Ord` always agree (and `eq` stays
        // reflexive even for NaN distances).
        self.cmp(other) == core::cmp::Ordering::Equal
    }
}
impl Eq for NodeFurthest {}
impl PartialOrd for NodeFurthest {
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for NodeFurthest {
    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
        self.dist
            .partial_cmp(&other.dist)
            .unwrap_or(core::cmp::Ordering::Equal)
            .then_with(|| self.node.cmp(&other.node))
    }
}
2074
2075fn select_neighbours_heuristic(
2084 candidates: &[(f32, usize)],
2085 m: usize,
2086 table: &Table,
2087 col_pos: usize,
2088) -> Vec<usize> {
2089 let mut chosen: Vec<usize> = Vec::with_capacity(m);
2090 for &(d_q, e) in candidates {
2091 if chosen.len() >= m {
2092 break;
2093 }
2094 if !matches!(
2099 table.rows.get(e).and_then(|r| r.values.get(col_pos)),
2100 Some(Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_))
2101 ) {
2102 continue;
2103 }
2104 let mut covered = false;
2105 for &r in &chosen {
2106 if cell_l2_sq(table, col_pos, e, r) < d_q {
2110 covered = true;
2111 break;
2112 }
2113 }
2114 if !covered {
2115 chosen.push(e);
2116 }
2117 }
2118 chosen
2119}
2120
/// Link `new_row_idx` into layer `layer` of the NSW graph at `idx_pos`.
///
/// Steps:
/// 1. The new node's adjacency list on this layer is *replaced* by `peers`.
/// 2. Each peer whose indexed cell is a vector receives a back-edge to the new
///    node, unless one is already present.
/// 3. If that peer's list now exceeds `cap_for_layer(layer)`, it is re-pruned
///    with `select_neighbours_heuristic`. The peer's current neighbours are
///    sorted by squared-L2 distance from the peer for this. Pruning may drop
///    the back-edge just added.
///
/// Non-NSW indices are a no-op.
///
/// # Panics
/// If a row index does not fit in `u32`, if `layer` has no storage, or if a
/// peer index is out of range for `table.rows` (slots are assumed padded by
/// `ensure_node_slot`).
fn connect_at_layer(
    table: &mut Table,
    idx_pos: usize,
    layer: u8,
    new_row_idx: usize,
    peers: &[usize],
) {
    let col_pos = table.indices[idx_pos].column_position;
    let cap = match &table.indices[idx_pos].kind {
        IndexKind::Nsw(g) => g.cap_for_layer(layer),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => return,
    };
    // Adjacency lists store u32 row ids.
    let new_row_u32 = u32::try_from(new_row_idx).expect("row index fits in u32");
    // Forward edges: the new node's list on this layer becomes exactly `peers`.
    if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
        let layer_v = &mut g.layers[layer as usize];
        if let Some(slot) = layer_v.get_mut(new_row_idx) {
            *slot = peers
                .iter()
                .map(|&p| u32::try_from(p).expect("row index fits in u32"))
                .collect();
        }
    }
    for &peer in peers {
        // Rows without a vector cell never get back-edges.
        if !matches!(
            &table.rows[peer].values[col_pos],
            Value::Vector(_) | Value::Sq8Vector(_) | Value::HalfVector(_)
        ) {
            continue;
        }
        // Back-edge peer -> new node (deduplicated).
        if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind {
            let layer_v = &mut g.layers[layer as usize];
            if let Some(slot) = layer_v.get_mut(peer)
                && !slot.contains(&new_row_u32)
            {
                slot.push(new_row_u32);
            }
        }
        let needs_trim = match &table.indices[idx_pos].kind {
            IndexKind::Nsw(g) => g.layers[layer as usize][peer].len() > cap,
            IndexKind::BTree(_) | IndexKind::Brin { .. } => false,
        };
        if needs_trim {
            // Copy the peer's list out so `table` can be borrowed immutably
            // for the distance computations below.
            let current_peers: Vec<usize> = match &table.indices[idx_pos].kind {
                IndexKind::Nsw(g) => g.layers[layer as usize][peer]
                    .iter()
                    .map(|&n| n as usize)
                    .collect(),
                IndexKind::BTree(_) | IndexKind::Brin { .. } => continue,
            };
            // Distances are measured from the peer (it plays the "query" role).
            let mut tagged: Vec<(f32, usize)> = current_peers
                .iter()
                .map(|&p| (cell_l2_sq(table, col_pos, peer, p), p))
                .collect();
            tagged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
            let kept = select_neighbours_heuristic(&tagged, cap, table, col_pos);
            if let IndexKind::Nsw(g) = &mut table.indices[idx_pos].kind
                && let Some(slot) = g.layers[layer as usize].get_mut(peer)
            {
                *slot = kept
                    .into_iter()
                    .map(|p| u32::try_from(p).expect("row index fits in u32"))
                    .collect();
            }
        }
    }
}
2205
2206fn vec_l2_sq(table: &Table, col_pos: usize, row: usize, query: &[f32]) -> f32 {
2213 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2214 Some(Value::Vector(v)) if v.len() == query.len() => l2_distance_sq(v, query),
2215 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => {
2216 quantize::sq8_l2_distance_sq_asymmetric(q, query)
2217 }
2218 Some(Value::HalfVector(h)) if h.dim() == query.len() => {
2222 halfvec::half_l2_distance_sq_asymmetric(h, query)
2223 }
2224 _ => f32::INFINITY,
2225 }
2226}
2227
2228fn cell_l2_sq(table: &Table, col_pos: usize, row_a: usize, row_b: usize) -> f32 {
2235 let Some(cell_a) = table.rows.get(row_a).and_then(|r| r.values.get(col_pos)) else {
2236 return f32::INFINITY;
2237 };
2238 let Some(cell_b) = table.rows.get(row_b).and_then(|r| r.values.get(col_pos)) else {
2239 return f32::INFINITY;
2240 };
2241 match (cell_a, cell_b) {
2242 (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => l2_distance_sq(a, b),
2243 (Value::Sq8Vector(a), Value::Sq8Vector(b)) if a.bytes.len() == b.bytes.len() => {
2244 quantize::sq8_l2_distance_sq(a, b)
2245 }
2246 (Value::HalfVector(a), Value::HalfVector(b)) if a.dim() == b.dim() => {
2251 halfvec::half_l2_distance_sq(a, b)
2252 }
2253 _ => f32::INFINITY,
2254 }
2255}
2256
2257fn cell_to_query_metric_distance(
2262 table: &Table,
2263 col_pos: usize,
2264 row: usize,
2265 query: &[f32],
2266 metric: NswMetric,
2267) -> f32 {
2268 match table.rows.get(row).and_then(|r| r.values.get(col_pos)) {
2269 Some(Value::Vector(v)) if v.len() == query.len() => metric_distance(metric, v, query),
2270 Some(Value::Sq8Vector(q)) if q.bytes.len() == query.len() => match metric {
2271 NswMetric::L2 => quantize::sq8_l2_distance_sq_asymmetric(q, query),
2272 NswMetric::InnerProduct => quantize::sq8_inner_product_asymmetric(q, query),
2273 NswMetric::Cosine => quantize::sq8_cosine_distance_asymmetric(q, query),
2274 },
2275 Some(Value::HalfVector(h)) if h.dim() == query.len() => match metric {
2278 NswMetric::L2 => halfvec::half_l2_distance_sq_asymmetric(h, query),
2279 NswMetric::InnerProduct => halfvec::half_inner_product_asymmetric(h, query),
2280 NswMetric::Cosine => halfvec::half_cosine_distance_asymmetric(h, query),
2281 },
2282 _ => f32::INFINITY,
2283 }
2284}
2285
/// Distance metric for NSW vector search. Every metric is expressed as a
/// distance: smaller values mean "closer" and sort first.
///
/// Graph construction (`nsw_insert_at`) always uses `L2`. The metric only
/// changes how the final layer-0 beam search and SQ8 re-ranking score rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NswMetric {
    /// Squared Euclidean distance (no square root is taken).
    L2,
    /// Inner-product ranking. In `metric_distance` this is the *negated* dot
    /// product, so larger similarity sorts first.
    InnerProduct,
    /// Cosine distance, `1 - cos(a, b)` in `metric_distance`. It is `+inf`
    /// when either vector has zero norm.
    Cosine,
}
2303
2304fn nsw_search(
2310 table: &Table,
2311 idx_pos: usize,
2312 query: &[f32],
2313 k: usize,
2314 ef: usize,
2315 metric: NswMetric,
2316) -> Vec<(f32, usize)> {
2317 let (entry, entry_level) = match &table.indices[idx_pos].kind {
2318 IndexKind::Nsw(g) => (g.entry, g.entry_level),
2319 IndexKind::BTree(_) | IndexKind::Brin { .. } => return Vec::new(),
2320 };
2321 let Some(entry) = entry else {
2322 return Vec::new();
2323 };
2324 let col_pos = table.indices[idx_pos].column_position;
2325 let sq8 = matches!(
2332 table.schema.columns.get(col_pos).map(|c| c.ty),
2333 Some(DataType::Vector {
2334 encoding: VecEncoding::Sq8,
2335 ..
2336 })
2337 );
2338 let ef = if sq8 {
2339 ef.max(k).max(k * SQ8_RERANK_OVER_FETCH)
2340 } else {
2341 ef.max(k)
2342 };
2343 let entry_d = vec_l2_sq(table, col_pos, entry, query);
2345 let mut current = entry;
2346 let mut current_d = entry_d;
2347 for layer in (1..=entry_level).rev() {
2348 (current, current_d) = greedy_layer_walk(table, idx_pos, layer, current, current_d, query);
2349 }
2350 let mut results = layer_beam_search(table, idx_pos, 0, current, current_d, query, ef, metric);
2352 if sq8 {
2353 results = sq8_rerank(table, col_pos, &results, query, metric);
2354 }
2355 results.truncate(k);
2356 results
2357}
2358
2359fn sq8_rerank(
2366 table: &Table,
2367 col_pos: usize,
2368 candidates: &[(f32, usize)],
2369 query: &[f32],
2370 metric: NswMetric,
2371) -> Vec<(f32, usize)> {
2372 let mut out: Vec<(f32, usize)> = candidates
2373 .iter()
2374 .filter_map(|&(adc_d, row)| {
2375 let cell = table.rows.get(row).and_then(|r| r.values.get(col_pos))?;
2376 let Value::Sq8Vector(q) = cell else {
2377 return Some((adc_d, row));
2381 };
2382 let deq = quantize::dequantize(q);
2383 if deq.len() != query.len() {
2384 return None;
2385 }
2386 Some((metric_distance(metric, &deq, query), row))
2387 })
2388 .collect();
2389 out.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
2390 out
2391}
2392
/// Beam-width multiplier for SQ8 columns: `nsw_search` fetches at least
/// `k * SQ8_RERANK_OVER_FETCH` approximate hits before exact re-ranking. This
/// gives quantization error room to reorder the final top-`k`.
const SQ8_RERANK_OVER_FETCH: usize = 3;
2397
2398fn metric_distance(metric: NswMetric, a: &[f32], b: &[f32]) -> f32 {
2399 match metric {
2400 NswMetric::L2 => l2_distance_sq(a, b),
2401 NswMetric::InnerProduct => -inner_product_f32(a, b),
2402 NswMetric::Cosine => {
2403 let (dot, na, nb) = cosine_dot_norms_f32(a, b);
2404 if na == 0.0 || nb == 0.0 {
2405 return f32::INFINITY;
2406 }
2407 let denom = sqrt_newton_f32(na) * sqrt_newton_f32(nb);
2410 1.0 - dot / denom
2411 }
2412 }
2413}
2414
/// Dot product of two f32 slices.
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 use
/// the NEON kernel. Everything else uses the scalar loop, which pairs elements
/// up to the shorter length. The two paths sum in different orders, so results
/// can differ in the last bits between architectures.
#[doc(hidden)]
#[inline]
pub fn inner_product_f32(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: `b` is exactly as long as `a`, so the kernel's loads stay
            // in bounds. The length is a multiple of 4, so no tail element is
            // silently skipped. No runtime feature check is made: this assumes
            // NEON is enabled for the target (it is in the baseline of Rust's
            // standard aarch64 targets; confirm for custom targets).
            return unsafe { inner_product_neon(a, b) };
        }
    }
    inner_product_scalar(a, b)
}
2436
/// Portable dot product: `Σ a[i] * b[i]` over the common prefix of the slices,
/// accumulated left to right starting from `+0.0`.
fn inner_product_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .fold(0.0_f32, |acc, (x, y)| acc + x * y)
}
2444
/// NEON dot product. The main loop handles 8 lanes per iteration with two
/// independent FMA accumulators (presumably to overlap FMA latency), followed
/// by a 4-lane cleanup loop.
///
/// # Safety
/// * `b.len() >= a.len()`: both slices are read at the same offsets, up to the
///   last full group of four in `a`.
/// * The CPU must support NEON (`#[target_feature(enable = "neon")]`).
///
/// Elements past the last multiple of 4 are *ignored*. Callers must pass a
/// length that is a multiple of 4 to get a complete result.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names (a, b, n, i, av/bv) mirror the intrinsic-heavy arithmetic.
#[allow(clippy::many_single_char_names)]
unsafe fn inner_product_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32,
    };
    // SAFETY: every load reads lanes `i..i + 4` with `i + 4 <= a.len()`, which
    // is in bounds for `a`, and for `b` by the caller contract above.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            let av0 = vld1q_f32(a.as_ptr().add(i));
            let bv0 = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av0, bv0);
            let av1 = vld1q_f32(a.as_ptr().add(i + 4));
            let bv1 = vld1q_f32(b.as_ptr().add(i + 4));
            acc1 = vfmaq_f32(acc1, av1, bv1);
            i += 8;
        }
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc0 = vfmaq_f32(acc0, av, bv);
            i += 4;
        }
        // Merge the two accumulators, then horizontally add the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2478
/// Fused single pass returning `(Σ a·b, Σ a², Σ b²)`, the inputs to cosine distance.
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 use
/// the NEON kernel. Otherwise the scalar loop runs over the common prefix.
/// Summation order differs between the paths, so low bits may differ.
#[doc(hidden)]
#[inline]
pub fn cosine_dot_norms_f32(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the lengths are equal (all loads in bounds) and a multiple
            // of 4 (no unprocessed tail). NEON is assumed enabled for the target;
            // no runtime detection is performed.
            return unsafe { cosine_dot_norms_neon(a, b) };
        }
    }
    cosine_dot_norms_scalar(a, b)
}
2497
/// Portable fused pass over the common prefix of `a` and `b`, returning
/// `(dot, |a|², |b|²)`. Each sum is accumulated left to right from `+0.0`.
fn cosine_dot_norms_scalar(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    a.iter().zip(b).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, norm_a, norm_b), (&x, &y)| (dot + x * y, norm_a + x * x, norm_b + y * y),
    )
}
2509
/// NEON fused pass computing `(Σ a·b, Σ a², Σ b²)` four lanes at a time, with
/// one FMA accumulator per quantity.
///
/// # Safety
/// * `b.len() >= a.len()`: both slices are read at the same offsets.
/// * The CPU must support NEON (`#[target_feature(enable = "neon")]`).
///
/// Elements past the last multiple of 4 in `a` are *ignored*.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short and similar names (a/b, av/bv, acc_na/acc_nb) mirror the math.
#[allow(clippy::many_single_char_names, clippy::similar_names)]
unsafe fn cosine_dot_norms_neon(a: &[f32], b: &[f32]) -> (f32, f32, f32) {
    use core::arch::aarch64::{float32x4_t, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32};
    // SAFETY: loads cover lanes `i..i + 4` with `i + 4 <= a.len()`; in bounds
    // for `a`, and for `b` by the caller contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc_dot = zero;
        let mut acc_na = zero;
        let mut acc_nb = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 4 <= n {
            let av = vld1q_f32(a.as_ptr().add(i));
            let bv = vld1q_f32(b.as_ptr().add(i));
            acc_dot = vfmaq_f32(acc_dot, av, bv);
            acc_na = vfmaq_f32(acc_na, av, av);
            acc_nb = vfmaq_f32(acc_nb, bv, bv);
            i += 4;
        }
        // Horizontal add of each accumulator's four lanes.
        (vaddvq_f32(acc_dot), vaddvq_f32(acc_na), vaddvq_f32(acc_nb))
    }
}
2533
/// Square root for `no_std` builds, where `f32::sqrt` is unavailable.
///
/// The initial estimate comes from halving the IEEE-754 exponent at the bit
/// level, which is within a few percent of the true root across the whole
/// normal range. Newton–Raphson steps `g ← (g + x/g) / 2` are then applied.
/// After the first step the iterates decrease monotonically towards `√x`, so
/// iteration stops as soon as a step no longer decreases `g`. The loop is
/// capped for safety.
///
/// The previous implementation started from `g = x` with a fixed 10 steps.
/// That does not converge for inputs far from 1: for example it returned ≈1296
/// for `x = 1e6`, which skewed cosine distances for large-norm vectors.
///
/// Returns `0.0` for `x <= 0.0`. Non-finite input (`+inf`, NaN) is returned
/// unchanged.
fn sqrt_newton_f32(x: f32) -> f32 {
    // Enough for the worst case (smallest subnormal, ~2^11 off the estimate);
    // normal inputs converge in a handful of steps.
    const MAX_ITERS: u32 = 32;
    // Bias for the exponent-halving trick: roughly (127 << 22) with a tuned
    // mantissa correction.
    const SQRT_MAGIC: u32 = 0x1fbd_1df5;

    if x <= 0.0 {
        return 0.0;
    }
    if !x.is_finite() {
        return x;
    }
    let mut g = f32::from_bits((x.to_bits() >> 1) + SQRT_MAGIC);
    for step in 0..MAX_ITERS {
        let next = 0.5 * (g + x / g);
        // From step 1 on the sequence must shrink; stalling means converged.
        if step > 0 && next >= g {
            break;
        }
        g = next;
    }
    g
}
2544
/// Squared Euclidean distance `Σ (a[i] - b[i])²`.
///
/// On aarch64, equal-length inputs whose length is a non-zero multiple of 4 use
/// the NEON kernel. Otherwise the scalar loop runs over the common prefix.
/// Summation order differs between the paths, so low bits may differ.
#[inline]
fn l2_distance_sq(a: &[f32], b: &[f32]) -> f32 {
    #[cfg(target_arch = "aarch64")]
    {
        if a.len() == b.len() && a.len() >= 4 && a.len().is_multiple_of(4) {
            // SAFETY: the lengths are equal (all loads in bounds) and a multiple
            // of 4 (no unprocessed tail). NEON is assumed enabled for the target;
            // no runtime detection is performed.
            return unsafe { l2_distance_sq_neon(a, b) };
        }
    }
    l2_distance_sq_scalar(a, b)
}
2565
/// Portable squared-L2 distance over the common prefix of `a` and `b`,
/// accumulated left to right from `+0.0`.
fn l2_distance_sq_scalar(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).fold(0.0_f32, |acc, (&x, &y)| {
        let diff = x - y;
        acc + diff * diff
    })
}
2574
/// NEON squared-L2 distance. The main loop handles 8 lanes per iteration with
/// two independent FMA accumulators, followed by a 4-lane cleanup loop.
///
/// # Safety
/// * `b.len() >= a.len()`: both slices are read at the same offsets.
/// * The CPU must support NEON (`#[target_feature(enable = "neon")]`).
///
/// Elements past the last multiple of 4 in `a` are *ignored*.
#[cfg(target_arch = "aarch64")]
#[target_feature(enable = "neon")]
// Short names (a, b, n, i, d0/d1) mirror the intrinsic-heavy arithmetic.
#[allow(clippy::many_single_char_names)]
unsafe fn l2_distance_sq_neon(a: &[f32], b: &[f32]) -> f32 {
    use core::arch::aarch64::{
        float32x4_t, vaddq_f32, vaddvq_f32, vdupq_n_f32, vfmaq_f32, vld1q_f32, vsubq_f32,
    };
    // SAFETY: every load reads lanes `i..i + 4` with `i + 4 <= a.len()`; in
    // bounds for `a`, and for `b` by the caller contract.
    unsafe {
        let zero: float32x4_t = vdupq_n_f32(0.0);
        let mut acc0 = zero;
        let mut acc1 = zero;
        let n = a.len();
        let mut i = 0usize;
        while i + 8 <= n {
            // acc += (a - b)² for lanes i..i+4 and i+4..i+8.
            let d0 = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d0, d0);
            let d1 = vsubq_f32(
                vld1q_f32(a.as_ptr().add(i + 4)),
                vld1q_f32(b.as_ptr().add(i + 4)),
            );
            acc1 = vfmaq_f32(acc1, d1, d1);
            i += 8;
        }
        while i + 4 <= n {
            let d = vsubq_f32(vld1q_f32(a.as_ptr().add(i)), vld1q_f32(b.as_ptr().add(i)));
            acc0 = vfmaq_f32(acc0, d, d);
            i += 4;
        }
        // Merge the two accumulators, then horizontally add the four lanes.
        vaddvq_f32(vaddq_f32(acc0, acc1))
    }
}
2612
2613pub fn nsw_query(
2616 table: &Table,
2617 idx_name: &str,
2618 query: &[f32],
2619 k: usize,
2620 metric: NswMetric,
2621) -> Vec<usize> {
2622 let Some(idx_pos) = table.indices.iter().position(|i| i.name == idx_name) else {
2623 return Vec::new();
2624 };
2625 let ef = (k * 2).max(NSW_DEFAULT_M);
2626 let mut hits = nsw_search(table, idx_pos, query, k, ef, metric);
2627 hits.truncate(k);
2628 hits.into_iter().map(|(_, idx)| idx).collect()
2629}
2630
2631pub fn nsw_index_on(table: &Table, column_position: usize) -> Option<&Index> {
2635 table
2636 .indices
2637 .iter()
2638 .find(|i| i.column_position == column_position && matches!(i.kind, IndexKind::Nsw(_)))
2639}
2640
/// Top-level storage catalog. It owns every hot [`Table`] plus the registry of
/// cold (segment-encoded) data that table indices may point into.
///
/// Deriving `Clone` copies the tables and the name map. Cold segments are held
/// behind `Arc`, so a clone shares their payloads instead of copying them.
#[derive(Debug, Clone, Default)]
pub struct Catalog {
    /// Tables in creation order; `by_name` stores positions into this vector.
    tables: Vec<Table>,
    /// Table name -> position in `tables` (maintained by `create_table`).
    by_name: BTreeMap<String, usize>,
    /// Cold segments indexed by segment id. A `None` slot is either tombstoned
    /// (`tombstone_segment`) or a gap left by `load_segment_bytes_at` loading a
    /// higher id first. Slots are never removed, so ids stay stable.
    cold_segments: Vec<Option<Arc<OwnedSegment>>>,
}
2681
2682impl Catalog {
2683 pub const fn new() -> Self {
2684 Self {
2685 tables: Vec::new(),
2686 by_name: BTreeMap::new(),
2687 cold_segments: Vec::new(),
2688 }
2689 }
2690
2691 pub fn create_table(&mut self, schema: TableSchema) -> Result<(), StorageError> {
2692 if self.by_name.contains_key(&schema.name) {
2693 return Err(StorageError::DuplicateTable {
2694 name: schema.name.clone(),
2695 });
2696 }
2697 let idx = self.tables.len();
2698 let name = schema.name.clone();
2699 self.tables.push(Table::new(schema));
2700 self.by_name.insert(name, idx);
2701 Ok(())
2702 }
2703
2704 pub fn get(&self, name: &str) -> Option<&Table> {
2705 let idx = *self.by_name.get(name)?;
2706 self.tables.get(idx)
2707 }
2708
2709 pub fn get_mut(&mut self, name: &str) -> Option<&mut Table> {
2710 let idx = *self.by_name.get(name)?;
2711 self.tables.get_mut(idx)
2712 }
2713
    /// Number of tables registered in the catalog.
    pub fn table_count(&self) -> usize {
        self.tables.len()
    }
2717
2718 pub fn table_names(&self) -> Vec<String> {
2721 self.tables.iter().map(|t| t.schema.name.clone()).collect()
2722 }
2723
2724 pub fn load_segment_bytes(&mut self, bytes: Vec<u8>) -> Result<u32, StorageError> {
2735 let id = u32::try_from(self.cold_segments.len()).map_err(|_| {
2736 StorageError::Corrupt("cold segment count would exceed u32::MAX".into())
2737 })?;
2738 let seg = OwnedSegment::from_bytes(bytes)
2739 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2740 self.cold_segments.push(Some(Arc::new(seg)));
2741 Ok(id)
2742 }
2743
2744 pub fn load_segment_bytes_at(
2757 &mut self,
2758 target_id: u32,
2759 bytes: Vec<u8>,
2760 ) -> Result<(), StorageError> {
2761 let seg = OwnedSegment::from_bytes(bytes)
2762 .map_err(|e| StorageError::Corrupt(format!("cold segment parse failed: {e}")))?;
2763 let idx = target_id as usize;
2764 while self.cold_segments.len() <= idx {
2765 self.cold_segments.push(None);
2766 }
2767 if self.cold_segments[idx].is_some() {
2768 return Err(StorageError::Corrupt(format!(
2769 "load_segment_bytes_at: segment_id {target_id} already occupied"
2770 )));
2771 }
2772 self.cold_segments[idx] = Some(Arc::new(seg));
2773 Ok(())
2774 }
2775
2776 pub fn tombstone_segment(&mut self, segment_id: u32) -> Result<(), StorageError> {
2786 let idx = segment_id as usize;
2787 if idx >= self.cold_segments.len() {
2788 return Err(StorageError::Corrupt(format!(
2789 "tombstone_segment: segment_id {segment_id} out of bounds (len={})",
2790 self.cold_segments.len()
2791 )));
2792 }
2793 self.cold_segments[idx] = None;
2794 Ok(())
2795 }
2796
2797 #[must_use]
2799 pub fn cold_segment_count(&self) -> usize {
2800 self.cold_segments.iter().filter(|s| s.is_some()).count()
2801 }
2802
    /// Total number of cold-segment slots, including tombstoned and vacant ones.
    /// This is also the id that `load_segment_bytes` will assign next.
    #[must_use]
    pub fn cold_segment_slot_count(&self) -> usize {
        self.cold_segments.len()
    }
2809
2810 #[must_use]
2815 pub fn cold_segment_ids_global(&self) -> Vec<u32> {
2816 self.cold_segments
2817 .iter()
2818 .enumerate()
2819 .filter_map(|(i, s)| s.as_ref().map(|_| i as u32))
2820 .collect()
2821 }
2822
2823 #[must_use]
2830 pub fn hot_tier_bytes(&self) -> u64 {
2831 self.tables
2832 .iter()
2833 .map(Table::hot_bytes)
2834 .fold(0u64, u64::saturating_add)
2835 }
2836
2837 pub fn freeze_oldest_to_cold(
2882 &mut self,
2883 table_name: &str,
2884 index_name: &str,
2885 max_rows: usize,
2886 ) -> Result<FreezeReport, StorageError> {
2887 if max_rows == 0 {
2889 return Err(StorageError::Corrupt(
2890 "freeze_oldest_to_cold: max_rows must be > 0".into(),
2891 ));
2892 }
2893 let table = self.get(table_name).ok_or_else(|| {
2894 StorageError::Corrupt(format!(
2895 "freeze_oldest_to_cold: table {table_name:?} not found"
2896 ))
2897 })?;
2898 if max_rows > table.rows.len() {
2899 return Err(StorageError::Corrupt(format!(
2900 "freeze_oldest_to_cold: max_rows {max_rows} > row_count {}",
2901 table.rows.len()
2902 )));
2903 }
2904 let idx = table
2905 .indices
2906 .iter()
2907 .find(|i| i.name == index_name)
2908 .ok_or_else(|| {
2909 StorageError::Corrupt(format!(
2910 "freeze_oldest_to_cold: index {index_name:?} not found on {table_name:?}"
2911 ))
2912 })?;
2913 if !matches!(idx.kind, IndexKind::BTree(_)) {
2914 return Err(StorageError::Corrupt(format!(
2915 "freeze_oldest_to_cold: index {index_name:?} is NSW; only BTree indices may freeze"
2916 )));
2917 }
2918 let column_position = idx.column_position;
2919
2920 let schema = table.schema.clone();
2922 let mut to_freeze: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(max_rows);
2923 for row_idx in 0..max_rows {
2924 let row = table.rows.get(row_idx).expect("bounds-checked above");
2925 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
2926 StorageError::Corrupt(format!(
2927 "freeze_oldest_to_cold: row {row_idx} has NULL / non-key value in index column"
2928 ))
2929 })?;
2930 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
2931 StorageError::Corrupt(format!(
2932 "freeze_oldest_to_cold: index {index_name:?} column type is non-integer; \
2933 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
2934 ))
2935 })?;
2936 to_freeze.push((pk_u64, encode_row_body_dense(row, &schema), key));
2937 }
2938 to_freeze.sort_by_key(|(k, _, _)| *k);
2943 for w in to_freeze.windows(2) {
2947 if w[0].0 == w[1].0 {
2948 return Err(StorageError::Corrupt(format!(
2949 "freeze_oldest_to_cold: duplicate PK {} in freeze batch",
2950 w[0].0
2951 )));
2952 }
2953 }
2954 let post_swap_keys: Vec<IndexKey> = to_freeze.iter().map(|(_, _, k)| k.clone()).collect();
2958 let seg_rows: Vec<(u64, Vec<u8>)> = to_freeze
2962 .into_iter()
2963 .map(|(k, body, _)| (k, body))
2964 .collect();
2965 let frozen_rows = seg_rows.len();
2966 let (seg_bytes, _meta) = encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES)
2967 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: encode: {e}")))?;
2968
2969 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
2978 let positions: Vec<usize> = (0..max_rows).collect();
2979 let t_mut = self
2980 .get_mut(table_name)
2981 .expect("just validated; still present");
2982 let removed = t_mut.delete_rows(&positions);
2983 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
2984 let bytes_after = t_mut.hot_bytes();
2985 let bytes_freed = bytes_before.saturating_sub(bytes_after);
2986
2987 let segment_id = self
2988 .load_segment_bytes(seg_bytes.clone())
2989 .map_err(|e| StorageError::Corrupt(format!("freeze_oldest_to_cold: load: {e}")))?;
2990 let new_cold = post_swap_keys.into_iter().map(|k| {
2991 (
2992 k,
2993 RowLocator::Cold {
2994 segment_id,
2995 page_offset: 0,
2996 },
2997 )
2998 });
2999 let t_mut = self.get_mut(table_name).expect("still present");
3000 t_mut.register_cold_locators(index_name, new_cold)?;
3001
3002 Ok(FreezeReport {
3003 segment_id,
3004 frozen_rows,
3005 bytes_freed,
3006 segment_bytes: seg_bytes,
3007 })
3008 }
3009
3010 #[must_use]
3016 pub fn cold_segment(&self, segment_id: u32) -> Option<&OwnedSegment> {
3017 self.cold_segments
3018 .get(segment_id as usize)
3019 .and_then(|s| s.as_deref())
3020 }
3021
3022 pub fn resolve_cold_locator(
3031 &self,
3032 table_name: &str,
3033 segment_id: u32,
3034 key: &IndexKey,
3035 ) -> Option<Row> {
3036 let t = self.get(table_name)?;
3037 let u64_key = index_key_as_u64(key)?;
3038 let seg = self.cold_segments.get(segment_id as usize)?.as_ref()?;
3039 let payload = seg.lookup(u64_key)?;
3040 let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
3041 Some(row)
3042 }
3043
    /// Point lookup of `key` through index `index_name` on `table`, across both tiers.
    ///
    /// The locators returned by `lookup_eq` are tried in order, and the first
    /// one that resolves wins.
    ///
    /// A `Hot` locator is skipped if its row index is past the end of the table.
    ///
    /// A `Cold` locator is skipped when any of these hold:
    /// - The key cannot be coerced to `u64`.
    /// - The segment is missing or tombstoned.
    /// - The segment lookup misses.
    ///
    /// NOTE(review): a cold payload that fails to decode makes the *whole*
    /// lookup return `None` (via `?`) instead of trying the next locator.
    /// Confirm this is intended.
    pub fn lookup_by_pk(&self, table: &str, index_name: &str, key: &IndexKey) -> Option<Row> {
        let t = self.get(table)?;
        let idx = t.indices.iter().find(|i| i.name == index_name)?;
        let locators = idx.lookup_eq(key);
        // Computed once; only cold locators need the integer form of the key.
        let cold_u64_key = index_key_as_u64(key);
        for loc in locators {
            match *loc {
                RowLocator::Hot(i) => {
                    if let Some(row) = t.rows.get(i) {
                        return Some(row.clone());
                    }
                }
                RowLocator::Cold {
                    segment_id,
                    page_offset: _,
                } => {
                    // Non-integer keys cannot address the cold tier.
                    let Some(u64_key) = cold_u64_key else {
                        continue;
                    };
                    // Tombstoned/unknown segments are skipped rather than treated as errors.
                    let Some(seg) = self
                        .cold_segments
                        .get(segment_id as usize)
                        .and_then(|s| s.as_deref())
                    else {
                        continue;
                    };
                    let Some(payload) = seg.lookup(u64_key) else {
                        continue;
                    };
                    let (row, _) = decode_row_body_dense(&payload, &t.schema).ok()?;
                    return Some(row);
                }
            }
        }
        None
    }
3109
    /// Copy the cold row for `key` back into the hot tier of `table_name`, then
    /// drop its cold locators on `index_name`.
    ///
    /// Returns:
    /// - `Ok(None)` if `find_cold_locator` finds no cold locator for the key.
    /// - `Ok(Some(i))` on success, where `i` is taken as `rows.len() - 1` after
    ///   the insert. This presumably assumes `Table::insert` appends; confirm.
    ///
    /// NOTE(review): the hot insert happens before the cold locators are
    /// removed. If removal fails, the row is present hot while its cold locator
    /// remains.
    ///
    /// # Errors
    /// [`StorageError::Corrupt`] when any of these hold:
    /// - The key is not integer-coercible.
    /// - The table or segment is missing.
    /// - The segment lookup misses.
    ///
    /// Errors from `find_cold_locator`, row decoding, `insert` and locator
    /// removal are propagated.
    pub fn promote_cold_row(
        &mut self,
        table_name: &str,
        index_name: &str,
        key: &IndexKey,
    ) -> Result<Option<usize>, StorageError> {
        let cold_loc = self.find_cold_locator(table_name, index_name, key)?;
        let Some((segment_id, _page_offset)) = cold_loc else {
            return Ok(None);
        };
        let u64_key = index_key_as_u64(key).ok_or_else(|| {
            StorageError::Corrupt(
                "promote_cold_row: key type not coercible to u64 (cold tier requires integer PK)"
                    .into(),
            )
        })?;
        // Clone the schema so no borrow of `self` is held across the mutation below.
        let schema = self
            .get(table_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!("promote_cold_row: table {table_name:?} not found"))
            })?
            .schema
            .clone();
        let seg = self
            .cold_segments
            .get(segment_id as usize)
            .and_then(|s| s.as_ref())
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "promote_cold_row: segment {segment_id} not registered on catalog"
                ))
            })?;
        let payload = seg.lookup(u64_key).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "promote_cold_row: key {u64_key} resolves to segment {segment_id} \
                 but the segment's bloom/page lookup didn't return a row"
            ))
        })?;
        let (row, _consumed) = decode_row_body_dense(&payload, &schema)?;
        let t = self
            .get_mut(table_name)
            .expect("table existed at lookup time");
        t.insert(row)?;
        let new_hot_idx =
            t.rows.len().checked_sub(1).ok_or_else(|| {
                StorageError::Corrupt("promote_cold_row: empty after insert".into())
            })?;
        t.remove_cold_locators_for_key(index_name, key)?;
        Ok(Some(new_hot_idx))
    }
3191
3192 pub fn shadow_cold_row(
3210 &mut self,
3211 table_name: &str,
3212 index_name: &str,
3213 key: &IndexKey,
3214 ) -> Result<usize, StorageError> {
3215 let t = self.get_mut(table_name).ok_or_else(|| {
3216 StorageError::Corrupt(format!("shadow_cold_row: table {table_name:?} not found"))
3217 })?;
3218 t.remove_cold_locators_for_key(index_name, key)
3219 }
3220
3221 pub fn prepare_freeze_slice(
3239 &self,
3240 table_name: &str,
3241 index_name: &str,
3242 row_range: core::ops::Range<usize>,
3243 ) -> Result<FreezeSlice, StorageError> {
3244 let table = self.get(table_name).ok_or_else(|| {
3245 StorageError::Corrupt(format!(
3246 "prepare_freeze_slice: table {table_name:?} not found"
3247 ))
3248 })?;
3249 let idx = table
3250 .indices
3251 .iter()
3252 .find(|i| i.name == index_name)
3253 .ok_or_else(|| {
3254 StorageError::Corrupt(format!(
3255 "prepare_freeze_slice: index {index_name:?} not found on {table_name:?}"
3256 ))
3257 })?;
3258 if !matches!(idx.kind, IndexKind::BTree(_)) {
3259 return Err(StorageError::Corrupt(format!(
3260 "prepare_freeze_slice: index {index_name:?} is NSW; only BTree indices may freeze"
3261 )));
3262 }
3263 if row_range.end > table.rows.len() {
3264 return Err(StorageError::Corrupt(format!(
3265 "prepare_freeze_slice: row_range end {} > row_count {}",
3266 row_range.end,
3267 table.rows.len()
3268 )));
3269 }
3270 let column_position = idx.column_position;
3271 let schema = table.schema.clone();
3272 let mut rows: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(row_range.len());
3273 for row_idx in row_range.clone() {
3274 let row = table.rows.get(row_idx).expect("bounds-checked above");
3275 let key = IndexKey::from_value(&row.values[column_position]).ok_or_else(|| {
3276 StorageError::Corrupt(format!(
3277 "prepare_freeze_slice: row {row_idx} has NULL / non-key value in index column"
3278 ))
3279 })?;
3280 let pk_u64 = index_key_as_u64(&key).ok_or_else(|| {
3281 StorageError::Corrupt(format!(
3282 "prepare_freeze_slice: index {index_name:?} column type is non-integer; \
3283 v5.2.2 cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
3284 ))
3285 })?;
3286 rows.push((pk_u64, encode_row_body_dense(row, &schema), key));
3287 }
3288 rows.sort_by_key(|(k, _, _)| *k);
3289 Ok(FreezeSlice { row_range, rows })
3290 }
3291
3292 pub fn commit_freeze_slices(
3306 &mut self,
3307 table_name: &str,
3308 index_name: &str,
3309 slices: Vec<FreezeSlice>,
3310 ) -> Result<FreezeReport, StorageError> {
3311 let table = self.get(table_name).ok_or_else(|| {
3313 StorageError::Corrupt(format!(
3314 "commit_freeze_slices: table {table_name:?} not found"
3315 ))
3316 })?;
3317 let idx = table
3318 .indices
3319 .iter()
3320 .find(|i| i.name == index_name)
3321 .ok_or_else(|| {
3322 StorageError::Corrupt(format!(
3323 "commit_freeze_slices: index {index_name:?} not found on {table_name:?}"
3324 ))
3325 })?;
3326 if !matches!(idx.kind, IndexKind::BTree(_)) {
3327 return Err(StorageError::Corrupt(format!(
3328 "commit_freeze_slices: index {index_name:?} is NSW; only BTree indices may freeze"
3329 )));
3330 }
3331 let mut ordered = slices;
3335 ordered.sort_by_key(|s| s.row_range.start);
3336 let mut expected_start = 0usize;
3340 for s in &ordered {
3341 if s.row_range.start != expected_start {
3342 return Err(StorageError::Corrupt(format!(
3343 "commit_freeze_slices: gap/overlap at row {}; expected start {}",
3344 s.row_range.start, expected_start
3345 )));
3346 }
3347 expected_start = s.row_range.end;
3348 }
3349 let max_rows = expected_start;
3350 if max_rows > table.rows.len() {
3351 return Err(StorageError::Corrupt(format!(
3352 "commit_freeze_slices: total row range {} exceeds row_count {}",
3353 max_rows,
3354 table.rows.len()
3355 )));
3356 }
3357 if max_rows == 0 {
3358 return Ok(FreezeReport {
3359 segment_id: u32::MAX,
3360 frozen_rows: 0,
3361 bytes_freed: 0,
3362 segment_bytes: Vec::new(),
3363 });
3364 }
3365
3366 let total_rows: usize = ordered.iter().map(|s| s.rows.len()).sum();
3371 if total_rows != max_rows {
3372 return Err(StorageError::Corrupt(format!(
3373 "commit_freeze_slices: total slice rows {total_rows} ≠ row_range coverage {max_rows}"
3374 )));
3375 }
3376 let mut cursors: Vec<usize> = alloc::vec![0; ordered.len()];
3377 let mut merged: Vec<(u64, Vec<u8>, IndexKey)> = Vec::with_capacity(total_rows);
3378 loop {
3379 let mut pick: Option<usize> = None;
3382 for (i, c) in cursors.iter().enumerate() {
3383 let slice = &ordered[i];
3384 if *c >= slice.rows.len() {
3385 continue;
3386 }
3387 match pick {
3388 None => pick = Some(i),
3389 Some(j) => {
3390 if slice.rows[*c].0 < ordered[j].rows[cursors[j]].0 {
3391 pick = Some(i);
3392 }
3393 }
3394 }
3395 }
3396 let Some(i) = pick else { break };
3397 let row = ordered[i].rows[cursors[i]].clone();
3398 cursors[i] += 1;
3399 merged.push(row);
3400 }
3401 for w in merged.windows(2) {
3404 if w[0].0 == w[1].0 {
3405 return Err(StorageError::Corrupt(format!(
3406 "commit_freeze_slices: duplicate PK {} across slices",
3407 w[0].0
3408 )));
3409 }
3410 }
3411 let post_swap_keys: Vec<IndexKey> = merged.iter().map(|(_, _, k)| k.clone()).collect();
3412 let seg_rows: Vec<(u64, Vec<u8>)> = merged
3413 .into_iter()
3414 .map(|(k, body, _)| (k, body))
3415 .collect();
3416 let frozen_rows = seg_rows.len();
3417 let (seg_bytes, _meta) =
3418 encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
3419 StorageError::Corrupt(format!("commit_freeze_slices: encode: {e}"))
3420 })?;
3421
3422 let bytes_before = self.get(table_name).expect("just validated").hot_bytes();
3424 let positions: Vec<usize> = (0..max_rows).collect();
3425 let t_mut = self
3426 .get_mut(table_name)
3427 .expect("just validated; still present");
3428 let removed = t_mut.delete_rows(&positions);
3429 debug_assert_eq!(removed, max_rows, "delete_rows count matches request");
3430 let bytes_after = t_mut.hot_bytes();
3431 let bytes_freed = bytes_before.saturating_sub(bytes_after);
3432
3433 let segment_id = self
3434 .load_segment_bytes(seg_bytes.clone())
3435 .map_err(|e| StorageError::Corrupt(format!("commit_freeze_slices: load: {e}")))?;
3436 let new_cold = post_swap_keys.into_iter().map(|k| {
3437 (
3438 k,
3439 RowLocator::Cold {
3440 segment_id,
3441 page_offset: 0,
3442 },
3443 )
3444 });
3445 let t_mut = self.get_mut(table_name).expect("still present");
3446 t_mut.register_cold_locators(index_name, new_cold)?;
3447
3448 Ok(FreezeReport {
3449 segment_id,
3450 frozen_rows,
3451 bytes_freed,
3452 segment_bytes: seg_bytes,
3453 })
3454 }
3455
    /// Merges the small cold segments referenced by BTree index `index_name` on
    /// `table_name` into a single new segment.
    ///
    /// A segment is a candidate when all of the following hold:
    ///
    /// - some `Cold` locator in the index points at it;
    /// - its slot in `cold_segments` is still occupied;
    /// - its encoded size is below `target_segment_bytes`.
    ///
    /// With fewer than two candidates nothing changes, and an empty [`CompactReport`]
    /// (with `merged_segment_id: None`) is returned.
    ///
    /// Otherwise:
    ///
    /// 1. For every key with a `Cold` locator into a candidate, the row body is fetched
    ///    from that segment. Rows that no key points at are dropped; they show up in
    ///    `deleted_rows_pruned` (candidate row total minus merged rows).
    /// 2. The survivors are encoded and loaded as a new segment.
    /// 3. Every locator into a candidate is rewritten to point at the merged segment.
    /// 4. The candidates are tombstoned.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupt`] when:
    ///
    /// - the table or index is missing, or the index is not BTree;
    /// - a cold key is not an integer;
    /// - a segment lookup misses;
    /// - encoding or loading the merged segment fails.
    ///
    /// `tombstone_segment` errors are propagated. Tombstoning runs after the locators
    /// have been rewritten, so on such a failure the index already points at the
    /// merged segment.
    pub fn compact_cold_segments(
        &mut self,
        table_name: &str,
        index_name: &str,
        target_segment_bytes: u64,
    ) -> Result<CompactReport, StorageError> {
        let t = self.get(table_name).ok_or_else(|| {
            StorageError::Corrupt(format!(
                "compact_cold_segments: table {table_name:?} not found"
            ))
        })?;
        let idx = t
            .indices
            .iter()
            .find(|i| i.name == index_name)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} not found on {table_name:?}"
                ))
            })?;
        let map = match &idx.kind {
            IndexKind::BTree(m) => m,
            IndexKind::Nsw(_) | IndexKind::Brin { .. } => {
                return Err(StorageError::Corrupt(format!(
                    "compact_cold_segments: index {index_name:?} is not BTree; \
                     compaction applies only to BTree cold-tier indices"
                )));
            }
        };

        // Every segment id that at least one Cold locator in this index points at.
        let mut referenced_ids: BTreeSet<u32> = BTreeSet::new();
        for (_key, locators) in map.iter() {
            for loc in locators {
                if let RowLocator::Cold { segment_id, .. } = loc {
                    referenced_ids.insert(*segment_id);
                }
            }
        }
        // Candidates: referenced, still present (slot is Some), and under the size target.
        let candidate_set: BTreeSet<u32> = referenced_ids
            .into_iter()
            .filter(|id| {
                self.cold_segments
                    .get(*id as usize)
                    .and_then(|s| s.as_deref())
                    .is_some_and(|s| (s.bytes().len() as u64) < target_segment_bytes)
            })
            .collect();
        // Merging fewer than two segments gains nothing: report a no-op.
        if candidate_set.len() < 2 {
            return Ok(CompactReport {
                sources: Vec::new(),
                merged_segment_id: None,
                merged_segment_bytes: Vec::new(),
                merged_rows: 0,
                deleted_rows_pruned: 0,
                bytes_reclaimed_estimate: 0,
            });
        }
        // Row and byte totals across the candidates. They feed `deleted_rows_pruned`
        // and `bytes_reclaimed_estimate`.
        let mut source_row_count: usize = 0;
        let mut source_byte_total: u64 = 0;
        for &id in &candidate_set {
            let seg = self.cold_segments[id as usize]
                .as_ref()
                .expect("candidate selected only when slot is Some");
            source_row_count = source_row_count.saturating_add(seg.meta().num_rows as usize);
            source_byte_total =
                source_byte_total.saturating_add(seg.bytes().len() as u64);
        }
        // Fetch the body of every row still referenced from a candidate segment, keyed by
        // its u64 key. Rows no locator points at are never fetched, so they are pruned.
        let mut collected: BTreeMap<u64, (Vec<u8>, IndexKey)> = BTreeMap::new();
        for (key, locators) in map.iter() {
            for loc in locators {
                let RowLocator::Cold { segment_id, .. } = loc else {
                    continue;
                };
                if !candidate_set.contains(segment_id) {
                    continue;
                }
                let u64_key = index_key_as_u64(key).ok_or_else(|| {
                    StorageError::Corrupt(format!(
                        "compact_cold_segments: index {index_name:?} has non-integer Cold key; \
                         cold tier requires IndexKey::Int (Text PK lands in v5.5+)"
                    ))
                })?;
                let seg = self.cold_segments[*segment_id as usize]
                    .as_ref()
                    .expect("candidate slot guaranteed Some above");
                let payload = seg.lookup(u64_key).ok_or_else(|| {
                    StorageError::Corrupt(format!(
                        "compact_cold_segments: BTree {index_name:?} points key={u64_key} \
                         at segment {segment_id} but the segment lookup missed"
                    ))
                })?;
                collected.insert(u64_key, (payload, key.clone()));
                // One copy per key is enough; any further candidate locators are ignored.
                break;
            }
        }
        let merged_rows = collected.len();
        let deleted_rows_pruned = source_row_count.saturating_sub(merged_rows);

        // BTreeMap iterates in ascending key order, so rows reach the encoder sorted.
        let seg_rows: Vec<(u64, Vec<u8>)> = collected
            .iter()
            .map(|(k, (body, _))| (*k, body.clone()))
            .collect();
        let (seg_bytes, _meta) =
            encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).map_err(|e| {
                StorageError::Corrupt(format!("compact_cold_segments: encode: {e}"))
            })?;
        let merged_bytes_len = seg_bytes.len() as u64;

        let merged_segment_id = self
            .load_segment_bytes(seg_bytes.clone())
            .map_err(|e| StorageError::Corrupt(format!("compact_cold_segments: load: {e}")))?;

        // Snapshot the index entries under a shared borrow. The mutable re-borrow below
        // cannot coexist with iterating the map.
        let entries: Vec<(IndexKey, Vec<RowLocator>)> = {
            let t = self
                .get(table_name)
                .expect("table existed at the start of this fn");
            let idx = t
                .indices
                .iter()
                .find(|i| i.name == index_name)
                .expect("index existed at the start of this fn");
            let IndexKind::BTree(map) = &idx.kind else {
                unreachable!("validated above");
            };
            map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
        };
        let t_mut = self
            .get_mut(table_name)
            .expect("table existed at the start of this fn");
        let idx_mut = t_mut
            .indices
            .iter_mut()
            .find(|i| i.name == index_name)
            .expect("index existed at the start of this fn");
        let IndexKind::BTree(map_mut) = &mut idx_mut.kind else {
            unreachable!("validated above");
        };
        // Redirect every locator into a candidate segment to the merged segment.
        // Duplicates within one key collapse to a single locator, and entries with
        // nothing to redirect are not rewritten.
        for (key, locators) in entries {
            let mut new_locs: Vec<RowLocator> = Vec::with_capacity(locators.len());
            let mut changed = false;
            for loc in &locators {
                match *loc {
                    RowLocator::Cold {
                        segment_id,
                        page_offset: _,
                    } if candidate_set.contains(&segment_id) => {
                        let replacement = RowLocator::Cold {
                            segment_id: merged_segment_id,
                            page_offset: 0,
                        };
                        if !new_locs.contains(&replacement) {
                            new_locs.push(replacement);
                        }
                        changed = true;
                    }
                    other => new_locs.push(other),
                }
            }
            if changed {
                map_mut.insert_mut(key, new_locs);
            }
        }

        // Retire the source segments only after the index no longer points at them.
        for &id in &candidate_set {
            self.tombstone_segment(id)?;
        }

        let bytes_reclaimed_estimate = source_byte_total.saturating_sub(merged_bytes_len);
        Ok(CompactReport {
            sources: candidate_set.into_iter().collect(),
            merged_segment_id: Some(merged_segment_id),
            merged_segment_bytes: seg_bytes,
            merged_rows,
            deleted_rows_pruned,
            bytes_reclaimed_estimate,
        })
    }
3698
3699 fn find_cold_locator(
3705 &self,
3706 table_name: &str,
3707 index_name: &str,
3708 key: &IndexKey,
3709 ) -> Result<Option<(u32, u32)>, StorageError> {
3710 let t = self.get(table_name).ok_or_else(|| {
3711 StorageError::Corrupt(format!("find_cold_locator: table {table_name:?} not found"))
3712 })?;
3713 let idx = t
3714 .indices
3715 .iter()
3716 .find(|i| i.name == index_name)
3717 .ok_or_else(|| {
3718 StorageError::Corrupt(format!(
3719 "find_cold_locator: index {index_name:?} not found on {table_name:?}"
3720 ))
3721 })?;
3722 if !matches!(idx.kind, IndexKind::BTree(_)) {
3723 return Err(StorageError::Corrupt(format!(
3724 "find_cold_locator: index {index_name:?} is NSW; promote-on-write only applies to BTree indices"
3725 )));
3726 }
3727 for loc in idx.lookup_eq(key) {
3728 if let RowLocator::Cold {
3729 segment_id,
3730 page_offset,
3731 } = *loc
3732 {
3733 return Ok(Some((segment_id, page_offset)));
3734 }
3735 }
3736 Ok(None)
3737 }
3738}
3739
3740fn index_key_as_u64(key: &IndexKey) -> Option<u64> {
3746 match key {
3747 IndexKey::Int(n) => Some(n.cast_unsigned()),
3753 IndexKey::Text(_) | IndexKey::Bool(_) => None,
3754 }
3755}
3756
/// Error type for catalog operations and for (de)serializing the catalog file.
///
/// Marked `#[non_exhaustive]`: downstream matches need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StorageError {
    /// A table with this name already exists.
    DuplicateTable {
        /// The conflicting table name.
        name: String,
    },
    /// No table with this name exists.
    TableNotFound {
        /// The table name that was looked up.
        name: String,
    },
    /// A row did not supply exactly one value per column.
    ArityMismatch {
        /// Number of columns expected.
        expected: usize,
        /// Number of values actually supplied.
        actual: usize,
    },
    /// A value's type does not match its column's declared type.
    TypeMismatch {
        /// Name of the offending column.
        column: String,
        /// The type the column expects.
        expected: DataType,
        /// The type that was supplied.
        actual: DataType,
        /// Position of the offending column in the row.
        position: usize,
    },
    /// A NULL was supplied for a column declared NOT NULL.
    NullInNotNull {
        /// Name of the NOT NULL column.
        column: String,
    },
    /// An index with this name already exists.
    DuplicateIndex {
        /// The conflicting index name.
        name: String,
    },
    /// A referenced column does not exist.
    ColumnNotFound {
        /// The column name that was looked up.
        column: String,
    },
    /// Malformed or inconsistent data. It is displayed as "corrupt on-disk format",
    /// but the cold-tier helpers in this file also use it for missing tables or
    /// indices and for rejected arguments; the payload carries the detail.
    Corrupt(String),
    /// No index with this name exists.
    IndexNotFound {
        /// The index name that was looked up.
        name: String,
    },
    /// The requested operation or feature is not supported; the payload explains which.
    Unsupported(String),
}
3800
3801impl fmt::Display for StorageError {
3802 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3803 match self {
3804 Self::DuplicateTable { name } => write!(f, "table already exists: {name}"),
3805 Self::TableNotFound { name } => write!(f, "table not found: {name}"),
3806 Self::ArityMismatch { expected, actual } => write!(
3807 f,
3808 "row arity mismatch: expected {expected} columns, got {actual}"
3809 ),
3810 Self::TypeMismatch {
3811 column,
3812 expected,
3813 actual,
3814 position,
3815 } => write!(
3816 f,
3817 "type mismatch in column {column:?} (position {position}): expected {expected}, got {actual}"
3818 ),
3819 Self::NullInNotNull { column } => {
3820 write!(f, "NULL value in NOT NULL column {column:?}")
3821 }
3822 Self::DuplicateIndex { name } => write!(f, "index already exists: {name}"),
3823 Self::ColumnNotFound { column } => write!(f, "column not found: {column}"),
3824 Self::Corrupt(detail) => write!(f, "corrupt on-disk format: {detail}"),
3825 Self::IndexNotFound { name } => write!(f, "index not found: {name}"),
3826 Self::Unsupported(detail) => write!(f, "unsupported: {detail}"),
3827 }
3828 }
3829}
3830
3831impl ColumnSchema {
3832 pub fn new(name: impl Into<String>, ty: DataType, nullable: bool) -> Self {
3833 Self {
3834 name: name.into(),
3835 ty,
3836 nullable,
3837 default: None,
3838 runtime_default: None,
3839 auto_increment: false,
3840 }
3841 }
3842
3843 #[must_use]
3847 pub fn with_default(mut self, default: Value) -> Self {
3848 self.default = Some(default);
3849 self
3850 }
3851
3852 #[must_use]
3857 pub fn with_runtime_default(mut self, expr: impl Into<String>) -> Self {
3858 self.runtime_default = Some(expr.into());
3859 self
3860 }
3861
3862 #[must_use]
3864 pub const fn with_auto_increment(mut self) -> Self {
3865 self.auto_increment = true;
3866 self
3867 }
3868}
3869
3870impl TableSchema {
3871 pub fn new(name: impl Into<String>, columns: Vec<ColumnSchema>) -> Self {
3872 Self {
3873 name: name.into(),
3874 columns,
3875 hot_tier_bytes: None,
3876 foreign_keys: Vec::new(),
3877 uniqueness_constraints: Vec::new(),
3878 }
3879 }
3880}
3881
/// Eight-byte magic at the start of every serialized catalog; `Catalog::deserialize`
/// rejects input that does not begin with it.
const FILE_MAGIC: &[u8; 8] = b"SPGDB001";
/// Format version that `Catalog::serialize` writes after the magic. The deserializer
/// also accepts older versions down to `MIN_SUPPORTED_FILE_VERSION`, reading only the
/// fields each version contains.
const FILE_VERSION: u8 = 18;
/// Oldest file version `Catalog::deserialize` still accepts.
const MIN_SUPPORTED_FILE_VERSION: u8 = 8;

/// Tag byte for `IndexKey::Int` in the serialized index-key encoding. Presumably
/// consumed by `write_index_key` / `read_index_key`, which are not shown here.
const INDEX_KEY_TAG_INT: u8 = 0;
/// Tag byte for `IndexKey::Text` in the serialized index-key encoding.
const INDEX_KEY_TAG_TEXT: u8 = 1;
/// Tag byte for `IndexKey::Bool` in the serialized index-key encoding.
const INDEX_KEY_TAG_BOOL: u8 = 2;
3967
impl Catalog {
    /// Encodes the whole catalog in the current on-disk format.
    ///
    /// The output is `FILE_MAGIC`, the `FILE_VERSION` byte, a `u32` table count, and
    /// then one record per table. Each table record contains, in order:
    ///
    /// 1. the table name;
    /// 2. a `u16` column count, then per column its name, type tag, nullable byte,
    ///    default-present byte (plus the value), and auto-increment byte;
    /// 3. a `u32` row count and the dense-encoded rows;
    /// 4. a `u16` index count, then per index:
    ///    - its name and a `u16` column position;
    ///    - a kind tag (0 = BTree, 1 = NSW, 2 = BRIN) followed by that kind's payload;
    ///    - the INCLUDE column positions;
    ///    - the optional partial predicate and the optional expression;
    ///    - the unique byte;
    ///    - the extra column positions;
    /// 5. the optional `hot_tier_bytes` value;
    /// 6. the foreign keys;
    /// 7. the uniqueness constraints;
    /// 8. the runtime defaults, as `(u16 column position, expression)` pairs.
    ///
    /// # Panics
    ///
    /// Panics when a count or position does not fit its fixed-width field, e.g. more
    /// than `u32::MAX` rows or `u16::MAX` columns in a table.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(64);
        out.extend_from_slice(FILE_MAGIC);
        out.push(FILE_VERSION);
        write_u32(
            &mut out,
            u32::try_from(self.tables.len()).expect("≤ 4G tables"),
        );
        for t in &self.tables {
            // --- table name and column definitions ---
            write_str(&mut out, &t.schema.name);
            write_u16(
                &mut out,
                u16::try_from(t.schema.columns.len()).expect("≤ 65k columns/table"),
            );
            for c in &t.schema.columns {
                write_str(&mut out, &c.name);
                write_data_type(&mut out, c.ty);
                out.push(u8::from(c.nullable));
                // Static default: 0 = none, 1 = a value follows.
                match &c.default {
                    None => out.push(0),
                    Some(v) => {
                        out.push(1);
                        write_value(&mut out, v);
                    }
                }
                out.push(u8::from(c.auto_increment));
            }
            // --- rows: u32 count, then each row in the dense row-body encoding ---
            write_u32(
                &mut out,
                u32::try_from(t.rows.len()).expect("≤ 4G rows/table"),
            );
            for row in &t.rows {
                out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
            }
            // --- indices ---
            write_u16(
                &mut out,
                u16::try_from(t.indices.len()).expect("≤ 65k indices/table"),
            );
            for idx in &t.indices {
                write_str(&mut out, &idx.name);
                write_u16(
                    &mut out,
                    u16::try_from(idx.column_position).expect("≤ 65k columns/table"),
                );
                match &idx.kind {
                    // BTree (tag 0): u32 entry count, then each key followed by a u32
                    // locator count and the locators.
                    IndexKind::BTree(map) => {
                        out.push(0);
                        write_u32(
                            &mut out,
                            u32::try_from(map.len()).expect("≤ 4G index entries/index"),
                        );
                        for (key, locators) in map {
                            write_index_key(&mut out, key);
                            write_u32(
                                &mut out,
                                u32::try_from(locators.len()).expect("≤ 4G locators/key"),
                            );
                            for loc in locators {
                                loc.write_le(&mut out);
                            }
                        }
                    }
                    // NSW (tag 1): u16 `m`, then the graph body.
                    IndexKind::Nsw(g) => {
                        out.push(1);
                        write_u16(&mut out, u16::try_from(g.m).expect("≤ 65k NSW neighbours"));
                        write_nsw_graph(&mut out, g);
                    }
                    // BRIN (tag 2): only the indexed column's type is stored.
                    IndexKind::Brin { column_type } => {
                        out.push(2);
                        write_data_type(&mut out, *column_type);
                    }
                }
                // INCLUDE column positions: u16 count, then u16 positions.
                write_u16(
                    &mut out,
                    u16::try_from(idx.included_columns.len())
                        .expect("≤ 65k INCLUDE columns/index"),
                );
                for col_pos in &idx.included_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(*col_pos).expect("≤ 65k columns/table"),
                    );
                }
                // Partial-index predicate: 0 = none, 1 = a string follows.
                match &idx.partial_predicate {
                    None => out.push(0),
                    Some(pred) => {
                        out.push(1);
                        write_str(&mut out, pred);
                    }
                }
                // Expression-index text: 0 = none, 1 = a string follows.
                match &idx.expression {
                    None => out.push(0),
                    Some(expr) => {
                        out.push(1);
                        write_str(&mut out, expr);
                    }
                }
                out.push(u8::from(idx.is_unique));
                // Extra column positions: u16 count, then u16 positions.
                write_u16(
                    &mut out,
                    u16::try_from(idx.extra_column_positions.len())
                        .expect("≤ 65k extra cols / index"),
                );
                for cp in &idx.extra_column_positions {
                    write_u16(
                        &mut out,
                        u16::try_from(*cp).expect("≤ 65k columns/table"),
                    );
                }
            }
            // hot_tier_bytes: 0 = unset, 1 = a little-endian u64 follows.
            match t.schema.hot_tier_bytes {
                None => out.push(0),
                Some(n) => {
                    out.push(1);
                    out.extend_from_slice(&n.to_le_bytes());
                }
            }
            // --- foreign keys ---
            write_u16(
                &mut out,
                u16::try_from(t.schema.foreign_keys.len()).expect("≤ 65k FKs/table"),
            );
            for fk in &t.schema.foreign_keys {
                // Optional constraint name: 0 = none, 1 = a string follows.
                match &fk.name {
                    None => out.push(0),
                    Some(n) => {
                        out.push(1);
                        write_str(&mut out, n);
                    }
                }
                write_u16(
                    &mut out,
                    u16::try_from(fk.local_columns.len()).expect("≤ 65k FK columns"),
                );
                for &p in &fk.local_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                write_str(&mut out, &fk.parent_table);
                write_u16(
                    &mut out,
                    u16::try_from(fk.parent_columns.len()).expect("≤ 65k FK parent columns"),
                );
                for &p in &fk.parent_columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
                out.push(fk.on_delete.tag());
                out.push(fk.on_update.tag());
            }
            // --- uniqueness constraints: PK flag, then u16 column positions ---
            write_u16(
                &mut out,
                u16::try_from(t.schema.uniqueness_constraints.len())
                    .expect("≤ 65k uniqueness constraints/table"),
            );
            for uc in &t.schema.uniqueness_constraints {
                out.push(u8::from(uc.is_primary_key));
                write_u16(
                    &mut out,
                    u16::try_from(uc.columns.len())
                        .expect("≤ 65k cols in uniqueness constraint"),
                );
                for &p in &uc.columns {
                    write_u16(
                        &mut out,
                        u16::try_from(p).expect("≤ 65k columns/table"),
                    );
                }
            }
            // --- runtime defaults: only columns that have one, keyed by position ---
            let mut rt_defaults: Vec<(usize, &str)> = Vec::new();
            for (i, c) in t.schema.columns.iter().enumerate() {
                if let Some(e) = &c.runtime_default {
                    rt_defaults.push((i, e.as_str()));
                }
            }
            write_u16(
                &mut out,
                u16::try_from(rt_defaults.len())
                    .expect("≤ 65k runtime defaults/table"),
            );
            for (pos, expr) in rt_defaults {
                write_u16(
                    &mut out,
                    u16::try_from(pos).expect("≤ 65k columns/table"),
                );
                write_str(&mut out, expr);
            }
        }
        out
    }

    /// Decodes a catalog produced by [`Catalog::serialize`].
    ///
    /// Accepts any file version in `MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION`. The
    /// per-table decoder only reads the fields that the header's version contains.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Corrupt`] for a bad magic, an unsupported version, a
    /// table-level decode failure, or bytes left over after the last table.
    pub fn deserialize(buf: &[u8]) -> Result<Self, StorageError> {
        let mut cur = Cursor::new(buf);
        let magic = cur.take(8)?;
        if magic != FILE_MAGIC {
            return Err(StorageError::Corrupt(format!(
                "bad magic: expected SPGDB001, got {magic:?}"
            )));
        }
        let version = cur.read_u8()?;
        if !(MIN_SUPPORTED_FILE_VERSION..=FILE_VERSION).contains(&version) {
            return Err(StorageError::Corrupt(format!(
                "unsupported file version: {version} (supported: {MIN_SUPPORTED_FILE_VERSION}..={FILE_VERSION})"
            )));
        }
        let table_count = cur.read_u32()? as usize;
        let mut cat = Self::new();
        for _ in 0..table_count {
            deserialize_table(&mut cur, &mut cat, version)?;
        }
        // Every byte must belong to some table; leftovers indicate corruption.
        if cur.pos < buf.len() {
            return Err(StorageError::Corrupt(format!(
                "trailing bytes: {} unread",
                buf.len() - cur.pos
            )));
        }
        Ok(cat)
    }
}
4261
4262fn deserialize_table(
4267 cur: &mut Cursor<'_>,
4268 cat: &mut Catalog,
4269 version: u8,
4270) -> Result<(), StorageError> {
4271 let table_name = cur.read_str()?;
4272 let name = table_name.clone();
4273 let col_count = cur.read_u16()? as usize;
4274 let mut cols = Vec::with_capacity(col_count);
4275 for _ in 0..col_count {
4276 let c_name = cur.read_str()?;
4277 let ty = cur.read_data_type()?;
4278 let nullable = cur.read_u8()? != 0;
4279 let default = match cur.read_u8()? {
4280 0 => None,
4281 1 => Some(cur.read_value()?),
4282 other => {
4283 return Err(StorageError::Corrupt(format!(
4284 "unknown default tag: {other}"
4285 )));
4286 }
4287 };
4288 let auto_increment = cur.read_u8()? != 0;
4289 cols.push(ColumnSchema {
4293 name: c_name,
4294 ty,
4295 nullable,
4296 default,
4297 runtime_default: None,
4298 auto_increment,
4299 });
4300 }
4301 let n_cols = cols.len();
4302 cat.create_table(TableSchema::new(name, cols))?;
4303 let t = cat.tables.last_mut().expect("create_table just pushed");
4307 deserialize_rows(cur, t, n_cols)?;
4308 deserialize_indices(cur, t, version)?;
4309 if version >= 11 {
4315 let has = cur.read_u8()?;
4316 let hot_tier_bytes = match has {
4317 0 => None,
4318 1 => Some(cur.read_u64()?),
4319 other => {
4320 return Err(StorageError::Corrupt(format!(
4321 "hot_tier_bytes appendix: unknown has-value byte {other}"
4322 )));
4323 }
4324 };
4325 t.schema_mut().hot_tier_bytes = hot_tier_bytes;
4326 }
4327 if version >= 13 {
4330 let fk_count = cur.read_u16()? as usize;
4331 let mut fks = Vec::with_capacity(fk_count);
4332 for _ in 0..fk_count {
4333 let name = match cur.read_u8()? {
4334 0 => None,
4335 1 => Some(cur.read_str()?),
4336 other => {
4337 return Err(StorageError::Corrupt(format!(
4338 "FK appendix: unknown has-name byte {other}"
4339 )));
4340 }
4341 };
4342 let local_arity = cur.read_u16()? as usize;
4343 let mut local_columns = Vec::with_capacity(local_arity);
4344 for _ in 0..local_arity {
4345 local_columns.push(cur.read_u16()? as usize);
4346 }
4347 let parent_table = cur.read_str()?;
4348 let parent_arity = cur.read_u16()? as usize;
4349 if parent_arity != local_arity {
4350 return Err(StorageError::Corrupt(format!(
4351 "FK arity mismatch in catalog: local {local_arity} vs parent {parent_arity}"
4352 )));
4353 }
4354 let mut parent_columns = Vec::with_capacity(parent_arity);
4355 for _ in 0..parent_arity {
4356 parent_columns.push(cur.read_u16()? as usize);
4357 }
4358 let on_delete = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4359 StorageError::Corrupt("FK appendix: unknown on_delete tag".into())
4360 })?;
4361 let on_update = FkAction::from_tag(cur.read_u8()?).ok_or_else(|| {
4362 StorageError::Corrupt("FK appendix: unknown on_update tag".into())
4363 })?;
4364 fks.push(ForeignKeyConstraint {
4365 name,
4366 local_columns,
4367 parent_table,
4368 parent_columns,
4369 on_delete,
4370 on_update,
4371 });
4372 }
4373 t.schema_mut().foreign_keys = fks;
4374 }
4375 if version >= 15 {
4378 let uc_count = cur.read_u16()? as usize;
4379 let mut ucs = Vec::with_capacity(uc_count);
4380 for _ in 0..uc_count {
4381 let is_pk = cur.read_u8()? != 0;
4382 let arity = cur.read_u16()? as usize;
4383 let mut cols = Vec::with_capacity(arity);
4384 for _ in 0..arity {
4385 cols.push(cur.read_u16()? as usize);
4386 }
4387 ucs.push(UniquenessConstraint {
4388 is_primary_key: is_pk,
4389 columns: cols,
4390 });
4391 }
4392 t.schema_mut().uniqueness_constraints = ucs;
4393 let rt_count = cur.read_u16()? as usize;
4395 for _ in 0..rt_count {
4396 let pos = cur.read_u16()? as usize;
4397 let expr = cur.read_str()?;
4398 if let Some(col) = t.schema_mut().columns.get_mut(pos) {
4399 col.runtime_default = Some(expr);
4400 }
4401 }
4402 }
4403 let _ = table_name;
4404 Ok(())
4405}
4406
4407fn deserialize_rows(
4408 cur: &mut Cursor<'_>,
4409 t: &mut Table,
4410 _n_cols: usize,
4411) -> Result<(), StorageError> {
4412 let row_count = cur.read_u32()? as usize;
4413 let mut hot_bytes: u64 = 0;
4418 for _ in 0..row_count {
4419 let tail = &cur.buf[cur.pos..];
4420 let (row, consumed) = decode_row_body_dense(tail, &t.schema)?;
4421 cur.pos += consumed;
4422 hot_bytes = hot_bytes.saturating_add(row_body_encoded_len(&row, &t.schema) as u64);
4428 t.rows.push_mut(row);
4429 }
4430 t.hot_bytes = hot_bytes;
4431 Ok(())
4432}
4433
/// Decodes the index section of a table record and restores each index on `t`.
///
/// Layout: a `u16` index count, then per index:
///
/// - its name and a `u16` column position (validated against `t.schema`);
/// - a kind tag with its payload:
///   - 0 = BTree: persisted entries for v9+; for older files `add_index` is called
///     instead;
///   - 1 = NSW: a `u16` `m` and the graph;
///   - 2 = BRIN: a column type;
/// - for v12+ files: INCLUDE positions, an optional partial predicate, and an optional
///   expression;
/// - for v16+ files: the unique flag and extra column positions.
///
/// # Errors
///
/// Returns [`StorageError::Corrupt`] for an out-of-range column position or an unknown
/// tag byte. Propagates read errors and errors from the `restore_*` / `add_index`
/// calls.
fn deserialize_indices(
    cur: &mut Cursor<'_>,
    t: &mut Table,
    version: u8,
) -> Result<(), StorageError> {
    let index_count = cur.read_u16()? as usize;
    for _ in 0..index_count {
        let idx_name = cur.read_str()?;
        let col_pos = cur.read_u16()? as usize;
        let column_name = t
            .schema
            .columns
            .get(col_pos)
            .ok_or_else(|| {
                StorageError::Corrupt(format!(
                    "index {idx_name:?} points at non-existent column position {col_pos}"
                ))
            })?
            .name
            .clone();
        let kind_tag = cur.read_u8()?;
        match kind_tag {
            0 => {
                if version >= 9 {
                    let map = read_btree_map(cur)?;
                    t.restore_btree_index(idx_name, &column_name, map)?;
                } else {
                    // Pre-v9 files carry no BTree entries. `add_index` presumably
                    // rebuilds them from the already-decoded rows; confirm in Table.
                    t.add_index(idx_name, &column_name)?;
                }
            }
            1 => {
                let m = cur.read_u16()? as usize;
                let graph = cur.read_nsw_graph(m)?;
                t.restore_nsw_index(idx_name, &column_name, graph)?;
            }
            2 => {
                let column_type = cur.read_data_type()?;
                t.restore_brin_index(idx_name, &column_name, column_type)?;
            }
            other => {
                return Err(StorageError::Corrupt(format!(
                    "unknown index kind tag: {other}"
                )));
            }
        }
        // The per-index extras below are applied to `t.indices.last_mut()`. This assumes
        // the restore/add call above appended the index at the end.
        // NOTE(review): confirm.
        if version >= 12 {
            // INCLUDE column positions: u16 count, then u16 positions.
            let num_included = cur.read_u16()? as usize;
            if num_included > 0 {
                let mut included: Vec<usize> = Vec::with_capacity(num_included);
                for _ in 0..num_included {
                    let cp = cur.read_u16()? as usize;
                    if cp >= t.schema.columns.len() {
                        return Err(StorageError::Corrupt(format!(
                            "INCLUDE column position {cp} out of range \
                             ({} schema columns)",
                            t.schema.columns.len()
                        )));
                    }
                    included.push(cp);
                }
                if let Some(last) = t.indices.last_mut() {
                    last.included_columns = included;
                }
            }
            // Partial-index predicate: 0 = none, 1 = a string follows.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let pred = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.partial_predicate = Some(pred);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "partial_predicate tag: unknown byte {other}"
                    )));
                }
            }
            // Expression-index text: 0 = none, 1 = a string follows.
            match cur.read_u8()? {
                0 => {}
                1 => {
                    let expr = cur.read_str()?;
                    if let Some(last) = t.indices.last_mut() {
                        last.expression = Some(expr);
                    }
                }
                other => {
                    return Err(StorageError::Corrupt(format!(
                        "expression tag: unknown byte {other}"
                    )));
                }
            }
            if version >= 16 {
                // Unique flag: 0 = not unique, 1 = unique.
                match cur.read_u8()? {
                    0 => {}
                    1 => {
                        if let Some(last) = t.indices.last_mut() {
                            last.is_unique = true;
                        }
                    }
                    other => {
                        return Err(StorageError::Corrupt(format!(
                            "is_unique tag: unknown byte {other}"
                        )));
                    }
                }
                // Extra column positions: u16 count, then u16 positions.
                let n = cur.read_u16()? as usize;
                if n > 0 {
                    let mut extras: Vec<usize> = Vec::with_capacity(n);
                    for _ in 0..n {
                        let cp = cur.read_u16()? as usize;
                        if cp >= t.schema.columns.len() {
                            return Err(StorageError::Corrupt(format!(
                                "extra column position {cp} out of range \
                                 ({} schema columns)",
                                t.schema.columns.len()
                            )));
                        }
                        extras.push(cp);
                    }
                    if let Some(last) = t.indices.last_mut() {
                        last.extra_column_positions = extras;
                    }
                }
            }
        }
    }
    Ok(())
}
4581
4582fn read_btree_map(
4586 cur: &mut Cursor<'_>,
4587) -> Result<PersistentBTreeMap<IndexKey, Vec<RowLocator>>, StorageError> {
4588 let entry_count = cur.read_u32()? as usize;
4589 let mut map = PersistentBTreeMap::new();
4590 for _ in 0..entry_count {
4591 let key = cur.read_index_key()?;
4592 let locator_count = cur.read_u32()? as usize;
4593 let mut locators = Vec::with_capacity(locator_count);
4594 for _ in 0..locator_count {
4595 let tail = &cur.buf[cur.pos..];
4596 let (loc, consumed) = RowLocator::read_le(tail).map_err(|e| {
4597 StorageError::Corrupt(format!("row_locator decode at offset {}: {e}", cur.pos))
4598 })?;
4599 cur.pos += consumed;
4600 locators.push(loc);
4601 }
4602 map.insert_mut(key, locators);
4603 }
4604 Ok(map)
4605}
4606
4607fn write_nsw_graph(out: &mut Vec<u8>, g: &NswGraph) {
4623 let entry = g.entry.map_or(u32::MAX, |e| {
4624 u32::try_from(e).expect("NSW entry fits in u32")
4625 });
4626 write_u16(
4627 out,
4628 u16::try_from(g.m_max_0).expect("HNSW m_max_0 fits in u16"),
4629 );
4630 out.extend_from_slice(&entry.to_le_bytes());
4631 out.push(g.entry_level);
4632 let node_count = g.levels.len();
4633 write_u32(
4634 out,
4635 u32::try_from(node_count).expect("HNSW node count fits in u32"),
4636 );
4637 for &lvl in &g.levels {
4638 out.push(lvl);
4639 }
4640 let layer_count = u8::try_from(g.layers.len()).expect("HNSW layer count ≤ 255");
4641 out.push(layer_count);
4642 for layer in &g.layers {
4643 write_u32(
4644 out,
4645 u32::try_from(layer.len()).expect("HNSW per-layer node count fits in u32"),
4646 );
4647 for neighbors in layer {
4648 write_u16(
4649 out,
4650 u16::try_from(neighbors.len()).expect("HNSW neighbour list fits in u16"),
4651 );
4652 for &peer in neighbors {
4656 write_u32(out, peer);
4657 }
4658 }
4659 }
4660}
4661
4662fn write_data_type(out: &mut Vec<u8>, t: DataType) {
4663 match t {
4664 DataType::Int => out.push(1),
4665 DataType::BigInt => out.push(2),
4666 DataType::Float => out.push(3),
4667 DataType::Text => out.push(4),
4668 DataType::Bool => out.push(5),
4669 DataType::Vector { dim, encoding } => match encoding {
4670 VecEncoding::F32 => {
4674 out.push(6);
4675 out.extend_from_slice(&dim.to_le_bytes());
4676 }
4677 VecEncoding::F16 => {
4680 out.push(15);
4681 out.extend_from_slice(&dim.to_le_bytes());
4682 }
4683 VecEncoding::Sq8 => {
4689 out.push(14);
4690 out.extend_from_slice(&dim.to_le_bytes());
4691 }
4692 },
4693 DataType::SmallInt => out.push(7),
4694 DataType::Varchar(max) => {
4695 out.push(8);
4696 out.extend_from_slice(&max.to_le_bytes());
4697 }
4698 DataType::Char(size) => {
4699 out.push(9);
4700 out.extend_from_slice(&size.to_le_bytes());
4701 }
4702 DataType::Numeric { precision, scale } => {
4703 out.push(10);
4704 out.push(precision);
4705 out.push(scale);
4706 }
4707 DataType::Date => out.push(11),
4708 DataType::Timestamp => out.push(12),
4709 DataType::Timestamptz => out.push(17),
4713 DataType::Interval => {
4718 unreachable!("DataType::Interval has no on-disk encoding in v2.11")
4719 }
4720 DataType::Json => out.push(13),
4721 DataType::Jsonb => out.push(16),
4724 DataType::Bytes => out.push(18),
4726 DataType::TextArray => out.push(19),
4729 }
4730}
4731
4732impl Cursor<'_> {
4733 fn read_data_type(&mut self) -> Result<DataType, StorageError> {
4734 let tag = self.read_u8()?;
4735 match tag {
4736 1 => Ok(DataType::Int),
4737 2 => Ok(DataType::BigInt),
4738 3 => Ok(DataType::Float),
4739 4 => Ok(DataType::Text),
4740 5 => Ok(DataType::Bool),
4741 6 => Ok(DataType::Vector {
4742 dim: self.read_u32()?,
4743 encoding: VecEncoding::F32,
4744 }),
4745 7 => Ok(DataType::SmallInt),
4746 8 => Ok(DataType::Varchar(self.read_u32()?)),
4747 9 => Ok(DataType::Char(self.read_u32()?)),
4748 10 => {
4749 let precision = self.read_u8()?;
4750 let scale = self.read_u8()?;
4751 Ok(DataType::Numeric { precision, scale })
4752 }
4753 11 => Ok(DataType::Date),
4754 12 => Ok(DataType::Timestamp),
4755 13 => Ok(DataType::Json),
4756 14 => Ok(DataType::Vector {
4757 dim: self.read_u32()?,
4758 encoding: VecEncoding::Sq8,
4759 }),
4760 15 => Ok(DataType::Vector {
4764 dim: self.read_u32()?,
4765 encoding: VecEncoding::F16,
4766 }),
4767 16 => Ok(DataType::Jsonb),
4771 17 => Ok(DataType::Timestamptz),
4775 18 => Ok(DataType::Bytes),
4777 19 => Ok(DataType::TextArray),
4779 other => Err(StorageError::Corrupt(format!(
4780 "unknown data type tag: {other}"
4781 ))),
4782 }
4783 }
4784}
4785
4786pub fn row_body_encoded_len(row: &Row, schema: &TableSchema) -> usize {
4792 debug_assert_eq!(
4793 row.values.len(),
4794 schema.columns.len(),
4795 "row_body_encoded_len: row arity must match schema"
4796 );
4797 let bitmap_bytes = schema.columns.len().div_ceil(8);
4798 let mut n = bitmap_bytes;
4799 for (col_idx, v) in row.values.iter().enumerate() {
4800 if matches!(v, Value::Null) {
4801 continue;
4802 }
4803 n += value_body_encoded_len(v, schema.columns[col_idx].ty);
4804 }
4805 n
4806}
4807
4808fn value_body_encoded_len(v: &Value, _ty: DataType) -> usize {
4814 match v {
4815 Value::SmallInt(_) => 2,
4816 Value::Int(_) | Value::Date(_) => 4,
4818 Value::BigInt(_) | Value::Float(_) | Value::Timestamp(_) => 8,
4820 Value::Bool(_) => 1,
4821 Value::Text(s) | Value::Json(s) => 2 + s.len(),
4823 Value::Vector(vec) => 4 + 4 * vec.len(),
4825 Value::Sq8Vector(q) => 4 + 4 + 4 + q.bytes.len(),
4832 Value::HalfVector(h) => 4 + h.bytes.len(),
4835 Value::Numeric { .. } => 16 + 1,
4837 Value::Bytes(b) => 2 + b.len(),
4843 Value::TextArray(items) => {
4846 let mut n = 2; for item in items {
4848 n += 1; if let Some(s) = item {
4850 n += 2 + s.len();
4851 }
4852 }
4853 n
4854 }
4855 Value::Null => 0,
4857 Value::Interval { .. } => {
4859 unreachable!("Value::Interval has no on-disk encoding")
4860 }
4861 }
4862}
4863
4864pub fn encode_row_body_dense(row: &Row, schema: &TableSchema) -> Vec<u8> {
4875 debug_assert_eq!(
4876 row.values.len(),
4877 schema.columns.len(),
4878 "dense encode: row arity must match schema"
4879 );
4880 let bitmap_bytes = schema.columns.len().div_ceil(8);
4881 let mut out = Vec::with_capacity(bitmap_bytes + schema.columns.len() * 8);
4884 let bitmap_offset = out.len();
4885 out.resize(bitmap_offset + bitmap_bytes, 0);
4886 for (i, v) in row.values.iter().enumerate() {
4887 if matches!(v, Value::Null) {
4888 out[bitmap_offset + i / 8] |= 1 << (i % 8);
4889 }
4890 }
4891 for (col_idx, v) in row.values.iter().enumerate() {
4892 if matches!(v, Value::Null) {
4893 continue;
4894 }
4895 write_value_body(&mut out, v, schema.columns[col_idx].ty);
4896 }
4897 out
4898}
4899
4900pub fn decode_row_body_dense(
4906 bytes: &[u8],
4907 schema: &TableSchema,
4908) -> Result<(Row, usize), StorageError> {
4909 let mut cur = Cursor::new(bytes);
4910 let bitmap_bytes = schema.columns.len().div_ceil(8);
4911 let mut bitmap_buf = [0u8; 32];
4912 if bitmap_bytes > bitmap_buf.len() {
4913 return Err(StorageError::Corrupt(format!(
4914 "row NULL bitmap {bitmap_bytes} B exceeds 32 B cap"
4915 )));
4916 }
4917 let slice = cur.take(bitmap_bytes)?;
4918 bitmap_buf[..bitmap_bytes].copy_from_slice(slice);
4919 let mut values = Vec::with_capacity(schema.columns.len());
4920 for (col_idx, col) in schema.columns.iter().enumerate() {
4921 if (bitmap_buf[col_idx / 8] >> (col_idx % 8)) & 1 == 1 {
4922 values.push(Value::Null);
4923 } else {
4924 values.push(cur.read_value_body(col.ty)?);
4925 }
4926 }
4927 Ok((Row { values }, cur.pos))
4928}
4929
4930fn write_value_body(out: &mut Vec<u8>, v: &Value, ty: DataType) {
4939 match (v, ty) {
4940 (Value::SmallInt(n), DataType::SmallInt) => out.extend_from_slice(&n.to_le_bytes()),
4941 (Value::Int(n), DataType::Int) => out.extend_from_slice(&n.to_le_bytes()),
4942 (Value::BigInt(n), DataType::BigInt) => out.extend_from_slice(&n.to_le_bytes()),
4943 (Value::Float(x), DataType::Float) => out.extend_from_slice(&x.to_le_bytes()),
4944 (Value::Bool(b), DataType::Bool) => out.push(u8::from(*b)),
4945 (Value::Text(s), DataType::Text | DataType::Varchar(_) | DataType::Char(_)) => {
4946 write_str(out, s);
4947 }
4948 (
4949 Value::Vector(v),
4950 DataType::Vector {
4951 encoding: VecEncoding::F32,
4952 ..
4953 },
4954 ) => {
4955 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
4956 out.extend_from_slice(&dim.to_le_bytes());
4957 for x in v {
4958 out.extend_from_slice(&x.to_le_bytes());
4959 }
4960 }
4961 (
4967 Value::Sq8Vector(q),
4968 DataType::Vector {
4969 encoding: VecEncoding::Sq8,
4970 ..
4971 },
4972 ) => {
4973 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
4974 out.extend_from_slice(&dim.to_le_bytes());
4975 out.extend_from_slice(&q.min.to_le_bytes());
4976 out.extend_from_slice(&q.max.to_le_bytes());
4977 out.extend_from_slice(&q.bytes);
4978 }
4979 (
4983 Value::HalfVector(h),
4984 DataType::Vector {
4985 encoding: VecEncoding::F16,
4986 ..
4987 },
4988 ) => {
4989 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
4990 out.extend_from_slice(&dim.to_le_bytes());
4991 out.extend_from_slice(&h.bytes);
4992 }
4993 (Value::Numeric { scaled, .. }, DataType::Numeric { scale, .. }) => {
4994 out.extend_from_slice(&scaled.to_le_bytes());
4995 out.push(scale);
4996 }
4997 (Value::Date(d), DataType::Date) => out.extend_from_slice(&d.to_le_bytes()),
4998 (Value::Timestamp(t), DataType::Timestamp | DataType::Timestamptz) => {
4999 out.extend_from_slice(&t.to_le_bytes())
5000 }
5001 (Value::Json(s), DataType::Json | DataType::Jsonb) => write_str(out, s),
5005 (Value::Bytes(b), DataType::Bytes) => {
5008 let len = u16::try_from(b.len()).expect("BYTEA cell ≤ 64 KiB");
5009 out.extend_from_slice(&len.to_le_bytes());
5010 out.extend_from_slice(b);
5011 }
5012 (Value::TextArray(items), DataType::TextArray) => {
5015 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5016 out.extend_from_slice(&count.to_le_bytes());
5017 for item in items {
5018 match item {
5019 None => out.push(1),
5020 Some(s) => {
5021 out.push(0);
5022 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5023 out.extend_from_slice(&len.to_le_bytes());
5024 out.extend_from_slice(s.as_bytes());
5025 }
5026 }
5027 }
5028 }
5029 (other, ty) => unreachable!(
5033 "schema-driven encode received mismatched value/type pair: \
5034 value tag={:?}, column type={:?}",
5035 other.data_type(),
5036 ty
5037 ),
5038 }
5039}
5040
5041fn write_value(out: &mut Vec<u8>, v: &Value) {
5042 match v {
5043 Value::Null => out.push(0),
5044 Value::SmallInt(n) => {
5045 out.push(7);
5046 out.extend_from_slice(&n.to_le_bytes());
5047 }
5048 Value::Int(n) => {
5049 out.push(1);
5050 out.extend_from_slice(&n.to_le_bytes());
5051 }
5052 Value::BigInt(n) => {
5053 out.push(2);
5054 out.extend_from_slice(&n.to_le_bytes());
5055 }
5056 Value::Float(x) => {
5057 out.push(3);
5058 out.extend_from_slice(&x.to_le_bytes());
5059 }
5060 Value::Text(s) | Value::Json(s) => {
5065 out.push(4);
5066 write_str(out, s);
5067 }
5068 Value::Bool(b) => {
5069 out.push(5);
5070 out.push(u8::from(*b));
5071 }
5072 Value::Vector(v) => {
5073 out.push(6);
5074 let dim = u32::try_from(v.len()).expect("vector dim fits in u32");
5075 out.extend_from_slice(&dim.to_le_bytes());
5076 for x in v {
5077 out.extend_from_slice(&x.to_le_bytes());
5078 }
5079 }
5080 Value::Sq8Vector(q) => {
5085 out.push(11);
5086 let dim = u32::try_from(q.bytes.len()).expect("vector dim fits in u32");
5087 out.extend_from_slice(&dim.to_le_bytes());
5088 out.extend_from_slice(&q.min.to_le_bytes());
5089 out.extend_from_slice(&q.max.to_le_bytes());
5090 out.extend_from_slice(&q.bytes);
5091 }
5092 Value::HalfVector(h) => {
5097 out.push(12);
5098 let dim = u32::try_from(h.dim()).expect("vector dim fits in u32");
5099 out.extend_from_slice(&dim.to_le_bytes());
5100 out.extend_from_slice(&h.bytes);
5101 }
5102 Value::Numeric { scaled, scale } => {
5103 out.push(8);
5104 out.extend_from_slice(&scaled.to_le_bytes());
5105 out.push(*scale);
5106 }
5107 Value::Date(d) => {
5108 out.push(9);
5109 out.extend_from_slice(&d.to_le_bytes());
5110 }
5111 Value::Timestamp(t) => {
5112 out.push(10);
5113 out.extend_from_slice(&t.to_le_bytes());
5114 }
5115 Value::Interval { .. } => {
5119 unreachable!(
5120 "Value::Interval has no on-disk encoding; engine must reject it before write"
5121 )
5122 }
5123 Value::Bytes(b) => {
5128 out.push(14);
5129 let len = u16::try_from(b.len()).expect("BYTEA value ≤ 64 KiB");
5130 out.extend_from_slice(&len.to_le_bytes());
5131 out.extend_from_slice(b);
5132 }
5133 Value::TextArray(items) => {
5136 out.push(15);
5137 let count = u16::try_from(items.len()).expect("TEXT[] ≤ 65k elements");
5138 out.extend_from_slice(&count.to_le_bytes());
5139 for item in items {
5140 match item {
5141 None => out.push(1),
5142 Some(s) => {
5143 out.push(0);
5144 let len = u16::try_from(s.len()).expect("TEXT[] element ≤ 64 KiB");
5145 out.extend_from_slice(&len.to_le_bytes());
5146 out.extend_from_slice(s.as_bytes());
5147 }
5148 }
5149 }
5150 }
5151 }
5152}
5153
fn write_u16(out: &mut Vec<u8>, n: u16) {
    // Little-endian, 2 bytes.
    out.extend(n.to_le_bytes());
}
fn write_u32(out: &mut Vec<u8>, n: u32) {
    // Little-endian, 4 bytes.
    out.extend(n.to_le_bytes());
}
5160fn write_str(out: &mut Vec<u8>, s: &str) {
5161 let len = u16::try_from(s.len()).expect("identifier / text fits in u16");
5162 write_u16(out, len);
5163 out.extend_from_slice(s.as_bytes());
5164}
5165
5166fn write_index_key(out: &mut Vec<u8>, key: &IndexKey) {
5170 match key {
5171 IndexKey::Int(n) => {
5172 out.push(INDEX_KEY_TAG_INT);
5173 out.extend_from_slice(&n.to_le_bytes());
5174 }
5175 IndexKey::Text(s) => {
5176 out.push(INDEX_KEY_TAG_TEXT);
5177 write_str(out, s);
5178 }
5179 IndexKey::Bool(b) => {
5180 out.push(INDEX_KEY_TAG_BOOL);
5181 out.push(u8::from(*b));
5182 }
5183 }
5184}
5185
/// Reader over an encoded byte buffer.
///
/// `buf` is only borrowed. Reads go through `take`, which advances `pos`
/// past the consumed bytes.
struct Cursor<'a> {
    /// The encoded bytes being decoded.
    buf: &'a [u8],
    /// Offset in `buf` of the next unread byte.
    pos: usize,
}
5190
5191impl<'a> Cursor<'a> {
5192 const fn new(buf: &'a [u8]) -> Self {
5193 Self { buf, pos: 0 }
5194 }
5195
5196 fn take(&mut self, n: usize) -> Result<&'a [u8], StorageError> {
5197 let end = self
5198 .pos
5199 .checked_add(n)
5200 .ok_or_else(|| StorageError::Corrupt(format!("length overflow taking {n} bytes")))?;
5201 if end > self.buf.len() {
5202 return Err(StorageError::Corrupt(format!(
5203 "unexpected EOF at offset {} (wanted {n} more bytes)",
5204 self.pos
5205 )));
5206 }
5207 let s = &self.buf[self.pos..end];
5208 self.pos = end;
5209 Ok(s)
5210 }
5211
5212 fn read_u8(&mut self) -> Result<u8, StorageError> {
5213 Ok(self.take(1)?[0])
5214 }
5215 fn read_u16(&mut self) -> Result<u16, StorageError> {
5216 let s = self.take(2)?;
5217 Ok(u16::from_le_bytes([s[0], s[1]]))
5218 }
5219 fn read_u32(&mut self) -> Result<u32, StorageError> {
5220 let s = self.take(4)?;
5221 Ok(u32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5222 }
5223 fn read_i32(&mut self) -> Result<i32, StorageError> {
5224 let s = self.take(4)?;
5225 Ok(i32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5226 }
5227 fn read_u64(&mut self) -> Result<u64, StorageError> {
5230 let s = self.take(8)?;
5231 Ok(u64::from_le_bytes([
5232 s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
5233 ]))
5234 }
5235 fn read_i64(&mut self) -> Result<i64, StorageError> {
5236 let s = self.take(8)?;
5237 let arr: [u8; 8] = s.try_into().expect("checked");
5238 Ok(i64::from_le_bytes(arr))
5239 }
5240 fn read_f64(&mut self) -> Result<f64, StorageError> {
5241 let s = self.take(8)?;
5242 let arr: [u8; 8] = s.try_into().expect("checked");
5243 Ok(f64::from_le_bytes(arr))
5244 }
5245 fn read_f32(&mut self) -> Result<f32, StorageError> {
5246 let s = self.take(4)?;
5247 Ok(f32::from_le_bytes([s[0], s[1], s[2], s[3]]))
5248 }
5249 fn read_str(&mut self) -> Result<String, StorageError> {
5250 let len = self.read_u16()? as usize;
5251 let bytes = self.take(len)?;
5252 core::str::from_utf8(bytes)
5253 .map(String::from)
5254 .map_err(|_| StorageError::Corrupt("invalid UTF-8 in identifier or text".into()))
5255 }
5256
5257 fn read_index_key(&mut self) -> Result<IndexKey, StorageError> {
5261 let tag = self.read_u8()?;
5262 match tag {
5263 INDEX_KEY_TAG_INT => Ok(IndexKey::Int(self.read_i64()?)),
5264 INDEX_KEY_TAG_TEXT => Ok(IndexKey::Text(self.read_str()?)),
5265 INDEX_KEY_TAG_BOOL => Ok(IndexKey::Bool(self.read_u8()? != 0)),
5266 other => Err(StorageError::Corrupt(format!(
5267 "unknown index key tag: {other}"
5268 ))),
5269 }
5270 }
5271 fn read_value_body(&mut self, ty: DataType) -> Result<Value, StorageError> {
5277 match ty {
5278 DataType::SmallInt => {
5279 let s = self.take(2)?;
5280 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5281 }
5282 DataType::Int => Ok(Value::Int(self.read_i32()?)),
5283 DataType::BigInt => Ok(Value::BigInt(self.read_i64()?)),
5284 DataType::Float => Ok(Value::Float(self.read_f64()?)),
5285 DataType::Bool => Ok(Value::Bool(self.read_u8()? != 0)),
5286 DataType::Text | DataType::Varchar(_) | DataType::Char(_) => {
5287 Ok(Value::Text(self.read_str()?))
5288 }
5289 DataType::Vector {
5290 encoding: VecEncoding::F32,
5291 ..
5292 } => {
5293 let dim = self.read_u32()? as usize;
5294 let mut v = Vec::with_capacity(dim);
5295 for _ in 0..dim {
5296 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5297 v.push(f32::from_le_bytes(bytes));
5298 }
5299 Ok(Value::Vector(v))
5300 }
5301 DataType::Vector {
5302 encoding: VecEncoding::Sq8,
5303 ..
5304 } => {
5305 let dim = self.read_u32()? as usize;
5306 let min = self.read_f32()?;
5307 let max = self.read_f32()?;
5308 let bytes = self.take(dim)?.to_vec();
5309 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5310 }
5311 DataType::Vector {
5312 encoding: VecEncoding::F16,
5313 ..
5314 } => {
5315 let dim = self.read_u32()? as usize;
5316 let bytes = self.take(dim * 2)?.to_vec();
5317 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5318 }
5319 DataType::Numeric { .. } => {
5320 let s = self.take(16)?;
5321 let arr: [u8; 16] = s.try_into().expect("checked");
5322 let scaled = i128::from_le_bytes(arr);
5323 let scale = self.read_u8()?;
5324 Ok(Value::Numeric { scaled, scale })
5325 }
5326 DataType::Date => Ok(Value::Date(self.read_i32()?)),
5327 DataType::Timestamp => Ok(Value::Timestamp(self.read_i64()?)),
5328 DataType::Timestamptz => Ok(Value::Timestamp(self.read_i64()?)),
5329 DataType::Jsonb => Ok(Value::Json(self.read_str()?)),
5330 DataType::Interval => {
5331 Err(StorageError::Corrupt(
5336 "INTERVAL column found on disk — runtime-only type, v3.0.2 rejects it".into(),
5337 ))
5338 }
5339 DataType::Json => Ok(Value::Json(self.read_str()?)),
5340 DataType::Bytes => {
5343 let len = self.read_u16()? as usize;
5344 let bytes = self.take(len)?.to_vec();
5345 Ok(Value::Bytes(bytes))
5346 }
5347 DataType::TextArray => {
5349 let count = self.read_u16()? as usize;
5350 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
5351 for _ in 0..count {
5352 match self.read_u8()? {
5353 0 => items.push(Some(self.read_str()?)),
5354 1 => items.push(None),
5355 other => {
5356 return Err(StorageError::Corrupt(format!(
5357 "TEXT[] null flag: unknown byte {other}"
5358 )));
5359 }
5360 }
5361 }
5362 Ok(Value::TextArray(items))
5363 }
5364 }
5365 }
5366
5367 fn read_value(&mut self) -> Result<Value, StorageError> {
5368 let tag = self.read_u8()?;
5369 match tag {
5370 0 => Ok(Value::Null),
5371 1 => Ok(Value::Int(self.read_i32()?)),
5372 2 => Ok(Value::BigInt(self.read_i64()?)),
5373 3 => Ok(Value::Float(self.read_f64()?)),
5374 4 => Ok(Value::Text(self.read_str()?)),
5375 5 => Ok(Value::Bool(self.read_u8()? != 0)),
5376 6 => {
5377 let dim = self.read_u32()? as usize;
5378 let mut v = Vec::with_capacity(dim);
5379 for _ in 0..dim {
5380 let bytes: [u8; 4] = self.take(4)?.try_into().expect("checked");
5381 v.push(f32::from_le_bytes(bytes));
5382 }
5383 Ok(Value::Vector(v))
5384 }
5385 7 => {
5386 let s = self.take(2)?;
5387 Ok(Value::SmallInt(i16::from_le_bytes([s[0], s[1]])))
5388 }
5389 8 => {
5390 let s = self.take(16)?;
5391 let arr: [u8; 16] = s.try_into().expect("checked");
5392 let scaled = i128::from_le_bytes(arr);
5393 let scale = self.read_u8()?;
5394 Ok(Value::Numeric { scaled, scale })
5395 }
5396 9 => Ok(Value::Date(self.read_i32()?)),
5397 10 => Ok(Value::Timestamp(self.read_i64()?)),
5398 11 => {
5403 let dim = self.read_u32()? as usize;
5404 let min = self.read_f32()?;
5405 let max = self.read_f32()?;
5406 let bytes = self.take(dim)?.to_vec();
5407 Ok(Value::Sq8Vector(quantize::Sq8Vector { min, max, bytes }))
5408 }
5409 12 => {
5412 let dim = self.read_u32()? as usize;
5413 let bytes = self.take(dim * 2)?.to_vec();
5414 Ok(Value::HalfVector(halfvec::HalfVector { bytes }))
5415 }
5416 14 => {
5418 let len = self.read_u16()? as usize;
5419 let bytes = self.take(len)?.to_vec();
5420 Ok(Value::Bytes(bytes))
5421 }
5422 15 => {
5425 let count = self.read_u16()? as usize;
5426 let mut items: Vec<Option<String>> = Vec::with_capacity(count);
5427 for _ in 0..count {
5428 match self.read_u8()? {
5429 0 => items.push(Some(self.read_str()?)),
5430 1 => items.push(None),
5431 other => {
5432 return Err(StorageError::Corrupt(format!(
5433 "TEXT[] null flag in value tag: unknown byte {other}"
5434 )));
5435 }
5436 }
5437 }
5438 Ok(Value::TextArray(items))
5439 }
5440 other => Err(StorageError::Corrupt(format!("unknown value tag: {other}"))),
5441 }
5442 }
5443
5444 fn read_nsw_graph(&mut self, m: usize) -> Result<NswGraph, StorageError> {
5448 let m_max_0 = self.read_u16()? as usize;
5449 let entry_raw = self.read_u32()?;
5450 let entry = if entry_raw == u32::MAX {
5451 None
5452 } else {
5453 Some(entry_raw as usize)
5454 };
5455 let entry_level = self.read_u8()?;
5456 let node_count = self.read_u32()? as usize;
5457 let mut levels: PersistentVec<u8> = PersistentVec::new();
5462 for _ in 0..node_count {
5463 levels.push_mut(self.read_u8()?);
5464 }
5465 let layer_count = self.read_u8()? as usize;
5466 let mut layers: Vec<PersistentVec<Vec<u32>>> = Vec::with_capacity(layer_count);
5467 for _ in 0..layer_count {
5468 let n = self.read_u32()? as usize;
5469 let mut per_layer: PersistentVec<Vec<u32>> = PersistentVec::new();
5470 for _ in 0..n {
5471 let cnt = self.read_u16()? as usize;
5472 let mut row: Vec<u32> = Vec::with_capacity(cnt);
5473 for _ in 0..cnt {
5474 row.push(self.read_u32()?);
5475 }
5476 per_layer.push_mut(row);
5477 }
5478 layers.push(per_layer);
5479 }
5480 Ok(NswGraph {
5481 m,
5482 m_max_0,
5483 entry,
5484 entry_level,
5485 levels,
5486 layers,
5487 })
5488 }
5489}
5490
5491#[cfg(test)]
5492mod tests {
5493 use super::*;
5494 use alloc::string::ToString;
5495 use alloc::vec;
5496
5497 #[cfg(target_arch = "aarch64")]
5498 #[test]
5499 fn neon_l2_matches_scalar() {
5500 let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
5505 for &d in &dims {
5506 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5507 let mut a = Vec::with_capacity(d);
5508 let mut b = Vec::with_capacity(d);
5509 for _ in 0..d {
5510 state = state
5511 .wrapping_mul(6_364_136_223_846_793_005)
5512 .wrapping_add(1);
5513 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5514 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5515 state = state
5516 .wrapping_mul(6_364_136_223_846_793_005)
5517 .wrapping_add(1);
5518 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5519 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5520 a.push(x);
5521 b.push(y);
5522 }
5523 let scalar = l2_distance_sq_scalar(&a, &b);
5524 let neon = unsafe { l2_distance_sq_neon(&a, &b) };
5525 let tol = (scalar.abs().max(1e-6)) * 1e-4;
5526 assert!(
5527 (scalar - neon).abs() <= tol,
5528 "dim={d}: scalar={scalar} neon={neon} diff={}",
5529 (scalar - neon).abs()
5530 );
5531 }
5532 }
5533
5534 #[cfg(target_arch = "aarch64")]
5535 #[test]
5536 fn neon_inner_product_matches_scalar() {
5537 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5541 for &d in &dims {
5542 let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
5543 let mut a = Vec::with_capacity(d);
5544 let mut b = Vec::with_capacity(d);
5545 for _ in 0..d {
5546 state = state
5547 .wrapping_mul(6_364_136_223_846_793_005)
5548 .wrapping_add(1);
5549 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5550 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5551 state = state
5552 .wrapping_mul(6_364_136_223_846_793_005)
5553 .wrapping_add(1);
5554 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5555 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5556 a.push(x);
5557 b.push(y);
5558 }
5559 let scalar = inner_product_scalar(&a, &b);
5560 let neon = unsafe { inner_product_neon(&a, &b) };
5561 #[allow(clippy::cast_precision_loss)]
5562 let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5563 assert!(
5564 (scalar - neon).abs() <= tol,
5565 "IP dim={d}: scalar={scalar} neon={neon} diff={}",
5566 (scalar - neon).abs()
5567 );
5568 }
5569 }
5570
5571 #[cfg(target_arch = "aarch64")]
5572 #[allow(clippy::similar_names)]
5573 #[test]
5574 fn neon_cosine_dot_norms_matches_scalar() {
5575 let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
5576 for &d in &dims {
5577 let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
5578 let mut a = Vec::with_capacity(d);
5579 let mut b = Vec::with_capacity(d);
5580 for _ in 0..d {
5581 state = state
5582 .wrapping_mul(6_364_136_223_846_793_005)
5583 .wrapping_add(1);
5584 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5585 let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5586 state = state
5587 .wrapping_mul(6_364_136_223_846_793_005)
5588 .wrapping_add(1);
5589 #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
5590 let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
5591 a.push(x);
5592 b.push(y);
5593 }
5594 let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
5595 let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };
5596 #[allow(clippy::cast_precision_loss)]
5597 let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5598 #[allow(clippy::cast_precision_loss)]
5599 let tol_n = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
5600 assert!(
5601 (dot_s - dot_n).abs() <= tol_d,
5602 "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
5603 );
5604 assert!(
5605 (na_s - na_n).abs() <= tol_n,
5606 "cosine na dim={d}: scalar={na_s} neon={na_n}"
5607 );
5608 assert!(
5609 (nb_s - nb_n).abs() <= tol_n,
5610 "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
5611 );
5612 }
5613 }
5614
5615 fn make_users_schema() -> TableSchema {
5616 TableSchema::new(
5617 "users",
5618 vec![
5619 ColumnSchema::new("id", DataType::Int, false),
5620 ColumnSchema::new("name", DataType::Text, false),
5621 ColumnSchema::new("score", DataType::Float, true),
5622 ],
5623 )
5624 }
5625
5626 #[test]
5627 fn value_type_tag_matches_variant() {
5628 assert_eq!(Value::Int(1).data_type(), Some(DataType::Int));
5629 assert_eq!(Value::BigInt(1).data_type(), Some(DataType::BigInt));
5630 assert_eq!(Value::Float(1.0).data_type(), Some(DataType::Float));
5631 assert_eq!(Value::Text("x".into()).data_type(), Some(DataType::Text));
5632 assert_eq!(Value::Bool(true).data_type(), Some(DataType::Bool));
5633 assert_eq!(Value::Null.data_type(), None);
5634 assert!(Value::Null.is_null());
5635 assert!(!Value::Int(0).is_null());
5636 }
5637
5638 #[test]
5639 fn sq8_value_reports_sq8_data_type() {
5640 let q = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
5645 let v = Value::Sq8Vector(q);
5646 assert_eq!(
5647 v.data_type(),
5648 Some(DataType::Vector {
5649 dim: 5,
5650 encoding: VecEncoding::Sq8,
5651 }),
5652 );
5653 }
5654
5655 #[test]
5656 fn datatype_display_matches_pg_keyword() {
5657 assert_eq!(DataType::Int.to_string(), "INT");
5658 assert_eq!(DataType::BigInt.to_string(), "BIGINT");
5659 assert_eq!(DataType::Float.to_string(), "FLOAT");
5660 assert_eq!(DataType::Text.to_string(), "TEXT");
5661 assert_eq!(DataType::Bool.to_string(), "BOOL");
5662 }
5663
5664 #[test]
5665 fn row_len_and_emptiness() {
5666 let r = Row::new(vec![Value::Int(1), Value::Null]);
5667 assert_eq!(r.len(), 2);
5668 assert!(!r.is_empty());
5669 assert!(Row::new(Vec::new()).is_empty());
5670 }
5671
5672 #[test]
5673 fn table_schema_column_position() {
5674 let s = make_users_schema();
5675 assert_eq!(s.column_position("id"), Some(0));
5676 assert_eq!(s.column_position("score"), Some(2));
5677 assert_eq!(s.column_position("missing"), None);
5678 }
5679
5680 #[test]
5681 fn catalog_create_table_then_lookup() {
5682 let mut cat = Catalog::new();
5683 cat.create_table(make_users_schema()).unwrap();
5684 assert_eq!(cat.table_count(), 1);
5685 assert!(cat.get("users").is_some());
5686 assert!(cat.get("nope").is_none());
5687 }
5688
5689 #[test]
5690 fn catalog_duplicate_table_is_rejected() {
5691 let mut cat = Catalog::new();
5692 cat.create_table(make_users_schema()).unwrap();
5693 let err = cat.create_table(make_users_schema()).unwrap_err();
5694 assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
5695 }
5696
5697 #[test]
5698 fn table_insert_happy_path_appends_row() {
5699 let mut cat = Catalog::new();
5700 cat.create_table(make_users_schema()).unwrap();
5701 let t = cat.get_mut("users").unwrap();
5702 t.insert(Row::new(vec![
5703 Value::Int(1),
5704 Value::Text("alice".into()),
5705 Value::Float(99.5),
5706 ]))
5707 .unwrap();
5708 assert_eq!(t.row_count(), 1);
5709 assert_eq!(t.rows()[0].values[1], Value::Text("alice".into()));
5710 }
5711
5712 #[test]
5713 fn table_insert_arity_mismatch() {
5714 let mut cat = Catalog::new();
5715 cat.create_table(make_users_schema()).unwrap();
5716 let t = cat.get_mut("users").unwrap();
5717 let err = t.insert(Row::new(vec![Value::Int(1)])).unwrap_err();
5718 assert!(matches!(
5719 err,
5720 StorageError::ArityMismatch {
5721 expected: 3,
5722 actual: 1
5723 }
5724 ));
5725 assert_eq!(t.row_count(), 0);
5726 }
5727
5728 #[test]
5729 fn table_insert_type_mismatch_reports_column() {
5730 let mut cat = Catalog::new();
5731 cat.create_table(make_users_schema()).unwrap();
5732 let t = cat.get_mut("users").unwrap();
5733 let err = t
5734 .insert(Row::new(vec![
5735 Value::Int(1),
5736 Value::Int(42), Value::Float(0.0),
5738 ]))
5739 .unwrap_err();
5740 match err {
5741 StorageError::TypeMismatch {
5742 ref column,
5743 expected,
5744 actual,
5745 position,
5746 } => {
5747 assert_eq!(column, "name");
5748 assert_eq!(expected, DataType::Text);
5749 assert_eq!(actual, DataType::Int);
5750 assert_eq!(position, 1);
5751 }
5752 other => panic!("unexpected: {other:?}"),
5753 }
5754 assert_eq!(t.row_count(), 0);
5755 }
5756
5757 #[test]
5758 fn table_insert_null_into_not_null_rejected() {
5759 let mut cat = Catalog::new();
5760 cat.create_table(make_users_schema()).unwrap();
5761 let t = cat.get_mut("users").unwrap();
5762 let err = t
5763 .insert(Row::new(vec![
5764 Value::Int(1),
5765 Value::Null, Value::Float(1.0),
5767 ]))
5768 .unwrap_err();
5769 assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
5770 }
5771
5772 #[test]
5773 fn table_insert_null_into_nullable_ok() {
5774 let mut cat = Catalog::new();
5775 cat.create_table(make_users_schema()).unwrap();
5776 let t = cat.get_mut("users").unwrap();
5777 t.insert(Row::new(vec![
5778 Value::Int(1),
5779 Value::Text("bob".into()),
5780 Value::Null,
5781 ]))
5782 .unwrap();
5783 assert_eq!(t.row_count(), 1);
5784 }
5785
5786 #[test]
5787 fn catalog_get_mut_independent_per_table() {
5788 let mut cat = Catalog::new();
5789 cat.create_table(TableSchema::new(
5790 "a",
5791 vec![ColumnSchema::new("v", DataType::Int, false)],
5792 ))
5793 .unwrap();
5794 cat.create_table(TableSchema::new(
5795 "b",
5796 vec![ColumnSchema::new("v", DataType::Int, false)],
5797 ))
5798 .unwrap();
5799 cat.get_mut("a")
5800 .unwrap()
5801 .insert(Row::new(vec![Value::Int(1)]))
5802 .unwrap();
5803 assert_eq!(cat.get("a").unwrap().row_count(), 1);
5804 assert_eq!(cat.get("b").unwrap().row_count(), 0);
5805 }
5806
5807 fn assert_round_trip(cat: &Catalog) {
5810 let bytes = cat.serialize();
5811 let restored = Catalog::deserialize(&bytes).expect("deserialize");
5812 assert_eq!(restored.table_count(), cat.table_count());
5815 for (a, b) in cat.tables.iter().zip(restored.tables.iter()) {
5816 assert_eq!(a.schema, b.schema);
5817 assert_eq!(a.rows, b.rows);
5818 }
5819 }
5820
5821 #[test]
5822 fn serialize_empty_catalog_round_trips() {
5823 assert_round_trip(&Catalog::new());
5824 }
5825
5826 #[test]
5827 fn serialize_single_empty_table_round_trips() {
5828 let mut cat = Catalog::new();
5829 cat.create_table(make_users_schema()).unwrap();
5830 assert_round_trip(&cat);
5831 }
5832
5833 #[test]
5834 fn nsw_clone_is_o1() {
5835 let mut cat = Catalog::new();
5844 cat.create_table(TableSchema::new(
5845 "docs",
5846 alloc::vec![
5847 ColumnSchema::new("id", DataType::Int, false),
5848 ColumnSchema::new(
5849 "v",
5850 DataType::Vector {
5851 dim: 3,
5852 encoding: VecEncoding::F32
5853 },
5854 true
5855 ),
5856 ],
5857 ))
5858 .unwrap();
5859 let t = cat.get_mut("docs").unwrap();
5860 for i in 0..1500_i32 {
5861 #[allow(clippy::cast_precision_loss)] let base = (i as f32) * 0.01;
5863 t.insert(Row::new(alloc::vec![
5864 Value::Int(i),
5865 Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
5866 ]))
5867 .unwrap();
5868 }
5869 t.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
5870 .unwrap();
5871 let g = match &cat.get("docs").unwrap().indices()[0].kind {
5872 IndexKind::Nsw(g) => g,
5873 IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
5874 };
5875 assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
5878 assert!(
5879 g.layers.len() >= 2,
5880 "1500 nodes should populate at least two HNSW layers, got {}",
5881 g.layers.len()
5882 );
5883
5884 let cloned = g.clone();
5885
5886 assert!(
5887 g.levels.shares_storage_with(&cloned.levels),
5888 "levels PV not shared after clone — clone copied elements (O(N))"
5889 );
5890 assert_eq!(g.layers.len(), cloned.layers.len());
5891 for (l, (orig, cl)) in g.layers.iter().zip(cloned.layers.iter()).enumerate() {
5892 assert!(
5893 orig.shares_storage_with(cl),
5894 "layer {l} PV not shared after clone — clone copied elements (O(N))"
5895 );
5896 }
5897 }
5898
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    // A dim-8 SQ8 vector column must survive serialise -> deserialise with its
    // declared type, its quantised cells and its NSW index intact; the index
    // is checked indirectly by requiring identical top-5 query results.
    let sq8_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 8,
            encoding: VecEncoding::Sq8,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), sq8_column],
    ))
    .unwrap();

    let table = cat.get_mut("vecs").unwrap();
    for row_id in 0..32_i32 {
        // Lane `lane` of row `row_id` holds `0.03 * row_id + 0.01 * lane`.
        #[allow(clippy::cast_precision_loss)]
        let raw: Vec<f32> = (0..8_i32)
            .map(|lane| (row_id as f32) * 0.03 + (lane as f32) * 0.01)
            .collect();
        let cells = alloc::vec![
            Value::Int(row_id),
            Value::Sq8Vector(quantize::quantize(&raw)),
        ];
        table.insert(Row::new(cells)).unwrap();
    }
    table
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let probe = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot everything compared after the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &probe, 5, NswMetric::L2);

    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();
    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &probe, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
5961
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    // Same contract as the SQ8 variant, for a dim-8 F16 (HALF) vector column:
    // declared type, cell contents and NSW top-5 results must all be
    // unchanged by serialise -> deserialise.
    let half_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 8,
            encoding: VecEncoding::F16,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![ColumnSchema::new("id", DataType::Int, false), half_column],
    ))
    .unwrap();

    let table = cat.get_mut("vecs").unwrap();
    for row_id in 0..32_i32 {
        // Lane `lane` of row `row_id` holds `0.03 * row_id + 0.01 * lane`.
        #[allow(clippy::cast_precision_loss)]
        let raw: Vec<f32> = (0..8_i32)
            .map(|lane| (row_id as f32) * 0.03 + (lane as f32) * 0.01)
            .collect();
        let cells = alloc::vec![
            Value::Int(row_id),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(&raw)),
        ];
        table.insert(Row::new(cells)).unwrap();
    }
    table
        .add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let probe = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot everything compared after the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &probe, 5, NswMetric::L2);

    let restored = Catalog::deserialize(&cat.serialize()).expect("deserialize ok");
    let reloaded = restored.get("vecs").unwrap();
    assert_eq!(reloaded.schema().columns[1].ty, before_ty);
    assert_eq!(reloaded.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(reloaded, "v_idx", &probe, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
6022
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
    use crate::halfvec;

    /// Deterministic PRNG step returning a value in `[-1.0, 1.0]`.
    ///
    /// Sets `state` to `(state + 0x9E37_79B9_7F4A_7C15) * 0xBF58_476D_1CE4_E5B9`
    /// (wrapping arithmetic) and rescales the high 32 bits of the new state.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    // Seeded corpus (512 x 32-d) and query set (32 x 32-d), drawn from a single
    // PRNG stream so every run sees identical data.
    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xF16_F16_F16_F16_u64;
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();

    // Exact f32 ground truth: for each query, the 10 corpus rows with the
    // smallest squared L2 distance. `f32::total_cmp` is used because
    // `partial_cmp(..).unwrap_or(Equal)` stops being a total order as soon
    // as a NaN distance appears, and `sort_by` may panic on such comparators.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            scored.sort_by(|a, b| a.0.total_cmp(&b.0));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();

    // Store the same corpus in an F16 column and index it with HNSW.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // recall@10 = (sum over queries of |HNSW top-10 ∩ exact top-10|) / (10 * #queries).
    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check halfvec dispatch in `cell_to_query_metric_distance`"
    );
}
6106
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
    use crate::quantize;

    /// Deterministic PRNG step returning a value in `[-1.0, 1.0]`.
    ///
    /// Sets `state` to `(state + 0x9E37_79B9_7F4A_7C15) * 0xBF58_476D_1CE4_E5B9`
    /// (wrapping arithmetic) and rescales the high 32 bits of the new state.
    fn next(state: &mut u64) -> f32 {
        *state = state
            .wrapping_add(0x9E37_79B9_7F4A_7C15)
            .wrapping_mul(0xBF58_476D_1CE4_E5B9);
        #[allow(clippy::cast_precision_loss)]
        let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
        2.0 * u - 1.0
    }

    // Seeded corpus (512 x 32-d) and query set (32 x 32-d), drawn from a single
    // PRNG stream so every run sees identical data.
    let dim: u32 = 32;
    let n: usize = 512;
    let dim_us = dim as usize;
    let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
    let corpus: Vec<Vec<f32>> = (0..n)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();
    let queries: Vec<Vec<f32>> = (0..32)
        .map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
        .collect();

    // Exact f32 ground truth: for each query, the 10 corpus rows with the
    // smallest squared L2 distance. `f32::total_cmp` is used because
    // `partial_cmp(..).unwrap_or(Equal)` stops being a total order as soon
    // as a NaN distance appears, and `sort_by` may panic on such comparators.
    let exact_top10: Vec<Vec<usize>> = queries
        .iter()
        .map(|q| {
            let mut scored: Vec<(f32, usize)> = corpus
                .iter()
                .enumerate()
                .map(|(i, v)| (l2_distance_sq(v, q), i))
                .collect();
            scored.sort_by(|a, b| a.0.total_cmp(&b.0));
            scored.into_iter().take(10).map(|(_, i)| i).collect()
        })
        .collect();

    // Store the same corpus, SQ8-quantised, and index it with HNSW.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    for (i, v) in corpus.iter().enumerate() {
        t.insert(Row::new(alloc::vec![
            Value::Int(i32::try_from(i).unwrap()),
            Value::Sq8Vector(quantize::quantize(v)),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // recall@10 = (sum over queries of |HNSW top-10 ∩ exact top-10|) / (10 * #queries).
    let table = cat.get("vecs").unwrap();
    let mut total_overlap = 0_usize;
    for (q, exact) in queries.iter().zip(exact_top10.iter()) {
        let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
        for h in &hits {
            if exact.contains(h) {
                total_overlap += 1;
            }
        }
    }
    #[allow(clippy::cast_precision_loss)]
    let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
    assert!(
        recall >= 0.95,
        "SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
         check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
    );
}
6195
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // Serialising the catalog must persist the NSW graph itself: every
    // structural field of the restored graph equals the graph built before
    // the round trip.
    let columns = alloc::vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new(
            "v",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            true,
        ),
    ];
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("docs", columns)).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        docs.insert(Row::new(alloc::vec![
            Value::Int(i),
            Value::Vector(alloc::vec![base, base + 0.05, base + 0.1]),
        ]))
        .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // Clone the graph of the first index on "docs"; any other kind is a bug.
    let graph_of = |catalog: &Catalog| match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_) | IndexKind::Brin { .. } => panic!("expected NSW"),
    };

    let original = graph_of(&cat);
    let restored_catalog = Catalog::deserialize(&cat.serialize()).expect("deserialize");
    let restored_graph = graph_of(&restored_catalog);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
6248
#[test]
fn hnsw_level_assignment_is_deterministic() {
    // Level assignment is a pure function of the node index: calling it
    // twice for the same node must give the same level.
    (0..32usize).for_each(|node| {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    });
}
6257
#[test]
fn hnsw_layer_0_dominates_population() {
    // Of the first 200 nodes, more than 150 must land only on layer 0.
    let on_zero = (0..200usize)
        .map(nsw_assign_level)
        .filter(|&level| level == 0)
        .count();
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
6267
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    // On a small hand-built 3-d dataset, the HNSW top-1 for each probe must be
    // the same row as an exhaustive L2 scan.
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32,
                },
                true,
            ),
        ],
    ))
    .unwrap();
    let t = cat.get_mut("vecs").unwrap();
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];
    for &(id, v) in &dataset {
        t.insert(Row::new(alloc::vec![
            Value::Int(id),
            Value::Vector(alloc::vec![v[0], v[1], v[2]]),
        ]))
        .unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    // Nothing below mutates the catalog, so resolve the table once instead
    // of once per probe.
    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();
    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);
        // Exhaustive nearest row; non-vector cells score +inf so they never
        // win. Only the argmin is needed, so `min_by` replaces a full sort.
        // It keeps the first of equal minima, the same row a stable sort
        // would put first. `total_cmp` is a true total order, whereas
        // `partial_cmp(..).unwrap_or(Equal)` is not once a NaN appears.
        let brute_best = (0..table.rows.len())
            .map(|i| {
                let Value::Vector(v) = &table.rows[i].values[1] else {
                    return (f32::INFINITY, i);
                };
                (l2_distance_sq(v, &query), i)
            })
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .expect("table has rows");
        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute_best.1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
6336
#[test]
fn serialize_table_with_rows_round_trips() {
    // A populated table, including a NULL score cell, must round-trip as
    // checked by `assert_round_trip`.
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let rows = [
        vec![Value::Int(1), Value::Text("alice".into()), Value::Float(95.5)],
        vec![Value::Int(2), Value::Text("bob".into()), Value::Null],
    ];
    for cells in rows {
        users.insert(Row::new(cells)).unwrap();
    }
    assert_round_trip(&cat);
}
6356
#[test]
fn serialize_multiple_tables_round_trips() {
    // A catalog with two tables ("users", empty, and "flags", one row) must
    // round-trip as checked by `assert_round_trip`.
    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    cat.create_table(flags_schema).unwrap();

    let flags = cat.get_mut("flags").unwrap();
    flags
        .insert(Row::new(vec![Value::BigInt(7), Value::Bool(true)]))
        .unwrap();
    assert_round_trip(&cat);
}
6375
#[test]
fn deserialize_rejects_bad_magic() {
    // Wrong magic, current version byte, zero table count: must be Corrupt.
    let mut stream: Vec<u8> = b"BADMAGIC".to_vec();
    stream.push(FILE_VERSION);
    stream.extend(0u32.to_le_bytes());

    let failure = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(failure, StorageError::Corrupt(_)));
}
6384
#[test]
fn deserialize_rejects_unsupported_version() {
    // Correct magic but an unknown version byte must be rejected with a
    // Corrupt error whose message mentions the version.
    let unknown_version: u8 = 99;
    let mut stream = FILE_MAGIC.to_vec();
    stream.push(unknown_version);
    stream.extend_from_slice(&0u32.to_le_bytes());

    let failure = Catalog::deserialize(&stream).unwrap_err();
    assert!(matches!(failure, StorageError::Corrupt(ref s) if s.contains("version")));
}
6393
#[test]
fn deserialize_rejects_truncated_file() {
    // Dropping the final byte of a valid stream must surface as Corrupt.
    let mut cat = Catalog::new();
    cat.create_table(make_users_schema()).unwrap();
    let full = cat.serialize();
    let (_, truncated) = full.split_last().unwrap();

    let outcome = Catalog::deserialize(truncated);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
6406
#[test]
fn deserialize_rejects_trailing_garbage() {
    // Extra bytes after a complete (empty) catalog must be reported as a
    // Corrupt error mentioning "trailing", not silently ignored.
    let mut stream = Catalog::new().serialize();
    stream.push(0xFF);

    let outcome = Catalog::deserialize(&stream);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
6417
6418 fn populated_users() -> Catalog {
6421 let mut cat = Catalog::new();
6422 cat.create_table(make_users_schema()).unwrap();
6423 let t = cat.get_mut("users").unwrap();
6424 for (id, name, score) in [
6425 (1, "alice", Some(90.0)),
6426 (2, "bob", None),
6427 (3, "alice", Some(70.0)), ] {
6429 t.insert(Row::new(vec![
6430 Value::Int(id),
6431 Value::Text(name.into()),
6432 score.map_or(Value::Null, Value::Float),
6433 ]))
6434 .unwrap();
6435 }
6436 cat
6437 }
6438
#[test]
fn add_index_builds_from_existing_rows() {
    // Creating an index after rows exist back-fills it: id 2 maps to hot row
    // position 1, and an absent key maps to nothing.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let by_id = cat.get("users").unwrap().index_on(0).expect("index_on(0)");
    let no_hits: &[RowLocator] = &[];
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(99)), no_hits);
}
6451
#[test]
fn add_index_dup_name_rejected() {
    // Reusing an existing index name is rejected, even when the new index
    // would cover a different column.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();

    let failure = users.add_index("ix".into(), "name").unwrap_err();
    assert!(matches!(failure, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
6460
#[test]
fn add_index_unknown_column_rejected() {
    // Indexing a column that is not in the schema reports the missing name.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();

    let failure = users.add_index("ix".into(), "ghost").unwrap_err();
    assert!(matches!(failure, StorageError::ColumnNotFound { ref column } if column == "ghost"));
}
6471
#[test]
fn insert_after_create_index_updates_it() {
    // An index created over existing rows must also pick up rows inserted
    // afterwards, without disturbing the entries it already had.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let by_name = users.index_on(1).unwrap();
    // The new row is the fourth hot row (position 3).
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("dave".into())),
        &[RowLocator::Hot(3)]
    );
    // Pre-existing duplicate key is still fully indexed.
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6494
#[test]
fn null_or_float_values_are_not_indexed() {
    // The score column holds only FLOAT and NULL cells, so an index on it
    // must not answer an integer lookup for 90 (which a score of 90.0 might
    // otherwise be mistaken for).
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();

    let nothing: &[RowLocator] = &[];
    assert_eq!(
        users.index_on(2).unwrap().lookup_eq(&IndexKey::Int(90)),
        nothing
    );
}
6507
#[test]
fn vector_value_data_type_carries_dim() {
    // A plain vector value reports an F32 vector type whose dim is its length.
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let value = Value::Vector(vec![1.0, 2.0, 3.0]);
    assert_eq!(value.data_type(), Some(expected));
}
6521
#[test]
fn vector_column_insert_matching_dim_ok() {
    // Inserting a vector whose length equals the declared dim succeeds.
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("emb", vec![column]))
        .unwrap();

    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]))
        .unwrap();
}
6542
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    // A 2-element vector in a dim-3 column is a type mismatch.
    let column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new("emb", vec![column]))
        .unwrap();

    let emb = cat.get_mut("emb").unwrap();
    let failure = emb
        .insert(Row::new(vec![Value::Vector(vec![1.0, 2.0])]))
        .unwrap_err();
    assert!(matches!(failure, StorageError::TypeMismatch { .. }));
}
6565
#[test]
fn vector_value_survives_catalog_round_trip() {
    // An F32 vector column keeps both its declared type (dim + encoding) and
    // its exact cell contents through serialise -> deserialise.
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    ))
    .unwrap();
    let emb = cat.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0]),
    ]))
    .unwrap();

    let restored = Catalog::deserialize(&cat.serialize()).expect("round-trip");
    let reloaded = restored.get("emb").unwrap();
    assert_eq!(reloaded.schema().columns[1].ty, vector_ty);
    assert_eq!(
        reloaded.rows()[0].values[1],
        Value::Vector(vec![0.5, -1.25, 3.0, 7.0])
    );
}
6605
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    // A secondary index on `name` is persisted. After the round trip it is
    // still attached to column 1 under the same name and still maps "alice"
    // to hot rows 0 and 2.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    assert_eq!(
        by_name.lookup_eq(&IndexKey::Text("alice".into())),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
6626
6627 fn bigint_pk_users_schema() -> TableSchema {
6632 TableSchema::new(
6633 "users",
6634 vec![
6635 ColumnSchema::new("id", DataType::BigInt, false),
6636 ColumnSchema::new("name", DataType::Text, false),
6637 ],
6638 )
6639 }
6640
6641 fn make_user_row(id: i64, name: &str) -> Row {
6642 Row::new(vec![Value::BigInt(id), Value::Text(name.into())])
6643 }
6644
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    // With only hot rows, a PK lookup resolves through the index to the
    // stored row and no cold segments are involved.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let found = cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(found, make_user_row(2, "bob"));
    assert_eq!(cat.cold_segment_count(), 0);
}
6661
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    // A missing key, an unknown table and an unknown index all yield `None`
    // rather than an error or a panic.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let misses = [
        ("users", "by_id", 999),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        assert!(cat
            .lookup_by_pk(table, index, &IndexKey::Int(key))
            .is_none());
    }
}
6683
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    // Rows that exist only in a loaded cold segment must be reachable through
    // the PK index once `Cold` locators for them have been registered.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode the cold rows into a standalone segment keyed by their ids.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(cat.cold_segment_count(), 1);

    // Every cold key points at page 0 of the freshly loaded segment.
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    let registered = cat
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for &(id, name) in &cold_rows {
        let found = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(found, make_user_row(id, name));
    }
    // A key that was never registered still misses.
    assert!(cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(999))
        .is_none());
}
6741
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    // One PK index holding both Hot and Cold locators must resolve every key
    // from the right tier and miss keys that exist in neither.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    let hot_rows = [(1i64, "alice"), (2, "bob")];
    for &(id, name) in &hot_rows {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    // Cold tier: two rows encoded into a separately loaded segment.
    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = cat.load_segment_bytes(seg_bytes).unwrap();
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys: each resolves to the row written.
    for &(id, name) in hot_rows.iter().chain(&cold_rows) {
        assert_eq!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
    // Key 50 was written to neither tier.
    assert!(cat
        .lookup_by_pk("users", "by_id", &IndexKey::Int(50))
        .is_none());
}
6812
#[test]
fn register_cold_locators_rejects_nsw_index() {
    // Registering cold locators against an NSW index must fail with a
    // Corrupt error whose message contains "not BTree".
    let vector_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 4,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut cat = Catalog::new();
    cat.create_table(TableSchema::new(
        "vecs",
        vec![ColumnSchema::new("id", DataType::Int, false), vector_column],
    ))
    .unwrap();

    let vecs = cat.get_mut("vecs").unwrap();
    vecs.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]))
    .unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let bogus = vec![(
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    )];
    let failure = vecs.register_cold_locators("by_v", bogus).unwrap_err();
    assert!(matches!(failure, StorageError::Corrupt(ref s) if s.contains("not BTree")));
}
6855 #[test]
6856 fn load_segment_bytes_rejects_garbage() {
6857 let mut cat = Catalog::new();
6858 let err = cat.load_segment_bytes(vec![0u8; 10]).unwrap_err();
6859 assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("segment")));
6860 assert_eq!(cat.cold_segment_count(), 0);
6862 }
6863
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    // Each loaded segment gets the next id: 0, 1, 2, ...
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let schema = cat.get("users").unwrap().schema.clone();

    for expected_id in 0u32..3 {
        // Batch k holds ids 100*k .. 100*k + 3.
        let first = u64::from(expected_id) * 100;
        let rows: Vec<(u64, Vec<u8>)> = (first..first + 4)
            .map(|id| {
                let body = encode_row_body_dense(&make_user_row(id.cast_signed(), "x"), &schema);
                (id, body)
            })
            .collect();
        let (segment, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        assert_eq!(cat.load_segment_bytes(segment).unwrap(), expected_id);
    }
    assert_eq!(cat.cold_segment_count(), 3);
}
6882
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    // A stream written by the legacy v8 encoder (`encode_as_v8`) must still be
    // accepted by the current reader, with every index entry restored as a
    // Hot locator.
    let mut cat = populated_users();
    let users = cat.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&cat);
    // The version byte immediately follows the magic.
    assert_eq!(v8_bytes[FILE_MAGIC.len()], 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice = IndexKey::Text("alice".into());
    assert_eq!(
        by_name.lookup_eq(&alice),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
    for entry in by_name.lookup_eq(&alice).iter() {
        assert!(entry.is_hot(), "v8 → v9 read must yield Hot only");
    }
}
6927
6928 fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
6933 let mut out = Vec::with_capacity(64);
6934 out.extend_from_slice(FILE_MAGIC);
6935 out.push(8u8);
6936 write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
6937 for t in &cat.tables {
6938 write_str(&mut out, &t.schema.name);
6939 write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
6940 for c in &t.schema.columns {
6941 write_str(&mut out, &c.name);
6942 write_data_type(&mut out, c.ty);
6943 out.push(u8::from(c.nullable));
6944 match &c.default {
6945 None => out.push(0),
6946 Some(v) => {
6947 out.push(1);
6948 write_value(&mut out, v);
6949 }
6950 }
6951 out.push(u8::from(c.auto_increment));
6952 }
6953 write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
6954 for row in &t.rows {
6955 out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
6956 }
6957 write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
6958 for idx in &t.indices {
6959 write_str(&mut out, &idx.name);
6960 write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
6961 match &idx.kind {
6962 IndexKind::BTree(_) => out.push(0),
6965 IndexKind::Nsw(g) => {
6966 out.push(1);
6967 write_u16(&mut out, u16::try_from(g.m).unwrap());
6968 write_nsw_graph(&mut out, g);
6969 }
6970 IndexKind::Brin { .. } => panic!(
6973 "v8 catalog writer cannot serialise BRIN — \
6974 tests with BRIN indices must use the current writer"
6975 ),
6976 }
6977 }
6978 }
6979 out
6980 }
6981
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    // The current catalog format must persist Cold locators verbatim. After
    // restoring the catalog and re-loading the same segment bytes, hot keys
    // still map to Hot rows and cold keys to their original segment/page.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    // Keep a copy: the restored catalog loads these same bytes again below.
    let seg_id = cat.load_segment_bytes(seg_bytes.clone()).unwrap();
    let pairs: Vec<(IndexKey, RowLocator)> = cold_rows
        .iter()
        .map(|&(id, _)| {
            let locator = RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            };
            (IndexKey::Int(id), locator)
        })
        .collect();
    cat.get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let bytes = cat.serialize();
    assert_eq!(bytes[FILE_MAGIC.len()], FILE_VERSION);
    let mut restored = Catalog::deserialize(&bytes).expect("v9 round-trip parses");

    // Re-loading the segment into the restored catalog must hand out the
    // same id the persisted locators refer to.
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for &(id, _) in &cold_rows {
        assert_eq!(
            by_id.lookup_eq(&IndexKey::Int(id)),
            &[RowLocator::Cold {
                segment_id: seg_id,
                page_offset: 0,
            }]
        );
    }

    // End-to-end: both tiers resolve to full rows through the PK lookup.
    assert_eq!(
        restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
            .unwrap(),
        make_user_row(2, "bob")
    );
    for &(id, name) in &cold_rows {
        assert_eq!(
            restored
                .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
                .unwrap(),
            make_user_row(id, name)
        );
    }
}
7072
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    // `row_body_encoded_len` must report exactly the length produced by
    // `encode_row_body_dense`, across a wide schema. Cases cover typical
    // values, a NULL plus zero/empty payloads, and negative values.
    let columns = vec![
        ColumnSchema::new("a", DataType::SmallInt, true),
        ColumnSchema::new("b", DataType::Int, false),
        ColumnSchema::new("c", DataType::BigInt, false),
        ColumnSchema::new("d", DataType::Float, false),
        ColumnSchema::new("e", DataType::Bool, false),
        ColumnSchema::new("f", DataType::Text, false),
        ColumnSchema::new(
            "g",
            DataType::Vector {
                dim: 3,
                encoding: VecEncoding::F32,
            },
            false,
        ),
        ColumnSchema::new(
            "h",
            DataType::Numeric {
                precision: 18,
                scale: 2,
            },
            false,
        ),
        ColumnSchema::new("i", DataType::Date, false),
        ColumnSchema::new("j", DataType::Timestamp, false),
    ];
    let schema = TableSchema::new("wide", columns);

    let cases = [
        Row::new(vec![
            Value::SmallInt(7),
            Value::Int(42),
            Value::BigInt(1_000_000),
            Value::Float(1.5),
            Value::Bool(true),
            Value::Text("hello".into()),
            Value::Vector(vec![1.0, 2.0, 3.0]),
            Value::Numeric {
                scaled: 12345,
                scale: 2,
            },
            Value::Date(20_000),
            Value::Timestamp(1_700_000_000_000_000),
        ]),
        Row::new(vec![
            Value::Null,
            Value::Int(0),
            Value::BigInt(0),
            Value::Float(0.0),
            Value::Bool(false),
            Value::Text(String::new()),
            Value::Vector(vec![]),
            Value::Numeric {
                scaled: 0,
                scale: 2,
            },
            Value::Date(0),
            Value::Timestamp(0),
        ]),
        Row::new(vec![
            Value::SmallInt(-1),
            Value::Int(-1),
            Value::BigInt(-1),
            Value::Float(-0.5),
            Value::Bool(true),
            Value::Text("a much longer payload here".into()),
            Value::Vector(vec![0.1, 0.2, 0.3]),
            Value::Numeric {
                scaled: -999_999_999,
                scale: 2,
            },
            Value::Date(-1),
            Value::Timestamp(-1),
        ]),
    ];

    for row in &cases {
        let encoded = encode_row_body_dense(row, &schema);
        let predicted = row_body_encoded_len(row, &schema);
        assert_eq!(encoded.len(), predicted, "row {row:?}");
    }
}
7164
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    // The table's `hot_bytes` counter and the catalog-wide `hot_tier_bytes`
    // both start at zero and equal the summed dense encodings of the
    // inserted rows.
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let users = cat.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let mut expected: u64 = 0;
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len();
        expected += encoded_len as u64;
        users.insert(row).unwrap();
    }
    assert_eq!(users.hot_bytes(), expected);
    assert_eq!(cat.hot_tier_bytes(), expected);
}
7180
#[test]
/// Deleting a row must subtract that row's dense-encoded body length from the
/// table's hot-byte counter.
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let table = catalog.get_mut("users").unwrap();
    for &(pk, user_name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        table.insert(make_user_row(pk, user_name)).unwrap();
    }

    let hot_before = table.hot_bytes();
    // Row index 1 is the second insert, i.e. bob.
    let bob_len = encode_row_body_dense(&make_user_row(2, "bob"), &table.schema).len() as u64;
    let removed_count = table.delete_rows(&[1]);
    assert_eq!(removed_count, 1);
    assert_eq!(table.hot_bytes(), hot_before - bob_len);
}
7197
#[test]
/// Updating a row in place must swap the old row's encoded length for the new
/// one's; a longer text value therefore grows the counter.
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let table = catalog.get_mut("users").unwrap();

    let original = make_user_row(1, "alice");
    let replacement = make_user_row(1, "alice-the-longer-name");
    let old_len = encode_row_body_dense(&original, &table.schema).len() as u64;
    let new_len = encode_row_body_dense(&replacement, &table.schema).len() as u64;

    table.insert(original).unwrap();
    let baseline = table.hot_bytes();
    table.update_row(0, replacement.values).unwrap();

    let after_update = table.hot_bytes();
    assert_eq!(after_update, baseline - old_len + new_len);
    assert!(after_update > baseline, "longer text grew the counter");
}
7214
#[test]
/// The hot-tier byte total must be identical after a serialize/deserialize
/// round trip, both catalog-wide and for the single table.
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for pk in 0..10 {
            let label = alloc::format!("name-{pk}");
            table.insert(make_user_row(pk, &label)).unwrap();
        }
    }

    let expected_total = catalog.hot_tier_bytes();
    let encoded = catalog.serialize();
    let restored = Catalog::deserialize(&encoded).unwrap();
    assert_eq!(restored.hot_tier_bytes(), expected_total);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), expected_total);
}
7229
#[test]
/// Freezing the six oldest of ten rows must produce one cold segment, and
/// shrink the hot counter by exactly the reported `bytes_freed`. Every PK
/// (hot or frozen) must still resolve to its original row.
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let hot_before = {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
        table.hot_bytes()
    };

    let report = catalog
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let table = catalog.get("users").unwrap();
    assert_eq!(table.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(
        table.hot_bytes(),
        hot_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    for id in 0..10i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(found, expected);
    }
}
7275
#[test]
/// Two consecutive freezes must leave two cold segments, and rows frozen by
/// the first pass must still resolve after the second.
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..12i64 {
            let name = alloc::format!("u-{id}");
            table.insert(make_user_row(id, &name)).unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }

    // 12 hot rows -> 8 -> 4.
    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("first freeze ok");
    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("second freeze ok");

    assert_eq!(catalog.get("users").unwrap().row_count(), 4);
    assert_eq!(catalog.cold_segment_count(), 2);
    for id in 0..12i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let got = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(got, expected);
    }
}
7307
#[test]
/// Invalid freeze requests must be rejected as `Corrupt` and leave the table
/// and cold tier untouched. Covered: zero rows, unknown table, unknown index,
/// and more rows than the table holds.
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..3i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }

    // (table, index, row count) combinations that must each fail.
    let bad_inputs = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table_name, index_name, count) in bad_inputs {
        assert!(matches!(
            catalog.freeze_oldest_to_cold(table_name, index_name, count),
            Err(StorageError::Corrupt(_))
        ));
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7345
#[test]
/// Freezing through a text-keyed index must fail with a `Corrupt` error whose
/// message mentions "non-integer". The failed call must move no rows and
/// create no segments.
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    {
        let table = catalog.get_mut("by_name").unwrap();
        let only_row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
        table.insert(only_row).unwrap();
        table.add_index("by_n".into(), "name").unwrap();
    }

    let err = catalog
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    match err {
        StorageError::Corrupt(s) => {
            assert!(
                s.contains("non-integer"),
                "error message names the constraint: {s}"
            );
        }
        other => panic!("expected Corrupt, got {other:?}"),
    }

    assert_eq!(catalog.get("by_name").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7377
#[test]
/// After a freeze, the secondary (name) index must still point at the
/// surviving hot rows with their new hot indices.
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
        table.add_index("by_name".into(), "name").unwrap();
    }

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    // After the three oldest rows are frozen, this test expects u-4 at hot index 1.
    let name_index = catalog.get("users").unwrap().index_on(1).unwrap();
    let hits = name_index.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);
    assert!(hits[0].is_hot(), "kept-hot rows still surface as Hot");
    if let RowLocator::Hot(slot) = hits[0] {
        assert_eq!(slot, 1);
    } else {
        unreachable!()
    }
}
7412
#[test]
/// Promoting a frozen PK must append it to the hot tier and grow `hot_bytes`
/// by its encoded length. Its index entry must become a single hot locator,
/// while other frozen PKs stay resolvable.
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }
    // Four of six rows go cold; two stay hot.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_bytes_before = catalog.get("users").unwrap().hot_bytes();

    let promoted_slot = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_slot, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let table = catalog.get("users").unwrap();
    assert_eq!(table.row_count(), 3, "hot tier grew from 2 to 3");
    let promoted_row = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&promoted_row, &table.schema).len() as u64;
    assert_eq!(table.hot_bytes(), hot_bytes_before + promoted_len);

    let locators = table.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");

    let via_pk = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(2))
        .unwrap();
    assert_eq!(via_pk, promoted_row);
    // A PK that was not promoted still resolves to its original row.
    let untouched = catalog
        .lookup_by_pk("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert_eq!(untouched, make_user_row(0, "u-0"));
}
7471
#[test]
/// Promoting a hot-only PK or an absent PK must return `Ok(None)` and change
/// nothing.
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        table.insert(make_user_row(7, "alice")).unwrap();
        table.add_index("by_id".into(), "id").unwrap();
    }

    // 7 is hot-only; 99 was never inserted.
    for pk in [7, 99] {
        let outcome = catalog
            .promote_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert!(outcome.is_none());
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
7499
#[test]
/// Shadowing a frozen PK must retire exactly one cold locator and make that PK
/// unresolvable. Neighbouring frozen PKs must be unaffected.
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..5i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    assert!(
        catalog.lookup_by_pk("users", "by_id", &target).is_some(),
        "frozen PK resolves before shadow"
    );
    let retired = catalog.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");
    assert!(
        catalog.lookup_by_pk("users", "by_id", &target).is_none(),
        "shadowed key no longer resolves"
    );

    for id in [0i64, 2] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        assert_eq!(found, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
7546
#[test]
/// Shadowing a hot-only PK or an absent PK must report zero retired cold
/// locators and leave the hot row in place.
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        table.insert(make_user_row(1, "alice")).unwrap();
        table.add_index("by_id".into(), "id").unwrap();
    }

    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
7572
#[test]
/// Both `promote_cold_row` and `shadow_cold_row` must report an unknown table
/// or an unknown index as `Corrupt`.
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        table.insert(make_user_row(1, "alice")).unwrap();
        table.add_index("by_id".into(), "id").unwrap();
    }

    let key = IndexKey::Int(1);
    for (table_name, index_name) in [("missing", "by_id"), ("users", "no_such_index")] {
        assert!(matches!(
            catalog.promote_cold_row(table_name, index_name, &key),
            Err(StorageError::Corrupt(_))
        ));
        assert!(matches!(
            catalog.shadow_cold_row(table_name, index_name, &key),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7601
#[test]
/// A single prepared slice covering rows 0..6 must commit to exactly the same
/// report and lookup results as `freeze_oldest_to_cold(.., 6)`.
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    // Builds a 10-row users catalog with a PK index on `id`.
    fn seeded() -> Catalog {
        let mut cat = Catalog::new();
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        for id in 0..10i64 {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
        cat
    }

    let mut direct = seeded();
    let mut sliced = seeded();

    let single = direct.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
    let prepared = sliced
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = sliced
        .commit_freeze_slices("users", "by_id", alloc::vec![prepared])
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            direct.lookup_by_pk("users", "by_id", &key),
            sliced.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
7641
#[test]
/// Committing rows 0..8 as two adjacent slices must yield the same segment
/// bytes, frozen-row count, and lookups as committing them as one slice. Rows
/// are inserted out of PK order.
fn commit_freeze_slices_two_slices_match_single_slice() {
    // Builds a 10-row users catalog with shuffled PK insertion order.
    fn seeded() -> Catalog {
        let mut cat = Catalog::new();
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
        cat
    }

    let mut whole = seeded();
    let mut split = seeded();

    let all_rows = whole
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = whole
        .commit_freeze_slices("users", "by_id", alloc::vec![all_rows])
        .expect("commit one");

    let front = split
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let back = split
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = split
        .commit_freeze_slices("users", "by_id", alloc::vec![front, back])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            whole.lookup_by_pk("users", "by_id", &key),
            split.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
7688
#[test]
/// Slices 0..2 and 3..5 leave row 2 uncovered. The commit must be rejected as
/// `Corrupt` without creating a segment or removing hot rows.
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }

    let head = catalog.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let tail = catalog.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    assert!(matches!(
        catalog.commit_freeze_slices("users", "by_id", alloc::vec![head, tail]),
        Err(StorageError::Corrupt(_))
    ));

    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
7710
#[test]
/// Committing an empty slice list must succeed with zero frozen rows, no new
/// segment, and an unchanged hot tier.
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..3i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }

    let outcome = catalog
        .commit_freeze_slices("users", "by_id", Vec::new())
        .unwrap();
    assert_eq!(outcome.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
7729
#[test]
/// Two small segments must be merged when the byte target exceeds the largest
/// one. The merged segment occupies a new slot, and every PK still resolves.
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..8i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }
    // Two freezes of three rows each -> two cold segments, two hot rows.
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    // One byte above the largest segment; the test expects both segments to
    // be chosen as merge sources under this target.
    let largest = catalog
        .cold_segment_ids_global()
        .iter()
        .copied()
        .map(|seg_id| catalog.cold_segment(seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();

    let report = catalog
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    // One live segment, but the slot table grew to three entries.
    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), alloc::vec![merged_id]);

    for id in 0..8i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let got = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        assert_eq!(got, expected);
    }
}
7786
#[test]
/// Compaction must prune shadowed cold rows from the merged segment.
/// Shadowed PKs must stay invisible afterwards, and every live PK must still
/// resolve to its original row contents.
fn compact_drops_shadowed_cold_rows() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..6i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();
    // Two freezes of three rows each put every row into one of two segments.
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    cat.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    // Shadow one PK from each freeze batch.
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(1))
            .unwrap(),
        1
    );
    assert_eq!(
        cat.shadow_cold_row("users", "by_id", &IndexKey::Int(4))
            .unwrap(),
        1
    );

    let max_seg_bytes = cat
        .cold_segment_ids_global()
        .iter()
        .map(|id| cat.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = cat
        .compact_cold_segments("users", "by_id", max_seg_bytes + 1)
        .expect("compact succeeds");
    assert_eq!(report.sources.len(), 2);
    assert_eq!(report.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(report.deleted_rows_pruned, 2);

    for shadowed in [1i64, 4i64] {
        assert!(
            cat.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed))
                .is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }
    // Live rows must come back with their original contents, not merely resolve.
    // A merge that rewrote or mixed up row bodies would otherwise go unnoticed.
    for live in [0i64, 2, 3, 5] {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .unwrap_or_else(|| panic!("live PK {live} lost after compact"));
        assert_eq!(
            got,
            make_user_row(live, &alloc::format!("u-{live}")),
            "live PK {live} changed contents after compact"
        );
    }
}
7841
#[test]
/// Compaction must do nothing when fewer than two segments exist, whatever
/// the byte target.
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }

    // No segments at all.
    let empty_report = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(empty_report.merged_segment_id.is_none());
    assert!(empty_report.sources.is_empty());

    // Exactly one segment: neither a huge nor a tiny target triggers a merge.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    for target in [1 << 30, 1] {
        let report = catalog
            .compact_cold_segments("users", "by_id", target)
            .expect("noop ok");
        assert!(report.merged_segment_id.is_none());
        assert_eq!(catalog.cold_segment_count(), 1);
    }
}
7877
#[test]
/// After compaction, a serialized catalog must resolve every PK once the merged
/// segment's bytes are loaded back at the merged segment id, and it must
/// report a single cold segment.
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    let largest = catalog
        .cold_segment_ids_global()
        .iter()
        .copied()
        .map(|seg_id| catalog.cold_segment(seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest + 1)
        .expect("compact ok");
    let merged_id = report.merged_segment_id.unwrap();

    let catalog_bytes = catalog.serialize();
    let merged_payload = report.merged_segment_bytes.clone();

    // The merged segment bytes are handed to the restored catalog explicitly.
    let mut restored = Catalog::deserialize(&catalog_bytes).expect("deserialize ok");
    restored
        .load_segment_bytes_at(merged_id, merged_payload)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        let got = restored
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} lost across roundtrip"));
        assert_eq!(got, expected);
    }
    assert_eq!(restored.cold_segment_count(), 1);
}
7931
#[test]
/// Loading segment bytes past the end of the slot table must grow it (six
/// slots, two occupied). Loading into an occupied slot must be rejected as
/// `Corrupt`.
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let table = catalog.get_mut("users").unwrap();
        for id in 0..4i64 {
            table
                .insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        table.add_index("by_id".into(), "id").unwrap();
    }
    let report = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let seg0_bytes = report.segment_bytes.clone();

    catalog
        .load_segment_bytes_at(5, seg0_bytes.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Slot 5 was just filled and slot 0 holds the original freeze.
    for occupied in [5, 0] {
        assert!(matches!(
            catalog.load_segment_bytes_at(occupied, seg0_bytes.clone()),
            Err(StorageError::Corrupt(_))
        ));
    }
}
7966
#[test]
/// Promote a frozen PK back to hot, then freeze the hot tier's oldest rows
/// again. The test checks two things:
/// - the promoted PK keeps exactly one (hot) locator, with no stale cold
///   entry left behind;
/// - every PK, whether frozen, promoted, or re-frozen, resolves to its
///   original row.
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut cat = Catalog::new();
    cat.create_table(bigint_pk_users_schema()).unwrap();
    let t = cat.get_mut("users").unwrap();
    for id in 0..4i64 {
        t.insert(make_user_row(id, &alloc::format!("u-{id}")))
            .unwrap();
    }
    t.add_index("by_id".into(), "id").unwrap();

    // First freeze: the two oldest rows go cold, leaving two hot rows.
    cat.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let promoted = cat
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promoted.is_some());
    let entries_after_promote = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(entries_after_promote.len(), 1);
    assert!(entries_after_promote[0].is_hot());

    // Refreeze: the promoted row was appended after the two surviving hot
    // rows, so freezing the two oldest hot rows leaves only the promoted PK hot.
    cat.freeze_oldest_to_cold("users", "by_id", 2)
        .expect("refreeze after promote ok");
    assert_eq!(cat.get("users").unwrap().row_count(), 1);

    let entries_after_refreeze = cat
        .get("users")
        .unwrap()
        .index_on(0)
        .unwrap()
        .lookup_eq(&IndexKey::Int(0))
        .to_vec();
    assert_eq!(
        entries_after_refreeze.len(),
        1,
        "promoted PK must not regain a stale cold locator after refreeze"
    );
    assert!(entries_after_refreeze[0].is_hot());

    // Every PK, whether still cold, promoted, or re-frozen, resolves to its row.
    for id in 0..4i64 {
        let got = cat
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap_or_else(|| panic!("PK {id} not resolvable after promote + refreeze"));
        assert_eq!(got, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
8011}