Skip to main content

ai_memory/
hnsw.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HNSW (Hierarchical Navigable Small World) vector index for fast approximate
5//! nearest-neighbor search over memory embeddings.
6//!
7//! Built on `instant-distance`. The index is constructed at startup from all
8//! stored embeddings. New memories added during the session go into an overflow
9//! list that is scanned linearly alongside the HNSW results — the index is
10//! rebuilt lazily once the overflow exceeds a threshold.
11
12use instant_distance::{Builder, HnswMap, Search};
13// `instant_distance::Point` is the trait that supplies the
14// `EmbeddingPoint::distance` method; it has to be in scope for the
15// in-module tests (`embedding_point_distance_*`) to call it as a
16// method. The lib code itself goes through the slice-borrow
17// `cosine_distance` helper post-#1087 so the `Point` impl is the
18// only consumer of the trait at the bare-name level.
19#[cfg(test)]
20use instant_distance::Point;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc::Sender;
25use std::thread::JoinHandle;
26
27use crate::hooks::EvictionEvent;
28
/// `tracing` target shared by every HNSW eviction-worker event
/// (#1558 tracing-target SSOT), so log filters only need one name.
const EVICTION_TRACE_TARGET: &str = "hnsw.eviction";

/// Overflow length at which `insert()` schedules a background rebuild.
const REBUILD_THRESHOLD: usize = 200;

/// #1037 (2026-05-21) — upper bound on how long the synchronous
/// [`VectorIndex::rebuild`] shim waits when [`VectorIndex::rebuild_async`]
/// handed back a no-op handle because an earlier async rebuild was still
/// running. One second is far below any reasonable sync-rebuild budget
/// (production code calls `rebuild_async`); the wait exists only so the
/// shim degrades to "bounded timeout, then best-effort swap" instead of
/// silently returning a stale graph.
const REBUILD_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_000);

/// Polling period while waiting on an in-flight rebuild. Used by the
/// [`VectorIndex::rebuild`] sync shim (#1037) and by the #1579 B3 boot
/// warm-up retry loops (`warm_boot`,
/// `daemon_runtime::spawn_vector_index_boot_load`).
pub(crate) const REBUILD_WAIT_POLL_INTERVAL: std::time::Duration =
    std::time::Duration::from_millis(10);

/// #1579 B3 — smallest embedded-row count for which a ONE-SHOT CLI run
/// builds an HNSW graph at all. Below it, the recall pipeline's
/// linear-scan fallback (`vector_index = None`) finishes the semantic
/// phase in ≤ 35 ms (P1 audit, 10-20k rows). Building the graph instead
/// takes ~40 s at 10k vectors, only to be discarded when the process
/// exits. Long-lived processes (serve / MCP stdio) always build — in the
/// background, see `daemon_runtime::spawn_vector_index_boot_load` —
/// because the cost is amortised over many recalls.
pub const CLI_HNSW_BUILD_MIN_ENTRIES: usize = 20_000;

/// Production eviction cap: once the index holds more entries than this,
/// `insert()` drops the oldest ones so memory use stays bounded.
///
/// Tests can build a `VectorIndex` with a different cap through
/// [`VectorIndex::with_max_entries_for_test`]. That cap is stored on the
/// instance, so it never leaks into concurrently running tests that use
/// the default. Keeping the value here gives call sites (and the
/// per-event tracing payload) one canonical reference.
const MAX_ENTRIES: usize = 100_000;
69
70// ---------------------------------------------------------------------------
71// v0.6.3.1 (P3, G2): eviction observability
72//
73// `MAX_ENTRIES`-triggered eviction in `insert()` previously dropped the
74// oldest embeddings silently — operators near the cap lost recall quality
75// invisibly. The two counters below + the structured `hnsw.eviction`
76// tracing event close that gap:
77//
78//   - eviction count — cumulative count surfaced via
79//     `db::stats().index_evictions_total` (and capabilities) AND at
80//     `/metrics` as `ai_memory_hnsw_evictions_total`.
81//   - last-eviction wall clock — UNIX nanoseconds of the most recent
82//     eviction; capabilities derive `hnsw.evicted_recently` from this
83//     with a 60 s rolling window.
84//
85// **pm-v3.1 PR8 (issue #1174).** Pre-PR8 the counters were two free
86// `static AtomicU64`s at the top of this file. PR8 sank both into the
87// metrics registry (`src/metrics.rs::HNSW_EVICTIONS_TOTAL` +
88// `HNSW_LAST_EVICTION_AT_NANOS`, plus matching Prometheus
89// `IntCounter` / `IntGauge` handles on `Metrics`) so the eviction
90// signal is `/metrics`-scrape-visible without a separate observer
91// thread. The accessor signatures here are preserved verbatim for
92// call-site backward compat.
93//
94// Process-local. The counters reset on restart because the index itself
95// resets on restart. Both atomics are touched only on the eviction edge
96// (rare: requires >100k vectors), so there is no measurable hot-path cost.
97// ---------------------------------------------------------------------------
98
99/// Cumulative HNSW oldest-eviction count since process start.
100///
101/// Surfaces in `memory_stats`. Non-zero indicates the in-memory vector
102/// index has hit `MAX_ENTRIES` and dropped older embeddings; recall
103/// quality may have degraded for evicted ids until they are re-inserted
104/// (e.g. on next access via `recall` touch path).
105///
106/// pm-v3.1 PR8: thin shim over `crate::metrics::hnsw_evictions_total()`.
107#[must_use]
108pub fn index_evictions_total() -> u64 {
109    crate::metrics::hnsw_evictions_total()
110}
111
112// ---------------------------------------------------------------------------
113// M8 (v0.7.0 round-2) — eviction-rate observability.
114//
115// Operators who hit the 100k cap need two signals:
116//
117//   1. Per-eviction WARN — surface every eviction event so operators
118//      see drift before recall quality has noticeably degraded.
119//   2. Rolling-rate ERROR — when the trailing-hour eviction rate
120//      exceeds the M8 ceiling, escalate to ERROR so the ops dashboard
121//      raises a page. The escalation message names the operator
122//      knobs (`vector_index_capacity` / "move to dedicated vector DB")
123//      so the on-call has the remediation in the log line.
124//
// Implementation: a small bounded ring buffer (a `VecDeque` capped at
// `EVICTION_RATE_RING_CAP`) of UNIX-nanosecond timestamps. Each eviction
// `push`es a stamp; the rolling-rate check
127// counts how many stamps sit inside the trailing-hour window. The
128// ring is locked behind a `Mutex` for write-coherent visibility; the
129// path runs only on the eviction edge so the lock cost is negligible.
130// ---------------------------------------------------------------------------
131
/// M8 eviction-rate ceiling: the number of evictions per trailing hour
/// past which the rolling observer escalates from WARN to ERROR.
const EVICTION_RATE_CEILING_PER_HOUR: usize = 10;

/// Rolling-hour ring buffer capacity. Sized to hold the ceiling plus
/// headroom for bursts; when the ring is full, the oldest stamp is dropped
/// on push.
const EVICTION_RATE_RING_CAP: usize = 64;

// The trailing-hour count produced from the ring can never exceed
// `EVICTION_RATE_RING_CAP`. A cap that does not exceed the ceiling could
// therefore make the WARN → ERROR escalation silently unreachable. Reject
// that configuration at compile time instead of relying on the comment.
const _: () = assert!(
    EVICTION_RATE_RING_CAP > EVICTION_RATE_CEILING_PER_HOUR,
    "EVICTION_RATE_RING_CAP must exceed EVICTION_RATE_CEILING_PER_HOUR"
);

/// v0.7.0 #1093 — eviction-rate ring buffer of UNIX-nanosecond stamps.
/// It is a `VecDeque` (instead of the earlier `Mutex<Vec<u64>>`) so that
/// dropping the oldest stamp at capacity is an O(1) `pop_front` rather
/// than an O(N) `Vec::remove(0)`.
static EVICTION_RATE_RING: Mutex<std::collections::VecDeque<u64>> =
    Mutex::new(std::collections::VecDeque::new());
146
147/// Whether an eviction occurred within the trailing `window_secs`.
148///
149/// Used by capabilities (P1) to set `hnsw.evicted_recently` so operators
150/// can see ongoing pressure on the cap, not just the cumulative count.
151/// Returns `false` when no evictions have ever happened in this process.
152#[must_use]
153pub fn evicted_recently(window_secs: u64) -> bool {
154    let last = crate::metrics::hnsw_last_eviction_at_nanos();
155    if last == 0 {
156        return false;
157    }
158    let now_nanos = std::time::SystemTime::now()
159        .duration_since(std::time::UNIX_EPOCH)
160        .map(|d| d.as_nanos())
161        .unwrap_or(0);
162    // Saturating math: clock can move backwards on some VMs.
163    let elapsed_nanos = u128::from(u64::MAX).min(now_nanos.saturating_sub(u128::from(last)));
164    elapsed_nanos < u128::from(window_secs).saturating_mul(1_000_000_000)
165}
166
167/// Reset the eviction counters. Test-only — production callers must not
168/// reach into the counter directly. The function is `pub` (rather than
169/// `pub(crate)`) so the integration-test crate at `tests/` can drive it
170/// alongside the public `index_evictions_total()` accessor; renaming
171/// keeps the intent obvious at every call site.
172///
173/// pm-v3.1 PR8: thin shim over
174/// `crate::metrics::reset_hnsw_eviction_counters_for_test()`.
175#[doc(hidden)]
176pub fn reset_eviction_counters_for_test() {
177    crate::metrics::reset_hnsw_eviction_counters_for_test();
178    if let Ok(mut g) = EVICTION_RATE_RING.lock() {
179        g.clear();
180    }
181}
182
183/// M8 (v0.7.0 round-2) — push the latest eviction timestamp into the
184/// rolling-hour ring and return how many stamps now sit inside the
185/// trailing hour. Producers call this once per eviction event;
186/// the caller branches on the returned count to escalate from WARN
187/// (already emitted) to ERROR.
188fn record_eviction_and_count_recent(now_nanos: u64) -> usize {
189    const ONE_HOUR_NANOS: u64 = crate::SECS_PER_HOUR as u64 * 1_000_000_000;
190    let cutoff = now_nanos.saturating_sub(ONE_HOUR_NANOS);
191    let Ok(mut ring) = EVICTION_RATE_RING.lock() else {
192        // Poisoned lock — observability is best-effort, return 0 so
193        // the caller does not over-escalate.
194        return 0;
195    };
196    // Drop stale entries first so the ring stays bounded and the
197    // count reflects the trailing hour.
198    ring.retain(|t| *t >= cutoff);
199    if ring.len() >= EVICTION_RATE_RING_CAP {
200        // v0.7.0 #1093 — VecDeque::pop_front is O(1); pre-#1093
201        // Vec::remove(0) was O(N) (backing-buffer shift).
202        ring.pop_front();
203    }
204    ring.push_back(now_nanos);
205    ring.len()
206}
207
/// A point in the HNSW index — wraps a dense embedding vector.
///
/// The wrapped `Vec<f32>` is the raw embedding. The
/// `instant_distance::Point` impl for this type measures cosine distance
/// via `cosine_distance`. That helper treats the dot product as cosine
/// similarity, so it expects L2-normalised vectors.
#[derive(Clone, Debug)]
pub struct EmbeddingPoint(pub Vec<f32>);
211
/// Cosine distance between two embedding slices, computed as `1 - (a · b)`.
///
/// v0.7.0 #1087 — both operands are borrowed. This lets the overflow scan
/// in [`VectorIndex::search`] compare against stored `Vec<f32>` rows
/// without cloning each one into an `EmbeddingPoint`. Inputs are expected
/// to be L2-normalised, which makes the dot product equal to cosine
/// similarity.
///
/// When the lengths differ, only the common prefix contributes (the
/// pairwise products stop at the shorter slice).
///
/// #1684 — non-finite results are mapped to `f32::MAX`. These come from
/// any NaN/±Inf component, e.g. on local seed/insert paths that skip
/// `federation::sanitize_shipped_vector`. NaN is unordered under
/// `partial_cmp`, and an earlier `.unwrap()` on that comparison crashed the
/// single-threaded MCP process. Ranking the poisoned row last keeps the
/// candidate set sortable. This mirrors the finite-or-floor guard in
/// `Embedder::cosine_similarity` (src/embeddings.rs).
#[inline]
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let similarity = a.iter().zip(b).fold(0.0_f32, |acc, (x, y)| acc + x * y);
    Some(1.0 - similarity)
        .filter(|d| d.is_finite())
        .unwrap_or(f32::MAX)
}
230
231impl instant_distance::Point for EmbeddingPoint {
232    fn distance(&self, other: &Self) -> f32 {
233        cosine_distance(&self.0, &other.0)
234    }
235}
236
237// ---------------------------------------------------------------------------
238// #968 (Wave-2 Tier-C3) — async rebuild + double-buffering.
239//
240// Prior to #968 every HNSW rebuild ran SYNCHRONOUSLY on the request thread:
241// `Self::build_hnsw(&state.all_entries)` is CPU-bound (graph construction
242// is O(N log N) with constant factors that put 100k vectors at ~3-10s on
243// commodity hardware) and the producer's `insert()` call simply blocked
244// until the new graph was ready. Search callers contending for the same
245// `inner` mutex blocked too — recall p95 spiked from <20 ms to multi-second.
246//
247// The fix is a double-buffer pattern with background-task swap-in:
//   • The `active` slot (`IndexState::hnsw`) is the index that serves
249//     reads. Search holds the inner lock only long enough to clone the
250//     overflow + collect valid IDs; the HNSW search itself runs against
251//     the active graph held under the same lock (instant-distance's
252//     `Search::default()` is per-call scratch, no shared state).
//   • The `warming` slot is `Arc<Mutex<Option<RebuildResult>>>` (the new
//     graph plus its snapshot bookkeeping). A background
254//     thread (`std::thread::spawn` — HNSW build is CPU-bound; no tokio
255//     runtime needed) builds the new graph from a snapshot of
256//     `all_entries`, then drops it into `warming`. On the next
257//     `try_swap_warming()` (called from search + insert + explicit poll)
258//     the warmed graph atomically replaces `active`. The mutex hold
259//     spans only the std::mem::swap — microseconds.
260//   • Concurrent writes during rebuild: writes flow into `overflow` and
261//     `all_entries` normally while the background task is building from
262//     the snapshot. On swap, we trim `overflow` of the entries already
263//     captured in the snapshot (the snapshot length is recorded when the
264//     job kicks off). Entries inserted AFTER the snapshot remain in
265//     overflow and are searched linearly until the next rebuild captures
266//     them. No write is ever dropped.
267//   • Rebuild failures: a panicking build-thread leaves `warming`
268//     untouched (`None`); `active` is unchanged. The `JoinHandle` exposes
269//     the panic to the caller via `JoinHandle::join()`. The
270//     `rebuild_in_flight` atomic flips back to `false` whether the
271//     thread succeeded or panicked (via a drop-guard `RebuildGuard`).
272// ---------------------------------------------------------------------------
273
/// Snapshot-bound rebuild job. Carries the captured `all_entries` plus
/// the overflow length at snapshot time so the post-swap overflow trim
/// is deterministic. The trim must use the overflow length specifically
/// (NOT `all_entries.len()`) because writes between snapshot and swap
/// extend overflow; only the overflow PREFIX whose entries are now in
/// the new graph is safe to drop.
///
/// Captured under the inner-state lock, e.g. by `insert()` once
/// `overflow` reaches `REBUILD_THRESHOLD` and no other rebuild is in
/// flight.
struct RebuildSnapshot {
    /// Clone of `IndexState::all_entries` at capture time; the new graph
    /// is built from exactly these rows.
    entries: Vec<(String, Vec<f32>)>,
    /// Length of `overflow` at the moment the snapshot was taken. The
    /// swap path drains the first `overflow_at_snapshot` entries from
    /// `state.overflow` — those entries are now in the new graph.
    /// Anything inserted AFTER the snapshot remains in overflow for
    /// the next rebuild cycle. Capturing the OVERFLOW length (not the
    /// all-entries length) is load-bearing for correctness under
    /// concurrent writes during rebuild.
    overflow_at_snapshot: usize,
    /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter snapshot.
    /// The eviction path bumps `state.overflow_generation` and clears
    /// `state.overflow` (`overflow.clear()`). If a rebuild snapshot was
    /// captured BEFORE the eviction, its `overflow_generation` will
    /// not match the post-eviction `state.overflow_generation`, and
    /// the swap path knows the snapshot is stale: it must NOT drain
    /// overflow entries (those entries are post-eviction inserts not
    /// in the snapshot's `entries`), and the warmed graph itself is
    /// stale (it was built from pre-eviction `all_entries` that have
    /// since been shrunk). The safe action is to drop the warmed
    /// result without swapping AND without draining, then let the
    /// next rebuild capture the current state.
    overflow_generation: u64,
}
304
/// Resets the shared `rebuild_in_flight` coordinator flag to `false` when
/// dropped.
///
/// Destructors run during unwinding, so a panic inside the background
/// build (e.g. in `build_hnsw`) still clears the flag. Without the guard, a
/// panicking build would leave the flag stuck at `true` and block every
/// future rebuild from being scheduled.
///
/// NOTE(review): allocation failure aborts the process by default (see
/// `std::alloc::handle_alloc_error`) rather than unwinding. This guard
/// therefore covers panics, not out-of-memory, and it relies on the crate
/// being built with `panic = "unwind"`.
struct RebuildGuard {
    /// Shared coordinator flag released when this guard is dropped.
    flag: Arc<AtomicBool>,
}

impl Drop for RebuildGuard {
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::SeqCst);
    }
}
318
/// Thread-safe HNSW index over memory embeddings.
///
/// All mutable state sits behind interior locks/atomics, so mutation goes
/// through `&self` (e.g. `insert`, `set_eviction_sink`). New entries land in
/// a linearly scanned overflow. Once that reaches `REBUILD_THRESHOLD`
/// entries, a background rebuild produces a new graph that is later
/// swapped in (#968 double-buffering).
pub struct VectorIndex {
    /// Index state: the active HNSW graph, the linear-scan overflow, the
    /// full entry list used for rebuilds, and their bookkeeping (see
    /// `IndexState`).
    inner: Mutex<IndexState>,
    /// #968 — warming slot for the double-buffer pattern. The background
    /// rebuild thread parks the freshly-built graph here; readers/writers
    /// observe it via [`Self::try_swap_warming`] on their next inner-lock
    /// acquisition. `Some` means a rebuild has finished and is awaiting
    /// swap-in; `None` means no warmed graph is ready.
    warming: Arc<Mutex<Option<RebuildResult>>>,
    /// #968 — coordinator flag. `true` while a background rebuild is in
    /// flight; prevents the auto-rebuild path in `insert()` from
    /// spawning a second concurrent build (one CPU-bound build at a
    /// time is enough — successive rebuilds chase the same target).
    /// Cleared by the rebuild thread's drop-guard whether the build
    /// succeeded or panicked.
    rebuild_in_flight: Arc<AtomicBool>,
    /// v0.7.0 (R3-S1) — eviction sink. The `MAX_ENTRIES`-triggered
    /// drain in `insert()` pushes an [`EvictionEvent`] onto this
    /// channel for each evicted id; a hook-aware observer above this
    /// layer drains the channel and fires the `on_index_eviction`
    /// chain off the hot path. Wired by the daemon at startup
    /// (`daemon_runtime`) via [`Self::set_eviction_sink`]. Optional —
    /// CLI / test builds that never bring up the hooks pipeline leave
    /// it `None` and the sink-push is a no-op so eviction throughput
    /// is unaffected. Closes the G2 / G8 "fire site exists but not
    /// wired" gap that the prior `tracing::warn!`-only implementation
    /// left open.
    ///
    /// `Mutex` (not `RwLock`) because writes are rare — normally a single
    /// [`Self::set_eviction_sink`] call at startup, though the sink may
    /// also be replaced at runtime. Reads happen only on the eviction
    /// edge, which is already serialized through `inner`. The sink is an
    /// unbounded `std::sync::mpsc::Sender`, whose `send` never blocks, so
    /// pushing events while holding the inner-state lock cannot deadlock.
    eviction_sink: Mutex<Option<Sender<EvictionEvent>>>,
}
356
/// #968 — payload the rebuild thread parks in the `warming` slot when
/// the build completes. Carries the new graph PLUS the overflow length
/// at snapshot time so the swap path can trim `overflow` deterministically:
/// the prefix `..overflow_at_snapshot` is now in the graph; entries
/// inserted AFTER the snapshot (the suffix) remain in `overflow` for
/// the next cycle.
struct RebuildResult {
    /// The freshly built graph. Presumably `build_hnsw` over the snapshot's
    /// entries, and therefore `None` for an empty snapshot — TODO confirm
    /// in `spawn_rebuild`.
    hnsw: Option<HnswMap<EmbeddingPoint, String>>,
    /// Overflow length at snapshot time: how many leading `overflow`
    /// entries the new graph already contains.
    overflow_at_snapshot: usize,
    /// v0.7.0 #1074 — propagated from the snapshot so the swap path
    /// can detect a stale-by-eviction warming result.
    overflow_generation: u64,
    /// #1579 — number of entries captured in the snapshot this graph
    /// was built from. On swap it becomes the new
    /// `IndexState::graph_entry_count`, the coverage accounting behind
    /// [`VectorIndex::is_fully_searchable`].
    entries_in_graph: usize,
}
375
/// Mutable state of a [`VectorIndex`], guarded by its `inner` mutex.
///
/// Coverage is tracked three ways:
/// - entries baked into the active graph (`hnsw`, counted by
///   `graph_entry_count`);
/// - entries appended since that graph was built (`overflow`, scanned
///   linearly);
/// - the complete live set (`all_entries`), which is the source for every
///   rebuild.
struct IndexState {
    /// Active graph serving searches. `None` when no graph exists yet:
    /// `build_hnsw` returns `None` for an empty entry list, and the empty
    /// constructors start with `None`.
    hnsw: Option<HnswMap<EmbeddingPoint, String>>,
    /// Entries added after the last rebuild. Searched linearly.
    overflow: Vec<(String, Vec<f32>)>,
    /// All live entries, oldest first. This is the source for every rebuild
    /// and the list the eviction path trims from the front. Normally it is
    /// the union of what the graph and `overflow` cover; see
    /// `graph_entry_count` for the seeded-entries exception.
    all_entries: Vec<(String, Vec<f32>)>,
    /// v0.7.0 R3-S1 — per-instance eviction cap. Defaults to
    /// [`MAX_ENTRIES`] (the production 100k). Tests construct an
    /// index with a smaller cap via
    /// [`VectorIndex::with_max_entries_for_test`] so the eviction
    /// edge can be exercised without inserting 100k vectors. Storing
    /// the cap per-instance (rather than as a process-wide atomic)
    /// keeps concurrent tests independent.
    max_entries: usize,
    /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter bumped on
    /// every `overflow.clear()` (eviction-edge path). Snapshots
    /// captured before a clear carry the old generation; the swap
    /// path compares against the current generation and drops the
    /// warming result without swapping when they don't match. Closes
    /// the gap where an entry inserted between a snapshot capture
    /// and the eviction-clear was incorrectly drained by the swap.
    overflow_generation: u64,
    /// v0.7.0 #1087 — cached HashSet view of `all_entries` ids used
    /// by [`VectorIndex::search`] as the stale-id filter. Built
    /// lazily on the first search after a mutation; invalidated to
    /// `None` on insert push, eviction drain, and remove retain.
    /// Pre-#1087 this set was rebuilt on EVERY recall.
    ///
    /// PERF-7 (FX-C4-batch2, 2026-05-26): the cache stores
    /// `Arc<str>` per id instead of `String`. UUID strings are 36
    /// bytes; `Arc<str>` is a 16-byte fat pointer with the bytes
    /// heap-allocated once, whereas a `String` is a 24-byte
    /// (ptr/len/cap) struct with the bytes heap-allocated PLUS the
    /// 24 bytes inline. On a 100 000-entry warm-up the change
    /// halves the per-rebuild allocator pressure (16 vs 24+ bytes
    /// per entry plus the no-spare-capacity heap-side). HashSet
    /// lookup against `&str` works through the `Borrow<str>` impl.
    ///
    /// NOTE(review): the `Arc<str>` heap block also stores the strong
    /// and weak reference counts (two `usize`s) ahead of the bytes. The
    /// "halves" figure above therefore depends on how the set is
    /// populated; re-measure before relying on it.
    valid_ids_cache: Option<std::collections::HashSet<std::sync::Arc<str>>>,
    /// #1579 — number of entries baked into the ACTIVE graph (the
    /// snapshot length at its build time; 0 when `hnsw` is `None`).
    /// Together with `overflow.len()` this tells whether a search can
    /// see every live entry: entries seeded into `all_entries` by the
    /// async-boot loader ([`VectorIndex::seed_entries`]) are in
    /// NEITHER the graph NOR `overflow` until the boot rebuild swaps
    /// in, and [`VectorIndex::is_fully_searchable`] reports `false`
    /// for that window so callers (the #519 proactive conflict check)
    /// can route to their non-index fallback instead of silently
    /// searching an index that cannot return the seeded rows.
    graph_entry_count: usize,
}
426
/// A search result from the vector index.
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Memory id stored alongside the matched embedding.
    pub id: String,
    /// Distance between the query and the matched embedding. Presumably
    /// the `cosine_distance` value (lower = closer; `f32::MAX` for a
    /// non-finite row) — NOTE(review): confirm against `search`.
    pub distance: f32,
}
433
434impl VectorIndex {
435    /// Build a new index from a list of (`memory_id`, embedding) pairs.
436    pub fn build(entries: Vec<(String, Vec<f32>)>) -> Self {
437        let hnsw = Self::build_hnsw(&entries);
438        let graph_entry_count = entries.len();
439        VectorIndex {
440            inner: Mutex::new(IndexState {
441                hnsw,
442                overflow: Vec::new(),
443                all_entries: entries,
444                max_entries: MAX_ENTRIES,
445                overflow_generation: 0,
446                valid_ids_cache: None,
447                graph_entry_count,
448            }),
449            eviction_sink: Mutex::new(None),
450            warming: Arc::new(Mutex::new(None)),
451            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
452        }
453    }
454
455    /// Build an empty index.
456    pub fn empty() -> Self {
457        VectorIndex {
458            inner: Mutex::new(IndexState {
459                hnsw: None,
460                overflow: Vec::new(),
461                all_entries: Vec::new(),
462                max_entries: MAX_ENTRIES,
463                overflow_generation: 0,
464                valid_ids_cache: None,
465                graph_entry_count: 0,
466            }),
467            eviction_sink: Mutex::new(None),
468            warming: Arc::new(Mutex::new(None)),
469            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
470        }
471    }
472
473    /// v0.7.0 R3-S1 — Build an empty index with a custom eviction
474    /// cap. Test-only: lets a 5-entry insert sequence exercise the
475    /// eviction edge in milliseconds (vs. the ~minute-scale cost of
476    /// inserting 100k vectors at the production cap). The knob is
477    /// stored per-instance so concurrent tests using the default
478    /// cap are unaffected.
479    #[doc(hidden)]
480    #[must_use]
481    pub fn with_max_entries_for_test(max_entries: usize) -> Self {
482        VectorIndex {
483            inner: Mutex::new(IndexState {
484                hnsw: None,
485                overflow: Vec::new(),
486                all_entries: Vec::new(),
487                max_entries,
488                overflow_generation: 0,
489                valid_ids_cache: None,
490                graph_entry_count: 0,
491            }),
492            eviction_sink: Mutex::new(None),
493            warming: Arc::new(Mutex::new(None)),
494            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
495        }
496    }
497
498    /// v0.7.0 (R3-S1) — wire the eviction sink.
499    ///
500    /// The daemon calls this once at startup with the send-half of an
501    /// mpsc channel; a hook-aware observer task drains the recv-half
502    /// off the hot path and fires the `on_index_eviction` chain
503    /// (`fire_on_index_eviction` in `src/hooks/chain.rs`). Replacing
504    /// an existing sink is allowed — useful when the daemon
505    /// reconfigures the hook chain at runtime — and drops the prior
506    /// sender, which terminates the prior observer cleanly.
507    ///
508    /// Build-time / CLI / test builds that never wire a sink retain
509    /// the `None` default; the eviction path's `try_send` then
510    /// becomes a no-op short-circuit so there is no measurable cost
511    /// to leaving the sink unset.
512    pub fn set_eviction_sink(&self, sink: Sender<EvictionEvent>) {
513        if let Ok(mut guard) = self.eviction_sink.lock() {
514            *guard = Some(sink);
515        }
516    }
517
518    fn build_hnsw(entries: &[(String, Vec<f32>)]) -> Option<HnswMap<EmbeddingPoint, String>> {
519        if entries.is_empty() {
520            return None;
521        }
522        let points: Vec<EmbeddingPoint> = entries
523            .iter()
524            .map(|(_, emb)| EmbeddingPoint(emb.clone()))
525            .collect();
526        let values: Vec<String> = entries.iter().map(|(id, _)| id.clone()).collect();
527        Some(Builder::default().build(points, values))
528    }
529
530    /// Add a new entry to the index (goes to overflow until next rebuild).
531    pub fn insert(&self, id: String, embedding: Vec<f32>) {
532        // #968 — opportunistically swap any warmed graph BEFORE taking the
533        // write path. This lets the auto-rebuild scheduled by a previous
534        // insert land cleanly even if no search call has run between
535        // inserts. Cheap: the warming-mutex contention is microseconds.
536        self.try_swap_warming();
537
538        // #968 — capture the snapshot for a potential auto-rebuild OUTSIDE
539        // the inner lock so the build thread can be spawned without
540        // holding the writers' mutex.
541        let snapshot_for_rebuild: Option<RebuildSnapshot> = {
542            let mut state = match self.inner.lock() {
543                Ok(s) => s,
544                Err(poisoned) => poisoned.into_inner(),
545            };
546            state.all_entries.push((id.clone(), embedding.clone()));
547            state.overflow.push((id, embedding));
548            // v0.7.0 #1087 — invalidate cached valid_ids set; rebuilt
549            // lazily on the next search.
550            state.valid_ids_cache = None;
551
552            // #968 — async auto-rebuild: when overflow crosses the
553            // threshold, snapshot the entries and let the caller (below,
554            // outside the lock) spawn the background build. We do NOT
555            // build the graph synchronously here anymore; that was the
556            // multi-second request-thread block #968 fixes. The
557            // `rebuild_in_flight` CAS prevents the same `insert` call
558            // from racing a previously-scheduled rebuild — only one
559            // background build runs at a time; the next snapshot is
560            // captured after the current build's swap lands.
561            if state.overflow.len() >= REBUILD_THRESHOLD
562                && self
563                    .rebuild_in_flight
564                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
565                    .is_ok()
566            {
567                Some(RebuildSnapshot {
568                    entries: state.all_entries.clone(),
569                    overflow_at_snapshot: state.overflow.len(),
570                    overflow_generation: state.overflow_generation,
571                })
572            } else {
573                None
574            }
575        };
576        if let Some(snap) = snapshot_for_rebuild {
577            // Spawn-and-forget. The handle is consumed inside the thread
578            // via `RebuildGuard` so it doesn't dangle. Callers that want
579            // a handle should use [`Self::rebuild_async`].
580            let _ = self.spawn_rebuild(snap);
581        }
582
583        // Evict oldest entries if over capacity
584        let mut state = match self.inner.lock() {
585            Ok(s) => s,
586            Err(poisoned) => poisoned.into_inner(),
587        };
588        let max_entries = state.max_entries;
589        if state.all_entries.len() > max_entries {
590            let excess = state.all_entries.len() - max_entries;
591            // M8 (v0.7.0 round-2) — emit ONE summary WARN per eviction
592            // event so the operator sees the batch drop in the daemon
593            // log without scrolling past N per-id lines first. The
594            // per-id WARNs (below) still fire for post-mortem
595            // attribution; this one is the high-level "the index
596            // dropped N oldest embeddings" signal operators alert on.
597            tracing::warn!(
598                target: EVICTION_TRACE_TARGET,
599                dropped = excess,
600                max_entries = max_entries,
601                "HNSW eviction: dropped {} oldest embeddings to make room",
602                excess,
603            );
604            // v0.7.0 (R3-S1) — fire the `on_index_eviction` hook event
605            // for each evicted id BEFORE we drop the rows. The sink
606            // is a non-blocking `try_send` (see below); a downstream
607            // hook-aware observer drains the channel off the hot path
608            // and invokes `crate::hooks::fire_on_index_eviction` per
609            // event. This closes the G2/G8 "fire site exists but not
610            // wired" gap that the prior `tracing::warn!`-only
611            // implementation left open.
612            //
613            // The sink push happens INSIDE the inner-state lock — the
614            // channel is unbounded so `try_send`-equivalent `send`
615            // never blocks (unbounded mpsc has no backpressure). The
616            // sink lock is independent of the inner lock so there is
617            // no ordering hazard.
618            //
619            // The hook subscriber (if any) is responsible for its own
620            // logging; the warn-level tracing event is preserved here
621            // as a no-op-when-no-subscriber fallback so operators
622            // without hooks configured still see eviction pressure in
623            // daemon logs, matching the v0.6.3.1 observability contract.
624            let sink_guard = self.eviction_sink.lock().ok();
625            for (evicted_id, _) in state.all_entries.iter().take(excess) {
626                tracing::warn!(
627                    target: EVICTION_TRACE_TARGET,
628                    evicted_id = %evicted_id,
629                    reason = "max_entries_reached",
630                    max_entries = max_entries,
631                    "hnsw index evicting oldest entry: cap reached"
632                );
633                if let Some(sink) = sink_guard.as_ref().and_then(|g| g.as_ref()) {
634                    // mpsc::Sender::send is non-blocking on an unbounded
635                    // channel (it only blocks on bounded). Errors mean the
636                    // receiver dropped — observability is best-effort, no
637                    // recovery action needed.
638                    let payload = EvictionEvent::new(
639                        evicted_id.clone(),
640                        String::new(), // namespace not in scope at hnsw layer
641                        "max_entries_reached",
642                    );
643                    let _ = sink.send(payload);
644                }
645            }
646            drop(sink_guard);
647            #[allow(clippy::cast_possible_truncation)]
648            let evicted = excess as u64;
649            // pm-v3.1 PR8 (issue #1174): counter sink moved to the
650            // metrics registry. We defer the actual `record_hnsw_eviction`
651            // call until `now_nanos_u64` is computed below so the
652            // counter and last-eviction timestamp move in lockstep.
653            let evicted_count_to_record = evicted;
654
655            state.all_entries.drain(..excess);
656            // v0.7.0 #1087 — invalidate cached valid_ids set after the
657            // eviction drain.
658            state.valid_ids_cache = None;
659            // #968 — defer the post-eviction graph rebuild to the async
660            // path. Correctness is preserved by the `valid_ids` filter
661            // in `search()` — evicted IDs are scrubbed from results
662            // immediately, even though the underlying HNSW graph still
663            // contains them until the next swap. Clearing `overflow`
664            // here was the v0.6 behavior tied to the synchronous
665            // rebuild; we preserve it so the linear-scan path doesn't
666            // re-surface evicted IDs. The next `insert()` past
667            // `REBUILD_THRESHOLD` (or an explicit `rebuild_async()`
668            // call) schedules the actual graph rebuild off-thread.
669            state.overflow.clear();
670            // v0.7.0 #1074 (SR-2 #2, HIGH) — bump the generation
671            // counter on every overflow.clear(). Any in-flight rebuild
672            // snapshot captured BEFORE this bump now carries a stale
673            // generation; the swap path detects the mismatch and
674            // drops the warming result without draining overflow,
675            // preventing the lose-an-insert race where an entry
676            // landed between snapshot and clear() was incorrectly
677            // drained by the eventual swap.
678            state.overflow_generation = state.overflow_generation.wrapping_add(1);
679            // Schedule the rebuild via the async path so the eviction
680            // edge no longer blocks the writer for the multi-second
681            // `build_hnsw` cost at 100k cap. The CAS skips if a
682            // previously-scheduled rebuild is still in flight; the
683            // next insert past threshold picks up the post-eviction
684            // state.
685            if self
686                .rebuild_in_flight
687                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
688                .is_ok()
689            {
690                let snap = RebuildSnapshot {
691                    entries: state.all_entries.clone(),
692                    // overflow was just cleared above so the snapshot
693                    // captures an empty overflow window — anything
694                    // inserted post-eviction will be a fresh suffix.
695                    overflow_at_snapshot: state.overflow.len(),
696                    overflow_generation: state.overflow_generation,
697                };
698                // Release the inner lock before spawning so the
699                // background thread can take it on swap. The
700                // observability path below only reads counters /
701                // statics, not `state`, so we do not need to
702                // re-acquire.
703                drop(state);
704                let _ = self.spawn_rebuild(snap);
705            }
706
707            // Record completion time AFTER the rebuild. `evicted_recently` is
708            // a "did we evict in the trailing N seconds" check; an operator
709            // asking that wants the operation completion time, not the
710            // start. At v0.6 the in-line `build_hnsw` dominated wall time
711            // here (~minutes at 100k entries) — using the start would
712            // make evicted_recently misreport even immediately after
713            // insert returns. Post-#968 the build runs off-thread so
714            // the gap shrinks to microseconds, but the
715            // completion-time-after-rebuild semantics are preserved.
716            let now_nanos = std::time::SystemTime::now()
717                .duration_since(std::time::UNIX_EPOCH)
718                .map(|d| d.as_nanos())
719                .unwrap_or(0);
720            let now_nanos_u64 = u64::try_from(now_nanos).unwrap_or(u64::MAX);
721            // pm-v3.1 PR8 (issue #1174): single sink call covers both
722            // the cumulative counter and the last-eviction timestamp,
723            // mirroring both onto the Prometheus handles so `/metrics`
724            // scrapes see the eviction event without polling
725            // `memory_stats`.
726            crate::metrics::record_hnsw_eviction(evicted_count_to_record, now_nanos_u64);
727
728            // M8 (v0.7.0 round-2) — rolling-hour rate observer. Push
729            // a stamp on this eviction, then count stamps in the
730            // trailing hour. If the rate clears the M8 ceiling,
731            // escalate to ERROR so the dashboard pages the on-call.
732            let recent = record_eviction_and_count_recent(now_nanos_u64);
733            if recent > EVICTION_RATE_CEILING_PER_HOUR {
734                tracing::error!(
735                    target: EVICTION_TRACE_TARGET,
736                    rate_per_hour = recent,
737                    ceiling = EVICTION_RATE_CEILING_PER_HOUR,
738                    "HNSW eviction rate exceeded {}/hour — recall quality is degrading; \
739                     increase vector_index_capacity or move to dedicated vector DB",
740                    EVICTION_RATE_CEILING_PER_HOUR,
741                );
742            }
743        }
744    }
745
746    /// Remove an entry by ID (marks for exclusion; cleaned up on rebuild).
747    pub fn remove(&self, id: &str) {
748        let mut state = match self.inner.lock() {
749            Ok(s) => s,
750            Err(poisoned) => poisoned.into_inner(),
751        };
752        state.all_entries.retain(|(eid, _)| eid != id);
753        state.overflow.retain(|(eid, _)| eid != id);
754        // v0.7.0 #1087 — invalidate cached valid_ids set after remove.
755        state.valid_ids_cache = None;
756        // Note: the HNSW index itself is immutable — removed IDs are filtered
757        // from search results. A rebuild will fully remove them.
758    }
759
760    /// Search for the `k` nearest neighbors to the query embedding.
761    ///
762    /// Combines HNSW approximate search with linear scan of overflow entries.
763    /// Returns results sorted by ascending distance (closest first).
764    pub fn search(&self, query: &[f32], k: usize) -> Vec<VectorHit> {
765        // #968 — opportunistic swap-on-read. If a background rebuild has
766        // parked a warmed graph in the `warming` slot, swap it into
767        // `active` BEFORE we serve this search. The swap is a single
768        // `std::mem::swap` under the inner mutex held for microseconds;
769        // search itself never blocks on graph construction.
770        self.try_swap_warming();
771
772        let mut state = match self.inner.lock() {
773            Ok(s) => s,
774            Err(poisoned) => poisoned.into_inner(),
775        };
776        let query_point = EmbeddingPoint(query.to_vec());
777
778        let mut results: Vec<VectorHit> = Vec::with_capacity(k * 2);
779
780        // v0.7.0 #1087 — populate the cached valid_ids set on the
781        // first search after any mutation; reuse it across recalls.
782        // Pre-#1087 this set was rebuilt on EVERY recall (iterating
783        // up to 100k strings + a fresh HashSet allocation per call).
784        if state.valid_ids_cache.is_none() {
785            // PERF-7 — collect to HashSet<Arc<str>> instead of
786            // HashSet<String>. Arc<str> is a 16-byte fat pointer
787            // vs String's 24-byte (ptr/len/cap) struct; the
788            // backing bytes are shared with no spare capacity
789            // overhead. HashSet::contains accepts `&str` via the
790            // `Borrow<str>` impl on `Arc<str>`.
791            let set: std::collections::HashSet<std::sync::Arc<str>> = state
792                .all_entries
793                .iter()
794                .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
795                .collect();
796            state.valid_ids_cache = Some(set);
797        }
798        let valid_ids = state
799            .valid_ids_cache
800            .as_ref()
801            .expect("valid_ids_cache populated above");
802
803        // Search the HNSW index
804        if let Some(ref hnsw) = state.hnsw {
805            let mut search = Search::default();
806            for item in hnsw.search(&query_point, &mut search) {
807                if !valid_ids.contains(item.value.as_str()) {
808                    continue; // Removed entry
809                }
810                results.push(VectorHit {
811                    id: item.value.clone(),
812                    distance: item.distance,
813                });
814                if results.len() >= k * 2 {
815                    break;
816                }
817            }
818        }
819
820        // v0.7.0 #1087 — linear scan of overflow entries WITHOUT
821        // cloning the embedding vec. Pre-#1087 this constructed
822        // `EmbeddingPoint(emb.clone())` per overflow entry (~200 ×
823        // 1536 bytes = 300 KB of clone per search at the cap); the
824        // cosine-distance helper takes `&[f32]` so we inline against
825        // the stored slice instead.
826        let mut overflow_hits: Vec<VectorHit> = Vec::with_capacity(state.overflow.len());
827        for (id, emb) in &state.overflow {
828            overflow_hits.push(VectorHit {
829                id: id.clone(),
830                distance: cosine_distance(&query_point.0, emb),
831            });
832        }
833        // #1684 — total_cmp is total over f32 (no panic even if a NaN slips
834        // past the cosine_distance floor); deterministic ordering.
835        overflow_hits.sort_by(|a, b| a.distance.total_cmp(&b.distance));
836
837        results.extend(overflow_hits);
838
839        // Deduplicate by ID (prefer lower distance)
840        let mut seen = std::collections::HashSet::new();
841        results.retain(|hit| seen.insert(hit.id.clone()));
842
843        // Sort by distance and truncate (#1684 — total_cmp, panic-free)
844        results.sort_by(|a, b| a.distance.total_cmp(&b.distance));
845        results.truncate(k);
846        results
847    }
848
849    /// Return the total number of indexed entries (HNSW + overflow).
850    pub fn len(&self) -> usize {
851        let state = match self.inner.lock() {
852            Ok(s) => s,
853            Err(poisoned) => poisoned.into_inner(),
854        };
855        state.all_entries.len()
856    }
857
858    /// `true` when the index holds no live entries at all.
859    ///
860    /// #1579 QC — load-bearing for the proactive-conflict dispatch:
861    /// an EMPTY index is *vacuously* [`Self::is_fully_searchable`]
862    /// (`0 + 0 >= 0`), but during the async-boot LOAD phase — after
863    /// the daemon binds with `VectorIndex::empty()` and before the
864    /// boot loader's `seed_entries` lands (the `get_all_embeddings`
865    /// read is the long pole at 100k rows) — emptiness says nothing
866    /// about what the DB holds. Callers that would otherwise trust a
867    /// fully-searchable index (the #519 conflict check) must ALSO
868    /// require non-emptiness, so that window routes to the bounded
869    /// recency-scan fallback instead of consulting an index that
870    /// cannot return anything.
871    pub fn is_empty(&self) -> bool {
872        self.len() == 0
873    }
874
875    /// #968 — Force a full rebuild of the HNSW index from all entries,
876    /// SYNCHRONOUSLY. Preserved for tests + emergency paths; production
877    /// code should call [`Self::rebuild_async`] so the multi-second
878    /// graph build does not block the calling thread.
879    ///
880    /// Implementation: delegates to `rebuild_async` and `join`s the
881    /// resulting handle so callers retain the v0.6 semantics ("the
882    /// graph is rebuilt by the time this returns"). Tests rely on this
883    /// blocking behavior to assert post-rebuild invariants without
884    /// adding a yield/poll loop.
885    pub fn rebuild(&self) {
886        // #1037 (MEDIUM, 2026-05-21): defend the sync-rebuild contract
887        // against the `rebuild_async()` no-op-handle short-circuit.
888        // Pre-#1037 if a previous async rebuild was still in flight
889        // (`rebuild_in_flight==true`), `rebuild_async` returned a
890        // no-op `std::thread::spawn(|| {})` handle that joined
891        // instantly — `try_swap_warming()` then ran against a warming
892        // slot that the IN-FLIGHT build hadn't populated yet, so the
893        // sync contract ("graph is rebuilt by the time this returns")
894        // was silently violated. The caller observed the pre-rebuild
895        // state.
896        //
897        // Fix: after the initial `join()`, spin-wait on
898        // `rebuild_in_flight` for up to REBUILD_WAIT_TIMEOUT so the
899        // in-flight build has a bounded window to complete its
900        // warming-slot insert. Then run `try_swap_warming()`. If the
901        // in-flight build genuinely hangs (test-fixture corner case),
902        // surface that as a clean timeout rather than silently
903        // returning a stale graph.
904        let handle = self.rebuild_async();
905        let _ = handle.join();
906        // Bounded spin-wait for any concurrently-running rebuild to
907        // populate `warming`. Cheap CAS read; total budget is
908        // REBUILD_WAIT_TIMEOUT * REBUILD_WAIT_POLL_INTERVAL =
909        // ~10ms × 100 = 1 second worst-case (well under any sensible
910        // sync-rebuild expectation; production callers are async).
911        let start = std::time::Instant::now();
912        while self.rebuild_in_flight.load(Ordering::SeqCst)
913            && start.elapsed() < REBUILD_WAIT_TIMEOUT
914        {
915            std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
916        }
917        self.try_swap_warming();
918    }
919
920    /// #968 — Schedule a full HNSW rebuild on a background thread and
921    /// return the [`JoinHandle`] for callers that want to observe
922    /// completion. The build does NOT hold the inner mutex; readers
923    /// and writers continue to operate against `active` + `overflow`
924    /// while the new graph warms up. On success, the warmed graph
925    /// lands in the `warming` slot and is swapped into `active` by
926    /// the next reader/writer (or by the foreground `rebuild` shim's
927    /// post-join `try_swap_warming` call).
928    ///
929    /// Concurrency contract:
930    /// - At most one rebuild runs at a time (gated by the
931    ///   `rebuild_in_flight` atomic). A second `rebuild_async` call
932    ///   while a build is in flight returns a no-op handle (the
933    ///   spawned closure short-circuits if the CAS fails — the in-
934    ///   flight build will pick up the latest entries via the next
935    ///   trigger).
936    /// - Writes during the build flow into `overflow` and
937    ///   `all_entries` normally. The swap path uses the snapshot
938    ///   length captured at spawn time to trim only the overflow
939    ///   entries that are now in the new graph; entries inserted
940    ///   AFTER the snapshot remain in overflow for the next cycle.
941    /// - Search is unaffected: it reads `active` + `overflow` under
942    ///   the inner mutex, both of which remain coherent throughout.
943    ///
944    /// Failure: a panic inside the build thread is observable via
945    /// `JoinHandle::join()`; `active` is unchanged. The
946    /// `rebuild_in_flight` flag is cleared by the `RebuildGuard`
947    /// drop-guard whether the build succeeded or panicked.
948    pub fn rebuild_async(&self) -> JoinHandle<()> {
949        // Snapshot under the inner lock so we capture a consistent
950        // entries list. Read-only; we do not mutate `all_entries`
951        // here. If a rebuild is already in flight, return a no-op
952        // handle (a thread that joins instantly).
953        let snapshot = {
954            let state = match self.inner.lock() {
955                Ok(s) => s,
956                Err(poisoned) => poisoned.into_inner(),
957            };
958            if self
959                .rebuild_in_flight
960                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
961                .is_err()
962            {
963                // Already running — return an instantly-completing
964                // handle. The caller's `join()` returns `Ok(())`.
965                return std::thread::spawn(|| {});
966            }
967            RebuildSnapshot {
968                entries: state.all_entries.clone(),
969                overflow_at_snapshot: state.overflow.len(),
970                overflow_generation: state.overflow_generation,
971            }
972        };
973        self.spawn_rebuild(snapshot)
974    }
975
976    /// #968 — internal: spawn the rebuild thread for a captured
977    /// snapshot. The caller is expected to have flipped
978    /// `rebuild_in_flight` to `true` already (via CAS). The drop-guard
979    /// inside the thread clears the flag whether the build succeeds
980    /// or panics.
981    fn spawn_rebuild(&self, snapshot: RebuildSnapshot) -> JoinHandle<()> {
982        let warming = Arc::clone(&self.warming);
983        let in_flight = Arc::clone(&self.rebuild_in_flight);
984        std::thread::spawn(move || {
985            // RAII clears `rebuild_in_flight` even on panic.
986            let _guard = RebuildGuard { flag: in_flight };
987            // CPU-bound graph build runs OUTSIDE the inner mutex.
988            // This is the load-bearing change for #968: readers and
989            // writers continue to make progress while this runs.
990            let hnsw = VectorIndex::build_hnsw(&snapshot.entries);
991            let result = RebuildResult {
992                hnsw,
993                overflow_at_snapshot: snapshot.overflow_at_snapshot,
994                overflow_generation: snapshot.overflow_generation,
995                entries_in_graph: snapshot.entries.len(),
996            };
997            // Park the result in the warming slot. The next caller
998            // through `try_swap_warming` will move it into `active`.
999            // Holding the warming mutex here is microseconds.
1000            if let Ok(mut slot) = warming.lock() {
1001                // Overwrite any older warmed result that was never
1002                // swapped (e.g. two rebuilds completed before any
1003                // reader ran). The newer build is by definition a
1004                // superset of the older one's entries, so dropping
1005                // the older result is correct.
1006                *slot = Some(result);
1007            }
1008        })
1009    }
1010
1011    /// #968 — Swap the warming slot into active if a warmed graph is
1012    /// ready. Called opportunistically from `search`, `insert`, and
1013    /// the post-join path of the sync `rebuild` shim. The swap holds
1014    /// the inner mutex for microseconds — just long enough to
1015    /// `std::mem::replace` the graph and trim the overflow.
1016    ///
1017    /// Returns `true` if a swap occurred, `false` otherwise. Test
1018    /// code uses the return value to verify the swap landed before
1019    /// asserting post-rebuild state.
1020    pub fn try_swap_warming(&self) -> bool {
1021        // Pop the warmed result FIRST so we hold the warming mutex
1022        // only long enough to take ownership. We then re-acquire the
1023        // inner mutex to swap it in. The two-mutex sequence is safe
1024        // (no ordering hazard with any other path that takes both).
1025        let Some(result) = self.warming.lock().ok().and_then(|mut g| g.take()) else {
1026            return false;
1027        };
1028        let mut state = match self.inner.lock() {
1029            Ok(s) => s,
1030            Err(poisoned) => poisoned.into_inner(),
1031        };
1032        // v0.7.0 #1074 (SR-2 #2, HIGH) — generation check. If the
1033        // overflow generation has bumped since the rebuild captured
1034        // its snapshot, the warming graph was built from pre-eviction
1035        // all_entries and the current overflow contains post-eviction
1036        // inserts that are NOT in that graph. Drop the warming result
1037        // entirely without swapping (the next rebuild captures the
1038        // current state cleanly). Pre-#1074 the swap would have
1039        // overwritten the live graph with a stale one AND drained
1040        // the post-eviction inserts from overflow — silently losing
1041        // them until the next rebuild.
1042        if result.overflow_generation != state.overflow_generation {
1043            tracing::warn!(
1044                target: "hnsw.rebuild",
1045                snapshot_gen = result.overflow_generation,
1046                current_gen = state.overflow_generation,
1047                "dropping stale warming result (eviction occurred mid-rebuild, #1074)"
1048            );
1049            return false;
1050        }
1051        state.hnsw = result.hnsw;
1052        // #1579 — coverage accounting for `is_fully_searchable`.
1053        state.graph_entry_count = result.entries_in_graph;
1054        // Trim overflow: the first `overflow_at_snapshot` entries are
1055        // now in the graph; entries inserted AFTER the snapshot
1056        // remain. Defensive `min` in case `remove` or eviction
1057        // shortened overflow while the build was running.
1058        let to_drain = result.overflow_at_snapshot.min(state.overflow.len());
1059        state.overflow.drain(..to_drain);
1060        true
1061    }
1062
1063    /// #1579 — `true` when a search against this index can observe
1064    /// every live entry: the active graph (its build-time snapshot
1065    /// length) plus the linearly-scanned `overflow` cover
1066    /// `all_entries`. `false` exactly during the async-boot warm
1067    /// window, when [`Self::seed_entries`] has parked DB-loaded
1068    /// entries in `all_entries` but the background graph build has
1069    /// not swapped in yet — sequenced writes flow through `insert()`
1070    /// (graph- or overflow-visible) so they never break coverage, and
1071    /// removals/evictions only shrink `all_entries` (stale graph ids
1072    /// are filtered at search time), so the inequality is conservative
1073    /// in the safe direction.
1074    ///
1075    /// Consumers: the #519 proactive conflict check routes to its
1076    /// bounded-scan fallback while this is `false`; the boot loader
1077    /// uses it to decide whether a make-up rebuild is needed after a
1078    /// racing routine rebuild swallowed its CAS.
1079    pub fn is_fully_searchable(&self) -> bool {
1080        // Opportunistically land any warmed graph first so a caller
1081        // probing right after a rebuild finished sees the swapped
1082        // state (mirrors `search`/`insert`).
1083        self.try_swap_warming();
1084        let state = match self.inner.lock() {
1085            Ok(s) => s,
1086            Err(poisoned) => poisoned.into_inner(),
1087        };
1088        state.graph_entry_count + state.overflow.len() >= state.all_entries.len()
1089    }
1090
1091    /// #1579 B3 — bulk-load DB-resident entries into the index
1092    /// WITHOUT building the graph (the async-boot path). Entries land
1093    /// in `all_entries` only; they become searchable when the
1094    /// follow-up rebuild (see [`Self::seed_and_rebuild_async`]) swaps
1095    /// its graph in. Ids already present (e.g. a row written through
1096    /// `insert()` between the caller's DB snapshot and this call) are
1097    /// skipped so the index never double-counts. Returns the number of
1098    /// entries actually seeded.
1099    ///
1100    /// Deliberately does NOT enforce `max_entries` eviction here —
1101    /// the legacy synchronous boot path (`VectorIndex::build` over
1102    /// `get_all_embeddings`) never evicted at boot either, and the
1103    /// first post-boot `insert()` applies the cap exactly as before.
1104    pub fn seed_entries(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1105        let mut state = match self.inner.lock() {
1106            Ok(s) => s,
1107            Err(poisoned) => poisoned.into_inner(),
1108        };
1109        let existing: std::collections::HashSet<std::sync::Arc<str>> = state
1110            .all_entries
1111            .iter()
1112            .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
1113            .collect();
1114        let mut seeded = 0usize;
1115        for (id, emb) in entries {
1116            if existing.contains(id.as_str()) {
1117                continue;
1118            }
1119            state.all_entries.push((id, emb));
1120            seeded += 1;
1121        }
1122        if seeded > 0 {
1123            state.valid_ids_cache = None;
1124        }
1125        seeded
1126    }
1127
1128    /// #1579 B3 — async-boot warm-up: seed DB-loaded entries (see
1129    /// [`Self::seed_entries`]) and schedule the graph build on the
1130    /// existing #968 double-buffer rebuild machinery. Returns the
1131    /// rebuild thread's [`JoinHandle`]; the caller (the boot loader)
1132    /// joins it off the request path and then calls
1133    /// [`Self::try_swap_warming`] + emits the operator-visible
1134    /// "index warm" line.
1135    ///
1136    /// If a routine rebuild is already in flight (its snapshot
1137    /// predates the seed), `rebuild_async` returns a no-op handle and
1138    /// the seeded entries stay graph-invisible until a later rebuild;
1139    /// the boot loader detects that via [`Self::is_fully_searchable`]
1140    /// and issues a make-up `rebuild`.
1141    pub fn seed_and_rebuild_async(&self, entries: Vec<(String, Vec<f32>)>) -> JoinHandle<()> {
1142        self.seed_entries(entries);
1143        self.rebuild_async()
1144    }
1145
1146    /// #1579 B3 — blocking boot warm-up for callers that hold the
1147    /// index directly (the MCP stdio boot thread; tests). Seeds the
1148    /// DB-loaded entries and drives rebuild→swap to completion,
1149    /// returning the number of entries seeded. Each step takes the
1150    /// inner mutex only briefly (the graph build itself runs on the
1151    /// #968 background thread against a snapshot), so concurrent
1152    /// readers/writers on other threads keep making progress — the
1153    /// CALLING thread is the only one parked.
1154    ///
1155    /// The retry loop covers the rebuild-CAS race: if a routine
1156    /// 200-overflow rebuild was already in flight when our seed
1157    /// landed, `rebuild_async` short-circuits to a no-op handle and
1158    /// the in-flight build's pre-seed snapshot cannot cover the
1159    /// seeded rows — `is_fully_searchable` stays `false` and the loop
1160    /// schedules a make-up rebuild once the CAS frees up.
1161    pub fn warm_boot(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1162        let seeded = self.seed_entries(entries);
1163        loop {
1164            let handle = self.rebuild_async();
1165            let _ = handle.join();
1166            // `is_fully_searchable` opportunistically swaps any warmed
1167            // graph before evaluating coverage.
1168            if self.is_fully_searchable() {
1169                return seeded;
1170            }
1171            std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
1172        }
1173    }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178    use super::*;
1179
1180    fn make_embedding(values: &[f32]) -> Vec<f32> {
1181        // L2-normalize
1182        let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1183        values.iter().map(|v| v / norm).collect()
1184    }
1185
1186    #[test]
1187    fn empty_index_returns_empty() {
1188        let idx = VectorIndex::empty();
1189        let results = idx.search(&[1.0, 0.0, 0.0], 10);
1190        assert!(results.is_empty());
1191    }
1192
1193    #[test]
1194    fn issue_1684_nan_vector_does_not_panic_and_ranks_last() {
1195        // Pre-#1684: a NaN distance made `partial_cmp` return `None` and
1196        // `.unwrap()` panicked inside `sort_by`, aborting the single-threaded
1197        // MCP process. Post-fix the poison vector ranks last and search returns.
1198        let idx = VectorIndex::empty();
1199        idx.insert("good".into(), make_embedding(&[1.0, 0.0, 0.0]));
1200        idx.insert("poison".into(), vec![f32::NAN, 0.0, 0.0]);
1201        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
1202        let good = hits.iter().position(|h| h.id == "good");
1203        assert!(good.is_some(), "good vector must be returned");
1204        if let Some(poison) = hits.iter().position(|h| h.id == "poison") {
1205            assert!(
1206                good.unwrap() < poison,
1207                "non-finite-distance poison vector must rank after the good vector"
1208            );
1209        }
1210    }
1211
1212    #[test]
1213    fn issue_1684_cosine_distance_non_finite_collapses_to_max() {
1214        assert_eq!(cosine_distance(&[f32::NAN, 1.0], &[1.0, 1.0]), f32::MAX);
1215        assert_eq!(
1216            cosine_distance(&[f32::INFINITY, 0.0], &[1.0, 0.0]),
1217            f32::MAX
1218        );
1219        assert!(cosine_distance(&[1.0, 0.0], &[1.0, 0.0]).is_finite());
1220    }
1221
1222    #[test]
1223    fn perf_7_valid_ids_cache_is_arc_str_typed() {
1224        // PERF-7 (FX-C4-batch2, 2026-05-26) — the valid_ids cache
1225        // must be HashSet<Arc<str>>, not HashSet<String>. The
1226        // discriminator is the cache field's type, which we exercise
1227        // by populating the index, searching once to materialise the
1228        // cache, then reaching into the locked state to verify the
1229        // cache element shape via downcast on a sample entry.
1230        let entries = vec![
1231            ("id1".to_string(), make_embedding(&[1.0, 0.0, 0.0])),
1232            ("id2".to_string(), make_embedding(&[0.0, 1.0, 0.0])),
1233        ];
1234        let idx = VectorIndex::build(entries);
1235        // Trigger one search so the cache populates.
1236        let _ = idx.search(&[1.0, 0.0, 0.0], 2);
1237        let state = idx.inner.lock().expect("lock state");
1238        let cache = state
1239            .valid_ids_cache
1240            .as_ref()
1241            .expect("valid_ids_cache populated after search");
1242        assert!(
1243            cache.contains("id1") && cache.contains("id2"),
1244            "PERF-7: Arc<str> cache failed to admit &str lookup",
1245        );
1246        // Type system pins the field type — if a future refactor
1247        // changes the type back to HashSet<String>, this `Arc::clone`
1248        // line fails to compile.
1249        let sample: Option<&std::sync::Arc<str>> = cache.iter().next();
1250        assert!(sample.is_some(), "PERF-7: cache must hold Arc<str> entries");
1251    }
1252
1253    #[test]
1254    fn basic_search() {
1255        let entries = vec![
1256            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1257            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
1258            ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
1259        ];
1260        let idx = VectorIndex::build(entries);
1261        let results = idx.search(&make_embedding(&[1.0, 0.1, 0.0]), 2);
1262        assert_eq!(results.len(), 2);
1263        assert_eq!(results[0].id, "a"); // Closest to [1, 0.1, 0]
1264    }
1265
1266    #[test]
1267    fn insert_and_search_overflow() {
1268        let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1269        let idx = VectorIndex::build(entries);
1270        idx.insert("b".into(), make_embedding(&[0.9, 0.1, 0.0]));
1271        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
1272        assert_eq!(results.len(), 2);
1273        assert_eq!(results[0].id, "a");
1274        assert_eq!(results[1].id, "b");
1275    }
1276
1277    #[test]
1278    fn remove_excludes_from_results() {
1279        let entries = vec![
1280            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1281            ("b".into(), make_embedding(&[0.9, 0.1, 0.0])),
1282        ];
1283        let idx = VectorIndex::build(entries);
1284        idx.remove("a");
1285        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
1286        assert!(results.iter().all(|h| h.id != "a"));
1287    }
1288
1289    // -----------------------------------------------------------------
1290    // W11/S11b — rebuild + batched-insert hardening
1291    // -----------------------------------------------------------------
1292
1293    #[test]
1294    fn test_rebuild_preserves_all_entries() {
1295        // Build a small but non-trivial set of orthonormal-ish vectors,
1296        // rebuild the index, and confirm every id is still findable via
1297        // search with a top-k that covers them all.
1298        let raw: Vec<(String, Vec<f32>)> = (0..12)
1299            .map(|i| {
1300                let mut v = vec![0.0_f32; 16];
1301                #[allow(clippy::cast_precision_loss)]
1302                let f = i as f32;
1303                v[i % 16] = 1.0 + f * 0.01; // bias to make L2 norm non-trivial
1304                (format!("id-{i}"), make_embedding(&v))
1305            })
1306            .collect();
1307
1308        let idx = VectorIndex::build(raw.clone());
1309        idx.rebuild();
1310        assert_eq!(idx.len(), raw.len());
1311
1312        // Every id should appear when we ask for top-N where N >= count.
1313        let query = make_embedding(&[1.0; 16]);
1314        let hits = idx.search(&query, raw.len() * 2);
1315        let found: std::collections::HashSet<String> = hits.into_iter().map(|h| h.id).collect();
1316        for (id, _) in &raw {
1317            assert!(
1318                found.contains(id),
1319                "rebuild must preserve id {id}, found: {:?}",
1320                found
1321            );
1322        }
1323    }
1324
1325    #[test]
1326    fn test_remove_then_search_excludes_id() {
1327        let entries = vec![
1328            ("alpha".into(), make_embedding(&[1.0, 0.0, 0.0, 0.0])),
1329            ("beta".into(), make_embedding(&[0.9, 0.1, 0.0, 0.0])),
1330            ("gamma".into(), make_embedding(&[0.8, 0.2, 0.0, 0.0])),
1331        ];
1332        let idx = VectorIndex::build(entries);
1333        // Pre-remove: alpha should be the closest to (1,0,0,0).
1334        let pre = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1335        assert!(pre.iter().any(|h| h.id == "alpha"));
1336
1337        idx.remove("alpha");
1338        // Post-remove: alpha must not appear regardless of k.
1339        for k in 1..=10 {
1340            let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), k);
1341            assert!(
1342                hits.iter().all(|h| h.id != "alpha"),
1343                "removed id `alpha` resurfaced with k={k}: {:?}",
1344                hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1345            );
1346        }
1347
1348        // Other entries still findable.
1349        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1350        let ids: Vec<&str> = hits.iter().map(|h| h.id.as_str()).collect();
1351        assert!(ids.contains(&"beta"));
1352        assert!(ids.contains(&"gamma"));
1353    }
1354
1355    // -----------------------------------------------------------------
1356    // W12-H — small edge cases
1357    // -----------------------------------------------------------------
1358
1359    #[test]
1360    fn empty_index_len_is_zero() {
1361        let idx = VectorIndex::empty();
1362        assert_eq!(idx.len(), 0);
1363    }
1364
1365    #[test]
1366    fn build_with_empty_entries_search_empty() {
1367        let idx = VectorIndex::build(Vec::new());
1368        assert_eq!(idx.len(), 0);
1369        let results = idx.search(&[1.0, 0.0, 0.0], 5);
1370        assert!(results.is_empty());
1371    }
1372
1373    #[test]
1374    fn search_with_k_zero_returns_empty() {
1375        let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1376        let idx = VectorIndex::build(entries);
1377        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 0);
1378        assert!(results.is_empty());
1379    }
1380
1381    #[test]
1382    fn rebuild_on_empty_does_not_crash() {
1383        let idx = VectorIndex::empty();
1384        idx.rebuild();
1385        assert_eq!(idx.len(), 0);
1386    }
1387
1388    #[test]
1389    fn insert_increases_len() {
1390        let idx = VectorIndex::empty();
1391        idx.insert("a".into(), make_embedding(&[1.0, 0.0, 0.0]));
1392        idx.insert("b".into(), make_embedding(&[0.0, 1.0, 0.0]));
1393        assert_eq!(idx.len(), 2);
1394    }
1395
1396    #[test]
1397    fn embedding_point_distance_orthogonal() {
1398        let a = EmbeddingPoint(vec![1.0, 0.0, 0.0]);
1399        let b = EmbeddingPoint(vec![0.0, 1.0, 0.0]);
1400        // 1 - dot = 1 - 0 = 1
1401        assert!((a.distance(&b) - 1.0).abs() < 1e-6);
1402    }
1403
1404    #[test]
1405    fn embedding_point_distance_identical_is_zero() {
1406        let a = EmbeddingPoint(make_embedding(&[1.0, 1.0, 1.0]));
1407        // 1 - 1 = 0 (L2-normalised)
1408        assert!(a.distance(&a).abs() < 1e-6);
1409    }
1410
1411    #[test]
1412    fn remove_on_empty_index_is_noop() {
1413        let idx = VectorIndex::empty();
1414        idx.remove("nonexistent");
1415        assert_eq!(idx.len(), 0);
1416    }
1417
1418    #[test]
1419    fn insert_triggers_auto_rebuild_at_threshold() {
1420        // REBUILD_THRESHOLD = 200. Inserting that many into a fresh index
1421        // exercises the auto-rebuild branch in `insert`.
1422        let idx = VectorIndex::empty();
1423        for i in 0..205_usize {
1424            let mut v = vec![0.0_f32; 8];
1425            #[allow(clippy::cast_precision_loss)]
1426            let f = i as f32;
1427            v[i % 8] = 1.0 + f * 0.001;
1428            idx.insert(format!("id-{i}"), make_embedding(&v));
1429        }
1430        assert_eq!(idx.len(), 205);
1431        // After auto-rebuild, search still works — top-k returns hits.
1432        let q = make_embedding(&[1.0_f32; 8]);
1433        let hits = idx.search(&q, 5);
1434        assert_eq!(hits.len(), 5);
1435    }
1436
1437    #[test]
1438    fn test_rebuild_after_batch_insert_settles() {
1439        // Start empty, batch-insert N entries, force a rebuild, then assert
1440        // that top-K search returns exactly K results (deterministic count
1441        // for a fully-populated index with K <= len).
1442        let idx = VectorIndex::empty();
1443        let n = 25_usize;
1444        for i in 0..n {
1445            let mut v = vec![0.0_f32; 8];
1446            #[allow(clippy::cast_precision_loss)]
1447            let f = i as f32;
1448            v[i % 8] = 1.0 + f * 0.001;
1449            idx.insert(format!("id-{i}"), make_embedding(&v));
1450        }
1451        // Force a rebuild — overflow may not have hit REBUILD_THRESHOLD.
1452        idx.rebuild();
1453        assert_eq!(idx.len(), n);
1454
1455        let query = make_embedding(&[1.0; 8]);
1456        let k = 5;
1457        let hits = idx.search(&query, k);
1458        assert_eq!(
1459            hits.len(),
1460            k,
1461            "post-rebuild search top-{k} must return exactly {k} hits, got {:?}",
1462            hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1463        );
1464
1465        // Distances should be sorted ascending (closest first).
1466        for w in hits.windows(2) {
1467            assert!(
1468                w[0].distance <= w[1].distance,
1469                "search results must be ascending by distance: {} > {}",
1470                w[0].distance,
1471                w[1].distance
1472            );
1473        }
1474
1475        // No duplicate ids in the result.
1476        let mut seen = std::collections::HashSet::new();
1477        for h in &hits {
1478            assert!(
1479                seen.insert(h.id.clone()),
1480                "duplicate id in search: {}",
1481                h.id
1482            );
1483        }
1484    }
1485
1486    // -----------------------------------------------------------------
1487    // v0.7.0 R3-S1 — eviction sink wires the on_index_eviction hook
1488    // -----------------------------------------------------------------
1489
1490    /// `test_hnsw_eviction_fires_hook` — when a sink is wired via
1491    /// [`VectorIndex::set_eviction_sink`] and the index inserts past
1492    /// its eviction cap, the eviction-edge code path pushes one
1493    /// [`EvictionEvent`] per evicted id onto the channel. This closes
1494    /// the G2/G8 "fire site exists but not wired" gap. We construct
1495    /// the index via [`VectorIndex::with_max_entries_for_test`] so a
1496    /// 6-entry insert sequence trips the eviction path in
1497    /// milliseconds without touching the production 100k cap.
1498    #[test]
1499    fn test_hnsw_eviction_fires_hook() {
1500        let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
1501        let idx = VectorIndex::with_max_entries_for_test(4);
1502        idx.set_eviction_sink(tx);
1503
1504        // Reset the process-local counters so concurrent tests
1505        // sharing the static don't bleed assertions into ours.
1506        reset_eviction_counters_for_test();
1507
1508        // Insert cap+2 entries — eviction drops the 2 oldest.
1509        let n = 6_usize;
1510        for i in 0..n {
1511            let mut v = vec![0.0_f32; 4];
1512            #[allow(clippy::cast_precision_loss)]
1513            let f = i as f32;
1514            v[i % 4] = 1.0 + f * 0.01;
1515            idx.insert(format!("evict-{i}"), make_embedding(&v));
1516        }
1517
1518        // Drain the channel. Expect TWO events (n=6, cap=4) — one
1519        // per evicted id. The unbounded sender does not block; the
1520        // events should already be enqueued by the time `insert`
1521        // returns, but we give the channel a small grace window for
1522        // thread-scheduling jitter on slow CI runners.
1523        let mut received: Vec<EvictionEvent> = Vec::new();
1524        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1525        while std::time::Instant::now() < deadline && received.len() < 2 {
1526            while let Ok(ev) = rx.try_recv() {
1527                received.push(ev);
1528            }
1529            if received.len() < 2 {
1530                std::thread::sleep(std::time::Duration::from_millis(5));
1531            }
1532        }
1533
1534        assert_eq!(
1535            received.len(),
1536            2,
1537            "expected one EvictionEvent per evicted id (2 evictions for n=6, cap=4), got {}: {:?}",
1538            received.len(),
1539            received.iter().map(|e| &e.memory_id).collect::<Vec<_>>(),
1540        );
1541
1542        let ids: Vec<&str> = received.iter().map(|e| e.memory_id.as_str()).collect();
1543        assert!(
1544            ids.contains(&"evict-0"),
1545            "expected evict-0 in evicted ids; got {ids:?}"
1546        );
1547        assert!(
1548            ids.contains(&"evict-1"),
1549            "expected evict-1 in evicted ids; got {ids:?}"
1550        );
1551
1552        for ev in &received {
1553            assert_eq!(
1554                ev.reason, "max_entries_reached",
1555                "evicted reason should match the canonical tag, got {:?}",
1556                ev.reason
1557            );
1558            // namespace is intentionally empty at the hnsw layer
1559            // (the index does not carry namespace context); G9+ may
1560            // plumb it through. The wire field MUST be present even
1561            // when empty.
1562            assert_eq!(ev.namespace, "");
1563            assert!(
1564                !ev.evicted_at.is_empty(),
1565                "evicted_at must be set (rfc3339), got empty"
1566            );
1567        }
1568    }
1569
1570    /// Sanity: insertion without a sink wired is a no-op for the
1571    /// hook path. The eviction-edge code path must remain functional
1572    /// (oldest drained, cap enforced) even when no sink is set, so
1573    /// the CLI / test build's zero-cost posture is preserved.
1574    ///
1575    /// The proof of eviction is the PER-INDEX `len()` — after 6
1576    /// inserts into a max-4 index, exactly 4 entries survive, which
1577    /// is only possible if the eviction-edge path drained the 2
1578    /// oldest. We deliberately do NOT read the process-global
1579    /// `index_evictions_total()` counter here: a sibling test
1580    /// (`test_hnsw_eviction_fires_hook`) calls
1581    /// `reset_eviction_counters_for_test()`, which zeroes that
1582    /// shared static, so a concurrent multi-threaded test run could
1583    /// collapse a global-delta assertion to a flake. `idx.len()` is
1584    /// isolated to this index instance and deterministic.
1585    #[test]
1586    fn test_hnsw_eviction_without_sink_is_noop_for_hook() {
1587        let idx = VectorIndex::with_max_entries_for_test(4);
1588        // No `set_eviction_sink` call here — the index runs as in
1589        // CLI / pre-R3-S1 builds without a hooks pipeline.
1590
1591        for i in 0..6_usize {
1592            let mut v = vec![0.0_f32; 4];
1593            #[allow(clippy::cast_precision_loss)]
1594            let f = i as f32;
1595            v[i % 4] = 1.0 + f * 0.01;
1596            idx.insert(format!("noopsink-{i}"), make_embedding(&v));
1597        }
1598
1599        assert_eq!(
1600            idx.len(),
1601            4,
1602            "eviction-edge path must still enforce the max-4 cap even \
1603             without a sink wired (6 inserts → 4 survivors means 2 \
1604             evictions occurred); got len={}",
1605            idx.len()
1606        );
1607    }
1608}
1609
1610// ---------------------------------------------------------------------------
1611// #968 (Wave-2 Tier-C3) — async-rebuild + double-buffering regression tests
1612//
1613// These tests pin the contract introduced by issue #968:
//   1. A rebuild scheduled via `rebuild_async` does NOT block readers.
//      Search calls dispatched concurrently with the build complete
//      promptly (tens of ms in practice; the regression test allows a
//      5 s CI budget) even when the build itself runs for seconds.
1617//   2. A build-time panic leaves `active` untouched so reads continue
1618//      to serve the prior snapshot.
1619//   3. Writes that land DURING a rebuild are preserved: post-rebuild
1620//      state includes the snapshot's entries PLUS the concurrent inserts.
1621//   4. The swap is atomic: no caller ever observes a partial-state graph
1622//      (e.g. one with half the entries missing).
1623// ---------------------------------------------------------------------------
1624#[cfg(test)]
1625mod d1_968_tests {
1626    use super::*;
1627    use std::sync::Arc as TArc;
1628    use std::sync::atomic::AtomicUsize;
1629    use std::time::{Duration, Instant};
1630
1631    fn make_embedding(values: &[f32]) -> Vec<f32> {
1632        let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1633        values.iter().map(|v| v / norm).collect()
1634    }
1635
1636    /// Build a deterministic embedding-set fixture of `n` 16-dim
1637    /// L2-normalised vectors. Tests use this to make the `build_hnsw`
1638    /// pass non-trivial without inflating compile time.
1639    fn fixture(n: usize) -> Vec<(String, Vec<f32>)> {
1640        (0..n)
1641            .map(|i| {
1642                let mut v = vec![0.0_f32; 16];
1643                #[allow(clippy::cast_precision_loss)]
1644                let f = i as f32;
1645                v[i % 16] = 1.0 + f * 0.001;
1646                (format!("id-{i}"), make_embedding(&v))
1647            })
1648            .collect()
1649    }
1650
1651    /// #968 contract 1 — search must remain responsive while a rebuild
1652    /// runs in the background. We spawn a rebuild_async over a
1653    /// reasonably-sized fixture (the build is CPU-bound but not
1654    /// minutes-long at this scale) and concurrently issue 50 search
1655    /// calls. The reader-loop must complete well under the time it
1656    /// would take if every search had to wait on the rebuild's inner
1657    /// mutex (which it never holds for more than microseconds).
1658    #[test]
1659    fn rebuild_async_does_not_block_search_968() {
1660        let idx = TArc::new(VectorIndex::build(fixture(2_000)));
1661        let query = make_embedding(&[1.0_f32; 16]);
1662
1663        // Start the rebuild OFF-THREAD.
1664        let idx_for_rebuild = TArc::clone(&idx);
1665        let rebuild_handle = std::thread::spawn(move || idx_for_rebuild.rebuild_async());
1666        // Concurrent search loop: fire 50 searches.
1667        let idx_for_search = TArc::clone(&idx);
1668        let search_start = Instant::now();
1669        let search_handle = std::thread::spawn(move || {
1670            for _ in 0..50 {
1671                let hits = idx_for_search.search(&query, 10);
1672                // Each search must return at most 10 hits (the k cap)
1673                // and the fixture has >10 entries, so a non-empty
1674                // result is the expected output. We assert non-empty
1675                // (rather than ==10) because the swap mid-loop can
1676                // briefly leave the graph empty while overflow takes
1677                // over — both shapes are correct.
1678                assert!(
1679                    !hits.is_empty(),
1680                    "search returned empty during rebuild — readers were blocked or the graph was lost"
1681                );
1682            }
1683        });
1684        // Wait for both to finish. The rebuild thread may take seconds;
1685        // the search thread must NOT.
1686        let _ = search_handle.join().expect("search thread panicked");
1687        let search_elapsed = search_start.elapsed();
1688        // The 50-search loop with a 2k-entry index should complete in
1689        // tens of ms. We use a 5-second budget — wide enough to
1690        // absorb CI jitter, narrow enough to catch the v0.6 regression
1691        // (which would have blocked for ~3-10s on the rebuild mutex).
1692        assert!(
1693            search_elapsed < Duration::from_secs(5),
1694            "50 searches took {:?} — readers blocked on the rebuild (v0.6 regression)",
1695            search_elapsed,
1696        );
1697        let _ = rebuild_handle.join().expect("rebuild thread panicked");
1698        // Drain any pending warming into active so post-test state is clean.
1699        idx.try_swap_warming();
1700    }
1701
1702    /// #968 contract 2 — if the build fails (we cannot easily induce
1703    /// a panic from `instant_distance::Builder::build` deterministically;
1704    /// instead we exercise the rebuild_in_flight short-circuit + the
1705    /// "no warmed result" path), the active graph is unchanged.
1706    /// Concretely: we call `rebuild_async` while a prior rebuild is in
1707    /// flight; the second call returns a no-op handle and leaves
1708    /// `active` serving the prior snapshot.
1709    #[test]
1710    fn rebuild_failure_leaves_active_unchanged_968() {
1711        let entries = fixture(50);
1712        let idx = VectorIndex::build(entries.clone());
1713
1714        // Pre-rebuild: search returns expected ids.
1715        let query = make_embedding(&[1.0_f32; 16]);
1716        let pre_hits = idx.search(&query, 5);
1717        assert_eq!(pre_hits.len(), 5);
1718        let pre_ids: std::collections::HashSet<String> =
1719            pre_hits.iter().map(|h| h.id.clone()).collect();
1720
1721        // Force the in-flight flag on so the next rebuild_async takes
1722        // the short-circuit path (returns a no-op handle).
1723        idx.rebuild_in_flight.store(true, Ordering::SeqCst);
1724        let handle = idx.rebuild_async();
1725        let _ = handle.join();
1726        // Active should be UNCHANGED — no warmed graph was parked.
1727        let post_hits = idx.search(&query, 5);
1728        let post_ids: std::collections::HashSet<String> =
1729            post_hits.iter().map(|h| h.id.clone()).collect();
1730        assert_eq!(
1731            pre_ids, post_ids,
1732            "search results changed after a no-op rebuild — active was clobbered"
1733        );
1734
1735        // Cleanup: clear the manually-poked flag.
1736        idx.rebuild_in_flight.store(false, Ordering::SeqCst);
1737    }
1738
1739    /// #968 contract 3 — writes during a rebuild are preserved.
1740    /// We snapshot a baseline, kick off a rebuild_async, then issue
1741    /// N concurrent inserts. After the rebuild lands, every inserted
1742    /// id must be findable via search (either via the new graph if the
1743    /// snapshot captured it, or via the overflow if it arrived after
1744    /// the snapshot — both paths must surface the id).
1745    #[test]
1746    fn concurrent_writes_during_rebuild_consistent_968() {
1747        let idx = TArc::new(VectorIndex::build(fixture(500)));
1748        let handle = {
1749            let idx = TArc::clone(&idx);
1750            std::thread::spawn(move || idx.rebuild_async())
1751        };
1752
1753        // Issue 30 concurrent inserts. These flow into overflow +
1754        // all_entries; the snapshot already captured at rebuild_async
1755        // time has its own entry list, so these inserts will remain
1756        // in overflow until the NEXT rebuild — but the swap path
1757        // trims only the overflow PREFIX present at snapshot time, so
1758        // the new entries survive.
1759        let inserts_done = TArc::new(AtomicUsize::new(0));
1760        let mut writer_handles = Vec::new();
1761        for i in 0..30 {
1762            let idx = TArc::clone(&idx);
1763            let counter = TArc::clone(&inserts_done);
1764            writer_handles.push(std::thread::spawn(move || {
1765                let mut v = vec![0.0_f32; 16];
1766                #[allow(clippy::cast_precision_loss)]
1767                let f = i as f32;
1768                v[i % 16] = 2.0 + f * 0.01;
1769                idx.insert(format!("concurrent-{i}"), make_embedding(&v));
1770                counter.fetch_add(1, Ordering::SeqCst);
1771            }));
1772        }
1773        for h in writer_handles {
1774            let _ = h.join();
1775        }
1776        let rebuild_h = handle.join().expect("outer rebuild spawner panicked");
1777        let _ = rebuild_h.join();
1778        // v0.7.0 #1212 — deterministic post-rebuild observation barrier.
1779        // Pre-#1212 a single `try_swap_warming()` followed immediately
1780        // by the search loop raced the cross-thread publication of the
1781        // swap under stressed CI runners (`SAL-only feature gate` /
1782        // parallel-test load). The fix is two-fold:
1783        //   (1) Drain ANY parked warming result in a tight loop — handles
1784        //       the rare double-rebuild case (writer crossing
1785        //       REBUILD_THRESHOLD during the concurrent-write phase).
1786        //   (2) Yield briefly so any post-swap state (`valid_ids_cache`
1787        //       lazy rebuild, HNSW search-side state init) settles
1788        //       before the search loop reads it. 10 ms is generous
1789        //       enough for any GHA runner under 4-way parallel-test
1790        //       contention; the test still completes in <100 ms total.
1791        let mut swaps = 0_usize;
1792        while idx.try_swap_warming() {
1793            swaps += 1;
1794            // Defensive cap — at v0.7.0 there can be at most one
1795            // parked warming result at a time, but loop bounded just
1796            // in case a follow-on rebuild parks one while we drain.
1797            if swaps > 4 {
1798                break;
1799            }
1800        }
1801        std::thread::sleep(Duration::from_millis(10));
1802
1803        // Verify all 30 ids survived the rebuild. The CONTRACT under
1804        // test is "post-rebuild state INCLUDES the concurrent writes"
1805        // — i.e. every concurrent id is in `all_entries` (graph OR
1806        // overflow). We assert that directly via `len()` + a
1807        // sufficiently-wide search rather than relying on tie-break
1808        // determinism at small k. The baseline fixture (500 entries
1809        // across 16 axes) clusters ~32 entries per axis at
1810        // post-normalization distance 0 from any axis query; concurrent
1811        // entries on the same axis tie with them at distance 0, and
1812        // truncate-to-k can clip a single tied entry under
1813        // tie-break-dependent sort behavior. The fix is to widen k
1814        // beyond the tie cluster, NOT to weaken the contract.
1815        assert_eq!(inserts_done.load(Ordering::SeqCst), 30);
1816        let final_len = idx.len();
1817        // v0.7.0 #1212 — snapshot the post-swap private state ONCE so
1818        // every downstream assertion's panic message can cite the same
1819        // ground-truth set of values. Pre-#1212 a panic at line
1820        // src/hnsw.rs:1555 reported only "found < 29" with NO
1821        // observable state, making the CI flake un-diagnosable
1822        // without local repro. Capturing overflow.len() + hnsw size
1823        // here (under a single inner-lock guard) is the diagnostic
1824        // hook the operator + future agent needs to identify which
1825        // buffer the concurrent inserts landed in at panic time.
1826        let (overflow_len_dbg, hnsw_size_dbg) = {
1827            let state = idx.inner.lock().expect("inner mutex poisoned");
1828            let hnsw_size = state.hnsw.as_ref().map_or(0, |h| h.iter().count());
1829            (state.overflow.len(), hnsw_size)
1830        };
1831        assert_eq!(
1832            final_len,
1833            530,
1834            "post-rebuild len must equal baseline 500 + concurrent 30 = 530, \
1835             got {final_len} (overflow={overflow_len_dbg}, hnsw={hnsw_size_dbg}, \
1836             swaps={swaps}, inserts_done={})",
1837            inserts_done.load(Ordering::SeqCst)
1838        );
1839        // Assert the survival CONTRACT — "post-rebuild state INCLUDES
1840        // every concurrent write" — DETERMINISTICALLY via `all_entries`
1841        // membership (each id is in the graph OR overflow), not via the
1842        // approximate HNSW `search()` path.
1843        //
1844        // Why not `search()`: the active graph is built with
1845        // `Builder::default()` and queried with `Search::default()`, so
1846        // the search beam (`ef`) is a fixed library default that does
1847        // NOT scale with `k`. `search(q, 600)` over a 530-entry index
1848        // therefore returns only the top-`ef` graph candidates plus the
1849        // overflow scan — a graph-resident entry outside the beam is
1850        // intermittently clipped under stressed / instrumented runners.
1851        // The llvm-cov coverage job (run 27001253837/27001253833) hit
1852        // exactly this: `found` came back 28/30 while `final_len == 530`
1853        // simultaneously PROVED all 30 were present. Earlier revisions
1854        // chased it by widening `k` (no effect — the limit is `ef`, not
1855        // `k`) and then relaxing the floor to 29 (still flaky: 2-id
1856        // clips occur under heavy instrumentation). Membership over
1857        // `all_entries` is exact and race-free, so it is both the
1858        // correct expression of the contract AND immune to ANN-recall
1859        // jitter.
1860        //
1861        // The `swaps`/`overflow`/`hnsw` diagnostics captured above are
1862        // retained in the panic message so any genuine drop (a swap that
1863        // loses a write) still names the failure mode in one read.
1864        let present: std::collections::HashSet<String> = {
1865            let state = idx.inner.lock().expect("inner mutex poisoned");
1866            state.all_entries.iter().map(|(id, _)| id.clone()).collect()
1867        };
1868        let missing: Vec<String> = (0..30)
1869            .map(|i| format!("concurrent-{i}"))
1870            .filter(|id| !present.contains(id))
1871            .collect();
1872        assert!(
1873            missing.is_empty(),
1874            "post-rebuild all_entries must INCLUDE every concurrent write \
1875             (missing={missing:?}; post-swap state: overflow={overflow_len_dbg}, \
1876             hnsw={hnsw_size_dbg}, swaps={swaps}, final_len={final_len}); \
1877             a missing id means the concurrent-rebuild swap dropped a write"
1878        );
1879    }
1880
1881    /// #968 contract 4 — the swap is atomic. We never observe a
1882    /// half-populated graph during the swap window. To check this we
1883    /// run many search-then-len pairs concurrently with a rebuild
1884    /// and assert that `len()` is monotonically >= the baseline at
1885    /// every observation point (the swap only adds entries, never
1886    /// loses them).
1887    #[test]
1888    fn rebuild_swap_is_atomic_968() {
1889        let idx = TArc::new(VectorIndex::build(fixture(1_000)));
1890        let baseline_len = idx.len();
1891        let stop = TArc::new(AtomicBool::new(false));
1892        let observer_stop = TArc::clone(&stop);
1893        let idx_obs = TArc::clone(&idx);
1894        let observer = std::thread::spawn(move || {
1895            while !observer_stop.load(Ordering::SeqCst) {
1896                let l = idx_obs.len();
1897                assert!(
1898                    l >= baseline_len,
1899                    "len() dropped below baseline during rebuild — partial swap observed: {l} < {baseline_len}"
1900                );
1901            }
1902        });
1903        // Run a real rebuild.
1904        let h = idx.rebuild_async();
1905        let _ = h.join();
1906        idx.try_swap_warming();
1907        // Stop the observer.
1908        stop.store(true, Ordering::SeqCst);
1909        let _ = observer.join();
1910        assert_eq!(idx.len(), baseline_len);
1911    }
1912
1913    // v0.7.0 #1074 (SR-2 #2, HIGH) — eviction-then-rebuild gap.
1914    // The swap path must DROP a warming result whose
1915    // `overflow_generation` doesn't match the current state, so an
1916    // entry inserted into overflow AFTER a clear() bump is not
1917    // mistakenly drained out as if it were a captured snapshot
1918    // entry. We park a stale-gen result directly in the warming
1919    // slot and confirm try_swap_warming refuses to swap AND does
1920    // not drain overflow.
1921    #[test]
1922    fn stale_warming_swap_is_dropped_1074() {
1923        let idx = VectorIndex::empty();
1924        // Insert a non-trivial overflow entry so we can assert
1925        // try_swap_warming doesn't drain it.
1926        idx.insert(
1927            "alpha".to_string(),
1928            make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]),
1929        );
1930        let before_overflow = idx.inner.lock().unwrap().overflow.len();
1931        assert_eq!(before_overflow, 1);
1932
1933        // Park a STALE-generation warming result with a swap that
1934        // would otherwise drain everything.
1935        {
1936            let current_gen = idx.inner.lock().unwrap().overflow_generation;
1937            let mut w = idx.warming.lock().unwrap();
1938            *w = Some(RebuildResult {
1939                hnsw: None,
1940                overflow_at_snapshot: 999, // would have drained the whole overflow
1941                overflow_generation: current_gen.wrapping_add(1), // mismatched
1942                entries_in_graph: 0,
1943            });
1944        }
1945
1946        // Swap MUST refuse and leave overflow intact.
1947        let swapped = idx.try_swap_warming();
1948        assert!(
1949            !swapped,
1950            "stale-by-generation warming must NOT swap in (#1074)"
1951        );
1952        let after_overflow = idx.inner.lock().unwrap().overflow.len();
1953        assert_eq!(
1954            after_overflow, before_overflow,
1955            "stale swap must NOT drain overflow (#1074 regression)"
1956        );
1957        // The alpha entry must still be findable via the linear
1958        // overflow scan (no graph yet).
1959        let hits = idx.search(&make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]), 5);
1960        assert!(hits.iter().any(|h| h.id == "alpha"));
1961    }
1962
1963    // v0.7.0 #1074 — confirm overflow.clear() in the eviction path
1964    // bumps the generation counter so a snapshot captured BEFORE the
1965    // eviction will fail the gen check on swap. This pins the load-
1966    // bearing invariant: clear() must always bump.
1967    #[test]
1968    fn eviction_clear_bumps_overflow_generation_1074() {
1969        let idx = VectorIndex::with_max_entries_for_test(2);
1970        let gen_initial = idx.inner.lock().unwrap().overflow_generation;
1971        // 3 inserts past cap=2 → at least one eviction-clear fires.
1972        for i in 0..3 {
1973            let mut v = vec![0.0_f32; 4];
1974            v[i % 4] = 1.0;
1975            idx.insert(format!("e{i}"), make_embedding(&v));
1976        }
1977        let gen_after = idx.inner.lock().unwrap().overflow_generation;
1978        assert!(
1979            gen_after > gen_initial,
1980            "eviction-clear path must bump overflow_generation (#1074)"
1981        );
1982    }
1983
1984    // -----------------------------------------------------------------
1985    // #1579 — async-boot seeding + coverage accounting
1986    // -----------------------------------------------------------------
1987
1988    #[test]
1989    fn seed_entries_dedupes_and_defers_searchability_1579() {
1990        let idx = VectorIndex::empty();
1991        // A write that arrived through the normal path stays covered
1992        // (overflow is linearly scanned).
1993        idx.insert("live".into(), make_embedding(&[1.0, 0.0, 0.0]));
1994        assert!(
1995            idx.is_fully_searchable(),
1996            "overflow-only index is fully searchable"
1997        );
1998        // Seed the boot snapshot; the id already present must be
1999        // skipped (no double-count), the rest land in all_entries
2000        // only.
2001        let seeded = idx.seed_entries(vec![
2002            ("live".into(), make_embedding(&[1.0, 0.0, 0.0])),
2003            ("a".into(), make_embedding(&[0.0, 1.0, 0.0])),
2004            ("b".into(), make_embedding(&[0.0, 0.0, 1.0])),
2005        ]);
2006        assert_eq!(seeded, 2, "duplicate id must be skipped");
2007        assert_eq!(idx.len(), 3);
2008        assert!(
2009            !idx.is_fully_searchable(),
2010            "seeded-but-unbuilt entries must report not-fully-searchable \
2011             so the conflict check routes to its fallback (#1579 A5/B3)"
2012        );
2013        // Search during the warm window must still serve the live
2014        // (overflow) entry — and must NOT pretend to cover the seeds.
2015        let hits = idx.search(&make_embedding(&[0.0, 1.0, 0.0]), 3);
2016        assert!(hits.iter().all(|h| h.id == "live"));
2017    }
2018
2019    #[test]
2020    fn warm_boot_seeds_builds_and_swaps_1579() {
2021        let idx = VectorIndex::empty();
2022        let seeded = idx.warm_boot(vec![
2023            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
2024            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
2025            ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
2026        ]);
2027        assert_eq!(seeded, 3);
2028        assert!(
2029            idx.is_fully_searchable(),
2030            "warm_boot must drive rebuild→swap to completion"
2031        );
2032        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
2033        assert_eq!(hits.first().map(|h| h.id.as_str()), Some("a"));
2034    }
2035
2036    #[test]
2037    fn build_and_insert_preserve_full_searchability_1579() {
2038        let idx = VectorIndex::build(vec![
2039            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
2040            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
2041        ]);
2042        assert!(idx.is_fully_searchable(), "synchronous build covers all");
2043        idx.insert("c".into(), make_embedding(&[0.0, 0.0, 1.0]));
2044        assert!(
2045            idx.is_fully_searchable(),
2046            "insert lands in overflow — coverage preserved"
2047        );
2048        idx.remove("a");
2049        assert!(
2050            idx.is_fully_searchable(),
2051            "remove only shrinks all_entries — coverage preserved"
2052        );
2053    }
2054}