use super::graph::NativeHnsw;
use super::layer::NodeId;
use crate::distance::DistanceMetric;
use crate::index::hnsw::native::distance::{CachedSimdDistance, CpuDistance};
#[allow(clippy::cast_precision_loss)]
#[test]
fn test_insert_and_search() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 1000);
for i in 0..100 {
let v: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
hnsw.insert(&v).expect("test");
}
assert_eq!(hnsw.len(), 100);
let query: Vec<f32> = (0..32).map(|j| j as f32).collect();
let results = hnsw.search(&query, 10, 50);
assert!(!results.is_empty());
assert!(results.len() <= 10);
assert_eq!(results[0].0, 0);
}
#[test]
fn test_empty_search() {
let engine = CpuDistance::new(DistanceMetric::Cosine);
let hnsw = NativeHnsw::new(engine, 16, 100, 1000);
let query = vec![1.0, 2.0, 3.0];
let results = hnsw.search(&query, 10, 50);
assert!(results.is_empty());
}
#[allow(clippy::cast_precision_loss)]
#[test]
fn test_heuristic_selection_empty_candidates() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 100);
hnsw.insert(&[0.0; 32]).expect("test");
let candidates: Vec<(NodeId, f32)> = vec![];
let selected = hnsw.select_neighbors(&candidates, 10);
assert!(selected.is_empty(), "Empty candidates should return empty");
}
#[allow(clippy::cast_precision_loss)]
#[test]
fn test_heuristic_selection_fewer_than_max() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 100);
for i in 0..5 {
hnsw.insert(&[i as f32; 32]).expect("test");
}
let candidates: Vec<(NodeId, f32)> = vec![(0, 0.0), (1, 1.0), (2, 2.0)];
let selected = hnsw.select_neighbors(&candidates, 10);
assert_eq!(
selected.len(),
3,
"Should return all candidates when fewer than max"
);
}
#[allow(clippy::cast_precision_loss)]
#[test]
fn test_heuristic_selection_respects_max() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 100);
for i in 0..20 {
hnsw.insert(&[i as f32; 32]).expect("test");
}
let candidates: Vec<(NodeId, f32)> = (0..15).map(|i| (i, i as f32)).collect();
let selected = hnsw.select_neighbors(&candidates, 5);
assert_eq!(selected.len(), 5, "Should respect max_neighbors limit");
}
#[test]
fn test_heuristic_selection_prefers_diverse_neighbors() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 100);
hnsw.insert(&[0.0; 32]).expect("test");
let mut v1 = vec![0.0; 32];
v1[0] = 10.0;
hnsw.insert(&v1).expect("test"); let mut v2 = vec![0.0; 32];
v2[0] = 10.5;
hnsw.insert(&v2).expect("test"); let mut v3 = vec![0.0; 32];
v3[0] = 10.2;
hnsw.insert(&v3).expect("test");
let mut v4 = vec![0.0; 32];
v4[1] = 10.0;
hnsw.insert(&v4).expect("test");
let candidates: Vec<(NodeId, f32)> = vec![
(1, 10.0), (2, 10.5), (3, 10.2), (4, 10.0), ];
let selected = hnsw.select_neighbors(&candidates, 2);
assert_eq!(selected.len(), 2);
assert!(selected.contains(&1), "Should include first closest");
}
#[allow(clippy::cast_precision_loss)]
#[test]
fn test_heuristic_fills_quota_with_closest_if_needed() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 100);
for i in 0..10 {
hnsw.insert(&[i as f32; 32]).expect("test");
}
let candidates: Vec<(NodeId, f32)> = (0..10).map(|i| (i, i as f32)).collect();
let selected = hnsw.select_neighbors(&candidates, 8);
assert_eq!(
selected.len(),
8,
"Should fill quota with closest candidates"
);
}
#[test]
fn test_recall_with_heuristic_selection() {
let engine = CachedSimdDistance::new(DistanceMetric::Cosine, 128);
let hnsw = NativeHnsw::new(engine, 32, 200, 1000);
for i in 0..500 {
let v: Vec<f32> = (0..128)
.map(|j| ((i * 127 + j) as f32 * 0.01).sin())
.collect();
hnsw.insert(&v).expect("test");
}
let query: Vec<f32> = (0..128).map(|j| (j as f32 * 0.01).sin()).collect();
let results = hnsw.search(&query, 10, 100);
assert!(!results.is_empty(), "Should find results");
assert!(results.len() >= 5, "Should find at least 5 neighbors");
for i in 1..results.len() {
assert!(
results[i].1 >= results[i - 1].1,
"Results should be sorted by distance"
);
}
}
#[test]
fn test_graph_parallel_insert_search_integrity() {
use std::sync::Arc;
use std::thread;
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = Arc::new(NativeHnsw::new(engine, 16, 100, 500));
for i in 0..50 {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
hnsw.insert(&v).expect("test");
}
let mut handles = vec![];
for t in 0..4_usize {
let hnsw_clone = Arc::clone(&hnsw);
handles.push(thread::spawn(move || {
for i in 0..50_usize {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32).map(|j| ((t * 1000 + i) * 32 + j) as f32).collect();
hnsw_clone.insert(&v).expect("test");
}
}));
}
for _ in 0..2 {
let hnsw_clone = Arc::clone(&hnsw);
handles.push(thread::spawn(move || {
for i in 0..30_usize {
#[allow(clippy::cast_precision_loss)]
let query: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
let results = hnsw_clone.search(&query, 5, 50);
for window in results.windows(2) {
assert!(
window[0].1 <= window[1].1,
"Search results must be distance-sorted"
);
}
}
}));
}
for handle in handles {
handle.join().expect("No deadlock or panic");
}
assert_eq!(hnsw.len(), 250, "All inserts must be reflected in count");
let snapshot = super::graph::safety_counters::HNSW_COUNTERS.snapshot();
assert_eq!(
snapshot.invariant_violation_total, 0,
"No lock-order violations during parallel graph operations"
);
}
#[test]
fn test_concurrent_insertions() {
use std::sync::Arc;
use std::thread;
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = Arc::new(NativeHnsw::new(engine, 16, 100, 1000));
let handles: Vec<_> = (0..4)
.map(|thread_id| {
let hnsw_clone = Arc::clone(&hnsw);
thread::spawn(move || {
for i in 0..50 {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32)
.map(|j| ((thread_id * 50 + i) * 32 + j) as f32)
.collect();
hnsw_clone.insert(&v).expect("test");
}
})
})
.collect();
for handle in handles {
handle.join().expect("Thread should not panic");
}
assert_eq!(hnsw.len(), 200, "All insertions should succeed");
}
#[test]
fn test_concurrent_insert_and_search() {
use std::sync::Arc;
use std::thread;
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = Arc::new(NativeHnsw::new(engine, 16, 100, 1000));
for i in 0..100 {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
hnsw.insert(&v).expect("test");
}
let handles: Vec<_> = (0..4)
.map(|thread_id| {
let hnsw_clone = Arc::clone(&hnsw);
thread::spawn(move || {
for i in 0..25 {
if thread_id % 2 == 0 {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32)
.map(|j| ((100 + thread_id * 25 + i) * 32 + j) as f32)
.collect();
hnsw_clone.insert(&v).expect("test");
} else {
#[allow(clippy::cast_precision_loss)]
let query: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
let results = hnsw_clone.search(&query, 5, 50);
assert!(!results.is_empty(), "Search should return results");
}
}
})
})
.collect();
for handle in handles {
handle.join().expect("Thread should not panic");
}
assert!(
hnsw.len() >= 100,
"Index should have at least initial vectors"
);
}
#[test]
fn test_cas_promote_entry_point_concurrent() {
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = Arc::new(NativeHnsw::new(engine, 16, 100, 2000));
for i in 0..200_usize {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
hnsw.insert(&v).expect("test: insert should succeed");
}
let initial_ep = hnsw.entry_point.load(Ordering::Acquire);
assert_ne!(
initial_ep,
super::graph::NO_ENTRY_POINT,
"test: entry point must be set after pre-population"
);
let mut handles = vec![];
for t in 0..8_usize {
let hnsw_clone = Arc::clone(&hnsw);
handles.push(thread::spawn(move || {
for layer in (t + 1)..=(t + 5) {
hnsw_clone.promote_entry_point(t, layer);
}
}));
}
for handle in handles {
handle.join().expect("test: thread must not panic");
}
let final_ep = hnsw.entry_point.load(Ordering::Acquire);
let final_max = hnsw.max_layer.load(Ordering::Relaxed);
assert_eq!(
final_max, 12,
"test: max_layer must be highest promoted layer"
);
assert_ne!(
final_ep,
super::graph::NO_ENTRY_POINT,
"test: entry point must not be sentinel after promotions"
);
}
#[test]
fn test_cas_promote_from_empty() {
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = Arc::new(NativeHnsw::new(engine, 16, 100, 100));
let mut handles = vec![];
for t in 0..4_usize {
let hnsw_clone = Arc::clone(&hnsw);
handles.push(thread::spawn(move || {
hnsw_clone.promote_entry_point(t, 0);
}));
}
for handle in handles {
handle.join().expect("test: thread must not panic");
}
let ep = hnsw.entry_point.load(Ordering::Acquire);
assert_ne!(
ep,
super::graph::NO_ENTRY_POINT,
"test: one thread must have set the entry point"
);
assert!(
ep < 4,
"test: entry point must be one of the promoted node IDs (0..4)"
);
}
#[test]
fn test_no_mutex_field_exists() {
let engine = CpuDistance::new(DistanceMetric::Euclidean);
let hnsw = NativeHnsw::new(engine, 16, 100, 500);
for i in 0..200_usize {
#[allow(clippy::cast_precision_loss)]
let v: Vec<f32> = (0..32).map(|j| (i * 32 + j) as f32).collect();
hnsw.insert(&v).expect("test: insert should succeed");
}
let query: Vec<f32> = (0..32).map(|j| j as f32).collect();
let results = hnsw.search(&query, 10, 50);
assert!(!results.is_empty(), "test: search must return results");
}
#[cfg(feature = "gpu")]
#[test]
fn test_gpu_snapshot_version_bumps_on_every_mutation() {
use std::sync::atomic::Ordering;
let engine = CachedSimdDistance::new(DistanceMetric::Cosine, 8);
let hnsw = NativeHnsw::new_with_dimension_and_alpha(engine, 16, 100, 1000, 8, 1.2)
.expect("test: index creation should succeed");
let v0 = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
assert_eq!(v0, 0, "fresh index starts at version 0");
let vec_a: Vec<f32> = (0..8).map(|i| i as f32 / 10.0).collect();
hnsw.insert(&vec_a).expect("test: insert succeeds");
let v1 = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
assert!(v1 > v0, "single insert must bump the snapshot version");
let batch: Vec<Vec<f32>> = (0..120)
.map(|seed| (0..8).map(|j| (seed + j) as f32 / 100.0).collect())
.collect();
let batch_refs: Vec<(&[f32], usize)> =
batch.iter().enumerate().map(|(i, v)| (&v[..], i)).collect();
hnsw.parallel_insert(&batch_refs)
.expect("test: parallel_insert succeeds");
let v2 = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
assert!(v2 > v1, "parallel insert path must also bump the version");
}
#[cfg(feature = "gpu")]
#[test]
fn test_reorder_for_locality_bumps_snapshot_version() {
use std::sync::atomic::Ordering;
const N: usize = 1024;
const DIM: usize = 8;
let engine = CachedSimdDistance::new(DistanceMetric::Cosine, DIM);
let hnsw = NativeHnsw::new_with_dimension_and_alpha(engine, 16, 100, 2048, DIM, 1.2)
.expect("test: index creation");
let vectors: Vec<Vec<f32>> = (0..N)
.map(|i| (0..DIM).map(|j| (i * DIM + j) as f32 / 1000.0).collect())
.collect();
let refs: Vec<(&[f32], usize)> = vectors
.iter()
.enumerate()
.map(|(i, v)| (&v[..], i))
.collect();
hnsw.parallel_insert(&refs)
.expect("test: bulk insert for reorder");
let before = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
hnsw.reorder_for_locality().expect("test: reorder succeeds");
let after = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
assert!(
after > before,
"reorder_for_locality must bump the snapshot version \
(before={before}, after={after})"
);
assert!(
hnsw.gpu_vectors_snapshot.lock().is_none(),
"reorder_for_locality must clear the snapshot mutex"
);
}
#[cfg(feature = "gpu")]
#[test]
fn test_snapshot_version_detects_staleness_without_explicit_clear() {
use std::sync::atomic::Ordering;
let engine = CachedSimdDistance::new(DistanceMetric::Cosine, 4);
let hnsw = NativeHnsw::new_with_dimension_and_alpha(engine, 16, 100, 1000, 4, 1.2)
.expect("test: index creation should succeed");
let v: Vec<f32> = vec![0.1, 0.2, 0.3, 0.4];
hnsw.insert(&v).expect("test: insert succeeds");
let current_version = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
let fake_arc: std::sync::Arc<[f32]> = vec![9.9_f32; 4].into();
*hnsw.gpu_vectors_snapshot.lock() = Some((current_version, 4, fake_arc.clone()));
hnsw.invalidate_gpu_caches();
let post_version = hnsw.gpu_snapshot_version.load(Ordering::Acquire);
assert!(
post_version > current_version,
"invalidate_gpu_caches must bump the version"
);
assert!(
hnsw.gpu_vectors_snapshot.lock().is_none(),
"invalidate_gpu_caches must also clear the snapshot mutex (belt-and-suspenders)"
);
}