use super::*;
/// Opens a storage handle whose on-disk state lives entirely under `base`.
///
/// The WAL directory (`wal`), manifest file (`ir.manifest`) and SSTable
/// directory (`sst`) are all joined onto `base`. The configuration uses
/// `buffer_pool_pages = 8` and `wal_segment_max_bytes = 1 << 20`.
///
/// # Panics
///
/// Panics if `open_store` does not succeed.
fn open(base: &std::path::Path) -> StorageHandle {
    let wal_dir = base.join("wal");
    let manifest_path = base.join("ir.manifest");
    let sstable_dir = base.join("sst");

    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir,
        wal_segment_max_bytes: 1 << 20,
        manifest_path,
        sstable_dir,
    };
    open_store(config).unwrap()
}
/// Creates `n` nodes, writes one three-component vector per node into a
/// single space with one `put_vector_delta_batch` call, and asserts that a
/// search passing `n` as its final argument returns exactly `n` results.
#[test]
fn batch_write_all_vectors_searchable() {
    let n = 10_usize;
    let space_id = 1_u32;
    let base = temp_dir("batch_all_searchable");
    let mut handle = open(&base);

    // Node creation and delta encoding stay interleaved per index, so the
    // sequence of calls against `handle` matches a plain push loop.
    let deltas: Vec<_> = (0..n)
        .map(|i| {
            let node_id = i as u64;
            put_full_node(&mut handle, node_id, 1, &[]).unwrap();
            let components = [i as f32, 0.0, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
            encode_delta(node_id, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    let query = [9.0, 0.0, 0.0];
    let results = hnsw_search_in_space(&handle, space_id, &query, n);
    assert_eq!(
        results.len(),
        n,
        "all {n} vectors should be searchable after batch write"
    );
}
/// Batch-writes five vectors through one handle, drops that handle, then
/// opens a second handle on the same directory, runs `recover_from_wal`, and
/// asserts that a search in the space returns all five vectors.
#[test]
fn batch_write_wal_replay_restores_all_vectors() {
    let space_id = 2_u32;
    let base = temp_dir("batch_wal_replay");

    {
        // The writing handle is confined to this scope so it is dropped
        // before the directory is reopened below.
        let mut writer = open(&base);
        let vector_count = 5_usize;
        let deltas: Vec<_> = (0..vector_count)
            .map(|i| {
                let node_id = i as u64;
                put_full_node(&mut writer, node_id, 1, &[]).unwrap();
                let components = [i as f32, 0.0];
                let payload =
                    encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
                encode_delta(node_id, 2, &payload)
            })
            .collect();
        put_vector_delta_batch(&mut writer, &deltas).unwrap();
    }

    let mut handle = open(&base);
    recover_from_wal(&mut handle).unwrap();

    let query = [4.0, 0.0];
    let recovered = hnsw_search_in_space(&handle, space_id, &query, 10);
    assert_eq!(
        recovered.len(),
        5,
        "all 5 vectors must survive WAL replay after batch write"
    );
}
/// Marks three nodes as embedding-pending, batch-writes a vector for each of
/// them, and asserts that `node_is_embedding_pending` is false for every node
/// afterwards.
#[test]
fn batch_write_clears_embedding_pending_flags() {
    const NODE_COUNT: u64 = 3;

    let base = temp_dir("batch_clears_pending");
    let mut handle = open(&base);
    let space_id = 3_u32;

    // Precondition: each node reports the pending flag right after it is set.
    for i in 0..NODE_COUNT {
        put_full_node(&mut handle, i, 1, &[]).unwrap();
        put_embedding_pending(&mut handle, i).unwrap();
        assert!(node_is_embedding_pending(&handle, i));
    }

    let deltas: Vec<_> = (0..NODE_COUNT)
        .map(|node_id| {
            let components = [node_id as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
            encode_delta(node_id, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    for i in 0..NODE_COUNT {
        let still_pending = node_is_embedding_pending(&handle, i);
        assert!(
            !still_pending,
            "embedding_pending flag for node {i} must be cleared after batch write"
        );
    }
}
/// Calls `put_vector_delta_batch` with an empty slice on a freshly opened
/// store and requires only that the call succeeds.
#[test]
fn batch_write_empty_is_noop() {
    let base = temp_dir("batch_empty");
    let mut handle = open(&base);

    let no_deltas: &[Vec<u8>] = &[];
    put_vector_delta_batch(&mut handle, no_deltas).unwrap();
}
/// Suspends HNSW maintenance for a space, batch-writes 200 vectors into it,
/// and asserts that `last_hnsw_rebuild_reason` on the handle is still `None`.
#[test]
fn suspended_space_suppresses_scheduler() {
    let n = 200_usize;
    let space_id = 10_u32;
    let base = temp_dir("suspend_suppresses_scheduler");
    let mut handle = open(&base);

    for node in 0..n {
        put_full_node(&mut handle, node as u64, 1, &[]).unwrap();
    }

    // Suspension happens before any vector is written to the space.
    suspend_hnsw_maintenance(&mut handle, space_id);

    let deltas: Vec<_> = (0..n)
        .map(|node| {
            let components = [node as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
            encode_delta(node as u64, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    assert!(
        handle.last_hnsw_rebuild_reason.is_none(),
        "scheduler must not fire for a suspended space"
    );
}
/// Checks that a rebuild reason is recorded again once maintenance is resumed.
///
/// The test suspends maintenance for a space, batch-writes 200 vectors and
/// confirms no rebuild reason was recorded. It then resumes maintenance,
/// writes one more vector, sets `hnsw_updated_vectors` to one tenth of
/// `hnsw_total_vectors`, writes a final vector, and asserts that
/// `last_hnsw_rebuild_reason` is now `Some`.
#[test]
fn resume_reenables_scheduler() {
    /// Encodes a two-component vector delta for `node_id`, creates the node,
    /// then writes the delta through the single-delta path.
    fn write_single_vector(handle: &mut StorageHandle, space_id: u32, node_id: u64, x: f32) {
        let payload = encode_vector_payload_f32(space_id, VectorMetric::Cosine, &[x, 0.0], false);
        let delta = encode_delta(node_id, 2, &payload);
        put_full_node(handle, node_id, 1, &[]).unwrap();
        put_vector_delta(handle, &delta).unwrap();
    }

    let n = 200_usize;
    let space_id = 11_u32;
    let base = temp_dir("resume_reenables");
    let mut handle = open(&base);

    for node in 0..n {
        put_full_node(&mut handle, node as u64, 1, &[]).unwrap();
    }

    suspend_hnsw_maintenance(&mut handle, space_id);
    let deltas: Vec<Vec<u8>> = (0..n)
        .map(|node| {
            let components = [node as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
            encode_delta(node as u64, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();
    assert!(handle.last_hnsw_rebuild_reason.is_none());

    resume_hnsw_maintenance(&mut handle, space_id);
    write_single_vector(&mut handle, space_id, 9999, 999.0);

    // Force the updated-vector counter to 10% of the total before the final
    // write. NOTE(review): presumably this is the scheduler's trigger
    // threshold; confirm against the scheduler implementation.
    let tenth_of_total = (handle.hnsw_total_vectors as f32 * 0.1) as u64;
    handle.hnsw_updated_vectors = tenth_of_total;
    write_single_vector(&mut handle, space_id, 10000, 1000.0);

    assert!(
        handle.last_hnsw_rebuild_reason.is_some(),
        "scheduler must fire after resume when threshold is exceeded"
    );
}
/// Writes vectors for nodes 1, 2 and 3, tombstones node 2, rebuilds the
/// vector space, and asserts that a search returns nodes 1 and 3 but not 2.
#[test]
fn rebuild_vector_space_produces_correct_graph() {
    let space_id = 20_u32;
    let base = temp_dir("rebuild_correct");
    let mut handle = open(&base);

    for node_id in 1..=3_u64 {
        put_full_node(&mut handle, node_id, 1, &[]).unwrap();
        let components = [node_id as f32, 0.0];
        let payload =
            encode_vector_payload_f32(space_id, VectorMetric::Cosine, &components, false);
        let delta = encode_delta(node_id, 2, &payload);
        put_vector_delta(&mut handle, &delta).unwrap();
    }

    // Tombstone node 2 before rebuilding. NOTE(review): the trailing `3` is
    // presumably a version/sequence number; confirm against `put_tombstone`.
    put_tombstone(&mut handle, 2, 3).unwrap();
    rebuild_vector_space(&mut handle, space_id).unwrap();

    let ids: Vec<u64> = hnsw_search_in_space(&handle, space_id, &[1.0, 0.0], 10)
        .into_iter()
        .map(|hit| hit.0)
        .collect();
    let found = |node_id: u64| ids.contains(&node_id);

    assert!(found(1), "node 1 must be in rebuilt graph");
    assert!(found(3), "node 3 must be in rebuilt graph");
    assert!(
        !found(2),
        "tombstoned node 2 must not appear after rebuild"
    );
}