Skip to main content

luci/vector/
hnsw.rs

1//! HNSW (Hierarchical Navigable Small World) graph for approximate nearest
2//! neighbor search.
3//!
4//! See [[hierarchical-navigable-small-world]] and [[architecture-overview#Step 2]].
5
6use std::cmp::Ordering;
7use std::collections::{BinaryHeap, HashSet};
8use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
9
10use crate::core::LuciError;
11use crate::mapping::QuantizationType;
12use parking_lot::RwLock;
13use rayon::ThreadPoolBuilder;
14use rayon::current_num_threads;
15use rayon::prelude::*;
16
17use super::{DistanceMetric, normalize_in_place};
18
/// Sentinel ordinal in the packed [`HnswBuilder::entry`] word meaning
/// "no entry point yet". `u32::MAX` is never a real ordinal (ordinals are
/// dense from 0), so it unambiguously marks the un-bootstrapped state.
/// [`read_header`] decodes the same `u32::MAX` value in a serialized
/// header as "no entry point" (`None`).
/// See [[optimization-concurrent-hnsw-insert]].
const ENTRY_SENTINEL: u32 = u32::MAX;
24
/// Fuse `(entry_point, max_level)` into a single `u64` so that a
/// concurrent descent observes a consistent pair with one atomic load
/// (Lucene's `AtomicReference<EntryNode>`, expressed as a packed word).
///
/// Layout: the upper 32 bits carry the entry ordinal, the lower 32 bits
/// carry the max level. [`unpack_entry`] is the inverse.
fn pack_entry(entry_point: u32, max_level: u32) -> u64 {
    let high = u64::from(entry_point) << 32;
    let low = u64::from(max_level);
    high | low
}
32
/// Split a word produced by [`pack_entry`] back into
/// `(entry_point, max_level)`: the upper 32 bits are the entry ordinal,
/// the lower 32 bits are the max level.
fn unpack_entry(packed: u64) -> (u32, u32) {
    let entry_point = (packed >> 32) as u32;
    let max_level = (packed & 0xFFFF_FFFF) as u32;
    (entry_point, max_level)
}
37
/// Thread budget for the parallel connect phase
/// ([[optimization-concurrent-hnsw-insert]]). `Ambient` uses rayon's
/// global pool (the production default); `Fixed(n)` runs in a scoped
/// `n`-thread pool. `Fixed(1)` takes the sequential, bit-identical path
/// (the deterministic test/profile contract).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BuildThreads {
    /// Use rayon's global (ambient) thread pool.
    Ambient,
    /// Use a scoped pool with exactly this many threads; `1` selects the
    /// sequential path.
    Fixed(usize),
}
48
/// Magic prefix for v2+ HNSW segments — `b"LHNS"`.
///
/// v1 segments wrote `dims: u32` (little-endian) as their first 4 bytes,
/// so any v1 segment whose dims happen to equal `0x534E484C`
/// (1 397 639 244) would collide. No real HNSW segment carries 1.4B
/// dimensions, so the detection is unambiguous. See
/// [[optimize-cosine-norm-precompute]].
pub const HNSW_FORMAT_MAGIC: [u8; 4] = *b"LHNS";
56
/// Current on-disk HNSW format version. Bumped when the segment bytes
/// change in a way readers must observe. v2 added the magic prefix and
/// the cosine kernel's "vectors are unit-length on disk" invariant.
///
/// This is the byte stored right after [`HNSW_FORMAT_MAGIC`];
/// [`read_header`] rejects a magic-prefixed segment whose version byte
/// differs from this value.
pub const HNSW_FORMAT_VERSION: u8 = 2;
61
/// On-disk HNSW format version as detected by [`read_header`]. v1 is the
/// legacy format with no magic prefix; v2 added [`HNSW_FORMAT_MAGIC`] and
/// the cosine normalize-at-insert invariant.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HnswFormatVersion {
    /// Legacy segment: no magic prefix, header fields start at byte 0.
    V1,
    /// Magic-prefixed segment: `LHNS`, a version byte, then the header
    /// fields.
    V2,
}
70
/// Parsed HNSW header fields plus a byte cursor pointing at the next
/// section (the flat vector array). Produced by [`read_header`].
#[derive(Clone, Copy, Debug)]
pub struct HnswHeader {
    /// Format version detected from the presence of the magic prefix.
    pub version: HnswFormatVersion,
    /// Vector dimensionality (stored as a little-endian `u32`).
    pub dims: usize,
    /// The `m` graph parameter (stored as a little-endian `u32`).
    pub m: usize,
    /// Distance metric, decoded from a single byte.
    pub metric: DistanceMetric,
    /// Number of vectors declared by the header.
    pub num_vectors: usize,
    /// Entry-point ordinal; `None` when the stored value is `u32::MAX`.
    pub entry_point: Option<u32>,
    /// Max level declared by the header.
    pub max_level: usize,
    /// Byte offset of the section immediately after the header, used by
    /// every caller to position its own parse cursor.
    pub vectors_offset: usize,
}
86
87/// Read exactly `n` bytes at `*pos`, advancing `*pos`. Returns
88/// [`LuciError::IndexCorrupted`] instead of slice-panicking when the blob
89/// is truncated, so a malformed `.luci` vector blob fails loudly rather
90/// than out-of-bounds. Per [[cross-platform-portability]] a valid file is
91/// never truncated, so this rejects only genuinely-corrupt input. Shared
92/// with [`super::global`]'s `load_field`, the other vector-index reader.
93pub(crate) fn take_bytes<'a>(
94    data: &'a [u8],
95    pos: &mut usize,
96    n: usize,
97) -> Result<&'a [u8], LuciError> {
98    let start = *pos;
99    let end = start
100        .checked_add(n)
101        .filter(|&e| e <= data.len())
102        .ok_or_else(|| {
103            LuciError::IndexCorrupted(format!(
104                "vector index blob truncated: need {n} bytes at offset {start}, have {} total",
105                data.len()
106            ))
107        })?;
108    *pos = end;
109    Ok(&data[start..end])
110}
111
112pub(crate) fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, LuciError> {
113    Ok(u32::from_le_bytes(
114        take_bytes(data, pos, 4)?.try_into().unwrap(),
115    ))
116}
117
118pub(crate) fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, LuciError> {
119    Ok(u64::from_le_bytes(
120        take_bytes(data, pos, 8)?.try_into().unwrap(),
121    ))
122}
123
124fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, LuciError> {
125    Ok(take_bytes(data, pos, 1)?[0])
126}
127
128fn read_f32(data: &[u8], pos: &mut usize) -> Result<f32, LuciError> {
129    Ok(f32::from_le_bytes(
130        take_bytes(data, pos, 4)?.try_into().unwrap(),
131    ))
132}
133
134/// Validate a length prefix before it is used to pre-allocate, so a corrupt
135/// count cannot drive a huge speculative `Vec::with_capacity` on malformed
136/// input. Each element consumes at least `min_elem_bytes` downstream, so a
137/// count exceeding `remaining / min_elem_bytes` cannot be satisfied by the
138/// blob and is rejected as corruption. Shared with [`super::global`]'s
139/// `load_field`.
140pub(crate) fn checked_len(
141    count: usize,
142    min_elem_bytes: usize,
143    data: &[u8],
144    pos: usize,
145) -> Result<usize, LuciError> {
146    let remaining = data.len().saturating_sub(pos);
147    if min_elem_bytes != 0 && count > remaining / min_elem_bytes {
148        return Err(LuciError::IndexCorrupted(format!(
149            "vector index blob declares {count} elements (≥{min_elem_bytes} B each) \
150             but only {remaining} bytes remain"
151        )));
152    }
153    Ok(count)
154}
155
156/// Parse the HNSW segment header.
157///
158/// Dispatches on the 4-byte magic prefix: v2 segments start with `LHNS`
159/// followed by a version byte; everything else is treated as a v1
160/// segment. v1 cosine segments are rejected with a migration error —
161/// they stored raw vectors and the v2 kernel requires unit-length
162/// vectors on disk. v1 L2 / DotProduct segments load unchanged.
163pub fn read_header(data: &[u8]) -> Result<HnswHeader, LuciError> {
164    let (version, mut pos) = if data.len() >= 5 && data[0..4] == HNSW_FORMAT_MAGIC {
165        let v = data[4];
166        if v != HNSW_FORMAT_VERSION {
167            return Err(LuciError::SegmentFormatUnknown(format!(
168                "unknown HNSW format version: {v}",
169            )));
170        }
171        (HnswFormatVersion::V2, 5_usize)
172    } else {
173        (HnswFormatVersion::V1, 0_usize)
174    };
175
176    let dims = read_u32(data, &mut pos)? as usize;
177    let m = read_u32(data, &mut pos)? as usize;
178    let metric = DistanceMetric::from_byte(read_u8(data, &mut pos)?);
179
180    if version == HnswFormatVersion::V1 && metric == DistanceMetric::Cosine {
181        return Err(LuciError::SegmentFormatMigrationRequired(
182            "cosine HNSW segment was built with Luci ≤ 0.7.1 which stored \
183             raw vectors. The v0.7.2+ kernel requires unit-length vectors \
184             on disk. Re-index this collection."
185                .into(),
186        ));
187    }
188
189    let num_vectors = read_u32(data, &mut pos)? as usize;
190    let ep = read_u32(data, &mut pos)?;
191    let entry_point = if ep == u32::MAX { None } else { Some(ep) };
192    let max_level = read_u32(data, &mut pos)? as usize;
193
194    Ok(HnswHeader {
195        version,
196        dims,
197        m,
198        metric,
199        num_vectors,
200        entry_point,
201        max_level,
202        vectors_offset: pos,
203    })
204}
205
/// HNSW index parameters.
#[derive(Clone, Debug)]
pub struct HnswParams {
    /// Vector dimensionality; stored vectors are expected to have exactly
    /// this many components.
    pub dims: usize,
    /// Max connections per node per layer. The builder allows `2 * m` at
    /// layer 0 and `m` at higher layers.
    pub m: usize,
    /// Beam width during construction.
    pub ef_construction: usize,
    /// Distance metric used for every comparison.
    pub metric: DistanceMetric,
    /// How stored vectors are quantized for fast distance computation.
    /// Validated at [`HnswBuilder::new`]; only [`QuantizationType::None`]
    /// and [`QuantizationType::Int8`] are accepted today. The mapping
    /// parser rejects unimplemented values, so this is a defense-in-depth
    /// check rather than a routing fork.
    pub quantization: QuantizationType,
}
220
221impl Default for HnswParams {
222    fn default() -> Self {
223        Self {
224            dims: 128,
225            m: 16,
226            ef_construction: 100,
227            metric: DistanceMetric::Cosine,
228            quantization: QuantizationType::DEFAULT,
229        }
230    }
231}
232
/// A scored neighbor candidate (lower distance = better).
///
/// Ordering is *reversed* on `dist`, so a `BinaryHeap<Candidate>` pops
/// the closest candidate first. Equality is defined through the same
/// comparison so `==` and [`Ord::cmp`] can never disagree.
#[derive(Clone, Copy)]
struct Candidate {
    /// Ordinal of the candidate node.
    id: u32,
    /// Distance from the candidate to the query.
    dist: f32,
}

impl PartialEq for Candidate {
    /// Equal exactly when [`Ord::cmp`] says so. Comparing the raw `f32`s
    /// would make a NaN distance unequal to itself, breaking the
    /// reflexivity `Eq` promises and contradicting `cmp`.
    fn eq(&self, o: &Self) -> bool {
        self.cmp(o) == Ordering::Equal
    }
}
impl Eq for Candidate {}
impl PartialOrd for Candidate {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
        Some(self.cmp(o))
    }
}
// Min-heap: lower distance first
impl Ord for Candidate {
    /// Reversed distance comparison; incomparable (NaN) pairs are treated
    /// as equal.
    fn cmp(&self, o: &Self) -> Ordering {
        o.dist.partial_cmp(&self.dist).unwrap_or(Ordering::Equal)
    }
}

/// Max-heap version for finding furthest candidates: a
/// `BinaryHeap<FurthestCandidate>` pops the largest distance first.
struct FurthestCandidate(Candidate);
impl PartialEq for FurthestCandidate {
    /// Equal exactly when [`Ord::cmp`] says so (see [`Candidate`]'s
    /// `PartialEq` for why the raw floats are not compared).
    fn eq(&self, o: &Self) -> bool {
        self.cmp(o) == Ordering::Equal
    }
}
impl Eq for FurthestCandidate {}
impl PartialOrd for FurthestCandidate {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> {
        Some(self.cmp(o))
    }
}
impl Ord for FurthestCandidate {
    /// Natural distance comparison; incomparable (NaN) pairs are treated
    /// as equal.
    fn cmp(&self, o: &Self) -> Ordering {
        self.0
            .dist
            .partial_cmp(&o.0.dist)
            .unwrap_or(Ordering::Equal)
    }
}
279
/// Node in the HNSW graph.
#[derive(Clone)]
pub(crate) struct Node {
    /// Connections per layer: layer_index → vec of neighbor ids. A node
    /// created by the builder's insert paths gets `level + 1` (initially
    /// empty) layers; merge placeholders start with no layers at all.
    neighbors: Vec<Vec<u32>>,
    /// Level drawn for (or copied into) this node.
    level: usize,
}
287
/// HNSW graph builder.
///
/// `nodes` holds each node's neighbor lists behind a per-node
/// `parking_lot::RwLock` (data-in-lock, qdrant's shape) so the parallel
/// connect phase ([[optimization-concurrent-hnsw-insert]]) can mutate
/// edges through `&self` while distance reads hit the separate,
/// connect-phase-immutable `vectors`. `entry` packs
/// `(entry_point, max_level)` into one atomic for a consistent
/// single-load descent. `Node` is reused as the in-lock payload — it is
/// structurally the design's `NodeNeighbors`, so reusing it keeps the
/// `build()` conversion a plain `into_inner()`. The manual `Clone`
/// (below) snapshots each lock; `#[derive(Clone)]` cannot, since neither
/// `RwLock` nor `AtomicU64` is `Clone`.
pub struct HnswBuilder {
    /// Construction parameters (dims, `m`, `ef_construction`, metric,
    /// quantization).
    params: HnswParams,
    /// Connect-phase-immutable, so distance reads are race-free without
    /// locks; only neighbor lists and `entry` mutate concurrently.
    vectors: Vec<Vec<f32>>,
    /// Per-node neighbor lists, each behind its own lock.
    nodes: Vec<RwLock<Node>>,
    /// Packed `(entry_point:u32, max_level:u32)`; `entry_point ==
    /// ENTRY_SENTINEL` means un-bootstrapped.
    entry: AtomicU64,
    /// `ceil(len/64)` words; bit `ord` set (Release) ⇒ node `ord` is
    /// fully linked at **every** level and safe to traverse. Required for
    /// correctness, not just safety: it closes the inter-level race a
    /// per-node `RwLock` alone cannot (a traverser must never route
    /// through a node mid-multi-level-build). See
    /// [[optimization-concurrent-hnsw-insert]].
    ready: Vec<AtomicU64>,
    /// Pending-tail cursor: nodes `[0, connected_count)` are linked;
    /// `[connected_count, vectors.len())` are stored-but-unlinked and are
    /// linked by the next `connect_pending`.
    connected_count: usize,
    /// Level-sampling multiplier, `1 / ln(m)`, fixed at construction.
    level_mult: f64,
    /// State of the deterministic xorshift64 generator used to draw node
    /// levels; every constructor seeds it with `42`.
    rng_state: u64,
}
325
326impl Clone for HnswBuilder {
327    /// Snapshots each node out of its lock and copies the packed entry.
328    /// The sole load-bearing caller is `get_or_build_index`
329    /// (`builder.clone().build()`), where the locks are uncontended.
330    fn clone(&self) -> Self {
331        Self {
332            params: self.params.clone(),
333            vectors: self.vectors.clone(),
334            nodes: self
335                .nodes
336                .iter()
337                .map(|n| RwLock::new(n.read().clone()))
338                .collect(),
339            entry: AtomicU64::new(self.entry.load(AtomicOrdering::Relaxed)),
340            ready: self
341                .ready
342                .iter()
343                .map(|w| AtomicU64::new(w.load(AtomicOrdering::Relaxed)))
344                .collect(),
345            connected_count: self.connected_count,
346            level_mult: self.level_mult,
347            rng_state: self.rng_state,
348        }
349    }
350}
351
352impl HnswBuilder {
353    /// # Panics
354    ///
355    /// Panics if `params.quantization` is a recognized but unimplemented
356    /// variant ([`QuantizationType::Int4`] or [`QuantizationType::Bbq`]).
357    /// The mapping parser rejects these values at index creation time, so
358    /// reaching this panic indicates an upstream wiring bug — not user
359    /// input. The panic is preferred over silently substituting a
360    /// different quantization; see [[code-must-not-lie]].
361    pub fn new(params: HnswParams) -> Self {
362        match params.quantization {
363            QuantizationType::None | QuantizationType::Int8 => {}
364            unsupported @ (QuantizationType::Int4 | QuantizationType::Bbq) => {
365                panic!(
366                    "HnswBuilder constructed with unimplemented quantization \
367                     type {unsupported:?}; the mapping parser should have \
368                     rejected this at index creation. This is an internal \
369                     wiring bug, not user input."
370                );
371            }
372        }
373        let level_mult = 1.0 / (params.m as f64).ln();
374        Self {
375            params,
376            vectors: Vec::new(),
377            nodes: Vec::new(),
378            entry: AtomicU64::new(pack_entry(ENTRY_SENTINEL, 0)),
379            ready: Vec::new(),
380            connected_count: 0,
381            level_mult,
382            rng_state: 42,
383        }
384    }
385
386    /// Construct a builder pre-sized for a known final ordinal range,
387    /// intended for the merge path ([[optimize-hnsw-merge-stitching]]).
388    /// `capacity` is the final merged-segment doc count; the builder
389    /// reserves `vectors[]` and `nodes[]` of that size with empty
390    /// placeholders.
391    ///
392    /// Callers must populate every slot either by
393    /// [`Self::seed_from_graph`] (bulk-copy from a source segment's
394    /// graph) or [`Self::add_vector_at_ordinal`]. Slots left empty at
395    /// [`Self::build`] time are still part of `vectors.len()` and will
396    /// produce a degenerate graph — that's a caller bug to surface
397    /// during testing.
398    pub fn with_capacity_for_merge(params: HnswParams, capacity: usize) -> Self {
399        match params.quantization {
400            QuantizationType::None | QuantizationType::Int8 => {}
401            unsupported @ (QuantizationType::Int4 | QuantizationType::Bbq) => {
402                panic!(
403                    "HnswBuilder::with_capacity_for_merge constructed with \
404                     unimplemented quantization type {unsupported:?}; the \
405                     mapping parser should have rejected this at index \
406                     creation. This is an internal wiring bug, not user \
407                     input."
408                );
409            }
410        }
411        let level_mult = 1.0 / (params.m as f64).ln();
412        let mut vectors = Vec::with_capacity(capacity);
413        let mut nodes = Vec::with_capacity(capacity);
414        for _ in 0..capacity {
415            vectors.push(Vec::new());
416            nodes.push(RwLock::new(Node {
417                neighbors: Vec::new(),
418                level: 0,
419            }));
420        }
421        let mut ready = Vec::with_capacity(capacity.div_ceil(64));
422        for _ in 0..capacity.div_ceil(64) {
423            ready.push(AtomicU64::new(0));
424        }
425        Self {
426            params,
427            vectors,
428            nodes,
429            entry: AtomicU64::new(pack_entry(ENTRY_SENTINEL, 0)),
430            ready,
431            connected_count: 0,
432            level_mult,
433            rng_state: 42,
434        }
435    }
436
437    /// Seed the merge-time builder with a source segment's graph,
438    /// remapping its ordinals into the merged segment's ordinal space.
439    ///
440    /// `seed`: the source segment's parsed graph (levels + neighbors).
441    /// `hnsw_bytes`: the source segment's HNSW byte range, starting at
442    ///   the magic prefix / header. The vector data is read out of
443    ///   `hnsw_bytes[seed.vector_data_offset..]` four bytes per `f32`
444    ///   (little-endian, matches the on-disk format).
445    /// `ord_map`: function mapping source ordinals to merged ordinals.
446    ///   Must return a valid merged ordinal for every source ordinal in
447    ///   `0..seed.num_vectors`.
448    ///
449    /// After this returns, `self.entry_point` and `self.max_level`
450    /// reflect the seed graph; non-seed slots stay empty until a
451    /// caller fills them via [`Self::add_vector_at_ordinal`].
452    ///
453    /// **Cosine note:** the source segment's stored vectors are already
454    /// unit-length under v0.7.2's invariant
455    /// ([[optimize-cosine-norm-precompute]]), so they go in without
456    /// re-normalization. v1 cosine segments cannot reach this path —
457    /// [[read_header]] rejects them at open time.
458    pub fn seed_from_graph<F>(&mut self, seed: &ParsedGraph, hnsw_bytes: &[u8], ord_map: F)
459    where
460        F: Fn(u32) -> u32,
461    {
462        let dims = self.params.dims;
463        debug_assert_eq!(seed.params.dims, dims);
464        debug_assert_eq!(seed.params.metric, self.params.metric);
465        debug_assert_eq!(seed.params.m, self.params.m);
466
467        for src_ord in 0..seed.num_vectors as u32 {
468            let merged_ord = ord_map(src_ord);
469            let start = seed.vector_data_offset + src_ord as usize * dims * 4;
470            let mut v = Vec::with_capacity(dims);
471            for d in 0..dims {
472                let off = start + d * 4;
473                v.push(f32::from_le_bytes(
474                    hnsw_bytes[off..off + 4].try_into().unwrap(),
475                ));
476            }
477            self.vectors[merged_ord as usize] = v;
478
479            let src_node = &seed.nodes[src_ord as usize];
480            let neighbors: Vec<Vec<u32>> = src_node
481                .neighbors
482                .iter()
483                .map(|layer| layer.iter().copied().map(&ord_map).collect())
484                .collect();
485            self.nodes[merged_ord as usize] = RwLock::new(Node {
486                neighbors,
487                level: src_node.level,
488            });
489        }
490
491        let ep = seed.entry_point.map(&ord_map).unwrap_or(ENTRY_SENTINEL);
492        self.entry.store(
493            pack_entry(ep, seed.max_level as u32),
494            AtomicOrdering::Relaxed,
495        );
496    }
497
498    fn next_rand(&mut self) -> f64 {
499        // Simple xorshift64 for deterministic tests
500        self.rng_state ^= self.rng_state << 13;
501        self.rng_state ^= self.rng_state >> 7;
502        self.rng_state ^= self.rng_state << 17;
503        (self.rng_state as f64) / (u64::MAX as f64)
504    }
505
506    fn random_level(&mut self) -> usize {
507        let r = self.next_rand().max(1e-10);
508        (-r.ln() * self.level_mult).floor() as usize
509    }
510
511    /// Store a vector **without linking it into the graph** — the defer
512    /// half of the [[declare-intent-defer-execution]] write model. The
513    /// vector is assigned ordinal `self.vectors.len()`, normalized
514    /// (cosine), given a drawn level and empty per-level neighbor lists,
515    /// and its `ready` bit left unset. The node joins the pending tail
516    /// `[connected_count, vectors.len())` and is linked by a later
517    /// [`Self::connect_pending`]. Cheap and sequential.
518    ///
519    /// For [`DistanceMetric::Cosine`], the input is normalized to unit
520    /// length in place so the kernel can run as a single-pass dot product
521    /// (see [[optimize-cosine-norm-precompute]]). Zero / non-finite
522    /// vectors are rejected with `LuciError::InvalidQuery` rather than
523    /// silently embedded — see [[code-must-not-lie]].
524    pub fn store_vector(&mut self, mut vector: Vec<f32>) -> Result<(), LuciError> {
525        debug_assert_eq!(vector.len(), self.params.dims);
526        if self.params.metric == DistanceMetric::Cosine {
527            normalize_in_place(&mut vector)?;
528        }
529        let ord = self.vectors.len();
530        let level = self.random_level();
531
532        self.vectors.push(vector);
533        let mut neighbors = Vec::with_capacity(level + 1);
534        for _ in 0..=level {
535            neighbors.push(Vec::new());
536        }
537        self.nodes.push(RwLock::new(Node { neighbors, level }));
538        // Grow the ready bitset to cover the new ord (bit starts unset).
539        if ord / 64 >= self.ready.len() {
540            self.ready.push(AtomicU64::new(0));
541        }
542        Ok(())
543    }
544
545    /// Store **and** link a vector in one call (sequential). Convenience
546    /// for tests and one-shot builders; equivalent to `store_vector`
547    /// followed by an immediate sequential connect, advancing the
548    /// connected-prefix cursor. The deferred write path uses
549    /// `store_vector` + `connect_pending` instead.
550    pub fn add_vector(&mut self, vector: Vec<f32>) -> Result<(), LuciError> {
551        let id = self.vectors.len() as u32;
552        self.store_vector(vector)?;
553        let level = self.nodes[id as usize].read().level;
554        self.connect_node(id, level);
555        self.set_ready(id);
556        self.connected_count = self.vectors.len();
557        Ok(())
558    }
559
560    /// Add a vector at a specific pre-reserved ordinal. Used by the merge
561    /// path ([[optimize-hnsw-merge-stitching]]) which knows the final
562    /// merged-segment doc count up front and reserves slots via
563    /// [`Self::with_capacity_for_merge`]. The slot at `ord` must already
564    /// exist (otherwise the builder would have been a [`Self::new`]
565    /// instance and the caller should use [`Self::add_vector`] instead).
566    ///
567    /// Cosine normalize / zero-vector rejection happens here identically
568    /// to `add_vector`. The same insertion routine runs once the slot is
569    /// populated — search_layer against the partially-filled graph,
570    /// neighbor heuristic, back-links — so the resulting edges are
571    /// indistinguishable from a from-scratch build modulo level-sampling
572    /// RNG state.
573    pub fn add_vector_at_ordinal(
574        &mut self,
575        ord: u32,
576        mut vector: Vec<f32>,
577    ) -> Result<(), LuciError> {
578        debug_assert_eq!(vector.len(), self.params.dims);
579        debug_assert!((ord as usize) < self.vectors.len());
580        debug_assert!(
581            self.vectors[ord as usize].is_empty(),
582            "add_vector_at_ordinal called on already-filled ordinal {ord}",
583        );
584        if self.params.metric == DistanceMetric::Cosine {
585            normalize_in_place(&mut vector)?;
586        }
587        let level = self.random_level();
588
589        self.vectors[ord as usize] = vector;
590        let mut neighbors = Vec::with_capacity(level + 1);
591        for _ in 0..=level {
592            neighbors.push(Vec::new());
593        }
594        self.nodes[ord as usize] = RwLock::new(Node { neighbors, level });
595
596        self.connect_node(ord, level);
597        Ok(())
598    }
599
    /// Search-layer + neighbor-select + back-link insertion routine.
    /// Shared by [`Self::add_vector`] (append) and
    /// [`Self::add_vector_at_ordinal`] (pre-reserved slot). At call
    /// time the slot for `id` must already hold its vector and an
    /// empty-neighbors `Node { level }`. Updates the packed entry word
    /// (`entry_point` / `max_level`) if this is the first node or if
    /// `level` exceeds the current max.
    ///
    /// Steps: (1) bootstrap the entry if the graph is empty; (2) greedily
    /// descend from the entry through the layers above `level`; (3) for
    /// each layer from `min(level, max_level)` down to 0, beam-search,
    /// pick neighbors with the diversity heuristic, write this node's
    /// list, add back-links (pruning any list that exceeds its cap);
    /// (4) promote this node to entry when it is strictly taller.
    ///
    /// This routine does not touch the `ready` bitset or
    /// `connected_count`; callers handle those.
    fn connect_node(&mut self, id: u32, level: usize) {
        // Single load gives a consistent (entry point, max level) pair.
        let (ep0, max_l0) = unpack_entry(self.entry.load(AtomicOrdering::Relaxed));
        if ep0 == ENTRY_SENTINEL {
            // First node bootstraps the entry; it has no neighbors to link.
            self.entry
                .store(pack_entry(id, level as u32), AtomicOrdering::Relaxed);
            return;
        }
        // max_level cannot change during this call — only the promotion
        // below mutates it, so the start-of-call snapshot is exact (the
        // basis for `Fixed(1)` bit-identity with the old `self.max_level`).
        let max_level = max_l0 as usize;

        // Greedy search from top to the insertion level + 1
        // (the range is empty when `level >= max_level`).
        let mut current = ep0;
        for lev in (level + 1..=max_level).rev() {
            current = self.greedy_closest(current, id, lev);
        }

        // Insert at each level from min(level, max_level) down to 0
        let insert_from = level.min(max_level);
        let mut ep_for_level = current;

        for lev in (0..=insert_from).rev() {
            // `candidates` comes back sorted by ascending distance.
            let candidates = self.search_layer(id, ep_for_level, self.params.ef_construction, lev);
            let neighbors_to_connect =
                self.select_neighbors_heuristic(&candidates, self.m_max(lev));

            // Set this node's neighbors at this level (under its own lock).
            // The inner scope releases the write lock before any
            // neighbor's lock is taken below.
            {
                let mut node = self.nodes[id as usize].write();
                if lev < node.neighbors.len() {
                    node.neighbors[lev] = neighbors_to_connect.iter().map(|c| c.id).collect();
                }
            }

            // Add back-links — one neighbor lock held at a time.
            for &n in &neighbors_to_connect {
                let mut node = self.nodes[n.id as usize].write();
                // Bounds guard (mirrors the original): n must exist at this
                // level, else per-level index is OOB.
                if lev < node.neighbors.len() {
                    node.neighbors[lev].push(id);
                    // Prune when over the layer-appropriate cap.
                    // M_max0 = 2*M at layer 0, M_max = M elsewhere.
                    if node.neighbors[lev].len() > self.m_max(lev) {
                        self.prune_connections_in(&mut node, n.id, lev);
                    }
                }
            }

            // Start the next (lower) layer from the closest candidate
            // found on this one.
            if !candidates.is_empty() {
                ep_for_level = candidates[0].id;
            }
        }

        // Promote to entry only on a strictly taller level (hnswlib's
        // "only promote when taller"; strict `>` preserves bit-identity).
        if level as u32 > max_l0 {
            self.entry
                .store(pack_entry(id, level as u32), AtomicOrdering::Relaxed);
        }
    }
670
671    fn dist(&self, a: u32, b: u32) -> f32 {
672        super::distance(
673            &self.vectors[a as usize],
674            &self.vectors[b as usize],
675            self.params.metric,
676        )
677    }
678
679    fn dist_to_vec(&self, a: u32, query: &[f32]) -> f32 {
680        super::distance(&self.vectors[a as usize], query, self.params.metric)
681    }
682
683    fn greedy_closest(&self, start: u32, target: u32, level: usize) -> u32 {
684        let mut current = start;
685        let mut current_dist = self.dist(current, target);
686        loop {
687            let mut changed = false;
688            // Read-lock `current` across the scan: `dist` reads `vectors`,
689            // never `nodes`, so no second node lock is taken (deadlock-free)
690            // and no neighbor copy is needed on this sequential path.
691            let node = self.nodes[current as usize].read();
692            if level < node.neighbors.len() {
693                for &neighbor in &node.neighbors[level] {
694                    let d = self.dist(neighbor, target);
695                    if d < current_dist {
696                        current = neighbor;
697                        current_dist = d;
698                        changed = true;
699                    }
700                }
701            }
702            drop(node);
703            if !changed {
704                break;
705            }
706        }
707        current
708    }
709
710    fn search_layer(&self, query_id: u32, entry: u32, ef: usize, level: usize) -> Vec<Candidate> {
711        self.search_layer_vec(&self.vectors[query_id as usize], entry, ef, level, None)
712    }
713
714    fn search_layer_vec(
715        &self,
716        query: &[f32],
717        entry: u32,
718        ef: usize,
719        level: usize,
720        filter: Option<&roaring::RoaringBitmap>,
721    ) -> Vec<Candidate> {
722        let mut visited = HashSet::new();
723        let mut candidates = BinaryHeap::new(); // min-heap (closest first)
724        let mut results = BinaryHeap::new(); // max-heap (furthest first)
725
726        let d = self.dist_to_vec(entry, query);
727        visited.insert(entry);
728        candidates.push(Candidate { id: entry, dist: d });
729        if filter.is_none() || filter.unwrap().contains(entry) {
730            results.push(FurthestCandidate(Candidate { id: entry, dist: d }));
731        }
732
733        while let Some(c) = candidates.pop() {
734            let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
735            if c.dist > furthest_dist && results.len() >= ef {
736                break;
737            }
738
739            let node = self.nodes[c.id as usize].read();
740            if level < node.neighbors.len() {
741                for &neighbor in &node.neighbors[level] {
742                    if visited.insert(neighbor) {
743                        let d = self.dist_to_vec(neighbor, query);
744                        let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
745                        if d < furthest_dist || results.len() < ef {
746                            candidates.push(Candidate {
747                                id: neighbor,
748                                dist: d,
749                            });
750                            if filter.is_none() || filter.unwrap().contains(neighbor) {
751                                results.push(FurthestCandidate(Candidate {
752                                    id: neighbor,
753                                    dist: d,
754                                }));
755                                if results.len() > ef {
756                                    results.pop();
757                                }
758                            }
759                        }
760                    }
761                }
762            }
763        }
764
765        results.into_sorted_vec().into_iter().map(|f| f.0).collect()
766    }
767
768    /// Layer-appropriate neighbor cap. M_max0 = 2*M at layer 0,
769    /// M_max = M at higher layers. Matches Lucene's
770    /// `Lucene99HnswVectorsFormat`, hnswlib's `maxM0_ = 2 * M_`,
771    /// faiss's `nb_neighbors(0) = M*2`, and pgvector's
772    /// `HnswGetMaxConnections`. See [[fix-hnsw-neighbor-heuristic]].
773    fn m_max(&self, level: usize) -> usize {
774        if level == 0 {
775            self.params.m * 2
776        } else {
777            self.params.m
778        }
779    }
780
781    /// Algorithm 4 from Malkov & Yashunin 2018 — neighbor selection
782    /// with diversity. Without the diversity check, the resulting
783    /// graph clusters neighbors in one direction and the search has
784    /// to traverse many extra nodes to reach other parts of the
785    /// vector space. See [[fix-hnsw-neighbor-heuristic]].
786    ///
787    /// Matches hnswlib's `getNeighborsByHeuristic2` strictness: a
788    /// candidate `c` is kept when `dist(c, s) >= c.dist_to_query`
789    /// for every already-selected neighbor `s`. The `>=` (not `>`)
790    /// handles exact ties — matters for diff-testing graphs against
791    /// hnswlib even though cosine on real corpora rarely produces
792    /// ties.
793    fn select_neighbors_heuristic(&self, candidates: &[Candidate], m: usize) -> Vec<Candidate> {
794        let mut working: Vec<Candidate> = candidates.to_vec();
795        working.sort_by(|a, b| a.dist.partial_cmp(&b.dist).unwrap_or(Ordering::Equal));
796
797        let mut selected: Vec<Candidate> = Vec::with_capacity(m);
798        for c in working {
799            if selected.len() == m {
800                break;
801            }
802            // c.dist holds dist-to-query. Keep c only if it is closer
803            // to the query than to any already-selected neighbor.
804            let diverse = selected.iter().all(|s| self.dist(c.id, s.id) >= c.dist);
805            if diverse {
806                selected.push(c);
807            }
808        }
809        selected
810    }
811
812    /// Re-prune `node`'s level-`level` neighbor list down to the layer
813    /// cap, operating on an already-held write guard so no second node
814    /// lock is taken (deadlock-free). Reads only this node's own list
815    /// plus the connect-phase-immutable `vectors`. Replaces the old
816    /// `prune_connections(&mut self, ...)` that re-indexed `self.nodes`,
817    /// which would self-deadlock once `nodes` is `Vec<RwLock<Node>>`.
818    fn prune_connections_in(&self, node: &mut Node, node_id: u32, level: usize) {
819        let candidates: Vec<Candidate> = node.neighbors[level]
820            .iter()
821            .map(|&n| Candidate {
822                id: n,
823                dist: self.dist(node_id, n),
824            })
825            .collect();
826        let kept = self.select_neighbors_heuristic(&candidates, self.m_max(level));
827        node.neighbors[level] = kept.iter().map(|c| c.id).collect();
828    }
829
830    /// Mark node `ord` fully linked (Release) so traversers may route
831    /// through it. Paired with [`Self::is_ready`]'s Acquire load.
832    fn set_ready(&self, ord: u32) {
833        let word = (ord / 64) as usize;
834        let bit = ord % 64;
835        self.ready[word].fetch_or(1u64 << bit, AtomicOrdering::Release);
836    }
837
838    /// True iff node `ord` is fully linked at every level (Acquire).
839    fn is_ready(&self, ord: u32) -> bool {
840        let word = (ord / 64) as usize;
841        let bit = ord % 64;
842        (self.ready[word].load(AtomicOrdering::Acquire) >> bit) & 1 == 1
843    }
844
845    /// Link every stored-but-unlinked node — the pending tail
846    /// `[connected_count, vectors.len())` — into the graph, then advance
847    /// the cursor. A no-op when the tail is empty. `Fixed(1)` runs the
848    /// sequential, bit-identical path; `Ambient` / `Fixed(n>1)` run the
849    /// `par_iter` over [`Self::connect_node_locked`]. See
850    /// [[optimization-concurrent-hnsw-insert]] §Write model.
851    pub fn connect_pending(&mut self, threads: BuildThreads) {
852        let start = self.connected_count;
853        let end = self.vectors.len();
854        if start >= end {
855            return;
856        }
857        match threads {
858            BuildThreads::Fixed(1) => self.connect_tail_sequential(start, end),
859            BuildThreads::Ambient => {
860                // A single-threaded ambient pool — a 1-core machine, a build
861                // pinned via `set_num_threads(1)` / `RAYON_NUM_THREADS=1`, or
862                // a 1-thread scoped pool — has no concurrency to exploit, so
863                // run the deterministic, byte-identical sequential path
864                // instead of the lock/atomic machinery. This makes 1-thread
865                // builds reproducible (and keeps the recall-regression guard
866                // deterministic) at zero cost to the multi-core default — the
867                // `Fixed(1)` bit-identical contract from
868                // [[optimization-concurrent-hnsw-insert]] §Determinism.
869                if current_num_threads() <= 1 {
870                    self.connect_tail_sequential(start, end);
871                } else {
872                    self.connect_tail_parallel(start, end);
873                }
874            }
875            BuildThreads::Fixed(n) => {
876                let pool = ThreadPoolBuilder::new()
877                    .num_threads(n)
878                    .build()
879                    .expect("failed to build HNSW connect thread pool");
880                pool.install(|| self.connect_tail_parallel(start, end));
881            }
882        }
883        self.connected_count = end;
884    }
885
886    /// Sequential connect of the tail `[start, end)` — the deterministic
887    /// path that is byte-identical to building via [`Self::add_vector`]
888    /// (proven by `connect_pending_fixed1_matches_add_vector_bytes`). Used
889    /// for `Fixed(1)` and for a single-threaded `Ambient` pool.
890    fn connect_tail_sequential(&mut self, start: usize, end: usize) {
891        for ord in start..end {
892            let level = self.nodes[ord].read().level;
893            self.connect_node(ord as u32, level);
894            self.set_ready(ord as u32);
895        }
896    }
897
898    /// `par_iter` the tail through [`Self::connect_node_locked`]. Borrows
899    /// `&self`; neighbor state mutates through the per-node locks and the
900    /// packed `entry` atomic, so no `&mut` crosses threads. Runs on
901    /// whatever rayon pool the caller installed.
902    fn connect_tail_parallel(&self, start: usize, end: usize) {
903        (start..end).into_par_iter().for_each(|ord| {
904            let level = self.nodes[ord].read().level;
905            self.connect_node_locked(ord as u32, level);
906        });
907    }
908
    /// Parallel insert of an already-stored node `ord` at its drawn
    /// `level`. Mutates only interior locks and the `entry` / `ready`
    /// atomics through `&self`, so many run concurrently under
    /// `connect_tail_parallel`. Mirrors the sequential
    /// [`Self::connect_node`] but: (a) reads gate on `ready` and
    /// snapshot-under-lock, (b) entry bootstrap / promotion is CAS,
    /// (c) `ready` is published last. See
    /// [[optimization-concurrent-hnsw-insert]] §connect_node_locked. The
    /// entry CAS and the `ready` gate are modeled exhaustively in
    /// `tests/loom_hnsw.rs` — keep the two in sync.
    ///
    /// Phases, in order:
    /// 1. Load the packed entry word; if it is still the sentinel, try
    ///    to bootstrap `ord` as the entry point and return on success.
    /// 2. Greedy descent through the layers above `level`.
    /// 3. For each layer from `min(level, max_level)` down to 0: beam
    ///    search, heuristic selection, write own list, add back-links
    ///    (re-pruning any neighbor that overflows its cap).
    /// 4. Publish `ready[ord]`.
    /// 5. CAS-promote `ord` to entry point if `level` is strictly taller
    ///    than the current max level.
    fn connect_node_locked(&self, ord: u32, level: usize) {
        // `max_level_u` is a point-in-time snapshot of the entry word; a
        // concurrent promotion can make it stale, so the descent may start
        // one layer low. Intentional (hnswlib's `maxlevelcopy`) — the node
        // still links at every one of its own layers, so don't "fix" this
        // into a lock.
        let (mut ep, mut max_level_u) = unpack_entry(self.entry.load(AtomicOrdering::Acquire));
        if ep == ENTRY_SENTINEL {
            // First node bootstraps the entry; it has no neighbors. Mark
            // ready BEFORE the CAS so any thread that reads `ord` as the
            // entry and gates on `ready[ord]` finds it set. A thread that
            // *loses* the bootstrap race has now published `ready[ord]` with
            // zero edges and falls through to link below; that is safe — no
            // node points to `ord` until it builds its own back-links, its
            // slot and vector were filled by `store_vector`, and every read
            // is lock-protected, so a traverser reaching `ord` in that window
            // sees an empty (dead-end) list, never UB. Recall wobble is
            // construction-time only.
            self.set_ready(ord);
            // The expected value is the exact un-bootstrapped word
            // `(ENTRY_SENTINEL, 0)`; any other word means another node
            // already won.
            match self.entry.compare_exchange(
                pack_entry(ENTRY_SENTINEL, 0),
                pack_entry(ord, level as u32),
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => {
                    // Lost the bootstrap race; descend from the winner,
                    // whose (ep, max_level) pair is read as one word.
                    let (a_ep, a_max) = unpack_entry(actual);
                    ep = a_ep;
                    max_level_u = a_max;
                }
            }
        }
        let max_level = max_level_u as usize;

        // Greedy descent from max_level down to level+1 (reads only).
        // The range is empty when `level >= max_level`.
        for lev in (level + 1..=max_level).rev() {
            ep = self.greedy_closest_ready(ep, ord, lev);
        }

        // Layers above the snapshot's max level have no graph to search,
        // so linking starts at the lower of the two levels.
        let insert_from = level.min(max_level);
        let mut ep_for_level = ep;
        for lev in (0..=insert_from).rev() {
            let candidates =
                self.search_layer_ready(ord, ep_for_level, self.params.ef_construction, lev);
            let selected = self.select_neighbors_heuristic(&candidates, self.m_max(lev));

            // Write own list under ord's lock.
            {
                let mut node = self.nodes[ord as usize].write();
                if lev < node.neighbors.len() {
                    node.neighbors[lev] = selected.iter().map(|c| c.id).collect();
                }
            }
            // Back-links — one neighbor lock held at a time (no nested
            // locks ⇒ no lock-ordering deadlock).
            for &n in &selected {
                let mut node = self.nodes[n.id as usize].write();
                if lev < node.neighbors.len() {
                    node.neighbors[lev].push(ord);
                    if node.neighbors[lev].len() > self.m_max(lev) {
                        self.prune_connections_in(&mut node, n.id, lev);
                    }
                }
            }
            // Seed the next (lower) layer's search from the first
            // candidate of this layer — presumably the closest one, since
            // the search returns its result heap as a sorted vec.
            if !candidates.is_empty() {
                ep_for_level = candidates[0].id;
            }
        }

        // Publish: node fully linked at every level, now traversable.
        self.set_ready(ord);

        // Promote to entry only on a strictly taller level (CAS-loop;
        // strict `>` matches the sequential path's promotion rule).
        loop {
            let cur = self.entry.load(AtomicOrdering::Acquire);
            let (_, cur_max) = unpack_entry(cur);
            if (level as u32) <= cur_max {
                break;
            }
            // A failed CAS means the entry word changed underneath us;
            // reload and re-check the height before retrying.
            if self
                .entry
                .compare_exchange(
                    cur,
                    pack_entry(ord, level as u32),
                    AtomicOrdering::AcqRel,
                    AtomicOrdering::Acquire,
                )
                .is_ok()
            {
                break;
            }
        }
    }
1016
1017    /// `greedy_closest` for the parallel path: skips candidates whose
1018    /// `ready` bit is unset (a half-linked node is never traversed) and
1019    /// snapshots each node's neighbor ids under a short read lock before
1020    /// computing `dist` outside the lock (qdrant pattern — never hold the
1021    /// lock across `dist`, never block a back-linker).
1022    fn greedy_closest_ready(&self, start: u32, target: u32, level: usize) -> u32 {
1023        let mut current = start;
1024        let mut current_dist = self.dist(current, target);
1025        loop {
1026            let neighbors: Vec<u32> = {
1027                let node = self.nodes[current as usize].read();
1028                if level < node.neighbors.len() {
1029                    node.neighbors[level].clone()
1030                } else {
1031                    Vec::new()
1032                }
1033            };
1034            let mut changed = false;
1035            for neighbor in neighbors {
1036                if !self.is_ready(neighbor) {
1037                    continue;
1038                }
1039                let d = self.dist(neighbor, target);
1040                if d < current_dist {
1041                    current = neighbor;
1042                    current_dist = d;
1043                    changed = true;
1044                }
1045            }
1046            if !changed {
1047                break;
1048            }
1049        }
1050        current
1051    }
1052
1053    /// `search_layer` for the parallel path: ready-gated, snapshot-under-
1054    /// lock (see [`Self::greedy_closest_ready`]). No `filter` arg — the
1055    /// builder never filters at construction time.
1056    fn search_layer_ready(
1057        &self,
1058        query_id: u32,
1059        entry: u32,
1060        ef: usize,
1061        level: usize,
1062    ) -> Vec<Candidate> {
1063        let query = self.vectors[query_id as usize].as_slice();
1064        let mut visited = HashSet::new();
1065        let mut candidates = BinaryHeap::new();
1066        let mut results = BinaryHeap::new();
1067
1068        let d = self.dist_to_vec(entry, query);
1069        visited.insert(entry);
1070        candidates.push(Candidate { id: entry, dist: d });
1071        results.push(FurthestCandidate(Candidate { id: entry, dist: d }));
1072
1073        while let Some(c) = candidates.pop() {
1074            let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1075            if c.dist > furthest_dist && results.len() >= ef {
1076                break;
1077            }
1078            let neighbors: Vec<u32> = {
1079                let node = self.nodes[c.id as usize].read();
1080                if level < node.neighbors.len() {
1081                    node.neighbors[level].clone()
1082                } else {
1083                    Vec::new()
1084                }
1085            };
1086            for neighbor in neighbors {
1087                if !self.is_ready(neighbor) {
1088                    continue;
1089                }
1090                if visited.insert(neighbor) {
1091                    let d = self.dist_to_vec(neighbor, query);
1092                    let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1093                    if d < furthest_dist || results.len() < ef {
1094                        candidates.push(Candidate {
1095                            id: neighbor,
1096                            dist: d,
1097                        });
1098                        results.push(FurthestCandidate(Candidate {
1099                            id: neighbor,
1100                            dist: d,
1101                        }));
1102                        if results.len() > ef {
1103                            results.pop();
1104                        }
1105                    }
1106                }
1107            }
1108        }
1109        results.into_sorted_vec().into_iter().map(|f| f.0).collect()
1110    }
1111
    /// Number of vectors stored in the builder.
    ///
    /// Counts every stored vector, including any in the pending tail
    /// that have not been linked into the graph yet (see
    /// [`Self::has_pending_tail`]).
    pub fn len(&self) -> usize {
        self.vectors.len()
    }
    /// True iff no vector has been stored in the builder.
    pub fn is_empty(&self) -> bool {
        self.vectors.is_empty()
    }
1119
1120    /// True iff there are stored-but-unlinked nodes (a non-empty pending
1121    /// tail). `get_or_build_index` asserts this is false before building
1122    /// the read-side index — a true value at persist means a
1123    /// `connect_pending` trigger was missed (the disconnected-graph bug).
1124    /// See [[optimization-concurrent-hnsw-insert]] §Conversion.
1125    pub fn has_pending_tail(&self) -> bool {
1126        self.connected_count < self.vectors.len()
1127    }
1128
    /// Configured params (dims, M, metric, quantization). Exposed for
    /// the merge path so it can size a new builder identically without
    /// re-deriving from the schema.
    ///
    /// Returns an owned clone; the builder's own copy is not affected
    /// by changes to the returned value.
    pub fn params(&self) -> HnswParams {
        self.params.clone()
    }
1135
1136    /// Reconstruct a builder from a previously-built [`HnswIndex`].
1137    ///
1138    /// Used by the global-HNSW reopen path: persisted graphs come back
1139    /// as an [`HnswIndex`], but the writer needs an `HnswBuilder` to
1140    /// accept further inserts. `level_mult` and `rng_state` are
1141    /// re-derived; HNSW is approximate, so resuming with a fresh RNG
1142    /// state changes which levels new inserts land on but not
1143    /// correctness.
1144    pub fn from_index(index: HnswIndex) -> Self {
1145        let level_mult = 1.0 / (index.params.m as f64).ln();
1146        let ep = index.entry_point.unwrap_or(ENTRY_SENTINEL);
1147        let entry = AtomicU64::new(pack_entry(ep, index.max_level as u32));
1148        let n = index.vectors.len();
1149        // A reloaded graph is fully connected: every node is `ready` and
1150        // the pending tail is empty. Without all-ready, a subsequent
1151        // `connect_pending` would skip the loaded nodes as un-traversable
1152        // and link the new tail into an effectively empty graph. Trailing
1153        // bits past `n` are set but never indexed (harmless).
1154        let mut ready = Vec::with_capacity(n.div_ceil(64));
1155        for _ in 0..n.div_ceil(64) {
1156            ready.push(AtomicU64::new(u64::MAX));
1157        }
1158        Self {
1159            params: index.params,
1160            vectors: index.vectors,
1161            nodes: index.nodes.into_iter().map(RwLock::new).collect(),
1162            entry,
1163            ready,
1164            connected_count: n,
1165            level_mult,
1166            rng_state: 42,
1167        }
1168    }
1169
1170    /// Build the final index. Quantization is applied per the
1171    /// `quantization` field on [`HnswParams`].
1172    pub fn build(self) -> HnswIndex {
1173        let quantized = match self.params.quantization {
1174            QuantizationType::None => None,
1175            QuantizationType::Int8 if !self.vectors.is_empty() => Some(
1176                super::quantize::QuantizedVectors::quantize(&self.vectors, self.params.metric),
1177            ),
1178            QuantizationType::Int8 => None,
1179            // `new()` rejects these variants. Reaching this arm means
1180            // someone bypassed the constructor invariant.
1181            unsupported @ (QuantizationType::Int4 | QuantizationType::Bbq) => {
1182                panic!(
1183                    "HnswBuilder::build reached with unimplemented \
1184                     quantization {unsupported:?}; the constructor was \
1185                     supposed to reject this."
1186                );
1187            }
1188        };
1189
1190        let (ep, max_level) = unpack_entry(self.entry.load(AtomicOrdering::Relaxed));
1191        let entry_point = if ep == ENTRY_SENTINEL { None } else { Some(ep) };
1192
1193        HnswIndex {
1194            params: self.params,
1195            vectors: self.vectors,
1196            nodes: self
1197                .nodes
1198                .into_iter()
1199                .map(|lock| lock.into_inner())
1200                .collect(),
1201            entry_point,
1202            max_level: max_level as usize,
1203            quantized,
1204        }
1205    }
1206}
1207
/// Immutable HNSW index for search.
///
/// Produced by `HnswBuilder::build` or [`HnswIndex::from_bytes`]; all
/// per-node collections are indexed by the node's `u32` ordinal.
pub struct HnswIndex {
    /// Build-time configuration (dims, M, metric, quantization).
    params: HnswParams,
    /// Full-precision vectors, one per node, indexed by ordinal.
    vectors: Vec<Vec<f32>>,
    /// Graph nodes: each holds its level and per-level neighbor lists.
    nodes: Vec<Node>,
    /// Node where every search descent starts; `None` for an index with
    /// no entry point, in which case searches return no results.
    entry_point: Option<u32>,
    /// Topmost layer of the graph; the greedy descent runs from here
    /// down to layer 1.
    max_level: usize,
    /// Int8 quantized vectors for fast approximate distance.
    quantized: Option<super::quantize::QuantizedVectors>,
}
1218
1219impl HnswIndex {
    /// Search for the `k` nearest neighbors of `query`, unfiltered.
    ///
    /// Thin wrapper over [`Self::search_filtered`] with `filter = None`;
    /// see that method for the role of `ef`, the shape of the result,
    /// and the error cases.
    pub fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<(u32, f32)>, LuciError> {
        self.search_filtered(query, k, ef, None)
    }
1224
1225    /// Search with optional pre-filter bitmap.
1226    ///
1227    /// For [`DistanceMetric::Cosine`] the query is normalized once at entry
1228    /// so every kernel call sees unit-length inputs. Zero / non-finite
1229    /// query vectors return `LuciError::InvalidQuery` rather than producing
1230    /// garbage scores.
1231    pub fn search_filtered(
1232        &self,
1233        query: &[f32],
1234        k: usize,
1235        ef: usize,
1236        filter: Option<&roaring::RoaringBitmap>,
1237    ) -> Result<Vec<(u32, f32)>, LuciError> {
1238        if self.entry_point.is_none() {
1239            return Ok(Vec::new());
1240        }
1241
1242        // Normalize query once for cosine; the inner kernel runs as a
1243        // pure dot product under the unit-length invariant.
1244        let query_owned: Vec<f32> = if self.params.metric == DistanceMetric::Cosine {
1245            let mut q = query.to_vec();
1246            normalize_in_place(&mut q)?;
1247            q
1248        } else {
1249            query.to_vec()
1250        };
1251        let query = &query_owned[..];
1252
1253        // Brute-force fallback for very selective filters
1254        if let Some(bm) = filter {
1255            if (bm.len() as f64) < (self.vectors.len() as f64 * 0.01) {
1256                return Ok(self.brute_force_search(query, k, bm));
1257            }
1258        }
1259
1260        let ep = self.entry_point.unwrap();
1261        let ef_actual = ef.max(k);
1262
1263        // Greedy descend from top to layer 1
1264        let mut current = ep;
1265        for lev in (1..=self.max_level).rev() {
1266            current = self.greedy_closest_vec(current, query, lev);
1267        }
1268
1269        // Search layer 0 with beam (uses quantized distance if available)
1270        let candidates = self.search_layer_0(query, current, ef_actual, filter);
1271
1272        // Rerank with exact float32 distances for final ordering
1273        let mut results: Vec<(u32, f32)> = if self.quantized.is_some() {
1274            candidates
1275                .into_iter()
1276                .map(|c| (c.id, self.exact_dist(c.id, query)))
1277                .collect()
1278        } else {
1279            candidates.into_iter().map(|c| (c.id, c.dist)).collect()
1280        };
1281        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1282        results.truncate(k);
1283        Ok(results)
1284    }
1285
1286    fn greedy_closest_vec(&self, start: u32, query: &[f32], level: usize) -> u32 {
1287        let mut current = start;
1288        let mut current_dist = self.quantized_dist(current, query);
1289        loop {
1290            let mut changed = false;
1291            if level < self.nodes[current as usize].neighbors.len() {
1292                for &neighbor in &self.nodes[current as usize].neighbors[level] {
1293                    let d = self.quantized_dist(neighbor, query);
1294                    if d < current_dist {
1295                        current = neighbor;
1296                        current_dist = d;
1297                        changed = true;
1298                    }
1299                }
1300            }
1301            if !changed {
1302                break;
1303            }
1304        }
1305        current
1306    }
1307
1308    fn search_layer_0(
1309        &self,
1310        query: &[f32],
1311        entry: u32,
1312        ef: usize,
1313        filter: Option<&roaring::RoaringBitmap>,
1314    ) -> Vec<Candidate> {
1315        let mut visited = HashSet::new();
1316        let mut candidates = BinaryHeap::new();
1317        let mut results = BinaryHeap::new();
1318
1319        let d = self.quantized_dist(entry, query);
1320        visited.insert(entry);
1321        candidates.push(Candidate { id: entry, dist: d });
1322        if filter.is_none() || filter.unwrap().contains(entry) {
1323            results.push(FurthestCandidate(Candidate { id: entry, dist: d }));
1324        }
1325
1326        while let Some(c) = candidates.pop() {
1327            let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1328            if c.dist > furthest_dist && results.len() >= ef {
1329                break;
1330            }
1331
1332            if !self.nodes[c.id as usize].neighbors.is_empty() {
1333                for &neighbor in &self.nodes[c.id as usize].neighbors[0] {
1334                    if visited.insert(neighbor) {
1335                        let d = self.quantized_dist(neighbor, query);
1336                        let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1337                        if d < furthest_dist || results.len() < ef {
1338                            candidates.push(Candidate {
1339                                id: neighbor,
1340                                dist: d,
1341                            });
1342                            if filter.is_none() || filter.unwrap().contains(neighbor) {
1343                                results.push(FurthestCandidate(Candidate {
1344                                    id: neighbor,
1345                                    dist: d,
1346                                }));
1347                                if results.len() > ef {
1348                                    results.pop();
1349                                }
1350                            }
1351                        }
1352                    }
1353                }
1354            }
1355        }
1356
1357        let mut result: Vec<Candidate> = results.into_iter().map(|f| f.0).collect();
1358        result.sort_by(|a, b| a.dist.partial_cmp(&b.dist).unwrap_or(Ordering::Equal));
1359        result
1360    }
1361
1362    fn brute_force_search(
1363        &self,
1364        query: &[f32],
1365        k: usize,
1366        filter: &roaring::RoaringBitmap,
1367    ) -> Vec<(u32, f32)> {
1368        let mut results: Vec<(u32, f32)> = filter
1369            .iter()
1370            .filter(|&id| (id as usize) < self.vectors.len())
1371            .map(|id| (id, self.quantized_dist(id, query)))
1372            .collect();
1373        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1374        results.truncate(k);
1375        results
1376    }
1377
1378    /// Distance between stored vector `a` and query. Uses quantized
1379    /// distance when available, falling back to float32.
1380    #[inline]
1381    fn quantized_dist(&self, a: u32, query: &[f32]) -> f32 {
1382        if let Some(ref qv) = self.quantized {
1383            qv.asymmetric_distance(a as usize, query)
1384        } else {
1385            super::distance(&self.vectors[a as usize], query, self.params.metric)
1386        }
1387    }
1388
    /// Exact float32 distance (for reranking after quantized search).
    ///
    /// Always computed from the full-precision `vectors` under the
    /// configured metric, never from the quantized copy. Panics if `a`
    /// is not a valid ordinal (plain slice indexing).
    fn exact_dist(&self, a: u32, query: &[f32]) -> f32 {
        super::distance(&self.vectors[a as usize], query, self.params.metric)
    }
1393
    /// Number of vectors in the index.
    pub fn len(&self) -> usize {
        self.vectors.len()
    }
    /// True iff the index holds no vectors.
    pub fn is_empty(&self) -> bool {
        self.vectors.is_empty()
    }
    /// Configured vector dimensionality (`params.dims`).
    pub fn dims(&self) -> usize {
        self.params.dims
    }
1403
    /// Get a vector by ID.
    ///
    /// Returns the stored full-precision vector for ordinal `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not less than [`Self::len`].
    pub fn vector(&self, id: u32) -> &[f32] {
        &self.vectors[id as usize]
    }
1408
1409    /// Serialize to bytes.
1410    ///
1411    /// Writes the v2 format unconditionally — magic prefix
1412    /// [`HNSW_FORMAT_MAGIC`] followed by [`HNSW_FORMAT_VERSION`], then
1413    /// the header fields readers expect. v1 segments (without the
1414    /// magic prefix) are read-only legacy; the builder never writes
1415    /// them. See [[optimize-cosine-norm-precompute]].
1416    pub fn to_bytes(&self) -> Vec<u8> {
1417        let mut buf = Vec::new();
1418        // v2 format prefix
1419        buf.extend_from_slice(&HNSW_FORMAT_MAGIC);
1420        buf.push(HNSW_FORMAT_VERSION);
1421        // Header
1422        buf.extend_from_slice(&(self.params.dims as u32).to_le_bytes());
1423        buf.extend_from_slice(&(self.params.m as u32).to_le_bytes());
1424        buf.push(self.params.metric as u8);
1425        buf.extend_from_slice(&(self.vectors.len() as u32).to_le_bytes());
1426        buf.extend_from_slice(&self.entry_point.unwrap_or(u32::MAX).to_le_bytes());
1427        buf.extend_from_slice(&(self.max_level as u32).to_le_bytes());
1428
1429        // Vectors: flat f32 array
1430        for v in &self.vectors {
1431            for &f in v {
1432                buf.extend_from_slice(&f.to_le_bytes());
1433            }
1434        }
1435
1436        // Nodes: level + per-level neighbor lists
1437        for node in &self.nodes {
1438            buf.extend_from_slice(&(node.level as u32).to_le_bytes());
1439            buf.extend_from_slice(&(node.neighbors.len() as u32).to_le_bytes());
1440            for layer in &node.neighbors {
1441                buf.extend_from_slice(&(layer.len() as u32).to_le_bytes());
1442                for &n in layer {
1443                    buf.extend_from_slice(&n.to_le_bytes());
1444                }
1445            }
1446        }
1447
1448        // Quantized vectors (if present)
1449        if let Some(ref qv) = self.quantized {
1450            buf.push(1u8); // has quantized
1451            let qbytes = qv.to_bytes();
1452            buf.extend_from_slice(&(qbytes.len() as u32).to_le_bytes());
1453            buf.extend_from_slice(&qbytes);
1454        } else {
1455            buf.push(0u8); // no quantized
1456        }
1457
1458        buf
1459    }
1460
1461    /// Deserialize from bytes.
1462    ///
1463    /// Returns `Err(SegmentFormatMigrationRequired)` for v0.7.1 cosine
1464    /// segments (which stored un-normalized vectors and can't be read
1465    /// under the v2 unit-length invariant) and
1466    /// `Err(SegmentFormatUnknown)` for future versions an older binary
1467    /// can't parse. See [[optimize-cosine-norm-precompute]] §Migration.
1468    pub fn from_bytes(data: &[u8]) -> Result<Self, LuciError> {
1469        let header = read_header(data)?;
1470        let dims = header.dims;
1471        let m = header.m;
1472        let metric = header.metric;
1473        let num_vectors = header.num_vectors;
1474        let entry_point = header.entry_point;
1475        let max_level = header.max_level;
1476        let mut pos = header.vectors_offset;
1477
1478        // Vectors. Guard `num_vectors` before reserving: each index costs at
1479        // least its vector bytes (`dims × 4`) plus its node entry (≥8 B), so a
1480        // corrupt count can't drive a giant speculative allocation.
1481        checked_len(
1482            num_vectors,
1483            dims.saturating_mul(4).saturating_add(8),
1484            data,
1485            pos,
1486        )?;
1487        let mut vectors = Vec::with_capacity(num_vectors);
1488        for _ in 0..num_vectors {
1489            let mut v = Vec::with_capacity(dims);
1490            for _ in 0..dims {
1491                v.push(read_f32(data, &mut pos)?);
1492            }
1493            vectors.push(v);
1494        }
1495
1496        // Nodes
1497        let mut nodes = Vec::with_capacity(num_vectors);
1498        for _ in 0..num_vectors {
1499            let level = read_u32(data, &mut pos)? as usize;
1500            let num_layers = read_u32(data, &mut pos)? as usize;
1501            let mut neighbors = Vec::with_capacity(checked_len(num_layers, 4, data, pos)?);
1502            for _ in 0..num_layers {
1503                let num_neighbors = read_u32(data, &mut pos)? as usize;
1504                let mut layer = Vec::with_capacity(checked_len(num_neighbors, 4, data, pos)?);
1505                for _ in 0..num_neighbors {
1506                    layer.push(read_u32(data, &mut pos)?);
1507                }
1508                neighbors.push(layer);
1509            }
1510            nodes.push(Node { neighbors, level });
1511        }
1512
1513        // Quantized vectors. The on-disk format records a flag byte at
1514        // `pos` indicating whether quantized data follows:
1515        //   - `1` → a quantized blob follows; deserialise it.
1516        //   - `0` → no quantized data; honor it (this is what the user
1517        //          asked for when the mapping said `"quantization": "none"`).
1518        //   - missing (`pos == data.len()`) → v1 legacy file that predates
1519        //          the flag. v1 never had quantization, so treat as None.
1520        //
1521        // The previous else branch synthesised int8 quantized vectors when
1522        // the file flag was 0 *or* missing, overriding the user's mapping
1523        // and routing query-time beam search through int8 scalar instead
1524        // of FP32. That was a [[code-must-not-lie]] violation; see
1525        // [[hnsw-query-path-allocation-overhead]] for the diagnostic.
1526        let quantized = if pos < data.len() && data[pos] == 1 {
1527            pos += 1;
1528            let qlen = read_u32(data, &mut pos)? as usize;
1529            let qbytes = take_bytes(data, &mut pos, qlen)?;
1530            Some(super::quantize::QuantizedVectors::from_bytes(qbytes))
1531        } else {
1532            None
1533        };
1534
1535        let quantization = if quantized.is_some() {
1536            QuantizationType::Int8
1537        } else {
1538            QuantizationType::None
1539        };
1540
1541        // Validate the graph invariants the search path assumes, so a
1542        // structurally-parseable but corrupt graph errors here instead of
1543        // indexing `self.vectors[id]` / `self.nodes[id]` out of bounds during
1544        // a query. This is the failure the visited-pool design flagged as its
1545        // concrete panic trigger — see [[optimization-hnsw-visited-bitset]].
1546        if let Some(ep) = entry_point {
1547            if ep as usize >= num_vectors {
1548                return Err(LuciError::IndexCorrupted(format!(
1549                    "HNSW entry_point {ep} out of range (num_vectors {num_vectors})"
1550                )));
1551            }
1552        }
1553        for (node_id, node) in nodes.iter().enumerate() {
1554            for layer in &node.neighbors {
1555                for &nid in layer {
1556                    if nid as usize >= num_vectors {
1557                        return Err(LuciError::IndexCorrupted(format!(
1558                            "HNSW node {node_id} neighbour id {nid} out of range \
1559                             (num_vectors {num_vectors})"
1560                        )));
1561                    }
1562                }
1563            }
1564        }
1565
1566        Ok(Self {
1567            params: HnswParams {
1568                dims,
1569                m,
1570                ef_construction: 100,
1571                metric,
1572                quantization,
1573            },
1574            vectors,
1575            nodes,
1576            entry_point,
1577            max_level,
1578            quantized,
1579        })
1580    }
1581}
1582
1583// --- Zero-copy HNSW searcher for scalable segment access ---
1584
/// Parsed graph structure (nodes + neighbors). This is the only part
/// that needs heap allocation. Cached in SegmentReader.
///
/// Produced by [`ParsedGraph::parse`]; vector and quantized payloads are
/// referenced by byte offset into the same slice that was parsed rather
/// than copied.
pub struct ParsedGraph {
    /// Index parameters. `parse` fills `dims`, `m` and `metric` from the
    /// header, fixes `ef_construction` at 100, and derives `quantization`
    /// from whether a quantized block is present.
    pub params: HnswParams,
    /// Number of vectors (and therefore nodes) recorded in the header.
    pub num_vectors: usize,
    /// Ordinal where a search descent starts; `None` makes searches
    /// return no results.
    pub entry_point: Option<u32>,
    /// Highest layer of the graph; the greedy descent walks from this
    /// layer down to layer 1 before the layer-0 beam search.
    pub max_level: usize,
    /// Per-vector adjacency lists, indexed by vector ordinal.
    pub(crate) nodes: Vec<Node>,
    /// Byte offset where vector data starts in the segment slice.
    /// Vector `i` occupies `dims * 4` bytes starting at
    /// `vector_data_offset + i * dims * 4` (little-endian `f32`s).
    pub vector_data_offset: usize,
    /// Byte offset where quantized data starts (if present). Points just
    /// past the flag byte and the `u32` length prefix.
    pub quantized_offset: Option<usize>,
    /// Length of quantized data block. `0` when no block is present.
    pub quantized_len: usize,
}
1600
1601impl ParsedGraph {
1602    /// Parse the graph structure from HNSW bytes. Records byte offsets
1603    /// for vector and quantized data but does NOT copy them.
1604    pub fn parse(data: &[u8]) -> Result<Self, LuciError> {
1605        let header = read_header(data)?;
1606        let dims = header.dims;
1607        let num_vectors = header.num_vectors;
1608        let mut pos = header.vectors_offset;
1609
1610        // Record vector data offset, skip past vector bytes
1611        let vector_data_offset = pos;
1612        pos += num_vectors * dims * 4; // skip float32 vectors
1613
1614        // Parse nodes (graph structure — this we DO parse into memory)
1615        let mut nodes = Vec::with_capacity(num_vectors);
1616        for _ in 0..num_vectors {
1617            let level = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1618            pos += 4;
1619            let num_layers = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1620            pos += 4;
1621            let mut neighbors = Vec::with_capacity(num_layers);
1622            for _ in 0..num_layers {
1623                let num_neighbors =
1624                    u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1625                pos += 4;
1626                let mut layer = Vec::with_capacity(num_neighbors);
1627                for _ in 0..num_neighbors {
1628                    layer.push(u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()));
1629                    pos += 4;
1630                }
1631                neighbors.push(layer);
1632            }
1633            nodes.push(Node { neighbors, level });
1634        }
1635
1636        // Record quantized data offset
1637        let (quantized_offset, quantized_len) = if pos < data.len() && data[pos] == 1 {
1638            pos += 1;
1639            let qlen = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1640            pos += 4;
1641            (Some(pos), qlen)
1642        } else {
1643            (None, 0)
1644        };
1645
1646        // Derive quantization from whether the segment carries a quantized
1647        // blob — see HnswIndex::from_bytes for the rationale.
1648        let quantization = if quantized_offset.is_some() {
1649            QuantizationType::Int8
1650        } else {
1651            QuantizationType::None
1652        };
1653
1654        Ok(Self {
1655            params: HnswParams {
1656                dims,
1657                m: header.m,
1658                ef_construction: 100,
1659                metric: header.metric,
1660                quantization,
1661            },
1662            num_vectors,
1663            entry_point: header.entry_point,
1664            max_level: header.max_level,
1665            nodes,
1666            vector_data_offset,
1667            quantized_offset,
1668            quantized_len,
1669        })
1670    }
1671}
1672
/// Zero-copy HNSW searcher that borrows segment data for vectors and
/// quantized data. Only the graph structure (via `ParsedGraph`) is
/// heap-allocated.
///
/// Full-precision vectors are decoded on demand from `data` at the
/// offsets recorded in `graph`; nothing is copied when the searcher is
/// constructed.
pub struct HnswSearcher<'a> {
    /// Parsed adjacency lists, header parameters and payload offsets.
    graph: &'a ParsedGraph,
    /// Raw HNSW bytes (vectors + nodes + quantized, borrowed from segment).
    data: &'a [u8],
    /// Parsed quantized calibration (mins, scales, norms) — small, cached.
    /// `None` when the graph records no quantized block.
    quantized_cal: Option<QuantizedCalibration<'a>>,
}
1683
/// Quantized calibration data (borrowed from segment bytes).
///
/// Every slice is a window into the quantized block; values are decoded
/// lazily as little-endian `f32`s (or raw `u8` codes) at query time.
struct QuantizedCalibration<'a> {
    /// Vector dimensionality; also the stride of `data` in bytes.
    dims: usize,
    /// Vector count the block was sliced for (stored, currently unread).
    _num_vectors: usize,
    /// Metric copied from the graph parameters (stored, currently unread).
    _metric: DistanceMetric,
    mins: &'a [u8],   // dims × 4 bytes (f32 le)
    scales: &'a [u8], // dims × 4 bytes (f32 le)
    norms: &'a [u8],  // num_vectors × 4 bytes (f32 le)
    data: &'a [u8],   // num_vectors × dims bytes (u8 quantized values)
}
1694
1695impl<'a> HnswSearcher<'a> {
1696    /// Create a searcher borrowing segment data and a parsed graph.
1697    pub fn new(data: &'a [u8], graph: &'a ParsedGraph) -> Self {
1698        let quantized_cal = graph.quantized_offset.map(|qoff| {
1699            let qdata = &data[qoff..qoff + graph.quantized_len];
1700            // Parse quantized calibration structure (borrow slices)
1701            let dims = graph.params.dims;
1702            let num_vectors = graph.num_vectors;
1703            let mut p = 0;
1704            // Skip dims, num_vectors, metric (already in graph.params)
1705            p += 4 + 4 + 1; // dims u32 + num_vectors u32 + metric u8
1706            let mins = &qdata[p..p + dims * 4];
1707            p += dims * 4;
1708            let scales = &qdata[p..p + dims * 4];
1709            p += dims * 4;
1710            let norms = &qdata[p..p + num_vectors * 4];
1711            p += num_vectors * 4;
1712            let vdata = &qdata[p..p + num_vectors * dims];
1713            QuantizedCalibration {
1714                dims,
1715                _num_vectors: num_vectors,
1716                _metric: graph.params.metric,
1717                mins,
1718                scales,
1719                norms,
1720                data: vdata,
1721            }
1722        });
1723
1724        Self {
1725            graph,
1726            data,
1727            quantized_cal,
1728        }
1729    }
1730
1731    /// Search for k nearest neighbors.
1732    pub fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<(u32, f32)>, LuciError> {
1733        self.search_filtered(query, k, ef, None)
1734    }
1735
1736    pub fn search_filtered(
1737        &self,
1738        query: &[f32],
1739        k: usize,
1740        ef: usize,
1741        filter: Option<&roaring::RoaringBitmap>,
1742    ) -> Result<Vec<(u32, f32)>, LuciError> {
1743        if self.graph.entry_point.is_none() {
1744            return Ok(Vec::new());
1745        }
1746
1747        // Normalize the query once at entry for cosine. v2 segments store
1748        // unit-length vectors, so the kernel runs as a pure dot product.
1749        let query_owned: Vec<f32> = if self.graph.params.metric == DistanceMetric::Cosine {
1750            let mut q = query.to_vec();
1751            normalize_in_place(&mut q)?;
1752            q
1753        } else {
1754            query.to_vec()
1755        };
1756        let query = &query_owned[..];
1757
1758        if let Some(bm) = filter {
1759            if (bm.len() as f64) < (self.graph.num_vectors as f64 * 0.01) {
1760                return Ok(self.brute_force_search(query, k, bm));
1761            }
1762        }
1763
1764        let ep = self.graph.entry_point.unwrap();
1765        let ef_actual = ef.max(k);
1766
1767        let mut current = ep;
1768        for lev in (1..=self.graph.max_level).rev() {
1769            current = self.greedy_closest(current, query, lev);
1770        }
1771
1772        let candidates = self.search_layer_0(query, current, ef_actual, filter);
1773
1774        // Rerank with exact float32 distance
1775        let mut results: Vec<(u32, f32)> = candidates
1776            .into_iter()
1777            .map(|c| (c.id, self.exact_dist(c.id, query)))
1778            .collect();
1779        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1780        results.truncate(k);
1781        Ok(results)
1782    }
1783
1784    /// Read a float32 value from the vector data region.
1785    #[inline]
1786    fn read_f32(&self, byte_offset: usize) -> f32 {
1787        f32::from_le_bytes(self.data[byte_offset..byte_offset + 4].try_into().unwrap())
1788    }
1789
1790    /// Exact float32 distance (for reranking).
1791    fn exact_dist(&self, idx: u32, query: &[f32]) -> f32 {
1792        let dims = self.graph.params.dims;
1793        let base = self.graph.vector_data_offset + (idx as usize) * dims * 4;
1794        let mut vec = Vec::with_capacity(dims);
1795        for d in 0..dims {
1796            vec.push(self.read_f32(base + d * 4));
1797        }
1798        super::distance(&vec, query, self.graph.params.metric)
1799    }
1800
1801    /// Fast approximate distance (quantized if available, else float32).
1802    #[inline]
1803    fn approx_dist(&self, idx: u32, query: &[f32]) -> f32 {
1804        if let Some(ref cal) = self.quantized_cal {
1805            self.quantized_cosine_dist(cal, idx as usize, query)
1806        } else {
1807            self.exact_dist(idx, query)
1808        }
1809    }
1810
1811    fn quantized_cosine_dist(&self, cal: &QuantizedCalibration, idx: usize, query: &[f32]) -> f32 {
1812        let dims = cal.dims;
1813        let qvec = &cal.data[idx * dims..(idx + 1) * dims];
1814        let mut dot = 0.0f32;
1815        for d in 0..dims {
1816            let min = f32::from_le_bytes(cal.mins[d * 4..d * 4 + 4].try_into().unwrap());
1817            let scale = f32::from_le_bytes(cal.scales[d * 4..d * 4 + 4].try_into().unwrap());
1818            let dequant = min + qvec[d] as f32 * scale;
1819            dot += dequant * query[d];
1820        }
1821        let norm_offset = idx * 4;
1822        let stored_norm =
1823            f32::from_le_bytes(cal.norms[norm_offset..norm_offset + 4].try_into().unwrap());
1824        // Query is unit-length by the v0.7.2 invariant; only the
1825        // dequantized stored_norm carries quantization drift.
1826        if stored_norm == 0.0 {
1827            1.0
1828        } else {
1829            1.0 - dot / stored_norm
1830        }
1831    }
1832
1833    fn greedy_closest(&self, start: u32, query: &[f32], level: usize) -> u32 {
1834        let mut current = start;
1835        let mut current_dist = self.approx_dist(current, query);
1836        loop {
1837            let mut changed = false;
1838            if level < self.graph.nodes[current as usize].neighbors.len() {
1839                for &neighbor in &self.graph.nodes[current as usize].neighbors[level] {
1840                    let d = self.approx_dist(neighbor, query);
1841                    if d < current_dist {
1842                        current = neighbor;
1843                        current_dist = d;
1844                        changed = true;
1845                    }
1846                }
1847            }
1848            if !changed {
1849                break;
1850            }
1851        }
1852        current
1853    }
1854
1855    fn search_layer_0(
1856        &self,
1857        query: &[f32],
1858        entry: u32,
1859        ef: usize,
1860        filter: Option<&roaring::RoaringBitmap>,
1861    ) -> Vec<Candidate> {
1862        let mut visited = HashSet::new();
1863        let mut candidates = BinaryHeap::new();
1864        let mut results = BinaryHeap::new();
1865
1866        let d = self.approx_dist(entry, query);
1867        visited.insert(entry);
1868        candidates.push(Candidate { id: entry, dist: d });
1869        if filter.is_none() || filter.unwrap().contains(entry) {
1870            results.push(FurthestCandidate(Candidate { id: entry, dist: d }));
1871        }
1872
1873        while let Some(c) = candidates.pop() {
1874            let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1875            if c.dist > furthest_dist && results.len() >= ef {
1876                break;
1877            }
1878
1879            if !self.graph.nodes[c.id as usize].neighbors.is_empty() {
1880                for &neighbor in &self.graph.nodes[c.id as usize].neighbors[0] {
1881                    if visited.insert(neighbor) {
1882                        let d = self.approx_dist(neighbor, query);
1883                        let furthest_dist = results.peek().map(|f| f.0.dist).unwrap_or(f32::MAX);
1884                        if d < furthest_dist || results.len() < ef {
1885                            candidates.push(Candidate {
1886                                id: neighbor,
1887                                dist: d,
1888                            });
1889                            if filter.is_none() || filter.unwrap().contains(neighbor) {
1890                                results.push(FurthestCandidate(Candidate {
1891                                    id: neighbor,
1892                                    dist: d,
1893                                }));
1894                                if results.len() > ef {
1895                                    results.pop();
1896                                }
1897                            }
1898                        }
1899                    }
1900                }
1901            }
1902        }
1903
1904        let mut result: Vec<Candidate> = results.into_iter().map(|f| f.0).collect();
1905        result.sort_by(|a, b| a.dist.partial_cmp(&b.dist).unwrap_or(Ordering::Equal));
1906        result
1907    }
1908
1909    fn brute_force_search(
1910        &self,
1911        query: &[f32],
1912        k: usize,
1913        filter: &roaring::RoaringBitmap,
1914    ) -> Vec<(u32, f32)> {
1915        let mut results: Vec<(u32, f32)> = filter
1916            .iter()
1917            .filter(|&id| (id as usize) < self.graph.num_vectors)
1918            .map(|id| (id, self.exact_dist(id, query)))
1919            .collect();
1920        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
1921        results.truncate(k);
1922        results
1923    }
1924}
1925
1926#[cfg(test)]
1927mod tests {
1928    use super::*;
1929
1930    fn make_params(dims: usize) -> HnswParams {
1931        HnswParams {
1932            dims,
1933            m: 8,
1934            ef_construction: 50,
1935            metric: DistanceMetric::L2,
1936            quantization: QuantizationType::DEFAULT,
1937        }
1938    }
1939
1940    #[test]
1941    fn build_and_search_small() {
1942        let mut builder = HnswBuilder::new(make_params(2));
1943        // 4 points in a square
1944        builder.add_vector(vec![0.0, 0.0]).unwrap(); // 0
1945        builder.add_vector(vec![1.0, 0.0]).unwrap(); // 1
1946        builder.add_vector(vec![0.0, 1.0]).unwrap(); // 2
1947        builder.add_vector(vec![1.0, 1.0]).unwrap(); // 3
1948
1949        let index = builder.build();
1950        let results = index.search(&[0.1, 0.1], 2, 10).unwrap();
1951        assert_eq!(results.len(), 2);
1952        assert_eq!(results[0].0, 0); // closest to (0,0)
1953    }
1954
1955    #[test]
1956    fn search_returns_k_results() {
1957        let mut builder = HnswBuilder::new(make_params(3));
1958        for i in 0..20 {
1959            builder.add_vector(vec![i as f32, 0.0, 0.0]).unwrap();
1960        }
1961        let index = builder.build();
1962        let results = index.search(&[5.0, 0.0, 0.0], 5, 20).unwrap();
1963        assert_eq!(results.len(), 5);
1964        // Should include point 5 (exact match)
1965        assert!(results.iter().any(|(id, _)| *id == 5));
1966    }
1967
1968    #[test]
1969    fn recall_on_random_data() {
1970        let dims = 16;
1971        let n = 200;
1972        let mut builder = HnswBuilder::new(HnswParams {
1973            dims,
1974            m: 12,
1975            ef_construction: 50,
1976            metric: DistanceMetric::L2,
1977            quantization: QuantizationType::DEFAULT,
1978        });
1979
1980        // Deterministic "random" vectors
1981        let mut rng: u64 = 12345;
1982        let mut vectors = Vec::new();
1983        for _ in 0..n {
1984            let mut v = Vec::with_capacity(dims);
1985            for _ in 0..dims {
1986                rng ^= rng << 13;
1987                rng ^= rng >> 7;
1988                rng ^= rng << 17;
1989                v.push((rng as f32 / u64::MAX as f32) * 2.0 - 1.0);
1990            }
1991            vectors.push(v);
1992        }
1993
1994        for v in &vectors {
1995            builder.add_vector(v.clone()).unwrap();
1996        }
1997        let index = builder.build();
1998
1999        // Query with the first vector — it should find itself
2000        let results = index.search(&vectors[0], 1, 50).unwrap();
2001        assert_eq!(results[0].0, 0);
2002
2003        // Compute recall@10 against brute-force
2004        let query = &vectors[42];
2005        let hnsw_results = index.search(query, 10, 50).unwrap();
2006
2007        // Brute-force top-10
2008        let mut brute: Vec<(u32, f32)> = (0..n as u32)
2009            .map(|i| {
2010                (
2011                    i,
2012                    super::super::distance(&vectors[i as usize], query, DistanceMetric::L2),
2013                )
2014            })
2015            .collect();
2016        brute.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
2017        let brute_top10: HashSet<u32> = brute[..10].iter().map(|x| x.0).collect();
2018        let hnsw_top10: HashSet<u32> = hnsw_results.iter().map(|x| x.0).collect();
2019
2020        let recall = brute_top10.intersection(&hnsw_top10).count() as f64 / 10.0;
2021        assert!(recall >= 0.8, "recall@10 = {recall}, expected >= 0.8");
2022    }
2023
2024    /// Deterministic pseudo-random vectors for the parallel-build tests.
2025    fn gen_vectors(n: usize, dims: usize, seed: u64) -> Vec<Vec<f32>> {
2026        let mut rng = seed | 1;
2027        let mut vectors = Vec::with_capacity(n);
2028        for _ in 0..n {
2029            let mut v = Vec::with_capacity(dims);
2030            for _ in 0..dims {
2031                rng ^= rng << 13;
2032                rng ^= rng >> 7;
2033                rng ^= rng << 17;
2034                v.push((rng as f32 / u64::MAX as f32) * 2.0 - 1.0);
2035            }
2036            vectors.push(v);
2037        }
2038        vectors
2039    }
2040
2041    fn mean_recall_at_10(index: &HnswIndex, vectors: &[Vec<f32>], queries: &[usize]) -> f64 {
2042        let n = vectors.len();
2043        let mut total = 0.0;
2044        for &qi in queries {
2045            let query = &vectors[qi];
2046            let hnsw: HashSet<u32> = index
2047                .search(query, 10, 64)
2048                .unwrap()
2049                .iter()
2050                .map(|x| x.0)
2051                .collect();
2052            let mut brute: Vec<(u32, f32)> = (0..n as u32)
2053                .map(|i| {
2054                    (
2055                        i,
2056                        super::super::distance(&vectors[i as usize], query, DistanceMetric::L2),
2057                    )
2058                })
2059                .collect();
2060            brute.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
2061            let truth: HashSet<u32> = brute[..10].iter().map(|x| x.0).collect();
2062            total += truth.intersection(&hnsw).count() as f64 / 10.0;
2063        }
2064        total / queries.len() as f64
2065    }
2066
2067    /// `store_vector` + `connect_pending(Fixed(1))` must produce a
2068    /// byte-identical graph to the immediate `add_vector` path — the
2069    /// deferred write model's determinism contract at unit scale. (The
2070    /// full golden-bytes test against `main`'s serializer is the
2071    /// integration suite's job.)
2072    #[test]
2073    fn connect_pending_fixed1_matches_add_vector_bytes() {
2074        let dims = 16;
2075        let vectors = gen_vectors(300, dims, 0x00AB_CDEF);
2076
2077        let mut immediate = HnswBuilder::new(make_params(dims));
2078        for v in &vectors {
2079            immediate.add_vector(v.clone()).unwrap();
2080        }
2081        let bytes_immediate = immediate.build().to_bytes();
2082
2083        let mut deferred = HnswBuilder::new(make_params(dims));
2084        for v in &vectors {
2085            deferred.store_vector(v.clone()).unwrap();
2086        }
2087        deferred.connect_pending(BuildThreads::Fixed(1));
2088        let bytes_deferred = deferred.build().to_bytes();
2089
2090        assert_eq!(
2091            bytes_immediate, bytes_deferred,
2092            "Fixed(1) deferred build must be byte-identical to immediate add_vector",
2093        );
2094    }
2095
2096    /// Parallel `connect_pending` must produce graph quality on par with
2097    /// the sequential build (qdrant's results-not-topology invariant). A
2098    /// broken parallel link (e.g. a wrong `ready` gate) craters recall to
2099    /// near zero, which the band below catches.
2100    #[test]
2101    fn parallel_build_recall_matches_sequential() {
2102        let dims = 16;
2103        let vectors = gen_vectors(800, dims, 0x1234_5678);
2104        let queries: Vec<usize> = (0..40).map(|i| i * 17 % vectors.len()).collect();
2105
2106        let mut seq = HnswBuilder::new(make_params(dims));
2107        for v in &vectors {
2108            seq.add_vector(v.clone()).unwrap();
2109        }
2110        let recall_seq = mean_recall_at_10(&seq.build(), &vectors, &queries);
2111
2112        for threads in [
2113            BuildThreads::Fixed(1),
2114            BuildThreads::Fixed(8),
2115            BuildThreads::Ambient,
2116        ] {
2117            let mut par = HnswBuilder::new(make_params(dims));
2118            for v in &vectors {
2119                par.store_vector(v.clone()).unwrap();
2120            }
2121            par.connect_pending(threads);
2122            let recall_par = mean_recall_at_10(&par.build(), &vectors, &queries);
2123            assert!(
2124                recall_par >= recall_seq - 0.10,
2125                "{threads:?}: parallel recall {recall_par:.3} should track sequential \
2126                 {recall_seq:.3} (within 0.10)",
2127            );
2128        }
2129    }
2130
2131    /// Adversarial corpus with many exact-duplicate vectors maximizes
2132    /// shared back-link targets (lock contention). The build must
2133    /// complete (no deadlock) at a high thread count and still answer.
2134    #[test]
2135    fn parallel_build_no_deadlock_on_duplicates() {
2136        let dims = 8;
2137        let distinct = gen_vectors(10, dims, 0x0000_9999);
2138        let mut builder = HnswBuilder::new(make_params(dims));
2139        for i in 0..2000 {
2140            builder
2141                .store_vector(distinct[i % distinct.len()].clone())
2142                .unwrap();
2143        }
2144        builder.connect_pending(BuildThreads::Fixed(16));
2145        let index = builder.build();
2146        let results = index.search(&distinct[0], 5, 32).unwrap();
2147        assert!(
2148            !results.is_empty(),
2149            "duplicate-heavy graph should still answer queries",
2150        );
2151    }
2152
2153    #[test]
2154    fn filtered_search() {
2155        let mut builder = HnswBuilder::new(make_params(2));
2156        for i in 0..10 {
2157            builder.add_vector(vec![i as f32, 0.0]).unwrap();
2158        }
2159        let index = builder.build();
2160
2161        // Only allow even IDs
2162        let mut filter = roaring::RoaringBitmap::new();
2163        for i in (0..10).step_by(2) {
2164            filter.insert(i);
2165        }
2166
2167        let results = index
2168            .search_filtered(&[3.0, 0.0], 3, 20, Some(&filter))
2169            .unwrap();
2170        // All results should have even IDs
2171        for (id, _) in &results {
2172            assert!(id % 2 == 0, "filtered result should be even, got {id}");
2173        }
2174        // Closest even to 3.0 is 2 or 4
2175        assert!(results[0].0 == 2 || results[0].0 == 4);
2176    }
2177
2178    #[test]
2179    fn serialization_round_trip() {
2180        let mut builder = HnswBuilder::new(make_params(3));
2181        builder.add_vector(vec![1.0, 2.0, 3.0]).unwrap();
2182        builder.add_vector(vec![4.0, 5.0, 6.0]).unwrap();
2183        builder.add_vector(vec![7.0, 8.0, 9.0]).unwrap();
2184        let index = builder.build();
2185
2186        let bytes = index.to_bytes();
2187        let restored = HnswIndex::from_bytes(&bytes).unwrap();
2188
2189        assert_eq!(restored.len(), 3);
2190        assert_eq!(restored.dims(), 3);
2191
2192        let r1 = index.search(&[1.0, 2.0, 3.0], 1, 10).unwrap();
2193        let r2 = restored.search(&[1.0, 2.0, 3.0], 1, 10).unwrap();
2194        assert_eq!(r1[0].0, r2[0].0);
2195    }
2196
2197    /// `from_bytes` must reject a malformed blob with an explicit
2198    /// [`LuciError::IndexCorrupted`] rather than panicking out of bounds.
2199    ///
2200    /// Regression for the deserializer-robustness gap surfaced by the
2201    /// visited-pool review: truncated blobs slice-panicked, and an
2202    /// out-of-range neighbour / entry-point id sailed through deserialization
2203    /// to an out-of-bounds panic during search. See
2204    /// [[optimization-hnsw-visited-bitset]].
2205    #[test]
2206    fn from_bytes_rejects_corrupt_blob() {
2207        let mut builder = HnswBuilder::new(make_params(3));
2208        builder.add_vector(vec![1.0, 2.0, 3.0]).unwrap();
2209        builder.add_vector(vec![4.0, 5.0, 6.0]).unwrap();
2210        builder.add_vector(vec![7.0, 8.0, 9.0]).unwrap();
2211        let valid = builder.build().to_bytes();
2212        assert!(
2213            HnswIndex::from_bytes(&valid).is_ok(),
2214            "valid blob must load"
2215        );
2216
2217        // Truncation anywhere in the required header/vectors/nodes region must
2218        // error, not panic (exercises the bounded readers).
2219        for cut in [
2220            1usize,
2221            10,
2222            20,
2223            valid.len() / 2,
2224            valid.len().saturating_sub(6),
2225        ] {
2226            assert!(
2227                HnswIndex::from_bytes(&valid[..cut]).is_err(),
2228                "truncated-to-{cut} blob must be rejected, not panic"
2229            );
2230        }
2231
2232        // Out-of-range entry_point (v2 header bytes [18..22]) must be rejected
2233        // before it can index self.vectors[ep] during search.
2234        let mut bad_ep = valid.clone();
2235        bad_ep[18..22].copy_from_slice(&9999u32.to_le_bytes());
2236        assert!(
2237            matches!(
2238                HnswIndex::from_bytes(&bad_ep),
2239                Err(LuciError::IndexCorrupted(_))
2240            ),
2241            "out-of-range entry_point must be IndexCorrupted"
2242        );
2243
2244        // Out-of-range neighbour id must likewise be rejected. Hand-build a
2245        // structurally-valid v2 blob (1 dim, 2 vectors) whose node 0 points at
2246        // a neighbour past num_vectors, reusing the real metric byte.
2247        let metric_byte = valid[13];
2248        let mut blob = Vec::new();
2249        blob.extend_from_slice(&HNSW_FORMAT_MAGIC);
2250        blob.push(HNSW_FORMAT_VERSION);
2251        blob.extend_from_slice(&1u32.to_le_bytes()); // dims
2252        blob.extend_from_slice(&1u32.to_le_bytes()); // m
2253        blob.push(metric_byte);
2254        blob.extend_from_slice(&2u32.to_le_bytes()); // num_vectors
2255        blob.extend_from_slice(&0u32.to_le_bytes()); // entry_point = 0 (valid)
2256        blob.extend_from_slice(&0u32.to_le_bytes()); // max_level
2257        blob.extend_from_slice(&0.0f32.to_le_bytes()); // vector 0
2258        blob.extend_from_slice(&1.0f32.to_le_bytes()); // vector 1
2259        // node 0: level 0, 1 layer, 1 neighbour pointing out of range
2260        blob.extend_from_slice(&0u32.to_le_bytes()); // level
2261        blob.extend_from_slice(&1u32.to_le_bytes()); // num_layers
2262        blob.extend_from_slice(&1u32.to_le_bytes()); // num_neighbors
2263        blob.extend_from_slice(&99u32.to_le_bytes()); // neighbour 99 >= 2 (BAD)
2264        // node 1: level 0, 1 layer, 1 valid neighbour
2265        blob.extend_from_slice(&0u32.to_le_bytes());
2266        blob.extend_from_slice(&1u32.to_le_bytes());
2267        blob.extend_from_slice(&1u32.to_le_bytes());
2268        blob.extend_from_slice(&0u32.to_le_bytes()); // neighbour 0 (valid)
2269        blob.push(0); // quantized flag: none
2270        assert!(
2271            matches!(
2272                HnswIndex::from_bytes(&blob),
2273                Err(LuciError::IndexCorrupted(_))
2274            ),
2275            "out-of-range neighbour id must be IndexCorrupted"
2276        );
2277    }
2278
2279    /// `from_bytes` must honor `"quantization": "none"`.
2280    ///
2281    /// Regression test for the auto-quantize bug at the original
2282    /// `hnsw.rs:1006-1011`: when the on-disk format records "no
2283    /// quantized data" (flag byte 0), `from_bytes` was synthesising
2284    /// int8 vectors anyway. That routed query-time beam search through
2285    /// scalar int8 dequant even though the user mapping said `"none"`.
2286    /// See [[hnsw-query-path-allocation-overhead]] and
2287    /// [[code-must-not-lie]].
2288    #[test]
2289    fn from_bytes_honors_quantization_none() {
2290        let mut builder = HnswBuilder::new(HnswParams {
2291            dims: 4,
2292            m: 16,
2293            ef_construction: 100,
2294            metric: DistanceMetric::L2,
2295            quantization: QuantizationType::None,
2296        });
2297        builder.add_vector(vec![1.0, 2.0, 3.0, 4.0]).unwrap();
2298        builder.add_vector(vec![5.0, 6.0, 7.0, 8.0]).unwrap();
2299        let index = builder.build();
2300        assert!(
2301            index.quantized.is_none(),
2302            "build must produce quantized = None when mapping says None",
2303        );
2304
2305        let bytes = index.to_bytes();
2306        let restored = HnswIndex::from_bytes(&bytes).unwrap();
2307
2308        assert!(
2309            restored.quantized.is_none(),
2310            "from_bytes must honor on-disk `no quantized data` flag; \
2311             auto-synthesising int8 overrides the user's mapping",
2312        );
2313        assert_eq!(restored.params.quantization, QuantizationType::None);
2314    }
2315
2316    /// Sanity: the explicit-int8 round-trip still works after the
2317    /// None-honoring fix.
2318    #[test]
2319    fn from_bytes_preserves_quantization_int8() {
2320        let mut builder = HnswBuilder::new(HnswParams {
2321            dims: 4,
2322            m: 16,
2323            ef_construction: 100,
2324            metric: DistanceMetric::L2,
2325            quantization: QuantizationType::Int8,
2326        });
2327        builder.add_vector(vec![1.0, 2.0, 3.0, 4.0]).unwrap();
2328        builder.add_vector(vec![5.0, 6.0, 7.0, 8.0]).unwrap();
2329        let index = builder.build();
2330        assert!(
2331            index.quantized.is_some(),
2332            "build must produce quantized = Some when mapping says Int8",
2333        );
2334
2335        let bytes = index.to_bytes();
2336        let restored = HnswIndex::from_bytes(&bytes).unwrap();
2337
2338        assert!(restored.quantized.is_some());
2339        assert_eq!(restored.params.quantization, QuantizationType::Int8);
2340    }
2341
2342    #[test]
2343    fn empty_index() {
2344        let builder = HnswBuilder::new(make_params(2));
2345        let index = builder.build();
2346        let results = index.search(&[0.0, 0.0], 5, 10).unwrap();
2347        assert!(results.is_empty());
2348    }
2349
2350    #[test]
2351    fn single_vector() {
2352        let mut builder = HnswBuilder::new(make_params(2));
2353        builder.add_vector(vec![1.0, 1.0]).unwrap();
2354        let index = builder.build();
2355        let results = index.search(&[0.0, 0.0], 1, 10).unwrap();
2356        assert_eq!(results.len(), 1);
2357        assert_eq!(results[0].0, 0);
2358    }
2359
2360    #[test]
2361    fn cosine_metric() {
2362        let params = HnswParams {
2363            dims: 3,
2364            m: 8,
2365            ef_construction: 50,
2366            metric: DistanceMetric::Cosine,
2367            quantization: QuantizationType::DEFAULT,
2368        };
2369        let mut builder = HnswBuilder::new(params);
2370        builder.add_vector(vec![1.0, 0.0, 0.0]).unwrap(); // 0: x-axis
2371        builder.add_vector(vec![0.0, 1.0, 0.0]).unwrap(); // 1: y-axis
2372        builder.add_vector(vec![0.9, 0.1, 0.0]).unwrap(); // 2: near x-axis
2373
2374        let index = builder.build();
2375        let results = index.search(&[1.0, 0.0, 0.0], 2, 10).unwrap();
2376        // Closest by cosine to x-axis: 0 (identical) then 2 (near x-axis)
2377        assert_eq!(results[0].0, 0);
2378        assert_eq!(results[1].0, 2);
2379    }
2380
2381    #[test]
2382    #[should_panic(expected = "unimplemented quantization")]
2383    fn hnsw_builder_panics_on_int4_quantization() {
2384        // Defense in depth: even if Int4 somehow reaches the builder
2385        // (bypassing the mapping parser), the builder must refuse rather
2386        // than silently substitute Int8. See [[code-must-not-lie]].
2387        HnswBuilder::new(HnswParams {
2388            dims: 4,
2389            m: 8,
2390            ef_construction: 50,
2391            metric: DistanceMetric::Cosine,
2392            quantization: QuantizationType::Int4,
2393        });
2394    }
2395
2396    #[test]
2397    #[should_panic(expected = "unimplemented quantization")]
2398    fn hnsw_builder_panics_on_bbq_quantization() {
2399        HnswBuilder::new(HnswParams {
2400            dims: 4,
2401            m: 8,
2402            ef_construction: 50,
2403            metric: DistanceMetric::Cosine,
2404            quantization: QuantizationType::Bbq,
2405        });
2406    }
2407
2408    /// Cosine builder normalizes each insert to unit length. Stored vectors
2409    /// must have norm == 1 (within f32 rounding) — that's the invariant
2410    /// the v2 kernel relies on to skip the per-call norm passes. See
2411    /// [[optimize-cosine-norm-precompute]].
2412    #[test]
2413    fn builder_normalizes_input_on_cosine() {
2414        let mut builder = HnswBuilder::new(HnswParams {
2415            dims: 3,
2416            m: 8,
2417            ef_construction: 50,
2418            metric: DistanceMetric::Cosine,
2419            quantization: QuantizationType::None,
2420        });
2421        builder.add_vector(vec![3.0, 0.0, 4.0]).unwrap(); // norm = 5
2422        let v = &builder.vectors[0];
2423        let norm_sq: f32 = v.iter().map(|x| x * x).sum();
2424        assert!(
2425            (norm_sq - 1.0).abs() < 1e-5,
2426            "stored vector must be unit-length, got norm_sq = {norm_sq}",
2427        );
2428        assert!((v[0] - 0.6).abs() < 1e-5);
2429        assert!((v[2] - 0.8).abs() < 1e-5);
2430    }
2431
2432    /// Zero-vector insertion on Cosine must fail loudly rather than embed
2433    /// a vector whose cosine score is undefined. See [[code-must-not-lie]].
2434    #[test]
2435    fn builder_rejects_zero_vector_with_cosine() {
2436        let mut builder = HnswBuilder::new(HnswParams {
2437            dims: 3,
2438            m: 8,
2439            ef_construction: 50,
2440            metric: DistanceMetric::Cosine,
2441            quantization: QuantizationType::None,
2442        });
2443        let err = builder.add_vector(vec![0.0, 0.0, 0.0]).unwrap_err();
2444        assert!(
2445            matches!(err, LuciError::InvalidQuery(_)),
2446            "expected InvalidQuery, got {err:?}",
2447        );
2448    }
2449
2450    /// DotProduct / L2 builders pass raw vectors through, including the
2451    /// all-zero vector — the cosine-specific normalization gate must not
2452    /// kick in.
2453    #[test]
2454    fn builder_accepts_zero_vector_with_dot_product() {
2455        let mut builder = HnswBuilder::new(HnswParams {
2456            dims: 3,
2457            m: 8,
2458            ef_construction: 50,
2459            metric: DistanceMetric::DotProduct,
2460            quantization: QuantizationType::None,
2461        });
2462        builder.add_vector(vec![0.0, 0.0, 0.0]).unwrap();
2463        assert_eq!(builder.vectors[0], vec![0.0, 0.0, 0.0]);
2464    }
2465
2466    /// Cosine bulk insert must abort on the first zero vector so the
2467    /// caller learns about the bad input — silently inserting only the
2468    /// good ones would drop data.
2469    #[test]
2470    fn bulk_aborts_on_zero_vector() {
2471        let mut builder = HnswBuilder::new(HnswParams {
2472            dims: 3,
2473            m: 8,
2474            ef_construction: 50,
2475            metric: DistanceMetric::Cosine,
2476            quantization: QuantizationType::None,
2477        });
2478        builder.add_vector(vec![1.0, 0.0, 0.0]).unwrap();
2479        let err = builder.add_vector(vec![0.0, 0.0, 0.0]).unwrap_err();
2480        assert!(matches!(err, LuciError::InvalidQuery(_)));
2481        // No third insert should run.
2482        assert_eq!(builder.vectors.len(), 1);
2483    }
2484
2485    #[test]
2486    fn hnsw_builder_accepts_none_quantization() {
2487        let mut builder = HnswBuilder::new(HnswParams {
2488            dims: 3,
2489            m: 8,
2490            ef_construction: 50,
2491            metric: DistanceMetric::Cosine,
2492            quantization: QuantizationType::None,
2493        });
2494        builder.add_vector(vec![1.0, 0.0, 0.0]).unwrap();
2495        let index = builder.build();
2496        // No quantized blob materialized when QuantizationType::None.
2497        assert!(index.quantized.is_none());
2498    }
2499
2500    #[test]
2501    fn m_max_is_2m_at_layer_0() {
2502        // M_max0 = 2*M at layer 0, M_max = M at higher layers.
2503        // See [[fix-hnsw-neighbor-heuristic]].
2504        let builder = HnswBuilder::new(HnswParams {
2505            dims: 2,
2506            m: 8,
2507            ef_construction: 50,
2508            metric: DistanceMetric::L2,
2509            quantization: QuantizationType::None,
2510        });
2511        assert_eq!(builder.m_max(0), 16);
2512        assert_eq!(builder.m_max(1), 8);
2513        assert_eq!(builder.m_max(5), 8);
2514    }
2515
2516    #[test]
2517    fn select_neighbors_heuristic_rejects_clustered_candidate() {
2518        // Direction-regression test. Configuration where simple
2519        // (m-closest) selection would pick A and B (both close to
2520        // query, in the same direction), and the diversity-aware
2521        // heuristic should pick A and C (B rejected as redundant,
2522        // C kept because it's in a different direction).
2523        //
2524        // Query is conceptually the origin. Vectors live in 2D:
2525        //   id=0 — query anchor (only used to provide a builder slot)
2526        //   id=1 — A at (1.0, 0.0)
2527        //   id=2 — B at (1.0, 0.05) — very close to A in space
2528        //   id=3 — C at (0.0, 1.05) — different direction, slightly farther
2529        let mut builder = HnswBuilder::new(HnswParams {
2530            dims: 2,
2531            m: 2,
2532            ef_construction: 10,
2533            metric: DistanceMetric::L2,
2534            quantization: QuantizationType::None,
2535        });
2536        builder.add_vector(vec![0.0, 0.0]).unwrap(); // 0 — query anchor
2537        builder.add_vector(vec![1.0, 0.0]).unwrap(); // 1 — A
2538        builder.add_vector(vec![1.0, 0.05]).unwrap(); // 2 — B (clustered with A)
2539        builder.add_vector(vec![0.0, 1.05]).unwrap(); // 3 — C (diverse from A)
2540
2541        // Synthetic candidate list: dist field is dist-to-query.
2542        let candidates = vec![
2543            Candidate { id: 1, dist: 1.0 }, // A
2544            Candidate {
2545                id: 2,
2546                dist: 1.00125,
2547            }, // B (sqrt(1^2 + 0.05^2))
2548            Candidate { id: 3, dist: 1.05 }, // C
2549        ];
2550
2551        let selected = builder.select_neighbors_heuristic(&candidates, 2);
2552        let ids: HashSet<u32> = selected.iter().map(|c| c.id).collect();
2553
2554        assert_eq!(selected.len(), 2);
2555        assert!(
2556            ids.contains(&1),
2557            "Expected A (id 1) selected, got {:?}",
2558            ids
2559        );
2560        assert!(
2561            ids.contains(&3),
2562            "Expected C (id 3) selected (diverse direction), got {:?}",
2563            ids
2564        );
2565        assert!(
2566            !ids.contains(&2),
2567            "Expected B (id 2) rejected (too close to A), got {:?}",
2568            ids
2569        );
2570    }
2571
2572    #[test]
2573    fn select_neighbors_heuristic_satisfies_diversity_invariant() {
2574        // For any output of the heuristic with m candidates selected,
2575        // every later-selected neighbor must be at least as close to
2576        // the query as it is to every earlier-selected neighbor.
2577        // This is the load-bearing invariant of Algorithm 4 —
2578        // robust to tie-breaking shifts that a literal-output test
2579        // would break on.
2580        let dims = 4;
2581        let n = 30;
2582        let mut builder = HnswBuilder::new(HnswParams {
2583            dims,
2584            m: 8,
2585            ef_construction: 50,
2586            metric: DistanceMetric::L2,
2587            quantization: QuantizationType::None,
2588        });
2589
2590        // Deterministic synthetic vectors.
2591        let mut rng: u64 = 12345;
2592        for _ in 0..n {
2593            let mut v = Vec::with_capacity(dims);
2594            for _ in 0..dims {
2595                rng ^= rng << 13;
2596                rng ^= rng >> 7;
2597                rng ^= rng << 17;
2598                v.push((rng as f32 / u64::MAX as f32) * 2.0 - 1.0);
2599            }
2600            builder.add_vector(v).unwrap();
2601        }
2602
2603        // Treat node 0 as the conceptual query. Build candidates
2604        // from all other nodes with dist field = dist(other, 0).
2605        let candidates: Vec<Candidate> = (1..n as u32)
2606            .map(|id| Candidate {
2607                id,
2608                dist: builder.dist(id, 0),
2609            })
2610            .collect();
2611
2612        let selected = builder.select_neighbors_heuristic(&candidates, 8);
2613        assert!(!selected.is_empty(), "heuristic returned empty selection");
2614
2615        for (i, s_i) in selected.iter().enumerate() {
2616            for s_k in &selected[..i] {
2617                let d_ik = builder.dist(s_i.id, s_k.id);
2618                assert!(
2619                    d_ik >= s_i.dist,
2620                    "Diversity invariant violated: selected[{i}] (id={}, dist_to_query={}) \
2621                     has dist to earlier-selected (id={}) of {}; expected >= {}",
2622                    s_i.id,
2623                    s_i.dist,
2624                    s_k.id,
2625                    d_ik,
2626                    s_i.dist
2627                );
2628            }
2629        }
2630    }
2631
2632    /// `with_capacity_for_merge` pre-sizes vectors and nodes and
2633    /// `add_vector_at_ordinal` fills slot N regardless of insertion
2634    /// order. Built graph is searchable.
2635    #[test]
2636    fn add_vector_at_ordinal_fills_reserved_slot() {
2637        let params = make_params(3);
2638        let mut builder = HnswBuilder::with_capacity_for_merge(params, 5);
2639        // Insert ordinals out of order to exercise the slot semantic
2640        builder
2641            .add_vector_at_ordinal(2, vec![0.0, 0.0, 1.0])
2642            .unwrap();
2643        builder
2644            .add_vector_at_ordinal(0, vec![1.0, 0.0, 0.0])
2645            .unwrap();
2646        builder
2647            .add_vector_at_ordinal(4, vec![0.5, 0.5, 0.0])
2648            .unwrap();
2649        builder
2650            .add_vector_at_ordinal(1, vec![0.0, 1.0, 0.0])
2651            .unwrap();
2652        builder
2653            .add_vector_at_ordinal(3, vec![0.7, 0.0, 0.7])
2654            .unwrap();
2655
2656        assert_eq!(builder.vectors[0], vec![1.0, 0.0, 0.0]);
2657        assert_eq!(builder.vectors[2], vec![0.0, 0.0, 1.0]);
2658        assert_eq!(builder.vectors[4], vec![0.5, 0.5, 0.0]);
2659
2660        let index = builder.build();
2661        let results = index.search(&[1.0, 0.0, 0.0], 1, 10).unwrap();
2662        assert_eq!(results[0].0, 0, "x-axis vector should match query closest");
2663    }
2664
2665    /// `seed_from_graph` copies a built graph's topology into a
2666    /// pre-sized merge builder under an ordinal remapping. Search
2667    /// against the seeded builder returns the same top-1 as the
2668    /// original, modulo the remap.
2669    #[test]
2670    fn seed_from_graph_round_trips_topology() {
2671        // Build a small source graph.
2672        let dims = 3;
2673        let mut src = HnswBuilder::new(make_params(dims));
2674        src.add_vector(vec![1.0, 0.0, 0.0]).unwrap();
2675        src.add_vector(vec![0.0, 1.0, 0.0]).unwrap();
2676        src.add_vector(vec![0.0, 0.0, 1.0]).unwrap();
2677        src.add_vector(vec![0.7, 0.0, 0.7]).unwrap();
2678        let src_index = src.build();
2679        let src_bytes = src_index.to_bytes();
2680
2681        // Re-open the source as a parsed graph, then seed-fill a new
2682        // builder under an identity remap (no actual remapping).
2683        let graph = ParsedGraph::parse(&src_bytes).unwrap();
2684        let mut merged = HnswBuilder::with_capacity_for_merge(make_params(dims), 4);
2685        merged.seed_from_graph(&graph, &src_bytes, |src_ord| src_ord);
2686
2687        // Entry point and max level survive (packed into `entry` now).
2688        let (merged_ep, merged_max) = unpack_entry(merged.entry.load(AtomicOrdering::Relaxed));
2689        assert_eq!(merged_ep, graph.entry_point.unwrap_or(ENTRY_SENTINEL));
2690        assert_eq!(merged_max as usize, graph.max_level);
2691        // All four vector slots are populated
2692        for ord in 0..4 {
2693            assert_eq!(
2694                merged.vectors[ord].len(),
2695                dims,
2696                "ordinal {ord} should be seeded with a {dims}-dim vector",
2697            );
2698        }
2699
2700        // Search against the rebuilt graph
2701        let merged_index = merged.build();
2702        let results = merged_index.search(&[1.0, 0.0, 0.0], 1, 10).unwrap();
2703        assert_eq!(results[0].0, 0);
2704    }
2705}