ai_memory/hnsw.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HNSW (Hierarchical Navigable Small World) vector index for fast approximate
5//! nearest-neighbor search over memory embeddings.
6//!
7//! Built on `instant-distance`. The index is constructed at startup from all
8//! stored embeddings. New memories added during the session go into an overflow
9//! list that is scanned linearly alongside the HNSW results — the index is
10//! rebuilt lazily once the overflow exceeds a threshold.
11
12use instant_distance::{Builder, HnswMap, Search};
13// `instant_distance::Point` is the trait that supplies the
14// `EmbeddingPoint::distance` method; it has to be in scope for the
15// in-module tests (`embedding_point_distance_*`) to call it as a
16// method. The lib code itself goes through the slice-borrow
17// `cosine_distance` helper post-#1087 so the `Point` impl is the
18// only consumer of the trait at the bare-name level.
19#[cfg(test)]
20use instant_distance::Point;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc::Sender;
25use std::thread::JoinHandle;
26
27use crate::hooks::EvictionEvent;
28
29/// Tracing target for the HNSW eviction worker (#1558 tracing-target SSOT).
30const EVICTION_TRACE_TARGET: &str = "hnsw.eviction";
31
32/// Maximum overflow entries before triggering a rebuild.
33const REBUILD_THRESHOLD: usize = 200;
34
35/// #1037 (2026-05-21) — bounded spin-wait window for [`VectorIndex::rebuild`]
36/// (the sync shim) when [`VectorIndex::rebuild_async`] short-circuited to
37/// a no-op handle because a prior async rebuild was still in flight.
38/// 1 second is well under any sensible sync-rebuild expectation
39/// (production callers use `rebuild_async`); the budget exists only
40/// to convert "silently return stale graph" → "bounded timeout, then
41/// best-effort swap".
42const REBUILD_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
43/// Poll cadence for bounded waits on an in-flight rebuild. Shared by
44/// the [`VectorIndex::rebuild`] sync shim (#1037) and the #1579 B3
45/// boot warm-up retry loops (`warm_boot`,
46/// `daemon_runtime::spawn_vector_index_boot_load`).
47pub(crate) const REBUILD_WAIT_POLL_INTERVAL: std::time::Duration =
48 std::time::Duration::from_millis(10);
49
50/// #1579 B3 — minimum embedded-row count before a ONE-SHOT CLI
51/// invocation builds an HNSW graph. Below this, the recall pipeline's
52/// linear-scan fallback (`vector_index = None`) answers the semantic
53/// phase in ≤ 35 ms (P1 audit, 10-20k rows) while graph construction
54/// costs ~40 s at 10k vectors — a build that is then thrown away when
55/// the process exits. Long-lived processes (serve / MCP stdio) always
56/// build (asynchronously, see `daemon_runtime::spawn_vector_index_boot_load`)
57/// because they amortise the cost across many recalls.
58pub const CLI_HNSW_BUILD_MIN_ENTRIES: usize = 20_000;
59
60/// Maximum entries before evicting oldest to prevent unbounded memory growth.
61///
62/// Production code uses the constant 100_000. Tests may construct a
63/// `VectorIndex` with a custom cap via [`VectorIndex::with_max_entries_for_test`]
64/// — that knob is stored on the index instance itself, so it does
65/// NOT affect concurrent tests running with the default cap. The
66/// constant lives here so call sites (and the per-event tracing
67/// payload) reference one canonical value.
68const MAX_ENTRIES: usize = 100_000;
69
70// ---------------------------------------------------------------------------
71// v0.6.3.1 (P3, G2): eviction observability
72//
73// `MAX_ENTRIES`-triggered eviction in `insert()` previously dropped the
74// oldest embeddings silently — operators near the cap lost recall quality
75// invisibly. The two counters below + the structured `hnsw.eviction`
76// tracing event close that gap:
77//
78// - eviction count — cumulative count surfaced via
79// `db::stats().index_evictions_total` (and capabilities) AND at
80// `/metrics` as `ai_memory_hnsw_evictions_total`.
81// - last-eviction wall clock — UNIX nanoseconds of the most recent
82// eviction; capabilities derive `hnsw.evicted_recently` from this
83// with a 60 s rolling window.
84//
85// **pm-v3.1 PR8 (issue #1174).** Pre-PR8 the counters were two free
86// `static AtomicU64`s at the top of this file. PR8 sank both into the
87// metrics registry (`src/metrics.rs::HNSW_EVICTIONS_TOTAL` +
88// `HNSW_LAST_EVICTION_AT_NANOS`, plus matching Prometheus
89// `IntCounter` / `IntGauge` handles on `Metrics`) so the eviction
90// signal is `/metrics`-scrape-visible without a separate observer
91// thread. The accessor signatures here are preserved verbatim for
92// call-site backward compat.
93//
94// Process-local. The counters reset on restart because the index itself
95// resets on restart. Both atomics are touched only on the eviction edge
96// (rare: requires >100k vectors), so there is no measurable hot-path cost.
97// ---------------------------------------------------------------------------
98
99/// Cumulative HNSW oldest-eviction count since process start.
100///
101/// Surfaces in `memory_stats`. Non-zero indicates the in-memory vector
102/// index has hit `MAX_ENTRIES` and dropped older embeddings; recall
103/// quality may have degraded for evicted ids until they are re-inserted
104/// (e.g. on next access via `recall` touch path).
105///
106/// pm-v3.1 PR8: thin shim over `crate::metrics::hnsw_evictions_total()`.
107#[must_use]
108pub fn index_evictions_total() -> u64 {
109 crate::metrics::hnsw_evictions_total()
110}
111
112// ---------------------------------------------------------------------------
113// M8 (v0.7.0 round-2) — eviction-rate observability.
114//
115// Operators who hit the 100k cap need two signals:
116//
117// 1. Per-eviction WARN — surface every eviction event so operators
118// see drift before recall quality has noticeably degraded.
119// 2. Rolling-rate ERROR — when the trailing-hour eviction rate
120// exceeds the M8 ceiling, escalate to ERROR so the ops dashboard
121// raises a page. The escalation message names the operator
122// knobs (`vector_index_capacity` / "move to dedicated vector DB")
123// so the on-call has the remediation in the log line.
124//
125// Implementation: a small fixed-size ring buffer of UNIX-nanosecond
126// timestamps. Each eviction `push`es a stamp; the rolling-rate check
127// counts how many stamps sit inside the trailing-hour window. The
128// ring is locked behind a `Mutex` for write-coherent visibility; the
129// path runs only on the eviction edge so the lock cost is negligible.
130// ---------------------------------------------------------------------------
131
132/// M8 eviction-rate ceiling: events / hour past which the rolling
133/// observer escalates from WARN to ERROR.
134const EVICTION_RATE_CEILING_PER_HOUR: usize = 10;
135
136/// Rolling-hour ring buffer capacity. Chosen so the ring can hold the
137/// ceiling plus headroom for burstiness; older entries are
138/// transparently evicted on push.
139const EVICTION_RATE_RING_CAP: usize = 64;
140
141/// v0.7.0 #1093 — eviction-rate ring buffer. Switched from
142/// `Mutex<Vec<u64>>` to `Mutex<VecDeque<u64>>` so the cap-eviction
143/// path is O(1) `pop_front` instead of O(N) `Vec::remove(0)`.
144static EVICTION_RATE_RING: Mutex<std::collections::VecDeque<u64>> =
145 Mutex::new(std::collections::VecDeque::new());
146
147/// Whether an eviction occurred within the trailing `window_secs`.
148///
149/// Used by capabilities (P1) to set `hnsw.evicted_recently` so operators
150/// can see ongoing pressure on the cap, not just the cumulative count.
151/// Returns `false` when no evictions have ever happened in this process.
152#[must_use]
153pub fn evicted_recently(window_secs: u64) -> bool {
154 let last = crate::metrics::hnsw_last_eviction_at_nanos();
155 if last == 0 {
156 return false;
157 }
158 let now_nanos = std::time::SystemTime::now()
159 .duration_since(std::time::UNIX_EPOCH)
160 .map(|d| d.as_nanos())
161 .unwrap_or(0);
162 // Saturating math: clock can move backwards on some VMs.
163 let elapsed_nanos = u128::from(u64::MAX).min(now_nanos.saturating_sub(u128::from(last)));
164 elapsed_nanos < u128::from(window_secs).saturating_mul(1_000_000_000)
165}
166
167/// Reset the eviction counters. Test-only — production callers must not
168/// reach into the counter directly. The function is `pub` (rather than
169/// `pub(crate)`) so the integration-test crate at `tests/` can drive it
170/// alongside the public `index_evictions_total()` accessor; renaming
171/// keeps the intent obvious at every call site.
172///
173/// pm-v3.1 PR8: thin shim over
174/// `crate::metrics::reset_hnsw_eviction_counters_for_test()`.
175#[doc(hidden)]
176pub fn reset_eviction_counters_for_test() {
177 crate::metrics::reset_hnsw_eviction_counters_for_test();
178 if let Ok(mut g) = EVICTION_RATE_RING.lock() {
179 g.clear();
180 }
181}
182
183/// M8 (v0.7.0 round-2) — push the latest eviction timestamp into the
184/// rolling-hour ring and return how many stamps now sit inside the
185/// trailing hour. Producers call this once per eviction event;
186/// the caller branches on the returned count to escalate from WARN
187/// (already emitted) to ERROR.
188fn record_eviction_and_count_recent(now_nanos: u64) -> usize {
189 const ONE_HOUR_NANOS: u64 = crate::SECS_PER_HOUR as u64 * 1_000_000_000;
190 let cutoff = now_nanos.saturating_sub(ONE_HOUR_NANOS);
191 let Ok(mut ring) = EVICTION_RATE_RING.lock() else {
192 // Poisoned lock — observability is best-effort, return 0 so
193 // the caller does not over-escalate.
194 return 0;
195 };
196 // Drop stale entries first so the ring stays bounded and the
197 // count reflects the trailing hour.
198 ring.retain(|t| *t >= cutoff);
199 if ring.len() >= EVICTION_RATE_RING_CAP {
200 // v0.7.0 #1093 — VecDeque::pop_front is O(1); pre-#1093
201 // Vec::remove(0) was O(N) (backing-buffer shift).
202 ring.pop_front();
203 }
204 ring.push_back(now_nanos);
205 ring.len()
206}
207
208/// A point in the HNSW index — wraps a dense embedding vector.
209#[derive(Clone, Debug)]
210pub struct EmbeddingPoint(pub Vec<f32>);
211
212/// v0.7.0 #1087 — slice-borrow cosine-distance helper used by the
213/// overflow scan in [`VectorIndex::search`] to compute distances
214/// against the stored `Vec<f32>` without cloning each overflow
215/// embedding into a fresh `EmbeddingPoint`. Embeddings are
216/// L2-normalised so dot product = cosine similarity.
217#[inline]
218fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
219 let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
220 let dist = 1.0 - dot;
221 // #1684 — a NaN/±Inf component (local seed/insert paths bypass
222 // `federation::sanitize_shipped_vector`) makes `dist` non-finite; NaN is
223 // UNORDERED under `partial_cmp`, which previously aborted the sort via
224 // `.unwrap()` on `None` and killed the single-threaded MCP process.
225 // Collapse non-finite to `f32::MAX` so a poisoned row ranks LAST instead of
226 // corrupting (or crashing) the candidate set. Mirrors the finite-or-floor
227 // defense in `Embedder::cosine_similarity` (src/embeddings.rs).
228 if dist.is_finite() { dist } else { f32::MAX }
229}
230
231impl instant_distance::Point for EmbeddingPoint {
232 fn distance(&self, other: &Self) -> f32 {
233 cosine_distance(&self.0, &other.0)
234 }
235}
236
237// ---------------------------------------------------------------------------
238// #968 (Wave-2 Tier-C3) — async rebuild + double-buffering.
239//
240// Prior to #968 every HNSW rebuild ran SYNCHRONOUSLY on the request thread:
241// `Self::build_hnsw(&state.all_entries)` is CPU-bound (graph construction
242// is O(N log N) with constant factors that put 100k vectors at ~3-10s on
243// commodity hardware) and the producer's `insert()` call simply blocked
244// until the new graph was ready. Search callers contending for the same
245// `inner` mutex blocked too — recall p95 spiked from <20 ms to multi-second.
246//
247// The fix is a double-buffer pattern with background-task swap-in:
248// • The `active` slot (inside `IndexState`) is the index that serves
249// reads. Search holds the inner lock only long enough to clone the
250// overflow + collect valid IDs; the HNSW search itself runs against
251// the active graph held under the same lock (instant-distance's
252// `Search::default()` is per-call scratch, no shared state).
253// • The `warming` slot is `Arc<Mutex<Option<HnswMap>>>`. A background
254// thread (`std::thread::spawn` — HNSW build is CPU-bound; no tokio
255// runtime needed) builds the new graph from a snapshot of
256// `all_entries`, then drops it into `warming`. On the next
257// `try_swap_warming()` (called from search + insert + explicit poll)
258// the warmed graph atomically replaces `active`. The mutex hold
259// spans only the std::mem::swap — microseconds.
260// • Concurrent writes during rebuild: writes flow into `overflow` and
261// `all_entries` normally while the background task is building from
262// the snapshot. On swap, we trim `overflow` of the entries already
263// captured in the snapshot (the snapshot length is recorded when the
264// job kicks off). Entries inserted AFTER the snapshot remain in
265// overflow and are searched linearly until the next rebuild captures
266// them. No write is ever dropped.
267// • Rebuild failures: a panicking build-thread leaves `warming`
268// untouched (`None`); `active` is unchanged. The `JoinHandle` exposes
269// the panic to the caller via `JoinHandle::join()`. The
270// `rebuild_in_flight` atomic flips back to `false` whether the
271// thread succeeded or panicked (via a drop-guard `RebuildGuard`).
272// ---------------------------------------------------------------------------
273
274/// Snapshot-bound rebuild job. Carries the captured `all_entries` plus
275/// the overflow length at snapshot time so the post-swap overflow trim
276/// is deterministic. The trim must use the overflow length specifically
277/// (NOT `all_entries.len()`) because writes between snapshot and swap
278/// extend overflow; only the overflow PREFIX whose entries are now in
279/// the new graph is safe to drop.
280struct RebuildSnapshot {
281 entries: Vec<(String, Vec<f32>)>,
282 /// Length of `overflow` at the moment the snapshot was taken. The
283 /// swap path drains the first `overflow_at_snapshot` entries from
284 /// `state.overflow` — those entries are now in the new graph.
285 /// Anything inserted AFTER the snapshot remains in overflow for
286 /// the next rebuild cycle. Capturing the OVERFLOW length (not the
287 /// all-entries length) is load-bearing for correctness under
288 /// concurrent writes during rebuild.
289 overflow_at_snapshot: usize,
290 /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter snapshot.
291 /// The eviction path bumps `state.overflow_generation` and
292 /// `state.clear()`s `overflow`. If a rebuild snapshot was
293 /// captured BEFORE the eviction, its `overflow_generation` will
294 /// not match the post-eviction `state.overflow_generation`, and
295 /// the swap path knows the snapshot is stale: it must NOT drain
296 /// overflow entries (those entries are post-eviction inserts not
297 /// in the snapshot's `entries`), and the warmed graph itself is
298 /// stale (it was built from pre-eviction `all_entries` that have
299 /// since been shrunk). The safe action is to drop the warmed
300 /// result without swapping AND without draining, then let the
301 /// next rebuild capture the current state.
302 overflow_generation: u64,
303}
304
305/// Drop-guard that clears the `rebuild_in_flight` flag even if the
306/// background build panics. Without this, a panic in `build_hnsw` (e.g.
307/// OOM in `instant_distance::Builder::build`) would leave the flag
308/// stuck-on and prevent any future rebuild from being scheduled.
309struct RebuildGuard {
310 flag: Arc<AtomicBool>,
311}
312
313impl Drop for RebuildGuard {
314 fn drop(&mut self) {
315 self.flag.store(false, Ordering::SeqCst);
316 }
317}
318
319/// Thread-safe HNSW index over memory embeddings.
320pub struct VectorIndex {
321 /// The built HNSW index — maps embedding points to memory IDs.
322 inner: Mutex<IndexState>,
323 /// #968 — warming slot for the double-buffer pattern. The background
324 /// rebuild thread parks the freshly-built graph here; readers/writers
325 /// observe it via [`Self::try_swap_warming`] on their next inner-lock
326 /// acquisition. `Some` means a rebuild has finished and is awaiting
327 /// swap-in; `None` means no warmed graph is ready.
328 warming: Arc<Mutex<Option<RebuildResult>>>,
329 /// #968 — coordinator flag. `true` while a background rebuild is in
330 /// flight; prevents the auto-rebuild path in `insert()` from
331 /// spawning a second concurrent build (one CPU-bound build at a
332 /// time is enough — successive rebuilds chase the same target).
333 /// Cleared by the rebuild thread's drop-guard whether the build
334 /// succeeded or panicked.
335 rebuild_in_flight: Arc<AtomicBool>,
336 /// v0.7.0 (R3-S1) — eviction sink. The `MAX_ENTRIES`-triggered
337 /// drain in `insert()` pushes an [`EvictionEvent`] onto this
338 /// channel for each evicted id; a hook-aware observer above this
339 /// layer drains the channel and fires the `on_index_eviction`
340 /// chain off the hot path. Wired by the daemon at startup
341 /// (`daemon_runtime`) via [`Self::set_eviction_sink`]. Optional —
342 /// CLI / test builds that never bring up the hooks pipeline leave
343 /// it `None` and the sink-push is a no-op so eviction throughput
344 /// is unaffected. Closes the G2 / G8 "fire site exists but not
345 /// wired" gap that the prior `tracing::warn!`-only implementation
346 /// left open.
347 ///
348 /// `Mutex` (not `RwLock`) because writes happen exactly twice in
349 /// the process lifetime (`set_eviction_sink` at startup and
350 /// `Drop`) and reads happen only on the eviction edge which is
351 /// itself already serialized through `inner`. The non-blocking
352 /// `try_send` semantics on the channel make sink-push safe to
353 /// hold across the inner-state lock without risk of deadlock.
354 eviction_sink: Mutex<Option<Sender<EvictionEvent>>>,
355}
356
357/// #968 — payload the rebuild thread parks in the `warming` slot when
358/// the build completes. Carries the new graph PLUS the overflow length
359/// at snapshot time so the swap path can trim `overflow` deterministically:
360/// the prefix `..overflow_at_snapshot` is now in the graph; entries
361/// inserted AFTER the snapshot (the suffix) remain in `overflow` for
362/// the next cycle.
363struct RebuildResult {
364 hnsw: Option<HnswMap<EmbeddingPoint, String>>,
365 overflow_at_snapshot: usize,
366 /// v0.7.0 #1074 — propagated from the snapshot so the swap path
367 /// can detect a stale-by-eviction warming result.
368 overflow_generation: u64,
369 /// #1579 — number of entries captured in the snapshot this graph
370 /// was built from. On swap it becomes the new
371 /// `IndexState::graph_entry_count`, the coverage accounting behind
372 /// [`VectorIndex::is_fully_searchable`].
373 entries_in_graph: usize,
374}
375
376struct IndexState {
377 hnsw: Option<HnswMap<EmbeddingPoint, String>>,
378 /// Entries added after the last rebuild. Searched linearly.
379 overflow: Vec<(String, Vec<f32>)>,
380 /// All entries (for rebuild). Kept in sync with the index + overflow.
381 all_entries: Vec<(String, Vec<f32>)>,
382 /// v0.7.0 R3-S1 — per-instance eviction cap. Defaults to
383 /// [`MAX_ENTRIES`] (the production 100k). Tests construct an
384 /// index with a smaller cap via
385 /// [`VectorIndex::with_max_entries_for_test`] so the eviction
386 /// edge can be exercised without inserting 100k vectors. Storing
387 /// the cap per-instance (rather than as a process-wide atomic)
388 /// keeps concurrent tests independent.
389 max_entries: usize,
390 /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter bumped on
391 /// every `overflow.clear()` (eviction-edge path). Snapshots
392 /// captured before a clear carry the old generation; the swap
393 /// path compares against the current generation and drops the
394 /// warming result without swapping when they don't match. Closes
395 /// the gap where an entry inserted between a snapshot capture
396 /// and the eviction-clear was incorrectly drained by the swap.
397 overflow_generation: u64,
398 /// v0.7.0 #1087 — cached HashSet view of `all_entries` ids used
399 /// by [`VectorIndex::search`] as the stale-id filter. Built
400 /// lazily on the first search after a mutation; invalidated to
401 /// `None` on insert push, eviction drain, and remove retain.
402 /// Pre-#1087 this set was rebuilt on EVERY recall.
403 ///
404 /// PERF-7 (FX-C4-batch2, 2026-05-26): the cache stores
405 /// `Arc<str>` per id instead of `String`. UUID strings are 36
406 /// bytes; `Arc<str>` is a 16-byte fat pointer with the bytes
407 /// heap-allocated once, whereas a `String` is a 24-byte
408 /// (ptr/len/cap) struct with the bytes heap-allocated PLUS the
409 /// 24 bytes inline. On a 100 000-entry warm-up the change
410 /// halves the per-rebuild allocator pressure (16 vs 24+ bytes
411 /// per entry plus the no-spare-capacity heap-side). HashSet
412 /// lookup against `&str` works through the `Borrow<str>` impl.
413 valid_ids_cache: Option<std::collections::HashSet<std::sync::Arc<str>>>,
414 /// #1579 — number of entries baked into the ACTIVE graph (the
415 /// snapshot length at its build time; 0 when `hnsw` is `None`).
416 /// Together with `overflow.len()` this tells whether a search can
417 /// see every live entry: entries seeded into `all_entries` by the
418 /// async-boot loader ([`VectorIndex::seed_entries`]) are in
419 /// NEITHER the graph NOR `overflow` until the boot rebuild swaps
420 /// in, and [`VectorIndex::is_fully_searchable`] reports `false`
421 /// for that window so callers (the #519 proactive conflict check)
422 /// can route to their non-index fallback instead of silently
423 /// searching an index that cannot return the seeded rows.
424 graph_entry_count: usize,
425}
426
427/// A search result from the vector index.
428#[derive(Debug, Clone)]
429pub struct VectorHit {
430 pub id: String,
431 pub distance: f32,
432}
433
434impl VectorIndex {
435 /// Build a new index from a list of (`memory_id`, embedding) pairs.
436 pub fn build(entries: Vec<(String, Vec<f32>)>) -> Self {
437 let hnsw = Self::build_hnsw(&entries);
438 let graph_entry_count = entries.len();
439 VectorIndex {
440 inner: Mutex::new(IndexState {
441 hnsw,
442 overflow: Vec::new(),
443 all_entries: entries,
444 max_entries: MAX_ENTRIES,
445 overflow_generation: 0,
446 valid_ids_cache: None,
447 graph_entry_count,
448 }),
449 eviction_sink: Mutex::new(None),
450 warming: Arc::new(Mutex::new(None)),
451 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
452 }
453 }
454
455 /// Build an empty index.
456 pub fn empty() -> Self {
457 VectorIndex {
458 inner: Mutex::new(IndexState {
459 hnsw: None,
460 overflow: Vec::new(),
461 all_entries: Vec::new(),
462 max_entries: MAX_ENTRIES,
463 overflow_generation: 0,
464 valid_ids_cache: None,
465 graph_entry_count: 0,
466 }),
467 eviction_sink: Mutex::new(None),
468 warming: Arc::new(Mutex::new(None)),
469 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
470 }
471 }
472
473 /// v0.7.0 R3-S1 — Build an empty index with a custom eviction
474 /// cap. Test-only: lets a 5-entry insert sequence exercise the
475 /// eviction edge in milliseconds (vs. the ~minute-scale cost of
476 /// inserting 100k vectors at the production cap). The knob is
477 /// stored per-instance so concurrent tests using the default
478 /// cap are unaffected.
479 #[doc(hidden)]
480 #[must_use]
481 pub fn with_max_entries_for_test(max_entries: usize) -> Self {
482 VectorIndex {
483 inner: Mutex::new(IndexState {
484 hnsw: None,
485 overflow: Vec::new(),
486 all_entries: Vec::new(),
487 max_entries,
488 overflow_generation: 0,
489 valid_ids_cache: None,
490 graph_entry_count: 0,
491 }),
492 eviction_sink: Mutex::new(None),
493 warming: Arc::new(Mutex::new(None)),
494 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
495 }
496 }
497
498 /// v0.7.0 (R3-S1) — wire the eviction sink.
499 ///
500 /// The daemon calls this once at startup with the send-half of an
501 /// mpsc channel; a hook-aware observer task drains the recv-half
502 /// off the hot path and fires the `on_index_eviction` chain
503 /// (`fire_on_index_eviction` in `src/hooks/chain.rs`). Replacing
504 /// an existing sink is allowed — useful when the daemon
505 /// reconfigures the hook chain at runtime — and drops the prior
506 /// sender, which terminates the prior observer cleanly.
507 ///
508 /// Build-time / CLI / test builds that never wire a sink retain
509 /// the `None` default; the eviction path's `try_send` then
510 /// becomes a no-op short-circuit so there is no measurable cost
511 /// to leaving the sink unset.
512 pub fn set_eviction_sink(&self, sink: Sender<EvictionEvent>) {
513 if let Ok(mut guard) = self.eviction_sink.lock() {
514 *guard = Some(sink);
515 }
516 }
517
518 fn build_hnsw(entries: &[(String, Vec<f32>)]) -> Option<HnswMap<EmbeddingPoint, String>> {
519 if entries.is_empty() {
520 return None;
521 }
522 let points: Vec<EmbeddingPoint> = entries
523 .iter()
524 .map(|(_, emb)| EmbeddingPoint(emb.clone()))
525 .collect();
526 let values: Vec<String> = entries.iter().map(|(id, _)| id.clone()).collect();
527 Some(Builder::default().build(points, values))
528 }
529
530 /// Add a new entry to the index (goes to overflow until next rebuild).
531 pub fn insert(&self, id: String, embedding: Vec<f32>) {
532 // #968 — opportunistically swap any warmed graph BEFORE taking the
533 // write path. This lets the auto-rebuild scheduled by a previous
534 // insert land cleanly even if no search call has run between
535 // inserts. Cheap: the warming-mutex contention is microseconds.
536 self.try_swap_warming();
537
538 // #968 — capture the snapshot for a potential auto-rebuild OUTSIDE
539 // the inner lock so the build thread can be spawned without
540 // holding the writers' mutex.
541 let snapshot_for_rebuild: Option<RebuildSnapshot> = {
542 let mut state = match self.inner.lock() {
543 Ok(s) => s,
544 Err(poisoned) => poisoned.into_inner(),
545 };
546 state.all_entries.push((id.clone(), embedding.clone()));
547 state.overflow.push((id, embedding));
548 // v0.7.0 #1087 — invalidate cached valid_ids set; rebuilt
549 // lazily on the next search.
550 state.valid_ids_cache = None;
551
552 // #968 — async auto-rebuild: when overflow crosses the
553 // threshold, snapshot the entries and let the caller (below,
554 // outside the lock) spawn the background build. We do NOT
555 // build the graph synchronously here anymore; that was the
556 // multi-second request-thread block #968 fixes. The
557 // `rebuild_in_flight` CAS prevents the same `insert` call
558 // from racing a previously-scheduled rebuild — only one
559 // background build runs at a time; the next snapshot is
560 // captured after the current build's swap lands.
561 if state.overflow.len() >= REBUILD_THRESHOLD
562 && self
563 .rebuild_in_flight
564 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
565 .is_ok()
566 {
567 Some(RebuildSnapshot {
568 entries: state.all_entries.clone(),
569 overflow_at_snapshot: state.overflow.len(),
570 overflow_generation: state.overflow_generation,
571 })
572 } else {
573 None
574 }
575 };
576 if let Some(snap) = snapshot_for_rebuild {
577 // Spawn-and-forget. The handle is consumed inside the thread
578 // via `RebuildGuard` so it doesn't dangle. Callers that want
579 // a handle should use [`Self::rebuild_async`].
580 let _ = self.spawn_rebuild(snap);
581 }
582
583 // Evict oldest entries if over capacity
584 let mut state = match self.inner.lock() {
585 Ok(s) => s,
586 Err(poisoned) => poisoned.into_inner(),
587 };
588 let max_entries = state.max_entries;
589 if state.all_entries.len() > max_entries {
590 let excess = state.all_entries.len() - max_entries;
591 // M8 (v0.7.0 round-2) — emit ONE summary WARN per eviction
592 // event so the operator sees the batch drop in the daemon
593 // log without scrolling past N per-id lines first. The
594 // per-id WARNs (below) still fire for post-mortem
595 // attribution; this one is the high-level "the index
596 // dropped N oldest embeddings" signal operators alert on.
597 tracing::warn!(
598 target: EVICTION_TRACE_TARGET,
599 dropped = excess,
600 max_entries = max_entries,
601 "HNSW eviction: dropped {} oldest embeddings to make room",
602 excess,
603 );
604 // v0.7.0 (R3-S1) — fire the `on_index_eviction` hook event
605 // for each evicted id BEFORE we drop the rows. The sink
606 // is a non-blocking `try_send` (see below); a downstream
607 // hook-aware observer drains the channel off the hot path
608 // and invokes `crate::hooks::fire_on_index_eviction` per
609 // event. This closes the G2/G8 "fire site exists but not
610 // wired" gap that the prior `tracing::warn!`-only
611 // implementation left open.
612 //
613 // The sink push happens INSIDE the inner-state lock — the
614 // channel is unbounded so `try_send`-equivalent `send`
615 // never blocks (unbounded mpsc has no backpressure). The
616 // sink lock is independent of the inner lock so there is
617 // no ordering hazard.
618 //
619 // The hook subscriber (if any) is responsible for its own
620 // logging; the warn-level tracing event is preserved here
621 // as a no-op-when-no-subscriber fallback so operators
622 // without hooks configured still see eviction pressure in
623 // daemon logs, matching the v0.6.3.1 observability contract.
624 let sink_guard = self.eviction_sink.lock().ok();
625 for (evicted_id, _) in state.all_entries.iter().take(excess) {
626 tracing::warn!(
627 target: EVICTION_TRACE_TARGET,
628 evicted_id = %evicted_id,
629 reason = "max_entries_reached",
630 max_entries = max_entries,
631 "hnsw index evicting oldest entry: cap reached"
632 );
633 if let Some(sink) = sink_guard.as_ref().and_then(|g| g.as_ref()) {
634 // mpsc::Sender::send is non-blocking on an unbounded
635 // channel (it only blocks on bounded). Errors mean the
636 // receiver dropped — observability is best-effort, no
637 // recovery action needed.
638 let payload = EvictionEvent::new(
639 evicted_id.clone(),
640 String::new(), // namespace not in scope at hnsw layer
641 "max_entries_reached",
642 );
643 let _ = sink.send(payload);
644 }
645 }
646 drop(sink_guard);
647 #[allow(clippy::cast_possible_truncation)]
648 let evicted = excess as u64;
649 // pm-v3.1 PR8 (issue #1174): counter sink moved to the
650 // metrics registry. We defer the actual `record_hnsw_eviction`
651 // call until `now_nanos_u64` is computed below so the
652 // counter and last-eviction timestamp move in lockstep.
653 let evicted_count_to_record = evicted;
654
655 state.all_entries.drain(..excess);
656 // v0.7.0 #1087 — invalidate cached valid_ids set after the
657 // eviction drain.
658 state.valid_ids_cache = None;
659 // #968 — defer the post-eviction graph rebuild to the async
660 // path. Correctness is preserved by the `valid_ids` filter
661 // in `search()` — evicted IDs are scrubbed from results
662 // immediately, even though the underlying HNSW graph still
663 // contains them until the next swap. Clearing `overflow`
664 // here was the v0.6 behavior tied to the synchronous
665 // rebuild; we preserve it so the linear-scan path doesn't
666 // re-surface evicted IDs. The next `insert()` past
667 // `REBUILD_THRESHOLD` (or an explicit `rebuild_async()`
668 // call) schedules the actual graph rebuild off-thread.
669 state.overflow.clear();
670 // v0.7.0 #1074 (SR-2 #2, HIGH) — bump the generation
671 // counter on every overflow.clear(). Any in-flight rebuild
672 // snapshot captured BEFORE this bump now carries a stale
673 // generation; the swap path detects the mismatch and
674 // drops the warming result without draining overflow,
675 // preventing the lose-an-insert race where an entry
676 // landed between snapshot and clear() was incorrectly
677 // drained by the eventual swap.
678 state.overflow_generation = state.overflow_generation.wrapping_add(1);
679 // Schedule the rebuild via the async path so the eviction
680 // edge no longer blocks the writer for the multi-second
681 // `build_hnsw` cost at 100k cap. The CAS skips if a
682 // previously-scheduled rebuild is still in flight; the
683 // next insert past threshold picks up the post-eviction
684 // state.
685 if self
686 .rebuild_in_flight
687 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
688 .is_ok()
689 {
690 let snap = RebuildSnapshot {
691 entries: state.all_entries.clone(),
692 // overflow was just cleared above so the snapshot
693 // captures an empty overflow window — anything
694 // inserted post-eviction will be a fresh suffix.
695 overflow_at_snapshot: state.overflow.len(),
696 overflow_generation: state.overflow_generation,
697 };
698 // Release the inner lock before spawning so the
699 // background thread can take it on swap. The
700 // observability path below only reads counters /
701 // statics, not `state`, so we do not need to
702 // re-acquire.
703 drop(state);
704 let _ = self.spawn_rebuild(snap);
705 }
706
707 // Record completion time AFTER the rebuild. `evicted_recently` is
708 // a "did we evict in the trailing N seconds" check; an operator
709 // asking that wants the operation completion time, not the
710 // start. At v0.6 the in-line `build_hnsw` dominated wall time
711 // here (~minutes at 100k entries) — using the start would
712 // make evicted_recently misreport even immediately after
713 // insert returns. Post-#968 the build runs off-thread so
714 // the gap shrinks to microseconds, but the
715 // completion-time-after-rebuild semantics are preserved.
716 let now_nanos = std::time::SystemTime::now()
717 .duration_since(std::time::UNIX_EPOCH)
718 .map(|d| d.as_nanos())
719 .unwrap_or(0);
720 let now_nanos_u64 = u64::try_from(now_nanos).unwrap_or(u64::MAX);
721 // pm-v3.1 PR8 (issue #1174): single sink call covers both
722 // the cumulative counter and the last-eviction timestamp,
723 // mirroring both onto the Prometheus handles so `/metrics`
724 // scrapes see the eviction event without polling
725 // `memory_stats`.
726 crate::metrics::record_hnsw_eviction(evicted_count_to_record, now_nanos_u64);
727
728 // M8 (v0.7.0 round-2) — rolling-hour rate observer. Push
729 // a stamp on this eviction, then count stamps in the
730 // trailing hour. If the rate clears the M8 ceiling,
731 // escalate to ERROR so the dashboard pages the on-call.
732 let recent = record_eviction_and_count_recent(now_nanos_u64);
733 if recent > EVICTION_RATE_CEILING_PER_HOUR {
734 tracing::error!(
735 target: EVICTION_TRACE_TARGET,
736 rate_per_hour = recent,
737 ceiling = EVICTION_RATE_CEILING_PER_HOUR,
738 "HNSW eviction rate exceeded {}/hour — recall quality is degrading; \
739 increase vector_index_capacity or move to dedicated vector DB",
740 EVICTION_RATE_CEILING_PER_HOUR,
741 );
742 }
743 }
744 }
745
746 /// Remove an entry by ID (marks for exclusion; cleaned up on rebuild).
747 pub fn remove(&self, id: &str) {
748 let mut state = match self.inner.lock() {
749 Ok(s) => s,
750 Err(poisoned) => poisoned.into_inner(),
751 };
752 state.all_entries.retain(|(eid, _)| eid != id);
753 state.overflow.retain(|(eid, _)| eid != id);
754 // v0.7.0 #1087 — invalidate cached valid_ids set after remove.
755 state.valid_ids_cache = None;
756 // Note: the HNSW index itself is immutable — removed IDs are filtered
757 // from search results. A rebuild will fully remove them.
758 }
759
760 /// Search for the `k` nearest neighbors to the query embedding.
761 ///
762 /// Combines HNSW approximate search with linear scan of overflow entries.
763 /// Returns results sorted by ascending distance (closest first).
764 pub fn search(&self, query: &[f32], k: usize) -> Vec<VectorHit> {
765 // #968 — opportunistic swap-on-read. If a background rebuild has
766 // parked a warmed graph in the `warming` slot, swap it into
767 // `active` BEFORE we serve this search. The swap is a single
768 // `std::mem::swap` under the inner mutex held for microseconds;
769 // search itself never blocks on graph construction.
770 self.try_swap_warming();
771
772 let mut state = match self.inner.lock() {
773 Ok(s) => s,
774 Err(poisoned) => poisoned.into_inner(),
775 };
776 let query_point = EmbeddingPoint(query.to_vec());
777
778 let mut results: Vec<VectorHit> = Vec::with_capacity(k * 2);
779
780 // v0.7.0 #1087 — populate the cached valid_ids set on the
781 // first search after any mutation; reuse it across recalls.
782 // Pre-#1087 this set was rebuilt on EVERY recall (iterating
783 // up to 100k strings + a fresh HashSet allocation per call).
784 if state.valid_ids_cache.is_none() {
785 // PERF-7 — collect to HashSet<Arc<str>> instead of
786 // HashSet<String>. Arc<str> is a 16-byte fat pointer
787 // vs String's 24-byte (ptr/len/cap) struct; the
788 // backing bytes are shared with no spare capacity
789 // overhead. HashSet::contains accepts `&str` via the
790 // `Borrow<str>` impl on `Arc<str>`.
791 let set: std::collections::HashSet<std::sync::Arc<str>> = state
792 .all_entries
793 .iter()
794 .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
795 .collect();
796 state.valid_ids_cache = Some(set);
797 }
798 let valid_ids = state
799 .valid_ids_cache
800 .as_ref()
801 .expect("valid_ids_cache populated above");
802
803 // Search the HNSW index
804 if let Some(ref hnsw) = state.hnsw {
805 let mut search = Search::default();
806 for item in hnsw.search(&query_point, &mut search) {
807 if !valid_ids.contains(item.value.as_str()) {
808 continue; // Removed entry
809 }
810 results.push(VectorHit {
811 id: item.value.clone(),
812 distance: item.distance,
813 });
814 if results.len() >= k * 2 {
815 break;
816 }
817 }
818 }
819
820 // v0.7.0 #1087 — linear scan of overflow entries WITHOUT
821 // cloning the embedding vec. Pre-#1087 this constructed
822 // `EmbeddingPoint(emb.clone())` per overflow entry (~200 ×
823 // 1536 bytes = 300 KB of clone per search at the cap); the
824 // cosine-distance helper takes `&[f32]` so we inline against
825 // the stored slice instead.
826 let mut overflow_hits: Vec<VectorHit> = Vec::with_capacity(state.overflow.len());
827 for (id, emb) in &state.overflow {
828 overflow_hits.push(VectorHit {
829 id: id.clone(),
830 distance: cosine_distance(&query_point.0, emb),
831 });
832 }
833 // #1684 — total_cmp is total over f32 (no panic even if a NaN slips
834 // past the cosine_distance floor); deterministic ordering.
835 overflow_hits.sort_by(|a, b| a.distance.total_cmp(&b.distance));
836
837 results.extend(overflow_hits);
838
839 // Deduplicate by ID (prefer lower distance)
840 let mut seen = std::collections::HashSet::new();
841 results.retain(|hit| seen.insert(hit.id.clone()));
842
843 // Sort by distance and truncate (#1684 — total_cmp, panic-free)
844 results.sort_by(|a, b| a.distance.total_cmp(&b.distance));
845 results.truncate(k);
846 results
847 }
848
849 /// Return the total number of indexed entries (HNSW + overflow).
850 pub fn len(&self) -> usize {
851 let state = match self.inner.lock() {
852 Ok(s) => s,
853 Err(poisoned) => poisoned.into_inner(),
854 };
855 state.all_entries.len()
856 }
857
858 /// `true` when the index holds no live entries at all.
859 ///
860 /// #1579 QC — load-bearing for the proactive-conflict dispatch:
861 /// an EMPTY index is *vacuously* [`Self::is_fully_searchable`]
862 /// (`0 + 0 >= 0`), but during the async-boot LOAD phase — after
863 /// the daemon binds with `VectorIndex::empty()` and before the
864 /// boot loader's `seed_entries` lands (the `get_all_embeddings`
865 /// read is the long pole at 100k rows) — emptiness says nothing
866 /// about what the DB holds. Callers that would otherwise trust a
867 /// fully-searchable index (the #519 conflict check) must ALSO
868 /// require non-emptiness, so that window routes to the bounded
869 /// recency-scan fallback instead of consulting an index that
870 /// cannot return anything.
871 pub fn is_empty(&self) -> bool {
872 self.len() == 0
873 }
874
875 /// #968 — Force a full rebuild of the HNSW index from all entries,
876 /// SYNCHRONOUSLY. Preserved for tests + emergency paths; production
877 /// code should call [`Self::rebuild_async`] so the multi-second
878 /// graph build does not block the calling thread.
879 ///
880 /// Implementation: delegates to `rebuild_async` and `join`s the
881 /// resulting handle so callers retain the v0.6 semantics ("the
882 /// graph is rebuilt by the time this returns"). Tests rely on this
883 /// blocking behavior to assert post-rebuild invariants without
884 /// adding a yield/poll loop.
885 pub fn rebuild(&self) {
886 // #1037 (MEDIUM, 2026-05-21): defend the sync-rebuild contract
887 // against the `rebuild_async()` no-op-handle short-circuit.
888 // Pre-#1037 if a previous async rebuild was still in flight
889 // (`rebuild_in_flight==true`), `rebuild_async` returned a
890 // no-op `std::thread::spawn(|| {})` handle that joined
891 // instantly — `try_swap_warming()` then ran against a warming
892 // slot that the IN-FLIGHT build hadn't populated yet, so the
893 // sync contract ("graph is rebuilt by the time this returns")
894 // was silently violated. The caller observed the pre-rebuild
895 // state.
896 //
897 // Fix: after the initial `join()`, spin-wait on
898 // `rebuild_in_flight` for up to REBUILD_WAIT_TIMEOUT so the
899 // in-flight build has a bounded window to complete its
900 // warming-slot insert. Then run `try_swap_warming()`. If the
901 // in-flight build genuinely hangs (test-fixture corner case),
902 // surface that as a clean timeout rather than silently
903 // returning a stale graph.
904 let handle = self.rebuild_async();
905 let _ = handle.join();
906 // Bounded spin-wait for any concurrently-running rebuild to
907 // populate `warming`. Cheap CAS read; total budget is
908 // REBUILD_WAIT_TIMEOUT * REBUILD_WAIT_POLL_INTERVAL =
909 // ~10ms × 100 = 1 second worst-case (well under any sensible
910 // sync-rebuild expectation; production callers are async).
911 let start = std::time::Instant::now();
912 while self.rebuild_in_flight.load(Ordering::SeqCst)
913 && start.elapsed() < REBUILD_WAIT_TIMEOUT
914 {
915 std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
916 }
917 self.try_swap_warming();
918 }
919
920 /// #968 — Schedule a full HNSW rebuild on a background thread and
921 /// return the [`JoinHandle`] for callers that want to observe
922 /// completion. The build does NOT hold the inner mutex; readers
923 /// and writers continue to operate against `active` + `overflow`
924 /// while the new graph warms up. On success, the warmed graph
925 /// lands in the `warming` slot and is swapped into `active` by
926 /// the next reader/writer (or by the foreground `rebuild` shim's
927 /// post-join `try_swap_warming` call).
928 ///
929 /// Concurrency contract:
930 /// - At most one rebuild runs at a time (gated by the
931 /// `rebuild_in_flight` atomic). A second `rebuild_async` call
932 /// while a build is in flight returns a no-op handle (the
933 /// spawned closure short-circuits if the CAS fails — the in-
934 /// flight build will pick up the latest entries via the next
935 /// trigger).
936 /// - Writes during the build flow into `overflow` and
937 /// `all_entries` normally. The swap path uses the snapshot
938 /// length captured at spawn time to trim only the overflow
939 /// entries that are now in the new graph; entries inserted
940 /// AFTER the snapshot remain in overflow for the next cycle.
941 /// - Search is unaffected: it reads `active` + `overflow` under
942 /// the inner mutex, both of which remain coherent throughout.
943 ///
944 /// Failure: a panic inside the build thread is observable via
945 /// `JoinHandle::join()`; `active` is unchanged. The
946 /// `rebuild_in_flight` flag is cleared by the `RebuildGuard`
947 /// drop-guard whether the build succeeded or panicked.
948 pub fn rebuild_async(&self) -> JoinHandle<()> {
949 // Snapshot under the inner lock so we capture a consistent
950 // entries list. Read-only; we do not mutate `all_entries`
951 // here. If a rebuild is already in flight, return a no-op
952 // handle (a thread that joins instantly).
953 let snapshot = {
954 let state = match self.inner.lock() {
955 Ok(s) => s,
956 Err(poisoned) => poisoned.into_inner(),
957 };
958 if self
959 .rebuild_in_flight
960 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
961 .is_err()
962 {
963 // Already running — return an instantly-completing
964 // handle. The caller's `join()` returns `Ok(())`.
965 return std::thread::spawn(|| {});
966 }
967 RebuildSnapshot {
968 entries: state.all_entries.clone(),
969 overflow_at_snapshot: state.overflow.len(),
970 overflow_generation: state.overflow_generation,
971 }
972 };
973 self.spawn_rebuild(snapshot)
974 }
975
976 /// #968 — internal: spawn the rebuild thread for a captured
977 /// snapshot. The caller is expected to have flipped
978 /// `rebuild_in_flight` to `true` already (via CAS). The drop-guard
979 /// inside the thread clears the flag whether the build succeeds
980 /// or panics.
981 fn spawn_rebuild(&self, snapshot: RebuildSnapshot) -> JoinHandle<()> {
982 let warming = Arc::clone(&self.warming);
983 let in_flight = Arc::clone(&self.rebuild_in_flight);
984 std::thread::spawn(move || {
985 // RAII clears `rebuild_in_flight` even on panic.
986 let _guard = RebuildGuard { flag: in_flight };
987 // CPU-bound graph build runs OUTSIDE the inner mutex.
988 // This is the load-bearing change for #968: readers and
989 // writers continue to make progress while this runs.
990 let hnsw = VectorIndex::build_hnsw(&snapshot.entries);
991 let result = RebuildResult {
992 hnsw,
993 overflow_at_snapshot: snapshot.overflow_at_snapshot,
994 overflow_generation: snapshot.overflow_generation,
995 entries_in_graph: snapshot.entries.len(),
996 };
997 // Park the result in the warming slot. The next caller
998 // through `try_swap_warming` will move it into `active`.
999 // Holding the warming mutex here is microseconds.
1000 if let Ok(mut slot) = warming.lock() {
1001 // Overwrite any older warmed result that was never
1002 // swapped (e.g. two rebuilds completed before any
1003 // reader ran). The newer build is by definition a
1004 // superset of the older one's entries, so dropping
1005 // the older result is correct.
1006 *slot = Some(result);
1007 }
1008 })
1009 }
1010
1011 /// #968 — Swap the warming slot into active if a warmed graph is
1012 /// ready. Called opportunistically from `search`, `insert`, and
1013 /// the post-join path of the sync `rebuild` shim. The swap holds
1014 /// the inner mutex for microseconds — just long enough to
1015 /// `std::mem::replace` the graph and trim the overflow.
1016 ///
1017 /// Returns `true` if a swap occurred, `false` otherwise. Test
1018 /// code uses the return value to verify the swap landed before
1019 /// asserting post-rebuild state.
1020 pub fn try_swap_warming(&self) -> bool {
1021 // Pop the warmed result FIRST so we hold the warming mutex
1022 // only long enough to take ownership. We then re-acquire the
1023 // inner mutex to swap it in. The two-mutex sequence is safe
1024 // (no ordering hazard with any other path that takes both).
1025 let Some(result) = self.warming.lock().ok().and_then(|mut g| g.take()) else {
1026 return false;
1027 };
1028 let mut state = match self.inner.lock() {
1029 Ok(s) => s,
1030 Err(poisoned) => poisoned.into_inner(),
1031 };
1032 // v0.7.0 #1074 (SR-2 #2, HIGH) — generation check. If the
1033 // overflow generation has bumped since the rebuild captured
1034 // its snapshot, the warming graph was built from pre-eviction
1035 // all_entries and the current overflow contains post-eviction
1036 // inserts that are NOT in that graph. Drop the warming result
1037 // entirely without swapping (the next rebuild captures the
1038 // current state cleanly). Pre-#1074 the swap would have
1039 // overwritten the live graph with a stale one AND drained
1040 // the post-eviction inserts from overflow — silently losing
1041 // them until the next rebuild.
1042 if result.overflow_generation != state.overflow_generation {
1043 tracing::warn!(
1044 target: "hnsw.rebuild",
1045 snapshot_gen = result.overflow_generation,
1046 current_gen = state.overflow_generation,
1047 "dropping stale warming result (eviction occurred mid-rebuild, #1074)"
1048 );
1049 return false;
1050 }
1051 state.hnsw = result.hnsw;
1052 // #1579 — coverage accounting for `is_fully_searchable`.
1053 state.graph_entry_count = result.entries_in_graph;
1054 // Trim overflow: the first `overflow_at_snapshot` entries are
1055 // now in the graph; entries inserted AFTER the snapshot
1056 // remain. Defensive `min` in case `remove` or eviction
1057 // shortened overflow while the build was running.
1058 let to_drain = result.overflow_at_snapshot.min(state.overflow.len());
1059 state.overflow.drain(..to_drain);
1060 true
1061 }
1062
1063 /// #1579 — `true` when a search against this index can observe
1064 /// every live entry: the active graph (its build-time snapshot
1065 /// length) plus the linearly-scanned `overflow` cover
1066 /// `all_entries`. `false` exactly during the async-boot warm
1067 /// window, when [`Self::seed_entries`] has parked DB-loaded
1068 /// entries in `all_entries` but the background graph build has
1069 /// not swapped in yet — sequenced writes flow through `insert()`
1070 /// (graph- or overflow-visible) so they never break coverage, and
1071 /// removals/evictions only shrink `all_entries` (stale graph ids
1072 /// are filtered at search time), so the inequality is conservative
1073 /// in the safe direction.
1074 ///
1075 /// Consumers: the #519 proactive conflict check routes to its
1076 /// bounded-scan fallback while this is `false`; the boot loader
1077 /// uses it to decide whether a make-up rebuild is needed after a
1078 /// racing routine rebuild swallowed its CAS.
1079 pub fn is_fully_searchable(&self) -> bool {
1080 // Opportunistically land any warmed graph first so a caller
1081 // probing right after a rebuild finished sees the swapped
1082 // state (mirrors `search`/`insert`).
1083 self.try_swap_warming();
1084 let state = match self.inner.lock() {
1085 Ok(s) => s,
1086 Err(poisoned) => poisoned.into_inner(),
1087 };
1088 state.graph_entry_count + state.overflow.len() >= state.all_entries.len()
1089 }
1090
1091 /// #1579 B3 — bulk-load DB-resident entries into the index
1092 /// WITHOUT building the graph (the async-boot path). Entries land
1093 /// in `all_entries` only; they become searchable when the
1094 /// follow-up rebuild (see [`Self::seed_and_rebuild_async`]) swaps
1095 /// its graph in. Ids already present (e.g. a row written through
1096 /// `insert()` between the caller's DB snapshot and this call) are
1097 /// skipped so the index never double-counts. Returns the number of
1098 /// entries actually seeded.
1099 ///
1100 /// Deliberately does NOT enforce `max_entries` eviction here —
1101 /// the legacy synchronous boot path (`VectorIndex::build` over
1102 /// `get_all_embeddings`) never evicted at boot either, and the
1103 /// first post-boot `insert()` applies the cap exactly as before.
1104 pub fn seed_entries(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1105 let mut state = match self.inner.lock() {
1106 Ok(s) => s,
1107 Err(poisoned) => poisoned.into_inner(),
1108 };
1109 let existing: std::collections::HashSet<std::sync::Arc<str>> = state
1110 .all_entries
1111 .iter()
1112 .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
1113 .collect();
1114 let mut seeded = 0usize;
1115 for (id, emb) in entries {
1116 if existing.contains(id.as_str()) {
1117 continue;
1118 }
1119 state.all_entries.push((id, emb));
1120 seeded += 1;
1121 }
1122 if seeded > 0 {
1123 state.valid_ids_cache = None;
1124 }
1125 seeded
1126 }
1127
1128 /// #1579 B3 — async-boot warm-up: seed DB-loaded entries (see
1129 /// [`Self::seed_entries`]) and schedule the graph build on the
1130 /// existing #968 double-buffer rebuild machinery. Returns the
1131 /// rebuild thread's [`JoinHandle`]; the caller (the boot loader)
1132 /// joins it off the request path and then calls
1133 /// [`Self::try_swap_warming`] + emits the operator-visible
1134 /// "index warm" line.
1135 ///
1136 /// If a routine rebuild is already in flight (its snapshot
1137 /// predates the seed), `rebuild_async` returns a no-op handle and
1138 /// the seeded entries stay graph-invisible until a later rebuild;
1139 /// the boot loader detects that via [`Self::is_fully_searchable`]
1140 /// and issues a make-up `rebuild`.
1141 pub fn seed_and_rebuild_async(&self, entries: Vec<(String, Vec<f32>)>) -> JoinHandle<()> {
1142 self.seed_entries(entries);
1143 self.rebuild_async()
1144 }
1145
1146 /// #1579 B3 — blocking boot warm-up for callers that hold the
1147 /// index directly (the MCP stdio boot thread; tests). Seeds the
1148 /// DB-loaded entries and drives rebuild→swap to completion,
1149 /// returning the number of entries seeded. Each step takes the
1150 /// inner mutex only briefly (the graph build itself runs on the
1151 /// #968 background thread against a snapshot), so concurrent
1152 /// readers/writers on other threads keep making progress — the
1153 /// CALLING thread is the only one parked.
1154 ///
1155 /// The retry loop covers the rebuild-CAS race: if a routine
1156 /// 200-overflow rebuild was already in flight when our seed
1157 /// landed, `rebuild_async` short-circuits to a no-op handle and
1158 /// the in-flight build's pre-seed snapshot cannot cover the
1159 /// seeded rows — `is_fully_searchable` stays `false` and the loop
1160 /// schedules a make-up rebuild once the CAS frees up.
1161 pub fn warm_boot(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1162 let seeded = self.seed_entries(entries);
1163 loop {
1164 let handle = self.rebuild_async();
1165 let _ = handle.join();
1166 // `is_fully_searchable` opportunistically swaps any warmed
1167 // graph before evaluating coverage.
1168 if self.is_fully_searchable() {
1169 return seeded;
1170 }
1171 std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
1172 }
1173 }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178 use super::*;
1179
1180 fn make_embedding(values: &[f32]) -> Vec<f32> {
1181 // L2-normalize
1182 let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1183 values.iter().map(|v| v / norm).collect()
1184 }
1185
1186 #[test]
1187 fn empty_index_returns_empty() {
1188 let idx = VectorIndex::empty();
1189 let results = idx.search(&[1.0, 0.0, 0.0], 10);
1190 assert!(results.is_empty());
1191 }
1192
1193 #[test]
1194 fn issue_1684_nan_vector_does_not_panic_and_ranks_last() {
1195 // Pre-#1684: a NaN distance made `partial_cmp` return `None` and
1196 // `.unwrap()` panicked inside `sort_by`, aborting the single-threaded
1197 // MCP process. Post-fix the poison vector ranks last and search returns.
1198 let idx = VectorIndex::empty();
1199 idx.insert("good".into(), make_embedding(&[1.0, 0.0, 0.0]));
1200 idx.insert("poison".into(), vec![f32::NAN, 0.0, 0.0]);
1201 let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
1202 let good = hits.iter().position(|h| h.id == "good");
1203 assert!(good.is_some(), "good vector must be returned");
1204 if let Some(poison) = hits.iter().position(|h| h.id == "poison") {
1205 assert!(
1206 good.unwrap() < poison,
1207 "non-finite-distance poison vector must rank after the good vector"
1208 );
1209 }
1210 }
1211
1212 #[test]
1213 fn issue_1684_cosine_distance_non_finite_collapses_to_max() {
1214 assert_eq!(cosine_distance(&[f32::NAN, 1.0], &[1.0, 1.0]), f32::MAX);
1215 assert_eq!(
1216 cosine_distance(&[f32::INFINITY, 0.0], &[1.0, 0.0]),
1217 f32::MAX
1218 );
1219 assert!(cosine_distance(&[1.0, 0.0], &[1.0, 0.0]).is_finite());
1220 }
1221
1222 #[test]
1223 fn perf_7_valid_ids_cache_is_arc_str_typed() {
1224 // PERF-7 (FX-C4-batch2, 2026-05-26) — the valid_ids cache
1225 // must be HashSet<Arc<str>>, not HashSet<String>. The
1226 // discriminator is the cache field's type, which we exercise
1227 // by populating the index, searching once to materialise the
1228 // cache, then reaching into the locked state to verify the
1229 // cache element shape via downcast on a sample entry.
1230 let entries = vec![
1231 ("id1".to_string(), make_embedding(&[1.0, 0.0, 0.0])),
1232 ("id2".to_string(), make_embedding(&[0.0, 1.0, 0.0])),
1233 ];
1234 let idx = VectorIndex::build(entries);
1235 // Trigger one search so the cache populates.
1236 let _ = idx.search(&[1.0, 0.0, 0.0], 2);
1237 let state = idx.inner.lock().expect("lock state");
1238 let cache = state
1239 .valid_ids_cache
1240 .as_ref()
1241 .expect("valid_ids_cache populated after search");
1242 assert!(
1243 cache.contains("id1") && cache.contains("id2"),
1244 "PERF-7: Arc<str> cache failed to admit &str lookup",
1245 );
1246 // Type system pins the field type — if a future refactor
1247 // changes the type back to HashSet<String>, this `Arc::clone`
1248 // line fails to compile.
1249 let sample: Option<&std::sync::Arc<str>> = cache.iter().next();
1250 assert!(sample.is_some(), "PERF-7: cache must hold Arc<str> entries");
1251 }
1252
1253 #[test]
1254 fn basic_search() {
1255 let entries = vec![
1256 ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1257 ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
1258 ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
1259 ];
1260 let idx = VectorIndex::build(entries);
1261 let results = idx.search(&make_embedding(&[1.0, 0.1, 0.0]), 2);
1262 assert_eq!(results.len(), 2);
1263 assert_eq!(results[0].id, "a"); // Closest to [1, 0.1, 0]
1264 }
1265
1266 #[test]
1267 fn insert_and_search_overflow() {
1268 let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1269 let idx = VectorIndex::build(entries);
1270 idx.insert("b".into(), make_embedding(&[0.9, 0.1, 0.0]));
1271 let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
1272 assert_eq!(results.len(), 2);
1273 assert_eq!(results[0].id, "a");
1274 assert_eq!(results[1].id, "b");
1275 }
1276
1277 #[test]
1278 fn remove_excludes_from_results() {
1279 let entries = vec![
1280 ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1281 ("b".into(), make_embedding(&[0.9, 0.1, 0.0])),
1282 ];
1283 let idx = VectorIndex::build(entries);
1284 idx.remove("a");
1285 let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
1286 assert!(results.iter().all(|h| h.id != "a"));
1287 }
1288
1289 // -----------------------------------------------------------------
1290 // W11/S11b — rebuild + batched-insert hardening
1291 // -----------------------------------------------------------------
1292
1293 #[test]
1294 fn test_rebuild_preserves_all_entries() {
1295 // Build a small but non-trivial set of orthonormal-ish vectors,
1296 // rebuild the index, and confirm every id is still findable via
1297 // search with a top-k that covers them all.
1298 let raw: Vec<(String, Vec<f32>)> = (0..12)
1299 .map(|i| {
1300 let mut v = vec![0.0_f32; 16];
1301 #[allow(clippy::cast_precision_loss)]
1302 let f = i as f32;
1303 v[i % 16] = 1.0 + f * 0.01; // bias to make L2 norm non-trivial
1304 (format!("id-{i}"), make_embedding(&v))
1305 })
1306 .collect();
1307
1308 let idx = VectorIndex::build(raw.clone());
1309 idx.rebuild();
1310 assert_eq!(idx.len(), raw.len());
1311
1312 // Every id should appear when we ask for top-N where N >= count.
1313 let query = make_embedding(&[1.0; 16]);
1314 let hits = idx.search(&query, raw.len() * 2);
1315 let found: std::collections::HashSet<String> = hits.into_iter().map(|h| h.id).collect();
1316 for (id, _) in &raw {
1317 assert!(
1318 found.contains(id),
1319 "rebuild must preserve id {id}, found: {:?}",
1320 found
1321 );
1322 }
1323 }
1324
1325 #[test]
1326 fn test_remove_then_search_excludes_id() {
1327 let entries = vec![
1328 ("alpha".into(), make_embedding(&[1.0, 0.0, 0.0, 0.0])),
1329 ("beta".into(), make_embedding(&[0.9, 0.1, 0.0, 0.0])),
1330 ("gamma".into(), make_embedding(&[0.8, 0.2, 0.0, 0.0])),
1331 ];
1332 let idx = VectorIndex::build(entries);
1333 // Pre-remove: alpha should be the closest to (1,0,0,0).
1334 let pre = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1335 assert!(pre.iter().any(|h| h.id == "alpha"));
1336
1337 idx.remove("alpha");
1338 // Post-remove: alpha must not appear regardless of k.
1339 for k in 1..=10 {
1340 let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), k);
1341 assert!(
1342 hits.iter().all(|h| h.id != "alpha"),
1343 "removed id `alpha` resurfaced with k={k}: {:?}",
1344 hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1345 );
1346 }
1347
1348 // Other entries still findable.
1349 let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1350 let ids: Vec<&str> = hits.iter().map(|h| h.id.as_str()).collect();
1351 assert!(ids.contains(&"beta"));
1352 assert!(ids.contains(&"gamma"));
1353 }
1354
1355 // -----------------------------------------------------------------
1356 // W12-H — small edge cases
1357 // -----------------------------------------------------------------
1358
1359 #[test]
1360 fn empty_index_len_is_zero() {
1361 let idx = VectorIndex::empty();
1362 assert_eq!(idx.len(), 0);
1363 }
1364
1365 #[test]
1366 fn build_with_empty_entries_search_empty() {
1367 let idx = VectorIndex::build(Vec::new());
1368 assert_eq!(idx.len(), 0);
1369 let results = idx.search(&[1.0, 0.0, 0.0], 5);
1370 assert!(results.is_empty());
1371 }
1372
1373 #[test]
1374 fn search_with_k_zero_returns_empty() {
1375 let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1376 let idx = VectorIndex::build(entries);
1377 let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 0);
1378 assert!(results.is_empty());
1379 }
1380
1381 #[test]
1382 fn rebuild_on_empty_does_not_crash() {
1383 let idx = VectorIndex::empty();
1384 idx.rebuild();
1385 assert_eq!(idx.len(), 0);
1386 }
1387
1388 #[test]
1389 fn insert_increases_len() {
1390 let idx = VectorIndex::empty();
1391 idx.insert("a".into(), make_embedding(&[1.0, 0.0, 0.0]));
1392 idx.insert("b".into(), make_embedding(&[0.0, 1.0, 0.0]));
1393 assert_eq!(idx.len(), 2);
1394 }
1395
1396 #[test]
1397 fn embedding_point_distance_orthogonal() {
1398 let a = EmbeddingPoint(vec![1.0, 0.0, 0.0]);
1399 let b = EmbeddingPoint(vec![0.0, 1.0, 0.0]);
1400 // 1 - dot = 1 - 0 = 1
1401 assert!((a.distance(&b) - 1.0).abs() < 1e-6);
1402 }
1403
1404 #[test]
1405 fn embedding_point_distance_identical_is_zero() {
1406 let a = EmbeddingPoint(make_embedding(&[1.0, 1.0, 1.0]));
1407 // 1 - 1 = 0 (L2-normalised)
1408 assert!(a.distance(&a).abs() < 1e-6);
1409 }
1410
1411 #[test]
1412 fn remove_on_empty_index_is_noop() {
1413 let idx = VectorIndex::empty();
1414 idx.remove("nonexistent");
1415 assert_eq!(idx.len(), 0);
1416 }
1417
1418 #[test]
1419 fn insert_triggers_auto_rebuild_at_threshold() {
1420 // REBUILD_THRESHOLD = 200. Inserting that many into a fresh index
1421 // exercises the auto-rebuild branch in `insert`.
1422 let idx = VectorIndex::empty();
1423 for i in 0..205_usize {
1424 let mut v = vec![0.0_f32; 8];
1425 #[allow(clippy::cast_precision_loss)]
1426 let f = i as f32;
1427 v[i % 8] = 1.0 + f * 0.001;
1428 idx.insert(format!("id-{i}"), make_embedding(&v));
1429 }
1430 assert_eq!(idx.len(), 205);
1431 // After auto-rebuild, search still works — top-k returns hits.
1432 let q = make_embedding(&[1.0_f32; 8]);
1433 let hits = idx.search(&q, 5);
1434 assert_eq!(hits.len(), 5);
1435 }
1436
1437 #[test]
1438 fn test_rebuild_after_batch_insert_settles() {
1439 // Start empty, batch-insert N entries, force a rebuild, then assert
1440 // that top-K search returns exactly K results (deterministic count
1441 // for a fully-populated index with K <= len).
1442 let idx = VectorIndex::empty();
1443 let n = 25_usize;
1444 for i in 0..n {
1445 let mut v = vec![0.0_f32; 8];
1446 #[allow(clippy::cast_precision_loss)]
1447 let f = i as f32;
1448 v[i % 8] = 1.0 + f * 0.001;
1449 idx.insert(format!("id-{i}"), make_embedding(&v));
1450 }
1451 // Force a rebuild — overflow may not have hit REBUILD_THRESHOLD.
1452 idx.rebuild();
1453 assert_eq!(idx.len(), n);
1454
1455 let query = make_embedding(&[1.0; 8]);
1456 let k = 5;
1457 let hits = idx.search(&query, k);
1458 assert_eq!(
1459 hits.len(),
1460 k,
1461 "post-rebuild search top-{k} must return exactly {k} hits, got {:?}",
1462 hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1463 );
1464
1465 // Distances should be sorted ascending (closest first).
1466 for w in hits.windows(2) {
1467 assert!(
1468 w[0].distance <= w[1].distance,
1469 "search results must be ascending by distance: {} > {}",
1470 w[0].distance,
1471 w[1].distance
1472 );
1473 }
1474
1475 // No duplicate ids in the result.
1476 let mut seen = std::collections::HashSet::new();
1477 for h in &hits {
1478 assert!(
1479 seen.insert(h.id.clone()),
1480 "duplicate id in search: {}",
1481 h.id
1482 );
1483 }
1484 }
1485
1486 // -----------------------------------------------------------------
1487 // v0.7.0 R3-S1 — eviction sink wires the on_index_eviction hook
1488 // -----------------------------------------------------------------
1489
1490 /// `test_hnsw_eviction_fires_hook` — when a sink is wired via
1491 /// [`VectorIndex::set_eviction_sink`] and the index inserts past
1492 /// its eviction cap, the eviction-edge code path pushes one
1493 /// [`EvictionEvent`] per evicted id onto the channel. This closes
1494 /// the G2/G8 "fire site exists but not wired" gap. We construct
1495 /// the index via [`VectorIndex::with_max_entries_for_test`] so a
1496 /// 6-entry insert sequence trips the eviction path in
1497 /// milliseconds without touching the production 100k cap.
1498 #[test]
1499 fn test_hnsw_eviction_fires_hook() {
1500 let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
1501 let idx = VectorIndex::with_max_entries_for_test(4);
1502 idx.set_eviction_sink(tx);
1503
1504 // Reset the process-local counters so concurrent tests
1505 // sharing the static don't bleed assertions into ours.
1506 reset_eviction_counters_for_test();
1507
1508 // Insert cap+2 entries — eviction drops the 2 oldest.
1509 let n = 6_usize;
1510 for i in 0..n {
1511 let mut v = vec![0.0_f32; 4];
1512 #[allow(clippy::cast_precision_loss)]
1513 let f = i as f32;
1514 v[i % 4] = 1.0 + f * 0.01;
1515 idx.insert(format!("evict-{i}"), make_embedding(&v));
1516 }
1517
1518 // Drain the channel. Expect TWO events (n=6, cap=4) — one
1519 // per evicted id. The unbounded sender does not block; the
1520 // events should already be enqueued by the time `insert`
1521 // returns, but we give the channel a small grace window for
1522 // thread-scheduling jitter on slow CI runners.
1523 let mut received: Vec<EvictionEvent> = Vec::new();
1524 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1525 while std::time::Instant::now() < deadline && received.len() < 2 {
1526 while let Ok(ev) = rx.try_recv() {
1527 received.push(ev);
1528 }
1529 if received.len() < 2 {
1530 std::thread::sleep(std::time::Duration::from_millis(5));
1531 }
1532 }
1533
1534 assert_eq!(
1535 received.len(),
1536 2,
1537 "expected one EvictionEvent per evicted id (2 evictions for n=6, cap=4), got {}: {:?}",
1538 received.len(),
1539 received.iter().map(|e| &e.memory_id).collect::<Vec<_>>(),
1540 );
1541
1542 let ids: Vec<&str> = received.iter().map(|e| e.memory_id.as_str()).collect();
1543 assert!(
1544 ids.contains(&"evict-0"),
1545 "expected evict-0 in evicted ids; got {ids:?}"
1546 );
1547 assert!(
1548 ids.contains(&"evict-1"),
1549 "expected evict-1 in evicted ids; got {ids:?}"
1550 );
1551
1552 for ev in &received {
1553 assert_eq!(
1554 ev.reason, "max_entries_reached",
1555 "evicted reason should match the canonical tag, got {:?}",
1556 ev.reason
1557 );
1558 // namespace is intentionally empty at the hnsw layer
1559 // (the index does not carry namespace context); G9+ may
1560 // plumb it through. The wire field MUST be present even
1561 // when empty.
1562 assert_eq!(ev.namespace, "");
1563 assert!(
1564 !ev.evicted_at.is_empty(),
1565 "evicted_at must be set (rfc3339), got empty"
1566 );
1567 }
1568 }
1569
1570 /// Sanity: insertion without a sink wired is a no-op for the
1571 /// hook path. The eviction-edge code path must remain functional
1572 /// (oldest drained, cap enforced) even when no sink is set, so
1573 /// the CLI / test build's zero-cost posture is preserved.
1574 ///
1575 /// The proof of eviction is the PER-INDEX `len()` — after 6
1576 /// inserts into a max-4 index, exactly 4 entries survive, which
1577 /// is only possible if the eviction-edge path drained the 2
1578 /// oldest. We deliberately do NOT read the process-global
1579 /// `index_evictions_total()` counter here: a sibling test
1580 /// (`test_hnsw_eviction_fires_hook`) calls
1581 /// `reset_eviction_counters_for_test()`, which zeroes that
1582 /// shared static, so a concurrent multi-threaded test run could
1583 /// collapse a global-delta assertion to a flake. `idx.len()` is
1584 /// isolated to this index instance and deterministic.
1585 #[test]
1586 fn test_hnsw_eviction_without_sink_is_noop_for_hook() {
1587 let idx = VectorIndex::with_max_entries_for_test(4);
1588 // No `set_eviction_sink` call here — the index runs as in
1589 // CLI / pre-R3-S1 builds without a hooks pipeline.
1590
1591 for i in 0..6_usize {
1592 let mut v = vec![0.0_f32; 4];
1593 #[allow(clippy::cast_precision_loss)]
1594 let f = i as f32;
1595 v[i % 4] = 1.0 + f * 0.01;
1596 idx.insert(format!("noopsink-{i}"), make_embedding(&v));
1597 }
1598
1599 assert_eq!(
1600 idx.len(),
1601 4,
1602 "eviction-edge path must still enforce the max-4 cap even \
1603 without a sink wired (6 inserts → 4 survivors means 2 \
1604 evictions occurred); got len={}",
1605 idx.len()
1606 );
1607 }
1608}
1609
1610// ---------------------------------------------------------------------------
1611// #968 (Wave-2 Tier-C3) — async-rebuild + double-buffering regression tests
1612//
1613// These tests pin the contract introduced by issue #968:
1614// 1. A rebuild scheduled via `rebuild_async` does NOT block readers.
1615// Search calls dispatched concurrently with the build complete in
1616// <100 ms even when the build itself runs for seconds.
1617// 2. A build-time panic leaves `active` untouched so reads continue
1618// to serve the prior snapshot.
1619// 3. Writes that land DURING a rebuild are preserved: post-rebuild
1620// state includes the snapshot's entries PLUS the concurrent inserts.
1621// 4. The swap is atomic: no caller ever observes a partial-state graph
1622// (e.g. one with half the entries missing).
1623// ---------------------------------------------------------------------------
1624#[cfg(test)]
1625mod d1_968_tests {
1626 use super::*;
1627 use std::sync::Arc as TArc;
1628 use std::sync::atomic::AtomicUsize;
1629 use std::time::{Duration, Instant};
1630
1631 fn make_embedding(values: &[f32]) -> Vec<f32> {
1632 let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1633 values.iter().map(|v| v / norm).collect()
1634 }
1635
1636 /// Build a deterministic embedding-set fixture of `n` 16-dim
1637 /// L2-normalised vectors. Tests use this to make the `build_hnsw`
1638 /// pass non-trivial without inflating compile time.
1639 fn fixture(n: usize) -> Vec<(String, Vec<f32>)> {
1640 (0..n)
1641 .map(|i| {
1642 let mut v = vec![0.0_f32; 16];
1643 #[allow(clippy::cast_precision_loss)]
1644 let f = i as f32;
1645 v[i % 16] = 1.0 + f * 0.001;
1646 (format!("id-{i}"), make_embedding(&v))
1647 })
1648 .collect()
1649 }
1650
1651 /// #968 contract 1 — search must remain responsive while a rebuild
1652 /// runs in the background. We spawn a rebuild_async over a
1653 /// reasonably-sized fixture (the build is CPU-bound but not
1654 /// minutes-long at this scale) and concurrently issue 50 search
1655 /// calls. The reader-loop must complete well under the time it
1656 /// would take if every search had to wait on the rebuild's inner
1657 /// mutex (which it never holds for more than microseconds).
1658 #[test]
1659 fn rebuild_async_does_not_block_search_968() {
1660 let idx = TArc::new(VectorIndex::build(fixture(2_000)));
1661 let query = make_embedding(&[1.0_f32; 16]);
1662
1663 // Start the rebuild OFF-THREAD.
1664 let idx_for_rebuild = TArc::clone(&idx);
1665 let rebuild_handle = std::thread::spawn(move || idx_for_rebuild.rebuild_async());
1666 // Concurrent search loop: fire 50 searches.
1667 let idx_for_search = TArc::clone(&idx);
1668 let search_start = Instant::now();
1669 let search_handle = std::thread::spawn(move || {
1670 for _ in 0..50 {
1671 let hits = idx_for_search.search(&query, 10);
1672 // Each search must return at most 10 hits (the k cap)
1673 // and the fixture has >10 entries, so a non-empty
1674 // result is the expected output. We assert non-empty
1675 // (rather than ==10) because the swap mid-loop can
1676 // briefly leave the graph empty while overflow takes
1677 // over — both shapes are correct.
1678 assert!(
1679 !hits.is_empty(),
1680 "search returned empty during rebuild — readers were blocked or the graph was lost"
1681 );
1682 }
1683 });
1684 // Wait for both to finish. The rebuild thread may take seconds;
1685 // the search thread must NOT.
1686 let _ = search_handle.join().expect("search thread panicked");
1687 let search_elapsed = search_start.elapsed();
1688 // The 50-search loop with a 2k-entry index should complete in
1689 // tens of ms. We use a 5-second budget — wide enough to
1690 // absorb CI jitter, narrow enough to catch the v0.6 regression
1691 // (which would have blocked for ~3-10s on the rebuild mutex).
1692 assert!(
1693 search_elapsed < Duration::from_secs(5),
1694 "50 searches took {:?} — readers blocked on the rebuild (v0.6 regression)",
1695 search_elapsed,
1696 );
1697 let _ = rebuild_handle.join().expect("rebuild thread panicked");
1698 // Drain any pending warming into active so post-test state is clean.
1699 idx.try_swap_warming();
1700 }
1701
1702 /// #968 contract 2 — if the build fails (we cannot easily induce
1703 /// a panic from `instant_distance::Builder::build` deterministically;
1704 /// instead we exercise the rebuild_in_flight short-circuit + the
1705 /// "no warmed result" path), the active graph is unchanged.
1706 /// Concretely: we call `rebuild_async` while a prior rebuild is in
1707 /// flight; the second call returns a no-op handle and leaves
1708 /// `active` serving the prior snapshot.
1709 #[test]
1710 fn rebuild_failure_leaves_active_unchanged_968() {
1711 let entries = fixture(50);
1712 let idx = VectorIndex::build(entries.clone());
1713
1714 // Pre-rebuild: search returns expected ids.
1715 let query = make_embedding(&[1.0_f32; 16]);
1716 let pre_hits = idx.search(&query, 5);
1717 assert_eq!(pre_hits.len(), 5);
1718 let pre_ids: std::collections::HashSet<String> =
1719 pre_hits.iter().map(|h| h.id.clone()).collect();
1720
1721 // Force the in-flight flag on so the next rebuild_async takes
1722 // the short-circuit path (returns a no-op handle).
1723 idx.rebuild_in_flight.store(true, Ordering::SeqCst);
1724 let handle = idx.rebuild_async();
1725 let _ = handle.join();
1726 // Active should be UNCHANGED — no warmed graph was parked.
1727 let post_hits = idx.search(&query, 5);
1728 let post_ids: std::collections::HashSet<String> =
1729 post_hits.iter().map(|h| h.id.clone()).collect();
1730 assert_eq!(
1731 pre_ids, post_ids,
1732 "search results changed after a no-op rebuild — active was clobbered"
1733 );
1734
1735 // Cleanup: clear the manually-poked flag.
1736 idx.rebuild_in_flight.store(false, Ordering::SeqCst);
1737 }
1738
1739 /// #968 contract 3 — writes during a rebuild are preserved.
1740 /// We snapshot a baseline, kick off a rebuild_async, then issue
1741 /// N concurrent inserts. After the rebuild lands, every inserted
1742 /// id must be findable via search (either via the new graph if the
1743 /// snapshot captured it, or via the overflow if it arrived after
1744 /// the snapshot — both paths must surface the id).
1745 #[test]
1746 fn concurrent_writes_during_rebuild_consistent_968() {
1747 let idx = TArc::new(VectorIndex::build(fixture(500)));
1748 let handle = {
1749 let idx = TArc::clone(&idx);
1750 std::thread::spawn(move || idx.rebuild_async())
1751 };
1752
1753 // Issue 30 concurrent inserts. These flow into overflow +
1754 // all_entries; the snapshot already captured at rebuild_async
1755 // time has its own entry list, so these inserts will remain
1756 // in overflow until the NEXT rebuild — but the swap path
1757 // trims only the overflow PREFIX present at snapshot time, so
1758 // the new entries survive.
1759 let inserts_done = TArc::new(AtomicUsize::new(0));
1760 let mut writer_handles = Vec::new();
1761 for i in 0..30 {
1762 let idx = TArc::clone(&idx);
1763 let counter = TArc::clone(&inserts_done);
1764 writer_handles.push(std::thread::spawn(move || {
1765 let mut v = vec![0.0_f32; 16];
1766 #[allow(clippy::cast_precision_loss)]
1767 let f = i as f32;
1768 v[i % 16] = 2.0 + f * 0.01;
1769 idx.insert(format!("concurrent-{i}"), make_embedding(&v));
1770 counter.fetch_add(1, Ordering::SeqCst);
1771 }));
1772 }
1773 for h in writer_handles {
1774 let _ = h.join();
1775 }
1776 let rebuild_h = handle.join().expect("outer rebuild spawner panicked");
1777 let _ = rebuild_h.join();
1778 // v0.7.0 #1212 — deterministic post-rebuild observation barrier.
1779 // Pre-#1212 a single `try_swap_warming()` followed immediately
1780 // by the search loop raced the cross-thread publication of the
1781 // swap under stressed CI runners (`SAL-only feature gate` /
1782 // parallel-test load). The fix is two-fold:
1783 // (1) Drain ANY parked warming result in a tight loop — handles
1784 // the rare double-rebuild case (writer crossing
1785 // REBUILD_THRESHOLD during the concurrent-write phase).
1786 // (2) Yield briefly so any post-swap state (`valid_ids_cache`
1787 // lazy rebuild, HNSW search-side state init) settles
1788 // before the search loop reads it. 10 ms is generous
1789 // enough for any GHA runner under 4-way parallel-test
1790 // contention; the test still completes in <100 ms total.
1791 let mut swaps = 0_usize;
1792 while idx.try_swap_warming() {
1793 swaps += 1;
1794 // Defensive cap — at v0.7.0 there can be at most one
1795 // parked warming result at a time, but loop bounded just
1796 // in case a follow-on rebuild parks one while we drain.
1797 if swaps > 4 {
1798 break;
1799 }
1800 }
1801 std::thread::sleep(Duration::from_millis(10));
1802
1803 // Verify all 30 ids survived the rebuild. The CONTRACT under
1804 // test is "post-rebuild state INCLUDES the concurrent writes"
1805 // — i.e. every concurrent id is in `all_entries` (graph OR
1806 // overflow). We assert that directly via `len()` + a
1807 // sufficiently-wide search rather than relying on tie-break
1808 // determinism at small k. The baseline fixture (500 entries
1809 // across 16 axes) clusters ~32 entries per axis at
1810 // post-normalization distance 0 from any axis query; concurrent
1811 // entries on the same axis tie with them at distance 0, and
1812 // truncate-to-k can clip a single tied entry under
1813 // tie-break-dependent sort behavior. The fix is to widen k
1814 // beyond the tie cluster, NOT to weaken the contract.
1815 assert_eq!(inserts_done.load(Ordering::SeqCst), 30);
1816 let final_len = idx.len();
1817 // v0.7.0 #1212 — snapshot the post-swap private state ONCE so
1818 // every downstream assertion's panic message can cite the same
1819 // ground-truth set of values. Pre-#1212 a panic at line
1820 // src/hnsw.rs:1555 reported only "found < 29" with NO
1821 // observable state, making the CI flake un-diagnosable
1822 // without local repro. Capturing overflow.len() + hnsw size
1823 // here (under a single inner-lock guard) is the diagnostic
1824 // hook the operator + future agent needs to identify which
1825 // buffer the concurrent inserts landed in at panic time.
1826 let (overflow_len_dbg, hnsw_size_dbg) = {
1827 let state = idx.inner.lock().expect("inner mutex poisoned");
1828 let hnsw_size = state.hnsw.as_ref().map_or(0, |h| h.iter().count());
1829 (state.overflow.len(), hnsw_size)
1830 };
1831 assert_eq!(
1832 final_len,
1833 530,
1834 "post-rebuild len must equal baseline 500 + concurrent 30 = 530, \
1835 got {final_len} (overflow={overflow_len_dbg}, hnsw={hnsw_size_dbg}, \
1836 swaps={swaps}, inserts_done={})",
1837 inserts_done.load(Ordering::SeqCst)
1838 );
1839 // Assert the survival CONTRACT — "post-rebuild state INCLUDES
1840 // every concurrent write" — DETERMINISTICALLY via `all_entries`
1841 // membership (each id is in the graph OR overflow), not via the
1842 // approximate HNSW `search()` path.
1843 //
1844 // Why not `search()`: the active graph is built with
1845 // `Builder::default()` and queried with `Search::default()`, so
1846 // the search beam (`ef`) is a fixed library default that does
1847 // NOT scale with `k`. `search(q, 600)` over a 530-entry index
1848 // therefore returns only the top-`ef` graph candidates plus the
1849 // overflow scan — a graph-resident entry outside the beam is
1850 // intermittently clipped under stressed / instrumented runners.
1851 // The llvm-cov coverage job (run 27001253837/27001253833) hit
1852 // exactly this: `found` came back 28/30 while `final_len == 530`
1853 // simultaneously PROVED all 30 were present. Earlier revisions
1854 // chased it by widening `k` (no effect — the limit is `ef`, not
1855 // `k`) and then relaxing the floor to 29 (still flaky: 2-id
1856 // clips occur under heavy instrumentation). Membership over
1857 // `all_entries` is exact and race-free, so it is both the
1858 // correct expression of the contract AND immune to ANN-recall
1859 // jitter.
1860 //
1861 // The `swaps`/`overflow`/`hnsw` diagnostics captured above are
1862 // retained in the panic message so any genuine drop (a swap that
1863 // loses a write) still names the failure mode in one read.
1864 let present: std::collections::HashSet<String> = {
1865 let state = idx.inner.lock().expect("inner mutex poisoned");
1866 state.all_entries.iter().map(|(id, _)| id.clone()).collect()
1867 };
1868 let missing: Vec<String> = (0..30)
1869 .map(|i| format!("concurrent-{i}"))
1870 .filter(|id| !present.contains(id))
1871 .collect();
1872 assert!(
1873 missing.is_empty(),
1874 "post-rebuild all_entries must INCLUDE every concurrent write \
1875 (missing={missing:?}; post-swap state: overflow={overflow_len_dbg}, \
1876 hnsw={hnsw_size_dbg}, swaps={swaps}, final_len={final_len}); \
1877 a missing id means the concurrent-rebuild swap dropped a write"
1878 );
1879 }
1880
1881 /// #968 contract 4 — the swap is atomic. We never observe a
1882 /// half-populated graph during the swap window. To check this we
1883 /// run many search-then-len pairs concurrently with a rebuild
1884 /// and assert that `len()` is monotonically >= the baseline at
1885 /// every observation point (the swap only adds entries, never
1886 /// loses them).
1887 #[test]
1888 fn rebuild_swap_is_atomic_968() {
1889 let idx = TArc::new(VectorIndex::build(fixture(1_000)));
1890 let baseline_len = idx.len();
1891 let stop = TArc::new(AtomicBool::new(false));
1892 let observer_stop = TArc::clone(&stop);
1893 let idx_obs = TArc::clone(&idx);
1894 let observer = std::thread::spawn(move || {
1895 while !observer_stop.load(Ordering::SeqCst) {
1896 let l = idx_obs.len();
1897 assert!(
1898 l >= baseline_len,
1899 "len() dropped below baseline during rebuild — partial swap observed: {l} < {baseline_len}"
1900 );
1901 }
1902 });
1903 // Run a real rebuild.
1904 let h = idx.rebuild_async();
1905 let _ = h.join();
1906 idx.try_swap_warming();
1907 // Stop the observer.
1908 stop.store(true, Ordering::SeqCst);
1909 let _ = observer.join();
1910 assert_eq!(idx.len(), baseline_len);
1911 }
1912
1913 // v0.7.0 #1074 (SR-2 #2, HIGH) — eviction-then-rebuild gap.
1914 // The swap path must DROP a warming result whose
1915 // `overflow_generation` doesn't match the current state, so an
1916 // entry inserted into overflow AFTER a clear() bump is not
1917 // mistakenly drained out as if it were a captured snapshot
1918 // entry. We park a stale-gen result directly in the warming
1919 // slot and confirm try_swap_warming refuses to swap AND does
1920 // not drain overflow.
1921 #[test]
1922 fn stale_warming_swap_is_dropped_1074() {
1923 let idx = VectorIndex::empty();
1924 // Insert a non-trivial overflow entry so we can assert
1925 // try_swap_warming doesn't drain it.
1926 idx.insert(
1927 "alpha".to_string(),
1928 make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]),
1929 );
1930 let before_overflow = idx.inner.lock().unwrap().overflow.len();
1931 assert_eq!(before_overflow, 1);
1932
1933 // Park a STALE-generation warming result with a swap that
1934 // would otherwise drain everything.
1935 {
1936 let current_gen = idx.inner.lock().unwrap().overflow_generation;
1937 let mut w = idx.warming.lock().unwrap();
1938 *w = Some(RebuildResult {
1939 hnsw: None,
1940 overflow_at_snapshot: 999, // would have drained the whole overflow
1941 overflow_generation: current_gen.wrapping_add(1), // mismatched
1942 entries_in_graph: 0,
1943 });
1944 }
1945
1946 // Swap MUST refuse and leave overflow intact.
1947 let swapped = idx.try_swap_warming();
1948 assert!(
1949 !swapped,
1950 "stale-by-generation warming must NOT swap in (#1074)"
1951 );
1952 let after_overflow = idx.inner.lock().unwrap().overflow.len();
1953 assert_eq!(
1954 after_overflow, before_overflow,
1955 "stale swap must NOT drain overflow (#1074 regression)"
1956 );
1957 // The alpha entry must still be findable via the linear
1958 // overflow scan (no graph yet).
1959 let hits = idx.search(&make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]), 5);
1960 assert!(hits.iter().any(|h| h.id == "alpha"));
1961 }
1962
1963 // v0.7.0 #1074 — confirm overflow.clear() in the eviction path
1964 // bumps the generation counter so a snapshot captured BEFORE the
1965 // eviction will fail the gen check on swap. This pins the load-
1966 // bearing invariant: clear() must always bump.
1967 #[test]
1968 fn eviction_clear_bumps_overflow_generation_1074() {
1969 let idx = VectorIndex::with_max_entries_for_test(2);
1970 let gen_initial = idx.inner.lock().unwrap().overflow_generation;
1971 // 3 inserts past cap=2 → at least one eviction-clear fires.
1972 for i in 0..3 {
1973 let mut v = vec![0.0_f32; 4];
1974 v[i % 4] = 1.0;
1975 idx.insert(format!("e{i}"), make_embedding(&v));
1976 }
1977 let gen_after = idx.inner.lock().unwrap().overflow_generation;
1978 assert!(
1979 gen_after > gen_initial,
1980 "eviction-clear path must bump overflow_generation (#1074)"
1981 );
1982 }
1983
1984 // -----------------------------------------------------------------
1985 // #1579 — async-boot seeding + coverage accounting
1986 // -----------------------------------------------------------------
1987
1988 #[test]
1989 fn seed_entries_dedupes_and_defers_searchability_1579() {
1990 let idx = VectorIndex::empty();
1991 // A write that arrived through the normal path stays covered
1992 // (overflow is linearly scanned).
1993 idx.insert("live".into(), make_embedding(&[1.0, 0.0, 0.0]));
1994 assert!(
1995 idx.is_fully_searchable(),
1996 "overflow-only index is fully searchable"
1997 );
1998 // Seed the boot snapshot; the id already present must be
1999 // skipped (no double-count), the rest land in all_entries
2000 // only.
2001 let seeded = idx.seed_entries(vec![
2002 ("live".into(), make_embedding(&[1.0, 0.0, 0.0])),
2003 ("a".into(), make_embedding(&[0.0, 1.0, 0.0])),
2004 ("b".into(), make_embedding(&[0.0, 0.0, 1.0])),
2005 ]);
2006 assert_eq!(seeded, 2, "duplicate id must be skipped");
2007 assert_eq!(idx.len(), 3);
2008 assert!(
2009 !idx.is_fully_searchable(),
2010 "seeded-but-unbuilt entries must report not-fully-searchable \
2011 so the conflict check routes to its fallback (#1579 A5/B3)"
2012 );
2013 // Search during the warm window must still serve the live
2014 // (overflow) entry — and must NOT pretend to cover the seeds.
2015 let hits = idx.search(&make_embedding(&[0.0, 1.0, 0.0]), 3);
2016 assert!(hits.iter().all(|h| h.id == "live"));
2017 }
2018
2019 #[test]
2020 fn warm_boot_seeds_builds_and_swaps_1579() {
2021 let idx = VectorIndex::empty();
2022 let seeded = idx.warm_boot(vec![
2023 ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
2024 ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
2025 ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
2026 ]);
2027 assert_eq!(seeded, 3);
2028 assert!(
2029 idx.is_fully_searchable(),
2030 "warm_boot must drive rebuild→swap to completion"
2031 );
2032 let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
2033 assert_eq!(hits.first().map(|h| h.id.as_str()), Some("a"));
2034 }
2035
2036 #[test]
2037 fn build_and_insert_preserve_full_searchability_1579() {
2038 let idx = VectorIndex::build(vec![
2039 ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
2040 ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
2041 ]);
2042 assert!(idx.is_fully_searchable(), "synchronous build covers all");
2043 idx.insert("c".into(), make_embedding(&[0.0, 0.0, 1.0]));
2044 assert!(
2045 idx.is_fully_searchable(),
2046 "insert lands in overflow — coverage preserved"
2047 );
2048 idx.remove("a");
2049 assert!(
2050 idx.is_fully_searchable(),
2051 "remove only shrinks all_entries — coverage preserved"
2052 );
2053 }
2054}