ai_memory/hnsw.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HNSW (Hierarchical Navigable Small World) vector index for fast approximate
5//! nearest-neighbor search over memory embeddings.
6//!
7//! Built on `instant-distance`. The index is constructed at startup from all
8//! stored embeddings. New memories added during the session go into an overflow
9//! list that is scanned linearly alongside the HNSW results — the index is
10//! rebuilt lazily once the overflow exceeds a threshold.
11
12use instant_distance::{Builder, HnswMap, Search};
13// `instant_distance::Point` is the trait that supplies the
14// `EmbeddingPoint::distance` method; it has to be in scope for the
15// in-module tests (`embedding_point_distance_*`) to call it as a
16// method. The lib code itself goes through the slice-borrow
17// `cosine_distance` helper post-#1087 so the `Point` impl is the
18// only consumer of the trait at the bare-name level.
19#[cfg(test)]
20use instant_distance::Point;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc::Sender;
25use std::thread::JoinHandle;
26
27use crate::hooks::EvictionEvent;
28
/// `tracing` target shared by every event the HNSW eviction path emits
/// (#1558 tracing-target SSOT).
const EVICTION_TRACE_TARGET: &str = "hnsw.eviction";

/// Overflow length at which `insert` schedules a background rebuild of the
/// HNSW graph.
const REBUILD_THRESHOLD: usize = 200;

/// #1037 (2026-05-21) — upper bound on how long the synchronous
/// [`VectorIndex::rebuild`] shim waits when [`VectorIndex::rebuild_async`]
/// handed back a no-op handle because an earlier async rebuild was still
/// running.
///
/// Production callers use `rebuild_async`, so one second sits well below
/// any reasonable sync-rebuild expectation. The budget only exists to turn
/// "silently keep serving the stale graph" into "wait a bounded time, then
/// swap best-effort".
const REBUILD_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_000);

/// Poll cadence while waiting on an in-flight rebuild. Used by the
/// [`VectorIndex::rebuild`] sync shim (#1037) and by the #1579 B3 boot
/// warm-up retry loops (`warm_boot`,
/// `daemon_runtime::spawn_vector_index_boot_load`).
pub(crate) const REBUILD_WAIT_POLL_INTERVAL: std::time::Duration =
    std::time::Duration::from_micros(10_000);

/// #1579 B3 — smallest embedded-row count at which a ONE-SHOT CLI
/// invocation builds an HNSW graph at all.
///
/// Below this size, the recall pipeline's linear-scan fallback
/// (`vector_index = None`) answers the semantic phase in ≤ 35 ms (P1 audit,
/// 10-20k rows). Building the graph costs ~40 s at 10k vectors, and that
/// graph is thrown away when the process exits.
///
/// Long-lived processes (serve / MCP stdio) always build, asynchronously
/// via `daemon_runtime::spawn_vector_index_boot_load`, because the cost is
/// amortised over many recalls.
pub const CLI_HNSW_BUILD_MIN_ENTRIES: usize = 20_000;

/// Production cap on indexed entries. Past it, `insert` evicts the oldest
/// entries so memory use stays bounded.
///
/// Tests can create a `VectorIndex` with a smaller cap through
/// [`VectorIndex::with_max_entries_for_test`]. That cap lives on the index
/// instance, so it never affects concurrently running tests that use the
/// default. Keeping the production value in one constant gives call sites
/// (and the per-event tracing payload) a single canonical source.
const MAX_ENTRIES: usize = 100_000;
69
70// ---------------------------------------------------------------------------
71// v0.6.3.1 (P3, G2): eviction observability
72//
73// `MAX_ENTRIES`-triggered eviction in `insert()` previously dropped the
74// oldest embeddings silently — operators near the cap lost recall quality
75// invisibly. The two counters below + the structured `hnsw.eviction`
76// tracing event close that gap:
77//
78// - eviction count — cumulative count surfaced via
79// `db::stats().index_evictions_total` (and capabilities) AND at
80// `/metrics` as `ai_memory_hnsw_evictions_total`.
81// - last-eviction wall clock — UNIX nanoseconds of the most recent
82// eviction; capabilities derive `hnsw.evicted_recently` from this
83// with a 60 s rolling window.
84//
85// **pm-v3.1 PR8 (issue #1174).** Pre-PR8 the counters were two free
86// `static AtomicU64`s at the top of this file. PR8 sank both into the
87// metrics registry (`src/metrics.rs::HNSW_EVICTIONS_TOTAL` +
88// `HNSW_LAST_EVICTION_AT_NANOS`, plus matching Prometheus
89// `IntCounter` / `IntGauge` handles on `Metrics`) so the eviction
90// signal is `/metrics`-scrape-visible without a separate observer
91// thread. The accessor signatures here are preserved verbatim for
92// call-site backward compat.
93//
94// Process-local. The counters reset on restart because the index itself
95// resets on restart. Both atomics are touched only on the eviction edge
96// (rare: requires >100k vectors), so there is no measurable hot-path cost.
97// ---------------------------------------------------------------------------
98
99/// Cumulative HNSW oldest-eviction count since process start.
100///
101/// Surfaces in `memory_stats`. Non-zero indicates the in-memory vector
102/// index has hit `MAX_ENTRIES` and dropped older embeddings; recall
103/// quality may have degraded for evicted ids until they are re-inserted
104/// (e.g. on next access via `recall` touch path).
105///
106/// pm-v3.1 PR8: thin shim over `crate::metrics::hnsw_evictions_total()`.
107#[must_use]
108pub fn index_evictions_total() -> u64 {
109 crate::metrics::hnsw_evictions_total()
110}
111
112// ---------------------------------------------------------------------------
113// M8 (v0.7.0 round-2) — eviction-rate observability.
114//
115// Operators who hit the 100k cap need two signals:
116//
117// 1. Per-eviction WARN — surface every eviction event so operators
118// see drift before recall quality has noticeably degraded.
119// 2. Rolling-rate ERROR — when the trailing-hour eviction rate
120// exceeds the M8 ceiling, escalate to ERROR so the ops dashboard
121// raises a page. The escalation message names the operator
122// knobs (`vector_index_capacity` / "move to dedicated vector DB")
123// so the on-call has the remediation in the log line.
124//
125// Implementation: a small fixed-size ring buffer of UNIX-nanosecond
126// timestamps. Each eviction `push`es a stamp; the rolling-rate check
127// counts how many stamps sit inside the trailing-hour window. The
128// ring is locked behind a `Mutex` for write-coherent visibility; the
129// path runs only on the eviction edge so the lock cost is negligible.
130// ---------------------------------------------------------------------------
131
/// M8 eviction-rate ceiling: the number of evictions per trailing hour
/// beyond which the rolling observer escalates from WARN to ERROR.
const EVICTION_RATE_CEILING_PER_HOUR: usize = 10;

/// Capacity of the rolling-hour timestamp ring.
///
/// This leaves room for the ceiling plus headroom for bursts. Once the ring
/// is full, the oldest stamp is discarded before the newest is appended
/// (see `record_eviction_and_count_recent`).
const EVICTION_RATE_RING_CAP: usize = 64;

/// v0.7.0 #1093 — rolling-hour ring of eviction timestamps (UNIX nanoseconds).
///
/// This is a `VecDeque` rather than a `Vec`, so discarding the oldest stamp
/// at capacity is an O(1) `pop_front` instead of an O(N) `Vec::remove(0)`.
static EVICTION_RATE_RING: Mutex<std::collections::VecDeque<u64>> =
    Mutex::new(std::collections::VecDeque::new());
146
147/// Whether an eviction occurred within the trailing `window_secs`.
148///
149/// Used by capabilities (P1) to set `hnsw.evicted_recently` so operators
150/// can see ongoing pressure on the cap, not just the cumulative count.
151/// Returns `false` when no evictions have ever happened in this process.
152#[must_use]
153pub fn evicted_recently(window_secs: u64) -> bool {
154 let last = crate::metrics::hnsw_last_eviction_at_nanos();
155 if last == 0 {
156 return false;
157 }
158 let now_nanos = std::time::SystemTime::now()
159 .duration_since(std::time::UNIX_EPOCH)
160 .map(|d| d.as_nanos())
161 .unwrap_or(0);
162 // Saturating math: clock can move backwards on some VMs.
163 let elapsed_nanos = u128::from(u64::MAX).min(now_nanos.saturating_sub(u128::from(last)));
164 elapsed_nanos < u128::from(window_secs).saturating_mul(1_000_000_000)
165}
166
167/// Reset the eviction counters. Test-only — production callers must not
168/// reach into the counter directly. The function is `pub` (rather than
169/// `pub(crate)`) so the integration-test crate at `tests/` can drive it
170/// alongside the public `index_evictions_total()` accessor; renaming
171/// keeps the intent obvious at every call site.
172///
173/// pm-v3.1 PR8: thin shim over
174/// `crate::metrics::reset_hnsw_eviction_counters_for_test()`.
175#[doc(hidden)]
176pub fn reset_eviction_counters_for_test() {
177 crate::metrics::reset_hnsw_eviction_counters_for_test();
178 if let Ok(mut g) = EVICTION_RATE_RING.lock() {
179 g.clear();
180 }
181}
182
183/// M8 (v0.7.0 round-2) — push the latest eviction timestamp into the
184/// rolling-hour ring and return how many stamps now sit inside the
185/// trailing hour. Producers call this once per eviction event;
186/// the caller branches on the returned count to escalate from WARN
187/// (already emitted) to ERROR.
188fn record_eviction_and_count_recent(now_nanos: u64) -> usize {
189 const ONE_HOUR_NANOS: u64 = crate::SECS_PER_HOUR as u64 * 1_000_000_000;
190 let cutoff = now_nanos.saturating_sub(ONE_HOUR_NANOS);
191 let Ok(mut ring) = EVICTION_RATE_RING.lock() else {
192 // Poisoned lock — observability is best-effort, return 0 so
193 // the caller does not over-escalate.
194 return 0;
195 };
196 // Drop stale entries first so the ring stays bounded and the
197 // count reflects the trailing hour.
198 ring.retain(|t| *t >= cutoff);
199 if ring.len() >= EVICTION_RATE_RING_CAP {
200 // v0.7.0 #1093 — VecDeque::pop_front is O(1); pre-#1093
201 // Vec::remove(0) was O(N) (backing-buffer shift).
202 ring.pop_front();
203 }
204 ring.push_back(now_nanos);
205 ring.len()
206}
207
/// A dense embedding vector, stored as one point of the HNSW graph.
///
/// Point-to-point distance comes from the `instant_distance::Point` impl,
/// which delegates to `cosine_distance`.
#[derive(Debug, Clone)]
pub struct EmbeddingPoint(pub Vec<f32>);
211
/// v0.7.0 #1087 — cosine distance between two borrowed embedding slices.
///
/// The overflow scan in [`VectorIndex::search`] uses this helper to
/// compare against stored `Vec<f32>` embeddings without cloning each one
/// into a fresh `EmbeddingPoint`.
///
/// Embeddings are expected to be L2-normalised, so the dot product equals
/// cosine similarity and the distance is `1 - dot`. If the slices differ in
/// length, only the common prefix contributes (the pairing stops at the
/// shorter slice). Two empty slices therefore give a distance of `1.0`.
#[inline]
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let similarity = a
        .iter()
        .zip(b)
        .fold(0.0_f32, |acc, (&lhs, &rhs)| acc + lhs * rhs);
    1.0 - similarity
}
222
223impl instant_distance::Point for EmbeddingPoint {
224 fn distance(&self, other: &Self) -> f32 {
225 cosine_distance(&self.0, &other.0)
226 }
227}
228
229// ---------------------------------------------------------------------------
230// #968 (Wave-2 Tier-C3) — async rebuild + double-buffering.
231//
232// Prior to #968 every HNSW rebuild ran SYNCHRONOUSLY on the request thread:
233// `Self::build_hnsw(&state.all_entries)` is CPU-bound (graph construction
234// is O(N log N) with constant factors that put 100k vectors at ~3-10s on
235// commodity hardware) and the producer's `insert()` call simply blocked
236// until the new graph was ready. Search callers contending for the same
237// `inner` mutex blocked too — recall p95 spiked from <20 ms to multi-second.
238//
239// The fix is a double-buffer pattern with background-task swap-in:
240// • The `active` slot (inside `IndexState`) is the index that serves
241// reads. Search holds the inner lock only long enough to clone the
242// overflow + collect valid IDs; the HNSW search itself runs against
243// the active graph held under the same lock (instant-distance's
244// `Search::default()` is per-call scratch, no shared state).
// • The `warming` slot is `Arc<Mutex<Option<RebuildResult>>>`. A background
246// thread (`std::thread::spawn` — HNSW build is CPU-bound; no tokio
247// runtime needed) builds the new graph from a snapshot of
248// `all_entries`, then drops it into `warming`. On the next
249// `try_swap_warming()` (called from search + insert + explicit poll)
250// the warmed graph atomically replaces `active`. The mutex hold
251// spans only the std::mem::swap — microseconds.
252// • Concurrent writes during rebuild: writes flow into `overflow` and
253// `all_entries` normally while the background task is building from
254// the snapshot. On swap, we trim `overflow` of the entries already
255// captured in the snapshot (the snapshot length is recorded when the
256// job kicks off). Entries inserted AFTER the snapshot remain in
257// overflow and are searched linearly until the next rebuild captures
258// them. No write is ever dropped.
259// • Rebuild failures: a panicking build-thread leaves `warming`
260// untouched (`None`); `active` is unchanged. The `JoinHandle` exposes
261// the panic to the caller via `JoinHandle::join()`. The
262// `rebuild_in_flight` atomic flips back to `false` whether the
263// thread succeeded or panicked (via a drop-guard `RebuildGuard`).
264// ---------------------------------------------------------------------------
265
/// Input to one background rebuild: the entries to build from, plus the
/// bookkeeping the swap path needs to reconcile the result with writes that
/// landed while the build was running.
///
/// The post-swap overflow trim must use the overflow length, not
/// `all_entries.len()`. Writes between snapshot and swap keep extending
/// `overflow`, and only the prefix that the new graph already covers may be
/// dropped.
struct RebuildSnapshot {
    /// Copy of `all_entries` at snapshot time; the new graph is built from
    /// exactly these entries.
    entries: Vec<(String, Vec<f32>)>,
    /// `overflow.len()` at snapshot time.
    ///
    /// After the swap, the first `overflow_at_snapshot` overflow entries are
    /// covered by the new graph and can be drained. Anything inserted later
    /// stays in `overflow` until the next rebuild. Recording the overflow
    /// length (rather than the all-entries length) is what keeps concurrent
    /// writes during a rebuild from being lost.
    overflow_at_snapshot: usize,
    /// v0.7.0 #1074 (SR-2 #2, HIGH) — `overflow_generation` at snapshot time.
    ///
    /// The eviction path clears `overflow` and bumps the generation. A
    /// snapshot taken before such an eviction carries an older generation,
    /// and the swap must treat it as stale. Its warmed graph was built from
    /// pre-eviction `all_entries`, and the current overflow entries are
    /// post-eviction inserts that the snapshot never saw. The safe response
    /// is to discard the warmed result without swapping or draining, and let
    /// the next rebuild capture the current state.
    overflow_generation: u64,
}
296
/// Drop guard that resets the rebuild-in-flight flag whenever it goes out
/// of scope, including while unwinding from a panic.
///
/// Without it, a panic during the background graph build (for example
/// inside `instant_distance::Builder::build`) would leave the flag stuck at
/// `true`, and no further rebuild could ever be scheduled.
struct RebuildGuard {
    /// The shared in-flight flag. Presumably a clone of
    /// `VectorIndex::rebuild_in_flight` (the spawn site is not shown here).
    flag: Arc<AtomicBool>,
}

impl Drop for RebuildGuard {
    fn drop(&mut self) {
        let in_flight: &AtomicBool = &self.flag;
        in_flight.store(false, Ordering::SeqCst);
    }
}
310
311/// Thread-safe HNSW index over memory embeddings.
312pub struct VectorIndex {
313 /// The built HNSW index — maps embedding points to memory IDs.
314 inner: Mutex<IndexState>,
315 /// #968 — warming slot for the double-buffer pattern. The background
316 /// rebuild thread parks the freshly-built graph here; readers/writers
317 /// observe it via [`Self::try_swap_warming`] on their next inner-lock
318 /// acquisition. `Some` means a rebuild has finished and is awaiting
319 /// swap-in; `None` means no warmed graph is ready.
320 warming: Arc<Mutex<Option<RebuildResult>>>,
321 /// #968 — coordinator flag. `true` while a background rebuild is in
322 /// flight; prevents the auto-rebuild path in `insert()` from
323 /// spawning a second concurrent build (one CPU-bound build at a
324 /// time is enough — successive rebuilds chase the same target).
325 /// Cleared by the rebuild thread's drop-guard whether the build
326 /// succeeded or panicked.
327 rebuild_in_flight: Arc<AtomicBool>,
328 /// v0.7.0 (R3-S1) — eviction sink. The `MAX_ENTRIES`-triggered
329 /// drain in `insert()` pushes an [`EvictionEvent`] onto this
330 /// channel for each evicted id; a hook-aware observer above this
331 /// layer drains the channel and fires the `on_index_eviction`
332 /// chain off the hot path. Wired by the daemon at startup
333 /// (`daemon_runtime`) via [`Self::set_eviction_sink`]. Optional —
334 /// CLI / test builds that never bring up the hooks pipeline leave
335 /// it `None` and the sink-push is a no-op so eviction throughput
336 /// is unaffected. Closes the G2 / G8 "fire site exists but not
337 /// wired" gap that the prior `tracing::warn!`-only implementation
338 /// left open.
339 ///
340 /// `Mutex` (not `RwLock`) because writes happen exactly twice in
341 /// the process lifetime (`set_eviction_sink` at startup and
342 /// `Drop`) and reads happen only on the eviction edge which is
343 /// itself already serialized through `inner`. The non-blocking
344 /// `try_send` semantics on the channel make sink-push safe to
345 /// hold across the inner-state lock without risk of deadlock.
346 eviction_sink: Mutex<Option<Sender<EvictionEvent>>>,
347}
348
349/// #968 — payload the rebuild thread parks in the `warming` slot when
350/// the build completes. Carries the new graph PLUS the overflow length
351/// at snapshot time so the swap path can trim `overflow` deterministically:
352/// the prefix `..overflow_at_snapshot` is now in the graph; entries
353/// inserted AFTER the snapshot (the suffix) remain in `overflow` for
354/// the next cycle.
355struct RebuildResult {
356 hnsw: Option<HnswMap<EmbeddingPoint, String>>,
357 overflow_at_snapshot: usize,
358 /// v0.7.0 #1074 — propagated from the snapshot so the swap path
359 /// can detect a stale-by-eviction warming result.
360 overflow_generation: u64,
361 /// #1579 — number of entries captured in the snapshot this graph
362 /// was built from. On swap it becomes the new
363 /// `IndexState::graph_entry_count`, the coverage accounting behind
364 /// [`VectorIndex::is_fully_searchable`].
365 entries_in_graph: usize,
366}
367
368struct IndexState {
369 hnsw: Option<HnswMap<EmbeddingPoint, String>>,
370 /// Entries added after the last rebuild. Searched linearly.
371 overflow: Vec<(String, Vec<f32>)>,
372 /// All entries (for rebuild). Kept in sync with the index + overflow.
373 all_entries: Vec<(String, Vec<f32>)>,
374 /// v0.7.0 R3-S1 — per-instance eviction cap. Defaults to
375 /// [`MAX_ENTRIES`] (the production 100k). Tests construct an
376 /// index with a smaller cap via
377 /// [`VectorIndex::with_max_entries_for_test`] so the eviction
378 /// edge can be exercised without inserting 100k vectors. Storing
379 /// the cap per-instance (rather than as a process-wide atomic)
380 /// keeps concurrent tests independent.
381 max_entries: usize,
382 /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter bumped on
383 /// every `overflow.clear()` (eviction-edge path). Snapshots
384 /// captured before a clear carry the old generation; the swap
385 /// path compares against the current generation and drops the
386 /// warming result without swapping when they don't match. Closes
387 /// the gap where an entry inserted between a snapshot capture
388 /// and the eviction-clear was incorrectly drained by the swap.
389 overflow_generation: u64,
390 /// v0.7.0 #1087 — cached HashSet view of `all_entries` ids used
391 /// by [`VectorIndex::search`] as the stale-id filter. Built
392 /// lazily on the first search after a mutation; invalidated to
393 /// `None` on insert push, eviction drain, and remove retain.
394 /// Pre-#1087 this set was rebuilt on EVERY recall.
395 ///
396 /// PERF-7 (FX-C4-batch2, 2026-05-26): the cache stores
397 /// `Arc<str>` per id instead of `String`. UUID strings are 36
398 /// bytes; `Arc<str>` is a 16-byte fat pointer with the bytes
399 /// heap-allocated once, whereas a `String` is a 24-byte
400 /// (ptr/len/cap) struct with the bytes heap-allocated PLUS the
401 /// 24 bytes inline. On a 100 000-entry warm-up the change
402 /// halves the per-rebuild allocator pressure (16 vs 24+ bytes
403 /// per entry plus the no-spare-capacity heap-side). HashSet
404 /// lookup against `&str` works through the `Borrow<str>` impl.
405 valid_ids_cache: Option<std::collections::HashSet<std::sync::Arc<str>>>,
406 /// #1579 — number of entries baked into the ACTIVE graph (the
407 /// snapshot length at its build time; 0 when `hnsw` is `None`).
408 /// Together with `overflow.len()` this tells whether a search can
409 /// see every live entry: entries seeded into `all_entries` by the
410 /// async-boot loader ([`VectorIndex::seed_entries`]) are in
411 /// NEITHER the graph NOR `overflow` until the boot rebuild swaps
412 /// in, and [`VectorIndex::is_fully_searchable`] reports `false`
413 /// for that window so callers (the #519 proactive conflict check)
414 /// can route to their non-index fallback instead of silently
415 /// searching an index that cannot return the seeded rows.
416 graph_entry_count: usize,
417}
418
/// One hit returned by a vector-index search.
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Memory id of the matching entry.
    pub id: String,
    /// Distance between the query and this entry; smaller means closer.
    ///
    /// NOTE(review): presumably the index metric (`1 - dot`, see
    /// `cosine_distance`); the search path that fills this in is not shown
    /// here.
    pub distance: f32,
}
425
426impl VectorIndex {
427 /// Build a new index from a list of (`memory_id`, embedding) pairs.
428 pub fn build(entries: Vec<(String, Vec<f32>)>) -> Self {
429 let hnsw = Self::build_hnsw(&entries);
430 let graph_entry_count = entries.len();
431 VectorIndex {
432 inner: Mutex::new(IndexState {
433 hnsw,
434 overflow: Vec::new(),
435 all_entries: entries,
436 max_entries: MAX_ENTRIES,
437 overflow_generation: 0,
438 valid_ids_cache: None,
439 graph_entry_count,
440 }),
441 eviction_sink: Mutex::new(None),
442 warming: Arc::new(Mutex::new(None)),
443 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
444 }
445 }
446
447 /// Build an empty index.
448 pub fn empty() -> Self {
449 VectorIndex {
450 inner: Mutex::new(IndexState {
451 hnsw: None,
452 overflow: Vec::new(),
453 all_entries: Vec::new(),
454 max_entries: MAX_ENTRIES,
455 overflow_generation: 0,
456 valid_ids_cache: None,
457 graph_entry_count: 0,
458 }),
459 eviction_sink: Mutex::new(None),
460 warming: Arc::new(Mutex::new(None)),
461 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
462 }
463 }
464
465 /// v0.7.0 R3-S1 — Build an empty index with a custom eviction
466 /// cap. Test-only: lets a 5-entry insert sequence exercise the
467 /// eviction edge in milliseconds (vs. the ~minute-scale cost of
468 /// inserting 100k vectors at the production cap). The knob is
469 /// stored per-instance so concurrent tests using the default
470 /// cap are unaffected.
471 #[doc(hidden)]
472 #[must_use]
473 pub fn with_max_entries_for_test(max_entries: usize) -> Self {
474 VectorIndex {
475 inner: Mutex::new(IndexState {
476 hnsw: None,
477 overflow: Vec::new(),
478 all_entries: Vec::new(),
479 max_entries,
480 overflow_generation: 0,
481 valid_ids_cache: None,
482 graph_entry_count: 0,
483 }),
484 eviction_sink: Mutex::new(None),
485 warming: Arc::new(Mutex::new(None)),
486 rebuild_in_flight: Arc::new(AtomicBool::new(false)),
487 }
488 }
489
490 /// v0.7.0 (R3-S1) — wire the eviction sink.
491 ///
492 /// The daemon calls this once at startup with the send-half of an
493 /// mpsc channel; a hook-aware observer task drains the recv-half
494 /// off the hot path and fires the `on_index_eviction` chain
495 /// (`fire_on_index_eviction` in `src/hooks/chain.rs`). Replacing
496 /// an existing sink is allowed — useful when the daemon
497 /// reconfigures the hook chain at runtime — and drops the prior
498 /// sender, which terminates the prior observer cleanly.
499 ///
500 /// Build-time / CLI / test builds that never wire a sink retain
501 /// the `None` default; the eviction path's `try_send` then
502 /// becomes a no-op short-circuit so there is no measurable cost
503 /// to leaving the sink unset.
504 pub fn set_eviction_sink(&self, sink: Sender<EvictionEvent>) {
505 if let Ok(mut guard) = self.eviction_sink.lock() {
506 *guard = Some(sink);
507 }
508 }
509
510 fn build_hnsw(entries: &[(String, Vec<f32>)]) -> Option<HnswMap<EmbeddingPoint, String>> {
511 if entries.is_empty() {
512 return None;
513 }
514 let points: Vec<EmbeddingPoint> = entries
515 .iter()
516 .map(|(_, emb)| EmbeddingPoint(emb.clone()))
517 .collect();
518 let values: Vec<String> = entries.iter().map(|(id, _)| id.clone()).collect();
519 Some(Builder::default().build(points, values))
520 }
521
522 /// Add a new entry to the index (goes to overflow until next rebuild).
523 pub fn insert(&self, id: String, embedding: Vec<f32>) {
524 // #968 — opportunistically swap any warmed graph BEFORE taking the
525 // write path. This lets the auto-rebuild scheduled by a previous
526 // insert land cleanly even if no search call has run between
527 // inserts. Cheap: the warming-mutex contention is microseconds.
528 self.try_swap_warming();
529
530 // #968 — capture the snapshot for a potential auto-rebuild OUTSIDE
531 // the inner lock so the build thread can be spawned without
532 // holding the writers' mutex.
533 let snapshot_for_rebuild: Option<RebuildSnapshot> = {
534 let mut state = match self.inner.lock() {
535 Ok(s) => s,
536 Err(poisoned) => poisoned.into_inner(),
537 };
538 state.all_entries.push((id.clone(), embedding.clone()));
539 state.overflow.push((id, embedding));
540 // v0.7.0 #1087 — invalidate cached valid_ids set; rebuilt
541 // lazily on the next search.
542 state.valid_ids_cache = None;
543
544 // #968 — async auto-rebuild: when overflow crosses the
545 // threshold, snapshot the entries and let the caller (below,
546 // outside the lock) spawn the background build. We do NOT
547 // build the graph synchronously here anymore; that was the
548 // multi-second request-thread block #968 fixes. The
549 // `rebuild_in_flight` CAS prevents the same `insert` call
550 // from racing a previously-scheduled rebuild — only one
551 // background build runs at a time; the next snapshot is
552 // captured after the current build's swap lands.
553 if state.overflow.len() >= REBUILD_THRESHOLD
554 && self
555 .rebuild_in_flight
556 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
557 .is_ok()
558 {
559 Some(RebuildSnapshot {
560 entries: state.all_entries.clone(),
561 overflow_at_snapshot: state.overflow.len(),
562 overflow_generation: state.overflow_generation,
563 })
564 } else {
565 None
566 }
567 };
568 if let Some(snap) = snapshot_for_rebuild {
569 // Spawn-and-forget. The handle is consumed inside the thread
570 // via `RebuildGuard` so it doesn't dangle. Callers that want
571 // a handle should use [`Self::rebuild_async`].
572 let _ = self.spawn_rebuild(snap);
573 }
574
575 // Evict oldest entries if over capacity
576 let mut state = match self.inner.lock() {
577 Ok(s) => s,
578 Err(poisoned) => poisoned.into_inner(),
579 };
580 let max_entries = state.max_entries;
581 if state.all_entries.len() > max_entries {
582 let excess = state.all_entries.len() - max_entries;
583 // M8 (v0.7.0 round-2) — emit ONE summary WARN per eviction
584 // event so the operator sees the batch drop in the daemon
585 // log without scrolling past N per-id lines first. The
586 // per-id WARNs (below) still fire for post-mortem
587 // attribution; this one is the high-level "the index
588 // dropped N oldest embeddings" signal operators alert on.
589 tracing::warn!(
590 target: EVICTION_TRACE_TARGET,
591 dropped = excess,
592 max_entries = max_entries,
593 "HNSW eviction: dropped {} oldest embeddings to make room",
594 excess,
595 );
596 // v0.7.0 (R3-S1) — fire the `on_index_eviction` hook event
597 // for each evicted id BEFORE we drop the rows. The sink
598 // is a non-blocking `try_send` (see below); a downstream
599 // hook-aware observer drains the channel off the hot path
600 // and invokes `crate::hooks::fire_on_index_eviction` per
601 // event. This closes the G2/G8 "fire site exists but not
602 // wired" gap that the prior `tracing::warn!`-only
603 // implementation left open.
604 //
605 // The sink push happens INSIDE the inner-state lock — the
606 // channel is unbounded so `try_send`-equivalent `send`
607 // never blocks (unbounded mpsc has no backpressure). The
608 // sink lock is independent of the inner lock so there is
609 // no ordering hazard.
610 //
611 // The hook subscriber (if any) is responsible for its own
612 // logging; the warn-level tracing event is preserved here
613 // as a no-op-when-no-subscriber fallback so operators
614 // without hooks configured still see eviction pressure in
615 // daemon logs, matching the v0.6.3.1 observability contract.
616 let sink_guard = self.eviction_sink.lock().ok();
617 for (evicted_id, _) in state.all_entries.iter().take(excess) {
618 tracing::warn!(
619 target: EVICTION_TRACE_TARGET,
620 evicted_id = %evicted_id,
621 reason = "max_entries_reached",
622 max_entries = max_entries,
623 "hnsw index evicting oldest entry: cap reached"
624 );
625 if let Some(sink) = sink_guard.as_ref().and_then(|g| g.as_ref()) {
626 // mpsc::Sender::send is non-blocking on an unbounded
627 // channel (it only blocks on bounded). Errors mean the
628 // receiver dropped — observability is best-effort, no
629 // recovery action needed.
630 let payload = EvictionEvent::new(
631 evicted_id.clone(),
632 String::new(), // namespace not in scope at hnsw layer
633 "max_entries_reached",
634 );
635 let _ = sink.send(payload);
636 }
637 }
638 drop(sink_guard);
639 #[allow(clippy::cast_possible_truncation)]
640 let evicted = excess as u64;
641 // pm-v3.1 PR8 (issue #1174): counter sink moved to the
642 // metrics registry. We defer the actual `record_hnsw_eviction`
643 // call until `now_nanos_u64` is computed below so the
644 // counter and last-eviction timestamp move in lockstep.
645 let evicted_count_to_record = evicted;
646
647 state.all_entries.drain(..excess);
648 // v0.7.0 #1087 — invalidate cached valid_ids set after the
649 // eviction drain.
650 state.valid_ids_cache = None;
651 // #968 — defer the post-eviction graph rebuild to the async
652 // path. Correctness is preserved by the `valid_ids` filter
653 // in `search()` — evicted IDs are scrubbed from results
654 // immediately, even though the underlying HNSW graph still
655 // contains them until the next swap. Clearing `overflow`
656 // here was the v0.6 behavior tied to the synchronous
657 // rebuild; we preserve it so the linear-scan path doesn't
658 // re-surface evicted IDs. The next `insert()` past
659 // `REBUILD_THRESHOLD` (or an explicit `rebuild_async()`
660 // call) schedules the actual graph rebuild off-thread.
661 state.overflow.clear();
662 // v0.7.0 #1074 (SR-2 #2, HIGH) — bump the generation
663 // counter on every overflow.clear(). Any in-flight rebuild
664 // snapshot captured BEFORE this bump now carries a stale
665 // generation; the swap path detects the mismatch and
666 // drops the warming result without draining overflow,
667 // preventing the lose-an-insert race where an entry
668 // landed between snapshot and clear() was incorrectly
669 // drained by the eventual swap.
670 state.overflow_generation = state.overflow_generation.wrapping_add(1);
671 // Schedule the rebuild via the async path so the eviction
672 // edge no longer blocks the writer for the multi-second
673 // `build_hnsw` cost at 100k cap. The CAS skips if a
674 // previously-scheduled rebuild is still in flight; the
675 // next insert past threshold picks up the post-eviction
676 // state.
677 if self
678 .rebuild_in_flight
679 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
680 .is_ok()
681 {
682 let snap = RebuildSnapshot {
683 entries: state.all_entries.clone(),
684 // overflow was just cleared above so the snapshot
685 // captures an empty overflow window — anything
686 // inserted post-eviction will be a fresh suffix.
687 overflow_at_snapshot: state.overflow.len(),
688 overflow_generation: state.overflow_generation,
689 };
690 // Release the inner lock before spawning so the
691 // background thread can take it on swap. The
692 // observability path below only reads counters /
693 // statics, not `state`, so we do not need to
694 // re-acquire.
695 drop(state);
696 let _ = self.spawn_rebuild(snap);
697 }
698
699 // Record completion time AFTER the rebuild. `evicted_recently` is
700 // a "did we evict in the trailing N seconds" check; an operator
701 // asking that wants the operation completion time, not the
702 // start. At v0.6 the in-line `build_hnsw` dominated wall time
703 // here (~minutes at 100k entries) — using the start would
704 // make evicted_recently misreport even immediately after
705 // insert returns. Post-#968 the build runs off-thread so
706 // the gap shrinks to microseconds, but the
707 // completion-time-after-rebuild semantics are preserved.
708 let now_nanos = std::time::SystemTime::now()
709 .duration_since(std::time::UNIX_EPOCH)
710 .map(|d| d.as_nanos())
711 .unwrap_or(0);
712 let now_nanos_u64 = u64::try_from(now_nanos).unwrap_or(u64::MAX);
713 // pm-v3.1 PR8 (issue #1174): single sink call covers both
714 // the cumulative counter and the last-eviction timestamp,
715 // mirroring both onto the Prometheus handles so `/metrics`
716 // scrapes see the eviction event without polling
717 // `memory_stats`.
718 crate::metrics::record_hnsw_eviction(evicted_count_to_record, now_nanos_u64);
719
720 // M8 (v0.7.0 round-2) — rolling-hour rate observer. Push
721 // a stamp on this eviction, then count stamps in the
722 // trailing hour. If the rate clears the M8 ceiling,
723 // escalate to ERROR so the dashboard pages the on-call.
724 let recent = record_eviction_and_count_recent(now_nanos_u64);
725 if recent > EVICTION_RATE_CEILING_PER_HOUR {
726 tracing::error!(
727 target: EVICTION_TRACE_TARGET,
728 rate_per_hour = recent,
729 ceiling = EVICTION_RATE_CEILING_PER_HOUR,
730 "HNSW eviction rate exceeded {}/hour — recall quality is degrading; \
731 increase vector_index_capacity or move to dedicated vector DB",
732 EVICTION_RATE_CEILING_PER_HOUR,
733 );
734 }
735 }
736 }
737
738 /// Remove an entry by ID (marks for exclusion; cleaned up on rebuild).
739 pub fn remove(&self, id: &str) {
740 let mut state = match self.inner.lock() {
741 Ok(s) => s,
742 Err(poisoned) => poisoned.into_inner(),
743 };
744 state.all_entries.retain(|(eid, _)| eid != id);
745 state.overflow.retain(|(eid, _)| eid != id);
746 // v0.7.0 #1087 — invalidate cached valid_ids set after remove.
747 state.valid_ids_cache = None;
748 // Note: the HNSW index itself is immutable — removed IDs are filtered
749 // from search results. A rebuild will fully remove them.
750 }
751
752 /// Search for the `k` nearest neighbors to the query embedding.
753 ///
754 /// Combines HNSW approximate search with linear scan of overflow entries.
755 /// Returns results sorted by ascending distance (closest first).
756 pub fn search(&self, query: &[f32], k: usize) -> Vec<VectorHit> {
757 // #968 — opportunistic swap-on-read. If a background rebuild has
758 // parked a warmed graph in the `warming` slot, swap it into
759 // `active` BEFORE we serve this search. The swap is a single
760 // `std::mem::swap` under the inner mutex held for microseconds;
761 // search itself never blocks on graph construction.
762 self.try_swap_warming();
763
764 let mut state = match self.inner.lock() {
765 Ok(s) => s,
766 Err(poisoned) => poisoned.into_inner(),
767 };
768 let query_point = EmbeddingPoint(query.to_vec());
769
770 let mut results: Vec<VectorHit> = Vec::with_capacity(k * 2);
771
772 // v0.7.0 #1087 — populate the cached valid_ids set on the
773 // first search after any mutation; reuse it across recalls.
774 // Pre-#1087 this set was rebuilt on EVERY recall (iterating
775 // up to 100k strings + a fresh HashSet allocation per call).
776 if state.valid_ids_cache.is_none() {
777 // PERF-7 — collect to HashSet<Arc<str>> instead of
778 // HashSet<String>. Arc<str> is a 16-byte fat pointer
779 // vs String's 24-byte (ptr/len/cap) struct; the
780 // backing bytes are shared with no spare capacity
781 // overhead. HashSet::contains accepts `&str` via the
782 // `Borrow<str>` impl on `Arc<str>`.
783 let set: std::collections::HashSet<std::sync::Arc<str>> = state
784 .all_entries
785 .iter()
786 .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
787 .collect();
788 state.valid_ids_cache = Some(set);
789 }
790 let valid_ids = state
791 .valid_ids_cache
792 .as_ref()
793 .expect("valid_ids_cache populated above");
794
795 // Search the HNSW index
796 if let Some(ref hnsw) = state.hnsw {
797 let mut search = Search::default();
798 for item in hnsw.search(&query_point, &mut search) {
799 if !valid_ids.contains(item.value.as_str()) {
800 continue; // Removed entry
801 }
802 results.push(VectorHit {
803 id: item.value.clone(),
804 distance: item.distance,
805 });
806 if results.len() >= k * 2 {
807 break;
808 }
809 }
810 }
811
812 // v0.7.0 #1087 — linear scan of overflow entries WITHOUT
813 // cloning the embedding vec. Pre-#1087 this constructed
814 // `EmbeddingPoint(emb.clone())` per overflow entry (~200 ×
815 // 1536 bytes = 300 KB of clone per search at the cap); the
816 // cosine-distance helper takes `&[f32]` so we inline against
817 // the stored slice instead.
818 let mut overflow_hits: Vec<VectorHit> = Vec::with_capacity(state.overflow.len());
819 for (id, emb) in &state.overflow {
820 overflow_hits.push(VectorHit {
821 id: id.clone(),
822 distance: cosine_distance(&query_point.0, emb),
823 });
824 }
825 overflow_hits.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
826
827 results.extend(overflow_hits);
828
829 // Deduplicate by ID (prefer lower distance)
830 let mut seen = std::collections::HashSet::new();
831 results.retain(|hit| seen.insert(hit.id.clone()));
832
833 // Sort by distance and truncate
834 results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
835 results.truncate(k);
836 results
837 }
838
839 /// Return the total number of indexed entries (HNSW + overflow).
840 pub fn len(&self) -> usize {
841 let state = match self.inner.lock() {
842 Ok(s) => s,
843 Err(poisoned) => poisoned.into_inner(),
844 };
845 state.all_entries.len()
846 }
847
848 /// `true` when the index holds no live entries at all.
849 ///
850 /// #1579 QC — load-bearing for the proactive-conflict dispatch:
851 /// an EMPTY index is *vacuously* [`Self::is_fully_searchable`]
852 /// (`0 + 0 >= 0`), but during the async-boot LOAD phase — after
853 /// the daemon binds with `VectorIndex::empty()` and before the
854 /// boot loader's `seed_entries` lands (the `get_all_embeddings`
855 /// read is the long pole at 100k rows) — emptiness says nothing
856 /// about what the DB holds. Callers that would otherwise trust a
857 /// fully-searchable index (the #519 conflict check) must ALSO
858 /// require non-emptiness, so that window routes to the bounded
859 /// recency-scan fallback instead of consulting an index that
860 /// cannot return anything.
861 pub fn is_empty(&self) -> bool {
862 self.len() == 0
863 }
864
865 /// #968 — Force a full rebuild of the HNSW index from all entries,
866 /// SYNCHRONOUSLY. Preserved for tests + emergency paths; production
867 /// code should call [`Self::rebuild_async`] so the multi-second
868 /// graph build does not block the calling thread.
869 ///
870 /// Implementation: delegates to `rebuild_async` and `join`s the
871 /// resulting handle so callers retain the v0.6 semantics ("the
872 /// graph is rebuilt by the time this returns"). Tests rely on this
873 /// blocking behavior to assert post-rebuild invariants without
874 /// adding a yield/poll loop.
875 pub fn rebuild(&self) {
876 // #1037 (MEDIUM, 2026-05-21): defend the sync-rebuild contract
877 // against the `rebuild_async()` no-op-handle short-circuit.
878 // Pre-#1037 if a previous async rebuild was still in flight
879 // (`rebuild_in_flight==true`), `rebuild_async` returned a
880 // no-op `std::thread::spawn(|| {})` handle that joined
881 // instantly — `try_swap_warming()` then ran against a warming
882 // slot that the IN-FLIGHT build hadn't populated yet, so the
883 // sync contract ("graph is rebuilt by the time this returns")
884 // was silently violated. The caller observed the pre-rebuild
885 // state.
886 //
887 // Fix: after the initial `join()`, spin-wait on
888 // `rebuild_in_flight` for up to REBUILD_WAIT_TIMEOUT so the
889 // in-flight build has a bounded window to complete its
890 // warming-slot insert. Then run `try_swap_warming()`. If the
891 // in-flight build genuinely hangs (test-fixture corner case),
892 // surface that as a clean timeout rather than silently
893 // returning a stale graph.
894 let handle = self.rebuild_async();
895 let _ = handle.join();
896 // Bounded spin-wait for any concurrently-running rebuild to
897 // populate `warming`. Cheap CAS read; total budget is
898 // REBUILD_WAIT_TIMEOUT * REBUILD_WAIT_POLL_INTERVAL =
899 // ~10ms × 100 = 1 second worst-case (well under any sensible
900 // sync-rebuild expectation; production callers are async).
901 let start = std::time::Instant::now();
902 while self.rebuild_in_flight.load(Ordering::SeqCst)
903 && start.elapsed() < REBUILD_WAIT_TIMEOUT
904 {
905 std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
906 }
907 self.try_swap_warming();
908 }
909
910 /// #968 — Schedule a full HNSW rebuild on a background thread and
911 /// return the [`JoinHandle`] for callers that want to observe
912 /// completion. The build does NOT hold the inner mutex; readers
913 /// and writers continue to operate against `active` + `overflow`
914 /// while the new graph warms up. On success, the warmed graph
915 /// lands in the `warming` slot and is swapped into `active` by
916 /// the next reader/writer (or by the foreground `rebuild` shim's
917 /// post-join `try_swap_warming` call).
918 ///
919 /// Concurrency contract:
920 /// - At most one rebuild runs at a time (gated by the
921 /// `rebuild_in_flight` atomic). A second `rebuild_async` call
922 /// while a build is in flight returns a no-op handle (the
923 /// spawned closure short-circuits if the CAS fails — the in-
924 /// flight build will pick up the latest entries via the next
925 /// trigger).
926 /// - Writes during the build flow into `overflow` and
927 /// `all_entries` normally. The swap path uses the snapshot
928 /// length captured at spawn time to trim only the overflow
929 /// entries that are now in the new graph; entries inserted
930 /// AFTER the snapshot remain in overflow for the next cycle.
931 /// - Search is unaffected: it reads `active` + `overflow` under
932 /// the inner mutex, both of which remain coherent throughout.
933 ///
934 /// Failure: a panic inside the build thread is observable via
935 /// `JoinHandle::join()`; `active` is unchanged. The
936 /// `rebuild_in_flight` flag is cleared by the `RebuildGuard`
937 /// drop-guard whether the build succeeded or panicked.
938 pub fn rebuild_async(&self) -> JoinHandle<()> {
939 // Snapshot under the inner lock so we capture a consistent
940 // entries list. Read-only; we do not mutate `all_entries`
941 // here. If a rebuild is already in flight, return a no-op
942 // handle (a thread that joins instantly).
943 let snapshot = {
944 let state = match self.inner.lock() {
945 Ok(s) => s,
946 Err(poisoned) => poisoned.into_inner(),
947 };
948 if self
949 .rebuild_in_flight
950 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
951 .is_err()
952 {
953 // Already running — return an instantly-completing
954 // handle. The caller's `join()` returns `Ok(())`.
955 return std::thread::spawn(|| {});
956 }
957 RebuildSnapshot {
958 entries: state.all_entries.clone(),
959 overflow_at_snapshot: state.overflow.len(),
960 overflow_generation: state.overflow_generation,
961 }
962 };
963 self.spawn_rebuild(snapshot)
964 }
965
966 /// #968 — internal: spawn the rebuild thread for a captured
967 /// snapshot. The caller is expected to have flipped
968 /// `rebuild_in_flight` to `true` already (via CAS). The drop-guard
969 /// inside the thread clears the flag whether the build succeeds
970 /// or panics.
971 fn spawn_rebuild(&self, snapshot: RebuildSnapshot) -> JoinHandle<()> {
972 let warming = Arc::clone(&self.warming);
973 let in_flight = Arc::clone(&self.rebuild_in_flight);
974 std::thread::spawn(move || {
975 // RAII clears `rebuild_in_flight` even on panic.
976 let _guard = RebuildGuard { flag: in_flight };
977 // CPU-bound graph build runs OUTSIDE the inner mutex.
978 // This is the load-bearing change for #968: readers and
979 // writers continue to make progress while this runs.
980 let hnsw = VectorIndex::build_hnsw(&snapshot.entries);
981 let result = RebuildResult {
982 hnsw,
983 overflow_at_snapshot: snapshot.overflow_at_snapshot,
984 overflow_generation: snapshot.overflow_generation,
985 entries_in_graph: snapshot.entries.len(),
986 };
987 // Park the result in the warming slot. The next caller
988 // through `try_swap_warming` will move it into `active`.
989 // Holding the warming mutex here is microseconds.
990 if let Ok(mut slot) = warming.lock() {
991 // Overwrite any older warmed result that was never
992 // swapped (e.g. two rebuilds completed before any
993 // reader ran). The newer build is by definition a
994 // superset of the older one's entries, so dropping
995 // the older result is correct.
996 *slot = Some(result);
997 }
998 })
999 }
1000
1001 /// #968 — Swap the warming slot into active if a warmed graph is
1002 /// ready. Called opportunistically from `search`, `insert`, and
1003 /// the post-join path of the sync `rebuild` shim. The swap holds
1004 /// the inner mutex for microseconds — just long enough to
1005 /// `std::mem::replace` the graph and trim the overflow.
1006 ///
1007 /// Returns `true` if a swap occurred, `false` otherwise. Test
1008 /// code uses the return value to verify the swap landed before
1009 /// asserting post-rebuild state.
1010 pub fn try_swap_warming(&self) -> bool {
1011 // Pop the warmed result FIRST so we hold the warming mutex
1012 // only long enough to take ownership. We then re-acquire the
1013 // inner mutex to swap it in. The two-mutex sequence is safe
1014 // (no ordering hazard with any other path that takes both).
1015 let Some(result) = self.warming.lock().ok().and_then(|mut g| g.take()) else {
1016 return false;
1017 };
1018 let mut state = match self.inner.lock() {
1019 Ok(s) => s,
1020 Err(poisoned) => poisoned.into_inner(),
1021 };
1022 // v0.7.0 #1074 (SR-2 #2, HIGH) — generation check. If the
1023 // overflow generation has bumped since the rebuild captured
1024 // its snapshot, the warming graph was built from pre-eviction
1025 // all_entries and the current overflow contains post-eviction
1026 // inserts that are NOT in that graph. Drop the warming result
1027 // entirely without swapping (the next rebuild captures the
1028 // current state cleanly). Pre-#1074 the swap would have
1029 // overwritten the live graph with a stale one AND drained
1030 // the post-eviction inserts from overflow — silently losing
1031 // them until the next rebuild.
1032 if result.overflow_generation != state.overflow_generation {
1033 tracing::warn!(
1034 target: "hnsw.rebuild",
1035 snapshot_gen = result.overflow_generation,
1036 current_gen = state.overflow_generation,
1037 "dropping stale warming result (eviction occurred mid-rebuild, #1074)"
1038 );
1039 return false;
1040 }
1041 state.hnsw = result.hnsw;
1042 // #1579 — coverage accounting for `is_fully_searchable`.
1043 state.graph_entry_count = result.entries_in_graph;
1044 // Trim overflow: the first `overflow_at_snapshot` entries are
1045 // now in the graph; entries inserted AFTER the snapshot
1046 // remain. Defensive `min` in case `remove` or eviction
1047 // shortened overflow while the build was running.
1048 let to_drain = result.overflow_at_snapshot.min(state.overflow.len());
1049 state.overflow.drain(..to_drain);
1050 true
1051 }
1052
1053 /// #1579 — `true` when a search against this index can observe
1054 /// every live entry: the active graph (its build-time snapshot
1055 /// length) plus the linearly-scanned `overflow` cover
1056 /// `all_entries`. `false` exactly during the async-boot warm
1057 /// window, when [`Self::seed_entries`] has parked DB-loaded
1058 /// entries in `all_entries` but the background graph build has
1059 /// not swapped in yet — sequenced writes flow through `insert()`
1060 /// (graph- or overflow-visible) so they never break coverage, and
1061 /// removals/evictions only shrink `all_entries` (stale graph ids
1062 /// are filtered at search time), so the inequality is conservative
1063 /// in the safe direction.
1064 ///
1065 /// Consumers: the #519 proactive conflict check routes to its
1066 /// bounded-scan fallback while this is `false`; the boot loader
1067 /// uses it to decide whether a make-up rebuild is needed after a
1068 /// racing routine rebuild swallowed its CAS.
1069 pub fn is_fully_searchable(&self) -> bool {
1070 // Opportunistically land any warmed graph first so a caller
1071 // probing right after a rebuild finished sees the swapped
1072 // state (mirrors `search`/`insert`).
1073 self.try_swap_warming();
1074 let state = match self.inner.lock() {
1075 Ok(s) => s,
1076 Err(poisoned) => poisoned.into_inner(),
1077 };
1078 state.graph_entry_count + state.overflow.len() >= state.all_entries.len()
1079 }
1080
1081 /// #1579 B3 — bulk-load DB-resident entries into the index
1082 /// WITHOUT building the graph (the async-boot path). Entries land
1083 /// in `all_entries` only; they become searchable when the
1084 /// follow-up rebuild (see [`Self::seed_and_rebuild_async`]) swaps
1085 /// its graph in. Ids already present (e.g. a row written through
1086 /// `insert()` between the caller's DB snapshot and this call) are
1087 /// skipped so the index never double-counts. Returns the number of
1088 /// entries actually seeded.
1089 ///
1090 /// Deliberately does NOT enforce `max_entries` eviction here —
1091 /// the legacy synchronous boot path (`VectorIndex::build` over
1092 /// `get_all_embeddings`) never evicted at boot either, and the
1093 /// first post-boot `insert()` applies the cap exactly as before.
1094 pub fn seed_entries(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1095 let mut state = match self.inner.lock() {
1096 Ok(s) => s,
1097 Err(poisoned) => poisoned.into_inner(),
1098 };
1099 let existing: std::collections::HashSet<std::sync::Arc<str>> = state
1100 .all_entries
1101 .iter()
1102 .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
1103 .collect();
1104 let mut seeded = 0usize;
1105 for (id, emb) in entries {
1106 if existing.contains(id.as_str()) {
1107 continue;
1108 }
1109 state.all_entries.push((id, emb));
1110 seeded += 1;
1111 }
1112 if seeded > 0 {
1113 state.valid_ids_cache = None;
1114 }
1115 seeded
1116 }
1117
1118 /// #1579 B3 — async-boot warm-up: seed DB-loaded entries (see
1119 /// [`Self::seed_entries`]) and schedule the graph build on the
1120 /// existing #968 double-buffer rebuild machinery. Returns the
1121 /// rebuild thread's [`JoinHandle`]; the caller (the boot loader)
1122 /// joins it off the request path and then calls
1123 /// [`Self::try_swap_warming`] + emits the operator-visible
1124 /// "index warm" line.
1125 ///
1126 /// If a routine rebuild is already in flight (its snapshot
1127 /// predates the seed), `rebuild_async` returns a no-op handle and
1128 /// the seeded entries stay graph-invisible until a later rebuild;
1129 /// the boot loader detects that via [`Self::is_fully_searchable`]
1130 /// and issues a make-up `rebuild`.
1131 pub fn seed_and_rebuild_async(&self, entries: Vec<(String, Vec<f32>)>) -> JoinHandle<()> {
1132 self.seed_entries(entries);
1133 self.rebuild_async()
1134 }
1135
1136 /// #1579 B3 — blocking boot warm-up for callers that hold the
1137 /// index directly (the MCP stdio boot thread; tests). Seeds the
1138 /// DB-loaded entries and drives rebuild→swap to completion,
1139 /// returning the number of entries seeded. Each step takes the
1140 /// inner mutex only briefly (the graph build itself runs on the
1141 /// #968 background thread against a snapshot), so concurrent
1142 /// readers/writers on other threads keep making progress — the
1143 /// CALLING thread is the only one parked.
1144 ///
1145 /// The retry loop covers the rebuild-CAS race: if a routine
1146 /// 200-overflow rebuild was already in flight when our seed
1147 /// landed, `rebuild_async` short-circuits to a no-op handle and
1148 /// the in-flight build's pre-seed snapshot cannot cover the
1149 /// seeded rows — `is_fully_searchable` stays `false` and the loop
1150 /// schedules a make-up rebuild once the CAS frees up.
1151 pub fn warm_boot(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1152 let seeded = self.seed_entries(entries);
1153 loop {
1154 let handle = self.rebuild_async();
1155 let _ = handle.join();
1156 // `is_fully_searchable` opportunistically swaps any warmed
1157 // graph before evaluating coverage.
1158 if self.is_fully_searchable() {
1159 return seeded;
1160 }
1161 std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
1162 }
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169
/// Scale `values` to unit L2 length so test distances read as `1 - dot`.
fn make_embedding(values: &[f32]) -> Vec<f32> {
    let squared_sum: f32 = values.iter().map(|x| x * x).sum();
    let magnitude = squared_sum.sqrt();
    values.iter().map(|x| x / magnitude).collect()
}
1175
#[test]
fn empty_index_returns_empty() {
    // Nothing indexed, so nothing can come back.
    let hits = VectorIndex::empty().search(&[1.0, 0.0, 0.0], 10);
    assert!(hits.is_empty());
}
1182
#[test]
fn perf_7_valid_ids_cache_is_arc_str_typed() {
    // PERF-7 (FX-C4-batch2, 2026-05-26) — the valid_ids cache must be
    // HashSet<Arc<str>>, not HashSet<String>. Build a two-entry index,
    // search once so the cache materialises, then inspect it under the
    // state lock.
    let idx = VectorIndex::build(vec![
        ("id1".to_string(), make_embedding(&[1.0, 0.0, 0.0])),
        ("id2".to_string(), make_embedding(&[0.0, 1.0, 0.0])),
    ]);
    let _ = idx.search(&[1.0, 0.0, 0.0], 2);

    let state = idx.inner.lock().expect("lock state");
    let cache = state
        .valid_ids_cache
        .as_ref()
        .expect("valid_ids_cache populated after search");

    // `&str` lookups go through the `Borrow<str>` impl on `Arc<str>`.
    assert!(
        cache.contains("id1") && cache.contains("id2"),
        "PERF-7: Arc<str> cache failed to admit &str lookup",
    );

    // Compile-time pin: this annotation stops compiling if the element
    // type ever drifts back to `String`.
    let sample: Option<&std::sync::Arc<str>> = cache.iter().next();
    assert!(sample.is_some(), "PERF-7: cache must hold Arc<str> entries");
}
1213
#[test]
fn basic_search() {
    let idx = VectorIndex::build(vec![
        ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
        ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
        ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
    ]);
    let query = make_embedding(&[1.0, 0.1, 0.0]);
    let results = idx.search(&query, 2);
    assert_eq!(results.len(), 2);
    // [1, 0.1, 0] leans toward the `a` axis.
    assert_eq!(results[0].id, "a");
}
1226
#[test]
fn insert_and_search_overflow() {
    let idx = VectorIndex::build(vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))]);
    // Added after the build, so `b` is served from the overflow scan.
    idx.insert("b".into(), make_embedding(&[0.9, 0.1, 0.0]));
    let ranked: Vec<String> = idx
        .search(&make_embedding(&[1.0, 0.0, 0.0]), 2)
        .into_iter()
        .map(|hit| hit.id)
        .collect();
    assert_eq!(ranked, ["a", "b"]);
}
1237
#[test]
fn remove_excludes_from_results() {
    let idx = VectorIndex::build(vec![
        ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
        ("b".into(), make_embedding(&[0.9, 0.1, 0.0])),
    ]);
    idx.remove("a");
    let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
    assert!(!hits.iter().any(|hit| hit.id == "a"));
}
1249
1250 // -----------------------------------------------------------------
1251 // W11/S11b — rebuild + batched-insert hardening
1252 // -----------------------------------------------------------------
1253
#[test]
fn test_rebuild_preserves_all_entries() {
    // A dozen one-hot vectors (axis i, slightly different magnitude each),
    // rebuilt and then queried with a top-k large enough to cover all.
    let mut raw: Vec<(String, Vec<f32>)> = Vec::with_capacity(12);
    for i in 0..12 {
        let mut v = vec![0.0_f32; 16];
        #[allow(clippy::cast_precision_loss)]
        let bias = i as f32 * 0.01;
        v[i % 16] = 1.0 + bias; // bias to make L2 norm non-trivial
        raw.push((format!("id-{i}"), make_embedding(&v)));
    }

    let idx = VectorIndex::build(raw.clone());
    idx.rebuild();
    assert_eq!(idx.len(), raw.len());

    let hits = idx.search(&make_embedding(&[1.0; 16]), raw.len() * 2);
    let found: std::collections::HashSet<String> =
        hits.into_iter().map(|hit| hit.id).collect();
    for id in raw.iter().map(|(id, _)| id) {
        assert!(
            found.contains(id),
            "rebuild must preserve id {id}, found: {:?}",
            found
        );
    }
}
1285
#[test]
fn test_remove_then_search_excludes_id() {
    let query = make_embedding(&[1.0, 0.0, 0.0, 0.0]);
    let idx = VectorIndex::build(vec![
        ("alpha".into(), make_embedding(&[1.0, 0.0, 0.0, 0.0])),
        ("beta".into(), make_embedding(&[0.9, 0.1, 0.0, 0.0])),
        ("gamma".into(), make_embedding(&[0.8, 0.2, 0.0, 0.0])),
    ]);
    // Before removal `alpha` is among the hits.
    assert!(idx.search(&query, 5).iter().any(|hit| hit.id == "alpha"));

    idx.remove("alpha");
    // After removal it must never come back, whatever k is.
    for k in 1..=10 {
        let hits = idx.search(&query, k);
        assert!(
            !hits.iter().any(|hit| hit.id == "alpha"),
            "removed id `alpha` resurfaced with k={k}: {:?}",
            hits.iter().map(|h| &h.id).collect::<Vec<_>>()
        );
    }

    // The survivors are still reachable.
    let hits = idx.search(&query, 5);
    let survivors: Vec<&str> = hits.iter().map(|hit| hit.id.as_str()).collect();
    assert!(survivors.contains(&"beta"));
    assert!(survivors.contains(&"gamma"));
}
1315
1316 // -----------------------------------------------------------------
1317 // W12-H — small edge cases
1318 // -----------------------------------------------------------------
1319
#[test]
fn empty_index_len_is_zero() {
    let count = VectorIndex::empty().len();
    assert_eq!(count, 0);
}
1325
#[test]
fn build_with_empty_entries_search_empty() {
    // Building from zero entries yields an index that is empty in every way.
    let idx = VectorIndex::build(Vec::new());
    assert_eq!(idx.len(), 0);
    assert!(idx.search(&[1.0, 0.0, 0.0], 5).is_empty());
}
1333
#[test]
fn search_with_k_zero_returns_empty() {
    let idx = VectorIndex::build(vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))]);
    // k = 0 asks for nothing, even though a perfect match exists.
    let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 0);
    assert!(hits.is_empty());
}
1341
#[test]
fn rebuild_on_empty_does_not_crash() {
    let idx = VectorIndex::empty();
    // Rebuilding with nothing to index must be harmless.
    idx.rebuild();
    let count = idx.len();
    assert_eq!(count, 0);
}
1348
#[test]
fn insert_increases_len() {
    let idx = VectorIndex::empty();
    for (id, axis) in [("a", [1.0, 0.0, 0.0]), ("b", [0.0, 1.0, 0.0])] {
        idx.insert(id.into(), make_embedding(&axis));
    }
    assert_eq!(idx.len(), 2);
}
1356
#[test]
fn embedding_point_distance_orthogonal() {
    let x_axis = EmbeddingPoint(vec![1.0, 0.0, 0.0]);
    let y_axis = EmbeddingPoint(vec![0.0, 1.0, 0.0]);
    // Orthogonal unit vectors: 1 - dot = 1 - 0 = 1.
    let d = x_axis.distance(&y_axis);
    assert!((d - 1.0).abs() < 1e-6);
}
1364
#[test]
fn embedding_point_distance_identical_is_zero() {
    // A unit vector against itself: 1 - 1 = 0.
    let p = EmbeddingPoint(make_embedding(&[1.0, 1.0, 1.0]));
    let d = p.distance(&p);
    assert!(d.abs() < 1e-6);
}
1371
#[test]
fn remove_on_empty_index_is_noop() {
    let idx = VectorIndex::empty();
    // Removing an unknown id must neither panic nor change the count.
    idx.remove("nonexistent");
    let count = idx.len();
    assert_eq!(count, 0);
}
1378
#[test]
fn insert_triggers_auto_rebuild_at_threshold() {
    // REBUILD_THRESHOLD = 200: 205 inserts into a fresh index cross it and
    // exercise the auto-rebuild branch in `insert`.
    let idx = VectorIndex::empty();
    (0..205_usize).for_each(|i| {
        let mut v = vec![0.0_f32; 8];
        #[allow(clippy::cast_precision_loss)]
        let bump = i as f32 * 0.001;
        v[i % 8] = 1.0 + bump;
        idx.insert(format!("id-{i}"), make_embedding(&v));
    });
    assert_eq!(idx.len(), 205);

    // Search keeps working past the rebuild and fills the requested top-k.
    let hits = idx.search(&make_embedding(&[1.0_f32; 8]), 5);
    assert_eq!(hits.len(), 5);
}
1397
#[test]
fn test_rebuild_after_batch_insert_settles() {
    // Batch-insert into an empty index and force a rebuild (overflow may
    // still be below REBUILD_THRESHOLD). Then require top-k to return
    // exactly k hits, sorted ascending and free of duplicate ids.
    let n = 25_usize;
    let idx = VectorIndex::empty();
    for i in 0..n {
        let mut v = vec![0.0_f32; 8];
        #[allow(clippy::cast_precision_loss)]
        let bump = i as f32 * 0.001;
        v[i % 8] = 1.0 + bump;
        idx.insert(format!("id-{i}"), make_embedding(&v));
    }
    idx.rebuild();
    assert_eq!(idx.len(), n);

    let k = 5;
    let hits = idx.search(&make_embedding(&[1.0; 8]), k);
    assert_eq!(
        hits.len(),
        k,
        "post-rebuild search top-{k} must return exactly {k} hits, got {:?}",
        hits.iter().map(|h| &h.id).collect::<Vec<_>>()
    );

    for pair in hits.windows(2) {
        let (near, far) = (&pair[0], &pair[1]);
        assert!(
            near.distance <= far.distance,
            "search results must be ascending by distance: {} > {}",
            near.distance,
            far.distance
        );
    }

    let mut unique = std::collections::HashSet::new();
    for hit in &hits {
        assert!(
            unique.insert(hit.id.clone()),
            "duplicate id in search: {}",
            hit.id
        );
    }
}
1446
1447 // -----------------------------------------------------------------
1448 // v0.7.0 R3-S1 — eviction sink wires the on_index_eviction hook
1449 // -----------------------------------------------------------------
1450
1451 /// `test_hnsw_eviction_fires_hook` — when a sink is wired via
1452 /// [`VectorIndex::set_eviction_sink`] and the index inserts past
1453 /// its eviction cap, the eviction-edge code path pushes one
1454 /// [`EvictionEvent`] per evicted id onto the channel. This closes
1455 /// the G2/G8 "fire site exists but not wired" gap. We construct
1456 /// the index via [`VectorIndex::with_max_entries_for_test`] so a
1457 /// 6-entry insert sequence trips the eviction path in
1458 /// milliseconds without touching the production 100k cap.
1459 #[test]
1460 fn test_hnsw_eviction_fires_hook() {
1461 let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
1462 let idx = VectorIndex::with_max_entries_for_test(4);
1463 idx.set_eviction_sink(tx);
1464
1465 // Reset the process-local counters so concurrent tests
1466 // sharing the static don't bleed assertions into ours.
1467 reset_eviction_counters_for_test();
1468
1469 // Insert cap+2 entries — eviction drops the 2 oldest.
1470 let n = 6_usize;
1471 for i in 0..n {
1472 let mut v = vec![0.0_f32; 4];
1473 #[allow(clippy::cast_precision_loss)]
1474 let f = i as f32;
1475 v[i % 4] = 1.0 + f * 0.01;
1476 idx.insert(format!("evict-{i}"), make_embedding(&v));
1477 }
1478
1479 // Drain the channel. Expect TWO events (n=6, cap=4) — one
1480 // per evicted id. The unbounded sender does not block; the
1481 // events should already be enqueued by the time `insert`
1482 // returns, but we give the channel a small grace window for
1483 // thread-scheduling jitter on slow CI runners.
1484 let mut received: Vec<EvictionEvent> = Vec::new();
1485 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1486 while std::time::Instant::now() < deadline && received.len() < 2 {
1487 while let Ok(ev) = rx.try_recv() {
1488 received.push(ev);
1489 }
1490 if received.len() < 2 {
1491 std::thread::sleep(std::time::Duration::from_millis(5));
1492 }
1493 }
1494
1495 assert_eq!(
1496 received.len(),
1497 2,
1498 "expected one EvictionEvent per evicted id (2 evictions for n=6, cap=4), got {}: {:?}",
1499 received.len(),
1500 received.iter().map(|e| &e.memory_id).collect::<Vec<_>>(),
1501 );
1502
1503 let ids: Vec<&str> = received.iter().map(|e| e.memory_id.as_str()).collect();
1504 assert!(
1505 ids.contains(&"evict-0"),
1506 "expected evict-0 in evicted ids; got {ids:?}"
1507 );
1508 assert!(
1509 ids.contains(&"evict-1"),
1510 "expected evict-1 in evicted ids; got {ids:?}"
1511 );
1512
1513 for ev in &received {
1514 assert_eq!(
1515 ev.reason, "max_entries_reached",
1516 "evicted reason should match the canonical tag, got {:?}",
1517 ev.reason
1518 );
1519 // namespace is intentionally empty at the hnsw layer
1520 // (the index does not carry namespace context); G9+ may
1521 // plumb it through. The wire field MUST be present even
1522 // when empty.
1523 assert_eq!(ev.namespace, "");
1524 assert!(
1525 !ev.evicted_at.is_empty(),
1526 "evicted_at must be set (rfc3339), got empty"
1527 );
1528 }
1529 }
1530
1531 /// Sanity: insertion without a sink wired is a no-op for the
1532 /// hook path. The eviction-edge code path must remain functional
1533 /// (oldest drained, cap enforced) even when no sink is set, so
1534 /// the CLI / test build's zero-cost posture is preserved.
1535 ///
1536 /// The proof of eviction is the PER-INDEX `len()` — after 6
1537 /// inserts into a max-4 index, exactly 4 entries survive, which
1538 /// is only possible if the eviction-edge path drained the 2
1539 /// oldest. We deliberately do NOT read the process-global
1540 /// `index_evictions_total()` counter here: a sibling test
1541 /// (`test_hnsw_eviction_fires_hook`) calls
1542 /// `reset_eviction_counters_for_test()`, which zeroes that
1543 /// shared static, so a concurrent multi-threaded test run could
1544 /// collapse a global-delta assertion to a flake. `idx.len()` is
1545 /// isolated to this index instance and deterministic.
1546 #[test]
1547 fn test_hnsw_eviction_without_sink_is_noop_for_hook() {
1548 let idx = VectorIndex::with_max_entries_for_test(4);
1549 // No `set_eviction_sink` call here — the index runs as in
1550 // CLI / pre-R3-S1 builds without a hooks pipeline.
1551
1552 for i in 0..6_usize {
1553 let mut v = vec![0.0_f32; 4];
1554 #[allow(clippy::cast_precision_loss)]
1555 let f = i as f32;
1556 v[i % 4] = 1.0 + f * 0.01;
1557 idx.insert(format!("noopsink-{i}"), make_embedding(&v));
1558 }
1559
1560 assert_eq!(
1561 idx.len(),
1562 4,
1563 "eviction-edge path must still enforce the max-4 cap even \
1564 without a sink wired (6 inserts → 4 survivors means 2 \
1565 evictions occurred); got len={}",
1566 idx.len()
1567 );
1568 }
1569}
1570
1571// ---------------------------------------------------------------------------
1572// #968 (Wave-2 Tier-C3) — async-rebuild + double-buffering regression tests
1573//
1574// These tests pin the contract introduced by issue #968:
1575// 1. A rebuild scheduled via `rebuild_async` does NOT block readers.
1576// Search calls dispatched concurrently with the build complete in
1577// <100 ms even when the build itself runs for seconds.
1578// 2. A build-time panic leaves `active` untouched so reads continue
1579// to serve the prior snapshot.
1580// 3. Writes that land DURING a rebuild are preserved: post-rebuild
1581// state includes the snapshot's entries PLUS the concurrent inserts.
1582// 4. The swap is atomic: no caller ever observes a partial-state graph
1583// (e.g. one with half the entries missing).
1584// ---------------------------------------------------------------------------
#[cfg(test)]
mod d1_968_tests {
    use super::*;
    use std::sync::Arc as TArc;
    use std::sync::atomic::AtomicUsize;
    use std::time::{Duration, Instant};

    /// L2-normalise `values` so the index is fed unit-length vectors.
    ///
    /// NOTE(review): an all-zero input divides by a zero norm and yields
    /// NaNs; every call site in this module passes at least one non-zero
    /// component.
    fn make_embedding(values: &[f32]) -> Vec<f32> {
        let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
        values.iter().map(|v| v / norm).collect()
    }

    /// Build a deterministic embedding-set fixture of `n` 16-dim
    /// L2-normalised vectors. Tests use this to make the `build_hnsw`
    /// pass non-trivial without inflating compile time.
    fn fixture(n: usize) -> Vec<(String, Vec<f32>)> {
        (0..n)
            .map(|i| {
                let mut v = vec![0.0_f32; 16];
                #[allow(clippy::cast_precision_loss)]
                let f = i as f32;
                v[i % 16] = 1.0 + f * 0.001;
                (format!("id-{i}"), make_embedding(&v))
            })
            .collect()
    }

    /// #968 contract 1 — search must remain responsive while a rebuild
    /// runs in the background. We spawn a rebuild_async over a
    /// reasonably-sized fixture (the build is CPU-bound but not
    /// minutes-long at this scale) and concurrently issue 50 search
    /// calls. The reader loop must complete well under the time it
    /// would take if every search had to wait on the rebuild's inner
    /// mutex.
    #[test]
    fn rebuild_async_does_not_block_search_968() {
        let idx = TArc::new(VectorIndex::build(fixture(2_000)));
        let query = make_embedding(&[1.0_f32; 16]);

        // Start the rebuild OFF-THREAD.
        let idx_for_rebuild = TArc::clone(&idx);
        let rebuild_handle = std::thread::spawn(move || idx_for_rebuild.rebuild_async());
        // Concurrent search loop: fire 50 searches.
        let idx_for_search = TArc::clone(&idx);
        let search_start = Instant::now();
        let search_handle = std::thread::spawn(move || {
            for _ in 0..50 {
                let hits = idx_for_search.search(&query, 10);
                // Each search must return at most 10 hits (the k cap)
                // and the fixture has >10 entries, so a non-empty
                // result is the expected output. We assert non-empty
                // (rather than ==10) because the swap mid-loop can
                // briefly leave the graph empty while overflow takes
                // over — both shapes are correct.
                assert!(
                    !hits.is_empty(),
                    "search returned empty during rebuild — readers were blocked or the graph was lost"
                );
            }
        });
        // Wait for both to finish. The rebuild thread may take seconds;
        // the search thread must NOT.
        search_handle.join().expect("search thread panicked");
        let search_elapsed = search_start.elapsed();
        // The 50-search loop with a 2k-entry index should complete in
        // tens of ms. We use a 5-second budget — wide enough to
        // absorb CI jitter, narrow enough to catch the v0.6 regression
        // (which would have blocked for ~3-10s on the rebuild mutex).
        assert!(
            search_elapsed < Duration::from_secs(5),
            "50 searches took {:?} — readers blocked on the rebuild (v0.6 regression)",
            search_elapsed,
        );
        // The spawner thread only RETURNS the rebuild's own JoinHandle.
        // Join that inner handle too, so the background build has really
        // finished before the warming slot is drained. Without this the
        // build stays detached and `try_swap_warming` can run before
        // anything has been parked. This mirrors the join pattern in
        // `concurrent_writes_during_rebuild_consistent_968`.
        let rebuild_worker = rebuild_handle.join().expect("rebuild thread panicked");
        let _ = rebuild_worker.join();
        // Drain any pending warming into active so post-test state is clean.
        idx.try_swap_warming();
    }

    /// #968 contract 2 — if the build fails (we cannot easily induce
    /// a panic from `instant_distance::Builder::build` deterministically;
    /// instead we exercise the rebuild_in_flight short-circuit + the
    /// "no warmed result" path), the active graph is unchanged.
    /// Concretely: we call `rebuild_async` while a prior rebuild is in
    /// flight; the second call returns a no-op handle and leaves
    /// `active` serving the prior snapshot.
    #[test]
    fn rebuild_failure_leaves_active_unchanged_968() {
        let entries = fixture(50);
        let idx = VectorIndex::build(entries.clone());

        // Pre-rebuild: search returns expected ids.
        let query = make_embedding(&[1.0_f32; 16]);
        let pre_hits = idx.search(&query, 5);
        assert_eq!(pre_hits.len(), 5);
        let pre_ids: std::collections::HashSet<String> =
            pre_hits.iter().map(|h| h.id.clone()).collect();

        // Force the in-flight flag on so the next rebuild_async takes
        // the short-circuit path (returns a no-op handle).
        idx.rebuild_in_flight.store(true, Ordering::SeqCst);
        let handle = idx.rebuild_async();
        let _ = handle.join();
        // Active should be UNCHANGED — no warmed graph was parked.
        let post_hits = idx.search(&query, 5);
        let post_ids: std::collections::HashSet<String> =
            post_hits.iter().map(|h| h.id.clone()).collect();
        assert_eq!(
            pre_ids, post_ids,
            "search results changed after a no-op rebuild — active was clobbered"
        );

        // Cleanup: clear the manually-poked flag.
        idx.rebuild_in_flight.store(false, Ordering::SeqCst);
    }

    /// #968 contract 3 — writes during a rebuild are preserved.
    /// We kick off a rebuild_async over a 500-entry baseline, then issue
    /// N concurrent inserts. After the rebuild lands, every inserted id
    /// must still be present in the index. It is either in the new graph
    /// (if the snapshot captured it) or in overflow (if it arrived after
    /// the snapshot). Presence is checked by exact `all_entries`
    /// membership, not through the approximate `search()` path.
    #[test]
    fn concurrent_writes_during_rebuild_consistent_968() {
        let idx = TArc::new(VectorIndex::build(fixture(500)));
        let handle = {
            let idx = TArc::clone(&idx);
            std::thread::spawn(move || idx.rebuild_async())
        };

        // Issue 30 concurrent inserts. These flow into overflow +
        // all_entries. The rebuild was started on another thread, so
        // whether a given insert lands before or after the rebuild's
        // snapshot depends on scheduling. Entries captured by the
        // snapshot end up in the new graph. Later ones stay in overflow
        // until the NEXT rebuild. The swap path trims only the overflow
        // PREFIX present at snapshot time, so those later entries survive.
        let inserts_done = TArc::new(AtomicUsize::new(0));
        let mut writer_handles = Vec::new();
        for i in 0..30 {
            let idx = TArc::clone(&idx);
            let counter = TArc::clone(&inserts_done);
            writer_handles.push(std::thread::spawn(move || {
                let mut v = vec![0.0_f32; 16];
                #[allow(clippy::cast_precision_loss)]
                let f = i as f32;
                v[i % 16] = 2.0 + f * 0.01;
                idx.insert(format!("concurrent-{i}"), make_embedding(&v));
                counter.fetch_add(1, Ordering::SeqCst);
            }));
        }
        // Surface writer panics directly instead of discarding them; a
        // swallowed panic would otherwise only show up indirectly as an
        // `inserts_done` shortfall below.
        for h in writer_handles {
            h.join().expect("writer thread panicked");
        }
        let rebuild_h = handle.join().expect("outer rebuild spawner panicked");
        let _ = rebuild_h.join();
        // v0.7.0 #1212 — deterministic post-rebuild observation barrier.
        // Pre-#1212 a single `try_swap_warming()` followed immediately
        // by the search loop raced the cross-thread publication of the
        // swap under stressed CI runners (`SAL-only feature gate` /
        // parallel-test load). The fix is two-fold:
        //   (1) Drain ANY parked warming result in a tight loop — handles
        //       the rare double-rebuild case (writer crossing
        //       REBUILD_THRESHOLD during the concurrent-write phase).
        //   (2) Yield briefly so any post-swap state (`valid_ids_cache`
        //       lazy rebuild, HNSW search-side state init) settles
        //       before the assertions below read it. 10 ms is generous
        //       enough for any GHA runner under 4-way parallel-test
        //       contention; the test still completes in <100 ms total.
        let mut swaps = 0_usize;
        while idx.try_swap_warming() {
            swaps += 1;
            // Defensive cap — at v0.7.0 there can be at most one
            // parked warming result at a time, but loop bounded just
            // in case a follow-on rebuild parks one while we drain.
            if swaps > 4 {
                break;
            }
        }
        std::thread::sleep(Duration::from_millis(10));

        // Verify all 30 ids survived the rebuild. The CONTRACT under
        // test is "post-rebuild state INCLUDES the concurrent writes"
        // — i.e. every concurrent id is in `all_entries` (graph OR
        // overflow). It is asserted twice. First, `len()` must equal
        // baseline + 30. Second, an exact `all_entries` membership check
        // runs below. The approximate `search()` path is deliberately
        // avoided (rationale further down).
        assert_eq!(inserts_done.load(Ordering::SeqCst), 30);
        let final_len = idx.len();
        // v0.7.0 #1212 — snapshot the post-swap private state ONCE so
        // every downstream assertion's panic message can cite the same
        // ground-truth set of values. Pre-#1212 a panic at line
        // src/hnsw.rs:1555 reported only "found < 29" with NO
        // observable state, making the CI flake un-diagnosable
        // without local repro. Capturing overflow.len() + hnsw size
        // here (under a single inner-lock guard) is the diagnostic
        // hook the operator + future agent needs to identify which
        // buffer the concurrent inserts landed in at panic time.
        let (overflow_len_dbg, hnsw_size_dbg) = {
            let state = idx.inner.lock().expect("inner mutex poisoned");
            let hnsw_size = state.hnsw.as_ref().map_or(0, |h| h.iter().count());
            (state.overflow.len(), hnsw_size)
        };
        assert_eq!(
            final_len,
            530,
            "post-rebuild len must equal baseline 500 + concurrent 30 = 530, \
             got {final_len} (overflow={overflow_len_dbg}, hnsw={hnsw_size_dbg}, \
             swaps={swaps}, inserts_done={})",
            inserts_done.load(Ordering::SeqCst)
        );
        // Assert the survival CONTRACT — "post-rebuild state INCLUDES
        // every concurrent write" — DETERMINISTICALLY via `all_entries`
        // membership (each id is in the graph OR overflow), not via the
        // approximate HNSW `search()` path.
        //
        // Why not `search()`: the active graph is built with
        // `Builder::default()` and queried with `Search::default()`, so
        // the search beam (`ef`) is a fixed library default that does
        // NOT scale with `k`. `search(q, 600)` over a 530-entry index
        // therefore returns only the top-`ef` graph candidates plus the
        // overflow scan — a graph-resident entry outside the beam is
        // intermittently clipped under stressed / instrumented runners.
        // The llvm-cov coverage job (run 27001253837/27001253833) hit
        // exactly this: `found` came back 28/30 while `final_len == 530`
        // simultaneously PROVED all 30 were present. Earlier revisions
        // chased it by widening `k` (no effect — the limit is `ef`, not
        // `k`) and then relaxing the floor to 29 (still flaky: 2-id
        // clips occur under heavy instrumentation). Membership over
        // `all_entries` is exact and race-free, so it is both the
        // correct expression of the contract AND immune to ANN-recall
        // jitter.
        //
        // The `swaps`/`overflow`/`hnsw` diagnostics captured above are
        // retained in the panic message so any genuine drop (a swap that
        // loses a write) still names the failure mode in one read.
        let present: std::collections::HashSet<String> = {
            let state = idx.inner.lock().expect("inner mutex poisoned");
            state.all_entries.iter().map(|(id, _)| id.clone()).collect()
        };
        let missing: Vec<String> = (0..30)
            .map(|i| format!("concurrent-{i}"))
            .filter(|id| !present.contains(id))
            .collect();
        assert!(
            missing.is_empty(),
            "post-rebuild all_entries must INCLUDE every concurrent write \
             (missing={missing:?}; post-swap state: overflow={overflow_len_dbg}, \
             hnsw={hnsw_size_dbg}, swaps={swaps}, final_len={final_len}); \
             a missing id means the concurrent-rebuild swap dropped a write"
        );
    }

    /// #968 contract 4 — the swap is atomic. We never observe a
    /// half-populated graph during the swap window. To check this, an
    /// observer thread polls `len()` continuously while a real rebuild
    /// runs and swaps in. It asserts that every observation is >= the
    /// baseline (the swap only adds entries, never loses them).
    #[test]
    fn rebuild_swap_is_atomic_968() {
        let idx = TArc::new(VectorIndex::build(fixture(1_000)));
        let baseline_len = idx.len();
        let stop = TArc::new(AtomicBool::new(false));
        let observer_stop = TArc::clone(&stop);
        let idx_obs = TArc::clone(&idx);
        let observer = std::thread::spawn(move || {
            while !observer_stop.load(Ordering::SeqCst) {
                let l = idx_obs.len();
                assert!(
                    l >= baseline_len,
                    "len() dropped below baseline during rebuild — partial swap observed: {l} < {baseline_len}"
                );
            }
        });
        // Run a real rebuild.
        let h = idx.rebuild_async();
        let _ = h.join();
        idx.try_swap_warming();
        // Stop the observer and PROPAGATE its outcome. The partial-swap
        // assertion runs on the observer thread, so its panic reaches the
        // test harness only through this join. Discarding the join result
        // (as a bare `let _ =` would) lets the test pass even after the
        // contract was violated.
        stop.store(true, Ordering::SeqCst);
        observer
            .join()
            .expect("observer thread panicked — partial swap observed during rebuild");
        assert_eq!(idx.len(), baseline_len);
    }

    // v0.7.0 #1074 (SR-2 #2, HIGH) — eviction-then-rebuild gap.
    // The swap path must DROP a warming result whose
    // `overflow_generation` doesn't match the current state, so an
    // entry inserted into overflow AFTER a clear() bump is not
    // mistakenly drained out as if it were a captured snapshot
    // entry. We park a stale-gen result directly in the warming
    // slot and confirm try_swap_warming refuses to swap AND does
    // not drain overflow.
    #[test]
    fn stale_warming_swap_is_dropped_1074() {
        let idx = VectorIndex::empty();
        // Insert a non-trivial overflow entry so we can assert
        // try_swap_warming doesn't drain it.
        idx.insert(
            "alpha".to_string(),
            make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]),
        );
        let before_overflow = idx.inner.lock().unwrap().overflow.len();
        assert_eq!(before_overflow, 1);

        // Park a STALE-generation warming result with a swap that
        // would otherwise drain everything.
        {
            let current_gen = idx.inner.lock().unwrap().overflow_generation;
            let mut w = idx.warming.lock().unwrap();
            *w = Some(RebuildResult {
                hnsw: None,
                overflow_at_snapshot: 999, // would have drained the whole overflow
                overflow_generation: current_gen.wrapping_add(1), // mismatched
                entries_in_graph: 0,
            });
        }

        // Swap MUST refuse and leave overflow intact.
        let swapped = idx.try_swap_warming();
        assert!(
            !swapped,
            "stale-by-generation warming must NOT swap in (#1074)"
        );
        let after_overflow = idx.inner.lock().unwrap().overflow.len();
        assert_eq!(
            after_overflow, before_overflow,
            "stale swap must NOT drain overflow (#1074 regression)"
        );
        // The alpha entry must still be findable via the linear
        // overflow scan (no graph yet).
        let hits = idx.search(&make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]), 5);
        assert!(hits.iter().any(|h| h.id == "alpha"));
    }

    // v0.7.0 #1074 — confirm overflow.clear() in the eviction path
    // bumps the generation counter so a snapshot captured BEFORE the
    // eviction will fail the gen check on swap. This pins the load-
    // bearing invariant: clear() must always bump.
    #[test]
    fn eviction_clear_bumps_overflow_generation_1074() {
        let idx = VectorIndex::with_max_entries_for_test(2);
        let gen_initial = idx.inner.lock().unwrap().overflow_generation;
        // 3 inserts past cap=2 → at least one eviction-clear fires.
        for i in 0..3 {
            let mut v = vec![0.0_f32; 4];
            v[i % 4] = 1.0;
            idx.insert(format!("e{i}"), make_embedding(&v));
        }
        let gen_after = idx.inner.lock().unwrap().overflow_generation;
        assert!(
            gen_after > gen_initial,
            "eviction-clear path must bump overflow_generation (#1074)"
        );
    }

    // -----------------------------------------------------------------
    // #1579 — async-boot seeding + coverage accounting
    // -----------------------------------------------------------------

    #[test]
    fn seed_entries_dedupes_and_defers_searchability_1579() {
        let idx = VectorIndex::empty();
        // A write that arrived through the normal path stays covered
        // (overflow is linearly scanned).
        idx.insert("live".into(), make_embedding(&[1.0, 0.0, 0.0]));
        assert!(
            idx.is_fully_searchable(),
            "overflow-only index is fully searchable"
        );
        // Seed the boot snapshot; the id already present must be
        // skipped (no double-count), the rest land in all_entries
        // only.
        let seeded = idx.seed_entries(vec![
            ("live".into(), make_embedding(&[1.0, 0.0, 0.0])),
            ("a".into(), make_embedding(&[0.0, 1.0, 0.0])),
            ("b".into(), make_embedding(&[0.0, 0.0, 1.0])),
        ]);
        assert_eq!(seeded, 2, "duplicate id must be skipped");
        assert_eq!(idx.len(), 3);
        assert!(
            !idx.is_fully_searchable(),
            "seeded-but-unbuilt entries must report not-fully-searchable \
             so the conflict check routes to its fallback (#1579 A5/B3)"
        );
        // Search during the warm window must still serve the live
        // (overflow) entry — and must NOT pretend to cover the seeds.
        let hits = idx.search(&make_embedding(&[0.0, 1.0, 0.0]), 3);
        assert!(hits.iter().all(|h| h.id == "live"));
    }

    #[test]
    fn warm_boot_seeds_builds_and_swaps_1579() {
        let idx = VectorIndex::empty();
        let seeded = idx.warm_boot(vec![
            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
            ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
        ]);
        assert_eq!(seeded, 3);
        assert!(
            idx.is_fully_searchable(),
            "warm_boot must drive rebuild→swap to completion"
        );
        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
        assert_eq!(hits.first().map(|h| h.id.as_str()), Some("a"));
    }

    #[test]
    fn build_and_insert_preserve_full_searchability_1579() {
        let idx = VectorIndex::build(vec![
            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
        ]);
        assert!(idx.is_fully_searchable(), "synchronous build covers all");
        idx.insert("c".into(), make_embedding(&[0.0, 0.0, 1.0]));
        assert!(
            idx.is_fully_searchable(),
            "insert lands in overflow — coverage preserved"
        );
        idx.remove("a");
        assert!(
            idx.is_fully_searchable(),
            "remove only shrinks all_entries — coverage preserved"
        );
    }
}