Skip to main content

ai_memory/
hnsw.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! HNSW (Hierarchical Navigable Small World) vector index for fast approximate
5//! nearest-neighbor search over memory embeddings.
6//!
//! Built on `instant-distance`. The index is constructed at startup from all
//! stored embeddings. New memories added during the session go into an overflow
//! list that is scanned linearly alongside the HNSW results. Once the overflow
//! reaches `REBUILD_THRESHOLD` entries, a rebuild is scheduled on a background
//! thread and the new graph is swapped in on a later insert or search.
11
12use instant_distance::{Builder, HnswMap, Search};
13// `instant_distance::Point` is the trait that supplies the
14// `EmbeddingPoint::distance` method; it has to be in scope for the
15// in-module tests (`embedding_point_distance_*`) to call it as a
16// method. The lib code itself goes through the slice-borrow
17// `cosine_distance` helper post-#1087 so the `Point` impl is the
18// only consumer of the trait at the bare-name level.
19#[cfg(test)]
20use instant_distance::Point;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::mpsc::Sender;
25use std::thread::JoinHandle;
26
27use crate::hooks::EvictionEvent;
28
29/// Tracing target for the HNSW eviction worker (#1558 tracing-target SSOT).
30const EVICTION_TRACE_TARGET: &str = "hnsw.eviction";
31
/// Overflow length at which `insert()` schedules a background rebuild.
/// The check is `overflow.len() >= REBUILD_THRESHOLD`, so the rebuild is
/// requested as soon as the overflow list reaches this many entries.
const REBUILD_THRESHOLD: usize = 200;
34
35/// #1037 (2026-05-21) — bounded spin-wait window for [`VectorIndex::rebuild`]
36/// (the sync shim) when [`VectorIndex::rebuild_async`] short-circuited to
37/// a no-op handle because a prior async rebuild was still in flight.
38/// 1 second is well under any sensible sync-rebuild expectation
39/// (production callers use `rebuild_async`); the budget exists only
40/// to convert "silently return stale graph" → "bounded timeout, then
41/// best-effort swap".
42const REBUILD_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
43/// Poll cadence for bounded waits on an in-flight rebuild. Shared by
44/// the [`VectorIndex::rebuild`] sync shim (#1037) and the #1579 B3
45/// boot warm-up retry loops (`warm_boot`,
46/// `daemon_runtime::spawn_vector_index_boot_load`).
47pub(crate) const REBUILD_WAIT_POLL_INTERVAL: std::time::Duration =
48    std::time::Duration::from_millis(10);
49
50/// #1579 B3 — minimum embedded-row count before a ONE-SHOT CLI
51/// invocation builds an HNSW graph. Below this, the recall pipeline's
52/// linear-scan fallback (`vector_index = None`) answers the semantic
53/// phase in ≤ 35 ms (P1 audit, 10-20k rows) while graph construction
54/// costs ~40 s at 10k vectors — a build that is then thrown away when
55/// the process exits. Long-lived processes (serve / MCP stdio) always
56/// build (asynchronously, see `daemon_runtime::spawn_vector_index_boot_load`)
57/// because they amortise the cost across many recalls.
58pub const CLI_HNSW_BUILD_MIN_ENTRIES: usize = 20_000;
59
60/// Maximum entries before evicting oldest to prevent unbounded memory growth.
61///
62/// Production code uses the constant 100_000. Tests may construct a
63/// `VectorIndex` with a custom cap via [`VectorIndex::with_max_entries_for_test`]
64/// — that knob is stored on the index instance itself, so it does
65/// NOT affect concurrent tests running with the default cap. The
66/// constant lives here so call sites (and the per-event tracing
67/// payload) reference one canonical value.
68const MAX_ENTRIES: usize = 100_000;
69
70// ---------------------------------------------------------------------------
71// v0.6.3.1 (P3, G2): eviction observability
72//
73// `MAX_ENTRIES`-triggered eviction in `insert()` previously dropped the
74// oldest embeddings silently — operators near the cap lost recall quality
75// invisibly. The two counters below + the structured `hnsw.eviction`
76// tracing event close that gap:
77//
78//   - eviction count — cumulative count surfaced via
79//     `db::stats().index_evictions_total` (and capabilities) AND at
80//     `/metrics` as `ai_memory_hnsw_evictions_total`.
81//   - last-eviction wall clock — UNIX nanoseconds of the most recent
82//     eviction; capabilities derive `hnsw.evicted_recently` from this
83//     with a 60 s rolling window.
84//
85// **pm-v3.1 PR8 (issue #1174).** Pre-PR8 the counters were two free
86// `static AtomicU64`s at the top of this file. PR8 sank both into the
87// metrics registry (`src/metrics.rs::HNSW_EVICTIONS_TOTAL` +
88// `HNSW_LAST_EVICTION_AT_NANOS`, plus matching Prometheus
89// `IntCounter` / `IntGauge` handles on `Metrics`) so the eviction
90// signal is `/metrics`-scrape-visible without a separate observer
91// thread. The accessor signatures here are preserved verbatim for
92// call-site backward compat.
93//
94// Process-local. The counters reset on restart because the index itself
95// resets on restart. Both atomics are touched only on the eviction edge
96// (rare: requires >100k vectors), so there is no measurable hot-path cost.
97// ---------------------------------------------------------------------------
98
/// Cumulative HNSW oldest-eviction count since process start.
///
/// Surfaces in `memory_stats`. Non-zero indicates the in-memory vector
/// index has hit `MAX_ENTRIES` and dropped older embeddings; recall
/// quality may have degraded for evicted ids until they are re-inserted
/// (e.g. on next access via `recall` touch path).
///
/// pm-v3.1 PR8: thin shim over `crate::metrics::hnsw_evictions_total()`.
/// The counter itself is owned by the metrics registry; this function
/// only forwards the read so existing call sites keep their signature.
#[must_use]
pub fn index_evictions_total() -> u64 {
    crate::metrics::hnsw_evictions_total()
}
111
112// ---------------------------------------------------------------------------
113// M8 (v0.7.0 round-2) — eviction-rate observability.
114//
115// Operators who hit the 100k cap need two signals:
116//
117//   1. Per-eviction WARN — surface every eviction event so operators
118//      see drift before recall quality has noticeably degraded.
119//   2. Rolling-rate ERROR — when the trailing-hour eviction rate
120//      exceeds the M8 ceiling, escalate to ERROR so the ops dashboard
121//      raises a page. The escalation message names the operator
122//      knobs (`vector_index_capacity` / "move to dedicated vector DB")
123//      so the on-call has the remediation in the log line.
124//
125// Implementation: a small fixed-size ring buffer of UNIX-nanosecond
126// timestamps. Each eviction `push`es a stamp; the rolling-rate check
127// counts how many stamps sit inside the trailing-hour window. The
128// ring is locked behind a `Mutex` for write-coherent visibility; the
129// path runs only on the eviction edge so the lock cost is negligible.
130// ---------------------------------------------------------------------------
131
132/// M8 eviction-rate ceiling: events / hour past which the rolling
133/// observer escalates from WARN to ERROR.
134const EVICTION_RATE_CEILING_PER_HOUR: usize = 10;
135
136/// Rolling-hour ring buffer capacity. Chosen so the ring can hold the
137/// ceiling plus headroom for burstiness; older entries are
138/// transparently evicted on push.
139const EVICTION_RATE_RING_CAP: usize = 64;
140
141/// v0.7.0 #1093 — eviction-rate ring buffer. Switched from
142/// `Mutex<Vec<u64>>` to `Mutex<VecDeque<u64>>` so the cap-eviction
143/// path is O(1) `pop_front` instead of O(N) `Vec::remove(0)`.
144static EVICTION_RATE_RING: Mutex<std::collections::VecDeque<u64>> =
145    Mutex::new(std::collections::VecDeque::new());
146
147/// Whether an eviction occurred within the trailing `window_secs`.
148///
149/// Used by capabilities (P1) to set `hnsw.evicted_recently` so operators
150/// can see ongoing pressure on the cap, not just the cumulative count.
151/// Returns `false` when no evictions have ever happened in this process.
152#[must_use]
153pub fn evicted_recently(window_secs: u64) -> bool {
154    let last = crate::metrics::hnsw_last_eviction_at_nanos();
155    if last == 0 {
156        return false;
157    }
158    let now_nanos = std::time::SystemTime::now()
159        .duration_since(std::time::UNIX_EPOCH)
160        .map(|d| d.as_nanos())
161        .unwrap_or(0);
162    // Saturating math: clock can move backwards on some VMs.
163    let elapsed_nanos = u128::from(u64::MAX).min(now_nanos.saturating_sub(u128::from(last)));
164    elapsed_nanos < u128::from(window_secs).saturating_mul(1_000_000_000)
165}
166
167/// Reset the eviction counters. Test-only — production callers must not
168/// reach into the counter directly. The function is `pub` (rather than
169/// `pub(crate)`) so the integration-test crate at `tests/` can drive it
170/// alongside the public `index_evictions_total()` accessor; renaming
171/// keeps the intent obvious at every call site.
172///
173/// pm-v3.1 PR8: thin shim over
174/// `crate::metrics::reset_hnsw_eviction_counters_for_test()`.
175#[doc(hidden)]
176pub fn reset_eviction_counters_for_test() {
177    crate::metrics::reset_hnsw_eviction_counters_for_test();
178    if let Ok(mut g) = EVICTION_RATE_RING.lock() {
179        g.clear();
180    }
181}
182
183/// M8 (v0.7.0 round-2) — push the latest eviction timestamp into the
184/// rolling-hour ring and return how many stamps now sit inside the
185/// trailing hour. Producers call this once per eviction event;
186/// the caller branches on the returned count to escalate from WARN
187/// (already emitted) to ERROR.
188fn record_eviction_and_count_recent(now_nanos: u64) -> usize {
189    const ONE_HOUR_NANOS: u64 = crate::SECS_PER_HOUR as u64 * 1_000_000_000;
190    let cutoff = now_nanos.saturating_sub(ONE_HOUR_NANOS);
191    let Ok(mut ring) = EVICTION_RATE_RING.lock() else {
192        // Poisoned lock — observability is best-effort, return 0 so
193        // the caller does not over-escalate.
194        return 0;
195    };
196    // Drop stale entries first so the ring stays bounded and the
197    // count reflects the trailing hour.
198    ring.retain(|t| *t >= cutoff);
199    if ring.len() >= EVICTION_RATE_RING_CAP {
200        // v0.7.0 #1093 — VecDeque::pop_front is O(1); pre-#1093
201        // Vec::remove(0) was O(N) (backing-buffer shift).
202        ring.pop_front();
203    }
204    ring.push_back(now_nanos);
205    ring.len()
206}
207
/// A point in the HNSW index — wraps a dense embedding vector.
///
/// The wrapped `Vec<f32>` is public so callers can construct and read
/// points directly. The `instant_distance::Point` impl for this type
/// delegates to `cosine_distance`, which treats the dot product as the
/// cosine similarity — i.e. it relies on the vectors being
/// L2-normalised; nothing in this type enforces that.
#[derive(Clone, Debug)]
pub struct EmbeddingPoint(pub Vec<f32>);
211
/// v0.7.0 #1087 — cosine distance between two borrowed embeddings.
///
/// Lets the overflow scan in [`VectorIndex::search`] measure distances
/// against the stored `Vec<f32>` directly, without cloning each overflow
/// embedding into a fresh `EmbeddingPoint`. Embeddings are
/// L2-normalised, so the dot product equals the cosine similarity and
/// the distance is `1.0 - dot`.
///
/// The slices are paired element-by-element; if their lengths differ
/// the extra tail of the longer one is ignored. Two empty slices yield
/// `1.0`.
#[inline]
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let mut similarity = 0.0_f32;
    for (lhs, rhs) in a.iter().zip(b) {
        similarity += lhs * rhs;
    }
    1.0 - similarity
}
222
223impl instant_distance::Point for EmbeddingPoint {
224    fn distance(&self, other: &Self) -> f32 {
225        cosine_distance(&self.0, &other.0)
226    }
227}
228
229// ---------------------------------------------------------------------------
230// #968 (Wave-2 Tier-C3) — async rebuild + double-buffering.
231//
232// Prior to #968 every HNSW rebuild ran SYNCHRONOUSLY on the request thread:
233// `Self::build_hnsw(&state.all_entries)` is CPU-bound (graph construction
234// is O(N log N) with constant factors that put 100k vectors at ~3-10s on
235// commodity hardware) and the producer's `insert()` call simply blocked
236// until the new graph was ready. Search callers contending for the same
237// `inner` mutex blocked too — recall p95 spiked from <20 ms to multi-second.
238//
239// The fix is a double-buffer pattern with background-task swap-in:
240//   • The `active` slot (inside `IndexState`) is the index that serves
241//     reads. Search holds the inner lock only long enough to clone the
242//     overflow + collect valid IDs; the HNSW search itself runs against
243//     the active graph held under the same lock (instant-distance's
244//     `Search::default()` is per-call scratch, no shared state).
245//   • The `warming` slot is `Arc<Mutex<Option<HnswMap>>>`. A background
246//     thread (`std::thread::spawn` — HNSW build is CPU-bound; no tokio
247//     runtime needed) builds the new graph from a snapshot of
248//     `all_entries`, then drops it into `warming`. On the next
249//     `try_swap_warming()` (called from search + insert + explicit poll)
250//     the warmed graph atomically replaces `active`. The mutex hold
251//     spans only the std::mem::swap — microseconds.
252//   • Concurrent writes during rebuild: writes flow into `overflow` and
253//     `all_entries` normally while the background task is building from
254//     the snapshot. On swap, we trim `overflow` of the entries already
255//     captured in the snapshot (the snapshot length is recorded when the
256//     job kicks off). Entries inserted AFTER the snapshot remain in
257//     overflow and are searched linearly until the next rebuild captures
258//     them. No write is ever dropped.
259//   • Rebuild failures: a panicking build-thread leaves `warming`
260//     untouched (`None`); `active` is unchanged. The `JoinHandle` exposes
261//     the panic to the caller via `JoinHandle::join()`. The
262//     `rebuild_in_flight` atomic flips back to `false` whether the
263//     thread succeeded or panicked (via a drop-guard `RebuildGuard`).
264// ---------------------------------------------------------------------------
265
/// Snapshot-bound rebuild job. Carries the captured `all_entries` plus
/// the overflow length at snapshot time so the post-swap overflow trim
/// is deterministic. The trim must use the overflow length specifically
/// (NOT `all_entries.len()`) because writes between snapshot and swap
/// extend overflow; only the overflow PREFIX whose entries are now in
/// the new graph is safe to drop.
struct RebuildSnapshot {
    /// Clone of `IndexState::all_entries` taken while the inner lock was
    /// held; the background build works from this copy, so later writes
    /// to the live state do not affect it.
    entries: Vec<(String, Vec<f32>)>,
    /// Length of `overflow` at the moment the snapshot was taken. The
    /// swap path drains the first `overflow_at_snapshot` entries from
    /// `state.overflow` — those entries are now in the new graph.
    /// Anything inserted AFTER the snapshot remains in overflow for
    /// the next rebuild cycle. Capturing the OVERFLOW length (not the
    /// all-entries length) is load-bearing for correctness under
    /// concurrent writes during rebuild.
    overflow_at_snapshot: usize,
    /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter snapshot.
    /// The eviction path bumps `state.overflow_generation` and calls
    /// `state.overflow.clear()`. If a rebuild snapshot was
    /// captured BEFORE the eviction, its `overflow_generation` will
    /// not match the post-eviction `state.overflow_generation`, and
    /// the swap path knows the snapshot is stale: it must NOT drain
    /// overflow entries (those entries are post-eviction inserts not
    /// in the snapshot's `entries`), and the warmed graph itself is
    /// stale (it was built from pre-eviction `all_entries` that have
    /// since been shrunk). The safe action is to drop the warmed
    /// result without swapping AND without draining, then let the
    /// next rebuild capture the current state.
    overflow_generation: u64,
}
296
/// Scope guard that resets the `rebuild_in_flight` flag when it goes out
/// of scope, including when the background build unwinds from a panic.
///
/// Without it, a panic in `build_hnsw` (e.g. OOM in
/// `instant_distance::Builder::build`) would leave the flag stuck at
/// `true` and no later rebuild could ever be scheduled.
struct RebuildGuard {
    /// Shared in-flight flag; stored as `false` on drop.
    flag: Arc<AtomicBool>,
}

impl Drop for RebuildGuard {
    /// Unconditionally marks the rebuild as no longer in flight.
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::SeqCst);
    }
}
310
/// Thread-safe HNSW index over memory embeddings.
///
/// All mutable state sits behind mutexes or an atomic, so the public
/// methods take `&self`.
pub struct VectorIndex {
    /// Mutable index state: the active HNSW graph (mapping embedding
    /// points to memory IDs), the overflow list, the full entry list
    /// and related bookkeeping. See [`IndexState`].
    inner: Mutex<IndexState>,
    /// #968 — warming slot for the double-buffer pattern. The background
    /// rebuild thread parks the freshly-built graph here; readers/writers
    /// observe it via [`Self::try_swap_warming`] on their next inner-lock
    /// acquisition. `Some` means a rebuild has finished and is awaiting
    /// swap-in; `None` means no warmed graph is ready.
    warming: Arc<Mutex<Option<RebuildResult>>>,
    /// #968 — coordinator flag. `true` while a background rebuild is in
    /// flight; prevents the auto-rebuild path in `insert()` from
    /// spawning a second concurrent build (one CPU-bound build at a
    /// time is enough — successive rebuilds chase the same target).
    /// Cleared by the rebuild thread's drop-guard whether the build
    /// succeeded or panicked.
    rebuild_in_flight: Arc<AtomicBool>,
    /// v0.7.0 (R3-S1) — eviction sink. The `MAX_ENTRIES`-triggered
    /// drain in `insert()` pushes an [`EvictionEvent`] onto this
    /// channel for each evicted id; a hook-aware observer above this
    /// layer drains the channel and fires the `on_index_eviction`
    /// chain off the hot path. Wired by the daemon at startup
    /// (`daemon_runtime`) via [`Self::set_eviction_sink`]. Optional —
    /// CLI / test builds that never bring up the hooks pipeline leave
    /// it `None` and the sink-push is a no-op so eviction throughput
    /// is unaffected. Closes the G2 / G8 "fire site exists but not
    /// wired" gap that the prior `tracing::warn!`-only implementation
    /// left open.
    ///
    /// `Mutex` (not `RwLock`) because writes are rare — they happen
    /// only through `set_eviction_sink`, normally once at startup
    /// (replacing the sink later is allowed) — and reads happen only
    /// on the eviction edge, which is itself already serialized
    /// through `inner`. The sender is a `std::sync::mpsc::Sender`,
    /// whose `send` does not block, so the sink push is safe to
    /// perform while the inner-state lock is held.
    eviction_sink: Mutex<Option<Sender<EvictionEvent>>>,
}
348
/// #968 — payload the rebuild thread parks in the `warming` slot when
/// the build completes. Carries the new graph PLUS the overflow length
/// at snapshot time so the swap path can trim `overflow` deterministically:
/// the prefix `..overflow_at_snapshot` is now in the graph; entries
/// inserted AFTER the snapshot (the suffix) remain in `overflow` for
/// the next cycle.
struct RebuildResult {
    /// The freshly built graph. NOTE(review): presumably `None` when the
    /// snapshot was empty (`build_hnsw` returns `None` for empty input)
    /// — confirm against the rebuild thread.
    hnsw: Option<HnswMap<EmbeddingPoint, String>>,
    /// Overflow length recorded when the snapshot was taken; the swap
    /// path uses it to decide how much of the overflow prefix to drop.
    overflow_at_snapshot: usize,
    /// v0.7.0 #1074 — propagated from the snapshot so the swap path
    /// can detect a stale-by-eviction warming result.
    overflow_generation: u64,
    /// #1579 — number of entries captured in the snapshot this graph
    /// was built from. On swap it becomes the new
    /// `IndexState::graph_entry_count`, the coverage accounting behind
    /// [`VectorIndex::is_fully_searchable`].
    entries_in_graph: usize,
}
367
/// Mutable state of a [`VectorIndex`], guarded by its `inner` mutex:
/// the active graph, the overflow list, the full entry list and the
/// bookkeeping needed for eviction, stale-snapshot detection and
/// search coverage.
struct IndexState {
    /// The active HNSW graph that serves searches; `None` when no graph
    /// has been built (e.g. the index was created empty).
    hnsw: Option<HnswMap<EmbeddingPoint, String>>,
    /// Entries added after the last rebuild. Searched linearly.
    overflow: Vec<(String, Vec<f32>)>,
    /// All entries (for rebuild). Kept in sync with the index + overflow.
    all_entries: Vec<(String, Vec<f32>)>,
    /// v0.7.0 R3-S1 — per-instance eviction cap. Defaults to
    /// [`MAX_ENTRIES`] (the production 100k). Tests construct an
    /// index with a smaller cap via
    /// [`VectorIndex::with_max_entries_for_test`] so the eviction
    /// edge can be exercised without inserting 100k vectors. Storing
    /// the cap per-instance (rather than as a process-wide atomic)
    /// keeps concurrent tests independent.
    max_entries: usize,
    /// v0.7.0 #1074 (SR-2 #2, HIGH) — generation counter bumped on
    /// every `overflow.clear()` (eviction-edge path). Snapshots
    /// captured before a clear carry the old generation; the swap
    /// path compares against the current generation and drops the
    /// warming result without swapping when they don't match. Closes
    /// the gap where an entry inserted between a snapshot capture
    /// and the eviction-clear was incorrectly drained by the swap.
    overflow_generation: u64,
    /// v0.7.0 #1087 — cached HashSet view of `all_entries` ids used
    /// by [`VectorIndex::search`] as the stale-id filter. Built
    /// lazily on the first search after a mutation; invalidated to
    /// `None` on insert push, eviction drain, and remove retain.
    /// Pre-#1087 this set was rebuilt on EVERY recall.
    ///
    /// PERF-7 (FX-C4-batch2, 2026-05-26): the cache stores
    /// `Arc<str>` per id instead of `String`. UUID strings are 36
    /// bytes; `Arc<str>` is a 16-byte fat pointer with the bytes
    /// heap-allocated once, whereas a `String` is a 24-byte
    /// (ptr/len/cap) struct with the bytes heap-allocated PLUS the
    /// 24 bytes inline. On a 100 000-entry warm-up the change
    /// halves the per-rebuild allocator pressure (16 vs 24+ bytes
    /// per entry plus the no-spare-capacity heap-side). HashSet
    /// lookup against `&str` works through the `Borrow<str>` impl.
    valid_ids_cache: Option<std::collections::HashSet<std::sync::Arc<str>>>,
    /// #1579 — number of entries baked into the ACTIVE graph (the
    /// snapshot length at its build time; 0 when `hnsw` is `None`).
    /// Together with `overflow.len()` this tells whether a search can
    /// see every live entry: entries seeded into `all_entries` by the
    /// async-boot loader ([`VectorIndex::seed_entries`]) are in
    /// NEITHER the graph NOR `overflow` until the boot rebuild swaps
    /// in, and [`VectorIndex::is_fully_searchable`] reports `false`
    /// for that window so callers (the #519 proactive conflict check)
    /// can route to their non-index fallback instead of silently
    /// searching an index that cannot return the seeded rows.
    graph_entry_count: usize,
}
418
/// A search result from the vector index.
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Memory id of the matched entry.
    pub id: String,
    /// Distance between the query and this entry; smaller is closer.
    /// NOTE(review): presumably the cosine distance (`1.0 - dot`) used
    /// elsewhere in this module — confirm against `VectorIndex::search`.
    pub distance: f32,
}
425
426impl VectorIndex {
427    /// Build a new index from a list of (`memory_id`, embedding) pairs.
428    pub fn build(entries: Vec<(String, Vec<f32>)>) -> Self {
429        let hnsw = Self::build_hnsw(&entries);
430        let graph_entry_count = entries.len();
431        VectorIndex {
432            inner: Mutex::new(IndexState {
433                hnsw,
434                overflow: Vec::new(),
435                all_entries: entries,
436                max_entries: MAX_ENTRIES,
437                overflow_generation: 0,
438                valid_ids_cache: None,
439                graph_entry_count,
440            }),
441            eviction_sink: Mutex::new(None),
442            warming: Arc::new(Mutex::new(None)),
443            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
444        }
445    }
446
447    /// Build an empty index.
448    pub fn empty() -> Self {
449        VectorIndex {
450            inner: Mutex::new(IndexState {
451                hnsw: None,
452                overflow: Vec::new(),
453                all_entries: Vec::new(),
454                max_entries: MAX_ENTRIES,
455                overflow_generation: 0,
456                valid_ids_cache: None,
457                graph_entry_count: 0,
458            }),
459            eviction_sink: Mutex::new(None),
460            warming: Arc::new(Mutex::new(None)),
461            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
462        }
463    }
464
465    /// v0.7.0 R3-S1 — Build an empty index with a custom eviction
466    /// cap. Test-only: lets a 5-entry insert sequence exercise the
467    /// eviction edge in milliseconds (vs. the ~minute-scale cost of
468    /// inserting 100k vectors at the production cap). The knob is
469    /// stored per-instance so concurrent tests using the default
470    /// cap are unaffected.
471    #[doc(hidden)]
472    #[must_use]
473    pub fn with_max_entries_for_test(max_entries: usize) -> Self {
474        VectorIndex {
475            inner: Mutex::new(IndexState {
476                hnsw: None,
477                overflow: Vec::new(),
478                all_entries: Vec::new(),
479                max_entries,
480                overflow_generation: 0,
481                valid_ids_cache: None,
482                graph_entry_count: 0,
483            }),
484            eviction_sink: Mutex::new(None),
485            warming: Arc::new(Mutex::new(None)),
486            rebuild_in_flight: Arc::new(AtomicBool::new(false)),
487        }
488    }
489
490    /// v0.7.0 (R3-S1) — wire the eviction sink.
491    ///
492    /// The daemon calls this once at startup with the send-half of an
493    /// mpsc channel; a hook-aware observer task drains the recv-half
494    /// off the hot path and fires the `on_index_eviction` chain
495    /// (`fire_on_index_eviction` in `src/hooks/chain.rs`). Replacing
496    /// an existing sink is allowed — useful when the daemon
497    /// reconfigures the hook chain at runtime — and drops the prior
498    /// sender, which terminates the prior observer cleanly.
499    ///
500    /// Build-time / CLI / test builds that never wire a sink retain
501    /// the `None` default; the eviction path's `try_send` then
502    /// becomes a no-op short-circuit so there is no measurable cost
503    /// to leaving the sink unset.
504    pub fn set_eviction_sink(&self, sink: Sender<EvictionEvent>) {
505        if let Ok(mut guard) = self.eviction_sink.lock() {
506            *guard = Some(sink);
507        }
508    }
509
510    fn build_hnsw(entries: &[(String, Vec<f32>)]) -> Option<HnswMap<EmbeddingPoint, String>> {
511        if entries.is_empty() {
512            return None;
513        }
514        let points: Vec<EmbeddingPoint> = entries
515            .iter()
516            .map(|(_, emb)| EmbeddingPoint(emb.clone()))
517            .collect();
518        let values: Vec<String> = entries.iter().map(|(id, _)| id.clone()).collect();
519        Some(Builder::default().build(points, values))
520    }
521
522    /// Add a new entry to the index (goes to overflow until next rebuild).
523    pub fn insert(&self, id: String, embedding: Vec<f32>) {
524        // #968 — opportunistically swap any warmed graph BEFORE taking the
525        // write path. This lets the auto-rebuild scheduled by a previous
526        // insert land cleanly even if no search call has run between
527        // inserts. Cheap: the warming-mutex contention is microseconds.
528        self.try_swap_warming();
529
530        // #968 — capture the snapshot for a potential auto-rebuild OUTSIDE
531        // the inner lock so the build thread can be spawned without
532        // holding the writers' mutex.
533        let snapshot_for_rebuild: Option<RebuildSnapshot> = {
534            let mut state = match self.inner.lock() {
535                Ok(s) => s,
536                Err(poisoned) => poisoned.into_inner(),
537            };
538            state.all_entries.push((id.clone(), embedding.clone()));
539            state.overflow.push((id, embedding));
540            // v0.7.0 #1087 — invalidate cached valid_ids set; rebuilt
541            // lazily on the next search.
542            state.valid_ids_cache = None;
543
544            // #968 — async auto-rebuild: when overflow crosses the
545            // threshold, snapshot the entries and let the caller (below,
546            // outside the lock) spawn the background build. We do NOT
547            // build the graph synchronously here anymore; that was the
548            // multi-second request-thread block #968 fixes. The
549            // `rebuild_in_flight` CAS prevents the same `insert` call
550            // from racing a previously-scheduled rebuild — only one
551            // background build runs at a time; the next snapshot is
552            // captured after the current build's swap lands.
553            if state.overflow.len() >= REBUILD_THRESHOLD
554                && self
555                    .rebuild_in_flight
556                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
557                    .is_ok()
558            {
559                Some(RebuildSnapshot {
560                    entries: state.all_entries.clone(),
561                    overflow_at_snapshot: state.overflow.len(),
562                    overflow_generation: state.overflow_generation,
563                })
564            } else {
565                None
566            }
567        };
568        if let Some(snap) = snapshot_for_rebuild {
569            // Spawn-and-forget. The handle is consumed inside the thread
570            // via `RebuildGuard` so it doesn't dangle. Callers that want
571            // a handle should use [`Self::rebuild_async`].
572            let _ = self.spawn_rebuild(snap);
573        }
574
575        // Evict oldest entries if over capacity
576        let mut state = match self.inner.lock() {
577            Ok(s) => s,
578            Err(poisoned) => poisoned.into_inner(),
579        };
580        let max_entries = state.max_entries;
581        if state.all_entries.len() > max_entries {
582            let excess = state.all_entries.len() - max_entries;
583            // M8 (v0.7.0 round-2) — emit ONE summary WARN per eviction
584            // event so the operator sees the batch drop in the daemon
585            // log without scrolling past N per-id lines first. The
586            // per-id WARNs (below) still fire for post-mortem
587            // attribution; this one is the high-level "the index
588            // dropped N oldest embeddings" signal operators alert on.
589            tracing::warn!(
590                target: EVICTION_TRACE_TARGET,
591                dropped = excess,
592                max_entries = max_entries,
593                "HNSW eviction: dropped {} oldest embeddings to make room",
594                excess,
595            );
596            // v0.7.0 (R3-S1) — fire the `on_index_eviction` hook event
597            // for each evicted id BEFORE we drop the rows. The sink
598            // is a non-blocking `try_send` (see below); a downstream
599            // hook-aware observer drains the channel off the hot path
600            // and invokes `crate::hooks::fire_on_index_eviction` per
601            // event. This closes the G2/G8 "fire site exists but not
602            // wired" gap that the prior `tracing::warn!`-only
603            // implementation left open.
604            //
605            // The sink push happens INSIDE the inner-state lock — the
606            // channel is unbounded so `try_send`-equivalent `send`
607            // never blocks (unbounded mpsc has no backpressure). The
608            // sink lock is independent of the inner lock so there is
609            // no ordering hazard.
610            //
611            // The hook subscriber (if any) is responsible for its own
612            // logging; the warn-level tracing event is preserved here
613            // as a no-op-when-no-subscriber fallback so operators
614            // without hooks configured still see eviction pressure in
615            // daemon logs, matching the v0.6.3.1 observability contract.
616            let sink_guard = self.eviction_sink.lock().ok();
617            for (evicted_id, _) in state.all_entries.iter().take(excess) {
618                tracing::warn!(
619                    target: EVICTION_TRACE_TARGET,
620                    evicted_id = %evicted_id,
621                    reason = "max_entries_reached",
622                    max_entries = max_entries,
623                    "hnsw index evicting oldest entry: cap reached"
624                );
625                if let Some(sink) = sink_guard.as_ref().and_then(|g| g.as_ref()) {
626                    // mpsc::Sender::send is non-blocking on an unbounded
627                    // channel (it only blocks on bounded). Errors mean the
628                    // receiver dropped — observability is best-effort, no
629                    // recovery action needed.
630                    let payload = EvictionEvent::new(
631                        evicted_id.clone(),
632                        String::new(), // namespace not in scope at hnsw layer
633                        "max_entries_reached",
634                    );
635                    let _ = sink.send(payload);
636                }
637            }
638            drop(sink_guard);
639            #[allow(clippy::cast_possible_truncation)]
640            let evicted = excess as u64;
641            // pm-v3.1 PR8 (issue #1174): counter sink moved to the
642            // metrics registry. We defer the actual `record_hnsw_eviction`
643            // call until `now_nanos_u64` is computed below so the
644            // counter and last-eviction timestamp move in lockstep.
645            let evicted_count_to_record = evicted;
646
647            state.all_entries.drain(..excess);
648            // v0.7.0 #1087 — invalidate cached valid_ids set after the
649            // eviction drain.
650            state.valid_ids_cache = None;
651            // #968 — defer the post-eviction graph rebuild to the async
652            // path. Correctness is preserved by the `valid_ids` filter
653            // in `search()` — evicted IDs are scrubbed from results
654            // immediately, even though the underlying HNSW graph still
655            // contains them until the next swap. Clearing `overflow`
656            // here was the v0.6 behavior tied to the synchronous
657            // rebuild; we preserve it so the linear-scan path doesn't
658            // re-surface evicted IDs. The next `insert()` past
659            // `REBUILD_THRESHOLD` (or an explicit `rebuild_async()`
660            // call) schedules the actual graph rebuild off-thread.
661            state.overflow.clear();
662            // v0.7.0 #1074 (SR-2 #2, HIGH) — bump the generation
663            // counter on every overflow.clear(). Any in-flight rebuild
664            // snapshot captured BEFORE this bump now carries a stale
665            // generation; the swap path detects the mismatch and
666            // drops the warming result without draining overflow,
667            // preventing the lose-an-insert race where an entry
668            // landed between snapshot and clear() was incorrectly
669            // drained by the eventual swap.
670            state.overflow_generation = state.overflow_generation.wrapping_add(1);
671            // Schedule the rebuild via the async path so the eviction
672            // edge no longer blocks the writer for the multi-second
673            // `build_hnsw` cost at 100k cap. The CAS skips if a
674            // previously-scheduled rebuild is still in flight; the
675            // next insert past threshold picks up the post-eviction
676            // state.
677            if self
678                .rebuild_in_flight
679                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
680                .is_ok()
681            {
682                let snap = RebuildSnapshot {
683                    entries: state.all_entries.clone(),
684                    // overflow was just cleared above so the snapshot
685                    // captures an empty overflow window — anything
686                    // inserted post-eviction will be a fresh suffix.
687                    overflow_at_snapshot: state.overflow.len(),
688                    overflow_generation: state.overflow_generation,
689                };
690                // Release the inner lock before spawning so the
691                // background thread can take it on swap. The
692                // observability path below only reads counters /
693                // statics, not `state`, so we do not need to
694                // re-acquire.
695                drop(state);
696                let _ = self.spawn_rebuild(snap);
697            }
698
699            // Record completion time AFTER the rebuild. `evicted_recently` is
700            // a "did we evict in the trailing N seconds" check; an operator
701            // asking that wants the operation completion time, not the
702            // start. At v0.6 the in-line `build_hnsw` dominated wall time
703            // here (~minutes at 100k entries) — using the start would
704            // make evicted_recently misreport even immediately after
705            // insert returns. Post-#968 the build runs off-thread so
706            // the gap shrinks to microseconds, but the
707            // completion-time-after-rebuild semantics are preserved.
708            let now_nanos = std::time::SystemTime::now()
709                .duration_since(std::time::UNIX_EPOCH)
710                .map(|d| d.as_nanos())
711                .unwrap_or(0);
712            let now_nanos_u64 = u64::try_from(now_nanos).unwrap_or(u64::MAX);
713            // pm-v3.1 PR8 (issue #1174): single sink call covers both
714            // the cumulative counter and the last-eviction timestamp,
715            // mirroring both onto the Prometheus handles so `/metrics`
716            // scrapes see the eviction event without polling
717            // `memory_stats`.
718            crate::metrics::record_hnsw_eviction(evicted_count_to_record, now_nanos_u64);
719
720            // M8 (v0.7.0 round-2) — rolling-hour rate observer. Push
721            // a stamp on this eviction, then count stamps in the
722            // trailing hour. If the rate clears the M8 ceiling,
723            // escalate to ERROR so the dashboard pages the on-call.
724            let recent = record_eviction_and_count_recent(now_nanos_u64);
725            if recent > EVICTION_RATE_CEILING_PER_HOUR {
726                tracing::error!(
727                    target: EVICTION_TRACE_TARGET,
728                    rate_per_hour = recent,
729                    ceiling = EVICTION_RATE_CEILING_PER_HOUR,
730                    "HNSW eviction rate exceeded {}/hour — recall quality is degrading; \
731                     increase vector_index_capacity or move to dedicated vector DB",
732                    EVICTION_RATE_CEILING_PER_HOUR,
733                );
734            }
735        }
736    }
737
738    /// Remove an entry by ID (marks for exclusion; cleaned up on rebuild).
739    pub fn remove(&self, id: &str) {
740        let mut state = match self.inner.lock() {
741            Ok(s) => s,
742            Err(poisoned) => poisoned.into_inner(),
743        };
744        state.all_entries.retain(|(eid, _)| eid != id);
745        state.overflow.retain(|(eid, _)| eid != id);
746        // v0.7.0 #1087 — invalidate cached valid_ids set after remove.
747        state.valid_ids_cache = None;
748        // Note: the HNSW index itself is immutable — removed IDs are filtered
749        // from search results. A rebuild will fully remove them.
750    }
751
752    /// Search for the `k` nearest neighbors to the query embedding.
753    ///
754    /// Combines HNSW approximate search with linear scan of overflow entries.
755    /// Returns results sorted by ascending distance (closest first).
756    pub fn search(&self, query: &[f32], k: usize) -> Vec<VectorHit> {
757        // #968 — opportunistic swap-on-read. If a background rebuild has
758        // parked a warmed graph in the `warming` slot, swap it into
759        // `active` BEFORE we serve this search. The swap is a single
760        // `std::mem::swap` under the inner mutex held for microseconds;
761        // search itself never blocks on graph construction.
762        self.try_swap_warming();
763
764        let mut state = match self.inner.lock() {
765            Ok(s) => s,
766            Err(poisoned) => poisoned.into_inner(),
767        };
768        let query_point = EmbeddingPoint(query.to_vec());
769
770        let mut results: Vec<VectorHit> = Vec::with_capacity(k * 2);
771
772        // v0.7.0 #1087 — populate the cached valid_ids set on the
773        // first search after any mutation; reuse it across recalls.
774        // Pre-#1087 this set was rebuilt on EVERY recall (iterating
775        // up to 100k strings + a fresh HashSet allocation per call).
776        if state.valid_ids_cache.is_none() {
777            // PERF-7 — collect to HashSet<Arc<str>> instead of
778            // HashSet<String>. Arc<str> is a 16-byte fat pointer
779            // vs String's 24-byte (ptr/len/cap) struct; the
780            // backing bytes are shared with no spare capacity
781            // overhead. HashSet::contains accepts `&str` via the
782            // `Borrow<str>` impl on `Arc<str>`.
783            let set: std::collections::HashSet<std::sync::Arc<str>> = state
784                .all_entries
785                .iter()
786                .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
787                .collect();
788            state.valid_ids_cache = Some(set);
789        }
790        let valid_ids = state
791            .valid_ids_cache
792            .as_ref()
793            .expect("valid_ids_cache populated above");
794
795        // Search the HNSW index
796        if let Some(ref hnsw) = state.hnsw {
797            let mut search = Search::default();
798            for item in hnsw.search(&query_point, &mut search) {
799                if !valid_ids.contains(item.value.as_str()) {
800                    continue; // Removed entry
801                }
802                results.push(VectorHit {
803                    id: item.value.clone(),
804                    distance: item.distance,
805                });
806                if results.len() >= k * 2 {
807                    break;
808                }
809            }
810        }
811
812        // v0.7.0 #1087 — linear scan of overflow entries WITHOUT
813        // cloning the embedding vec. Pre-#1087 this constructed
814        // `EmbeddingPoint(emb.clone())` per overflow entry (~200 ×
815        // 1536 bytes = 300 KB of clone per search at the cap); the
816        // cosine-distance helper takes `&[f32]` so we inline against
817        // the stored slice instead.
818        let mut overflow_hits: Vec<VectorHit> = Vec::with_capacity(state.overflow.len());
819        for (id, emb) in &state.overflow {
820            overflow_hits.push(VectorHit {
821                id: id.clone(),
822                distance: cosine_distance(&query_point.0, emb),
823            });
824        }
825        overflow_hits.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
826
827        results.extend(overflow_hits);
828
829        // Deduplicate by ID (prefer lower distance)
830        let mut seen = std::collections::HashSet::new();
831        results.retain(|hit| seen.insert(hit.id.clone()));
832
833        // Sort by distance and truncate
834        results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
835        results.truncate(k);
836        results
837    }
838
839    /// Return the total number of indexed entries (HNSW + overflow).
840    pub fn len(&self) -> usize {
841        let state = match self.inner.lock() {
842            Ok(s) => s,
843            Err(poisoned) => poisoned.into_inner(),
844        };
845        state.all_entries.len()
846    }
847
848    /// `true` when the index holds no live entries at all.
849    ///
850    /// #1579 QC — load-bearing for the proactive-conflict dispatch:
851    /// an EMPTY index is *vacuously* [`Self::is_fully_searchable`]
852    /// (`0 + 0 >= 0`), but during the async-boot LOAD phase — after
853    /// the daemon binds with `VectorIndex::empty()` and before the
854    /// boot loader's `seed_entries` lands (the `get_all_embeddings`
855    /// read is the long pole at 100k rows) — emptiness says nothing
856    /// about what the DB holds. Callers that would otherwise trust a
857    /// fully-searchable index (the #519 conflict check) must ALSO
858    /// require non-emptiness, so that window routes to the bounded
859    /// recency-scan fallback instead of consulting an index that
860    /// cannot return anything.
861    pub fn is_empty(&self) -> bool {
862        self.len() == 0
863    }
864
865    /// #968 — Force a full rebuild of the HNSW index from all entries,
866    /// SYNCHRONOUSLY. Preserved for tests + emergency paths; production
867    /// code should call [`Self::rebuild_async`] so the multi-second
868    /// graph build does not block the calling thread.
869    ///
870    /// Implementation: delegates to `rebuild_async` and `join`s the
871    /// resulting handle so callers retain the v0.6 semantics ("the
872    /// graph is rebuilt by the time this returns"). Tests rely on this
873    /// blocking behavior to assert post-rebuild invariants without
874    /// adding a yield/poll loop.
875    pub fn rebuild(&self) {
876        // #1037 (MEDIUM, 2026-05-21): defend the sync-rebuild contract
877        // against the `rebuild_async()` no-op-handle short-circuit.
878        // Pre-#1037 if a previous async rebuild was still in flight
879        // (`rebuild_in_flight==true`), `rebuild_async` returned a
880        // no-op `std::thread::spawn(|| {})` handle that joined
881        // instantly — `try_swap_warming()` then ran against a warming
882        // slot that the IN-FLIGHT build hadn't populated yet, so the
883        // sync contract ("graph is rebuilt by the time this returns")
884        // was silently violated. The caller observed the pre-rebuild
885        // state.
886        //
887        // Fix: after the initial `join()`, spin-wait on
888        // `rebuild_in_flight` for up to REBUILD_WAIT_TIMEOUT so the
889        // in-flight build has a bounded window to complete its
890        // warming-slot insert. Then run `try_swap_warming()`. If the
891        // in-flight build genuinely hangs (test-fixture corner case),
892        // surface that as a clean timeout rather than silently
893        // returning a stale graph.
894        let handle = self.rebuild_async();
895        let _ = handle.join();
896        // Bounded spin-wait for any concurrently-running rebuild to
897        // populate `warming`. Cheap CAS read; total budget is
898        // REBUILD_WAIT_TIMEOUT * REBUILD_WAIT_POLL_INTERVAL =
899        // ~10ms × 100 = 1 second worst-case (well under any sensible
900        // sync-rebuild expectation; production callers are async).
901        let start = std::time::Instant::now();
902        while self.rebuild_in_flight.load(Ordering::SeqCst)
903            && start.elapsed() < REBUILD_WAIT_TIMEOUT
904        {
905            std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
906        }
907        self.try_swap_warming();
908    }
909
910    /// #968 — Schedule a full HNSW rebuild on a background thread and
911    /// return the [`JoinHandle`] for callers that want to observe
912    /// completion. The build does NOT hold the inner mutex; readers
913    /// and writers continue to operate against `active` + `overflow`
914    /// while the new graph warms up. On success, the warmed graph
915    /// lands in the `warming` slot and is swapped into `active` by
916    /// the next reader/writer (or by the foreground `rebuild` shim's
917    /// post-join `try_swap_warming` call).
918    ///
919    /// Concurrency contract:
920    /// - At most one rebuild runs at a time (gated by the
921    ///   `rebuild_in_flight` atomic). A second `rebuild_async` call
922    ///   while a build is in flight returns a no-op handle (the
923    ///   spawned closure short-circuits if the CAS fails — the in-
924    ///   flight build will pick up the latest entries via the next
925    ///   trigger).
926    /// - Writes during the build flow into `overflow` and
927    ///   `all_entries` normally. The swap path uses the snapshot
928    ///   length captured at spawn time to trim only the overflow
929    ///   entries that are now in the new graph; entries inserted
930    ///   AFTER the snapshot remain in overflow for the next cycle.
931    /// - Search is unaffected: it reads `active` + `overflow` under
932    ///   the inner mutex, both of which remain coherent throughout.
933    ///
934    /// Failure: a panic inside the build thread is observable via
935    /// `JoinHandle::join()`; `active` is unchanged. The
936    /// `rebuild_in_flight` flag is cleared by the `RebuildGuard`
937    /// drop-guard whether the build succeeded or panicked.
938    pub fn rebuild_async(&self) -> JoinHandle<()> {
939        // Snapshot under the inner lock so we capture a consistent
940        // entries list. Read-only; we do not mutate `all_entries`
941        // here. If a rebuild is already in flight, return a no-op
942        // handle (a thread that joins instantly).
943        let snapshot = {
944            let state = match self.inner.lock() {
945                Ok(s) => s,
946                Err(poisoned) => poisoned.into_inner(),
947            };
948            if self
949                .rebuild_in_flight
950                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
951                .is_err()
952            {
953                // Already running — return an instantly-completing
954                // handle. The caller's `join()` returns `Ok(())`.
955                return std::thread::spawn(|| {});
956            }
957            RebuildSnapshot {
958                entries: state.all_entries.clone(),
959                overflow_at_snapshot: state.overflow.len(),
960                overflow_generation: state.overflow_generation,
961            }
962        };
963        self.spawn_rebuild(snapshot)
964    }
965
966    /// #968 — internal: spawn the rebuild thread for a captured
967    /// snapshot. The caller is expected to have flipped
968    /// `rebuild_in_flight` to `true` already (via CAS). The drop-guard
969    /// inside the thread clears the flag whether the build succeeds
970    /// or panics.
971    fn spawn_rebuild(&self, snapshot: RebuildSnapshot) -> JoinHandle<()> {
972        let warming = Arc::clone(&self.warming);
973        let in_flight = Arc::clone(&self.rebuild_in_flight);
974        std::thread::spawn(move || {
975            // RAII clears `rebuild_in_flight` even on panic.
976            let _guard = RebuildGuard { flag: in_flight };
977            // CPU-bound graph build runs OUTSIDE the inner mutex.
978            // This is the load-bearing change for #968: readers and
979            // writers continue to make progress while this runs.
980            let hnsw = VectorIndex::build_hnsw(&snapshot.entries);
981            let result = RebuildResult {
982                hnsw,
983                overflow_at_snapshot: snapshot.overflow_at_snapshot,
984                overflow_generation: snapshot.overflow_generation,
985                entries_in_graph: snapshot.entries.len(),
986            };
987            // Park the result in the warming slot. The next caller
988            // through `try_swap_warming` will move it into `active`.
989            // Holding the warming mutex here is microseconds.
990            if let Ok(mut slot) = warming.lock() {
991                // Overwrite any older warmed result that was never
992                // swapped (e.g. two rebuilds completed before any
993                // reader ran). The newer build is by definition a
994                // superset of the older one's entries, so dropping
995                // the older result is correct.
996                *slot = Some(result);
997            }
998        })
999    }
1000
1001    /// #968 — Swap the warming slot into active if a warmed graph is
1002    /// ready. Called opportunistically from `search`, `insert`, and
1003    /// the post-join path of the sync `rebuild` shim. The swap holds
1004    /// the inner mutex for microseconds — just long enough to
1005    /// `std::mem::replace` the graph and trim the overflow.
1006    ///
1007    /// Returns `true` if a swap occurred, `false` otherwise. Test
1008    /// code uses the return value to verify the swap landed before
1009    /// asserting post-rebuild state.
1010    pub fn try_swap_warming(&self) -> bool {
1011        // Pop the warmed result FIRST so we hold the warming mutex
1012        // only long enough to take ownership. We then re-acquire the
1013        // inner mutex to swap it in. The two-mutex sequence is safe
1014        // (no ordering hazard with any other path that takes both).
1015        let Some(result) = self.warming.lock().ok().and_then(|mut g| g.take()) else {
1016            return false;
1017        };
1018        let mut state = match self.inner.lock() {
1019            Ok(s) => s,
1020            Err(poisoned) => poisoned.into_inner(),
1021        };
1022        // v0.7.0 #1074 (SR-2 #2, HIGH) — generation check. If the
1023        // overflow generation has bumped since the rebuild captured
1024        // its snapshot, the warming graph was built from pre-eviction
1025        // all_entries and the current overflow contains post-eviction
1026        // inserts that are NOT in that graph. Drop the warming result
1027        // entirely without swapping (the next rebuild captures the
1028        // current state cleanly). Pre-#1074 the swap would have
1029        // overwritten the live graph with a stale one AND drained
1030        // the post-eviction inserts from overflow — silently losing
1031        // them until the next rebuild.
1032        if result.overflow_generation != state.overflow_generation {
1033            tracing::warn!(
1034                target: "hnsw.rebuild",
1035                snapshot_gen = result.overflow_generation,
1036                current_gen = state.overflow_generation,
1037                "dropping stale warming result (eviction occurred mid-rebuild, #1074)"
1038            );
1039            return false;
1040        }
1041        state.hnsw = result.hnsw;
1042        // #1579 — coverage accounting for `is_fully_searchable`.
1043        state.graph_entry_count = result.entries_in_graph;
1044        // Trim overflow: the first `overflow_at_snapshot` entries are
1045        // now in the graph; entries inserted AFTER the snapshot
1046        // remain. Defensive `min` in case `remove` or eviction
1047        // shortened overflow while the build was running.
1048        let to_drain = result.overflow_at_snapshot.min(state.overflow.len());
1049        state.overflow.drain(..to_drain);
1050        true
1051    }
1052
1053    /// #1579 — `true` when a search against this index can observe
1054    /// every live entry: the active graph (its build-time snapshot
1055    /// length) plus the linearly-scanned `overflow` cover
1056    /// `all_entries`. `false` exactly during the async-boot warm
1057    /// window, when [`Self::seed_entries`] has parked DB-loaded
1058    /// entries in `all_entries` but the background graph build has
1059    /// not swapped in yet — sequenced writes flow through `insert()`
1060    /// (graph- or overflow-visible) so they never break coverage, and
1061    /// removals/evictions only shrink `all_entries` (stale graph ids
1062    /// are filtered at search time), so the inequality is conservative
1063    /// in the safe direction.
1064    ///
1065    /// Consumers: the #519 proactive conflict check routes to its
1066    /// bounded-scan fallback while this is `false`; the boot loader
1067    /// uses it to decide whether a make-up rebuild is needed after a
1068    /// racing routine rebuild swallowed its CAS.
1069    pub fn is_fully_searchable(&self) -> bool {
1070        // Opportunistically land any warmed graph first so a caller
1071        // probing right after a rebuild finished sees the swapped
1072        // state (mirrors `search`/`insert`).
1073        self.try_swap_warming();
1074        let state = match self.inner.lock() {
1075            Ok(s) => s,
1076            Err(poisoned) => poisoned.into_inner(),
1077        };
1078        state.graph_entry_count + state.overflow.len() >= state.all_entries.len()
1079    }
1080
1081    /// #1579 B3 — bulk-load DB-resident entries into the index
1082    /// WITHOUT building the graph (the async-boot path). Entries land
1083    /// in `all_entries` only; they become searchable when the
1084    /// follow-up rebuild (see [`Self::seed_and_rebuild_async`]) swaps
1085    /// its graph in. Ids already present (e.g. a row written through
1086    /// `insert()` between the caller's DB snapshot and this call) are
1087    /// skipped so the index never double-counts. Returns the number of
1088    /// entries actually seeded.
1089    ///
1090    /// Deliberately does NOT enforce `max_entries` eviction here —
1091    /// the legacy synchronous boot path (`VectorIndex::build` over
1092    /// `get_all_embeddings`) never evicted at boot either, and the
1093    /// first post-boot `insert()` applies the cap exactly as before.
1094    pub fn seed_entries(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1095        let mut state = match self.inner.lock() {
1096            Ok(s) => s,
1097            Err(poisoned) => poisoned.into_inner(),
1098        };
1099        let existing: std::collections::HashSet<std::sync::Arc<str>> = state
1100            .all_entries
1101            .iter()
1102            .map(|(id, _)| std::sync::Arc::<str>::from(id.as_str()))
1103            .collect();
1104        let mut seeded = 0usize;
1105        for (id, emb) in entries {
1106            if existing.contains(id.as_str()) {
1107                continue;
1108            }
1109            state.all_entries.push((id, emb));
1110            seeded += 1;
1111        }
1112        if seeded > 0 {
1113            state.valid_ids_cache = None;
1114        }
1115        seeded
1116    }
1117
1118    /// #1579 B3 — async-boot warm-up: seed DB-loaded entries (see
1119    /// [`Self::seed_entries`]) and schedule the graph build on the
1120    /// existing #968 double-buffer rebuild machinery. Returns the
1121    /// rebuild thread's [`JoinHandle`]; the caller (the boot loader)
1122    /// joins it off the request path and then calls
1123    /// [`Self::try_swap_warming`] + emits the operator-visible
1124    /// "index warm" line.
1125    ///
1126    /// If a routine rebuild is already in flight (its snapshot
1127    /// predates the seed), `rebuild_async` returns a no-op handle and
1128    /// the seeded entries stay graph-invisible until a later rebuild;
1129    /// the boot loader detects that via [`Self::is_fully_searchable`]
1130    /// and issues a make-up `rebuild`.
1131    pub fn seed_and_rebuild_async(&self, entries: Vec<(String, Vec<f32>)>) -> JoinHandle<()> {
1132        self.seed_entries(entries);
1133        self.rebuild_async()
1134    }
1135
1136    /// #1579 B3 — blocking boot warm-up for callers that hold the
1137    /// index directly (the MCP stdio boot thread; tests). Seeds the
1138    /// DB-loaded entries and drives rebuild→swap to completion,
1139    /// returning the number of entries seeded. Each step takes the
1140    /// inner mutex only briefly (the graph build itself runs on the
1141    /// #968 background thread against a snapshot), so concurrent
1142    /// readers/writers on other threads keep making progress — the
1143    /// CALLING thread is the only one parked.
1144    ///
1145    /// The retry loop covers the rebuild-CAS race: if a routine
1146    /// 200-overflow rebuild was already in flight when our seed
1147    /// landed, `rebuild_async` short-circuits to a no-op handle and
1148    /// the in-flight build's pre-seed snapshot cannot cover the
1149    /// seeded rows — `is_fully_searchable` stays `false` and the loop
1150    /// schedules a make-up rebuild once the CAS frees up.
1151    pub fn warm_boot(&self, entries: Vec<(String, Vec<f32>)>) -> usize {
1152        let seeded = self.seed_entries(entries);
1153        loop {
1154            let handle = self.rebuild_async();
1155            let _ = handle.join();
1156            // `is_fully_searchable` opportunistically swaps any warmed
1157            // graph before evaluating coverage.
1158            if self.is_fully_searchable() {
1159                return seeded;
1160            }
1161            std::thread::sleep(REBUILD_WAIT_POLL_INTERVAL);
1162        }
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169
1170    fn make_embedding(values: &[f32]) -> Vec<f32> {
1171        // L2-normalize
1172        let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1173        values.iter().map(|v| v / norm).collect()
1174    }
1175
1176    #[test]
1177    fn empty_index_returns_empty() {
1178        let idx = VectorIndex::empty();
1179        let results = idx.search(&[1.0, 0.0, 0.0], 10);
1180        assert!(results.is_empty());
1181    }
1182
1183    #[test]
1184    fn perf_7_valid_ids_cache_is_arc_str_typed() {
1185        // PERF-7 (FX-C4-batch2, 2026-05-26) — the valid_ids cache
1186        // must be HashSet<Arc<str>>, not HashSet<String>. The
1187        // discriminator is the cache field's type, which we exercise
1188        // by populating the index, searching once to materialise the
1189        // cache, then reaching into the locked state to verify the
1190        // cache element shape via downcast on a sample entry.
1191        let entries = vec![
1192            ("id1".to_string(), make_embedding(&[1.0, 0.0, 0.0])),
1193            ("id2".to_string(), make_embedding(&[0.0, 1.0, 0.0])),
1194        ];
1195        let idx = VectorIndex::build(entries);
1196        // Trigger one search so the cache populates.
1197        let _ = idx.search(&[1.0, 0.0, 0.0], 2);
1198        let state = idx.inner.lock().expect("lock state");
1199        let cache = state
1200            .valid_ids_cache
1201            .as_ref()
1202            .expect("valid_ids_cache populated after search");
1203        assert!(
1204            cache.contains("id1") && cache.contains("id2"),
1205            "PERF-7: Arc<str> cache failed to admit &str lookup",
1206        );
1207        // Type system pins the field type — if a future refactor
1208        // changes the type back to HashSet<String>, this `Arc::clone`
1209        // line fails to compile.
1210        let sample: Option<&std::sync::Arc<str>> = cache.iter().next();
1211        assert!(sample.is_some(), "PERF-7: cache must hold Arc<str> entries");
1212    }
1213
1214    #[test]
1215    fn basic_search() {
1216        let entries = vec![
1217            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1218            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
1219            ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
1220        ];
1221        let idx = VectorIndex::build(entries);
1222        let results = idx.search(&make_embedding(&[1.0, 0.1, 0.0]), 2);
1223        assert_eq!(results.len(), 2);
1224        assert_eq!(results[0].id, "a"); // Closest to [1, 0.1, 0]
1225    }
1226
1227    #[test]
1228    fn insert_and_search_overflow() {
1229        let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1230        let idx = VectorIndex::build(entries);
1231        idx.insert("b".into(), make_embedding(&[0.9, 0.1, 0.0]));
1232        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
1233        assert_eq!(results.len(), 2);
1234        assert_eq!(results[0].id, "a");
1235        assert_eq!(results[1].id, "b");
1236    }
1237
1238    #[test]
1239    fn remove_excludes_from_results() {
1240        let entries = vec![
1241            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1242            ("b".into(), make_embedding(&[0.9, 0.1, 0.0])),
1243        ];
1244        let idx = VectorIndex::build(entries);
1245        idx.remove("a");
1246        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 5);
1247        assert!(results.iter().all(|h| h.id != "a"));
1248    }
1249
1250    // -----------------------------------------------------------------
1251    // W11/S11b — rebuild + batched-insert hardening
1252    // -----------------------------------------------------------------
1253
1254    #[test]
1255    fn test_rebuild_preserves_all_entries() {
1256        // Build a small but non-trivial set of orthonormal-ish vectors,
1257        // rebuild the index, and confirm every id is still findable via
1258        // search with a top-k that covers them all.
1259        let raw: Vec<(String, Vec<f32>)> = (0..12)
1260            .map(|i| {
1261                let mut v = vec![0.0_f32; 16];
1262                #[allow(clippy::cast_precision_loss)]
1263                let f = i as f32;
1264                v[i % 16] = 1.0 + f * 0.01; // bias to make L2 norm non-trivial
1265                (format!("id-{i}"), make_embedding(&v))
1266            })
1267            .collect();
1268
1269        let idx = VectorIndex::build(raw.clone());
1270        idx.rebuild();
1271        assert_eq!(idx.len(), raw.len());
1272
1273        // Every id should appear when we ask for top-N where N >= count.
1274        let query = make_embedding(&[1.0; 16]);
1275        let hits = idx.search(&query, raw.len() * 2);
1276        let found: std::collections::HashSet<String> = hits.into_iter().map(|h| h.id).collect();
1277        for (id, _) in &raw {
1278            assert!(
1279                found.contains(id),
1280                "rebuild must preserve id {id}, found: {:?}",
1281                found
1282            );
1283        }
1284    }
1285
1286    #[test]
1287    fn test_remove_then_search_excludes_id() {
1288        let entries = vec![
1289            ("alpha".into(), make_embedding(&[1.0, 0.0, 0.0, 0.0])),
1290            ("beta".into(), make_embedding(&[0.9, 0.1, 0.0, 0.0])),
1291            ("gamma".into(), make_embedding(&[0.8, 0.2, 0.0, 0.0])),
1292        ];
1293        let idx = VectorIndex::build(entries);
1294        // Pre-remove: alpha should be the closest to (1,0,0,0).
1295        let pre = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1296        assert!(pre.iter().any(|h| h.id == "alpha"));
1297
1298        idx.remove("alpha");
1299        // Post-remove: alpha must not appear regardless of k.
1300        for k in 1..=10 {
1301            let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), k);
1302            assert!(
1303                hits.iter().all(|h| h.id != "alpha"),
1304                "removed id `alpha` resurfaced with k={k}: {:?}",
1305                hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1306            );
1307        }
1308
1309        // Other entries still findable.
1310        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0, 0.0]), 5);
1311        let ids: Vec<&str> = hits.iter().map(|h| h.id.as_str()).collect();
1312        assert!(ids.contains(&"beta"));
1313        assert!(ids.contains(&"gamma"));
1314    }
1315
1316    // -----------------------------------------------------------------
1317    // W12-H — small edge cases
1318    // -----------------------------------------------------------------
1319
1320    #[test]
1321    fn empty_index_len_is_zero() {
1322        let idx = VectorIndex::empty();
1323        assert_eq!(idx.len(), 0);
1324    }
1325
1326    #[test]
1327    fn build_with_empty_entries_search_empty() {
1328        let idx = VectorIndex::build(Vec::new());
1329        assert_eq!(idx.len(), 0);
1330        let results = idx.search(&[1.0, 0.0, 0.0], 5);
1331        assert!(results.is_empty());
1332    }
1333
1334    #[test]
1335    fn search_with_k_zero_returns_empty() {
1336        let entries = vec![("a".into(), make_embedding(&[1.0, 0.0, 0.0]))];
1337        let idx = VectorIndex::build(entries);
1338        let results = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 0);
1339        assert!(results.is_empty());
1340    }
1341
1342    #[test]
1343    fn rebuild_on_empty_does_not_crash() {
1344        let idx = VectorIndex::empty();
1345        idx.rebuild();
1346        assert_eq!(idx.len(), 0);
1347    }
1348
1349    #[test]
1350    fn insert_increases_len() {
1351        let idx = VectorIndex::empty();
1352        idx.insert("a".into(), make_embedding(&[1.0, 0.0, 0.0]));
1353        idx.insert("b".into(), make_embedding(&[0.0, 1.0, 0.0]));
1354        assert_eq!(idx.len(), 2);
1355    }
1356
1357    #[test]
1358    fn embedding_point_distance_orthogonal() {
1359        let a = EmbeddingPoint(vec![1.0, 0.0, 0.0]);
1360        let b = EmbeddingPoint(vec![0.0, 1.0, 0.0]);
1361        // 1 - dot = 1 - 0 = 1
1362        assert!((a.distance(&b) - 1.0).abs() < 1e-6);
1363    }
1364
1365    #[test]
1366    fn embedding_point_distance_identical_is_zero() {
1367        let a = EmbeddingPoint(make_embedding(&[1.0, 1.0, 1.0]));
1368        // 1 - 1 = 0 (L2-normalised)
1369        assert!(a.distance(&a).abs() < 1e-6);
1370    }
1371
1372    #[test]
1373    fn remove_on_empty_index_is_noop() {
1374        let idx = VectorIndex::empty();
1375        idx.remove("nonexistent");
1376        assert_eq!(idx.len(), 0);
1377    }
1378
1379    #[test]
1380    fn insert_triggers_auto_rebuild_at_threshold() {
1381        // REBUILD_THRESHOLD = 200. Inserting that many into a fresh index
1382        // exercises the auto-rebuild branch in `insert`.
1383        let idx = VectorIndex::empty();
1384        for i in 0..205_usize {
1385            let mut v = vec![0.0_f32; 8];
1386            #[allow(clippy::cast_precision_loss)]
1387            let f = i as f32;
1388            v[i % 8] = 1.0 + f * 0.001;
1389            idx.insert(format!("id-{i}"), make_embedding(&v));
1390        }
1391        assert_eq!(idx.len(), 205);
1392        // After auto-rebuild, search still works — top-k returns hits.
1393        let q = make_embedding(&[1.0_f32; 8]);
1394        let hits = idx.search(&q, 5);
1395        assert_eq!(hits.len(), 5);
1396    }
1397
1398    #[test]
1399    fn test_rebuild_after_batch_insert_settles() {
1400        // Start empty, batch-insert N entries, force a rebuild, then assert
1401        // that top-K search returns exactly K results (deterministic count
1402        // for a fully-populated index with K <= len).
1403        let idx = VectorIndex::empty();
1404        let n = 25_usize;
1405        for i in 0..n {
1406            let mut v = vec![0.0_f32; 8];
1407            #[allow(clippy::cast_precision_loss)]
1408            let f = i as f32;
1409            v[i % 8] = 1.0 + f * 0.001;
1410            idx.insert(format!("id-{i}"), make_embedding(&v));
1411        }
1412        // Force a rebuild — overflow may not have hit REBUILD_THRESHOLD.
1413        idx.rebuild();
1414        assert_eq!(idx.len(), n);
1415
1416        let query = make_embedding(&[1.0; 8]);
1417        let k = 5;
1418        let hits = idx.search(&query, k);
1419        assert_eq!(
1420            hits.len(),
1421            k,
1422            "post-rebuild search top-{k} must return exactly {k} hits, got {:?}",
1423            hits.iter().map(|h| &h.id).collect::<Vec<_>>()
1424        );
1425
1426        // Distances should be sorted ascending (closest first).
1427        for w in hits.windows(2) {
1428            assert!(
1429                w[0].distance <= w[1].distance,
1430                "search results must be ascending by distance: {} > {}",
1431                w[0].distance,
1432                w[1].distance
1433            );
1434        }
1435
1436        // No duplicate ids in the result.
1437        let mut seen = std::collections::HashSet::new();
1438        for h in &hits {
1439            assert!(
1440                seen.insert(h.id.clone()),
1441                "duplicate id in search: {}",
1442                h.id
1443            );
1444        }
1445    }
1446
1447    // -----------------------------------------------------------------
1448    // v0.7.0 R3-S1 — eviction sink wires the on_index_eviction hook
1449    // -----------------------------------------------------------------
1450
1451    /// `test_hnsw_eviction_fires_hook` — when a sink is wired via
1452    /// [`VectorIndex::set_eviction_sink`] and the index inserts past
1453    /// its eviction cap, the eviction-edge code path pushes one
1454    /// [`EvictionEvent`] per evicted id onto the channel. This closes
1455    /// the G2/G8 "fire site exists but not wired" gap. We construct
1456    /// the index via [`VectorIndex::with_max_entries_for_test`] so a
1457    /// 6-entry insert sequence trips the eviction path in
1458    /// milliseconds without touching the production 100k cap.
1459    #[test]
1460    fn test_hnsw_eviction_fires_hook() {
1461        let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
1462        let idx = VectorIndex::with_max_entries_for_test(4);
1463        idx.set_eviction_sink(tx);
1464
1465        // Reset the process-local counters so concurrent tests
1466        // sharing the static don't bleed assertions into ours.
1467        reset_eviction_counters_for_test();
1468
1469        // Insert cap+2 entries — eviction drops the 2 oldest.
1470        let n = 6_usize;
1471        for i in 0..n {
1472            let mut v = vec![0.0_f32; 4];
1473            #[allow(clippy::cast_precision_loss)]
1474            let f = i as f32;
1475            v[i % 4] = 1.0 + f * 0.01;
1476            idx.insert(format!("evict-{i}"), make_embedding(&v));
1477        }
1478
1479        // Drain the channel. Expect TWO events (n=6, cap=4) — one
1480        // per evicted id. The unbounded sender does not block; the
1481        // events should already be enqueued by the time `insert`
1482        // returns, but we give the channel a small grace window for
1483        // thread-scheduling jitter on slow CI runners.
1484        let mut received: Vec<EvictionEvent> = Vec::new();
1485        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1486        while std::time::Instant::now() < deadline && received.len() < 2 {
1487            while let Ok(ev) = rx.try_recv() {
1488                received.push(ev);
1489            }
1490            if received.len() < 2 {
1491                std::thread::sleep(std::time::Duration::from_millis(5));
1492            }
1493        }
1494
1495        assert_eq!(
1496            received.len(),
1497            2,
1498            "expected one EvictionEvent per evicted id (2 evictions for n=6, cap=4), got {}: {:?}",
1499            received.len(),
1500            received.iter().map(|e| &e.memory_id).collect::<Vec<_>>(),
1501        );
1502
1503        let ids: Vec<&str> = received.iter().map(|e| e.memory_id.as_str()).collect();
1504        assert!(
1505            ids.contains(&"evict-0"),
1506            "expected evict-0 in evicted ids; got {ids:?}"
1507        );
1508        assert!(
1509            ids.contains(&"evict-1"),
1510            "expected evict-1 in evicted ids; got {ids:?}"
1511        );
1512
1513        for ev in &received {
1514            assert_eq!(
1515                ev.reason, "max_entries_reached",
1516                "evicted reason should match the canonical tag, got {:?}",
1517                ev.reason
1518            );
1519            // namespace is intentionally empty at the hnsw layer
1520            // (the index does not carry namespace context); G9+ may
1521            // plumb it through. The wire field MUST be present even
1522            // when empty.
1523            assert_eq!(ev.namespace, "");
1524            assert!(
1525                !ev.evicted_at.is_empty(),
1526                "evicted_at must be set (rfc3339), got empty"
1527            );
1528        }
1529    }
1530
1531    /// Sanity: insertion without a sink wired is a no-op for the
1532    /// hook path. The eviction-edge code path must remain functional
1533    /// (oldest drained, cap enforced) even when no sink is set, so
1534    /// the CLI / test build's zero-cost posture is preserved.
1535    ///
1536    /// The proof of eviction is the PER-INDEX `len()` — after 6
1537    /// inserts into a max-4 index, exactly 4 entries survive, which
1538    /// is only possible if the eviction-edge path drained the 2
1539    /// oldest. We deliberately do NOT read the process-global
1540    /// `index_evictions_total()` counter here: a sibling test
1541    /// (`test_hnsw_eviction_fires_hook`) calls
1542    /// `reset_eviction_counters_for_test()`, which zeroes that
1543    /// shared static, so a concurrent multi-threaded test run could
1544    /// collapse a global-delta assertion to a flake. `idx.len()` is
1545    /// isolated to this index instance and deterministic.
1546    #[test]
1547    fn test_hnsw_eviction_without_sink_is_noop_for_hook() {
1548        let idx = VectorIndex::with_max_entries_for_test(4);
1549        // No `set_eviction_sink` call here — the index runs as in
1550        // CLI / pre-R3-S1 builds without a hooks pipeline.
1551
1552        for i in 0..6_usize {
1553            let mut v = vec![0.0_f32; 4];
1554            #[allow(clippy::cast_precision_loss)]
1555            let f = i as f32;
1556            v[i % 4] = 1.0 + f * 0.01;
1557            idx.insert(format!("noopsink-{i}"), make_embedding(&v));
1558        }
1559
1560        assert_eq!(
1561            idx.len(),
1562            4,
1563            "eviction-edge path must still enforce the max-4 cap even \
1564             without a sink wired (6 inserts → 4 survivors means 2 \
1565             evictions occurred); got len={}",
1566            idx.len()
1567        );
1568    }
1569}
1570
1571// ---------------------------------------------------------------------------
1572// #968 (Wave-2 Tier-C3) — async-rebuild + double-buffering regression tests
1573//
1574// These tests pin the contract introduced by issue #968:
1575//   1. A rebuild scheduled via `rebuild_async` does NOT block readers.
1576//      Search calls dispatched concurrently with the build complete in
1577//      <100 ms even when the build itself runs for seconds.
1578//   2. A build-time panic leaves `active` untouched so reads continue
1579//      to serve the prior snapshot.
1580//   3. Writes that land DURING a rebuild are preserved: post-rebuild
1581//      state includes the snapshot's entries PLUS the concurrent inserts.
1582//   4. The swap is atomic: no caller ever observes a partial-state graph
1583//      (e.g. one with half the entries missing).
1584// ---------------------------------------------------------------------------
1585#[cfg(test)]
1586mod d1_968_tests {
1587    use super::*;
1588    use std::sync::Arc as TArc;
1589    use std::sync::atomic::AtomicUsize;
1590    use std::time::{Duration, Instant};
1591
1592    fn make_embedding(values: &[f32]) -> Vec<f32> {
1593        let norm: f32 = values.iter().map(|v| v * v).sum::<f32>().sqrt();
1594        values.iter().map(|v| v / norm).collect()
1595    }
1596
1597    /// Build a deterministic embedding-set fixture of `n` 16-dim
1598    /// L2-normalised vectors. Tests use this to make the `build_hnsw`
1599    /// pass non-trivial without inflating compile time.
1600    fn fixture(n: usize) -> Vec<(String, Vec<f32>)> {
1601        (0..n)
1602            .map(|i| {
1603                let mut v = vec![0.0_f32; 16];
1604                #[allow(clippy::cast_precision_loss)]
1605                let f = i as f32;
1606                v[i % 16] = 1.0 + f * 0.001;
1607                (format!("id-{i}"), make_embedding(&v))
1608            })
1609            .collect()
1610    }
1611
1612    /// #968 contract 1 — search must remain responsive while a rebuild
1613    /// runs in the background. We spawn a rebuild_async over a
1614    /// reasonably-sized fixture (the build is CPU-bound but not
1615    /// minutes-long at this scale) and concurrently issue 50 search
1616    /// calls. The reader-loop must complete well under the time it
1617    /// would take if every search had to wait on the rebuild's inner
1618    /// mutex (which it never holds for more than microseconds).
1619    #[test]
1620    fn rebuild_async_does_not_block_search_968() {
1621        let idx = TArc::new(VectorIndex::build(fixture(2_000)));
1622        let query = make_embedding(&[1.0_f32; 16]);
1623
1624        // Start the rebuild OFF-THREAD.
1625        let idx_for_rebuild = TArc::clone(&idx);
1626        let rebuild_handle = std::thread::spawn(move || idx_for_rebuild.rebuild_async());
1627        // Concurrent search loop: fire 50 searches.
1628        let idx_for_search = TArc::clone(&idx);
1629        let search_start = Instant::now();
1630        let search_handle = std::thread::spawn(move || {
1631            for _ in 0..50 {
1632                let hits = idx_for_search.search(&query, 10);
1633                // Each search must return at most 10 hits (the k cap)
1634                // and the fixture has >10 entries, so a non-empty
1635                // result is the expected output. We assert non-empty
1636                // (rather than ==10) because the swap mid-loop can
1637                // briefly leave the graph empty while overflow takes
1638                // over — both shapes are correct.
1639                assert!(
1640                    !hits.is_empty(),
1641                    "search returned empty during rebuild — readers were blocked or the graph was lost"
1642                );
1643            }
1644        });
1645        // Wait for both to finish. The rebuild thread may take seconds;
1646        // the search thread must NOT.
1647        let _ = search_handle.join().expect("search thread panicked");
1648        let search_elapsed = search_start.elapsed();
1649        // The 50-search loop with a 2k-entry index should complete in
1650        // tens of ms. We use a 5-second budget — wide enough to
1651        // absorb CI jitter, narrow enough to catch the v0.6 regression
1652        // (which would have blocked for ~3-10s on the rebuild mutex).
1653        assert!(
1654            search_elapsed < Duration::from_secs(5),
1655            "50 searches took {:?} — readers blocked on the rebuild (v0.6 regression)",
1656            search_elapsed,
1657        );
1658        let _ = rebuild_handle.join().expect("rebuild thread panicked");
1659        // Drain any pending warming into active so post-test state is clean.
1660        idx.try_swap_warming();
1661    }
1662
1663    /// #968 contract 2 — if the build fails (we cannot easily induce
1664    /// a panic from `instant_distance::Builder::build` deterministically;
1665    /// instead we exercise the rebuild_in_flight short-circuit + the
1666    /// "no warmed result" path), the active graph is unchanged.
1667    /// Concretely: we call `rebuild_async` while a prior rebuild is in
1668    /// flight; the second call returns a no-op handle and leaves
1669    /// `active` serving the prior snapshot.
1670    #[test]
1671    fn rebuild_failure_leaves_active_unchanged_968() {
1672        let entries = fixture(50);
1673        let idx = VectorIndex::build(entries.clone());
1674
1675        // Pre-rebuild: search returns expected ids.
1676        let query = make_embedding(&[1.0_f32; 16]);
1677        let pre_hits = idx.search(&query, 5);
1678        assert_eq!(pre_hits.len(), 5);
1679        let pre_ids: std::collections::HashSet<String> =
1680            pre_hits.iter().map(|h| h.id.clone()).collect();
1681
1682        // Force the in-flight flag on so the next rebuild_async takes
1683        // the short-circuit path (returns a no-op handle).
1684        idx.rebuild_in_flight.store(true, Ordering::SeqCst);
1685        let handle = idx.rebuild_async();
1686        let _ = handle.join();
1687        // Active should be UNCHANGED — no warmed graph was parked.
1688        let post_hits = idx.search(&query, 5);
1689        let post_ids: std::collections::HashSet<String> =
1690            post_hits.iter().map(|h| h.id.clone()).collect();
1691        assert_eq!(
1692            pre_ids, post_ids,
1693            "search results changed after a no-op rebuild — active was clobbered"
1694        );
1695
1696        // Cleanup: clear the manually-poked flag.
1697        idx.rebuild_in_flight.store(false, Ordering::SeqCst);
1698    }
1699
1700    /// #968 contract 3 — writes during a rebuild are preserved.
1701    /// We snapshot a baseline, kick off a rebuild_async, then issue
1702    /// N concurrent inserts. After the rebuild lands, every inserted
1703    /// id must be findable via search (either via the new graph if the
1704    /// snapshot captured it, or via the overflow if it arrived after
1705    /// the snapshot — both paths must surface the id).
1706    #[test]
1707    fn concurrent_writes_during_rebuild_consistent_968() {
1708        let idx = TArc::new(VectorIndex::build(fixture(500)));
1709        let handle = {
1710            let idx = TArc::clone(&idx);
1711            std::thread::spawn(move || idx.rebuild_async())
1712        };
1713
1714        // Issue 30 concurrent inserts. These flow into overflow +
1715        // all_entries; the snapshot already captured at rebuild_async
1716        // time has its own entry list, so these inserts will remain
1717        // in overflow until the NEXT rebuild — but the swap path
1718        // trims only the overflow PREFIX present at snapshot time, so
1719        // the new entries survive.
1720        let inserts_done = TArc::new(AtomicUsize::new(0));
1721        let mut writer_handles = Vec::new();
1722        for i in 0..30 {
1723            let idx = TArc::clone(&idx);
1724            let counter = TArc::clone(&inserts_done);
1725            writer_handles.push(std::thread::spawn(move || {
1726                let mut v = vec![0.0_f32; 16];
1727                #[allow(clippy::cast_precision_loss)]
1728                let f = i as f32;
1729                v[i % 16] = 2.0 + f * 0.01;
1730                idx.insert(format!("concurrent-{i}"), make_embedding(&v));
1731                counter.fetch_add(1, Ordering::SeqCst);
1732            }));
1733        }
1734        for h in writer_handles {
1735            let _ = h.join();
1736        }
1737        let rebuild_h = handle.join().expect("outer rebuild spawner panicked");
1738        let _ = rebuild_h.join();
1739        // v0.7.0 #1212 — deterministic post-rebuild observation barrier.
1740        // Pre-#1212 a single `try_swap_warming()` followed immediately
1741        // by the search loop raced the cross-thread publication of the
1742        // swap under stressed CI runners (`SAL-only feature gate` /
1743        // parallel-test load). The fix is two-fold:
1744        //   (1) Drain ANY parked warming result in a tight loop — handles
1745        //       the rare double-rebuild case (writer crossing
1746        //       REBUILD_THRESHOLD during the concurrent-write phase).
1747        //   (2) Yield briefly so any post-swap state (`valid_ids_cache`
1748        //       lazy rebuild, HNSW search-side state init) settles
1749        //       before the search loop reads it. 10 ms is generous
1750        //       enough for any GHA runner under 4-way parallel-test
1751        //       contention; the test still completes in <100 ms total.
1752        let mut swaps = 0_usize;
1753        while idx.try_swap_warming() {
1754            swaps += 1;
1755            // Defensive cap — at v0.7.0 there can be at most one
1756            // parked warming result at a time, but loop bounded just
1757            // in case a follow-on rebuild parks one while we drain.
1758            if swaps > 4 {
1759                break;
1760            }
1761        }
1762        std::thread::sleep(Duration::from_millis(10));
1763
1764        // Verify all 30 ids survived the rebuild. The CONTRACT under
1765        // test is "post-rebuild state INCLUDES the concurrent writes"
1766        // — i.e. every concurrent id is in `all_entries` (graph OR
1767        // overflow). We assert that directly via `len()` + a
1768        // sufficiently-wide search rather than relying on tie-break
1769        // determinism at small k. The baseline fixture (500 entries
1770        // across 16 axes) clusters ~32 entries per axis at
1771        // post-normalization distance 0 from any axis query; concurrent
1772        // entries on the same axis tie with them at distance 0, and
1773        // truncate-to-k can clip a single tied entry under
1774        // tie-break-dependent sort behavior. The fix is to widen k
1775        // beyond the tie cluster, NOT to weaken the contract.
1776        assert_eq!(inserts_done.load(Ordering::SeqCst), 30);
1777        let final_len = idx.len();
1778        // v0.7.0 #1212 — snapshot the post-swap private state ONCE so
1779        // every downstream assertion's panic message can cite the same
1780        // ground-truth set of values. Pre-#1212 a panic at line
1781        // src/hnsw.rs:1555 reported only "found < 29" with NO
1782        // observable state, making the CI flake un-diagnosable
1783        // without local repro. Capturing overflow.len() + hnsw size
1784        // here (under a single inner-lock guard) is the diagnostic
1785        // hook the operator + future agent needs to identify which
1786        // buffer the concurrent inserts landed in at panic time.
1787        let (overflow_len_dbg, hnsw_size_dbg) = {
1788            let state = idx.inner.lock().expect("inner mutex poisoned");
1789            let hnsw_size = state.hnsw.as_ref().map_or(0, |h| h.iter().count());
1790            (state.overflow.len(), hnsw_size)
1791        };
1792        assert_eq!(
1793            final_len,
1794            530,
1795            "post-rebuild len must equal baseline 500 + concurrent 30 = 530, \
1796             got {final_len} (overflow={overflow_len_dbg}, hnsw={hnsw_size_dbg}, \
1797             swaps={swaps}, inserts_done={})",
1798            inserts_done.load(Ordering::SeqCst)
1799        );
1800        // Assert the survival CONTRACT — "post-rebuild state INCLUDES
1801        // every concurrent write" — DETERMINISTICALLY via `all_entries`
1802        // membership (each id is in the graph OR overflow), not via the
1803        // approximate HNSW `search()` path.
1804        //
1805        // Why not `search()`: the active graph is built with
1806        // `Builder::default()` and queried with `Search::default()`, so
1807        // the search beam (`ef`) is a fixed library default that does
1808        // NOT scale with `k`. `search(q, 600)` over a 530-entry index
1809        // therefore returns only the top-`ef` graph candidates plus the
1810        // overflow scan — a graph-resident entry outside the beam is
1811        // intermittently clipped under stressed / instrumented runners.
1812        // The llvm-cov coverage job (run 27001253837/27001253833) hit
1813        // exactly this: `found` came back 28/30 while `final_len == 530`
1814        // simultaneously PROVED all 30 were present. Earlier revisions
1815        // chased it by widening `k` (no effect — the limit is `ef`, not
1816        // `k`) and then relaxing the floor to 29 (still flaky: 2-id
1817        // clips occur under heavy instrumentation). Membership over
1818        // `all_entries` is exact and race-free, so it is both the
1819        // correct expression of the contract AND immune to ANN-recall
1820        // jitter.
1821        //
1822        // The `swaps`/`overflow`/`hnsw` diagnostics captured above are
1823        // retained in the panic message so any genuine drop (a swap that
1824        // loses a write) still names the failure mode in one read.
1825        let present: std::collections::HashSet<String> = {
1826            let state = idx.inner.lock().expect("inner mutex poisoned");
1827            state.all_entries.iter().map(|(id, _)| id.clone()).collect()
1828        };
1829        let missing: Vec<String> = (0..30)
1830            .map(|i| format!("concurrent-{i}"))
1831            .filter(|id| !present.contains(id))
1832            .collect();
1833        assert!(
1834            missing.is_empty(),
1835            "post-rebuild all_entries must INCLUDE every concurrent write \
1836             (missing={missing:?}; post-swap state: overflow={overflow_len_dbg}, \
1837             hnsw={hnsw_size_dbg}, swaps={swaps}, final_len={final_len}); \
1838             a missing id means the concurrent-rebuild swap dropped a write"
1839        );
1840    }
1841
1842    /// #968 contract 4 — the swap is atomic. We never observe a
1843    /// half-populated graph during the swap window. To check this we
1844    /// run many search-then-len pairs concurrently with a rebuild
1845    /// and assert that `len()` is monotonically >= the baseline at
1846    /// every observation point (the swap only adds entries, never
1847    /// loses them).
1848    #[test]
1849    fn rebuild_swap_is_atomic_968() {
1850        let idx = TArc::new(VectorIndex::build(fixture(1_000)));
1851        let baseline_len = idx.len();
1852        let stop = TArc::new(AtomicBool::new(false));
1853        let observer_stop = TArc::clone(&stop);
1854        let idx_obs = TArc::clone(&idx);
1855        let observer = std::thread::spawn(move || {
1856            while !observer_stop.load(Ordering::SeqCst) {
1857                let l = idx_obs.len();
1858                assert!(
1859                    l >= baseline_len,
1860                    "len() dropped below baseline during rebuild — partial swap observed: {l} < {baseline_len}"
1861                );
1862            }
1863        });
1864        // Run a real rebuild.
1865        let h = idx.rebuild_async();
1866        let _ = h.join();
1867        idx.try_swap_warming();
1868        // Stop the observer.
1869        stop.store(true, Ordering::SeqCst);
1870        let _ = observer.join();
1871        assert_eq!(idx.len(), baseline_len);
1872    }
1873
1874    // v0.7.0 #1074 (SR-2 #2, HIGH) — eviction-then-rebuild gap.
1875    // The swap path must DROP a warming result whose
1876    // `overflow_generation` doesn't match the current state, so an
1877    // entry inserted into overflow AFTER a clear() bump is not
1878    // mistakenly drained out as if it were a captured snapshot
1879    // entry. We park a stale-gen result directly in the warming
1880    // slot and confirm try_swap_warming refuses to swap AND does
1881    // not drain overflow.
1882    #[test]
1883    fn stale_warming_swap_is_dropped_1074() {
1884        let idx = VectorIndex::empty();
1885        // Insert a non-trivial overflow entry so we can assert
1886        // try_swap_warming doesn't drain it.
1887        idx.insert(
1888            "alpha".to_string(),
1889            make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]),
1890        );
1891        let before_overflow = idx.inner.lock().unwrap().overflow.len();
1892        assert_eq!(before_overflow, 1);
1893
1894        // Park a STALE-generation warming result with a swap that
1895        // would otherwise drain everything.
1896        {
1897            let current_gen = idx.inner.lock().unwrap().overflow_generation;
1898            let mut w = idx.warming.lock().unwrap();
1899            *w = Some(RebuildResult {
1900                hnsw: None,
1901                overflow_at_snapshot: 999, // would have drained the whole overflow
1902                overflow_generation: current_gen.wrapping_add(1), // mismatched
1903                entries_in_graph: 0,
1904            });
1905        }
1906
1907        // Swap MUST refuse and leave overflow intact.
1908        let swapped = idx.try_swap_warming();
1909        assert!(
1910            !swapped,
1911            "stale-by-generation warming must NOT swap in (#1074)"
1912        );
1913        let after_overflow = idx.inner.lock().unwrap().overflow.len();
1914        assert_eq!(
1915            after_overflow, before_overflow,
1916            "stale swap must NOT drain overflow (#1074 regression)"
1917        );
1918        // The alpha entry must still be findable via the linear
1919        // overflow scan (no graph yet).
1920        let hits = idx.search(&make_embedding(&[1.0_f32, 0.0, 0.0, 0.0]), 5);
1921        assert!(hits.iter().any(|h| h.id == "alpha"));
1922    }
1923
1924    // v0.7.0 #1074 — confirm overflow.clear() in the eviction path
1925    // bumps the generation counter so a snapshot captured BEFORE the
1926    // eviction will fail the gen check on swap. This pins the load-
1927    // bearing invariant: clear() must always bump.
1928    #[test]
1929    fn eviction_clear_bumps_overflow_generation_1074() {
1930        let idx = VectorIndex::with_max_entries_for_test(2);
1931        let gen_initial = idx.inner.lock().unwrap().overflow_generation;
1932        // 3 inserts past cap=2 → at least one eviction-clear fires.
1933        for i in 0..3 {
1934            let mut v = vec![0.0_f32; 4];
1935            v[i % 4] = 1.0;
1936            idx.insert(format!("e{i}"), make_embedding(&v));
1937        }
1938        let gen_after = idx.inner.lock().unwrap().overflow_generation;
1939        assert!(
1940            gen_after > gen_initial,
1941            "eviction-clear path must bump overflow_generation (#1074)"
1942        );
1943    }
1944
1945    // -----------------------------------------------------------------
1946    // #1579 — async-boot seeding + coverage accounting
1947    // -----------------------------------------------------------------
1948
1949    #[test]
1950    fn seed_entries_dedupes_and_defers_searchability_1579() {
1951        let idx = VectorIndex::empty();
1952        // A write that arrived through the normal path stays covered
1953        // (overflow is linearly scanned).
1954        idx.insert("live".into(), make_embedding(&[1.0, 0.0, 0.0]));
1955        assert!(
1956            idx.is_fully_searchable(),
1957            "overflow-only index is fully searchable"
1958        );
1959        // Seed the boot snapshot; the id already present must be
1960        // skipped (no double-count), the rest land in all_entries
1961        // only.
1962        let seeded = idx.seed_entries(vec![
1963            ("live".into(), make_embedding(&[1.0, 0.0, 0.0])),
1964            ("a".into(), make_embedding(&[0.0, 1.0, 0.0])),
1965            ("b".into(), make_embedding(&[0.0, 0.0, 1.0])),
1966        ]);
1967        assert_eq!(seeded, 2, "duplicate id must be skipped");
1968        assert_eq!(idx.len(), 3);
1969        assert!(
1970            !idx.is_fully_searchable(),
1971            "seeded-but-unbuilt entries must report not-fully-searchable \
1972             so the conflict check routes to its fallback (#1579 A5/B3)"
1973        );
1974        // Search during the warm window must still serve the live
1975        // (overflow) entry — and must NOT pretend to cover the seeds.
1976        let hits = idx.search(&make_embedding(&[0.0, 1.0, 0.0]), 3);
1977        assert!(hits.iter().all(|h| h.id == "live"));
1978    }
1979
1980    #[test]
1981    fn warm_boot_seeds_builds_and_swaps_1579() {
1982        let idx = VectorIndex::empty();
1983        let seeded = idx.warm_boot(vec![
1984            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
1985            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
1986            ("c".into(), make_embedding(&[0.0, 0.0, 1.0])),
1987        ]);
1988        assert_eq!(seeded, 3);
1989        assert!(
1990            idx.is_fully_searchable(),
1991            "warm_boot must drive rebuild→swap to completion"
1992        );
1993        let hits = idx.search(&make_embedding(&[1.0, 0.0, 0.0]), 2);
1994        assert_eq!(hits.first().map(|h| h.id.as_str()), Some("a"));
1995    }
1996
1997    #[test]
1998    fn build_and_insert_preserve_full_searchability_1579() {
1999        let idx = VectorIndex::build(vec![
2000            ("a".into(), make_embedding(&[1.0, 0.0, 0.0])),
2001            ("b".into(), make_embedding(&[0.0, 1.0, 0.0])),
2002        ]);
2003        assert!(idx.is_fully_searchable(), "synchronous build covers all");
2004        idx.insert("c".into(), make_embedding(&[0.0, 0.0, 1.0]));
2005        assert!(
2006            idx.is_fully_searchable(),
2007            "insert lands in overflow — coverage preserved"
2008        );
2009        idx.remove("a");
2010        assert!(
2011            idx.is_fully_searchable(),
2012            "remove only shrinks all_entries — coverage preserved"
2013        );
2014    }
2015}