use super::*;
use crate::nsw::*;
use alloc::string::ToString;
use alloc::vec;
/// Replaying a hand-built redo log must leave a catalog in the same serialized
/// state as issuing the equivalent insert / update / delete calls directly on
/// the table. A log entry that names an unknown table must be rejected.
#[test]
fn redo_apply_matches_direct_position_ops() {
    // Catalog holding one empty table `t` with a non-nullable BIGINT `id`
    // and a nullable TEXT `v`.
    fn fresh() -> Catalog {
        let mut catalog = Catalog::new();
        let schema = TableSchema::new(
            "t",
            vec![
                ColumnSchema::new("id", DataType::BigInt, false),
                ColumnSchema::new("v", DataType::Text, true),
            ],
        );
        catalog.create_table(schema).unwrap();
        catalog
    }

    let rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
        .map(|(id, v)| Row::new(alloc::vec![Value::BigInt(id), Value::Text(v.to_string())]));
    let upd = alloc::vec![Value::BigInt(2), Value::Text("B".to_string())];

    // Reference catalog: apply the operations directly, by position.
    let mut c1 = fresh();
    {
        let direct = c1.get_mut("t").unwrap();
        for row in rows.iter() {
            direct.insert(row.clone()).unwrap();
        }
        direct.update_row(1, upd.clone()).unwrap();
        direct.delete_rows(&[0, 2]);
    }

    // The same operations expressed as a redo log, in the same order.
    let mut log: alloc::vec::Vec<_> = rows
        .iter()
        .map(|row| RowChange::Insert {
            table: "t".to_string(),
            row: row.clone(),
        })
        .collect();
    log.push(RowChange::Update {
        table: "t".to_string(),
        pos: 1,
        new_row: upd,
    });
    log.push(RowChange::Delete {
        table: "t".to_string(),
        positions: vec![0, 2],
    });

    let mut c2 = fresh();
    c2.apply_redo(&log).unwrap();
    assert_eq!(
        c1.serialize(),
        c2.serialize(),
        "redo apply diverged from direct position ops"
    );

    // A change that targets a table the catalog does not have is an error.
    let mut c3 = fresh();
    let unknown_table = [RowChange::Insert {
        table: "nope".to_string(),
        row: rows[0].clone(),
    }];
    assert!(c3.apply_redo(&unknown_table).is_err());
}
/// With redo capture enabled on a table, the captured log must replay onto a
/// fresh catalog to the same serialized state, and taking the log must leave
/// it empty.
#[test]
fn redo_capture_replays_to_identical_state() {
    // Catalog holding one empty table `t` (BIGINT `id`, nullable TEXT `v`).
    fn fresh() -> Catalog {
        let mut catalog = Catalog::new();
        let schema = TableSchema::new(
            "t",
            vec![
                ColumnSchema::new("id", DataType::BigInt, false),
                ColumnSchema::new("v", DataType::Text, true),
            ],
        );
        catalog.create_table(schema).unwrap();
        catalog
    }

    // Two-column row builder.
    fn mk(id: i64, v: &str) -> Row {
        Row::new(alloc::vec![Value::BigInt(id), Value::Text(v.to_string())])
    }

    let mut c1 = fresh();
    {
        let captured = c1.get_mut("t").unwrap();
        captured.enable_redo();
        for (id, v) in [(1, "a"), (2, "b"), (3, "c")] {
            captured.insert(mk(id, v)).unwrap();
        }
        let replacement = alloc::vec![Value::BigInt(2), Value::Text("B".to_string())];
        captured.update_row(1, replacement).unwrap();
        captured.delete_rows(&[0]);
        // Position 99 does not exist in a three-row table.
        captured.delete_rows(&[99]);
    }

    let log = c1.get_mut("t").unwrap().take_redo();
    assert_eq!(log.len(), 5, "captured log: {log:?}");

    let mut c2 = fresh();
    c2.apply_redo(&log).unwrap();
    assert_eq!(
        c1.serialize(),
        c2.serialize(),
        "replayed capture diverged from execution"
    );

    // A second take yields nothing: the first one drained the log.
    let leftover = c1.get_mut("t").unwrap().take_redo();
    assert!(leftover.is_empty());
}
/// Changes interleaved across two tables, drained at catalog level, must
/// replay onto a fresh catalog to the same serialized state; draining again
/// must yield nothing.
#[test]
fn catalog_drain_redo_replays_multi_table() {
    // Catalog with two empty, identically shaped tables `a` and `b`.
    fn fresh() -> Catalog {
        let mut catalog = Catalog::new();
        for name in ["a", "b"] {
            let schema = TableSchema::new(
                name,
                vec![
                    ColumnSchema::new("id", DataType::BigInt, false),
                    ColumnSchema::new("v", DataType::Text, true),
                ],
            );
            catalog.create_table(schema).unwrap();
        }
        catalog
    }

    // Two-column row builder.
    fn mk(id: i64, v: &str) -> Row {
        Row::new(alloc::vec![Value::BigInt(id), Value::Text(v.to_string())])
    }

    let mut c1 = fresh();
    c1.enable_redo_all();

    // Inserts alternate between the tables, in this exact order.
    for (table, id, v) in [("a", 1, "a1"), ("b", 2, "b1"), ("a", 3, "a2")] {
        c1.get_mut(table).unwrap().insert(mk(id, v)).unwrap();
    }
    let replacement = alloc::vec![Value::BigInt(1), Value::Text("A1".to_string())];
    c1.get_mut("a").unwrap().update_row(0, replacement).unwrap();
    c1.get_mut("b").unwrap().delete_rows(&[0]);

    let log = c1.drain_redo();
    let mut c2 = fresh();
    c2.apply_redo(&log).unwrap();
    assert_eq!(c1.serialize(), c2.serialize(), "multi-table redo diverged");

    let second_drain = c1.drain_redo();
    assert!(second_drain.is_empty());
}
/// `encode_redo_log` / `decode_redo_log` must round-trip every `RowChange`
/// variant (including an empty delete and an empty log) and must reject
/// truncated or zero-length input.
#[test]
fn redo_log_codec_round_trips() {
    let insert = RowChange::Insert {
        table: "t".to_string(),
        row: Row::new(alloc::vec![
            Value::BigInt(1),
            Value::Text("a".to_string()),
            Value::Null,
            Value::Bool(true),
        ]),
    };
    let update = RowChange::Update {
        table: "users".to_string(),
        pos: 42,
        new_row: alloc::vec![Value::Int(7), Value::Bytes(alloc::vec![1, 2, 3])],
    };
    let delete_some = RowChange::Delete {
        table: "t".to_string(),
        positions: alloc::vec![0, 5, 99],
    };
    let delete_none = RowChange::Delete {
        table: "empty".to_string(),
        positions: alloc::vec![],
    };
    let changes = vec![insert, update, delete_some, delete_none];

    // Full log survives encode -> decode unchanged.
    let bytes = encode_redo_log(&changes);
    let decoded = decode_redo_log(&bytes).unwrap();
    assert_eq!(decoded, changes);

    // An encoded empty log decodes to an empty log.
    let empty = encode_redo_log(&[]);
    let decoded_empty = decode_redo_log(&empty).unwrap();
    assert_eq!(decoded_empty, Vec::<RowChange>::new());

    // Half of a valid encoding, and no bytes at all, are both errors.
    let half = bytes.len() / 2;
    assert!(decode_redo_log(&bytes[..half]).is_err());
    assert!(decode_redo_log(&[]).is_err());
}
/// A catalog snapshot must preserve a 200 000-byte `Bytes` cell and a
/// `TextArray` whose first element is 100 000 characters long (with a `None`
/// element next to it).
#[test]
fn snapshot_round_trips_large_bytea_and_text_array_element() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "q",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("data", DataType::Bytes, true),
            ColumnSchema::new("uris", DataType::TextArray, true),
        ],
    );
    cat.create_table(schema).unwrap();

    let big_blob = alloc::vec![0xAB_u8; 200_000];
    let big_elem = "u".repeat(100_000);
    let stored = Row::new(alloc::vec![
        Value::BigInt(1),
        Value::Bytes(big_blob.clone()),
        Value::TextArray(alloc::vec![Some(big_elem.clone()), None, Some("s".into())]),
    ]);
    cat.get_mut("q").unwrap().insert(stored).unwrap();

    let bytes = cat.serialize();
    let re = Catalog::deserialize(&bytes).unwrap();
    let row = re.get("q").unwrap().rows.get(0).unwrap().clone();

    // Column 1: the blob keeps its length.
    let other = &row.values[1];
    let Value::Bytes(b) = other else {
        panic!("expected Bytes, got {other:?}")
    };
    assert_eq!(b.len(), big_blob.len());

    // Column 2: the long element keeps its length and the hole stays a hole.
    let other = &row.values[2];
    let Value::TextArray(items) = other else {
        panic!("expected TextArray, got {other:?}")
    };
    assert_eq!(items[0].as_ref().unwrap().len(), big_elem.len());
    assert!(items[1].is_none());
}
/// With the cursor pinned to codec version 46, a plain u16 length of 65 535
/// followed by that many payload bytes must be read as length 65 535 by
/// `read_len_escaped_v47`, and the payload must then be fully consumable.
#[test]
fn plain_u16_bytea_len_ffff_decodes_under_v46_rules() {
    let payload = alloc::vec![7_u8; 65_535];
    let buf = {
        let mut framed = Vec::new();
        write_u16(&mut framed, 65_535);
        framed.extend_from_slice(&payload);
        framed
    };

    let mut cur = Cursor::new(&buf).with_codec_version(46);
    let len = cur.read_len_escaped_v47().unwrap();
    assert_eq!(len, 65_535);

    let body = cur.take(len).unwrap();
    assert_eq!(body.len(), 65_535);
}
/// `write_str` must emit a 2-byte header below `STR_LEN_ESCAPE` and a 6-byte
/// header at or above it, and `read_str` at `FILE_VERSION` must return a
/// string of the original length for sizes on both sides of the boundary.
#[test]
fn escaped_string_codec_round_trips_large_text() {
    for len in [0usize, 1, 65_534, 65_535, 65_536, 1_048_576] {
        let s: String = "x".repeat(len);
        let mut buf = Vec::new();
        write_str(&mut buf, &s);

        // Short strings carry the narrow header, everything else the wide one.
        let expected_header = if len < STR_LEN_ESCAPE as usize { 2 } else { 6 };
        assert_eq!(buf.len(), expected_header + len, "header width for {len}");

        let mut cur = Cursor::new(&buf).with_codec_version(FILE_VERSION);
        let decoded = cur.read_str().unwrap();
        assert_eq!(decoded.len(), len, "round-trip {len}");
    }
}
/// A cursor built without an explicit codec version must read a plain u16
/// length of 65 535 followed by that many bytes as the full string.
#[test]
fn plain_u16_len_ffff_decodes_under_old_rules() {
    let s = "y".repeat(65_535);

    let mut buf = Vec::new();
    write_u16(&mut buf, 65_535);
    buf.extend(s.bytes());

    let mut old = Cursor::new(&buf);
    let decoded = old.read_str().unwrap();
    assert_eq!(decoded, s);
}
/// A catalog snapshot must preserve the length of a 1 MiB `Text` cell.
#[test]
fn snapshot_round_trips_megabyte_text_row() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "mail",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("body", DataType::Text, false),
        ],
    );
    cat.create_table(schema).unwrap();

    let body = "m".repeat(1_048_576);
    let message = Row::new(vec![Value::BigInt(1), Value::Text(body.clone())]);
    cat.get_mut("mail").unwrap().insert(message).unwrap();

    let bytes = cat.serialize();
    let re = Catalog::deserialize(&bytes).unwrap();
    let t = re.get("mail").unwrap();

    let other = &t.rows.get(0).unwrap().values[1];
    let Value::Text(s) = other else {
        panic!("expected Text, got {other:?}")
    };
    assert_eq!(s.len(), body.len());
}
/// Rows carrying a 200 000-character text body must survive
/// encode_segment -> OwnedSegment -> lookup -> dense row decode.
///
/// NOTE(review): the test name says "v3", but the magic asserted below carries
/// version byte 0x04 and the message says "V4" — the name looks stale.
#[test]
fn segment_v3_round_trips_large_text_rows() {
    let schema = TableSchema::new(
        "mail",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("body", DataType::Text, false),
        ],
    );
    let big = "b".repeat(200_000);

    // Three (pk, dense-encoded row) pairs with pks 0, 1, 2.
    let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(3);
    for i in 0u64..3 {
        let row = Row::new(vec![
            Value::BigInt(i.cast_signed()),
            Value::Text(big.clone()),
        ]);
        rows.push((i, encode_row_body_dense(&row, &schema)));
    }

    let (bytes, _) = encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    assert_eq!(&bytes[..8], b"SPGSEG\x04\x00", "new segments are V4");

    let seg = OwnedSegment::from_bytes(bytes).unwrap();
    assert!(seg.codec_version() >= 47);

    let payload = seg.lookup(1).expect("pk 1 present");
    let (row, _) = decode_row_body_dense(&payload, &schema, seg.codec_version()).unwrap();

    let other = &row.values[1];
    let Value::Text(s) = other else {
        panic!("expected Text, got {other:?}")
    };
    assert_eq!(s.len(), big.len());
}
/// A 100 000-character `IndexKey::Text` must come back equal after
/// `write_index_key` followed by `read_index_key` at `FILE_VERSION`.
#[test]
fn index_key_round_trips_large_text() {
    let key = IndexKey::Text("k".repeat(100_000));

    let mut encoded = Vec::new();
    write_index_key(&mut encoded, &key);

    let mut reader = Cursor::new(&encoded).with_codec_version(FILE_VERSION);
    let decoded = reader.read_index_key().unwrap();
    assert_eq!(decoded, key);
}
// Cross-checks the NEON squared-L2 kernel against the scalar implementation
// on deterministic pseudo-random vectors, across a range of dimensions.
// Compiled only for aarch64 targets.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_l2_matches_scalar() {
let dims = [4usize, 8, 12, 16, 64, 128, 256, 384, 512, 768, 1024, 1536];
for &d in &dims {
// Seed the generator from the dimension so each size gets its own,
// reproducible input.
let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
let mut a = Vec::with_capacity(d);
let mut b = Vec::with_capacity(d);
for _ in 0..d {
// Linear-congruential step; `x` and `y` are drawn alternately from the
// same stream, so the order of these statements fixes the test data.
state = state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
// 24 bits of state divided by 2^23, minus 1 -> a value in [-1, 1).
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
state = state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
a.push(x);
b.push(y);
}
let scalar = l2_distance_sq_scalar(&a, &b);
// SAFETY: NOTE(review) — the contract of `l2_distance_sq_neon` is not visible
// in this file. Presumably it needs NEON support (this test is aarch64-only)
// and equal-length slices (both are built with `d` elements); confirm against
// the function's `# Safety` section.
let neon = unsafe { l2_distance_sq_neon(&a, &b) };
// Relative tolerance of 1e-4, with the reference magnitude floored at 1e-6.
let tol = (scalar.abs().max(1e-6)) * 1e-4;
assert!(
(scalar - neon).abs() <= tol,
"dim={d}: scalar={scalar} neon={neon} diff={}",
(scalar - neon).abs()
);
}
}
// Cross-checks the NEON inner-product kernel against the scalar
// implementation on deterministic pseudo-random vectors, across a range of
// dimensions. Compiled only for aarch64 targets.
#[cfg(target_arch = "aarch64")]
#[test]
fn neon_inner_product_matches_scalar() {
let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
for &d in &dims {
// Seed the generator from the dimension so each size gets its own,
// reproducible input.
let mut state: u64 = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15);
let mut a = Vec::with_capacity(d);
let mut b = Vec::with_capacity(d);
for _ in 0..d {
// Linear-congruential step; `x` and `y` are drawn alternately from the
// same stream, so the order of these statements fixes the test data.
state = state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
// 24 bits of state divided by 2^23, minus 1 -> a value in [-1, 1).
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
state = state
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1);
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
a.push(x);
b.push(y);
}
let scalar = inner_product_scalar(&a, &b);
// SAFETY: NOTE(review) — the contract of `inner_product_neon` is not visible
// in this file. Presumably it needs NEON support (this test is aarch64-only)
// and equal-length slices (both are built with `d` elements); confirm against
// the function's `# Safety` section.
let neon = unsafe { inner_product_neon(&a, &b) };
// Relative tolerance of 1e-4 (reference magnitude floored at 1e-6) plus an
// absolute term that grows linearly with the dimension.
#[allow(clippy::cast_precision_loss)]
let tol = (scalar.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
assert!(
(scalar - neon).abs() <= tol,
"IP dim={d}: scalar={scalar} neon={neon} diff={}",
(scalar - neon).abs()
);
}
}
/// Cross-checks the NEON cosine kernel (dot product plus both norm terms)
/// against the scalar implementation on deterministic pseudo-random vectors,
/// across a range of dimensions. Compiled only for aarch64 targets.
///
/// Each of the three outputs is compared against a tolerance derived from its
/// own scalar reference value.
#[cfg(target_arch = "aarch64")]
#[allow(clippy::similar_names)]
#[test]
fn neon_cosine_dot_norms_matches_scalar() {
    let dims = [4usize, 8, 12, 16, 64, 128, 256, 512, 1024];
    for &d in &dims {
        // Seed the generator from the dimension so each size gets its own,
        // reproducible input.
        let mut state: u64 = (d as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        let mut a = Vec::with_capacity(d);
        let mut b = Vec::with_capacity(d);
        for _ in 0..d {
            // Linear-congruential step; `x` and `y` are drawn alternately
            // from the same stream.
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            // 24 bits of state divided by 2^23, minus 1 -> a value in [-1, 1).
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let x = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            state = state
                .wrapping_mul(6_364_136_223_846_793_005)
                .wrapping_add(1);
            #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
            let y = (((state >> 32) & 0x00FF_FFFF) as f32) / (0x80_0000_u32 as f32) - 1.0;
            a.push(x);
            b.push(y);
        }

        let (dot_s, na_s, nb_s) = cosine_dot_norms_scalar(&a, &b);
        // SAFETY: NOTE(review) — the contract of `cosine_dot_norms_neon` is
        // not visible in this file. Presumably it needs NEON support (this
        // test is aarch64-only) and equal-length slices (both are built with
        // `d` elements); confirm against the function's `# Safety` section.
        let (dot_n, na_n, nb_n) = unsafe { cosine_dot_norms_neon(&a, &b) };

        // Relative tolerance of 1e-4 (reference magnitude floored at 1e-6)
        // plus an absolute term that grows linearly with the dimension.
        #[allow(clippy::cast_precision_loss)]
        let tol_d = (dot_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        #[allow(clippy::cast_precision_loss)]
        let tol_na = (na_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;
        // Previously `nb` was checked against the tolerance computed from
        // `na_s`; it now uses its own reference value.
        #[allow(clippy::cast_precision_loss)]
        let tol_nb = (nb_s.abs().max(1e-6)) * 1e-4 + (d as f32) * 1e-6;

        assert!(
            (dot_s - dot_n).abs() <= tol_d,
            "cosine dot dim={d}: scalar={dot_s} neon={dot_n}"
        );
        assert!(
            (na_s - na_n).abs() <= tol_na,
            "cosine na dim={d}: scalar={na_s} neon={na_n}"
        );
        assert!(
            (nb_s - nb_n).abs() <= tol_nb,
            "cosine nb dim={d}: scalar={nb_s} neon={nb_n}"
        );
    }
}
/// Builds the three-column `users` schema used by the table tests: `id` (INT)
/// and `name` (TEXT) are created with the nullable flag off, `score` (FLOAT)
/// with it on.
fn make_users_schema() -> TableSchema {
    let columns = vec![
        ColumnSchema::new("id", DataType::Int, false),
        ColumnSchema::new("name", DataType::Text, false),
        ColumnSchema::new("score", DataType::Float, true),
    ];
    TableSchema::new("users", columns)
}
/// `Value::data_type` must report the matching `DataType` for each scalar
/// variant and `None` for `Null`; `is_null` must be true only for `Null`.
#[test]
fn value_type_tag_matches_variant() {
    let cases = [
        (Value::Int(1), Some(DataType::Int)),
        (Value::BigInt(1), Some(DataType::BigInt)),
        (Value::Float(1.0), Some(DataType::Float)),
        (Value::Text("x".into()), Some(DataType::Text)),
        (Value::Bool(true), Some(DataType::Bool)),
        (Value::Null, None),
    ];
    for (value, expected) in cases {
        assert_eq!(value.data_type(), expected);
    }

    assert!(Value::Null.is_null());
    assert!(!Value::Int(0).is_null());
}
/// An `Sq8Vector` value quantized from five floats must report a
/// five-dimensional vector type with the SQ8 encoding.
#[test]
fn sq8_value_reports_sq8_data_type() {
    let expected = DataType::Vector {
        dim: 5,
        encoding: VecEncoding::Sq8,
    };
    let quantized = crate::quantize::quantize(&[0.0, 0.25, 0.5, 0.75, 1.0]);
    let actual = Value::Sq8Vector(quantized).data_type();
    assert_eq!(actual, Some(expected));
}
/// The string form of each scalar `DataType` must be its upper-case keyword.
#[test]
fn datatype_display_matches_pg_keyword() {
    let cases = [
        (DataType::Int, "INT"),
        (DataType::BigInt, "BIGINT"),
        (DataType::Float, "FLOAT"),
        (DataType::Text, "TEXT"),
        (DataType::Bool, "BOOL"),
    ];
    for (ty, keyword) in cases {
        assert_eq!(ty.to_string(), keyword);
    }
}
/// `Row::len` counts the cells; `is_empty` is true only for a row with no cells.
#[test]
fn row_len_and_emptiness() {
    let two_cells = Row::new(vec![Value::Int(1), Value::Null]);
    assert_eq!(two_cells.len(), 2);
    assert!(!two_cells.is_empty());

    let no_cells = Row::new(Vec::new());
    assert!(no_cells.is_empty());
}
/// `column_position` returns the zero-based index of a known column and
/// `None` for an unknown name.
#[test]
fn table_schema_column_position() {
    let schema = make_users_schema();
    let cases = [("id", Some(0)), ("score", Some(2)), ("missing", None)];
    for (column, expected) in cases {
        assert_eq!(schema.column_position(column), expected);
    }
}
/// After creating `users`, the catalog reports one table, finds it by name,
/// and finds nothing under an unknown name.
#[test]
fn catalog_create_table_then_lookup() {
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();

    assert_eq!(catalog.table_count(), 1);
    let known = catalog.get("users");
    assert!(known.is_some());
    let unknown = catalog.get("nope");
    assert!(unknown.is_none());
}
/// Creating a second table with an existing name must fail with
/// `DuplicateTable` carrying that name.
#[test]
fn catalog_duplicate_table_is_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let second_attempt = catalog.create_table(make_users_schema());
    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateTable { ref name } if name == "users"));
}
/// A well-formed row is accepted, bumps the row count to one and is readable
/// back at position 0.
#[test]
fn table_insert_happy_path_appends_row() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(99.5),
    ]);
    users.insert(alice).unwrap();

    assert_eq!(users.row_count(), 1);
    assert_eq!(users.rows()[0].values[1], Value::Text("alice".into()));
}
/// A one-cell row offered to the three-column `users` table must be rejected
/// with `ArityMismatch { expected: 3, actual: 1 }` and must not be stored.
#[test]
fn table_insert_arity_mismatch() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let too_short = Row::new(vec![Value::Int(1)]);
    let err = users.insert(too_short).unwrap_err();
    assert!(matches!(
        err,
        StorageError::ArityMismatch {
            expected: 3,
            actual: 1
        }
    ));
    assert_eq!(users.row_count(), 0);
}
/// Putting an INT into the TEXT column `name` must fail with `TypeMismatch`
/// that identifies the column, both types and the column position, and the
/// row must not be stored.
#[test]
fn table_insert_type_mismatch_reports_column() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    // The second cell is an INT where the schema wants TEXT.
    let wrong_type = Row::new(vec![Value::Int(1), Value::Int(42), Value::Float(0.0)]);
    let err = users.insert(wrong_type).unwrap_err();

    let other = &err;
    let StorageError::TypeMismatch {
        column,
        expected,
        actual,
        position,
    } = other
    else {
        panic!("unexpected: {other:?}")
    };
    assert_eq!(column, "name");
    assert_eq!(*expected, DataType::Text);
    assert_eq!(*actual, DataType::Int);
    assert_eq!(*position, 1);

    assert_eq!(users.row_count(), 0);
}
/// A `Null` in the non-nullable `name` column must be rejected with
/// `NullInNotNull` naming that column.
#[test]
fn table_insert_null_into_not_null_rejected() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let null_name = Row::new(vec![Value::Int(1), Value::Null, Value::Float(1.0)]);
    let err = users.insert(null_name).unwrap_err();
    assert!(matches!(err, StorageError::NullInNotNull { ref column } if column == "name"));
}
/// A `Null` in the nullable `score` column is accepted and the row is stored.
#[test]
fn table_insert_null_into_nullable_ok() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let no_score = Row::new(vec![
        Value::Int(1),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(no_score).unwrap();
    assert_eq!(users.row_count(), 1);
}
/// Inserting through `get_mut("a")` changes only table `a`; table `b` stays empty.
#[test]
fn catalog_get_mut_independent_per_table() {
    let mut catalog = Catalog::new();
    for name in ["a", "b"] {
        let schema = TableSchema::new(name, vec![ColumnSchema::new("v", DataType::Int, false)]);
        catalog.create_table(schema).unwrap();
    }

    let only_row = Row::new(vec![Value::Int(1)]);
    catalog.get_mut("a").unwrap().insert(only_row).unwrap();

    let rows_in_a = catalog.get("a").unwrap().row_count();
    let rows_in_b = catalog.get("b").unwrap().row_count();
    assert_eq!(rows_in_a, 1);
    assert_eq!(rows_in_b, 0);
}
/// Serializes `cat`, deserializes the bytes, and asserts that the restored
/// catalog has the same table count and, table by table in iteration order,
/// equal schemas and equal rows.
///
/// Panics if deserialization fails or any comparison differs.
fn assert_round_trip(cat: &Catalog) {
    let encoded = cat.serialize();
    let restored = Catalog::deserialize(&encoded).expect("deserialize");
    assert_eq!(restored.table_count(), cat.table_count());

    cat.tables
        .iter()
        .zip(restored.tables.iter())
        .for_each(|(before, after)| {
            assert_eq!(before.schema, after.schema);
            assert_eq!(before.rows, after.rows);
        });
}
/// A catalog with no tables survives serialize -> deserialize.
#[test]
fn serialize_empty_catalog_round_trips() {
    let empty = Catalog::new();
    assert_round_trip(&empty);
}
/// A catalog holding one table with no rows survives serialize -> deserialize.
#[test]
fn serialize_single_empty_table_round_trips() {
    let mut catalog = Catalog::new();
    let schema = make_users_schema();
    catalog.create_table(schema).unwrap();
    assert_round_trip(&catalog);
}
/// Cloning an NSW graph built over 1500 rows must share storage with the
/// original for the `levels` container and for every layer, rather than
/// copying elements.
#[test]
fn nsw_clone_is_o1() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..1500_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.01;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        let row = Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]);
        docs.insert(row).unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    // The only index on the table is the NSW one; every other kind is listed
    // explicitly so the match stays exhaustive.
    let g = match &cat.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g,
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => panic!("expected NSW"),
    };
    assert_eq!(g.levels.len(), 1500, "one level slot per inserted row");
    assert!(
        g.layers.len() >= 2,
        "1500 nodes should populate at least two HNSW layers, got {}",
        g.layers.len()
    );

    let cloned = g.clone();
    assert!(
        g.levels.shares_storage_with(&cloned.levels),
        "levels PV not shared after clone — clone copied elements (O(N))"
    );
    assert_eq!(g.layers.len(), cloned.layers.len());
    let layer_pairs = g.layers.iter().zip(cloned.layers.iter());
    for (l, (orig, cl)) in layer_pairs.enumerate() {
        assert!(
            orig.shares_storage_with(cl),
            "layer {l} PV not shared after clone — clone copied elements (O(N))"
        );
    }
}
/// A table with an SQ8-encoded vector column and an NSW index must come back
/// from serialize -> deserialize with the same column type, the same stored
/// cell, and the same top-5 L2 query result.
#[test]
fn sq8_catalog_serialise_roundtrip_preserves_cells_and_index() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::Sq8,
                },
                false,
            ),
        ],
    );
    cat.create_table(schema).unwrap();

    let t = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        // Eight components per row: base, base + 0.01, base + 0.02, ...
        let mut v: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            v.push(base + off);
        }
        let row = Row::new(alloc::vec![
            Value::Int(i),
            Value::Sq8Vector(quantize::quantize(&v)),
        ]);
        t.insert(row).unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot of the state before the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
/// A table with an F16-encoded vector column and an NSW index must come back
/// from serialize -> deserialize with the same column type, the same stored
/// cell, and the same top-5 L2 query result.
#[test]
fn half_catalog_serialise_roundtrip_preserves_cells_and_index() {
    use crate::halfvec;

    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 8,
                    encoding: VecEncoding::F16,
                },
                false,
            ),
        ],
    );
    cat.create_table(schema).unwrap();

    let t = cat.get_mut("vecs").unwrap();
    for i in 0..32_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.03;
        // Eight components per row: base, base + 0.01, base + 0.02, ...
        let mut v: Vec<f32> = Vec::with_capacity(8);
        for j in 0..8_i32 {
            #[allow(clippy::cast_precision_loss)]
            let off = (j as f32) * 0.01;
            v.push(base + off);
        }
        let row = Row::new(alloc::vec![
            Value::Int(i),
            Value::HalfVector(halfvec::HalfVector::from_f32_slice(&v)),
        ]);
        t.insert(row).unwrap();
    }
    t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let query = alloc::vec![0.15_f32, 0.16, 0.17, 0.18, 0.19, 0.20, 0.21, 0.22];

    // Snapshot of the state before the round trip.
    let live = cat.get("vecs").unwrap();
    let before_cell = live.rows()[5].values[1].clone();
    let before_ty = live.schema().columns[1].ty;
    let before_hits = nsw_query(live, "v_idx", &query, 5, NswMetric::L2);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize ok");
    let rt = restored.get("vecs").unwrap();

    assert_eq!(rt.schema().columns[1].ty, before_ty);
    assert_eq!(rt.rows()[5].values[1], before_cell);
    let after_hits = nsw_query(rt, "v_idx", &query, 5, NswMetric::L2);
    assert_eq!(before_hits, after_hits);
}
// Recall check for the NSW index over an F16-encoded column: for 32 random
// queries against 512 random 32-dimensional vectors, the index's top-10 hits
// must overlap the exact f32 brute-force top-10 by at least 95% on average.
#[test]
#[allow(clippy::similar_names)]
fn hnsw_half_recall_at_10_matches_f32_groundtruth() {
use crate::halfvec;
// Deterministic pseudo-random generator: advances `state` and maps its upper
// 32 bits to a float in [-1, 1].
fn next(state: &mut u64) -> f32 {
*state = state
.wrapping_add(0x9E37_79B9_7F4A_7C15)
.wrapping_mul(0xBF58_476D_1CE4_E5B9);
#[allow(clippy::cast_precision_loss)]
let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
2.0 * u - 1.0
}
let dim: u32 = 32;
let n: usize = 512;
let dim_us = dim as usize;
let mut seed: u64 = 0xF16_F16_F16_F16_u64;
// Corpus and queries are drawn from the same stream, corpus first, so the
// order of these two statements fixes the test data.
let corpus: Vec<Vec<f32>> = (0..n)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
let queries: Vec<Vec<f32>> = (0..32)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
// Ground truth: for each query, the ten corpus positions with the smallest
// squared L2 distance, computed on the original f32 vectors.
let exact_top10: Vec<Vec<usize>> = queries
.iter()
.map(|q| {
let mut scored: Vec<(f32, usize)> = corpus
.iter()
.enumerate()
.map(|(i, v)| (l2_distance_sq(v, q), i))
.collect();
// Incomparable distances (a failed `partial_cmp`) are treated as equal.
scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
scored.into_iter().take(10).map(|(_, i)| i).collect()
})
.collect();
let mut cat = Catalog::new();
cat.create_table(TableSchema::new(
"vecs",
alloc::vec![
ColumnSchema::new("id", DataType::Int, false),
ColumnSchema::new(
"v",
DataType::Vector {
dim,
encoding: VecEncoding::F16,
},
false,
),
],
))
.unwrap();
let t = cat.get_mut("vecs").unwrap();
// Rows are inserted in corpus order with `id` equal to the corpus index.
for (i, v) in corpus.iter().enumerate() {
t.insert(Row::new(alloc::vec![
Value::Int(i32::try_from(i).unwrap()),
Value::HalfVector(halfvec::HalfVector::from_f32_slice(v)),
]))
.unwrap();
}
t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
let table = cat.get("vecs").unwrap();
// Count how many returned hits appear in the exact top-10, summed over all
// queries.
let mut total_overlap = 0_usize;
for (q, exact) in queries.iter().zip(exact_top10.iter()) {
let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
for h in &hits {
if exact.contains(h) {
total_overlap += 1;
}
}
}
// recall = matched hits / (10 expected hits per query * number of queries)
#[allow(clippy::cast_precision_loss)]
let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
assert!(
recall >= 0.95,
"HALF HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check halfvec dispatch in `cell_to_query_metric_distance`"
);
}
// Recall check for the NSW index over an SQ8-encoded column: for 32 random
// queries against 512 random 32-dimensional vectors, the index's top-10 hits
// must overlap the exact f32 brute-force top-10 by at least 95% on average.
#[test]
fn hnsw_sq8_recall_at_10_above_0_95_vs_f32_groundtruth() {
use crate::quantize;
// Deterministic pseudo-random generator: advances `state` and maps its upper
// 32 bits to a float in [-1, 1].
fn next(state: &mut u64) -> f32 {
*state = state
.wrapping_add(0x9E37_79B9_7F4A_7C15)
.wrapping_mul(0xBF58_476D_1CE4_E5B9);
#[allow(clippy::cast_precision_loss)]
let u = ((*state >> 32) as u32 as f32) / (u32::MAX as f32);
2.0 * u - 1.0
}
let dim: u32 = 32;
let n: usize = 512;
let dim_us = dim as usize;
let mut seed: u64 = 0xCAFE_BABE_DEAD_BEEFu64;
// Corpus and queries are drawn from the same stream, corpus first, so the
// order of these two statements fixes the test data.
let corpus: Vec<Vec<f32>> = (0..n)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
let queries: Vec<Vec<f32>> = (0..32)
.map(|_| (0..dim_us).map(|_| next(&mut seed)).collect())
.collect();
// Ground truth: for each query, the ten corpus positions with the smallest
// squared L2 distance, computed on the original f32 vectors.
let exact_top10: Vec<Vec<usize>> = queries
.iter()
.map(|q| {
let mut scored: Vec<(f32, usize)> = corpus
.iter()
.enumerate()
.map(|(i, v)| (l2_distance_sq(v, q), i))
.collect();
// Incomparable distances (a failed `partial_cmp`) are treated as equal.
scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));
scored.into_iter().take(10).map(|(_, i)| i).collect()
})
.collect();
let mut cat = Catalog::new();
cat.create_table(TableSchema::new(
"vecs",
alloc::vec![
ColumnSchema::new("id", DataType::Int, false),
ColumnSchema::new(
"v",
DataType::Vector {
dim,
encoding: VecEncoding::Sq8,
},
false,
),
],
))
.unwrap();
let t = cat.get_mut("vecs").unwrap();
// Rows are inserted in corpus order with `id` equal to the corpus index.
for (i, v) in corpus.iter().enumerate() {
t.insert(Row::new(alloc::vec![
Value::Int(i32::try_from(i).unwrap()),
Value::Sq8Vector(quantize::quantize(v)),
]))
.unwrap();
}
t.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();
let table = cat.get("vecs").unwrap();
// Count how many returned hits appear in the exact top-10, summed over all
// queries.
let mut total_overlap = 0_usize;
for (q, exact) in queries.iter().zip(exact_top10.iter()) {
let hits = nsw_query(table, "v_idx", q, 10, NswMetric::L2);
for h in &hits {
if exact.contains(h) {
total_overlap += 1;
}
}
}
// recall = matched hits / (10 expected hits per query * number of queries)
#[allow(clippy::cast_precision_loss)]
let recall = total_overlap as f32 / (10.0 * queries.len() as f32);
assert!(
recall >= 0.95,
"SQ8 HNSW recall@10 = {recall:.3}, below floor 0.95 — \
check `sq8_rerank` is wired in `nsw_search` for SQ8 columns"
);
}
/// The NSW graph attached to a table must come back from
/// serialize -> deserialize with identical parameters (`m`, `m_max_0`),
/// entry point, entry level, per-node levels and per-layer adjacency.
#[test]
fn nsw_index_topology_persists_through_round_trip() {
    // Clones the NSW graph out of the first index on `docs`; every other
    // index kind is listed explicitly so the match stays exhaustive.
    let nsw_graph = |catalog: &Catalog| match &catalog.get("docs").unwrap().indices()[0].kind {
        IndexKind::Nsw(g) => g.clone(),
        IndexKind::BTree(_)
        | IndexKind::Brin { .. }
        | IndexKind::Gin(_)
        | IndexKind::GinTrgm(_)
        | IndexKind::GinFulltext(_) => panic!("expected NSW"),
    };

    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "docs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    cat.create_table(schema).unwrap();

    let docs = cat.get_mut("docs").unwrap();
    for i in 0..6_i32 {
        #[allow(clippy::cast_precision_loss)]
        let base = (i as f32) * 0.1;
        let embedding = alloc::vec![base, base + 0.05, base + 0.1];
        docs.insert(Row::new(alloc::vec![Value::Int(i), Value::Vector(embedding)]))
            .unwrap();
    }
    docs.add_nsw_index("docs_nsw".into(), "v", NSW_DEFAULT_M)
        .unwrap();

    let original = nsw_graph(&cat);

    let bytes = cat.serialize();
    let restored = Catalog::deserialize(&bytes).expect("deserialize");
    let restored_graph = nsw_graph(&restored);

    assert_eq!(restored_graph.m, original.m);
    assert_eq!(restored_graph.m_max_0, original.m_max_0);
    assert_eq!(restored_graph.entry, original.entry);
    assert_eq!(restored_graph.entry_level, original.entry_level);
    assert_eq!(restored_graph.levels, original.levels);
    assert_eq!(restored_graph.layers, original.layers);
}
/// `nsw_assign_level` must return the same level when called twice with the
/// same node index, for indices 0 through 31.
#[test]
fn hnsw_level_assignment_is_deterministic() {
    for node in 0..32usize {
        let first = nsw_assign_level(node);
        let second = nsw_assign_level(node);
        assert_eq!(first, second);
    }
}
/// Of the first 200 node indices, more than 150 must be assigned level 0.
#[test]
fn hnsw_layer_0_dominates_population() {
    let mut on_zero = 0usize;
    for node in 0..200usize {
        if nsw_assign_level(node) == 0 {
            on_zero += 1;
        }
    }
    assert!(on_zero > 150, "level-0 nodes too few: {on_zero}");
}
/// For three query points over a ten-vector dataset, the top-1 row position
/// returned by `nsw_search` (L2 metric) must equal the top-1 of an exhaustive
/// scan ranked by squared L2 distance.
#[test]
fn hnsw_search_matches_brute_force_for_l2_top1() {
    let mut cat = Catalog::new();
    let schema = TableSchema::new(
        "vecs",
        alloc::vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 3,
                    encoding: VecEncoding::F32
                },
                true
            ),
        ],
    );
    cat.create_table(schema).unwrap();

    // Unit-cube corners plus two interior points.
    let dataset: alloc::vec::Vec<(i32, [f32; 3])> = alloc::vec![
        (1, [0.0, 0.0, 0.0]),
        (2, [1.0, 0.0, 0.0]),
        (3, [0.0, 1.0, 0.0]),
        (4, [0.0, 0.0, 1.0]),
        (5, [1.0, 1.0, 0.0]),
        (6, [1.0, 0.0, 1.0]),
        (7, [0.0, 1.0, 1.0]),
        (8, [1.0, 1.0, 1.0]),
        (9, [0.5, 0.5, 0.5]),
        (10, [0.2, 0.8, 0.5]),
    ];

    let vecs = cat.get_mut("vecs").unwrap();
    for (id, point) in dataset.iter().copied() {
        let row = Row::new(alloc::vec![Value::Int(id), Value::Vector(point.to_vec())]);
        vecs.insert(row).unwrap();
    }
    vecs.add_nsw_index("v_idx".into(), "v", NSW_DEFAULT_M).unwrap();

    let table = cat.get("vecs").unwrap();
    let idx_pos = table
        .indices()
        .iter()
        .position(|i| i.name == "v_idx")
        .unwrap();

    for query in [[0.4, 0.4, 0.4], [0.9, 0.1, 0.0], [0.0, 0.9, 0.9]] {
        let hnsw_top = nsw_search(table, idx_pos, &query, 1, 16, NswMetric::L2);

        // Exhaustive scan; a cell that is not a vector ranks last.
        let mut brute: alloc::vec::Vec<(f32, usize)> = alloc::vec::Vec::new();
        for i in 0..table.rows.len() {
            let dist = match &table.rows[i].values[1] {
                Value::Vector(v) => l2_distance_sq(v, &query),
                _ => f32::INFINITY,
            };
            brute.push((dist, i));
        }
        brute.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(core::cmp::Ordering::Equal));

        assert!(!hnsw_top.is_empty(), "HNSW returned no results");
        assert_eq!(
            hnsw_top[0].1, brute[0].1,
            "HNSW top-1 != brute-force top-1 for {query:?}"
        );
    }
}
/// A `users` table holding one fully populated row and one row with a `Null`
/// score survives serialize -> deserialize.
#[test]
fn serialize_table_with_rows_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let users = catalog.get_mut("users").unwrap();
    let alice = Row::new(vec![
        Value::Int(1),
        Value::Text("alice".into()),
        Value::Float(95.5),
    ]);
    let bob = Row::new(vec![
        Value::Int(2),
        Value::Text("bob".into()),
        Value::Null,
    ]);
    users.insert(alice).unwrap();
    users.insert(bob).unwrap();

    assert_round_trip(&catalog);
}
/// A catalog with two tables (an empty `users` and a one-row `flags`)
/// survives serialize -> deserialize.
#[test]
fn serialize_multiple_tables_round_trips() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let flags_schema = TableSchema::new(
        "flags",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("active", DataType::Bool, false),
        ],
    );
    catalog.create_table(flags_schema).unwrap();

    let flag = Row::new(vec![Value::BigInt(7), Value::Bool(true)]);
    catalog.get_mut("flags").unwrap().insert(flag).unwrap();

    assert_round_trip(&catalog);
}
/// Input that starts with the wrong magic bytes must be rejected as `Corrupt`,
/// even when followed by the current `FILE_VERSION` and a zero u32.
#[test]
fn deserialize_rejects_bad_magic() {
    let mut image = b"BADMAGIC".to_vec();
    image.push(FILE_VERSION);
    let zero_u32 = 0u32.to_le_bytes();
    image.extend_from_slice(&zero_u32);

    let err = Catalog::deserialize(&image).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(_)));
}
/// Input with the right magic but version byte 99 must be rejected as
/// `Corrupt` with a message that mentions "version".
#[test]
fn deserialize_rejects_unsupported_version() {
    let mut image = FILE_MAGIC.to_vec();
    image.push(99);
    let zero_u32 = 0u32.to_le_bytes();
    image.extend_from_slice(&zero_u32);

    let err = Catalog::deserialize(&image).unwrap_err();
    assert!(matches!(err, StorageError::Corrupt(ref s) if s.contains("version")));
}
/// A valid snapshot with its final byte removed must be rejected as `Corrupt`.
#[test]
fn deserialize_rejects_truncated_file() {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let bytes = catalog.serialize();
    let cut_at = bytes.len() - 1;
    let outcome = Catalog::deserialize(&bytes[..cut_at]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
}
/// A valid snapshot with one extra byte appended must be rejected as
/// `Corrupt` with a message that mentions "trailing".
#[test]
fn deserialize_rejects_trailing_garbage() {
    let mut bytes = Catalog::new().serialize();
    bytes.push(0xFF);

    let outcome = Catalog::deserialize(&bytes);
    assert!(matches!(
        outcome,
        Err(StorageError::Corrupt(ref s)) if s.contains("trailing")
    ));
}
/// Builds a catalog whose `users` table holds three rows, in this order:
/// (1, "alice", 90.0), (2, "bob", NULL), (3, "alice", 70.0).
/// The name "alice" appears twice and one score is missing.
fn populated_users() -> Catalog {
    let mut catalog = Catalog::new();
    catalog.create_table(make_users_schema()).unwrap();

    let users = catalog.get_mut("users").unwrap();
    let seed_rows = [
        (1, "alice", Some(90.0)),
        (2, "bob", None),
        (3, "alice", Some(70.0)),
    ];
    for (id, name, score) in seed_rows {
        let score_cell = match score {
            Some(points) => Value::Float(points),
            None => Value::Null,
        };
        let row = Row::new(vec![Value::Int(id), Value::Text(name.into()), score_cell]);
        users.insert(row).unwrap();
    }
    catalog
}
/// An index added after rows exist must already contain them: id 2 resolves
/// to hot row position 1, and an absent id resolves to nothing.
#[test]
fn add_index_builds_from_existing_rows() {
    let mut catalog = populated_users();
    let users_mut = catalog.get_mut("users").unwrap();
    users_mut.add_index("by_id".into(), "id").unwrap();

    let users = catalog.get("users").unwrap();
    let idx = users.index_on(0).expect("index_on(0)");

    let present = IndexKey::Int(2);
    let absent = IndexKey::Int(99);
    assert_eq!(idx.lookup_eq(&present), &[RowLocator::Hot(1)]);
    assert_eq!(idx.lookup_eq(&absent), &[] as &[RowLocator]);
}
/// Reusing an index name on the same table, even for a different column,
/// must fail with `DuplicateIndex` carrying that name.
#[test]
fn add_index_dup_name_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("ix".into(), "id").unwrap();

    let second_attempt = users.add_index("ix".into(), "name");
    let err = second_attempt.unwrap_err();
    assert!(matches!(err, StorageError::DuplicateIndex { ref name } if name == "ix"));
}
/// Indexing a column the table does not have must fail with `ColumnNotFound`
/// carrying the requested column name.
#[test]
fn add_index_unknown_column_rejected() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();

    let err = users.add_index("ix".into(), "ghost").unwrap_err();
    assert!(matches!(err, StorageError::ColumnNotFound { ref column } if column == "ghost"));
}
/// A row inserted after the index exists must be findable through it, and
/// keys that were indexed at build time must still resolve to their rows.
#[test]
fn insert_after_create_index_updates_it() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let dave = Row::new(vec![
        Value::Int(4),
        Value::Text("dave".into()),
        Value::Null,
    ]);
    users.insert(dave).unwrap();

    let idx = users.index_on(1).unwrap();

    // The new row is the fourth one, so it sits at hot position 3.
    let dave_key = IndexKey::Text("dave".into());
    assert_eq!(idx.lookup_eq(&dave_key), &[RowLocator::Hot(3)]);

    // "alice" was seeded twice, at positions 0 and 2.
    let alice_key = IndexKey::Text("alice".into());
    assert_eq!(
        idx.lookup_eq(&alice_key),
        &[RowLocator::Hot(0), RowLocator::Hot(2)]
    );
}
/// An index over the `score` column (floats and a NULL in the fixture)
/// returns nothing for the integer key 90.
#[test]
fn null_or_float_values_are_not_indexed() {
    let mut catalog = populated_users();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_score".into(), "score").unwrap();
    let hits = users.index_on(2).unwrap().lookup_eq(&IndexKey::Int(90));
    assert!(hits.is_empty());
}
/// A three-component `Value::Vector` reports `DataType::Vector` with
/// `dim == 3` and F32 encoding.
#[test]
fn vector_value_data_type_carries_dim() {
    let expected = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let reported = Value::Vector(vec![1.0, 2.0, 3.0]).data_type();
    assert_eq!(reported, Some(expected));
}
/// Inserting a 3-component vector into a `Vector { dim: 3 }` column succeeds.
#[test]
fn vector_column_insert_matching_dim_ok() {
    let vector_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new("emb", vec![vector_column]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    let row = Row::new(vec![Value::Vector(vec![1.0, 2.0, 3.0])]);
    emb.insert(row).unwrap();
}
/// Inserting a 2-component vector into a `Vector { dim: 3 }` column is
/// rejected with `TypeMismatch`.
#[test]
fn vector_column_insert_dim_mismatch_rejected() {
    let vector_column = ColumnSchema::new(
        "v",
        DataType::Vector {
            dim: 3,
            encoding: VecEncoding::F32,
        },
        false,
    );
    let mut catalog = Catalog::new();
    catalog
        .create_table(TableSchema::new("emb", vec![vector_column]))
        .unwrap();

    let emb = catalog.get_mut("emb").unwrap();
    // Only two components for a three-dimensional column.
    let too_short = Row::new(vec![Value::Vector(vec![1.0, 2.0])]);
    let rejection = emb.insert(too_short).unwrap_err();
    assert!(matches!(rejection, StorageError::TypeMismatch { .. }));
}
/// A vector column's declared type and its stored components both survive a
/// serialize → deserialize cycle unchanged.
#[test]
fn vector_value_survives_catalog_round_trip() {
    let vector_ty = DataType::Vector {
        dim: 4,
        encoding: VecEncoding::F32,
    };
    let components = vec![0.5, -1.25, 3.0, 7.0];

    let mut catalog = Catalog::new();
    let schema = TableSchema::new(
        "emb",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new("v", vector_ty, false),
        ],
    );
    catalog.create_table(schema).unwrap();
    let emb = catalog.get_mut("emb").unwrap();
    emb.insert(Row::new(vec![
        Value::Int(1),
        Value::Vector(components.clone()),
    ]))
    .unwrap();

    let image = catalog.serialize();
    let restored = Catalog::deserialize(&image).expect("round-trip");
    let table = restored.get("emb").unwrap();
    // Column 1 is the vector column.
    assert_eq!(table.schema().columns[1].ty, vector_ty);
    assert_eq!(table.rows()[0].values[1], Value::Vector(components));
}
/// An index keeps its name and its entries after the catalog is serialized
/// and deserialized.
#[test]
fn index_survives_serialize_deserialize_round_trip() {
    let mut original = populated_users();
    let users = original.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let image = original.serialize();
    let restored = Catalog::deserialize(&image).unwrap();
    let restored_users = restored.get("users").unwrap();
    let by_name = restored_users
        .index_on(1)
        .expect("index_on(1) after restore");
    assert_eq!(by_name.name, "by_name");
    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
}
/// Schema for a two-column `users` table: non-nullable `id BIGINT` and
/// non-nullable `name TEXT`.
fn bigint_pk_users_schema() -> TableSchema {
    let id_column = ColumnSchema::new("id", DataType::BigInt, false);
    let name_column = ColumnSchema::new("name", DataType::Text, false);
    TableSchema::new("users", vec![id_column, name_column])
}
/// Builds a `(BigInt(id), Text(name))` row matching `bigint_pk_users_schema`.
fn make_user_row(id: i64, name: &str) -> Row {
    let id_cell = Value::BigInt(id);
    let name_cell = Value::Text(name.into());
    Row::new(vec![id_cell, name_cell])
}
/// Changing only the non-indexed `name` column leaves the `by_id` entry for
/// key 2 at hot position 1, and the row carries the new text.
#[test]
fn update_row_non_indexed_column_keeps_index_intact() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Same id, different name.
    let replacement = vec![Value::BigInt(2), Value::Text("bobby".into())];
    users.update_row(1, replacement).unwrap();

    let hits = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(
        hits,
        &[RowLocator::Hot(1)],
        "old key still resolves the in-place position"
    );
    assert_eq!(users.rows()[1].values[1], Value::Text("bobby".into()));
}
/// Changing the indexed `id` of row 1 from 2 to 20 removes the old key,
/// adds the new key at the same position, and leaves the other rows' entries
/// untouched.
#[test]
fn update_row_indexed_column_moves_entry() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let replacement = vec![Value::BigInt(20), Value::Text("bob".into())];
    users.update_row(1, replacement).unwrap();

    let by_id = users.index_on(0).unwrap();
    let old_hits = by_id.lookup_eq(&IndexKey::Int(2));
    assert!(old_hits.is_empty(), "old key entry removed");
    let new_hits = by_id.lookup_eq(&IndexKey::Int(20));
    assert_eq!(
        new_hits,
        &[RowLocator::Hot(1)],
        "new key entry resolves the position"
    );
    // Neighbouring rows are unaffected.
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(3)), &[RowLocator::Hot(2)]);
}
/// With two rows sharing key 7, updating the second one to key 8 moves only
/// that row's entry; the first row stays under key 7.
#[test]
fn update_row_duplicate_key_moves_only_target_position() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(7i64, "a"), (7, "b"), (9, "c")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let replacement = vec![Value::BigInt(8), Value::Text("b".into())];
    users.update_row(1, replacement).unwrap();

    let by_id = users.index_on(0).unwrap();
    let under_seven = by_id.lookup_eq(&IndexKey::Int(7));
    assert_eq!(under_seven, &[RowLocator::Hot(0)]);
    let under_eight = by_id.lookup_eq(&IndexKey::Int(8));
    assert_eq!(under_eight, &[RowLocator::Hot(1)]);
    let under_nine = by_id.lookup_eq(&IndexKey::Int(9));
    assert_eq!(under_nine, &[RowLocator::Hot(2)]);
}
/// On an indexed nullable column: value → NULL drops the old key, and
/// NULL → value makes the new key resolve to the row.
#[test]
fn update_row_null_transition_on_indexed_nullable_column() {
    let schema = TableSchema::new(
        "n",
        vec![
            ColumnSchema::new("id", DataType::BigInt, false),
            ColumnSchema::new("tag", DataType::BigInt, true),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let table = catalog.get_mut("n").unwrap();
    let initial = Row::new(vec![Value::BigInt(1), Value::BigInt(5)]);
    table.insert(initial).unwrap();
    table.add_index("by_tag".into(), "tag").unwrap();

    // tag: 5 -> NULL
    table
        .update_row(0, vec![Value::BigInt(1), Value::Null])
        .unwrap();
    let after_null = table.index_on(1).unwrap().lookup_eq(&IndexKey::Int(5));
    assert!(after_null.is_empty());

    // tag: NULL -> 6
    table
        .update_row(0, vec![Value::BigInt(1), Value::BigInt(6)])
        .unwrap();
    let after_value = table.index_on(1).unwrap().lookup_eq(&IndexKey::Int(6));
    assert_eq!(after_value, &[RowLocator::Hot(0)]);
}
/// `lookup_by_pk` returns the matching hot row; no cold segment is loaded
/// in this scenario.
#[test]
fn lookup_by_pk_finds_row_via_hot_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(2));
    assert_eq!(found.unwrap(), make_user_row(2, "bob"));
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// `lookup_by_pk` yields `None` for an absent key, an unknown table, and an
/// unknown index name.
#[test]
fn lookup_by_pk_returns_none_when_key_missing() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, key) combinations that must all miss.
    let misses = [
        ("users", "by_id", 999i64),
        ("other_table", "by_id", 1),
        ("users", "no_such_index", 1),
    ];
    for (table, index, key) in misses {
        let found = catalog.lookup_by_pk(table, index, &IndexKey::Int(key));
        assert!(found.is_none());
    }
}
/// Rows that exist only in a loaded cold segment are reachable through
/// `lookup_by_pk` once their cold locators are registered on the index.
#[test]
fn lookup_by_pk_resolves_cold_locator_via_loaded_segment() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> =
        vec![(100, "ivy"), (200, "joe"), (300, "kim"), (400, "lin")];

    // Encode every cold row into (key, dense body) pairs for the segment.
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _meta) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = catalog.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(seg_id, 0);
    assert_eq!(catalog.cold_segment_count(), 1);

    // Point each key at the freshly loaded segment.
    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    let registered = catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();
    assert_eq!(registered, 4);

    for (id, name) in &cold_rows {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(*id));
        let row = found.unwrap_or_else(|| panic!("cold key {id} not found"));
        assert_eq!(row, make_user_row(*id, name));
    }
    let absent = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(999));
    assert!(absent.is_none());
}
/// With two hot rows and two cold rows registered on the same index,
/// `lookup_by_pk` resolves keys from both tiers and misses unknown keys.
#[test]
fn lookup_by_pk_mixes_hot_and_cold_tiers() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = catalog.load_segment_bytes(seg_bytes).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    // Hot keys first, then cold keys.
    let expected = [(1i64, "alice"), (2, "bob"), (100, "ivy"), (200, "joe")];
    for (id, name) in expected {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        assert_eq!(found.unwrap(), make_user_row(id, name));
    }
    let absent = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(50));
    assert!(absent.is_none());
}
/// Registering cold locators against an NSW index fails with `Corrupt`
/// and a message containing "not BTree".
#[test]
fn register_cold_locators_rejects_nsw_index() {
    let schema = TableSchema::new(
        "vecs",
        vec![
            ColumnSchema::new("id", DataType::Int, false),
            ColumnSchema::new(
                "v",
                DataType::Vector {
                    dim: 4,
                    encoding: VecEncoding::F32,
                },
                false,
            ),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();

    let vecs = catalog.get_mut("vecs").unwrap();
    let row = Row::new(vec![
        Value::Int(1),
        Value::Vector(vec![1.0, 0.0, 0.0, 0.0]),
    ]);
    vecs.insert(row).unwrap();
    vecs.add_nsw_index("by_v".into(), "v", NSW_DEFAULT_M).unwrap();

    let cold_pair = (
        IndexKey::Int(1),
        RowLocator::Cold {
            segment_id: 0,
            page_offset: 0,
        },
    );
    let rejection = vecs
        .register_cold_locators("by_v", vec![cold_pair])
        .unwrap_err();
    assert!(matches!(
        rejection,
        StorageError::Corrupt(ref why) if why.contains("not BTree")
    ));
}
/// Ten zero bytes are not a valid segment: loading fails with `Corrupt`
/// (message mentions "segment") and no segment is registered.
#[test]
fn load_segment_bytes_rejects_garbage() {
    let mut catalog = Catalog::new();
    let garbage = vec![0u8; 10];
    let rejection = catalog.load_segment_bytes(garbage).unwrap_err();
    assert!(matches!(
        rejection,
        StorageError::Corrupt(ref why) if why.contains("segment")
    ));
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Loading three segments in a row hands out ids 0, 1, 2 and leaves three
/// cold segments registered.
#[test]
fn load_segment_bytes_returns_sequential_ids() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let schema = catalog.get("users").unwrap().schema.clone();

    for batch in 0u32..3 {
        // Each batch uses its own block of four keys: 0.., 100.., 200..
        let base = u64::from(batch) * 100;
        let mut rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(4);
        for offset in 0u64..4 {
            let id = base + offset;
            let row = make_user_row(id.cast_signed(), "x");
            rows.push((id, encode_row_body_dense(&row, &schema)));
        }
        let (bytes, _) =
            encode_segment(rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
        let assigned = catalog.load_segment_bytes(bytes).unwrap();
        assert_eq!(assigned, batch);
    }
    assert_eq!(catalog.cold_segment_count(), 3);
}
/// A stream written by `encode_as_v8` (version byte 8) is accepted by
/// `Catalog::deserialize`, and the restored index exposes only hot locators.
#[test]
fn v8_catalog_decodes_as_all_hot_under_v9_reader() {
    let mut source = populated_users();
    let users = source.get_mut("users").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    let v8_bytes = encode_as_v8(&source);
    // The version byte sits right after the magic prefix.
    let version_byte = v8_bytes[FILE_MAGIC.len()];
    assert_eq!(version_byte, 8, "version byte must be 8");

    let restored = Catalog::deserialize(&v8_bytes).expect("v9 reader accepts v8 stream");
    let by_name = restored
        .get("users")
        .unwrap()
        .index_on(1)
        .expect("index_on(1) after restore");
    let alice_hits = by_name.lookup_eq(&IndexKey::Text("alice".into()));
    assert_eq!(alice_hits, &[RowLocator::Hot(0), RowLocator::Hot(2)]);
    assert!(
        by_name
            .lookup_eq(&IndexKey::Text("alice".into()))
            .iter()
            .all(|locator| locator.is_hot()),
        "v8 → v9 read must yield Hot only"
    );
}
/// Test-only writer that emits `cat` as a version-8 catalog byte stream.
///
/// Layout written, in order: `FILE_MAGIC`, the version byte `8`, the table
/// count, then for every table its name, column count, columns, row count,
/// dense row bodies, index count and indices.
///
/// # Panics
///
/// Panics if any table holds a BRIN, GIN, trigram-GIN or fulltext-GIN index,
/// or if a count or position does not fit the fixed-width field it is
/// written into.
fn encode_as_v8(cat: &Catalog) -> Vec<u8> {
let mut out = Vec::with_capacity(64);
// Header: magic prefix followed by the hard-coded version byte.
out.extend_from_slice(FILE_MAGIC);
out.push(8u8);
write_u32(&mut out, u32::try_from(cat.tables.len()).unwrap());
for t in &cat.tables {
write_str(&mut out, &t.schema.name);
write_u16(&mut out, u16::try_from(t.schema.columns.len()).unwrap());
// Per column: name, type, nullable flag, optional default, auto-increment flag.
for c in &t.schema.columns {
write_str(&mut out, &c.name);
write_data_type(&mut out, c.ty);
out.push(u8::from(c.nullable));
// Default value: presence byte (0 = none, 1 = value follows).
match &c.default {
None => out.push(0),
Some(v) => {
out.push(1);
write_value(&mut out, v);
}
}
out.push(u8::from(c.auto_increment));
}
// Rows: count, then each row's dense body back to back.
write_u32(&mut out, u32::try_from(t.rows.len()).unwrap());
for row in &t.rows {
out.extend_from_slice(&encode_row_body_dense(row, &t.schema));
}
// Indices: count, then name, column position and a kind tag per index.
write_u16(&mut out, u16::try_from(t.indices.len()).unwrap());
for idx in &t.indices {
write_str(&mut out, &idx.name);
write_u16(&mut out, u16::try_from(idx.column_position).unwrap());
match &idx.kind {
// BTree: only the kind tag 0 is written; no entries are emitted here.
IndexKind::BTree(_) => out.push(0),
// NSW: kind tag 1, then `m`, then the graph itself.
IndexKind::Nsw(g) => {
out.push(1);
write_u16(&mut out, u16::try_from(g.m).unwrap());
write_nsw_graph(&mut out, g);
}
// The remaining kinds are refused by this writer.
IndexKind::Brin { .. } => panic!(
"v8 catalog writer cannot serialise BRIN — \
tests with BRIN indices must use the current writer"
),
IndexKind::Gin(_) => panic!(
"v8 catalog writer cannot serialise GIN — \
tests with GIN indices must use the current writer"
),
IndexKind::GinTrgm(_) => panic!(
"v8 catalog writer cannot serialise trigram-GIN — \
tests with trgm indices must use the current writer"
),
IndexKind::GinFulltext(_) => panic!(
"v8 catalog writer cannot serialise fulltext-GIN — \
tests with FULLTEXT KEY must use the current writer"
),
}
}
}
out
}
/// Hot and cold index locators both survive a serialize → deserialize cycle;
/// after the same segment bytes are re-loaded into the restored catalog,
/// hot and cold keys resolve through `lookup_by_pk`.
#[test]
fn v9_catalog_round_trip_preserves_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let schema = users.schema.clone();

    let cold_rows: Vec<(i64, &str)> = vec![(100, "ivy"), (200, "joe"), (300, "kim")];
    let mut seg_rows: Vec<(u64, Vec<u8>)> = Vec::with_capacity(cold_rows.len());
    for &(id, name) in &cold_rows {
        let body = encode_row_body_dense(&make_user_row(id, name), &schema);
        seg_rows.push((id.cast_unsigned(), body));
    }
    let (seg_bytes, _) =
        encode_segment(seg_rows.into_iter(), 0.01, SEGMENT_PAGE_BYTES).unwrap();
    let seg_id = catalog.load_segment_bytes(seg_bytes.clone()).unwrap();

    let mut pairs: Vec<(IndexKey, RowLocator)> = Vec::with_capacity(cold_rows.len());
    for &(id, _) in &cold_rows {
        let locator = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        pairs.push((IndexKey::Int(id), locator));
    }
    catalog
        .get_mut("users")
        .unwrap()
        .register_cold_locators("by_id", pairs)
        .unwrap();

    let image = catalog.serialize();
    assert_eq!(image[FILE_MAGIC.len()], FILE_VERSION);

    let mut restored = Catalog::deserialize(&image).expect("v9 round-trip parses");
    // Re-attach the segment bytes to the restored catalog; the id must match.
    let restored_seg_id = restored.load_segment_bytes(seg_bytes).unwrap();
    assert_eq!(restored_seg_id, seg_id);

    let by_id = restored.get("users").unwrap().index_on(0).unwrap();
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(1)), &[RowLocator::Hot(0)]);
    assert_eq!(by_id.lookup_eq(&IndexKey::Int(2)), &[RowLocator::Hot(1)]);
    for (id, _) in &cold_rows {
        let expected = RowLocator::Cold {
            segment_id: seg_id,
            page_offset: 0,
        };
        assert_eq!(by_id.lookup_eq(&IndexKey::Int(*id)), &[expected]);
    }

    let hot_hit = restored.lookup_by_pk("users", "by_id", &IndexKey::Int(2));
    assert_eq!(hot_hit.unwrap(), make_user_row(2, "bob"));
    for (id, name) in &cold_rows {
        let cold_hit = restored.lookup_by_pk("users", "by_id", &IndexKey::Int(*id));
        assert_eq!(cold_hit.unwrap(), make_user_row(*id, name));
    }
}
/// For a schema covering ten column types, `row_body_encoded_len` must equal
/// the length of the bytes actually produced by `encode_row_body_dense`,
/// across typical, zero/empty/NULL, and negative/long sample rows.
#[test]
fn row_body_encoded_len_matches_actual_encode_for_all_types() {
    let vector_ty = DataType::Vector {
        dim: 3,
        encoding: VecEncoding::F32,
    };
    let numeric_ty = DataType::Numeric {
        precision: 18,
        scale: 2,
    };
    let schema = TableSchema::new(
        "wide",
        vec![
            ColumnSchema::new("a", DataType::SmallInt, true),
            ColumnSchema::new("b", DataType::Int, false),
            ColumnSchema::new("c", DataType::BigInt, false),
            ColumnSchema::new("d", DataType::Float, false),
            ColumnSchema::new("e", DataType::Bool, false),
            ColumnSchema::new("f", DataType::Text, false),
            ColumnSchema::new("g", vector_ty, false),
            ColumnSchema::new("h", numeric_ty, false),
            ColumnSchema::new("i", DataType::Date, false),
            ColumnSchema::new("j", DataType::Timestamp, false),
        ],
    );

    // Typical positive values.
    let typical = Row::new(vec![
        Value::SmallInt(7),
        Value::Int(42),
        Value::BigInt(1_000_000),
        Value::Float(1.5),
        Value::Bool(true),
        Value::Text("hello".into()),
        Value::Vector(vec![1.0, 2.0, 3.0]),
        Value::Numeric {
            scaled: 12345,
            scale: 2,
        },
        Value::Date(20_000),
        Value::Timestamp(1_700_000_000_000_000),
    ]);
    // NULL in the nullable column, zeros and empty payloads elsewhere.
    let zeroed = Row::new(vec![
        Value::Null,
        Value::Int(0),
        Value::BigInt(0),
        Value::Float(0.0),
        Value::Bool(false),
        Value::Text(String::new()),
        Value::Vector(vec![]),
        Value::Numeric {
            scaled: 0,
            scale: 2,
        },
        Value::Date(0),
        Value::Timestamp(0),
    ]);
    // Negative numbers and a longer text payload.
    let negative = Row::new(vec![
        Value::SmallInt(-1),
        Value::Int(-1),
        Value::BigInt(-1),
        Value::Float(-0.5),
        Value::Bool(true),
        Value::Text("a much longer payload here".into()),
        Value::Vector(vec![0.1, 0.2, 0.3]),
        Value::Numeric {
            scaled: -999_999_999,
            scale: 2,
        },
        Value::Date(-1),
        Value::Timestamp(-1),
    ]);

    let samples = [typical, zeroed, negative];
    for sample in &samples {
        let encoded_len = encode_row_body_dense(sample, &schema).len();
        let predicted_len = row_body_encoded_len(sample, &schema);
        assert_eq!(encoded_len, predicted_len, "row {sample:?}");
    }
}
/// `hot_bytes` starts at zero and, after inserts, equals the sum of the
/// dense-encoded row lengths; the catalog-wide total reports the same.
#[test]
fn hot_bytes_grows_on_insert_and_matches_encoded_sum() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    assert_eq!(users.hot_bytes(), 0);

    let seed = [(1i64, "alice"), (2, "bob"), (3, "carol")];
    let mut running_total: u64 = 0;
    for &(id, name) in &seed {
        let row = make_user_row(id, name);
        let encoded_len = encode_row_body_dense(&row, &users.schema).len() as u64;
        running_total += encoded_len;
        users.insert(row).unwrap();
    }
    assert_eq!(users.hot_bytes(), running_total);
    assert_eq!(catalog.hot_tier_bytes(), running_total);
}
/// Deleting one row lowers `hot_bytes` by exactly that row's dense-encoded
/// length.
#[test]
fn hot_bytes_shrinks_on_delete() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for &(id, name) in &[(1i64, "alice"), (2, "bob"), (3, "carol")] {
        users.insert(make_user_row(id, name)).unwrap();
    }
    let baseline = users.hot_bytes();
    // Encoded size of the row about to be deleted (position 1 is bob).
    let bob_len = encode_row_body_dense(&make_user_row(2, "bob"), &users.schema).len() as u64;

    let removed = users.delete_rows(&[1]);
    assert_eq!(removed, 1);
    assert_eq!(users.hot_bytes(), baseline - bob_len);
}
/// Replacing a row with one carrying longer text adjusts `hot_bytes` by the
/// difference of the two encoded lengths, so the counter grows.
#[test]
fn hot_bytes_diffs_on_update_for_variable_width_columns() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();

    let short = make_user_row(1, "alice");
    let long = make_user_row(1, "alice-the-longer-name");
    let short_len = encode_row_body_dense(&short, &users.schema).len() as u64;
    let long_len = encode_row_body_dense(&long, &users.schema).len() as u64;

    users.insert(short).unwrap();
    let baseline = users.hot_bytes();
    users.update_row(0, long.values).unwrap();

    let current = users.hot_bytes();
    assert_eq!(current, baseline - short_len + long_len);
    assert!(current > baseline, "longer text grew the counter");
}
/// The hot-tier byte counters (catalog-wide and per table) report the same
/// value after a serialize → deserialize cycle.
#[test]
fn hot_bytes_round_trips_through_serialize_deserialize() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for n in 0..10 {
        let name = alloc::format!("name-{n}");
        users.insert(make_user_row(n, &name)).unwrap();
    }
    let before = catalog.hot_tier_bytes();

    let image = catalog.serialize();
    let restored = Catalog::deserialize(&image).unwrap();
    assert_eq!(restored.hot_tier_bytes(), before);
    assert_eq!(restored.get("users").unwrap().hot_bytes(), before);
}
/// Freezing 6 of 10 rows produces one cold segment, leaves 4 hot rows,
/// lowers `hot_bytes` by the reported amount, and keeps every key resolvable.
#[test]
fn freeze_oldest_to_cold_moves_rows_and_keeps_lookups_working() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..10i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    let hot_before = users.hot_bytes();

    let report = catalog
        .freeze_oldest_to_cold("users", "by_id", 6)
        .expect("freeze succeeds");
    assert_eq!(report.frozen_rows, 6);
    assert_eq!(report.segment_id, 0);
    assert!(report.bytes_freed > 0);
    assert!(!report.segment_bytes.is_empty());

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 4, "4 hot rows remain (10 - 6 frozen)");
    assert_eq!(catalog.cold_segment_count(), 1);
    let hot_after = users.hot_bytes();
    assert_eq!(
        hot_after,
        hot_before - report.bytes_freed,
        "hot_bytes accounting matches FreezeReport"
    );

    // Every key, frozen or not, must still resolve to its original row.
    for id in 0..10i64 {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        let row = found.unwrap_or_else(|| panic!("PK {id} disappeared after freeze"));
        assert_eq!(row, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
/// Two consecutive freezes of 4 rows each (out of 12) leave 4 hot rows and
/// two cold segments, with all twelve keys still resolvable.
#[test]
fn freeze_twice_preserves_prior_cold_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..12i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("first freeze ok");
    catalog
        .freeze_oldest_to_cold("users", "by_id", 4)
        .expect("second freeze ok");

    let hot_rows = catalog.get("users").unwrap().row_count();
    assert_eq!(hot_rows, 4);
    assert_eq!(catalog.cold_segment_count(), 2);
    for id in 0..12i64 {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        let row = found.unwrap_or_else(|| panic!("PK {id} not resolvable after two freezes"));
        assert_eq!(row, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
/// A zero row count, an unknown table, an unknown index, and a count larger
/// than the table all fail with `Corrupt` and leave the catalog untouched.
#[test]
fn freeze_oldest_to_cold_rejects_invalid_input() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // (table, index, rows-to-freeze) requests that must all be refused.
    let bad_requests = [
        ("users", "by_id", 0),
        ("missing", "by_id", 1),
        ("users", "no_such_index", 1),
        ("users", "by_id", 999),
    ];
    for (table, index, count) in bad_requests {
        let outcome = catalog.freeze_oldest_to_cold(table, index, count);
        assert!(matches!(outcome, Err(StorageError::Corrupt(_))));
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Freezing through an index on a TEXT column fails with `Corrupt` whose
/// message contains "non-integer"; nothing is moved to the cold tier.
#[test]
fn freeze_oldest_to_cold_rejects_non_integer_pk() {
    let schema = TableSchema::new(
        "by_name",
        vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("payload", DataType::BigInt, false),
        ],
    );
    let mut catalog = Catalog::new();
    catalog.create_table(schema).unwrap();
    let table = catalog.get_mut("by_name").unwrap();
    let row = Row::new(vec![Value::Text("a".into()), Value::BigInt(1)]);
    table.insert(row).unwrap();
    table.add_index("by_n".into(), "name").unwrap();

    let err = catalog
        .freeze_oldest_to_cold("by_name", "by_n", 1)
        .expect_err("non-integer PK rejected");
    if let StorageError::Corrupt(ref s) = err {
        assert!(
            s.contains("non-integer"),
            "error message names the constraint: {s}"
        );
    } else {
        panic!("expected Corrupt, got {err:?}");
    }

    assert_eq!(catalog.get("by_name").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// After freezing 3 of 6 rows via `by_id`, the secondary `by_name` index
/// resolves "u-4" to exactly one hot locator, at position 1.
#[test]
fn freeze_keeps_remaining_hot_rows_addressable_via_secondary_index() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    users.add_index("by_name".into(), "name").unwrap();

    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let by_name = catalog.get("users").unwrap().index_on(1).unwrap();
    let hits = by_name.lookup_eq(&IndexKey::Text("u-4".into()));
    assert_eq!(hits.len(), 1);
    assert!(hits[0].is_hot(), "kept-hot rows still surface as Hot");
    // The locator was just shown to be hot; pin its position as well.
    assert_eq!(hits[0], RowLocator::Hot(1));
}
/// Promoting a frozen key appends its row to the hot tier, grows `hot_bytes`
/// by the row's encoded length, replaces the cold locator with a single hot
/// one, and leaves other frozen keys resolvable.
#[test]
fn promote_cold_row_pulls_frozen_row_back_to_hot_tier() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let hot_before = catalog.get("users").unwrap().hot_bytes();

    let promoted_at = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(2))
        .expect("promote ok")
        .expect("PK 2 was cold");
    assert_eq!(
        promoted_at, 2,
        "promoted row appended after the 2 surviving hot rows"
    );

    let users = catalog.get("users").unwrap();
    assert_eq!(users.row_count(), 3, "hot tier grew from 2 to 3");
    let promoted_row = make_user_row(2, "u-2");
    let promoted_len = encode_row_body_dense(&promoted_row, &users.schema).len() as u64;
    assert_eq!(users.hot_bytes(), hot_before + promoted_len);

    let locators = users.index_on(0).unwrap().lookup_eq(&IndexKey::Int(2));
    assert_eq!(locators.len(), 1, "exactly one locator per key");
    assert!(locators[0].is_hot(), "promote retired the Cold locator");

    let promoted_hit = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(2));
    assert_eq!(promoted_hit.unwrap(), promoted_row);
    // A key that stayed frozen is still readable.
    let frozen_hit = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(0));
    assert_eq!(frozen_hit.unwrap(), make_user_row(0, "u-0"));
}
/// Promoting a key that is hot, or a key that does not exist, returns
/// `Ok(None)` and changes nothing.
#[test]
fn promote_cold_row_returns_none_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(7, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // 7 is present but hot; 99 is absent.
    for key in [7i64, 99] {
        let promoted = catalog
            .promote_cold_row("users", "by_id", &IndexKey::Int(key))
            .unwrap();
        assert!(promoted.is_none());
    }

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
    assert_eq!(catalog.cold_segment_count(), 0);
}
/// Shadowing a frozen key retires its single cold locator so the key stops
/// resolving, while the other frozen keys stay readable.
#[test]
fn shadow_cold_row_removes_cold_locators_and_drops_lookup() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..5i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();

    let target = IndexKey::Int(1);
    let before = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(before.is_some(), "frozen PK resolves before shadow");

    let retired = catalog.shadow_cold_row("users", "by_id", &target).unwrap();
    assert_eq!(retired, 1, "exactly one cold locator retired");

    let after = catalog.lookup_by_pk("users", "by_id", &target);
    assert!(after.is_none(), "shadowed key no longer resolves");

    // The neighbouring frozen keys are unaffected.
    for (id, name) in [(0i64, "u-0"), (2, "u-2")] {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        assert_eq!(found.unwrap(), make_user_row(id, name));
    }
}
/// Shadowing a hot-only key or an absent key retires zero locators and
/// leaves the table's row count unchanged.
#[test]
fn shadow_cold_row_returns_zero_when_key_is_not_cold() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    let hot_only = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(1))
        .unwrap();
    assert_eq!(hot_only, 0, "hot-only key drops no cold locators");

    let absent = catalog
        .shadow_cold_row("users", "by_id", &IndexKey::Int(999))
        .unwrap();
    assert_eq!(absent, 0, "absent key drops no cold locators");

    assert_eq!(catalog.get("users").unwrap().row_count(), 1);
}
/// Both `promote_cold_row` and `shadow_cold_row` fail with `Corrupt` when
/// given an unknown table or an unknown index.
#[test]
fn promote_and_shadow_reject_invalid_inputs() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    users.insert(make_user_row(1, "alice")).unwrap();
    users.add_index("by_id".into(), "id").unwrap();

    // First an unknown table, then an unknown index on a known table.
    for (table, index) in [("missing", "by_id"), ("users", "no_such_index")] {
        let promote = catalog.promote_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(promote, Err(StorageError::Corrupt(_))));
        let shadow = catalog.shadow_cold_row(table, index, &IndexKey::Int(1));
        assert!(matches!(shadow, Err(StorageError::Corrupt(_))));
    }
}
/// Freezing rows `0..6` through one prepared slice must produce the same
/// report and the same lookup results as `freeze_oldest_to_cold(.., 6)`.
#[test]
fn commit_freeze_slices_single_slice_matches_freeze_oldest() {
    /// Ten-row `users` catalog with a `by_id` index.
    fn seeded_catalog() -> Catalog {
        let mut catalog = Catalog::new();
        catalog.create_table(bigint_pk_users_schema()).unwrap();
        let users = catalog.get_mut("users").unwrap();
        for id in 0..10i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
        catalog
    }

    let mut a = seeded_catalog();
    let mut b = seeded_catalog();

    let single = a.freeze_oldest_to_cold("users", "by_id", 6).unwrap();
    let slice = b
        .prepare_freeze_slice("users", "by_id", 0..6)
        .expect("prepare");
    let parallel = b
        .commit_freeze_slices("users", "by_id", vec![slice])
        .expect("commit");

    assert_eq!(single.segment_id, parallel.segment_id);
    assert_eq!(single.frozen_rows, parallel.frozen_rows);
    assert_eq!(single.bytes_freed, parallel.bytes_freed);
    assert_eq!(single.segment_bytes, parallel.segment_bytes);

    for id in 0..10i64 {
        let key = IndexKey::Int(id);
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &key),
            b.lookup_by_pk("users", "by_id", &key),
            "PK {id} differs after single vs slice freeze"
        );
    }
}
/// Committing ranges `0..4` and `4..8` as two prepared slices must yield the
/// same segment bytes, frozen-row count and lookup results as committing the
/// single range `0..8`. Rows are inserted in a shuffled id order.
#[test]
fn commit_freeze_slices_two_slices_match_single_slice() {
    let mut a = Catalog::new();
    let mut b = Catalog::new();
    for cat in [&mut a, &mut b] {
        cat.create_table(bigint_pk_users_schema()).unwrap();
        let t = cat.get_mut("users").unwrap();
        // Typed literals iterated by value: no `.iter().copied()` and no
        // `as i64` cast needed, matching the other tests in this file.
        for id in [3i64, 7, 1, 9, 5, 0, 8, 4, 2, 6] {
            t.insert(make_user_row(id, &alloc::format!("u-{id}")))
                .unwrap();
        }
        t.add_index("by_id".into(), "id").unwrap();
    }

    // Catalog `a`: one slice covering the whole range.
    let single = a
        .prepare_freeze_slice("users", "by_id", 0..8)
        .expect("prepare");
    let one = a
        .commit_freeze_slices("users", "by_id", alloc::vec![single])
        .expect("commit one");

    // Catalog `b`: the same range split into two adjacent slices.
    let s1 = b
        .prepare_freeze_slice("users", "by_id", 0..4)
        .expect("prepare s1");
    let s2 = b
        .prepare_freeze_slice("users", "by_id", 4..8)
        .expect("prepare s2");
    let two = b
        .commit_freeze_slices("users", "by_id", alloc::vec![s1, s2])
        .expect("commit two");

    assert_eq!(one.segment_bytes, two.segment_bytes);
    assert_eq!(one.frozen_rows, two.frozen_rows);
    for id in 0..10i64 {
        assert_eq!(
            a.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            b.lookup_by_pk("users", "by_id", &IndexKey::Int(id)),
            "PK {id} differs after one-slice vs two-slice freeze"
        );
    }
}
/// Committing slices `0..2` and `3..5` (index 2 missing between them) fails
/// with `Corrupt` and leaves the catalog unchanged.
#[test]
fn commit_freeze_slices_rejects_gap() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..6i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let first = catalog.prepare_freeze_slice("users", "by_id", 0..2).unwrap();
    let second = catalog.prepare_freeze_slice("users", "by_id", 3..5).unwrap();
    let outcome = catalog.commit_freeze_slices("users", "by_id", vec![first, second]);
    assert!(matches!(outcome, Err(StorageError::Corrupt(_))));

    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 6);
}
/// Committing an empty slice list succeeds, reports zero frozen rows, and
/// creates no cold segment.
#[test]
fn commit_freeze_slices_empty_is_noop() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..3i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    let no_slices = Vec::new();
    let report = catalog
        .commit_freeze_slices("users", "by_id", no_slices)
        .unwrap();
    assert_eq!(report.frozen_rows, 0);
    assert_eq!(catalog.cold_segment_count(), 0);
    assert_eq!(catalog.get("users").unwrap().row_count(), 3);
}
/// With a target just above the largest segment size, compaction merges the
/// two frozen segments into one new segment holding all six frozen rows; the
/// slot count grows to three and every key stays resolvable.
#[test]
fn compact_merges_small_segments_storage_unit() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    let users = catalog.get_mut("users").unwrap();
    for id in 0..8i64 {
        let name = alloc::format!("u-{id}");
        users.insert(make_user_row(id, &name)).unwrap();
    }
    users.add_index("by_id".into(), "id").unwrap();

    // Two freezes of three rows each -> two cold segments.
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    assert_eq!(catalog.cold_segment_count(), 2);
    assert_eq!(catalog.cold_segment_slot_count(), 2);

    let largest_segment = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|id| catalog.cold_segment(*id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let report = catalog
        .compact_cold_segments("users", "by_id", largest_segment + 1)
        .expect("compact succeeds");

    assert_eq!(report.sources.len(), 2);
    let merged_id = report.merged_segment_id.expect("merge happened");
    assert_eq!(report.merged_rows, 6);
    assert_eq!(report.deleted_rows_pruned, 0);
    assert!(!report.merged_segment_bytes.is_empty());

    assert_eq!(catalog.cold_segment_count(), 1);
    assert_eq!(catalog.cold_segment_slot_count(), 3);
    assert_eq!(catalog.cold_segment_ids_global(), vec![merged_id]);

    for id in 0..8i64 {
        let found = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(id));
        let row = found.unwrap_or_else(|| panic!("PK {id} lost after compaction"));
        assert_eq!(row, make_user_row(id, &alloc::format!("u-{id}")));
    }
}
/// Compaction prunes cold rows that were shadowed beforehand.
///
/// Six rows are frozen in two batches, keys 1 and 4 are shadowed, and the
/// segments are then compacted. The report must show four merged rows and
/// two pruned ones; the shadowed keys must stay invisible to PK lookups
/// while the remaining keys are still found.
#[test]
fn compact_drops_shadowed_cold_rows() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    // Each shadow call is expected to report exactly one affected row.
    for pk in [1i64, 4i64] {
        let shadow_count = catalog
            .shadow_cold_row("users", "by_id", &IndexKey::Int(pk))
            .unwrap();
        assert_eq!(shadow_count, 1);
    }

    let largest_segment_len = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg_id| catalog.cold_segment(*seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let outcome = catalog
        .compact_cold_segments("users", "by_id", largest_segment_len + 1)
        .expect("compact succeeds");

    assert_eq!(outcome.sources.len(), 2);
    assert_eq!(outcome.merged_rows, 4, "6 frozen − 2 shadowed = 4 live");
    assert_eq!(outcome.deleted_rows_pruned, 2);

    // Shadowed keys must not reappear after the merge.
    for shadowed in [1i64, 4i64] {
        let hit = catalog.lookup_by_pk("users", "by_id", &IndexKey::Int(shadowed));
        assert!(
            hit.is_none(),
            "shadowed PK {shadowed} must remain invisible after compact"
        );
    }

    // The remaining keys only need to be present; their contents are not compared here.
    for live in [0i64, 2, 3, 5] {
        if catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(live))
            .is_none()
        {
            panic!("live PK {live} lost after compact");
        }
    }
}
/// Compaction does nothing when fewer than two segments can be merged.
///
/// Three situations are exercised: nothing frozen yet, a single cold
/// segment with a very large target, and the same single segment with a
/// target of one byte. In each case no merged segment id is reported.
#[test]
fn compact_is_noop_below_two_candidates() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Case 1: nothing has been frozen yet.
    let before_freeze = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(before_freeze.merged_segment_id.is_none());
    assert!(before_freeze.sources.is_empty());

    // Case 2: a single freeze has been performed, generous target.
    catalog.freeze_oldest_to_cold("users", "by_id", 4).unwrap();
    let lone_segment = catalog
        .compact_cold_segments("users", "by_id", 1 << 30)
        .expect("noop ok");
    assert!(lone_segment.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);

    // Case 3: same catalog state, but with a target of a single byte.
    let tiny_target = catalog
        .compact_cold_segments("users", "by_id", 1)
        .expect("noop ok");
    assert!(tiny_target.merged_segment_id.is_none());
    assert_eq!(catalog.cold_segment_count(), 1);
}
/// A compacted catalog survives serialize → deserialize → segment reload.
///
/// After merging two cold segments, the catalog is serialized, restored
/// with `Catalog::deserialize`, and the merged segment bytes are loaded
/// back at the merged segment id. All six keys must then resolve to their
/// original rows and exactly one cold segment must be counted.
#[test]
fn compact_swap_survives_catalog_roundtrip_via_load_at() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..6i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    for _ in 0..2 {
        catalog.freeze_oldest_to_cold("users", "by_id", 3).unwrap();
    }

    let largest_segment_len = catalog
        .cold_segment_ids_global()
        .iter()
        .map(|seg_id| catalog.cold_segment(*seg_id).unwrap().bytes().len() as u64)
        .max()
        .unwrap();
    let outcome = catalog
        .compact_cold_segments("users", "by_id", largest_segment_len + 1)
        .expect("compact ok");
    let merged_id = outcome.merged_segment_id.unwrap();

    // Capture the catalog image and the merged segment payload separately;
    // the payload is re-attached to the restored catalog below.
    let catalog_image = catalog.serialize();
    let merged_payload = outcome.merged_segment_bytes.clone();

    let mut reloaded = Catalog::deserialize(&catalog_image).expect("deserialize ok");
    reloaded
        .load_segment_bytes_at(merged_id, merged_payload)
        .expect("reload merged ok");

    for id in 0..6i64 {
        let found = match reloaded.lookup_by_pk("users", "by_id", &IndexKey::Int(id)) {
            Some(row) => row,
            None => panic!("PK {id} lost across roundtrip"),
        };
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(found, expected);
    }
    assert_eq!(reloaded.cold_segment_count(), 1);
}
/// `load_segment_bytes_at` pads the slot table and refuses occupied slots.
///
/// Loading at slot 5 after a single freeze must bring the slot count to
/// six and the segment count to two. Loading again at slot 5, or at slot
/// 0, must fail with `StorageError::Corrupt`.
#[test]
fn load_segment_bytes_at_pads_and_rejects_collision() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }

    // Reuse the bytes of the freshly frozen segment as the payload to load.
    let frozen = catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();
    let segment_image = frozen.segment_bytes.clone();

    catalog
        .load_segment_bytes_at(5, segment_image.clone())
        .expect("pad + load ok");
    assert_eq!(catalog.cold_segment_slot_count(), 6);
    assert_eq!(catalog.cold_segment_count(), 2);

    // Slot 5 was just filled; slot 0 presumably holds the frozen segment.
    for slot in [5, 0] {
        let attempt = catalog.load_segment_bytes_at(slot, segment_image.clone());
        assert!(matches!(attempt, Err(StorageError::Corrupt(_))));
    }
}
/// Promoting a cold row leaves exactly one hot index entry for its key.
///
/// After freezing two rows and promoting key 0, the index on column 0
/// must hold a single entry for that key and it must be hot. Keys 2 and 3
/// must still resolve to their original rows.
///
/// NOTE(review): despite the test name, no second freeze is performed
/// here — confirm whether a refreeze step is missing.
#[test]
fn promote_then_refreeze_does_not_leave_orphan_locators() {
    let mut catalog = Catalog::new();
    catalog.create_table(bigint_pk_users_schema()).unwrap();
    {
        let users = catalog.get_mut("users").unwrap();
        for id in 0..4i64 {
            let name = alloc::format!("u-{id}");
            users.insert(make_user_row(id, &name)).unwrap();
        }
        users.add_index("by_id".into(), "id").unwrap();
    }
    catalog.freeze_oldest_to_cold("users", "by_id", 2).unwrap();

    // The promotion must report that a row was actually found.
    let promotion = catalog
        .promote_cold_row("users", "by_id", &IndexKey::Int(0))
        .unwrap();
    assert!(promotion.is_some());

    // Inspect the raw index entries for key 0 rather than going through a PK lookup.
    let users = catalog.get("users").unwrap();
    let first_column_index = users.index_on(0).unwrap();
    let locators = first_column_index.lookup_eq(&IndexKey::Int(0)).to_vec();
    assert_eq!(locators.len(), 1);
    assert!(locators[0].is_hot());

    // Keys outside the frozen batch (presumably never frozen) stay readable.
    for id in [2i64, 3] {
        let found = catalog
            .lookup_by_pk("users", "by_id", &IndexKey::Int(id))
            .unwrap();
        let expected = make_user_row(id, &alloc::format!("u-{id}"));
        assert_eq!(found, expected);
    }
}