use instant_distance::{Builder, HnswMap, Search};
#[cfg(test)]
use instant_distance::Point;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;
use crate::hooks::EvictionEvent;
/// `tracing` target attached to the eviction log lines emitted by `VectorIndex::insert`.
const EVICTION_TRACE_TARGET: &str = "hnsw.eviction";
/// Overflow length at which `VectorIndex::insert` tries to start a background rebuild.
const REBUILD_THRESHOLD: usize = 200;
/// Longest time `VectorIndex::rebuild` keeps polling the in-flight flag before giving up.
const REBUILD_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
/// Sleep between polls in `VectorIndex::rebuild` and between attempts in `VectorIndex::warm_boot`.
pub(crate) const REBUILD_WAIT_POLL_INTERVAL: std::time::Duration =
std::time::Duration::from_millis(10);
/// NOTE(review): not referenced in this file; presumably the corpus size above which the
/// CLI builds an HNSW graph — confirm against the callers.
pub const CLI_HNSW_BUILD_MIN_ENTRIES: usize = 20_000;
/// Default cap on stored entries; `VectorIndex::insert` drops the oldest entries beyond it.
const MAX_ENTRIES: usize = 100_000;
/// Returns the value reported by `crate::metrics::hnsw_evictions_total`.
#[must_use]
pub fn index_evictions_total() -> u64 {
crate::metrics::hnsw_evictions_total()
}
/// Number of eviction batches within the trailing hour above which `VectorIndex::insert`
/// logs an error.
const EVICTION_RATE_CEILING_PER_HOUR: usize = 10;
/// Maximum number of timestamps retained in `EVICTION_RATE_RING`.
const EVICTION_RATE_RING_CAP: usize = 64;
/// Timestamps (nanoseconds since the Unix epoch) of recent eviction batches, in the order
/// they were recorded. One timestamp is pushed per evicting `insert` call, not per evicted id.
static EVICTION_RATE_RING: Mutex<std::collections::VecDeque<u64>> =
Mutex::new(std::collections::VecDeque::new());
/// Reports whether the most recent eviction happened less than `window_secs` seconds ago.
///
/// Returns `false` when the metrics layer reports a last-eviction timestamp of `0`.
/// If the system clock reads earlier than the Unix epoch the current time is treated
/// as `0`, which makes the computed age saturate to zero.
#[must_use]
pub fn evicted_recently(window_secs: u64) -> bool {
    let last_eviction = crate::metrics::hnsw_last_eviction_at_nanos();
    if last_eviction == 0 {
        return false;
    }
    let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    // Age is clamped to the u64 range before comparing against the window.
    let age = now
        .saturating_sub(u128::from(last_eviction))
        .min(u128::from(u64::MAX));
    let window = u128::from(window_secs).saturating_mul(1_000_000_000);
    age < window
}
/// Test helper: resets the eviction metrics and empties the eviction-rate ring.
///
/// A poisoned ring mutex is left untouched.
#[doc(hidden)]
pub fn reset_eviction_counters_for_test() {
    crate::metrics::reset_hnsw_eviction_counters_for_test();
    let Ok(mut ring) = EVICTION_RATE_RING.lock() else {
        return;
    };
    ring.clear();
}
/// Records an eviction batch at `now_nanos` and returns how many batches the ring
/// now holds for the trailing hour.
///
/// Timestamps older than one hour are discarded first; if the ring is still at
/// `EVICTION_RATE_RING_CAP` the oldest remaining timestamp is dropped to make room.
/// Returns `0` when the ring mutex is poisoned.
fn record_eviction_and_count_recent(now_nanos: u64) -> usize {
    const NANOS_PER_HOUR: u64 = crate::SECS_PER_HOUR as u64 * 1_000_000_000;
    match EVICTION_RATE_RING.lock() {
        Ok(mut timestamps) => {
            let oldest_allowed = now_nanos.saturating_sub(NANOS_PER_HOUR);
            timestamps.retain(|&stamp| stamp >= oldest_allowed);
            if timestamps.len() >= EVICTION_RATE_RING_CAP {
                timestamps.pop_front();
            }
            timestamps.push_back(now_nanos);
            timestamps.len()
        }
        Err(_) => 0,
    }
}
/// Newtype around a raw `f32` embedding; implements `instant_distance::Point` below so it
/// can be stored in the HNSW graph.
#[derive(Clone, Debug)]
pub struct EmbeddingPoint(pub Vec<f32>);
/// Distance between two embeddings, computed as `1 - dot(a, b)`.
///
/// No normalisation is applied here, so the value is a true cosine distance only when
/// both inputs are unit length. Components are paired positionally; surplus components
/// of the longer slice are ignored. A non-finite result (NaN or infinite inputs) is
/// replaced by `f32::MAX`.
#[inline]
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let similarity = a.iter().zip(b).map(|(&lhs, &rhs)| lhs * rhs).sum::<f32>();
    match 1.0 - similarity {
        distance if distance.is_finite() => distance,
        _ => f32::MAX,
    }
}
/// Lets the HNSW graph measure distance between stored embeddings.
impl instant_distance::Point for EmbeddingPoint {
// Delegates to `cosine_distance` on the wrapped vectors.
fn distance(&self, other: &Self) -> f32 {
cosine_distance(&self.0, &other.0)
}
}
/// Input handed to a background rebuild thread.
struct RebuildSnapshot {
/// Copy of `IndexState::all_entries` taken when the rebuild was scheduled.
entries: Vec<(String, Vec<f32>)>,
/// Length of `IndexState::overflow` at that moment.
overflow_at_snapshot: usize,
/// Value of `IndexState::overflow_generation` at that moment.
overflow_generation: u64,
}
/// Drop guard that clears the rebuild-in-flight flag when it goes out of scope.
struct RebuildGuard {
/// Flag to reset; shared with `VectorIndex::rebuild_in_flight`.
flag: Arc<AtomicBool>,
}
impl Drop for RebuildGuard {
// Stores `false` into the flag.
fn drop(&mut self) {
self.flag.store(false, Ordering::SeqCst);
}
}
/// Embedding index combining an HNSW graph with a linearly scanned overflow list.
pub struct VectorIndex {
/// Live index state (graph, overflow, full entry list).
inner: Mutex<IndexState>,
/// Slot where a background rebuild thread leaves its result until `try_swap_warming`
/// picks it up.
warming: Arc<Mutex<Option<RebuildResult>>>,
/// Set to `true` by whoever schedules a rebuild; cleared by `RebuildGuard` on drop.
rebuild_in_flight: Arc<AtomicBool>,
/// Optional channel that receives one `EvictionEvent` per evicted id.
eviction_sink: Mutex<Option<Sender<EvictionEvent>>>,
}
/// Output of a background rebuild, waiting in `VectorIndex::warming` to be swapped in.
struct RebuildResult {
/// Freshly built graph; `None` when the snapshot was empty.
hnsw: Option<HnswMap<EmbeddingPoint, String>>,
/// Overflow length recorded in the snapshot; `try_swap_warming` drains at most this many
/// entries from the front of the overflow list.
overflow_at_snapshot: usize,
/// Generation recorded in the snapshot; a mismatch with the live generation marks the
/// result as stale.
overflow_generation: u64,
/// Number of entries the graph was built from.
entries_in_graph: usize,
}
/// Mutable state guarded by `VectorIndex::inner`.
struct IndexState {
/// Active graph, if one has been built.
hnsw: Option<HnswMap<EmbeddingPoint, String>>,
/// Entries added by `insert` that `search` scans linearly; drained from the front when a
/// rebuild result is swapped in and cleared on eviction.
overflow: Vec<(String, Vec<f32>)>,
/// Every live entry in insertion order; eviction drains from the front.
all_entries: Vec<(String, Vec<f32>)>,
/// Cap on `all_entries.len()` enforced by `insert`.
max_entries: usize,
/// Bumped whenever eviction clears `overflow`, so stale rebuild results can be detected.
overflow_generation: u64,
/// Lazily built set of live ids used by `search` to filter graph hits; reset to `None`
/// whenever the entry list changes.
valid_ids_cache: Option<std::collections::HashSet<std::sync::Arc<str>>>,
/// Number of entries the active graph was built from.
graph_entry_count: usize,
}
/// One search result.
#[derive(Debug, Clone)]
pub struct VectorHit {
/// Id of the matching entry.
pub id: String,
/// Distance from the query as reported by the graph or computed by `cosine_distance`.
pub distance: f32,
}
impl VectorIndex {
pub fn build(entries: Vec<(String, Vec<f32>)>) -> Self {
let hnsw = Self::build_hnsw(&entries);
let graph_entry_count = entries.len();
VectorIndex {
inner: Mutex::new(IndexState {
hnsw,
overflow: Vec::new(),
all_entries: entries,
max_entries: MAX_ENTRIES,
overflow_generation: 0,
valid_ids_cache: None,
graph_entry_count,
}),
eviction_sink: Mutex::new(None),
warming: Arc::new(Mutex::new(None)),
rebuild_in_flight: Arc::new(AtomicBool::new(false)),
}
}
pub fn empty() -> Self {
VectorIndex {
inner: Mutex::new(IndexState {
hnsw: None,
overflow: Vec::new(),
all_entries: Vec::new(),
max_entries: MAX_ENTRIES,
overflow_generation: 0,
valid_ids_cache: None,
graph_entry_count: 0,
}),
eviction_sink: Mutex::new(None),
warming: Arc::new(Mutex::new(None)),
rebuild_in_flight: Arc::new(AtomicBool::new(false)),
}
}
#[doc(hidden)]
#[must_use]
pub fn with_max_entries_for_test(max_entries: usize) -> Self {
VectorIndex {
inner: Mutex::new(IndexState {
hnsw: None,
overflow: Vec::new(),
all_entries: Vec::new(),
max_entries,
overflow_generation: 0,
valid_ids_cache: None,
graph_entry_count: 0,
}),
eviction_sink: Mutex::new(None),
warming: Arc::new(Mutex::new(None)),
rebuild_in_flight: Arc::new(AtomicBool::new(false)),
}
}
/// Installs `sink` as the receiver of eviction events, replacing any previous sink.
///
/// Does nothing if the sink mutex is poisoned.
pub fn set_eviction_sink(&self, sink: Sender<EvictionEvent>) {
    let Ok(mut slot) = self.eviction_sink.lock() else {
        return;
    };
    *slot = Some(sink);
}
/// Builds an HNSW map from `entries`, pairing each embedding with its id.
///
/// Returns `None` for an empty slice. Embeddings and ids are cloned; the input is left
/// untouched.
fn build_hnsw(entries: &[(String, Vec<f32>)]) -> Option<HnswMap<EmbeddingPoint, String>> {
    if entries.is_empty() {
        return None;
    }
    // Split into parallel point/value vectors in the original entry order.
    let (points, values): (Vec<EmbeddingPoint>, Vec<String>) = entries
        .iter()
        .map(|(id, embedding)| (EmbeddingPoint(embedding.clone()), id.clone()))
        .unzip();
    Some(Builder::default().build(points, values))
}
/// Adds an entry; it becomes searchable immediately through the overflow list.
///
/// In order, this:
/// 1. adopts any finished background rebuild;
/// 2. appends the entry to both the full entry list and the overflow list;
/// 3. schedules a background rebuild once the overflow list reaches `REBUILD_THRESHOLD`
///    (unless one is already in flight);
/// 4. if the entry count exceeds `max_entries`, evicts the oldest entries, notifies the
///    eviction sink, clears the overflow list, bumps the overflow generation, tries to
///    schedule a rebuild, and records eviction metrics.
///
/// Duplicate ids are not rejected here.
///
/// NOTE(review): on eviction the overflow list is cleared. If a rebuild is already in
/// flight the compare-exchange below fails, the in-flight result carries the old
/// generation and is discarded by `try_swap_warming`, and no replacement rebuild is
/// scheduled from this call — entries that lived only in the overflow list then stay out
/// of search results until a later rebuild.
pub fn insert(&self, id: String, embedding: Vec<f32>) {
self.try_swap_warming();
// Phase 1: append under the lock and decide whether to snapshot for a rebuild.
let snapshot_for_rebuild: Option<RebuildSnapshot> = {
let mut state = match self.inner.lock() {
Ok(s) => s,
Err(poisoned) => poisoned.into_inner(),
};
state.all_entries.push((id.clone(), embedding.clone()));
state.overflow.push((id, embedding));
state.valid_ids_cache = None;
// The compare-exchange claims the in-flight flag; only the winner takes a snapshot.
if state.overflow.len() >= REBUILD_THRESHOLD
&& self
.rebuild_in_flight
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
Some(RebuildSnapshot {
entries: state.all_entries.clone(),
overflow_at_snapshot: state.overflow.len(),
overflow_generation: state.overflow_generation,
})
} else {
None
}
};
// The state lock is released before the rebuild thread is spawned.
if let Some(snap) = snapshot_for_rebuild {
let _ = self.spawn_rebuild(snap);
}
// Phase 2: re-acquire the lock and enforce the capacity limit.
let mut state = match self.inner.lock() {
Ok(s) => s,
Err(poisoned) => poisoned.into_inner(),
};
let max_entries = state.max_entries;
if state.all_entries.len() > max_entries {
let excess = state.all_entries.len() - max_entries;
tracing::warn!(
target: EVICTION_TRACE_TARGET,
dropped = excess,
max_entries = max_entries,
"HNSW eviction: dropped {} oldest embeddings to make room",
excess,
);
// A poisoned sink mutex is treated the same as having no sink.
let sink_guard = self.eviction_sink.lock().ok();
// The first `excess` entries are the oldest ones.
for (evicted_id, _) in state.all_entries.iter().take(excess) {
tracing::warn!(
target: EVICTION_TRACE_TARGET,
evicted_id = %evicted_id,
reason = "max_entries_reached",
max_entries = max_entries,
"hnsw index evicting oldest entry: cap reached"
);
if let Some(sink) = sink_guard.as_ref().and_then(|g| g.as_ref()) {
// NOTE(review): the empty second argument appears to be the namespace (the tests
// assert `ev.namespace == ""`) — confirm against `EvictionEvent::new`.
let payload = EvictionEvent::new(
evicted_id.clone(),
String::new(), "max_entries_reached",
);
// Send errors (receiver gone) are ignored.
let _ = sink.send(payload);
}
}
drop(sink_guard);
#[allow(clippy::cast_possible_truncation)]
let evicted = excess as u64;
let evicted_count_to_record = evicted;
state.all_entries.drain(..excess);
state.valid_ids_cache = None;
// Clearing overflow invalidates any snapshot taken earlier; the generation bump lets
// `try_swap_warming` detect and discard such results.
state.overflow.clear();
state.overflow_generation = state.overflow_generation.wrapping_add(1);
if self
.rebuild_in_flight
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let snap = RebuildSnapshot {
entries: state.all_entries.clone(),
overflow_at_snapshot: state.overflow.len(),
overflow_generation: state.overflow_generation,
};
// Release the state lock before spawning; on the other path it is held until return.
drop(state);
let _ = self.spawn_rebuild(snap);
}
let now_nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let now_nanos_u64 = u64::try_from(now_nanos).unwrap_or(u64::MAX);
crate::metrics::record_hnsw_eviction(evicted_count_to_record, now_nanos_u64);
// `recent` counts evicting insert calls in the trailing hour, not evicted ids.
let recent = record_eviction_and_count_recent(now_nanos_u64);
if recent > EVICTION_RATE_CEILING_PER_HOUR {
tracing::error!(
target: EVICTION_TRACE_TARGET,
rate_per_hour = recent,
ceiling = EVICTION_RATE_CEILING_PER_HOUR,
"HNSW eviction rate exceeded {}/hour — recall quality is degrading; \
increase vector_index_capacity or move to dedicated vector DB",
EVICTION_RATE_CEILING_PER_HOUR,
);
}
}
}
/// Removes every entry whose id equals `id` from the entry list and the overflow list.
///
/// The graph itself is not modified; `search` filters graph hits against the live id
/// set, which is invalidated here. Removing an unknown id is a no-op.
///
/// NOTE(review): shrinking the overflow list while a rebuild snapshot is outstanding
/// interacts with the count-based drain in `try_swap_warming` — worth confirming that
/// the drain cannot discard an entry inserted after the snapshot.
pub fn remove(&self, id: &str) {
    let mut state = self
        .inner
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    state.all_entries.retain(|entry| entry.0.as_str() != id);
    state.overflow.retain(|entry| entry.0.as_str() != id);
    state.valid_ids_cache = None;
}
/// Returns up to `k` entries nearest to `query`, closest first.
///
/// Candidates come from two sources: the HNSW graph (at most `2 * k` hits, skipping ids
/// that are no longer live) and a linear scan of the overflow list. When the same id
/// appears more than once, the first occurrence is kept — graph hits precede overflow
/// hits, and overflow hits are ordered by distance. The merged list is then sorted by
/// distance and truncated to `k`.
///
/// Any finished background rebuild is adopted first. `k == 0` returns an empty vector.
/// Very large `k` values are safe: the graph-hit limit saturates instead of overflowing
/// and the preallocation is bounded by the number of stored entries.
pub fn search(&self, query: &[f32], k: usize) -> Vec<VectorHit> {
    self.try_swap_warming();
    if k == 0 {
        // Nothing can be returned; skip the scan and the sort.
        return Vec::new();
    }
    let mut state = match self.inner.lock() {
        Ok(s) => s,
        Err(poisoned) => poisoned.into_inner(),
    };
    let query_point = EmbeddingPoint(query.to_vec());
    // `k * 2` would overflow for huge `k`; saturate instead.
    let graph_hit_limit = k.saturating_mul(2);
    // Cap the preallocation: a caller passing a huge `k` to mean "everything" must not
    // trigger a capacity-overflow panic or a giant allocation.
    let mut results: Vec<VectorHit> =
        Vec::with_capacity(graph_hit_limit.min(state.all_entries.len()));
    // Build the live-id set lazily; mutations reset it to `None`.
    if state.valid_ids_cache.is_none() {
        let set: std::collections::HashSet<std::sync::Arc<str>> = state
            .all_entries
            .iter()
            .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
            .collect();
        state.valid_ids_cache = Some(set);
    }
    let valid_ids = state
        .valid_ids_cache
        .as_ref()
        .expect("valid_ids_cache populated above");
    if let Some(ref hnsw) = state.hnsw {
        let mut search = Search::default();
        for item in hnsw.search(&query_point, &mut search) {
            // The graph may still contain removed or evicted ids.
            if !valid_ids.contains(item.value.as_str()) {
                continue;
            }
            results.push(VectorHit {
                id: item.value.clone(),
                distance: item.distance,
            });
            if results.len() >= graph_hit_limit {
                break;
            }
        }
    }
    let mut overflow_hits: Vec<VectorHit> = Vec::with_capacity(state.overflow.len());
    for (id, emb) in &state.overflow {
        overflow_hits.push(VectorHit {
            id: id.clone(),
            distance: cosine_distance(&query_point.0, emb),
        });
    }
    // Sorted before merging so that, among duplicate overflow ids, the nearest survives
    // the first-occurrence dedupe below.
    overflow_hits.sort_by(|a, b| a.distance.total_cmp(&b.distance));
    results.extend(overflow_hits);
    let mut seen = std::collections::HashSet::new();
    results.retain(|hit| seen.insert(hit.id.clone()));
    results.sort_by(|a, b| a.distance.total_cmp(&b.distance));
    results.truncate(k);
    results
}
/// Number of live entries, whether or not they are covered by the graph yet.
pub fn len(&self) -> usize {
    let state = self
        .inner
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    state.all_entries.len()
}
/// Returns `true` when `len()` is zero.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Rebuilds the graph and blocks until the result has been swapped in, best effort.
///
/// Joins the thread returned by `rebuild_async`. If another rebuild already held the
/// in-flight flag (so that thread was a no-op), this polls the flag every
/// `REBUILD_WAIT_POLL_INTERVAL` for at most `REBUILD_WAIT_TIMEOUT` before attempting
/// the swap. On timeout the swap is still attempted and may find nothing to adopt.
pub fn rebuild(&self) {
    let _ = self.rebuild_async().join();
    let wait_started = std::time::Instant::now();
    loop {
        let still_running = self.rebuild_in_flight.load(Ordering::SeqCst);
        if !still_running || wait_started.elapsed() >= REBUILD_WAIT_TIMEOUT {
            break;
        }
        std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
    }
    self.try_swap_warming();
}
/// Starts a background rebuild from the current entries and returns its thread handle.
///
/// If a rebuild is already in flight, no snapshot is taken and the returned handle
/// belongs to a thread that does nothing. The finished result is parked in the warming
/// slot; it only becomes active once `try_swap_warming` runs.
pub fn rebuild_async(&self) -> JoinHandle<()> {
    let state = self
        .inner
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // Claim the flag while holding the state lock so the snapshot matches the claim.
    let claimed = self
        .rebuild_in_flight
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok();
    if !claimed {
        return std::thread::spawn(|| {});
    }
    let snapshot = RebuildSnapshot {
        entries: state.all_entries.clone(),
        overflow_at_snapshot: state.overflow.len(),
        overflow_generation: state.overflow_generation,
    };
    // Release the lock before the build thread is spawned.
    drop(state);
    self.spawn_rebuild(snapshot)
}
/// Spawns the thread that builds a graph from `snapshot` and parks the result in the
/// warming slot.
///
/// The caller must already have set `rebuild_in_flight` to `true`. The flag is cleared
/// by a `RebuildGuard` once the result has been parked, if the build panics, or if the
/// thread cannot be spawned at all.
fn spawn_rebuild(&self, snapshot: RebuildSnapshot) -> JoinHandle<()> {
    let warming = Arc::clone(&self.warming);
    // Arm the guard before spawning: if `std::thread::spawn` panics, the closure (and
    // the guard inside it) is dropped, so the flag cannot be left stuck at `true`.
    let in_flight_guard = RebuildGuard {
        flag: Arc::clone(&self.rebuild_in_flight),
    };
    std::thread::spawn(move || {
        // Bound first so it is dropped last: the flag clears only after the result is
        // parked (or during unwinding if the build panics).
        let _guard = in_flight_guard;
        let hnsw = VectorIndex::build_hnsw(&snapshot.entries);
        let result = RebuildResult {
            hnsw,
            overflow_at_snapshot: snapshot.overflow_at_snapshot,
            overflow_generation: snapshot.overflow_generation,
            entries_in_graph: snapshot.entries.len(),
        };
        // A poisoned warming mutex means the result is discarded.
        if let Ok(mut slot) = warming.lock() {
            *slot = Some(result);
        }
    })
}
pub fn try_swap_warming(&self) -> bool {
let Some(result) = self.warming.lock().ok().and_then(|mut g| g.take()) else {
return false;
};
let mut state = match self.inner.lock() {
Ok(s) => s,
Err(poisoned) => poisoned.into_inner(),
};
if result.overflow_generation != state.overflow_generation {
tracing::warn!(
target: "hnsw.rebuild",
snapshot_gen = result.overflow_generation,
current_gen = state.overflow_generation,
"dropping stale warming result (eviction occurred mid-rebuild, #1074)"
);
return false;
}
state.hnsw = result.hnsw;
state.graph_entry_count = result.entries_in_graph;
let to_drain = result.overflow_at_snapshot.min(state.overflow.len());
state.overflow.drain(..to_drain);
true
}
/// Reports whether the graph plus the overflow list account for every stored entry.
///
/// Adopts any finished rebuild first. The check compares counts only: the number of
/// entries the graph was built from plus the overflow length, against the total number
/// of entries.
pub fn is_fully_searchable(&self) -> bool {
    self.try_swap_warming();
    let state = self
        .inner
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let reachable = state.graph_entry_count + state.overflow.len();
    reachable >= state.all_entries.len()
}
/// Adds `entries` to the entry list without making them searchable yet.
///
/// Seeded entries go to neither the graph nor the overflow list; they become searchable
/// only after a later rebuild is swapped in. An entry is skipped when its id is already
/// stored or appeared earlier in the same batch, so the first occurrence wins.
///
/// Returns the number of entries actually added. The capacity limit is not enforced
/// here.
pub fn seed_entries(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
    let mut state = match self.inner.lock() {
        Ok(s) => s,
        Err(poisoned) => poisoned.into_inner(),
    };
    let mut known_ids: std::collections::HashSet<std::sync::Arc<str>> = state
        .all_entries
        .iter()
        .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
        .collect();
    let count_before = state.all_entries.len();
    for (id, emb) in entries {
        // `insert` returns false for ids already stored or already accepted from this
        // batch, which also rejects duplicates inside the batch itself.
        if known_ids.insert(std::sync::Arc::<str>::from(id.as_str())) {
            state.all_entries.push((id, emb));
        }
    }
    let seeded = state.all_entries.len() - count_before;
    if seeded > 0 {
        state.valid_ids_cache = None;
    }
    seeded
}
/// Seeds `entries` via `seed_entries`, then starts a background rebuild and returns its
/// handle. The number of seeded entries is discarded.
pub fn seed_and_rebuild_async(&self, entries: Vec<(String, Vec<f32>)>) -> JoinHandle<()> {
self.seed_entries(entries);
self.rebuild_async()
}
/// Seeds `entries` and blocks until the index reports itself fully searchable.
///
/// Runs rebuild attempts back to back, sleeping `REBUILD_WAIT_POLL_INTERVAL` between
/// them, until `is_fully_searchable` returns `true`. There is no upper bound on the
/// number of attempts. Returns the number of entries that were seeded.
pub fn warm_boot(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
    let seeded = self.seed_entries(entries);
    let _ = self.rebuild_async().join();
    while !self.is_fully_searchable() {
        std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
        let _ = self.rebuild_async().join();
    }
    seeded
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Scales `values` to unit Euclidean length. A zero vector yields NaN components.
fn make_embedding(values: &[f32]) -> Vec<f32> {
    let squared_sum: f32 = values.iter().map(|component| component * component).sum();
    let magnitude = squared_sum.sqrt();
    values
        .iter()
        .map(|component| component / magnitude)
        .collect()
}
// Searching an index with no entries yields no hits.
#[test]
fn empty_index_returns_empty() {
let idx = VectorIndex::empty();
let results = idx.search(&[1.0, 0.0, 0.0], 10);
assert!(results.is_empty());
}
// A stored vector containing NaN must not panic the search, and if it is returned at all
// it must rank after the well-formed vector.
#[test]
fn issue_1684_nan_vector_does_not_panic_and_ranks_last() {
let idx = VectorIndex::empty();
idx.insert("good".into(), make_embedding(&[1.0, 0.0, 0.0]));
idx.insert("poison".into(), vec![f32::NAN, 0.0, 0.0]);
let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
let good = hits.iter().position(|h| h.id == "good");
assert!(good.is_some(), "good vector must be returned");
if let Some(poison) = hits.iter().position(|h| h.id == "poison") {
assert!(
good.unwrap() < poison,
"non-finite-distance poison vector must rank after the good vector"
);
}
}
// NaN and infinite inputs collapse to `f32::MAX`; ordinary inputs stay finite.
#[test]
fn issue_1684_cosine_distance_non_finite_collapses_to_max() {
assert_eq!(cosine_distance(&[f32::NAN, 1.0], &[1.0, 1.0]), f32::MAX);
assert_eq!(
cosine_distance(&[f32::INFINITY, 0.0], &[1.0, 0.0]),
f32::MAX
);
assert!(cosine_distance(&[1.0, 0.0], &[1.0, 0.0]).is_finite());
}
// After a search the live-id cache is populated, holds `Arc<str>` keys, and can be probed
// with a plain `&str`.
#[test]
fn perf_7_valid_ids_cache_is_arc_str_typed() {
let entries = vec![
("id1".to_string(), make_embedding(&[1.0, 0.0, 0.0])),
("id2".to_string(), make_embedding(&[0.0, 1.0, 0.0])),
];
let idx = VectorIndex::build(entries);
let _ = idx.search(&[1.0, 0.0, 0.0], 2);
let state = idx.inner.lock().expect("lock state");
let cache = state
.valid_ids_cache
.as_ref()
.expect("valid_ids_cache populated after search");
assert!(
cache.contains("id1") && cache.contains("id2"),
"PERF-7: Arc<str> cache failed to admit &str lookup",
);
// The type annotation is the actual check: it only compiles for `Arc<str>` elements.
let sample: Option<&std::sync::Arc<str>> = cache.iter().next();
assert!(sample.is_some(), "PERF-7: cache must hold Arc<str> entries");
}
// With three orthogonal entries, the one closest to the query comes back first.
#[test]
fn basic_search() {
let entries = vec![
("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
];
let idx = VectorIndex::build(entries);
let results = idx.search(&make_embedding(&[1.0, 0.1, 0.0]), 2);
assert_eq!(results.len(), 2);
assert_eq!(results[0].id, "a"); }
// An entry inserted after the build lives in the overflow list and is merged into the
// results in distance order.
#[test]
fn insert_and_search_overflow() {
let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
let idx = VectorIndex::build(entries);
idx.insert("b".into(), make_embedding(&[0.9, 0.1, 0.0]));
let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
assert_eq!(results.len(), 2);
assert_eq!(results[0].id, "a");
assert_eq!(results[1].id, "b");
}
// A removed id no longer appears in search results even though the graph still holds it.
#[test]
fn remove_excludes_from_results() {
let entries = vec![
("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
("b".into(), make_embedding(&[0.9, 0.1, 0.0])),
];
let idx = VectorIndex::build(entries);
idx.remove("a");
let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
assert!(results.iter().all(|h| h.id != "a"));
}
// A synchronous rebuild keeps the entry count and every id remains findable.
#[test]
fn test_rebuild_preserves_all_entries() {
let raw: Vec<(String, Vec<f32>)> = (0..12)
.map(|i| {
let mut v = vec![0.0_f32; 16];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 16] = 1.0 + f * 0.01; (format!("id-{i}"), make_embedding(&v))
})
.collect();
let idx = VectorIndex::build(raw.clone());
idx.rebuild();
assert_eq!(idx.len(), raw.len());
let query = make_embedding(&[1.0; 16]);
// Ask for more hits than there are entries so nothing is cut off by `k`.
let hits = idx.search(&query, raw.len() * 2);
let found: std::collections::HashSet<String> = hits.into_iter().map(|h| h.id).collect();
for (id, _) in &raw {
assert!(
found.contains(id),
"rebuild must preserve id {id}, found: {:?}",
found
);
}
}
// After removal the id must stay out of the results for every `k` from 1 to 10, while the
// remaining entries are still returned.
#[test]
fn test_remove_then_search_excludes_id() {
let entries = vec![
("alpha".into(), make_embedding(&[1.0, 0.0, 0.0, 0.0])),
("beta".into(), make_embedding(&[0.9, 0.1, 0.0, 0.0])),
("gamma".into(), make_embedding(&[0.8, 0.2, 0.0, 0.0])),
];
let idx = VectorIndex::build(entries);
let pre = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
assert!(pre.iter().any(|h| h.id == "alpha"));
idx.remove("alpha");
for k in 1..=10 {
let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), k);
assert!(
hits.iter().all(|h| h.id != "alpha"),
"removed id `alpha` resurfaced with k={k}: {:?}",
hits.iter().map(|h| &h.id).collect::<Vec<_>>()
);
}
let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
let ids: Vec<&str> = hits.iter().map(|h| h.id.as_str()).collect();
assert!(ids.contains(&"beta"));
assert!(ids.contains(&"gamma"));
}
// A freshly created empty index reports length zero.
#[test]
fn empty_index_len_is_zero() {
let idx = VectorIndex::empty();
assert_eq!(idx.len(), 0);
}
// Building from an empty list behaves like an empty index.
#[test]
fn build_with_empty_entries_search_empty() {
let idx = VectorIndex::build(Vec::new());
assert_eq!(idx.len(), 0);
let results = idx.search(&[1.0, 0.0, 0.0], 5);
assert!(results.is_empty());
}
// Requesting zero hits returns an empty result even when entries exist.
#[test]
fn search_with_k_zero_returns_empty() {
let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
let idx = VectorIndex::build(entries);
let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 0);
assert!(results.is_empty());
}
// Rebuilding an empty index completes without panicking.
#[test]
fn rebuild_on_empty_does_not_crash() {
let idx = VectorIndex::empty();
idx.rebuild();
assert_eq!(idx.len(), 0);
}
// Each insert adds one entry to the reported length.
#[test]
fn insert_increases_len() {
let idx = VectorIndex::empty();
idx.insert("a".into(), make_embedding(&[1.0, 0.0, 0.0]));
idx.insert("b".into(), make_embedding(&[0.0, 1.0, 0.0]));
assert_eq!(idx.len(), 2);
}
// Orthogonal unit vectors are at distance 1.
#[test]
fn embedding_point_distance_orthogonal() {
let a = EmbeddingPoint(vec![1.0, 0.0, 0.0]);
let b = EmbeddingPoint(vec![0.0, 1.0, 0.0]);
assert!((a.distance(&b) - 1.0).abs() < 1e-6);
}
// A unit vector is at (approximately) zero distance from itself.
#[test]
fn embedding_point_distance_identical_is_zero() {
let a = EmbeddingPoint(make_embedding(&[1.0, 1.0, 1.0]));
assert!(a.distance(&a).abs() < 1e-6);
}
// Removing an unknown id from an empty index changes nothing.
#[test]
fn remove_on_empty_index_is_noop() {
let idx = VectorIndex::empty();
idx.remove("nonexistent");
assert_eq!(idx.len(), 0);
}
// Inserts past `REBUILD_THRESHOLD` (205 > 200) and checks the index is still consistent.
// NOTE(review): this asserts only length and hit count; it does not verify that a rebuild
// actually ran or was swapped in.
#[test]
fn insert_triggers_auto_rebuild_at_threshold() {
let idx = VectorIndex::empty();
for i in 0..205_usize {
let mut v = vec![0.0_f32; 8];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 8] = 1.0 + f * 0.001;
idx.insert(format!("id-{i}"), make_embedding(&v));
}
assert_eq!(idx.len(), 205);
let q = make_embedding(&[1.0_f32; 8]);
let hits = idx.search(&q, 5);
assert_eq!(hits.len(), 5);
}
// After a batch of inserts and a synchronous rebuild, a top-k search returns exactly k
// hits, in ascending distance order, with no duplicate ids.
#[test]
fn test_rebuild_after_batch_insert_settles() {
let idx = VectorIndex::empty();
let n = 25_usize;
for i in 0..n {
let mut v = vec![0.0_f32; 8];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 8] = 1.0 + f * 0.001;
idx.insert(format!("id-{i}"), make_embedding(&v));
}
idx.rebuild();
assert_eq!(idx.len(), n);
let query = make_embedding(&[1.0; 8]);
let k = 5;
let hits = idx.search(&query, k);
assert_eq!(
hits.len(),
k,
"post-rebuild search top-{k} must return exactly {k} hits, got {:?}",
hits.iter().map(|h| &h.id).collect::<Vec<_>>()
);
// Adjacent pairs must be non-decreasing in distance.
for w in hits.windows(2) {
assert!(
w[0].distance <= w[1].distance,
"search results must be ascending by distance: {} > {}",
w[0].distance,
w[1].distance
);
}
let mut seen = std::collections::HashSet::new();
for h in &hits {
assert!(
seen.insert(h.id.clone()),
"duplicate id in search: {}",
h.id
);
}
}
// With a capacity of 4 and 6 inserts, the two oldest ids are evicted and each eviction is
// reported on the sink with the canonical reason, an empty namespace and a timestamp.
#[test]
fn test_hnsw_eviction_fires_hook() {
let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
let idx = VectorIndex::with_max_entries_for_test(4);
idx.set_eviction_sink(tx);
reset_eviction_counters_for_test();
let n = 6_usize;
for i in 0..n {
let mut v = vec![0.0_f32; 4];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 4] = 1.0 + f * 0.01;
idx.insert(format!("evict-{i}"), make_embedding(&v));
}
// Drain the channel until both events arrive or the 500 ms deadline passes.
let mut received: Vec<EvictionEvent> = Vec::new();
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
while std::time::Instant::now() < deadline && received.len() < 2 {
while let Ok(ev) = rx.try_recv() {
received.push(ev);
}
if received.len() < 2 {
std::thread::sleep(std::time::Duration::from_millis(5));
}
}
assert_eq!(
received.len(),
2,
"expected one EvictionEvent per evicted id (2 evictions for n=6, cap=4), got {}: {:?}",
received.len(),
received.iter().map(|e| &e.memory_id).collect::<Vec<_>>(),
);
let ids: Vec<&str> = received.iter().map(|e| e.memory_id.as_str()).collect();
assert!(
ids.contains(&"evict-0"),
"expected evict-0 in evicted ids; got {ids:?}"
);
assert!(
ids.contains(&"evict-1"),
"expected evict-1 in evicted ids; got {ids:?}"
);
for ev in &received {
assert_eq!(
ev.reason, "max_entries_reached",
"evicted reason should match the canonical tag, got {:?}",
ev.reason
);
assert_eq!(ev.namespace, "");
assert!(
!ev.evicted_at.is_empty(),
"evicted_at must be set (rfc3339), got empty"
);
}
}
// Without a sink installed, eviction still enforces the capacity limit.
#[test]
fn test_hnsw_eviction_without_sink_is_noop_for_hook() {
let idx = VectorIndex::with_max_entries_for_test(4);
for i in 0..6_usize {
let mut v = vec![0.0_f32; 4];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 4] = 1.0 + f * 0.01;
idx.insert(format!("noopsink-{i}"), make_embedding(&v));
}
assert_eq!(
idx.len(),
4,
"eviction-edge path must still enforce the max-4 cap even \
without a sink wired (6 inserts → 4 survivors means 2 \
evictions occurred); got len={}",
idx.len()
);
}
}
#[cfg(test)]
mod d1_968_tests {
use super::*;
use std::sync::Arc as TArc;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, Instant};
/// Returns `values` divided by its Euclidean norm. A zero vector yields NaN components.
fn make_embedding(values: &[f32]) -> Vec<f32> {
    let length = values.iter().map(|x| x * x).sum::<f32>().sqrt();
    let mut unit = Vec::with_capacity(values.len());
    for x in values {
        unit.push(x / length);
    }
    unit
}
/// Generates `n` entries named `id-0 .. id-{n-1}`, each a 16-dimensional unit vector with
/// a single non-zero component at position `i % 16`.
fn fixture(n: usize) -> Vec<(String, Vec<f32>)> {
    let mut rows = Vec::with_capacity(n);
    for index in 0..n {
        let mut raw = vec![0.0_f32; 16];
        #[allow(clippy::cast_precision_loss)]
        let bump = index as f32 * 0.001;
        raw[index % 16] = 1.0 + bump;
        rows.push((format!("id-{index}"), make_embedding(&raw)));
    }
    rows
}
// Runs 50 searches on one thread while another thread starts a background rebuild; the
// searches must all return hits and finish within five seconds.
// NOTE(review): `rebuild_handle.join()` yields the inner rebuild `JoinHandle`, which is
// discarded without being joined, so the final `try_swap_warming` may run before the
// build has finished.
#[test]
fn rebuild_async_does_not_block_search_968() {
let idx = TArc::new(VectorIndex::build(fixture(2_000)));
let query = make_embedding(&[1.0_f32; 16]);
let idx_for_rebuild = TArc::clone(&idx);
let rebuild_handle = std::thread::spawn(move || idx_for_rebuild.rebuild_async());
let idx_for_search = TArc::clone(&idx);
let search_start = Instant::now();
let search_handle = std::thread::spawn(move || {
for _ in 0..50 {
let hits = idx_for_search.search(&query, 10);
assert!(
!hits.is_empty(),
"search returned empty during rebuild — readers were blocked or the graph was lost"
);
}
});
let _ = search_handle.join().expect("search thread panicked");
let search_elapsed = search_start.elapsed();
assert!(
search_elapsed < Duration::from_secs(5),
"50 searches took {:?} — readers blocked on the rebuild (v0.6 regression)",
search_elapsed,
);
let _ = rebuild_handle.join().expect("rebuild thread panicked");
idx.try_swap_warming();
}
// Forces the in-flight flag on so `rebuild_async` becomes a no-op, then checks that search
// results are unchanged. Despite the name, no build failure is simulated — only the
// "rebuild already in flight" path is exercised.
#[test]
fn rebuild_failure_leaves_active_unchanged_968() {
let entries = fixture(50);
let idx = VectorIndex::build(entries.clone());
let query = make_embedding(&[1.0_f32; 16]);
let pre_hits = idx.search(&query, 5);
assert_eq!(pre_hits.len(), 5);
let pre_ids: std::collections::HashSet<String> =
pre_hits.iter().map(|h| h.id.clone()).collect();
// Pretend another rebuild holds the flag.
idx.rebuild_in_flight.store(true, Ordering::SeqCst);
let handle = idx.rebuild_async();
let _ = handle.join();
let post_hits = idx.search(&query, 5);
let post_ids: std::collections::HashSet<String> =
post_hits.iter().map(|h| h.id.clone()).collect();
assert_eq!(
pre_ids, post_ids,
"search results changed after a no-op rebuild — active was clobbered"
);
// Restore the flag that was set by hand above.
idx.rebuild_in_flight.store(false, Ordering::SeqCst);
}
// Starts a background rebuild over 500 entries while 30 threads insert concurrently, then
// checks that the entry list ends up with all 530 entries and every concurrent id.
#[test]
fn concurrent_writes_during_rebuild_consistent_968() {
let idx = TArc::new(VectorIndex::build(fixture(500)));
let handle = {
let idx = TArc::clone(&idx);
std::thread::spawn(move || idx.rebuild_async())
};
let inserts_done = TArc::new(AtomicUsize::new(0));
let mut writer_handles = Vec::new();
for i in 0..30 {
let idx = TArc::clone(&idx);
let counter = TArc::clone(&inserts_done);
writer_handles.push(std::thread::spawn(move || {
let mut v = vec![0.0_f32; 16];
#[allow(clippy::cast_precision_loss)]
let f = i as f32;
v[i % 16] = 2.0 + f * 0.01;
idx.insert(format!("concurrent-{i}"), make_embedding(&v));
counter.fetch_add(1, Ordering::SeqCst);
}));
}
for h in writer_handles {
let _ = h.join();
}
// Join the spawner thread, then the rebuild thread it returned.
let rebuild_h = handle.join().expect("outer rebuild spawner panicked");
let _ = rebuild_h.join();
// Adopt pending results; the counter bounds the loop defensively.
let mut swaps = 0_usize;
while idx.try_swap_warming() {
swaps += 1;
if swaps > 4 {
break;
}
}
std::thread::sleep(Duration::from_millis(10));
assert_eq!(inserts_done.load(Ordering::SeqCst), 30);
let final_len = idx.len();
// Captured only for the failure messages below.
let (overflow_len_dbg, hnsw_size_dbg) = {
let state = idx.inner.lock().expect("inner mutex poisoned");
let hnsw_size = state.hnsw.as_ref().map_or(0, |h| h.iter().count());
(state.overflow.len(), hnsw_size)
};
assert_eq!(
final_len,
530,
"post-rebuild len must equal baseline 500 + concurrent 30 = 530, \
got {final_len} (overflow={overflow_len_dbg}, hnsw={hnsw_size_dbg}, \
swaps={swaps}, inserts_done={})",
inserts_done.load(Ordering::SeqCst)
);
let present: std::collections::HashSet<String> = {
let state = idx.inner.lock().expect("inner mutex poisoned");
state.all_entries.iter().map(|(id, _)| id.clone()).collect()
};
let missing: Vec<String> = (0..30)
.map(|i| format!("concurrent-{i}"))
.filter(|id| !present.contains(id))
.collect();
assert!(
missing.is_empty(),
"post-rebuild all_entries must INCLUDE every concurrent write \
(missing={missing:?}; post-swap state: overflow={overflow_len_dbg}, \
hnsw={hnsw_size_dbg}, swaps={swaps}, final_len={final_len}); \
a missing id means the concurrent-rebuild swap dropped a write"
);
}
// An observer thread polls `len()` during a rebuild and swap; the length must never fall
// below the baseline and must equal it afterwards.
#[test]
fn rebuild_swap_is_atomic_968() {
let idx = TArc::new(VectorIndex::build(fixture(1_000)));
let baseline_len = idx.len();
let stop = TArc::new(AtomicBool::new(false));
let observer_stop = TArc::clone(&stop);
let idx_obs = TArc::clone(&idx);
let observer = std::thread::spawn(move || {
while !observer_stop.load(Ordering::SeqCst) {
let l = idx_obs.len();
assert!(
l >= baseline_len,
"len() dropped below baseline during rebuild — partial swap observed: {l} < {baseline_len}"
);
}
});
let h = idx.rebuild_async();
let _ = h.join();
idx.try_swap_warming();
stop.store(true, Ordering::SeqCst);
// NOTE(review): the join result is discarded, so an observer assertion failure would
// not fail this test.
let _ = observer.join();
assert_eq!(idx.len(), baseline_len);
}
// Plants a warming result whose generation differs from the live one and checks that the
// swap is refused, the overflow list is untouched, and the entry is still searchable.
#[test]
fn stale_warming_swap_is_dropped_1074() {
let idx = VectorIndex::empty();
idx.insert(
"alpha".to_string(),
make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]),
);
let before_overflow = idx.inner.lock().unwrap().overflow.len();
assert_eq!(before_overflow, 1);
{
let current_gen = idx.inner.lock().unwrap().overflow_generation;
let mut w = idx.warming.lock().unwrap();
// Mismatched generation; the large drain count would empty overflow if it were applied.
*w = Some(RebuildResult {
hnsw: None,
overflow_at_snapshot: 999, overflow_generation: current_gen.wrapping_add(1), entries_in_graph: 0,
});
}
let swapped = idx.try_swap_warming();
assert!(
!swapped,
"stale-by-generation warming must NOT swap in (#1074)"
);
let after_overflow = idx.inner.lock().unwrap().overflow.len();
assert_eq!(
after_overflow, before_overflow,
"stale swap must NOT drain overflow (#1074 regression)"
);
let hits = idx.search(&make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]), 5);
assert!(hits.iter().any(|h| h.id == "alpha"));
}
// Exceeding a capacity of 2 with 3 inserts triggers eviction, which must advance the
// overflow generation.
#[test]
fn eviction_clear_bumps_overflow_generation_1074() {
let idx = VectorIndex::with_max_entries_for_test(2);
let gen_initial = idx.inner.lock().unwrap().overflow_generation;
for i in 0..3 {
let mut v = vec![0.0_f32; 4];
v[i % 4] = 1.0;
idx.insert(format!("e{i}"), make_embedding(&v));
}
let gen_after = idx.inner.lock().unwrap().overflow_generation;
assert!(
gen_after > gen_initial,
"eviction-clear path must bump overflow_generation (#1074)"
);
}
// Seeding skips an id that is already stored, grows the entry count, and leaves the
// seeded entries unsearchable (and the index "not fully searchable") until a rebuild.
#[test]
fn seed_entries_dedupes_and_defers_searchability_1579() {
let idx = VectorIndex::empty();
idx.insert("live".into(), make_embedding(&[1.0, 0.0, 0.0]));
assert!(
idx.is_fully_searchable(),
"overflow-only index is fully searchable"
);
let seeded = idx.seed_entries(vec![
("live".into(), make_embedding(&[1.0, 0.0, 0.0])),
("a".into(), make_embedding(&[0.0, 1.0, 0.0])),
("b".into(), make_embedding(&[0.0, 0.0, 1.0])),
]);
assert_eq!(seeded, 2, "duplicate id must be skipped");
assert_eq!(idx.len(), 3);
assert!(
!idx.is_fully_searchable(),
"seeded-but-unbuilt entries must report not-fully-searchable \
so the conflict check routes to its fallback (#1579 A5/B3)"
);
// Only the overflow entry can be found before a rebuild.
let hits = idx.search(&make_embedding(&[0.0, 1.0, 0.0]), 3);
assert!(hits.iter().all(|h| h.id == "live"));
}
// `warm_boot` returns the seeded count and leaves the index fully searchable with the
// seeded entries reachable.
#[test]
fn warm_boot_seeds_builds_and_swaps_1579() {
let idx = VectorIndex::empty();
let seeded = idx.warm_boot(vec![
("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
]);
assert_eq!(seeded, 3);
assert!(
idx.is_fully_searchable(),
"warm_boot must drive rebuild→swap to completion"
);
let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
assert_eq!(hits.first().map(|h| h.id.as_str()), Some("a"));
}
// The index stays "fully searchable" across a synchronous build, an insert and a remove.
#[test]
fn build_and_insert_preserve_full_searchability_1579() {
let idx = VectorIndex::build(vec![
("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
]);
assert!(idx.is_fully_searchable(), "synchronous build covers all");
idx.insert("c".into(), make_embedding(&[0.0, 0.0, 1.0]));
assert!(
idx.is_fully_searchable(),
"insert lands in overflow — coverage preserved"
);
idx.remove("a");
assert!(
idx.is_fully_searchable(),
"remove only shrinks all_entries — coverage preserved"
);
}
}