Skip to main content

quiver_embed/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The embeddable, in-process Quiver database handle.
3//!
4//! [`Database`] composes the storage engine ([`quiver_core::Store`]) with a
5//! per-collection vector index and payload filtering ([`quiver_query::Filter`])
6//! into one handle. It exposes the same logical operations the server speaks
7//! (`docs/api/wire-protocol.md`), so library mode and server mode exercise
8//! identical engine semantics — the server is a thin transport/policy shell.
9//!
10//! ## Index lifecycle
11//! The store is the source of truth. Each collection chooses its index via the
12//! descriptor's [`IndexSpec`] (default in-memory HNSW); the index is built from
13//! the store on open. HNSW applies new-id inserts incrementally; once an IVF
14//! index is built it applies inserts, in-place updates, and deletes
15//! incrementally with LIRE rebalancing (ADR-0023). The Vamana / disk graph
16//! family is maintained the FreshDiskANN way (ADR-0033): the batch-built graph
17//! is a read-only base, recent inserts land in an in-memory delta graph, and
18//! deletes are tombstoned, so writes are size-independent; when the pending work
19//! grows past a fixed fraction of the base the next access consolidates by
20//! rebuilding from the store. All indexes stay derived (rebuilt from the store
21//! on open), so the crash gate never sees an index write.
22//!
23//! ## Filtered (hybrid) search
24//! A search may carry a [`quiver_query::Filter`] over the payload. The planner
25//! decomposes it into the predicates the collection's secondary indexes can
26//! answer; when those narrow the query to a small candidate set it scans that
27//! set exactly (perfect recall, no filtered-ANN cliff), and otherwise it
28//! over-fetches from the ANN index and post-filters. Both arms re-check the full
29//! filter, so results are exact regardless of which path runs.
30//!
31//! ## Concurrency (ADR-0057 / ADR-0062)
32//! Single-writer. Writes take `&mut self`. Reads come in two flavors: the
33//! `&mut self` convenience methods (`search`, `hybrid_search`,
34//! `search_multi_vector`) rebuild a stale index in place and so give embedded,
35//! single-threaded callers read-your-writes; the `&self` `*_snapshot` methods
36//! read the current immutable snapshot and run **concurrently**, serving the
37//! *prior* snapshot when a write deferred a rebuild (snapshot-isolated, slightly
38//! stale). A server therefore serves concurrent reads behind a reader–writer lock,
39//! and rebuilds **off** the exclusive lock (ADR-0062): it captures the rebuild
40//! inputs under the shared lock ([`Database::snapshot_rebuild_inputs`]), builds the
41//! new index with no lock held ([`RebuildInputs::build`]), and installs it under a
42//! brief write lock ([`Database::commit_rebuild`]) — so a rebuild never stalls
43//! concurrent readers.
44
45use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52    ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53    Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54    ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64    Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65    VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69    BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70    query_term_ids, rrf_fuse, text_to_sparse,
71};
72
73/// Errors returned by the embeddable database.
74#[derive(Debug, Error)]
75#[non_exhaustive]
76pub enum Error {
77    /// An error from the storage engine (includes not-found / already-exists /
78    /// invalid-argument from the catalog and write path).
79    #[error(transparent)]
80    Core(#[from] quiver_core::CoreError),
81    /// An error from the vector index.
82    #[error(transparent)]
83    Index(#[from] quiver_index::IndexError),
84    /// An error from the disk-resident index (build, open, or query).
85    #[error(transparent)]
86    Disk(#[from] quiver_index::DiskError),
87    /// A payload could not be (de)serialized as JSON.
88    #[error("payload json error: {0}")]
89    Json(#[from] serde_json::Error),
90    /// The named collection is not loaded in this database.
91    #[error("collection not found: {0}")]
92    CollectionNotFound(String),
93    /// The requested index / metric combination is not supported.
94    #[error("unsupported configuration: {0}")]
95    Unsupported(&'static str),
96    /// A durable index snapshot could not be restored (ADR-0025); the caller
97    /// falls back to rebuilding from the store, so this does not surface to users.
98    #[error(transparent)]
99    IndexSnapshot(#[from] quiver_index::SnapshotError),
100    /// An index snapshot envelope could not be (de)serialized.
101    #[error("index snapshot envelope: {0}")]
102    Envelope(#[from] postcard::Error),
103}
104
105/// Result alias for database operations.
106pub type Result<T> = std::result::Result<T, Error>;
107
108/// What a [`Database::snapshot`] captured (ADR-0050): the catalog generation and
109/// the number of files / bytes copied.
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
111pub struct SnapshotInfo {
112    /// The manifest version the snapshot reflects (its consistent LSN anchor).
113    pub manifest_version: u64,
114    /// Number of files copied into the snapshot directory.
115    pub files: u64,
116    /// Total bytes copied.
117    pub bytes: u64,
118}
119
120/// A single search or fetch result.
121#[derive(Debug, Clone, PartialEq)]
122pub struct Match {
123    /// External id of the point.
124    pub id: String,
125    /// Distance / similarity under the collection metric (0 for a direct fetch).
126    pub score: f32,
127    /// The payload, if requested.
128    pub payload: Option<Value>,
129    /// The vector, if requested.
130    pub vector: Option<Vec<f32>>,
131}
132
133/// A multi-vector (late-interaction / ColBERT) document result: a document id, its
134/// MaxSim relevance, the payload, and — if requested — the document's token
135/// vectors (ADR-0028).
136#[derive(Debug, Clone, PartialEq)]
137pub struct DocumentMatch {
138    /// Document id.
139    pub id: String,
140    /// MaxSim relevance score (0 for a direct fetch).
141    pub score: f32,
142    /// The document payload, if requested / present (stored on the anchor token).
143    pub payload: Option<Value>,
144    /// The document's token vectors, if requested.
145    pub vectors: Option<Vec<Vec<f32>>>,
146}
147
148// A candidate document during multi-vector re-ranking, before it becomes a
149// [`DocumentMatch`]: `(MaxSim score, document id, anchor payload, token vectors
150// when requested)`. Named so the re-rank buffer stays under clippy's
151// type-complexity threshold.
152type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
154/// Parameters for a [`Database::search`].
155#[derive(Debug, Clone)]
156pub struct SearchParams {
157    /// Number of results to return.
158    pub k: usize,
159    /// Optional payload predicate. The planner pre-filters through the
160    /// collection's secondary indexes when the filter is selective enough, and
161    /// otherwise post-filters the ANN candidates; either way the full predicate
162    /// is re-checked, so results are exact.
163    pub filter: Option<Filter>,
164    /// Search beam width (recall/latency knob), clamped up to at least `k`.
165    pub ef_search: usize,
166    /// Include payloads in the results.
167    pub with_payload: bool,
168    /// Include vectors in the results.
169    pub with_vector: bool,
170}
171
172impl Default for SearchParams {
173    fn default() -> Self {
174        Self {
175            k: 10,
176            filter: None,
177            ef_search: 64,
178            with_payload: true,
179            with_vector: false,
180        }
181    }
182}
183
// Over-fetch multiplier for post-filtered queries: extra candidates are pulled
// ahead of the filter so enough survivors remain to fill `k`.
const FILTER_OVERFETCH: usize = 8;

// Hybrid search (ADR-0043): each side contributes `k * RRF_CANDIDATE_FACTOR`
// candidates — never fewer than `MIN_RRF_CANDIDATES` — to Reciprocal Rank
// Fusion, so a document that misses the top `k` on one side can still surface
// through the other.
const RRF_CANDIDATE_FACTOR: usize = 10;
const MIN_RRF_CANDIDATES: usize = 100;

// Hybrid-planner selectivity threshold. If a payload filter decomposes into
// secondary-index predicates that leave at most this many live candidate rows,
// those rows are brute-force scanned exactly rather than searched through the
// ANN index. At this size an exact scan is cheaper and has better recall than
// filtered ANN, which may return too few results under a very selective filter
// (the "filtered-search recall cliff"). Qdrant's equivalent knob is its
// full-scan threshold.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// Fraction of soft-deleted nodes at which a built HNSW is rebuilt from the store
// to reclaim its tombstoned graph nodes (ADR-0026). Under the threshold a delete
// is an O(1) soft-delete; once it is reached, the next access rebuilds.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// Amount of pending graph work — delta inserts plus tombstones — at which a
/// FreshDiskANN graph collection consolidates (ADR-0033): the next access
/// rebuilds the base from the store, folding in the delta and reclaiming
/// tombstones. Under the threshold, inserts land in the in-memory delta and
/// deletes are `O(1)` tombstones. Mirrors [`HNSW_REBUILD_DELETED_FRACTION`].
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
216// The vector index backing one collection. HNSW and (once built) IVF are
217// maintained incrementally; the Vamana and disk graphs are batch-built from the
218// store (the `Option` is `None` until first build) and then maintained
219// incrementally the FreshDiskANN way — a read-only base graph plus an in-memory
220// delta and deletion set, consolidated by a rebuild past a churn threshold
221// (ADR-0033).
222enum CollectionIndex {
223    // A fetch-only collection with no server-side ANN index: a client-side-encrypted
224    // collection (ADR-0032) stores opaque ciphertext the server never ranks, so the
225    // client fetches points and ranks locally.
226    None,
227    Hnsw(Hnsw),
228    Vamana(Option<FreshVamana>),
229    Ivf(Option<Ivf>),
230    // The disk-resident DiskANN index: PQ codes in RAM, graph + full vectors on
231    // (encrypted) SSD, exact re-rank (ADR-0019), with a FreshDiskANN in-memory
232    // delta layered on top so the on-disk artifact stays immutable.
233    Disk(Option<FreshDiskVamana>),
234    // The ColBERTv2/PLAID compressed token-pool index for a multi-vector
235    // collection (ADR-0034): centroid + residual-PQ codes with centroid-pruned
236    // candidate generation.
237    Colbert(Option<ColbertIndex>),
238}
239
240impl CollectionIndex {
241    // Search, mapping the generic `ef` knob onto each index's search width.
242    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243        Ok(match self {
244            CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245            CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246            CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247            CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248            CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249            CollectionIndex::None
250            | CollectionIndex::Vamana(None)
251            | CollectionIndex::Ivf(None)
252            | CollectionIndex::Disk(None)
253            | CollectionIndex::Colbert(None) => Vec::new(),
254        })
255    }
256}
257
258// === Lock-free MVCC serving snapshot (ADR-0064, increment 1) ===
259//
260// The single writer mutates indexes in place under the write lock, so a reader
261// cannot share the live index by `Arc` and read it lock-free — that is a data
262// race. Instead, in MVCC mode the writer publishes an immutable
263// [`CollectionSnapshot`] (the base index as of the last rebuild + an [`Overlay`]
264// of writes since) into an [`ArcSwap`]; readers `load()` it without a lock. The
265// overlay is index-kind-agnostic and bounded by the rebuild cadence, so it never
266// rewrites the index and a republish costs O(overlay), not O(index). MVCC changes
267// **visibility, not durability**: the WAL-fsync acknowledgement and the `kill -9`
268// crash gate are untouched (the overlay is derived from the same WAL the store
269// already replays).
270
271/// Per-collection lock-free serving snapshot pointer: the single writer
272/// `store`s a new [`CollectionSnapshot`]; readers `load` one without a lock.
273/// (`ArcSwap<T>` stores an `Arc<T>` internally, so this is one `Arc` per load.)
274pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// The writes that have landed since a [`CollectionSnapshot`]'s base index was
/// published. A lock-free read brute-scans these recent vectors, merges them
/// with the base-index hits, and drops any tombstoned id. Each write clones the
/// overlay (O(overlay)); a rebuild folds it into a fresh base and resets it to
/// empty.
#[derive(Default, Clone)]
struct Overlay {
    // `(vector, external id)` of every point upserted since the base, in id
    // order — entry `j` has internal id `base_len + j`.
    upserts: Vec<(Arc<[f32]>, String)>,
    // Internal ids, base or overlay, that were deleted or superseded since the
    // base. A search drops hits carrying one of these ids.
    tombstones: HashSet<u64>,
}
289
290/// An immutable, lock-free-readable view of a single-vector collection (ADR-0064):
291/// the base index as of the last rebuild, the base id map, and the overlay of
292/// writes since. Obtained via [`Database::collection_snapshot`] and read with
293/// [`CollectionSnapshot::search`]; a read is snapshot-isolated — it sees one
294/// consistent `(base, overlay)` pair, and a write that lands mid-read is simply
295/// the next snapshot.
296pub struct CollectionSnapshot {
297    base: Arc<CollectionIndex>,
298    base_int_to_ext: Arc<Vec<String>>,
299    base_len: u64,
300    overlay: Arc<Overlay>,
301    metric: Metric,
302}
303
304impl CollectionSnapshot {
305    // An empty snapshot for a freshly created/opened collection (no base yet); the
306    // writer publishes a real one at the first rebuild. Reading it yields no hits.
307    fn empty(metric: Metric) -> Self {
308        Self {
309            base: Arc::new(CollectionIndex::None),
310            base_int_to_ext: Arc::new(Vec::new()),
311            base_len: 0,
312            overlay: Arc::new(Overlay::default()),
313            metric,
314        }
315    }
316
317    // Map an internal id to its external id: base ids index the base map, overlay
318    // ids (`>= base_len`) index the overlay's upserts.
319    fn ext_id(&self, internal: u64) -> Option<&str> {
320        if internal < self.base_len {
321            self.base_int_to_ext
322                .get(internal as usize)
323                .map(String::as_str)
324        } else {
325            self.overlay
326                .upserts
327                .get((internal - self.base_len) as usize)
328                .map(|(_, e)| e.as_str())
329        }
330    }
331
332    /// Lock-free nearest-neighbor search over the base index merged with the
333    /// overlay (ADR-0064 increment 1) — **pure vector reads**: no payload filter
334    /// and no payload/vector fetch (those need the store and land in increment 2).
335    /// Returns the `k` nearest live points, closest first, scored in the true
336    /// collection metric — identical ordering and scores to the locked
337    /// [`Database::search_snapshot`] path for the same case.
338    ///
339    /// # Errors
340    /// Propagates an index search error.
341    pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342        if k == 0 {
343            return Ok(Vec::new());
344        }
345        // Collect candidates in "smaller is closer" ordering space (uniform across
346        // metrics — `score::ordering_distance`), dropping tombstoned ids.
347        let mut cands: Vec<(f32, u64)> = Vec::new();
348        for n in self.base.search(query, k, ef_search)? {
349            if !self.overlay.tombstones.contains(&n.id) {
350                // `Neighbor.distance` is the reported metric; `report_metric` is its
351                // own inverse (identity for L2, negation for similarities), so it
352                // maps the reported value back to the ordering key.
353                cands.push((report_metric(self.metric, n.distance), n.id));
354            }
355        }
356        for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357            let internal = self.base_len + j as u64;
358            if !self.overlay.tombstones.contains(&internal) {
359                cands.push((ordering_distance(self.metric, query, vector), internal));
360            }
361        }
362        cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363        cands.truncate(k);
364        let mut out = Vec::with_capacity(cands.len());
365        for (ordering, internal) in cands {
366            if let Some(ext) = self.ext_id(internal) {
367                out.push(Match {
368                    id: ext.to_owned(),
369                    score: report_metric(self.metric, ordering),
370                    payload: None,
371                    vector: None,
372                });
373            }
374        }
375        Ok(out)
376    }
377}
378
379// A fresh, empty snapshot cell for a collection's metric — the initial value of
380// `CollectionHandle::snapshot` (the writer publishes a real base at the first
381// rebuild when MVCC is on; left untouched, at one tiny allocation, when off).
382fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383    let metric = to_index_metric(descriptor.metric);
384    Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387// Whether a collection *kind* can be served by the lock-free MVCC snapshot path
388// (ADR-0064 increment 1), independent of the flag: single-vector, server-searchable,
389// and an **in-memory** index. Disk-resident indexes are excluded for now — their
390// `mmap` assumes an immutable file, which a superseded snapshot still referencing
391// the old file would violate when a rebuild seals a new one (a later increment
392// versions the file).
393fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394    !descriptor.multivector
395        && descriptor.vector_encryption != VectorEncryption::ClientSide
396        && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399// Whether a collection is *currently* served via the snapshot: eligible and the
400// flag is on.
401fn mvcc_served(handle: &CollectionHandle) -> bool {
402    handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405// Republish a collection's snapshot reusing the immutable base + id map, swapping
406// in a freshly-extended overlay (the per-write O(overlay) cost).
407fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408    handle.snapshot.store(Arc::new(CollectionSnapshot {
409        base: prior.base.clone(),
410        base_int_to_ext: prior.base_int_to_ext.clone(),
411        base_len: prior.base_len,
412        overlay,
413        metric: prior.metric,
414    }));
415}
416
417// Move a freshly rebuilt index out of `handle.index` into a new published
418// snapshot with an empty overlay (the base now absorbs all prior writes). Leaves
419// `handle.index` empty — in MVCC mode the live base lives in the snapshot.
420fn publish_base(handle: &mut CollectionHandle) {
421    let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422    let metric = to_index_metric(handle.descriptor.metric);
423    handle.snapshot.store(Arc::new(CollectionSnapshot {
424        base: Arc::new(base),
425        base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426        base_len: handle.int_to_ext.len() as u64,
427        overlay: Arc::new(Overlay::default()),
428        metric,
429    }));
430}
431
// How large the overlay may grow before an MVCC write defers a consolidating
// rebuild: one fifth of the base (~20% churn, the same threshold that already
// triggers consolidation), but never below a floor of 1024 so a tiny base does
// not rebuild on every write.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    const CHURN_DIVISOR: u64 = 5;
    const FLOOR: u64 = 1024;
    let churn = base_len / CHURN_DIVISOR;
    if churn < FLOOR { FLOOR } else { churn }
}
438
439// MVCC-mode single-vector upsert (ADR-0064): append to the published overlay
440// instead of mutating the immutable base, so lock-free readers stay race-free.
441// `ponytail`: republishes per write — O(overlay) each; coalesce per batch if
442// batched upserts under MVCC ever dominate.
443fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444    bump_write_gen(handle);
445    let cur = handle.snapshot.load_full();
446    let mut overlay = cur.overlay.as_ref().clone();
447    // An update supersedes the prior copy (in the base or the overlay): tombstone it.
448    if let Some(&old) = handle.ext_to_int.get(ext_id) {
449        overlay.tombstones.insert(old);
450    }
451    let internal = cur.base_len + overlay.upserts.len() as u64;
452    overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453    handle.ext_to_int.insert(ext_id.to_owned(), internal);
454    handle.int_to_ext.push(ext_id.to_owned());
455    let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456    publish_overlay(handle, &cur, Arc::new(overlay));
457    // Defer a consolidating rebuild once the overlay is large (the server runs it
458    // off-lock; the embedded `&mut` search rebuilds synchronously).
459    if crowded {
460        handle.stale = true;
461    }
462}
463
464// MVCC-mode single-vector delete (ADR-0064): tombstone the id in the published
465// overlay; the base stays immutable.
466fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467    bump_write_gen(handle);
468    let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469        return;
470    };
471    let cur = handle.snapshot.load_full();
472    let mut overlay = cur.overlay.as_ref().clone();
473    overlay.tombstones.insert(internal);
474    publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
477/// On-disk envelope (ADR-0025) for a durable IVF snapshot: the `Ivf` bytes plus
478/// the internal->external id mapping they are addressed by, postcard-encoded and
479/// handed to the store as one opaque blob. On open the envelope is decoded, the
480/// `Ivf` restored, and the post-checkpoint WAL tail replayed. A decode/version
481/// error means "rebuild from the store" — the snapshot is only ever a fast path.
482#[derive(Serialize, Deserialize)]
483struct IndexEnvelope {
484    version: u16,
485    int_to_ext: Vec<String>,
486    ivf: Vec<u8>,
487}
488
// Version of the envelope format itself. It moves independently of the product
// SemVer and of the inner `Ivf` snapshot version; on a mismatch the index is
// rebuilt instead of restored.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
493/// On-disk envelope (ADR-0063) for a durable DiskVamana snapshot. Unlike the IVF
494/// envelope, the bulk (graph + full vectors) stays in the immutable `mmap`-ed
495/// base file (`vamana.qvx`); this blob carries only what ties that base to the
496/// live state — the base point count (validated against the opened file), the
497/// FreshDiskANN tombstones, and the id map. The delta vectors are *not* stored:
498/// the delta ids are implied as `[base_row_count, int_to_ext.len())` and their
499/// vectors re-fetched from the store on open, so the blob stays O(delta ids), not
500/// O(N) vectors. A decode/version/validation error means "rebuild from the store".
501#[derive(Serialize, Deserialize)]
502struct DiskEnvelope {
503    version: u16,
504    int_to_ext: Vec<String>,
505    base_row_count: u64,
506    deleted_ids: Vec<u64>,
507}
508
509struct CollectionHandle {
510    id: CollectionId,
511    descriptor: Descriptor,
512    index: CollectionIndex,
513    int_to_ext: Vec<String>,
514    ext_to_int: HashMap<String, u64>,
515    stale: bool,
516    // Monotonic per-collection write counter, bumped every time a write defers a
517    // rebuild (`mark_stale`). An off-lock rebuild (ADR-0062) captures it before
518    // building and re-checks it at commit: if it advanced, a write landed during
519    // the build, so the freshly built index is already behind — the commit installs
520    // it (still newer than the prior snapshot) but leaves the handle stale so the
521    // next rebuild catches up. Wrapping is fine: equality, not ordering, is what the
522    // commit checks.
523    write_gen: u64,
524    // For a multi-vector (ColBERT) collection: each document id mapped to its
525    // token count, so a re-rank can gather all of a document's token rows
526    // (`<doc-id><US><ordinal>`) and `document_count` is O(1). `None` for a
527    // single-vector collection. Maintained eagerly on document writes and rebuilt
528    // authoritatively from the store on open / rebuild; never persisted (ADR-0028).
529    docs: Option<BTreeMap<String, u32>>,
530    // Derived inverted index over the collection's `__quiver_sparse__` payloads,
531    // for the sparse half of hybrid search (ADR-0045). `Some` for a single-vector,
532    // server-searchable collection; `None` for multi-vector and client-side-encrypted
533    // collections (which never run hybrid search) — and as a backstop, when `None`
534    // the sparse ranking falls back to a full store scan. Built on rebuild from the
535    // store and maintained incrementally on upsert/delete; never persisted.
536    sparse: Option<SparseInvertedIndex>,
537    // Whether this collection is served via the lock-free MVCC snapshot (ADR-0064);
538    // mirrors the database-wide flag, set at creation and by `set_mvcc_reads`. When
539    // off, the `snapshot` cell is never republished and the in-place RwLock read
540    // path is used — zero overhead.
541    mvcc: bool,
542    // The lock-free serving snapshot cell (ADR-0064). Initialized empty; the writer
543    // publishes a real base at each rebuild and an extended overlay per write when
544    // `mvcc` is on. Never persisted (rebuilt on open).
545    snapshot: SnapshotCell,
546}
547
548// Whether a collection should carry a derived sparse inverted index: only
549// single-vector, server-searchable collections run hybrid search (ADR-0045).
550fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551    !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554// Mark a collection's index stale: a write the index could not absorb in place
555// defers a full rebuild. Also bumps the write generation (every staleness-marking
556// write is a write).
557fn mark_stale(handle: &mut CollectionHandle) {
558    handle.stale = true;
559    bump_write_gen(handle);
560}
561
562// Bump a collection's monotonic write generation. Called on **every** write that
563// touches the store — including writes that land while the index is already stale,
564// which the incremental maintenance skips. The off-lock rebuild (ADR-0062) captures
565// this before scanning and re-checks it at commit: if it advanced, a write landed
566// during the build, so the freshly built index is already behind and the commit
567// leaves the collection stale for the next rebuild. Wrapping is fine — the commit
568// checks equality, not ordering — and over-counting is harmless (at worst one extra
569// rebuild); only *missing* a bump could lose a write.
570fn bump_write_gen(handle: &mut CollectionHandle) {
571    handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
574/// An in-process Quiver database over one data directory.
575pub struct Database {
576    store: Store,
577    collections: HashMap<String, CollectionHandle>,
578    // Lock-free MVCC reads (ADR-0064), default off. Defaulted from
579    // `QUIVER_MVCC_READS` at open and overridable via `set_mvcc_reads`. When off,
580    // the proven in-place RwLock read path is used with zero overhead.
581    mvcc: bool,
582}
583
// Whether `QUIVER_MVCC_READS` requests lock-free MVCC reads (ADR-0064).
//
// The flag is on when the variable is set to `1`, `true`, `on`, or `yes`.
// Matching ignores ASCII case and surrounding whitespace, so `True`, `ON` and
// `YES` behave like the `TRUE` spelling that was already accepted. Any other
// value, an unset variable, or a non-Unicode value leaves the flag off.
fn mvcc_from_env() -> bool {
    const ENABLED: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var("QUIVER_MVCC_READS")
        .map(|raw| {
            let value = raw.trim();
            ENABLED.iter().any(|on| value.eq_ignore_ascii_case(on))
        })
        .unwrap_or(false)
}
590
591impl Database {
592    /// Open (creating if absent) the database at `dir` with encryption-at-rest
593    /// disabled, rebuilding each collection's index from the store.
594    pub fn open(dir: &Path) -> Result<Self> {
595        Self::from_store(Store::open(dir)?)
596    }
597
598    /// Open the database with a specific page codec — used to enable
599    /// encryption-at-rest by passing `quiver-crypto`'s AEAD codec. Mirrors
600    /// [`quiver_core::Store::open_with_codec`]; the codec seals both paged files
601    /// and the WAL, so no plaintext user data reaches the disk.
602    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603        Self::from_store(Store::open_with_codec(dir, codec)?)
604    }
605
606    /// Open the database with a [`KeyRing`], the seam that lets `quiver-crypto`'s
607    /// envelope key-ring seal each collection under its own data-encryption key
608    /// (enabling crypto-shredding). Mirrors
609    /// [`quiver_core::Store::open_with_keyring`].
610    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611        Self::from_store(Store::open_with_keyring(dir, keyring)?)
612    }
613
614    // Build the in-memory handles (and their HNSW indexes) over an opened store.
615    fn from_store(store: Store) -> Result<Self> {
616        let mvcc = mvcc_from_env();
617        let mut collections = HashMap::new();
618        for name in store.collection_names() {
619            let Some(id) = store.collection_id(&name) else {
620                continue;
621            };
622            let Some(descriptor) = store.descriptor(id).cloned() else {
623                continue;
624            };
625            let snapshot = empty_snapshot(&descriptor);
626            let mut handle = CollectionHandle {
627                id,
628                index: empty_index(&descriptor),
629                descriptor,
630                int_to_ext: Vec::new(),
631                ext_to_int: HashMap::new(),
632                stale: true,
633                write_gen: 0,
634                docs: None,
635                // Populated by `load_index` / `rebuild_index` from the store.
636                sparse: None,
637                mvcc,
638                snapshot,
639            };
640            load_index(&store, &mut handle)?;
641            // MVCC mode: the lock-free base lives in the snapshot, which `load_index`
642            // does not populate. Force a rebuild on first read so the base is
643            // published (a restored durable IVF would otherwise leave the snapshot
644            // empty); MVCC trades the durable-index fast reopen for the snapshot.
645            if mvcc_served(&handle) {
646                handle.stale = true;
647            }
648            collections.insert(name, handle);
649        }
650        Ok(Self {
651            store,
652            collections,
653            mvcc,
654        })
655    }
656
657    /// Create a collection. Errors if the name already exists, or if the index
658    /// specification is unsupported for the metric.
659    pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660        validate_index(&descriptor)?;
661        let id = self.store.create_collection(name, descriptor.clone())?;
662        let index = empty_index(&descriptor);
663        let docs = descriptor.multivector.then(BTreeMap::new);
664        // A fresh single-vector collection starts with an empty inverted index
665        // maintained incrementally from the first upsert (an empty index allocates
666        // nothing until a sparse vector arrives).
667        let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668        let snapshot = empty_snapshot(&descriptor);
669        let mvcc = self.mvcc;
670        self.collections.insert(
671            name.to_owned(),
672            CollectionHandle {
673                id,
674                descriptor,
675                index,
676                int_to_ext: Vec::new(),
677                ext_to_int: HashMap::new(),
678                stale: false,
679                write_gen: 0,
680                docs,
681                sparse,
682                mvcc,
683                snapshot,
684            },
685        );
686        Ok(())
687    }
688
689    /// Drop a collection and its data. Returns whether it existed.
690    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
691        let existed = self.store.drop_collection(name)?;
692        self.collections.remove(name);
693        Ok(existed)
694    }
695
696    /// Crypto-shred a collection: drop it and destroy its data-encryption key, so
697    /// its sealed data is unrecoverable even with the master key, then reclaim
698    /// its files. Mirrors [`quiver_core::Store::shred_collection`]; with an
699    /// envelope key-ring this is irreversible erasure, with a single-codec
700    /// key-ring it is `drop` plus a checkpoint. Returns whether it existed.
701    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
702        let existed = self.store.shred_collection(name)?;
703        self.collections.remove(name);
704        Ok(existed)
705    }
706
707    /// Install a replication commit observer, invoked with each committed
708    /// [`WalEntry`] in commit order (ADR-0030). The server uses this to drive a
709    /// leader's replication stream.
710    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
711        self.store.set_commit_observer(observer);
712    }
713
714    /// The operations that recreate the current logical state, for a replication
715    /// follower to bootstrap from (ADR-0030).
716    ///
717    /// # Errors
718    /// Propagates a store read error.
719    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
720        Ok(self.store.replication_snapshot()?)
721    }
722
723    /// Apply a replicated operation from a leader (ADR-0030): persist and apply it
724    /// to the store (preserving the leader's collection id), then reconcile the
725    /// in-memory index handles — register a new collection, drop a removed one, or
726    /// mark a touched collection's index stale so the next read rebuilds from the
727    /// replicated state.
728    ///
729    /// # Errors
730    /// Propagates a store apply error.
731    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732        let target = match &op {
733            WalOp::CreateCollection { collection_id, .. }
734            | WalOp::DropCollection { collection_id }
735            | WalOp::Upsert { collection_id, .. }
736            | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737            WalOp::Checkpoint { .. } => None,
738        };
739        let create_name = match &op {
740            WalOp::CreateCollection { name, .. } => Some(name.clone()),
741            _ => None,
742        };
743        let is_drop = matches!(op, WalOp::DropCollection { .. });
744        self.store.apply_replicated(op)?;
745
746        if let Some(name) = create_name {
747            // Register a fresh handle for the newly replicated collection.
748            if let Some(id) = target
749                && let Some(descriptor) = self.store.descriptor(id).cloned()
750            {
751                let docs = descriptor.multivector.then(BTreeMap::new);
752                let index = empty_index(&descriptor);
753                let snapshot = empty_snapshot(&descriptor);
754                let mvcc = self.mvcc;
755                // Replicated writes mark the handle stale, so the next read rebuilds
756                // the inverted index from the replicated store.
757                self.collections.insert(
758                    name,
759                    CollectionHandle {
760                        id,
761                        descriptor,
762                        index,
763                        int_to_ext: Vec::new(),
764                        ext_to_int: HashMap::new(),
765                        stale: false,
766                        write_gen: 0,
767                        docs,
768                        sparse: None,
769                        mvcc,
770                        snapshot,
771                    },
772                );
773            }
774        } else if is_drop {
775            if let Some(id) = target {
776                self.collections.retain(|_, h| h.id != id);
777            }
778        } else if let Some(id) = target
779            && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780        {
781            mark_stale(handle);
782        }
783        Ok(())
784    }
785
786    /// Names of all collections, sorted.
787    #[must_use]
788    pub fn collection_names(&self) -> Vec<String> {
789        self.store.collection_names()
790    }
791
792    /// The descriptor of a collection, if it exists.
793    #[must_use]
794    pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795        self.collections.get(name).map(|h| &h.descriptor)
796    }
797
798    /// Number of live points in a collection.
799    pub fn len(&self, name: &str) -> Result<usize> {
800        let handle = self.handle(name)?;
801        Ok(self.store.len(handle.id)?)
802    }
803
804    /// Whether a collection has no points.
805    pub fn is_empty(&self, name: &str) -> Result<bool> {
806        Ok(self.len(name)? == 0)
807    }
808
809    /// Insert or replace a point with a JSON payload.
810    pub fn upsert(
811        &mut self,
812        collection: &str,
813        id: &str,
814        vector: &[f32],
815        payload: &Value,
816    ) -> Result<()> {
817        let handle = self
818            .collections
819            .get_mut(collection)
820            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821        require_single_vector(handle)?;
822        let payload_bytes = serde_json::to_vec(payload)?;
823        self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824        // Client-side-encrypted collections have no server-side index to maintain
825        // (ADR-0032): the stored vector is an opaque placeholder the server never
826        // ranks. The point is durable in the store; nothing else to do.
827        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828            return Ok(());
829        }
830        // Maintain the in-memory index in place where the kind allows it, else
831        // defer to a lazy rebuild on the next search (ADR-0023/0026/0033).
832        index_upsert_point(handle, id, vector)?;
833        // Keep the derived sparse inverted index in step (ADR-0045).
834        sparse_index_upsert_point(handle, id, payload);
835        Ok(())
836    }
837
838    /// Upsert a batch of points with a single WAL `fdatasync` (ADR-0038).
839    ///
840    /// `points` is `(id, vector, payload)` tuples.  The batch is committed
841    /// atomically — all points or none (from the client's perspective).  This
842    /// is the preferred path for the REST `POST /v1/collections/{c}/points`
843    /// handler which already delivers a batch per HTTP request.
844    pub fn upsert_batch(
845        &mut self,
846        collection: &str,
847        points: &[(&str, &[f32], &serde_json::Value)],
848    ) -> Result<u64> {
849        let handle = self
850            .collections
851            .get(collection)
852            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
853        require_single_vector(handle)?;
854        let coll_id = handle.id;
855        let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
856
857        let payload_bytes: Vec<Vec<u8>> = points
858            .iter()
859            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
860            .collect::<Result<_>>()?;
861
862        let records: Vec<(&str, &[f32], &[u8])> = points
863            .iter()
864            .zip(payload_bytes.iter())
865            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
866            .collect();
867
868        self.store.upsert_batch(coll_id, &records)?;
869
870        if is_client_side {
871            return Ok(records.len() as u64);
872        }
873
874        for (id, vector, payload) in points {
875            let handle = self
876                .collections
877                .get_mut(collection)
878                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
879            index_upsert_point(handle, id, vector)?;
880            sparse_index_upsert_point(handle, id, payload);
881        }
882        Ok(records.len() as u64)
883    }
884
885    /// Upsert a large batch for a bulk load, deferring all index work to a single
886    /// rebuild pass (ADR-0045).
887    ///
888    /// Like [`upsert_batch`](Self::upsert_batch) the points are committed with one
889    /// WAL `fdatasync`, but instead of folding each point into the in-memory index
890    /// one at a time, the collection's index is marked **stale** so the next search
891    /// rebuilds it in a single pass over the whole collection — far cheaper for a
892    /// fresh load (one k-means for IVF, one graph build for Vamana, one inverted-index
893    /// scan) than N incremental inserts. Prefer `upsert_batch` for steady-state
894    /// writes where query-after-write latency matters.
895    pub fn upsert_bulk(
896        &mut self,
897        collection: &str,
898        points: &[(&str, &[f32], &serde_json::Value)],
899    ) -> Result<u64> {
900        let handle = self
901            .collections
902            .get_mut(collection)
903            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904        require_single_vector(handle)?;
905        let coll_id = handle.id;
906
907        let payload_bytes: Vec<Vec<u8>> = points
908            .iter()
909            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
910            .collect::<Result<_>>()?;
911        let records: Vec<(&str, &[f32], &[u8])> = points
912            .iter()
913            .zip(payload_bytes.iter())
914            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
915            .collect();
916
917        self.store.upsert_batch(coll_id, &records)?;
918
919        // Defer the dense and sparse index maintenance to a single rebuild on the
920        // next read (a client-side-encrypted collection has no index to rebuild, but
921        // marking it stale is harmless — its rebuild produces the same no-op index).
922        let handle = self
923            .collections
924            .get_mut(collection)
925            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
926        mark_stale(handle);
927        Ok(records.len() as u64)
928    }
929
930    /// Delete a point by id. Returns whether it existed.
931    pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
932        let handle = self
933            .collections
934            .get_mut(collection)
935            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
936        require_single_vector(handle)?;
937        let existed = self.store.delete(handle.id, id)?;
938        if !existed {
939            return Ok(false);
940        }
941        // No server-side index to update for client-side-encrypted collections
942        // (ADR-0032); the store delete is authoritative.
943        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
944            return Ok(true);
945        }
946        // A built IVF removes in place (ADR-0023), a built HNSW soft-deletes
947        // (ADR-0026), and a built FreshDiskANN graph tombstones in its deletion set
948        // (ADR-0033); other kinds defer to a rebuild. The id->internal mapping is
949        // kept so a later re-insert allocates afresh — a removed or soft-deleted
950        // internal is simply never returned by the index.
951        index_delete_point(handle, id);
952        // Drop the point from the derived sparse inverted index too (ADR-0045).
953        sparse_index_delete_point(handle, id);
954        Ok(true)
955    }
956
957    /// Fetch a single point by id, with its payload and vector.
958    pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
959        let handle = self.handle(collection)?;
960        require_single_vector(handle)?;
961        match self.store.get(handle.id, id)? {
962            Some(record) => Ok(Some(Match {
963                id: id.to_owned(),
964                score: 0.0,
965                payload: Some(serde_json::from_slice(&record.payload)?),
966                vector: Some(record.vector),
967            })),
968            None => Ok(None),
969        }
970    }
971
972    /// Fetch points without ranking — an optional cleartext payload `filter`
973    /// narrows the set and `limit` bounds it. This is the retrieval path for a
974    /// client-side-encrypted collection (ADR-0032): the server returns the entitled
975    /// set (each point's payload carries the sealed vector blob under the reserved
976    /// `__quiver_vec__` key) and the client decrypts and ranks locally. It also
977    /// serves as a general "list points" primitive for any single-vector collection.
978    ///
979    /// Results come in the store's scan order, not by relevance; the filter is
980    /// re-checked exactly against each candidate (a selective filter could use the
981    /// secondary index in future — today it scans).
982    ///
983    /// # Errors
984    /// Errors if the collection does not exist or is multi-vector.
985    pub fn fetch(
986        &self,
987        collection: &str,
988        filter: Option<&Filter>,
989        limit: usize,
990        with_payload: bool,
991        with_vector: bool,
992    ) -> Result<Vec<Match>> {
993        let handle = self.handle(collection)?;
994        require_single_vector(handle)?;
995        let mut out = Vec::new();
996        for (id, record) in self.store.scan(handle.id)? {
997            if out.len() >= limit {
998                break;
999            }
1000            let payload: Value = serde_json::from_slice(&record.payload)?;
1001            if let Some(filter) = filter
1002                && !filter.matches(&payload)
1003            {
1004                continue;
1005            }
1006            out.push(Match {
1007                id,
1008                score: 0.0,
1009                payload: with_payload.then_some(payload),
1010                vector: with_vector.then_some(record.vector),
1011            });
1012        }
1013        Ok(out)
1014    }
1015
1016    /// Rebuild a collection's index if a prior write deferred it (the `stale`
1017    /// flag), making the collection's read snapshot current. Idempotent and cheap
1018    /// when already fresh. Separating this `&mut self` maintenance from the `&self`
1019    /// `*_snapshot` reads is what lets a server serve concurrent reads behind a
1020    /// shared lock and take the exclusive lock only for the rare rebuild (ADR-0057).
1021    pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1022        if self.handle(collection)?.stale {
1023            let store = &self.store;
1024            let handle = self
1025                .collections
1026                .get_mut(collection)
1027                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1028            rebuild_index(store, handle)?;
1029            // MVCC mode: publish the freshly rebuilt base as the new lock-free
1030            // snapshot (an empty overlay; the base now absorbs all prior writes).
1031            // The sync rebuild holds `&mut self`, so no write races it.
1032            if mvcc_served(handle) {
1033                publish_base(handle);
1034            }
1035        }
1036        Ok(())
1037    }
1038
1039    /// Enable or disable lock-free MVCC reads (ADR-0064) at runtime — the server
1040    /// sets this from `QUIVER_MVCC_READS`. Default off: the proven RwLock read path
1041    /// stays the default until MVCC is loom- and benchmark-validated (increments
1042    /// 2–3). Applies to every loaded collection; a collection's first rebuild after
1043    /// enabling publishes its base snapshot.
1044    pub fn set_mvcc_reads(&mut self, on: bool) {
1045        self.mvcc = on;
1046        for handle in self.collections.values_mut() {
1047            handle.mvcc = on;
1048            // A toggle either direction forces a rebuild on the next read of an
1049            // eligible collection: turning on publishes the base snapshot; turning
1050            // off repopulates the in-place `handle.index` that MVCC mode left empty.
1051            if mvcc_eligible(&handle.descriptor) {
1052                handle.stale = true;
1053            }
1054        }
1055    }
1056
1057    /// Whether lock-free MVCC reads are enabled (ADR-0064).
1058    #[must_use]
1059    pub fn mvcc_reads(&self) -> bool {
1060        self.mvcc
1061    }
1062
1063    /// The lock-free serving snapshot cell for a collection (ADR-0064), for a
1064    /// reader to `load()` without taking any lock — the basis of reads that proceed
1065    /// during writes. Only republished for an MVCC-served collection (single-vector,
1066    /// server-searchable, in-memory index, with MVCC enabled); otherwise it stays
1067    /// the initial empty snapshot and callers use the locked [`Database::search`]
1068    /// path. The cell outlives `&self`, so a reader can hold and re-`load` it.
1069    ///
1070    /// # Errors
1071    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1072    pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1073        Ok(self.handle(collection)?.snapshot.clone())
1074    }
1075
1076    /// The lock-free serving snapshot cell **only** for a collection that is
1077    /// currently MVCC-served (the flag is on and the collection is single-vector,
1078    /// server-searchable, and in-memory); otherwise `None`. A server caches the
1079    /// returned cell once and `load()`s it to serve pure-vector reads with no lock
1080    /// (ADR-0064 increment 3) — the cell self-updates as the writer republishes, so
1081    /// it never needs re-fetching under the lock.
1082    ///
1083    /// # Errors
1084    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1085    pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1086        let handle = self.handle(collection)?;
1087        Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1088    }
1089
1090    /// Whether a collection's index is stale — a prior write deferred its rebuild.
1091    /// The server reads this to schedule an **off-lock** rebuild (ADR-0062) without
1092    /// holding the exclusive lock; embedded callers never need it (the `&mut self`
1093    /// searches rebuild synchronously via [`Database::ensure_indexed`]).
1094    pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1095        Ok(self.handle(collection)?.stale)
1096    }
1097
1098    /// Capture everything an off-lock rebuild needs (ADR-0062): under the shared
1099    /// read lock the caller already holds, scan the collection's live rows and
1100    /// record the write generation. Returns `None` when the index is already fresh
1101    /// (nothing to rebuild). The expensive build then runs with **no lock** via
1102    /// [`RebuildInputs::build`], and [`Database::commit_rebuild`] installs it.
1103    pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1104        let handle = self.handle(collection)?;
1105        if !handle.stale {
1106            return Ok(None);
1107        }
1108        let scan = scan_collection(&self.store, handle)?;
1109        Ok(Some(RebuildInputs {
1110            collection: collection.to_owned(),
1111            descriptor: handle.descriptor.clone(),
1112            scan,
1113            write_gen: handle.write_gen,
1114        }))
1115    }
1116
1117    /// Install an index built off-lock (ADR-0062) under the brief exclusive lock the
1118    /// caller holds. Returns whether the collection is **still** stale: if a write
1119    /// landed during the build (the write generation advanced), the fresh index is
1120    /// already behind, so it is installed (still newer than the prior snapshot) but
1121    /// the handle stays stale for the next rebuild. A collection dropped or replaced
1122    /// during the build is ignored (`Ok(false)`) — the build is discarded.
1123    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1124        let store = &self.store;
1125        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1126            return Ok(false);
1127        };
1128        match rebuilt.kind {
1129            RebuiltKind::Ready(index) => handle.index = *index,
1130            RebuiltKind::Disk { graph, pq } => {
1131                // Drop the prior index before sealing the artifact in place: its
1132                // `mmap` assumes an immutable file. Safe under the write lock — no
1133                // read can observe the momentary empty index.
1134                handle.index = empty_index(&handle.descriptor);
1135                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1136                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1137            }
1138        }
1139        handle.int_to_ext = rebuilt.int_to_ext;
1140        handle.ext_to_int = rebuilt.ext_to_int;
1141        handle.docs = rebuilt.docs;
1142        handle.sparse = rebuilt.sparse;
1143        // Clear stale only if no write landed since the inputs were captured;
1144        // otherwise leave it set so the driver rebuilds again for the newer write.
1145        let still_stale = handle.write_gen != rebuilt.write_gen;
1146        handle.stale = still_stale;
1147        // MVCC mode: publish the new base as the lock-free snapshot with an empty
1148        // overlay. If a write landed during the build (`still_stale`), it is briefly
1149        // invisible to snapshot reads until the next rebuild folds it in — the same
1150        // eventual-consistency window the locked off-lock path already serves
1151        // (ADR-0062), with no double-count (the base is the sole source). The
1152        // single-writer's exclusive lock makes the swap race-free.
1153        if mvcc_served(handle) {
1154            publish_base(handle);
1155        }
1156        Ok(still_stale)
1157    }
1158
1159    /// Search a collection for the nearest points to `query`, optionally
1160    /// post-filtered by payload predicate.
1161    pub fn search(
1162        &mut self,
1163        collection: &str,
1164        query: &[f32],
1165        params: &SearchParams,
1166    ) -> Result<Vec<Match>> {
1167        // Embedded read-your-writes: rebuild a deferred index first (synchronously),
1168        // then read the now-fresh snapshot. The server instead serves the prior
1169        // snapshot and rebuilds off-lock (ADR-0062), so concurrent readers never
1170        // block on a writer's rebuild.
1171        self.ensure_indexed(collection)?;
1172        self.search_snapshot(collection, query, params)
1173    }
1174
1175    /// Search a collection's **current immutable snapshot** for the nearest points
1176    /// to `query`, optionally post-filtered by payload predicate. Takes `&self`, so
1177    /// many readers run concurrently. When a prior write deferred this collection's
1178    /// rebuild, it serves the **prior** snapshot (a snapshot-isolated, slightly stale
1179    /// read — ADR-0062/0053); the caller schedules a rebuild via
1180    /// [`Database::needs_rebuild`].
1181    pub fn search_snapshot(
1182        &self,
1183        collection: &str,
1184        query: &[f32],
1185        params: &SearchParams,
1186    ) -> Result<Vec<Match>> {
1187        require_single_vector(self.handle(collection)?)?;
1188        require_server_searchable(self.handle(collection)?)?;
1189
1190        let handle = self.handle(collection)?;
1191
1192        // MVCC mode (ADR-0064): the immutable base index lives in the published
1193        // snapshot, not `handle.index`. Serve the read from the snapshot — the
1194        // base-index search merged lock-free with the overlay of recent writes —
1195        // reusing the same pre-filter planning and store-fetch enrichment as the
1196        // locked path; only the dense candidate source differs.
1197        if mvcc_served(handle) {
1198            return self.search_snapshot_mvcc(handle, query, params);
1199        }
1200
1201        // Hybrid planning: if the filter narrows to a small, secondary-indexed
1202        // candidate set, scan those rows exactly instead of post-filtering ANN
1203        // hits — exact, and immune to the filtered-ANN recall cliff.
1204        if let Some(filter) = &params.filter
1205            && let Some(candidates) = candidate_ids(
1206                &self.store,
1207                handle.id,
1208                filter,
1209                &handle.descriptor.filterable,
1210            )?
1211            && candidates.len() <= FULL_SCAN_THRESHOLD
1212        {
1213            return self.exact_filtered_search(
1214                handle.id,
1215                &handle.descriptor,
1216                query,
1217                params,
1218                filter,
1219                &candidates,
1220            );
1221        }
1222
1223        let fetch = if params.filter.is_some() {
1224            params
1225                .k
1226                .saturating_mul(FILTER_OVERFETCH)
1227                .max(params.ef_search)
1228        } else {
1229            params.k
1230        };
1231        let raw = handle.index.search(query, fetch, params.ef_search)?;
1232
1233        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1234        let mut out = Vec::with_capacity(params.k);
1235        for neighbor in raw {
1236            if out.len() >= params.k {
1237                break;
1238            }
1239            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1240                continue;
1241            };
1242            let record = if need_record {
1243                self.store.get(handle.id, ext_id)?
1244            } else {
1245                None
1246            };
1247            let payload_value: Option<Value> = match &record {
1248                Some(r) if params.filter.is_some() || params.with_payload => {
1249                    Some(serde_json::from_slice(&r.payload)?)
1250                }
1251                _ => None,
1252            };
1253            if let Some(filter) = &params.filter {
1254                let value = payload_value.as_ref().unwrap_or(&Value::Null);
1255                if !filter.matches(value) {
1256                    continue;
1257                }
1258            }
1259            out.push(Match {
1260                id: ext_id.clone(),
1261                score: neighbor.distance,
1262                payload: if params.with_payload {
1263                    payload_value
1264                } else {
1265                    None
1266                },
1267                vector: if params.with_vector {
1268                    record.map(|r| r.vector)
1269                } else {
1270                    None
1271                },
1272            });
1273        }
1274        Ok(out)
1275    }
1276
1277    // Serve `search_snapshot` for an MVCC-served collection (ADR-0064 increment 2):
1278    // the dense candidates come from the lock-free snapshot (base ⊕ overlay), and
1279    // everything else — the exact small-candidate pre-filter, post-filter, and
1280    // payload/vector enrichment — reuses the same store-only logic as the locked
1281    // path, so filtered and unfiltered reads are both exact.
1282    fn search_snapshot_mvcc(
1283        &self,
1284        handle: &CollectionHandle,
1285        query: &[f32],
1286        params: &SearchParams,
1287    ) -> Result<Vec<Match>> {
1288        // Exact pre-filter for a small, secondary-indexed candidate set (store-only,
1289        // index-independent — identical to the locked path).
1290        if let Some(filter) = &params.filter
1291            && let Some(candidates) = candidate_ids(
1292                &self.store,
1293                handle.id,
1294                filter,
1295                &handle.descriptor.filterable,
1296            )?
1297            && candidates.len() <= FULL_SCAN_THRESHOLD
1298        {
1299            return self.exact_filtered_search(
1300                handle.id,
1301                &handle.descriptor,
1302                query,
1303                params,
1304                filter,
1305                &candidates,
1306            );
1307        }
1308
1309        // Otherwise: dense candidates from the lock-free snapshot (overfetched when
1310        // filtering, to survive the post-filter), then post-filter + enrich by a
1311        // store fetch on the result ids.
1312        let fetch = if params.filter.is_some() {
1313            params
1314                .k
1315                .saturating_mul(FILTER_OVERFETCH)
1316                .max(params.ef_search)
1317        } else {
1318            params.k
1319        };
1320        let dense = handle
1321            .snapshot
1322            .load()
1323            .search(query, fetch, params.ef_search)?;
1324        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1325        let mut out = Vec::with_capacity(params.k);
1326        for m in dense {
1327            if out.len() >= params.k {
1328                break;
1329            }
1330            let record = if need_record {
1331                self.store.get(handle.id, &m.id)?
1332            } else {
1333                None
1334            };
1335            let payload_value: Option<Value> = match &record {
1336                Some(r) if params.filter.is_some() || params.with_payload => {
1337                    Some(serde_json::from_slice(&r.payload)?)
1338                }
1339                _ => None,
1340            };
1341            if let Some(filter) = &params.filter
1342                && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1343            {
1344                continue;
1345            }
1346            out.push(Match {
1347                id: m.id,
1348                score: m.score,
1349                payload: if params.with_payload {
1350                    payload_value
1351                } else {
1352                    None
1353                },
1354                vector: if params.with_vector {
1355                    record.map(|r| r.vector)
1356                } else {
1357                    None
1358                },
1359            });
1360        }
1361        Ok(out)
1362    }
1363
1364    /// Hybrid search (ADR-0043/0046): fuse up to three rankings with Reciprocal
1365    /// Rank Fusion — a dense ANN ranking, a sparse inverted-index dot-product
1366    /// ranking (`sparse_query`), and a BM25 full-text ranking (`text_query`, scored
1367    /// over the same inverted index). Any may be `None`; at least one is required,
1368    /// giving pure dense / sparse / lexical or any blend through the same path. The
1369    /// same payload `filter` is re-checked on every side, so results stay exact.
1370    /// `rrf_k0` is the RRF rank-bias constant ([`DEFAULT_RRF_K0`]).
1371    pub fn hybrid_search(
1372        &mut self,
1373        collection: &str,
1374        dense_query: Option<&[f32]>,
1375        sparse_query: Option<&SparseVector>,
1376        text_query: Option<&str>,
1377        params: &SearchParams,
1378        rrf_k0: f32,
1379    ) -> Result<Vec<Match>> {
1380        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1381        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1382        // off-lock — ADR-0062).
1383        self.ensure_indexed(collection)?;
1384        self.hybrid_search_snapshot(
1385            collection,
1386            dense_query,
1387            sparse_query,
1388            text_query,
1389            params,
1390            rrf_k0,
1391        )
1392    }
1393
1394    /// Hybrid search over the collection's current immutable snapshot (`&self`, so
1395    /// readers run concurrently). When a prior write deferred the rebuild, it serves
1396    /// the **prior** snapshot (snapshot-isolated, slightly stale — ADR-0062/0053);
1397    /// the caller schedules a rebuild via [`Database::needs_rebuild`].
1398    pub fn hybrid_search_snapshot(
1399        &self,
1400        collection: &str,
1401        dense_query: Option<&[f32]>,
1402        sparse_query: Option<&SparseVector>,
1403        text_query: Option<&str>,
1404        params: &SearchParams,
1405        rrf_k0: f32,
1406    ) -> Result<Vec<Match>> {
1407        require_single_vector(self.handle(collection)?)?;
1408        require_server_searchable(self.handle(collection)?)?;
1409        if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1410            return Err(Error::Unsupported(
1411                "hybrid_search requires a dense query, a sparse query, or a text query",
1412            ));
1413        }
1414        let handle = self.handle(collection)?;
1415
1416        // Pull a deep-enough candidate list from each side so the fusion is
1417        // meaningful, then RRF down to `k`.
1418        let depth = params
1419            .k
1420            .saturating_mul(RRF_CANDIDATE_FACTOR)
1421            .max(MIN_RRF_CANDIDATES);
1422        let filter = params.filter.as_ref();
1423        let mut lists: Vec<Vec<String>> = Vec::new();
1424        if let Some(q) = dense_query {
1425            lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1426        }
1427        if let Some(sp) = sparse_query {
1428            lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1429        }
1430        if let Some(text) = text_query {
1431            lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1432        }
1433        let fused = rrf_fuse(&lists, rrf_k0, params.k);
1434
1435        let mut out = Vec::with_capacity(fused.len());
1436        for (ext_id, score) in fused {
1437            let record = if params.with_payload || params.with_vector {
1438                self.store.get(handle.id, &ext_id)?
1439            } else {
1440                None
1441            };
1442            let payload = match (&record, params.with_payload) {
1443                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1444                _ => None,
1445            };
1446            out.push(Match {
1447                id: ext_id,
1448                score,
1449                payload,
1450                vector: if params.with_vector {
1451                    record.map(|r| r.vector)
1452                } else {
1453                    None
1454                },
1455            });
1456        }
1457        Ok(out)
1458    }
1459
1460    // Dense candidates as a ranked list of external ids (the filter re-checked),
1461    // for hybrid fusion.
1462    fn dense_ranked_ids(
1463        &self,
1464        handle: &CollectionHandle,
1465        query: &[f32],
1466        depth: usize,
1467        ef_search: usize,
1468        filter: Option<&Filter>,
1469    ) -> Result<Vec<String>> {
1470        let mut ids = Vec::new();
1471        // MVCC mode (ADR-0064): the dense candidates come from the lock-free
1472        // snapshot (base ⊕ overlay), with ext ids already resolved; the sparse/BM25
1473        // sides and the filter re-check are unchanged.
1474        if mvcc_served(handle) {
1475            for m in handle
1476                .snapshot
1477                .load()
1478                .search(query, depth, ef_search.max(depth))?
1479            {
1480                if !self.passes_filter(handle.id, &m.id, filter)? {
1481                    continue;
1482                }
1483                ids.push(m.id);
1484                if ids.len() >= depth {
1485                    break;
1486                }
1487            }
1488            return Ok(ids);
1489        }
1490        let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1491        for neighbor in raw {
1492            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1493                continue;
1494            };
1495            if !self.passes_filter(handle.id, ext_id, filter)? {
1496                continue;
1497            }
1498            ids.push(ext_id.clone());
1499            if ids.len() >= depth {
1500                break;
1501            }
1502        }
1503        Ok(ids)
1504    }
1505
1506    // Sparse candidates as a ranked list of external ids. With the derived inverted
1507    // index present (the common case), score only the query's nonzero dimensions via
1508    // the posting lists, then re-check the filter on the ranked ids until `depth` are
1509    // filled — so low-scored rows never load a payload (ADR-0045). When the index is
1510    // absent (a not-yet-rebuilt or client-side collection), fall back to the full
1511    // store scan, which stays correct under the incremental upsert/delete path.
1512    fn sparse_ranked_ids(
1513        &self,
1514        handle: &CollectionHandle,
1515        query: &SparseVector,
1516        depth: usize,
1517        filter: Option<&Filter>,
1518    ) -> Result<Vec<String>> {
1519        if let Some(idx) = handle.sparse.as_ref() {
1520            let mut ids = Vec::new();
1521            for (ext_id, _score) in idx.search(query) {
1522                if !self.passes_filter(handle.id, &ext_id, filter)? {
1523                    continue;
1524                }
1525                ids.push(ext_id);
1526                if ids.len() >= depth {
1527                    break;
1528                }
1529            }
1530            return Ok(ids);
1531        }
1532        self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1533    }
1534
1535    // The store-scan fallback for [`sparse_ranked_ids`]: load every row, score its
1536    // `__quiver_sparse__` vector by dot product against the query, re-check the
1537    // filter, and return the top `depth`. O(N-rows), but correct without an index.
1538    fn sparse_ranked_ids_by_scan(
1539        &self,
1540        cid: CollectionId,
1541        query: &SparseVector,
1542        depth: usize,
1543        filter: Option<&Filter>,
1544    ) -> Result<Vec<String>> {
1545        let qmap: HashMap<u32, f32> = query
1546            .indices
1547            .iter()
1548            .copied()
1549            .zip(query.values.iter().copied())
1550            .collect();
1551        let mut scored: Vec<(f32, String)> = Vec::new();
1552        for (ext_id, record) in self.store.scan(cid)? {
1553            if record.payload.is_empty() {
1554                continue;
1555            }
1556            let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1557                continue;
1558            };
1559            if let Some(filter) = filter
1560                && !filter.matches(&value)
1561            {
1562                continue;
1563            }
1564            let Some(raw) = value.get(SPARSE_KEY) else {
1565                continue;
1566            };
1567            let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1568                continue;
1569            };
1570            let mut score = 0.0f32;
1571            for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1572                if let Some(qw) = qmap.get(dim) {
1573                    score += qw * weight;
1574                }
1575            }
1576            if score > 0.0 {
1577                scored.push((score, ext_id));
1578            }
1579        }
1580        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1581        Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1582    }
1583
1584    // BM25 full-text candidates as a ranked list of external ids (ADR-0046): tokenize
1585    // the query text into term ids and score them with BM25 over the derived inverted
1586    // index, re-checking the filter on the ranked ids until `depth` are filled. BM25
1587    // needs the index's corpus statistics, so when a collection has no inverted index
1588    // (a not-yet-rebuilt or client-side collection — neither reachable here, since
1589    // hybrid rebuilds a stale handle and rejects client-side) there is nothing to
1590    // score and the list is empty; any dense side still contributes.
1591    fn bm25_ranked_ids(
1592        &self,
1593        handle: &CollectionHandle,
1594        query_text: &str,
1595        depth: usize,
1596        filter: Option<&Filter>,
1597    ) -> Result<Vec<String>> {
1598        let Some(idx) = handle.sparse.as_ref() else {
1599            return Ok(Vec::new());
1600        };
1601        let terms = query_term_ids(query_text);
1602        let mut ids = Vec::new();
1603        for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1604            if !self.passes_filter(handle.id, &ext_id, filter)? {
1605                continue;
1606            }
1607            ids.push(ext_id);
1608            if ids.len() >= depth {
1609                break;
1610            }
1611        }
1612        Ok(ids)
1613    }
1614
1615    // Re-check a payload filter against a row (loading its payload). `None` filter
1616    // always passes.
1617    fn passes_filter(
1618        &self,
1619        cid: CollectionId,
1620        ext_id: &str,
1621        filter: Option<&Filter>,
1622    ) -> Result<bool> {
1623        let Some(filter) = filter else {
1624            return Ok(true);
1625        };
1626        let value: Value = match self.store.get(cid, ext_id)? {
1627            Some(r) => serde_json::from_slice(&r.payload)?,
1628            None => Value::Null,
1629        };
1630        Ok(filter.matches(&value))
1631    }
1632
1633    // Exactly score `candidates` (a superset of the filter's matches that the
1634    // secondary indexes produced) against the query, re-check the full filter
1635    // for correctness, and return the top `k`. The pre-filter arm of the hybrid
1636    // planner: perfect recall over an already-narrowed set.
1637    fn exact_filtered_search(
1638        &self,
1639        cid: CollectionId,
1640        descriptor: &Descriptor,
1641        query: &[f32],
1642        params: &SearchParams,
1643        filter: &Filter,
1644        candidates: &BTreeSet<String>,
1645    ) -> Result<Vec<Match>> {
1646        let metric = to_index_metric(descriptor.metric);
1647        let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1648        for ext_id in candidates {
1649            let Some(record) = self.store.get(cid, ext_id)? else {
1650                continue;
1651            };
1652            let payload: Value = serde_json::from_slice(&record.payload)?;
1653            if !filter.matches(&payload) {
1654                continue;
1655            }
1656            let ordering = ordering_distance(metric, query, &record.vector);
1657            scored.push((ordering, ext_id.clone(), payload, record.vector));
1658        }
1659        scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1660        scored.truncate(params.k);
1661        Ok(scored
1662            .into_iter()
1663            .map(|(ordering, id, payload, vector)| Match {
1664                id,
1665                score: report_metric(metric, ordering),
1666                payload: params.with_payload.then_some(payload),
1667                vector: params.with_vector.then_some(vector),
1668            })
1669            .collect())
1670    }
1671
1672    /// Insert or replace a multi-vector (late-interaction / ColBERT) document: its
1673    /// `vectors` are stored as a group of token rows and its `payload` once on the
1674    /// anchor token (ADR-0028). Re-upserting a document first removes the tokens a
1675    /// shorter version would leave behind, so the document is replaced cleanly.
1676    ///
1677    /// # Errors
1678    /// Errors if the collection is single-vector, the document has no vectors, a
1679    /// vector's dimensionality is wrong, or the id contains the reserved separator.
1680    pub fn upsert_document(
1681        &mut self,
1682        collection: &str,
1683        doc_id: &str,
1684        vectors: &[Vec<f32>],
1685        payload: &Value,
1686    ) -> Result<()> {
1687        let handle = self
1688            .collections
1689            .get_mut(collection)
1690            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1691        require_multivector(handle)?;
1692        if doc_id.contains(DOC_TOKEN_SEP) {
1693            return Err(Error::Unsupported(
1694                "document id must not contain the reserved 0x1f separator",
1695            ));
1696        }
1697        if vectors.is_empty() {
1698            return Err(Error::Unsupported("a document needs at least one vector"));
1699        }
1700        let dim = handle.descriptor.dim as usize;
1701        if vectors.iter().any(|v| v.len() != dim) {
1702            return Err(Error::Unsupported(
1703                "every document vector must match the collection dimensionality",
1704            ));
1705        }
1706        // Remove only the trailing tokens a shorter re-upsert leaves behind; the
1707        // rest are overwritten by the upserts below.
1708        let previous = handle
1709            .docs
1710            .as_ref()
1711            .and_then(|d| d.get(doc_id))
1712            .copied()
1713            .unwrap_or(0) as usize;
1714        for j in vectors.len()..previous {
1715            self.store.delete(handle.id, &token_id(doc_id, j))?;
1716            // Tombstone the dropped token row in the ANN index too (ADR-0034).
1717            index_delete_point(handle, &token_id(doc_id, j));
1718        }
1719        let payload_bytes = serde_json::to_vec(payload)?;
1720        for (j, vector) in vectors.iter().enumerate() {
1721            // The payload is stored once, on the anchor token; the rest carry none.
1722            let bytes: &[u8] = if j == 0 {
1723                payload_bytes.as_slice()
1724            } else {
1725                &[]
1726            };
1727            self.store
1728                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1729            // Fold the token row into the ANN index incrementally instead of
1730            // marking the whole collection stale (ADR-0034); the underlying index
1731            // consolidates by a rebuild past its own churn threshold.
1732            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1733        }
1734        if let Some(docs) = handle.docs.as_mut() {
1735            docs.insert(doc_id.to_owned(), vectors.len() as u32);
1736        }
1737        Ok(())
1738    }
1739
1740    /// Search a multi-vector collection by a set of query token vectors, ranking
1741    /// documents by MaxSim late interaction (ADR-0028). At or below the exact-scan
1742    /// threshold every document is scored exactly; above it, candidates are
1743    /// generated by nearest-neighbour search over the token pool (recall tuned by
1744    /// `ef_search`) and re-ranked exactly. An optional `filter` is applied to each
1745    /// document's payload, exactly. A document has no single vector, so `with_payload`
1746    /// returns the anchor payload and `with_vector` returns the token vectors.
1747    pub fn search_multi_vector(
1748        &mut self,
1749        collection: &str,
1750        query_tokens: &[Vec<f32>],
1751        params: &SearchParams,
1752    ) -> Result<Vec<DocumentMatch>> {
1753        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1754        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1755        // off-lock — ADR-0062).
1756        self.ensure_indexed(collection)?;
1757        self.search_multi_vector_snapshot(collection, query_tokens, params)
1758    }
1759
1760    /// Multi-vector (late-interaction) search over the collection's current
1761    /// immutable snapshot (`&self`, so readers run concurrently). A small corpus is
1762    /// scored exactly; a large corpus draws candidates from the ANN index, serving
1763    /// the **prior** snapshot when a write deferred its rebuild (snapshot-isolated,
1764    /// slightly stale — ADR-0062/0053); the caller schedules the rebuild via
1765    /// [`Database::needs_rebuild`].
1766    pub fn search_multi_vector_snapshot(
1767        &self,
1768        collection: &str,
1769        query_tokens: &[Vec<f32>],
1770        params: &SearchParams,
1771    ) -> Result<Vec<DocumentMatch>> {
1772        require_multivector(self.handle(collection)?)?;
1773        let dim = self.handle(collection)?.descriptor.dim as usize;
1774        if query_tokens.is_empty() {
1775            return Ok(Vec::new());
1776        }
1777        if query_tokens.iter().any(|v| v.len() != dim) {
1778            return Err(Error::Unsupported(
1779                "every query token must match the collection dimensionality",
1780            ));
1781        }
1782
1783        let doc_count = self
1784            .handle(collection)?
1785            .docs
1786            .as_ref()
1787            .map_or(0, BTreeMap::len);
1788        let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1789            // Exact: score every document. No ANN index needed.
1790            self.handle(collection)?
1791                .docs
1792                .as_ref()
1793                .map(|d| d.keys().cloned().collect())
1794                .unwrap_or_default()
1795        } else {
1796            // Large corpus: generate candidates from the token pool. A prior write
1797            // may have deferred the ANN index rebuild; serve the prior snapshot (the
1798            // server schedules an off-lock rebuild — ADR-0062).
1799            let handle = self.handle(collection)?;
1800            let per_token_k = params
1801                .k
1802                .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1803                .max(params.ef_search);
1804            let mut set = BTreeSet::new();
1805            for token in query_tokens {
1806                for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1807                    if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1808                        && let Some((doc, _)) = parse_token_id(ext)
1809                    {
1810                        set.insert(doc.to_owned());
1811                    }
1812                }
1813            }
1814            set.into_iter().collect()
1815        };
1816
1817        // Re-rank the candidate documents by exact MaxSim over all their tokens.
1818        let handle = self.handle(collection)?;
1819        let cid = handle.id;
1820        let metric = to_index_metric(handle.descriptor.metric);
1821        let mut scored: Vec<ScoredDocument> = Vec::new();
1822        for doc in &candidates {
1823            let count = handle
1824                .docs
1825                .as_ref()
1826                .and_then(|d| d.get(doc))
1827                .copied()
1828                .unwrap_or(0) as usize;
1829            let (tokens, payload) = self.gather_document(cid, doc, count)?;
1830            if tokens.is_empty() {
1831                continue;
1832            }
1833            if let Some(filter) = &params.filter {
1834                let value = payload.clone().unwrap_or(Value::Null);
1835                if !filter.matches(&value) {
1836                    continue;
1837                }
1838            }
1839            let score = max_sim(metric, query_tokens, &tokens);
1840            let vectors = params.with_vector.then_some(tokens);
1841            scored.push((score, doc.clone(), payload, vectors));
1842        }
1843        // Higher MaxSim first; ties broken by id for a deterministic order.
1844        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1845        scored.truncate(params.k);
1846        Ok(scored
1847            .into_iter()
1848            .map(|(score, id, payload, vectors)| DocumentMatch {
1849                id,
1850                score,
1851                payload: params.with_payload.then_some(payload).flatten(),
1852                vectors,
1853            })
1854            .collect())
1855    }
1856
1857    /// Fetch a multi-vector document by id: its anchor payload and, if
1858    /// `with_vectors`, its token vectors. `None` if the document does not exist.
1859    pub fn get_document(
1860        &self,
1861        collection: &str,
1862        doc_id: &str,
1863        with_vectors: bool,
1864    ) -> Result<Option<DocumentMatch>> {
1865        let handle = self.handle(collection)?;
1866        require_multivector(handle)?;
1867        let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1868            return Ok(None);
1869        };
1870        let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1871        if tokens.is_empty() {
1872            return Ok(None);
1873        }
1874        Ok(Some(DocumentMatch {
1875            id: doc_id.to_owned(),
1876            score: 0.0,
1877            payload,
1878            vectors: with_vectors.then_some(tokens),
1879        }))
1880    }
1881
1882    /// Delete a multi-vector document and all of its token rows. Returns whether it
1883    /// existed.
1884    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1885        let handle = self
1886            .collections
1887            .get_mut(collection)
1888            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1889        require_multivector(handle)?;
1890        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1891            return Ok(false);
1892        };
1893        for j in 0..count as usize {
1894            self.store.delete(handle.id, &token_id(doc_id, j))?;
1895            // Tombstone each token row in the ANN index incrementally (ADR-0034).
1896            index_delete_point(handle, &token_id(doc_id, j));
1897        }
1898        if let Some(docs) = handle.docs.as_mut() {
1899            docs.remove(doc_id);
1900        }
1901        Ok(true)
1902    }
1903
1904    /// The number of documents in a multi-vector collection. Errors if the
1905    /// collection is single-vector.
1906    pub fn document_count(&self, collection: &str) -> Result<usize> {
1907        let handle = self.handle(collection)?;
1908        require_multivector(handle)?;
1909        Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1910    }
1911
1912    // Read a document's token vectors (in ordinal order) and its anchor payload
1913    // from the store. Missing token rows are skipped, so a torn document yields a
1914    // short token list the caller treats as empty.
1915    fn gather_document(
1916        &self,
1917        cid: CollectionId,
1918        doc_id: &str,
1919        count: usize,
1920    ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1921        let mut tokens = Vec::with_capacity(count);
1922        let mut payload: Option<Value> = None;
1923        for j in 0..count {
1924            let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1925                continue;
1926            };
1927            if j == 0 && !record.payload.is_empty() {
1928                payload = Some(serde_json::from_slice(&record.payload)?);
1929            }
1930            tokens.push(record.vector);
1931        }
1932        Ok((tokens, payload))
1933    }
1934
1935    /// Flush a durable checkpoint of all collections, capturing a durable
1936    /// snapshot of each built, up-to-date IVF index (ADR-0025) so it reloads on
1937    /// open instead of rebuilding. Other index kinds, and a stale or unbuilt IVF,
1938    /// are rebuilt on open.
1939    pub fn checkpoint(&mut self) -> Result<()> {
1940        let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1941        for handle in self.collections.values() {
1942            if handle.stale {
1943                continue;
1944            }
1945            if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1946                if ivf.is_empty() {
1947                    continue;
1948                }
1949                let envelope = IndexEnvelope {
1950                    version: INDEX_ENVELOPE_VERSION,
1951                    int_to_ext: handle.int_to_ext.clone(),
1952                    ivf: ivf.snapshot()?,
1953                };
1954                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1955            } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1956                // Durable DiskVamana (ADR-0063): the base file is already on disk
1957                // (sealed at build/consolidation); seal only the tiny tie-to-live
1958                // blob. The derived sparse index is not persisted (as for IVF); on
1959                // reopen the durable load leaves it `None` and hybrid search falls
1960                // back to the store scan until the next rebuild — correct, not fast.
1961                let envelope = DiskEnvelope {
1962                    version: INDEX_ENVELOPE_VERSION,
1963                    int_to_ext: handle.int_to_ext.clone(),
1964                    base_row_count: fresh.base_len() as u64,
1965                    deleted_ids: fresh.deleted_ids(),
1966                };
1967                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1968            }
1969        }
1970        self.store.checkpoint_with_index_snapshots(&snapshots)?;
1971        Ok(())
1972    }
1973
1974    /// Compact every collection with reclaimable space, merging its sealed
1975    /// segments and dropping deleted/shadowed rows. Crash-safe; a no-op for
1976    /// collections with nothing to reclaim.
1977    pub fn compact(&mut self) -> Result<()> {
1978        Ok(self.store.compact()?)
1979    }
1980
    /// The manifest version — the catalog generation a snapshot captures
    /// (ADR-0050). Surfaced as snapshot-relevant status in `database_stats`.
    ///
    /// This is a plain read that forwards the value reported by the underlying
    /// store; it is also the version [`Database::snapshot`] records in its result.
    #[must_use]
    pub fn manifest_version(&self) -> u64 {
        self.store.manifest_version()
    }
1987
1988    /// Best-effort total on-disk size of the data directory, in bytes — what a
1989    /// full snapshot would copy (ADR-0050). Unreadable entries are skipped.
1990    #[must_use]
1991    pub fn disk_usage_bytes(&self) -> u64 {
1992        dir_size(self.store.dir())
1993    }
1994
1995    /// Take a consistent online snapshot of the whole database into `dest`
1996    /// (which must not already exist), returning what was captured (ADR-0050).
1997    ///
1998    /// The writer lock is held for the duration: `checkpoint` seals the active
1999    /// buffer into segments and advances the WAL floor to the head, then the
2000    /// data directory is byte-copied. Opening `dest` afterwards replays an empty
2001    /// WAL tail and reconstructs the database exactly as of this call.
2002    ///
2003    /// # Errors
2004    /// [`Error::Core`] if `dest` already exists, or on any I/O error during the
2005    /// checkpoint or the copy.
2006    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
2007        if dest.exists() {
2008            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2009                dest.display().to_string(),
2010            )));
2011        }
2012        // Consistency anchor: flush to segments and advance the WAL floor so the
2013        // copied tree opens with no replay (ADR-0050).
2014        self.checkpoint()?;
2015        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
2016        // ponytail: flush the snapshot root's metadata only; a backup target is
2017        // re-takeable, so per-file fsync isn't warranted. Add it if snapshots
2018        // must survive an immediate post-copy crash.
2019        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
2020        Ok(SnapshotInfo {
2021            manifest_version: self.store.manifest_version(),
2022            files,
2023            bytes,
2024        })
2025    }
2026
2027    fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2028        self.collections
2029            .get(name)
2030            .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2031    }
2032}
2033
2034/// Restore a snapshot directory `src` (produced by [`Database::snapshot`]) into a
2035/// fresh `dest` directory, leaving it ready for the caller to open with the same
2036/// keyring/codec the snapshot was written under (ADR-0050).
2037///
2038/// `dest` must not already exist — restore never overwrites a live data
2039/// directory. The real integrity check is the caller's subsequent open (which,
2040/// for an encrypted store, is the only party holding the key); this function
2041/// only verifies the source looks like a snapshot and copies it.
2042///
2043/// # Errors
2044/// [`Error::Core`] if `dest` exists, `src` is not a snapshot (no `CURRENT`), or
2045/// on any I/O error during the copy.
2046pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
2047    if dest.exists() {
2048        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2049            dest.display().to_string(),
2050        )));
2051    }
2052    if !src.join("CURRENT").exists() {
2053        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
2054            format!("{} is not a snapshot (no CURRENT)", src.display()),
2055        )));
2056    }
2057    let (files, bytes) = copy_tree(src, dest)?;
2058    Ok(SnapshotInfo {
2059        // The restored directory's manifest version is whatever the snapshot
2060        // carried; report 0 since we don't re-open here to read it (the caller's
2061        // open is authoritative). `files`/`bytes` reflect the copy.
2062        manifest_version: 0,
2063        files,
2064        bytes,
2065    })
2066}
2067
2068// Recursively copy `src` into `dst`, returning `(files, bytes)`. I/O errors are
2069// tagged with the offending path. The data directory contains only files and
2070// directories (no symlinks), so a plain walk is complete.
2071fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2072    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2073    let mut files = 0u64;
2074    let mut bytes = 0u64;
2075    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2076        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2077        let from = entry.path();
2078        let to = dst.join(entry.file_name());
2079        let ft = entry
2080            .file_type()
2081            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2082        if ft.is_dir() {
2083            let (f, b) = copy_tree(&from, &to)?;
2084            files += f;
2085            bytes += b;
2086        } else {
2087            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2088            files += 1;
2089            bytes += n;
2090        }
2091    }
2092    Ok((files, bytes))
2093}
2094
// Total on-disk size in bytes of everything under `dir`, computed recursively and
// on a best-effort basis: a directory that cannot be listed counts as 0 and any
// entry whose type or metadata cannot be read is skipped, so a stats read never
// fails on a transient error.
fn dir_size(dir: &Path) -> u64 {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return 0;
    };
    listing
        .flatten()
        .map(|item| match item.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&item.path()),
            Ok(_) => item.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2112
// The byte separating a multi-vector document id from a token ordinal in a token
// row's external id (`<doc-id><US><ordinal>`): the ASCII Unit Separator, which is
// disallowed in user document ids (ADR-0028). `token_id` joins with it and
// `parse_token_id` splits on its last occurrence.
const DOC_TOKEN_SEP: char = '\u{1f}';

// At or below this document count a multi-vector search scores every document
// exactly; above it, nearest-neighbour candidate generation over the token pool
// kicks in (mirrors the single-vector planner's full-scan threshold).
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

// Per-query-token candidate breadth for the large-corpus path: each query token
// retrieves about `k × this` nearest token rows before the documents are unioned.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2126
2127// The external id of a multi-vector document's `ordinal`-th token row.
2128fn token_id(doc_id: &str, ordinal: usize) -> String {
2129    format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2130}
2131
2132// Split a token row's external id back into its document id and ordinal, or `None`
2133// if it is not a token id. Splits from the right, so a document id (which cannot
2134// contain the separator) is recovered intact.
2135fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2136    let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2137    Some((doc, ordinal.parse().ok()?))
2138}
2139
2140// Reject the single-vector API on a multi-vector collection.
2141fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2142    if handle.descriptor.multivector {
2143        Err(Error::Unsupported(
2144            "collection is multi-vector; use upsert_document / search_multi_vector",
2145        ))
2146    } else {
2147        Ok(())
2148    }
2149}
2150
2151// Reject the document API on a single-vector collection.
2152fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2153    if handle.descriptor.multivector {
2154        Ok(())
2155    } else {
2156        Err(Error::Unsupported(
2157            "collection is single-vector; use upsert / search",
2158        ))
2159    }
2160}
2161
2162// Reject server-side ranked search on a client-side-encrypted collection: the
2163// server holds only opaque ciphertext and cannot rank it. The client fetches the
2164// entitled set (see `Database::fetch`) and ranks locally (ADR-0032).
2165fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2166    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2167        Err(Error::Unsupported(
2168            "collection is client-side encrypted; the server cannot rank opaque vectors — \
2169             fetch points and rank client-side",
2170        ))
2171    } else {
2172        Ok(())
2173    }
2174}
2175
2176fn to_index_metric(metric: DistanceMetric) -> Metric {
2177    match metric {
2178        DistanceMetric::Dot => Metric::Dot,
2179        DistanceMetric::Cosine => Metric::Cosine,
2180        DistanceMetric::L2 => Metric::L2,
2181    }
2182}
2183
2184// Reject index/metric combinations the engine cannot serve.
2185fn validate_index(descriptor: &Descriptor) -> Result<()> {
2186    // Late interaction (MaxSim) is a similarity, so multi-vector collections need a
2187    // similarity metric.
2188    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
2189        return Err(Error::Unsupported(
2190            "multi-vector collections require a similarity metric (cosine or dot)",
2191        ));
2192    }
2193    // Client-side opaque encryption (ADR-0032) is searched by the client, not the
2194    // server, so it has no metric or index constraints — but it cannot combine with
2195    // the multi-vector document layout.
2196    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2197        if descriptor.multivector {
2198            return Err(Error::Unsupported(
2199                "client-side vector encryption is not supported for multi-vector collections",
2200            ));
2201        }
2202        return Ok(());
2203    }
2204    // DCPE (ADR-0031) preserves Euclidean distance comparison; the secret scaling
2205    // changes vector norms, so cosine and dot orderings are not preserved.
2206    if descriptor.vector_encryption == VectorEncryption::Dcpe
2207        && descriptor.metric != DistanceMetric::L2
2208    {
2209        return Err(Error::Unsupported(
2210            "dcpe-encrypted collections require the l2 metric",
2211        ));
2212    }
2213    // ColBERT is a late-interaction token-pool index (ADR-0034): valid only for a
2214    // multi-vector collection (which already requires a similarity metric).
2215    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
2216        return Err(Error::Unsupported(
2217            "the colbert index is only for multi-vector collections",
2218        ));
2219    }
2220    match descriptor.index.kind {
2221        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
2222            if descriptor.metric == DistanceMetric::Dot =>
2223        {
2224            Err(Error::Unsupported(
2225                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
2226            ))
2227        }
2228        _ => Ok(()),
2229    }
2230}
2231
2232// An empty index of the kind the descriptor selects. Batch kinds start unbuilt.
2233fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
2234    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2235        return CollectionIndex::None;
2236    }
2237    match descriptor.index.kind {
2238        IndexKind::Vamana => CollectionIndex::Vamana(None),
2239        IndexKind::DiskVamana => CollectionIndex::Disk(None),
2240        IndexKind::Ivf => CollectionIndex::Ivf(None),
2241        IndexKind::Colbert => CollectionIndex::Colbert(None),
2242        _ => CollectionIndex::Hnsw(Hnsw::new(
2243            descriptor.dim as usize,
2244            to_index_metric(descriptor.metric),
2245            HnswConfig::default(),
2246        )),
2247    }
2248}
2249
// Default product-quantization subspace count for `dim`-dimensional vectors: the
// largest divisor of `dim` that does not exceed `dim / 8` (about eight dimensions
// per subspace). When no such divisor above 1 exists — including `dim < 16` — the
// answer is 1, i.e. a single whole-vector codebook.
fn default_pq_m(dim: usize) -> usize {
    // Walk down from the target until a divisor is hit; 1 divides everything, so
    // the walk always stops there at the latest.
    let mut m = (dim / 8).max(1);
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
2259
2260// Build the descriptor's index over `ids` (internal 0..n) and their flat vectors.
2261// Fixed seed for codebook training, so a collection's disk index is reproducible.
2262const PQ_SEED: u64 = 0x5176_5044_5141_5453;
2263// The disk index artifact, overwritten in place each rebuild; the caller drops
2264// the previous handle (unmapping the file) first.
2265const DISK_INDEX_FILE: &str = "vamana.qvx";
2266
2267fn build_index(
2268    store: &Store,
2269    cid: CollectionId,
2270    descriptor: &Descriptor,
2271    ids: &[u64],
2272    flat: &[f32],
2273) -> Result<CollectionIndex> {
2274    Ok(match build_in_memory_index(descriptor, ids, flat)? {
2275        Some(index) => index,
2276        None => {
2277            let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2278            CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2279                store, cid, &graph, &pq,
2280            )?)?))
2281        }
2282    })
2283}
2284
2285// Build the in-memory index for a collection, or `None` for the disk-resident kind
2286// whose artifact must be sealed through the store. Pure given the vectors (no
2287// store, no I/O), so the off-lock rebuild (ADR-0062) calls it without a lock; the
2288// synchronous `build_index` and disk path layer the store I/O on top.
2289fn build_in_memory_index(
2290    descriptor: &Descriptor,
2291    ids: &[u64],
2292    flat: &[f32],
2293) -> Result<Option<CollectionIndex>> {
2294    // Client-side-encrypted collections have no server-side index (ADR-0032): the
2295    // server stores opaque ciphertext it never ranks.
2296    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2297        return Ok(Some(CollectionIndex::None));
2298    }
2299    let dim = descriptor.dim as usize;
2300    let metric = to_index_metric(descriptor.metric);
2301    Ok(Some(match descriptor.index.kind {
2302        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
2303            ids,
2304            flat,
2305            dim,
2306            metric,
2307            VamanaConfig::default(),
2308        )?)?)),
2309        // The disk-resident artifact is sealed through the store, not here.
2310        IndexKind::DiskVamana => return Ok(None),
2311        IndexKind::Ivf => {
2312            let cfg = IvfConfig {
2313                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
2314                ..IvfConfig::default()
2315            };
2316            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
2317        }
2318        IndexKind::Colbert => {
2319            // Coarse centroids scale ~√(tokens); probe a quarter of them (PLAID),
2320            // and PQ the residual with the same subspace default as the disk path.
2321            let n = ids.len();
2322            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
2323            let cfg = ColbertConfig {
2324                n_centroids,
2325                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
2326                pq_subspaces: descriptor
2327                    .index
2328                    .pq_subspaces
2329                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
2330                seed: PQ_SEED,
2331            };
2332            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
2333        }
2334        _ => {
2335            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
2336            for (i, &id) in ids.iter().enumerate() {
2337                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
2338            }
2339            CollectionIndex::Hnsw(h)
2340        }
2341    }))
2342}
2343
2344// Build the Vamana graph + PQ codebook for the disk-resident index. Pure (no
2345// store, no I/O), so the off-lock rebuild (ADR-0062) does the expensive graph build
2346// and PQ training without a lock; `write_disk_index` then seals the result.
2347fn build_disk_graph_pq(
2348    descriptor: &Descriptor,
2349    ids: &[u64],
2350    flat: &[f32],
2351) -> Result<(Vamana, ProductQuantizer)> {
2352    let dim = descriptor.dim as usize;
2353    let metric = to_index_metric(descriptor.metric);
2354    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2355    let m = descriptor
2356        .index
2357        .pq_subspaces
2358        .map_or_else(|| default_pq_m(dim), |x| x as usize);
2359    let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2360    Ok((graph, pq))
2361}
2362
2363// Seal a prebuilt disk artifact under the collection's index dir with the store's
2364// codec, and open it for queries. Overwrites the artifact in place, so the caller
2365// must drop any prior `DiskVamana` for this collection first (its mmap assumes an
2366// immutable file).
2367fn write_disk_index(
2368    store: &Store,
2369    cid: CollectionId,
2370    graph: &Vamana,
2371    pq: &ProductQuantizer,
2372) -> Result<DiskVamana> {
2373    let dir = store.index_dir(cid);
2374    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
2375    let path = dir.join(DISK_INDEX_FILE);
2376    // Seal the index artifact with the collection's own codec (its DEK under an
2377    // envelope key-ring), so a crypto-shred of the collection also makes its index
2378    // unreadable. The same owned handle writes and then mmap-opens it.
2379    let codec = store.collection_codec_clone(cid)?;
2380    // Atomic publish (ADR-0063): write to a temp file, fsync it (disk::write does
2381    // the file sync), rename over the live name, then fsync the dir. A crash mid
2382    // write leaves the previous complete base or none — never a torn file that the
2383    // durable load path could `mmap` and serve. The caller has already dropped any
2384    // prior `DiskVamana` (its mmap assumed the old inode), so the rename is safe.
2385    let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
2386    quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
2387    std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
2388    let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
2389    open_disk_index(store, cid, codec)
2390}
2391
2392// Open a collection's sealed disk base (`vamana.qvx`) for queries, decrypting with
2393// its codec. Errors (absent/torn/wrong-codec file) propagate so the durable load
2394// path can fall back to a rebuild (ADR-0063).
2395fn open_disk_index(
2396    store: &Store,
2397    cid: CollectionId,
2398    codec: Box<dyn PageCodec>,
2399) -> Result<DiskVamana> {
2400    let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2401    Ok(DiskVamana::open(&path, codec)?)
2402}
2403
2404// Load a collection's index on open: restore the durable IVF snapshot and replay
2405// the post-checkpoint tail (ADR-0025) when one is present and intact, otherwise
2406// rebuild from the store. The snapshot is only a fast path — any problem reading
2407// or restoring it falls back to the authoritative rebuild.
2408fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2409    // Multi-vector collections always rebuild on open, so the document grouping is
2410    // derived from the live rows; the snapshot fast-paths stay single-vector.
2411    if !handle.descriptor.multivector
2412        && handle.descriptor.index.kind == IndexKind::Ivf
2413        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2414        && restore_ivf_snapshot(store, handle, &blob).is_ok()
2415    {
2416        return Ok(());
2417    }
2418    // Durable DiskVamana (ADR-0063): mmap the frugal base instead of an O(N)
2419    // full-RAM rebuild. Any failure falls back to rebuild, so the artifact is never
2420    // load-bearing for correctness. The derived sparse index is left `None` (as for
2421    // IVF) and hybrid falls back to the store scan until the next rebuild.
2422    // `QUIVER_DISABLE_DURABLE_DISK_INDEX` is an ops kill switch: set it to force the
2423    // (always-correct) rebuild path if the durable load is ever suspected.
2424    if !handle.descriptor.multivector
2425        && handle.descriptor.index.kind == IndexKind::DiskVamana
2426        && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
2427        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2428        && restore_disk_snapshot(store, handle, &blob).is_ok()
2429    {
2430        return Ok(());
2431    }
2432    rebuild_index(store, handle)
2433}
2434
2435// Restore a DiskVamana from its durable snapshot (ADR-0063): `mmap` the immutable
2436// base file, rebuild the in-memory FreshDiskANN delta from the store by the implied
2437// delta ids, re-apply the tombstones, then replay the post-checkpoint WAL tail. Any
2438// problem — absent/torn/mismatched base, a missing delta row — returns an error so
2439// the caller falls back to an authoritative rebuild.
2440fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2441    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
2442    if envelope.version != INDEX_ENVELOPE_VERSION {
2443        return Err(Error::Unsupported(
2444            "unsupported disk index snapshot version",
2445        ));
2446    }
2447    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
2448    // The blob's base count must match the opened file, or the (base, blob) pair is
2449    // inconsistent (e.g. a base torn or replaced after the blob was sealed).
2450    if base.len() as u64 != envelope.base_row_count {
2451        return Err(Error::Unsupported(
2452            "disk base count disagrees with snapshot",
2453        ));
2454    }
2455    handle.ext_to_int = envelope
2456        .int_to_ext
2457        .iter()
2458        .enumerate()
2459        .map(|(i, ext)| (ext.clone(), i as u64))
2460        .collect();
2461    handle.int_to_ext = envelope.int_to_ext;
2462    let mut fresh = FreshDiskVamana::new(base)?;
2463    // Reconstruct the delta: the ids above the base were inserted since the last
2464    // consolidation; their vectors live in the store, fetched by id (a row that
2465    // died this window is simply absent and need not enter the delta).
2466    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
2467        let ext = &handle.int_to_ext[internal as usize];
2468        if let Some(record) = store.get(handle.id, ext)? {
2469            fresh.insert(internal, &record.vector)?;
2470        }
2471    }
2472    for id in envelope.deleted_ids {
2473        fresh.mark_deleted(id);
2474    }
2475    handle.index = CollectionIndex::Disk(Some(fresh));
2476    handle.stale = false;
2477    replay_recovery_tail(store, handle)
2478}
2479
2480// Catch a restored index up to the store's current state by replaying the
2481// post-checkpoint WAL tail through the shared incremental apply path (ADR-0025/0063):
2482// tombstones first, then active-buffer upserts, so a row shadowed this window ends
2483// with its new vector. Bounded by the checkpoint cadence, not the collection size.
2484fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2485    let tail = store.recovery_tail(handle.id)?;
2486    for ext in &tail.deleted {
2487        index_delete_point(handle, ext);
2488    }
2489    for (ext, record) in tail.upserts {
2490        index_upsert_point(handle, &ext, &record.vector)?;
2491    }
2492    Ok(())
2493}
2494
2495// Restore an IVF from its snapshot envelope and catch it up to the store's
2496// current state by replaying the post-checkpoint tail (ADR-0025). Tombstoned ids
2497// are removed before the active upserts are applied, so a row shadowed this
2498// window (present in both) ends with its new vector.
2499fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2500    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
2501    if envelope.version != INDEX_ENVELOPE_VERSION {
2502        return Err(Error::Unsupported(
2503            "unsupported index snapshot envelope version",
2504        ));
2505    }
2506    let ivf = Ivf::restore(&envelope.ivf)?;
2507    handle.ext_to_int = envelope
2508        .int_to_ext
2509        .iter()
2510        .enumerate()
2511        .map(|(i, ext)| (ext.clone(), i as u64))
2512        .collect();
2513    handle.int_to_ext = envelope.int_to_ext;
2514    handle.index = CollectionIndex::Ivf(Some(ivf));
2515    handle.stale = false;
2516
2517    let tail = store.recovery_tail(handle.id)?;
2518    for ext in &tail.deleted {
2519        let Some(&internal) = handle.ext_to_int.get(ext) else {
2520            continue;
2521        };
2522        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2523            ivf.remove(internal);
2524        }
2525    }
2526    for (ext, record) in tail.upserts {
2527        let internal = match handle.ext_to_int.get(&ext) {
2528            Some(&i) => i,
2529            None => {
2530                let i = handle.int_to_ext.len() as u64;
2531                handle.ext_to_int.insert(ext.clone(), i);
2532                handle.int_to_ext.push(ext);
2533                i
2534            }
2535        };
2536        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2537            ivf.insert(internal, &record.vector)?;
2538        }
2539    }
2540    Ok(())
2541}
2542
// Fold one point write (`ext_id` → `vector`) into a built index incrementally,
// or mark the handle stale to defer to a rebuild when the kind cannot absorb it
// in place. HNSW absorbs a brand-new id but cannot update one (a known id falls
// to a rebuild); a built/trained IVF inserts or replaces any id (ADR-0023); a
// built FreshDiskANN graph appends to its delta and tombstones the prior copy on
// an update, consolidating past its churn threshold (ADR-0033). Shared by the
// single-vector `upsert` and each token row of a multi-vector `upsert_document`
// (ADR-0034). A no-op once the handle is stale (a rebuild is already pending).
//
// Returns an error only when the underlying index rejects the insert. Note the
// branches differ in what is already recorded at that point: the IVF branch
// registers a new id in the id maps before inserting, while the HNSW, graph and
// ColBERT branches update the maps only after a successful insert.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    // MVCC mode: append to the published overlay instead of mutating the immutable
    // base index (ADR-0064), keeping lock-free readers race-free. Bumps the write
    // generation itself.
    if mvcc_served(handle) {
        overlay_upsert(handle, ext_id, vector);
        return Ok(());
    }
    // Every write bumps the generation, even one skipped here because a rebuild is
    // already pending — an in-flight off-lock rebuild must still notice it (ADR-0062).
    bump_write_gen(handle);
    if handle.stale {
        return Ok(());
    }
    // Classify the index up front as plain booleans: the branches below need
    // `&mut handle.index` and, in places, the whole handle (`mark_stale`), which a
    // live borrow from a `match` on the index would not allow.
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    // An IVF that is built but empty is not treated as live; it falls to the
    // final `else` and a rebuild.
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        // Brand-new id on HNSW: the next dense internal id is the current length of
        // `int_to_ext`. The maps are updated only once the insert has succeeded.
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        // Reuse the internal id for an in-place update; allocate a fresh, dense one
        // for a new id (so `int_to_ext` stays index-addressable).
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // A graph cannot update a node in place: append a new delta node under a
        // fresh internal id and tombstone the prior copy on an update (ADR-0033).
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            _ => {}
        }
        // Point the external id at the new node; on an update this replaces the
        // mapping to the tombstoned copy.
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        // Past the churn threshold, defer to a consolidating rebuild.
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        // ColBERT appends a new token and tombstones the prior copy on an update —
        // its centroids are fixed until a rebuild (ADR-0034); the deletion fraction
        // drives consolidation, as for HNSW.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // Everything else — an HNSW update of a known id, an unbuilt or empty
        // index, or no index at all — cannot be absorbed in place.
        mark_stale(handle);
    }
    Ok(())
}
2646
2647// Tombstone one point (`ext_id`) from a built index incrementally, or mark the
2648// handle stale to defer to a rebuild. A built IVF removes in place (ADR-0023), a
2649// built HNSW soft-deletes (ADR-0026), and a built FreshDiskANN graph tombstones in
2650// its deletion set (ADR-0033); each amortizes a rebuild when tombstones dominate.
2651// The id→internal mapping is kept so a later re-insert allocates afresh. Shared by
2652// the single-vector `delete` and each token row of `delete_document` (ADR-0034).
2653fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2654    // MVCC mode: tombstone the id in the published overlay (ADR-0064); the base
2655    // stays immutable. Bumps the write generation itself.
2656    if mvcc_served(handle) {
2657        overlay_delete(handle, ext_id);
2658        return;
2659    }
2660    // Every write bumps the generation, even one skipped here (see `index_upsert_point`).
2661    bump_write_gen(handle);
2662    if handle.stale {
2663        return;
2664    }
2665    let internal = handle.ext_to_int.get(ext_id).copied();
2666    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2667    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2668    let live_graph = matches!(
2669        handle.index,
2670        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2671    );
2672    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2673    match internal {
2674        Some(internal) if live_ivf => {
2675            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2676                ivf.remove(internal);
2677            }
2678        }
2679        Some(internal) if live_hnsw => {
2680            let mut crowded = false;
2681            if let CollectionIndex::Hnsw(h) = &mut handle.index {
2682                h.mark_deleted(internal as u32);
2683                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2684            }
2685            if crowded {
2686                mark_stale(handle);
2687            }
2688        }
2689        Some(internal) if live_graph => {
2690            let mut crowded = false;
2691            match &mut handle.index {
2692                CollectionIndex::Vamana(Some(fresh)) => {
2693                    fresh.mark_deleted(internal);
2694                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2695                }
2696                CollectionIndex::Disk(Some(fresh)) => {
2697                    fresh.mark_deleted(internal);
2698                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2699                }
2700                _ => {}
2701            }
2702            if crowded {
2703                mark_stale(handle);
2704            }
2705        }
2706        Some(internal) if live_colbert => {
2707            let mut crowded = false;
2708            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2709                c.mark_deleted(internal);
2710                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2711            }
2712            if crowded {
2713                mark_stale(handle);
2714            }
2715        }
2716        _ => mark_stale(handle),
2717    }
2718}
2719
// The product of scanning a collection's live rows: the dense id maps + contiguous
// vectors, the multi-vector document grouping, and the derived sparse inverted
// index — everything a rebuild needs from the store, owned so the build can then
// proceed with no store and no lock (ADR-0062).
struct RebuildScan {
    // External ids in scan order; a row's position here is its internal id.
    int_to_ext: Vec<String>,
    // Inverse of `int_to_ext`: external id → internal id.
    ext_to_int: HashMap<String, u64>,
    // Every row's vector appended back to back, in internal-id order.
    flat: Vec<f32>,
    // Multi-vector collections only: document id → number of token rows seen for
    // it. `None` for a single-vector collection.
    docs: Option<BTreeMap<String, u32>>,
    // The derived sparse inverted index, present only when the descriptor uses
    // one (`uses_sparse_index`).
    sparse: Option<SparseInvertedIndex>,
}
2731
2732// Scan a collection's live rows into a [`RebuildScan`]. Rebuilds the derived sparse
2733// inverted index (ADR-0045) and, for a multi-vector collection, the document
2734// grouping (doc id → token count) authoritatively from those rows. `&self` on the
2735// store, so it runs under a shared read lock.
2736fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2737    let multivector = handle.descriptor.multivector;
2738    let mut int_to_ext = Vec::new();
2739    let mut ext_to_int = HashMap::new();
2740    let mut flat: Vec<f32> = Vec::new();
2741    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2742    // Only single-vector, server-searchable collections carry a sparse index, and
2743    // only non-empty payloads are parsed, so non-sparse collections pay nothing.
2744    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2745    for (ext_id, record) in store.scan(handle.id)? {
2746        let internal = int_to_ext.len() as u64;
2747        flat.extend_from_slice(&record.vector);
2748        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2749            *docs.entry(doc.to_owned()).or_insert(0) += 1;
2750        }
2751        if let Some(idx) = sparse.as_mut()
2752            && let Some(sv) = sparse_vector_from_payload(&record.payload)
2753        {
2754            idx.upsert(&ext_id, &sv);
2755        }
2756        ext_to_int.insert(ext_id.clone(), internal);
2757        int_to_ext.push(ext_id);
2758    }
2759    Ok(RebuildScan {
2760        int_to_ext,
2761        ext_to_int,
2762        flat,
2763        docs: multivector.then_some(docs),
2764        sparse,
2765    })
2766}
2767
2768// Rebuild a collection's index from the store's current live rows, synchronously
2769// under `&mut self` — the embedded/open path. The server's runtime path builds
2770// off-lock instead (ADR-0062: `snapshot_rebuild_inputs` → `build` → `commit_rebuild`).
2771fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2772    let scan = scan_collection(store, handle)?;
2773    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2774    // Drop the previous index before rebuilding: a disk index `mmap`s a file we
2775    // are about to overwrite in place, and the mapping assumes an immutable file.
2776    handle.index = empty_index(&handle.descriptor);
2777    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2778    handle.int_to_ext = scan.int_to_ext;
2779    handle.ext_to_int = scan.ext_to_int;
2780    handle.docs = scan.docs;
2781    handle.sparse = scan.sparse;
2782    handle.stale = false;
2783    Ok(())
2784}
2785
2786/// A captured, owned snapshot of everything an off-lock rebuild needs (ADR-0062):
2787/// the scanned rows, the collection's descriptor, and the write generation at
2788/// capture time. Produced under the shared read lock by
2789/// [`Database::snapshot_rebuild_inputs`]; [`RebuildInputs::build`] then constructs
2790/// the new index with no lock held.
2791pub struct RebuildInputs {
    // The collection being rebuilt (presumably its name — confirm against
    // `snapshot_rebuild_inputs`); `build` hands it on to the `RebuiltIndex`.
2792    collection: String,
    // The collection's descriptor; `build` passes it to the index builders.
2793    descriptor: Descriptor,
    // The rows captured by the scan: id maps, flat vectors, document grouping and
    // sparse index.
2794    scan: RebuildScan,
    // Write generation at capture time; `build` copies it to the `RebuiltIndex`.
2795    write_gen: u64,
2796}
2797
2798// The built product of a [`RebuildInputs`], ready to install. In-memory indexes are
2799// fully built; the disk-resident artifact carries its prebuilt graph + PQ, which
2800// `commit_rebuild` seals through the store under the brief write lock (the file
2801// write must not race the prior index's `mmap`).
2802enum RebuiltKind {
    // A finished index, produced when `build_in_memory_index` returns one.
2803    Ready(Box<CollectionIndex>),
    // The not-yet-sealed disk artifact: the graph and product quantizer returned
    // by `build_disk_graph_pq`, boxed.
2804    Disk {
2805        graph: Box<Vamana>,
2806        pq: Box<ProductQuantizer>,
2807    },
2808}
2809
2810/// A new index built off-lock from a [`RebuildInputs`], ready for
2811/// [`Database::commit_rebuild`] to install under the brief write lock (ADR-0062).
2812pub struct RebuiltIndex {
    // The collection this index was built for, copied from the `RebuildInputs`.
2813    collection: String,
    // The built index itself: ready to install, or a disk artifact still to seal.
2814    kind: RebuiltKind,
    // Id maps from the scan the index was built over (internal ↔ external id).
2815    int_to_ext: Vec<String>,
2816    ext_to_int: HashMap<String, u64>,
    // Document grouping from the same scan (`None` unless multi-vector).
2817    docs: Option<BTreeMap<String, u32>>,
    // Sparse inverted index from the same scan, when the collection uses one.
2818    sparse: Option<SparseInvertedIndex>,
    // Write generation captured with the inputs, carried through unchanged.
2819    write_gen: u64,
2820}
2821
2822impl RebuildInputs {
2823    /// Build the new index from the captured rows with **no lock held** — the
2824    /// expensive CPU work (graph build, PQ training, HNSW insertion) of an off-lock
2825    /// rebuild (ADR-0062). The disk artifact is built in memory here; its file is
2826    /// sealed later by `commit_rebuild` under the write lock.
2827    pub fn build(self) -> Result<RebuiltIndex> {
2828        let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2829        let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2830            Some(index) => RebuiltKind::Ready(Box::new(index)),
2831            None => {
2832                let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2833                RebuiltKind::Disk {
2834                    graph: Box::new(graph),
2835                    pq: Box::new(pq),
2836                }
2837            }
2838        };
2839        Ok(RebuiltIndex {
2840            collection: self.collection,
2841            kind,
2842            int_to_ext: self.scan.int_to_ext,
2843            ext_to_int: self.scan.ext_to_int,
2844            docs: self.scan.docs,
2845            sparse: self.scan.sparse,
2846            write_gen: self.write_gen,
2847        })
2848    }
2849}
2850
2851// Extract a point's sparse vector from its serialized payload, if it carries one
2852// under `__quiver_sparse__` and the value deserializes. An empty payload or a
2853// missing/malformed sparse vector yields `None`.
2854fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2855    if payload.is_empty() {
2856        return None;
2857    }
2858    let value = serde_json::from_slice::<Value>(payload).ok()?;
2859    sparse_vector_from_value(&value)
2860}
2861
2862// As [`sparse_vector_from_payload`] but over an already-parsed payload `Value`
2863// (the upsert path has the payload before it is serialized). An explicit
2864// `__quiver_sparse__` vector wins; otherwise a `__quiver_text__` string is
2865// tokenized into a term-frequency vector (ADR-0046), so a point is searchable by
2866// BM25 over text alone.
2867fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2868    if let Some(raw) = payload.get(SPARSE_KEY) {
2869        return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2870    }
2871    let text = payload.get(TEXT_KEY)?.as_str()?;
2872    Some(text_to_sparse(text))
2873}
2874
2875// Maintain the derived sparse inverted index for one point write (ADR-0045): index
2876// the point's sparse vector, or drop any prior entry when the new payload no longer
2877// carries one (an update that removed it). A no-op when the collection has no
2878// inverted index, or when a rebuild is pending — the rebuild repopulates the index
2879// authoritatively from the store, exactly as it does the dense index.
2880fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2881    if handle.stale {
2882        return;
2883    }
2884    let Some(idx) = handle.sparse.as_mut() else {
2885        return;
2886    };
2887    match sparse_vector_from_value(payload) {
2888        Some(sv) => idx.upsert(ext_id, &sv),
2889        None => {
2890            idx.remove(ext_id);
2891        }
2892    }
2893}
2894
2895// Drop one point from the derived sparse inverted index. A no-op when the
2896// collection has no inverted index.
2897fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2898    if let Some(idx) = handle.sparse.as_mut() {
2899        idx.remove(ext_id);
2900    }
2901}
2902
2903// The set of live external ids guaranteed to contain every row the `filter`
2904// accepts (a sound superset), resolved through the collection's secondary
2905// indexes. `None` means the filter cannot be narrowed with the available indexes
2906// and the caller should post-filter instead; `Some(set)` is a candidate
2907// superset, and an empty set proves no row can match.
2908//
2909// Only indexable leaves — equality, `in`, and range comparisons on declared
2910// filterable fields of a matching type — constrain the set. Everything else
2911// (negation, existence, `ne`, non-indexed fields, type mismatches) is treated as
2912// unconstrained, which keeps the result a superset; exactness is restored when
2913// the caller re-checks the full `Filter` on each surviving candidate.
2914fn candidate_ids(
2915    store: &Store,
2916    cid: CollectionId,
2917    filter: &Filter,
2918    filterable: &[FilterableField],
2919) -> Result<Option<BTreeSet<String>>> {
2920    match filter {
2921        Filter::And(subs) => {
2922            // Intersect the constrained children; an unconstrained child (None)
2923            // is dropped, which only widens the set — still a superset.
2924            let mut acc: Option<BTreeSet<String>> = None;
2925            for sub in subs {
2926                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2927                    acc = Some(match acc {
2928                        Some(existing) => existing.intersection(&set).cloned().collect(),
2929                        None => set,
2930                    });
2931                }
2932            }
2933            Ok(acc)
2934        }
2935        Filter::Or(subs) => {
2936            // The union of the children — but one unconstrained child makes the
2937            // whole disjunction unconstrained (its rows could be anything).
2938            let mut acc = BTreeSet::new();
2939            for sub in subs {
2940                match candidate_ids(store, cid, sub, filterable)? {
2941                    Some(set) => acc.extend(set),
2942                    None => return Ok(None),
2943                }
2944            }
2945            Ok(Some(acc))
2946        }
2947        // A negation cannot be narrowed to a superset with these indexes.
2948        Filter::Not(_) => Ok(None),
2949        // A leaf: indexable ⇒ its matching ids; otherwise unconstrained.
2950        leaf => match leaf_predicate(leaf, filterable) {
2951            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2952            None => Ok(None),
2953        },
2954    }
2955}
2956
2957// Map a single filter leaf to an indexable secondary-index predicate, if its
2958// field is declared filterable and the value types line up. Boolean nodes and
2959// predicates the secondary index cannot answer (`Ne`, `Exists`) return `None`.
2960fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2961    let field_type = |field: &str| {
2962        filterable
2963            .iter()
2964            .find(|f| f.path == field)
2965            .map(|f| f.field_type)
2966    };
2967    match filter {
2968        Filter::Eq { field, value } => Some(SecPredicate::Eq {
2969            field: field.clone(),
2970            value: sec_value(field_type(field)?, value)?,
2971        }),
2972        Filter::In { field, values } => {
2973            let ft = field_type(field)?;
2974            // Every value must encode, or the `in` set would be understated and
2975            // the candidate superset unsound.
2976            let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2977            Some(SecPredicate::In {
2978                field: field.clone(),
2979                values: values?,
2980            })
2981        }
2982        Filter::Lt { field, value } => {
2983            one_sided_range(field, field_type(field)?, value, false, false)
2984        }
2985        Filter::Lte { field, value } => {
2986            one_sided_range(field, field_type(field)?, value, false, true)
2987        }
2988        Filter::Gt { field, value } => {
2989            one_sided_range(field, field_type(field)?, value, true, false)
2990        }
2991        Filter::Gte { field, value } => {
2992            one_sided_range(field, field_type(field)?, value, true, true)
2993        }
2994        _ => None,
2995    }
2996}
2997
2998// Build a one-sided range predicate from a comparison leaf. `is_lower` selects
2999// whether `value` is the lower (`Gt`/`Gte`) or upper (`Lt`/`Lte`) bound;
3000// `inclusive` is that bound's inclusivity. `None` on a type mismatch.
3001fn one_sided_range(
3002    field: &str,
3003    field_type: FieldType,
3004    value: &Value,
3005    is_lower: bool,
3006    inclusive: bool,
3007) -> Option<SecPredicate> {
3008    let v = sec_value(field_type, value)?;
3009    let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3010        (Some(v), None, inclusive, false)
3011    } else {
3012        (None, Some(v), false, inclusive)
3013    };
3014    Some(SecPredicate::Range {
3015        field: field.to_owned(),
3016        lo,
3017        hi,
3018        lo_inclusive,
3019        hi_inclusive,
3020    })
3021}
3022
3023// Encode a filter value as a typed secondary-index value, or `None` when the
3024// JSON type does not match the field's declared type (so it cannot be
3025// pre-filtered and the planner falls back to post-filtering).
3026fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3027    match (field_type, value) {
3028        (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3029        (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3030        _ => None,
3031    }
3032}
3033
3034#[cfg(test)]
3035mod tests {
3036    use super::*;
3037    use serde_json::json;
3038
3039    fn desc() -> Descriptor {
3040        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3041    }
3042
3043    fn open(dir: &Path) -> Database {
3044        Database::open(dir).unwrap()
3045    }
3046
3047    #[test]
3048    fn hybrid_search_fuses_dense_and_sparse() {
3049        let tmp = tempfile::tempdir().unwrap();
3050        let mut db = open(tmp.path());
3051        db.create_collection("kb", desc()).unwrap();
3052        // "a" is the nearest dense neighbour of the query; "b" shares the query's
3053        // sparse terms; "c" is good on neither.
3054        db.upsert(
3055            "kb",
3056            "a",
3057            &[1.0, 0.0, 0.0, 0.0],
3058            &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3059        )
3060        .unwrap();
3061        db.upsert(
3062            "kb",
3063            "b",
3064            &[0.0, 1.0, 0.0, 0.0],
3065            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3066        )
3067        .unwrap();
3068        db.upsert(
3069            "kb",
3070            "c",
3071            &[0.0, 0.0, 0.0, 1.0],
3072            &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3073        )
3074        .unwrap();
3075
3076        let dense_q = [1.0, 0.0, 0.0, 0.0];
3077        let sparse_q = SparseVector {
3078            indices: vec![1, 2],
3079            values: vec![1.0, 1.0],
3080        };
3081        let params = SearchParams {
3082            k: 3,
3083            ..SearchParams::default()
3084        };
3085
3086        // Hybrid: "a" (dense) and "b" (sparse) both rank above "c" (neither).
3087        let hits = db
3088            .hybrid_search(
3089                "kb",
3090                Some(&dense_q),
3091                Some(&sparse_q),
3092                None,
3093                &params,
3094                DEFAULT_RRF_K0,
3095            )
3096            .unwrap();
3097        let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3098        assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3099        assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3100
3101        // Pure sparse: only "b" shares the query's terms.
3102        let sparse_only = db
3103            .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
3104            .unwrap();
3105        assert_eq!(sparse_only[0].id, "b");
3106
3107        // Pure dense: "a" is nearest.
3108        let dense_only = db
3109            .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
3110            .unwrap();
3111        assert_eq!(dense_only[0].id, "a");
3112
3113        // Neither query is an error.
3114        assert!(
3115            db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
3116                .is_err()
3117        );
3118    }
3119
3120    // Pure-sparse hybrid result ids, in fused order.
3121    fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3122        let params = SearchParams {
3123            k: 10,
3124            ..SearchParams::default()
3125        };
3126        db.hybrid_search("kb", None, Some(q), None, &params, DEFAULT_RRF_K0)
3127            .unwrap()
3128            .into_iter()
3129            .map(|m| m.id)
3130            .collect()
3131    }
3132
3133    #[test]
3134    fn sparse_index_equals_the_store_scan_fallback() {
3135        let tmp = tempfile::tempdir().unwrap();
3136        let mut db = open(tmp.path());
3137        db.create_collection("kb", desc()).unwrap();
3138        let z = [0.0f32, 0.0, 0.0, 0.0];
3139        for (id, dims, vals) in [
3140            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
3141            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
3142            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
3143            ("d", vec![9u32], vec![1.0f32]), // shares no query term
3144        ] {
3145            db.upsert(
3146                "kb",
3147                id,
3148                &z,
3149                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
3150            )
3151            .unwrap();
3152        }
3153        let q = SparseVector {
3154            indices: vec![1, 2, 3],
3155            values: vec![1.0, 1.0, 1.0],
3156        };
3157
3158        // The derived index is present and used.
3159        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3160        let via_index = sparse_ids(&mut db, &q);
3161        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
3162
3163        // Drop the index (not stale, so no rebuild) → the store-scan fallback runs
3164        // and must return the identical ranking.
3165        db.collections.get_mut("kb").unwrap().sparse = None;
3166        let via_scan = sparse_ids(&mut db, &q);
3167        assert_eq!(via_index, via_scan);
3168    }
3169
3170    #[test]
3171    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
3172        let tmp = tempfile::tempdir().unwrap();
3173        let mut db = open(tmp.path());
3174        db.create_collection("kb", desc()).unwrap();
3175        let z = [0.0f32, 0.0, 0.0, 0.0];
3176        db.upsert(
3177            "kb",
3178            "a",
3179            &z,
3180            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3181        )
3182        .unwrap();
3183        db.upsert(
3184            "kb",
3185            "b",
3186            &z,
3187            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
3188        )
3189        .unwrap();
3190        db.upsert(
3191            "kb",
3192            "c",
3193            &z,
3194            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3195        )
3196        .unwrap();
3197        // Update "a" onto a disjoint term; delete "b".
3198        db.upsert(
3199            "kb",
3200            "a",
3201            &z,
3202            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
3203        )
3204        .unwrap();
3205        assert!(db.delete("kb", "b").unwrap());
3206
3207        let q = SparseVector {
3208            indices: vec![1, 2],
3209            values: vec![1.0, 1.0],
3210        };
3211        // Only "c" still shares a query term ("a" moved to 7, "b" is gone).
3212        let incremental = sparse_ids(&mut db, &q);
3213        assert_eq!(incremental, vec!["c".to_owned()]);
3214
3215        // A full rebuild from the store must agree with the incremental state.
3216        db.collections.get_mut("kb").unwrap().stale = true;
3217        let rebuilt = sparse_ids(&mut db, &q);
3218        assert_eq!(incremental, rebuilt);
3219    }
3220
3221    #[test]
3222    fn sparse_index_is_rebuilt_on_reopen() {
3223        let tmp = tempfile::tempdir().unwrap();
3224        {
3225            let mut db = open(tmp.path());
3226            db.create_collection("kb", desc()).unwrap();
3227            db.upsert(
3228                "kb",
3229                "a",
3230                &[0.0, 0.0, 0.0, 0.0],
3231                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
3232            )
3233            .unwrap();
3234        }
3235        let mut db = open(tmp.path());
3236        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3237        let q = SparseVector {
3238            indices: vec![1],
3239            values: vec![1.0],
3240        };
3241        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
3242    }
3243
3244    #[test]
3245    fn hybrid_sparse_honours_the_payload_filter() {
3246        let tmp = tempfile::tempdir().unwrap();
3247        let mut db = open(tmp.path());
3248        db.create_collection("kb", desc()).unwrap();
3249        let z = [0.0f32, 0.0, 0.0, 0.0];
3250        db.upsert(
3251            "kb",
3252            "a",
3253            &z,
3254            &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3255        )
3256        .unwrap();
3257        db.upsert(
3258            "kb",
3259            "b",
3260            &z,
3261            &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3262        )
3263        .unwrap();
3264        let q = SparseVector {
3265            indices: vec![1],
3266            values: vec![1.0],
3267        };
3268        let params = SearchParams {
3269            k: 10,
3270            filter: Some(Filter::Eq {
3271                field: "lang".to_owned(),
3272                value: json!("en"),
3273            }),
3274            ..SearchParams::default()
3275        };
3276        let hits: Vec<String> = db
3277            .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3278            .unwrap()
3279            .into_iter()
3280            .map(|m| m.id)
3281            .collect();
3282        // "b" scores higher but is filtered out by lang == "en".
3283        assert_eq!(hits, vec!["a".to_owned()]);
3284    }
3285
3286    #[test]
3287    fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3288        let tmp = tempfile::tempdir().unwrap();
3289        let mut db = open(tmp.path());
3290        db.create_collection("kb", desc()).unwrap();
3291        let z = [0.0f32, 0.0, 0.0, 0.0];
3292        // Points carry only a `__quiver_text__` string — tokenized at ingest.
3293        db.upsert(
3294            "kb",
3295            "cats",
3296            &z,
3297            &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3298        )
3299        .unwrap();
3300        db.upsert(
3301            "kb",
3302            "dogs",
3303            &z,
3304            &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3305        )
3306        .unwrap();
3307
3308        let params = SearchParams {
3309            k: 10,
3310            ..SearchParams::default()
3311        };
3312        // A text query (no dense, no explicit sparse) ranks the lexical match first;
3313        // "cat"/"cats" conflate via the stemmer.
3314        let hits: Vec<String> = db
3315            .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
3316            .unwrap()
3317            .into_iter()
3318            .map(|m| m.id)
3319            .collect();
3320        assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3321
3322        // A non-matching query returns nothing from the text side.
3323        assert!(
3324            db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
3325                .unwrap()
3326                .is_empty()
3327        );
3328
3329        // Text fuses with a dense query through the same RRF path.
3330        let dense_q = [1.0, 0.0, 0.0, 0.0];
3331        db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3332            .unwrap();
3333        let fused: Vec<String> = db
3334            .hybrid_search(
3335                "kb",
3336                Some(&dense_q),
3337                None,
3338                Some("dog"),
3339                &params,
3340                DEFAULT_RRF_K0,
3341            )
3342            .unwrap()
3343            .into_iter()
3344            .map(|m| m.id)
3345            .collect();
3346        assert!(
3347            fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3348            "dense match + lexical match both surface; got {fused:?}"
3349        );
3350    }
3351
3352    #[test]
3353    fn create_upsert_search_get_end_to_end() {
3354        let tmp = tempfile::tempdir().unwrap();
3355        let mut db = open(tmp.path());
3356        db.create_collection("items", desc()).unwrap();
3357        db.upsert(
3358            "items",
3359            "a",
3360            &[0.0, 0.0, 0.0, 0.0],
3361            &json!({"color": "red"}),
3362        )
3363        .unwrap();
3364        db.upsert(
3365            "items",
3366            "b",
3367            &[1.0, 0.0, 0.0, 0.0],
3368            &json!({"color": "blue"}),
3369        )
3370        .unwrap();
3371        db.upsert(
3372            "items",
3373            "c",
3374            &[5.0, 5.0, 5.0, 5.0],
3375            &json!({"color": "red"}),
3376        )
3377        .unwrap();
3378
3379        let near = db
3380            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
3381            .unwrap();
3382        assert_eq!(near[0].id, "a");
3383        assert_eq!(near[1].id, "b");
3384
3385        let got = db.get("items", "c").unwrap().unwrap();
3386        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
3387        assert_eq!(got.payload, Some(json!({"color": "red"})));
3388    }
3389
3390    #[test]
3391    fn upsert_batch_produces_same_search_results_as_sequential() {
3392        let tmp_seq = tempfile::tempdir().unwrap();
3393        let tmp_bat = tempfile::tempdir().unwrap();
3394
3395        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3396        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3397        let payload = json!({});
3398
3399        // Sequential upserts
3400        {
3401            let mut db = open(tmp_seq.path());
3402            db.create_collection("c", desc()).unwrap();
3403            for (id, vec) in ids.iter().zip(vectors.iter()) {
3404                db.upsert("c", id, vec, &payload).unwrap();
3405            }
3406        }
3407        // Batch upsert
3408        {
3409            let mut db = open(tmp_bat.path());
3410            db.create_collection("c", desc()).unwrap();
3411            let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3412                .iter()
3413                .zip(vectors.iter())
3414                .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3415                .collect();
3416            let n = db.upsert_batch("c", &pts).unwrap();
3417            assert_eq!(n, 20);
3418        }
3419
3420        let query = [10.0f32, 0.0, 0.0, 0.0];
3421        let params = SearchParams {
3422            k: 5,
3423            ..Default::default()
3424        };
3425
3426        let mut seq_db = open(tmp_seq.path());
3427        let mut bat_db = open(tmp_bat.path());
3428        let seq: Vec<String> = seq_db
3429            .search("c", &query, &params)
3430            .unwrap()
3431            .into_iter()
3432            .map(|m| m.id)
3433            .collect();
3434        let bat: Vec<String> = bat_db
3435            .search("c", &query, &params)
3436            .unwrap()
3437            .into_iter()
3438            .map(|m| m.id)
3439            .collect();
3440        assert_eq!(
3441            seq, bat,
3442            "batch and sequential produce different search results"
3443        );
3444    }
3445
3446    #[test]
3447    fn upsert_bulk_defers_the_index_then_searches_correctly() {
3448        let tmp = tempfile::tempdir().unwrap();
3449        let mut db = open(tmp.path());
3450        db.create_collection("c", desc()).unwrap();
3451        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3452        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3453        // Give one point a sparse vector so the deferred rebuild also rebuilds the
3454        // inverted index.
3455        let plain = json!({});
3456        let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3457        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3458            .iter()
3459            .zip(vectors.iter())
3460            .map(|(id, v)| {
3461                let payload = if id == "p3" { &sparse_payload } else { &plain };
3462                (id.as_str(), v.as_slice(), payload)
3463            })
3464            .collect();
3465        let n = db.upsert_bulk("c", &pts).unwrap();
3466        assert_eq!(n, 20);
3467
3468        // The bulk path defers index maintenance: the handle is stale until a read.
3469        assert!(db.collections.get("c").unwrap().stale);
3470
3471        // Dense search triggers the single rebuild and returns the nearest points.
3472        let query = [10.0f32, 0.0, 0.0, 0.0];
3473        let params = SearchParams {
3474            k: 5,
3475            ..Default::default()
3476        };
3477        let hits: Vec<String> = db
3478            .search("c", &query, &params)
3479            .unwrap()
3480            .into_iter()
3481            .map(|m| m.id)
3482            .collect();
3483        assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3484        assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3485
3486        // The sparse vector loaded via the bulk path is searchable after the rebuild.
3487        let q = SparseVector {
3488            indices: vec![7],
3489            values: vec![1.0],
3490        };
3491        let sparse_hits: Vec<String> = db
3492            .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3493            .unwrap()
3494            .into_iter()
3495            .map(|m| m.id)
3496            .collect();
3497        assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3498    }
3499
3500    #[test]
3501    fn filtered_search_only_returns_matching_payloads() {
3502        let tmp = tempfile::tempdir().unwrap();
3503        let mut db = open(tmp.path());
3504        db.create_collection("items", desc()).unwrap();
3505        for i in 0..20u32 {
3506            let color = if i % 2 == 0 { "red" } else { "blue" };
3507            db.upsert(
3508                "items",
3509                &format!("p{i}"),
3510                &[i as f32, 0.0, 0.0, 0.0],
3511                &json!({"color": color, "n": i}),
3512            )
3513            .unwrap();
3514        }
3515        let params = SearchParams {
3516            k: 5,
3517            filter: Some(Filter::Eq {
3518                field: "color".into(),
3519                value: json!("red"),
3520            }),
3521            ef_search: 64,
3522            with_payload: true,
3523            with_vector: false,
3524        };
3525        let results = db.search("items", &[0.0; 4], &params).unwrap();
3526        assert!(!results.is_empty());
3527        for m in &results {
3528            assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3529        }
3530    }
3531
3532    #[test]
3533    fn persists_and_rebuilds_index_on_reopen() {
3534        let tmp = tempfile::tempdir().unwrap();
3535        {
3536            let mut db = open(tmp.path());
3537            db.create_collection("items", desc()).unwrap();
3538            for i in 0..50u32 {
3539                db.upsert(
3540                    "items",
3541                    &format!("p{i}"),
3542                    &[i as f32, 1.0, 2.0, 3.0],
3543                    &json!({}),
3544                )
3545                .unwrap();
3546            }
3547            db.checkpoint().unwrap();
3548        }
3549        let mut db = open(tmp.path());
3550        assert_eq!(db.len("items").unwrap(), 50);
3551        let res = db
3552            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3553            .unwrap();
3554        assert_eq!(res[0].id, "p7");
3555    }
3556
// Re-upserting an existing id with a different vector changes both the search
// ranking and the vector returned by `get`.
#[test]
fn update_reflects_new_vector_after_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("items", desc()).unwrap();

    let empty = json!({});
    db.upsert("items", "a", &[0.0; 4], &empty).unwrap();
    db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &empty)
        .unwrap();

    // Relocate "a" far from the origin, which leaves "b" as the closest point
    // to an origin query.
    db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &empty)
        .unwrap();

    let params = SearchParams::default();
    let near_origin = db.search("items", &[0.0; 4], &params).unwrap();
    assert_eq!(near_origin[0].id, "b");

    let stored = db.get("items", "a").unwrap().unwrap();
    assert_eq!(stored.vector, Some(vec![100.0, 0.0, 0.0, 0.0]));
}
3577
// A deleted point is reported as removed, never appears in search results
// again, and can no longer be fetched by id.
#[test]
fn delete_removes_from_search() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("items", desc()).unwrap();

    let empty = json!({});
    db.upsert("items", "a", &[0.0; 4], &empty).unwrap();
    db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &empty)
        .unwrap();

    let removed = db.delete("items", "a").unwrap();
    assert!(removed);

    let params = SearchParams::default();
    let hits = db.search("items", &[0.0; 4], &params).unwrap();
    assert!(!hits.iter().any(|m| m.id == "a"));

    let lookup = db.get("items", "a").unwrap();
    assert!(lookup.is_none());
}
3593
// Searching a collection that was never created yields `CollectionNotFound`,
// and creating the same collection twice yields the core `AlreadyExists` error.
#[test]
fn unknown_collection_errors() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    let missing = db.search("nope", &[0.0; 4], &SearchParams::default());
    assert!(matches!(missing, Err(Error::CollectionNotFound(_))));

    db.create_collection("c", desc()).unwrap();
    let duplicate = db.create_collection("c", desc());
    assert!(matches!(
        duplicate,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
3608
3609    fn desc_with(kind: IndexKind) -> Descriptor {
3610        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3611            kind,
3612            pq_subspaces: None,
3613        })
3614    }
3615
// For both the Vamana and the IVF index kinds, a query placed exactly on a
// stored point returns that point first.
#[test]
fn vamana_and_ivf_collections_find_the_nearest_point() {
    let empty = json!({});
    // Default parameters: ef_search maps onto the index's search width
    // (l_search / nprobe).
    let params = SearchParams::default();

    for kind in [IndexKind::Vamana, IndexKind::Ivf] {
        let dir = tempfile::tempdir().unwrap();
        let mut db = open(dir.path());
        db.create_collection("c", desc_with(kind)).unwrap();

        // 40 points spread along the x-axis.
        for i in 0..40u32 {
            let id = format!("p{i}");
            db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }

        let hits = db.search("c", &[7.0, 0.0, 0.0, 0.0], &params).unwrap();
        assert_eq!(hits[0].id, "p7", "{kind:?} nearest");
    }
}
3638
// The index kind chosen at creation is stored with the collection: after a
// checkpoint and a reopen the descriptor still reports Vamana and search
// still finds the nearest point.
#[test]
fn index_kind_persists_and_rebuilds_on_reopen() {
    let dir = tempfile::tempdir().unwrap();
    let empty = json!({});

    // First session: create, fill, checkpoint; the handle drops at scope end.
    {
        let mut writer = open(dir.path());
        let descriptor = desc_with(IndexKind::Vamana);
        writer.create_collection("v", descriptor).unwrap();
        for i in 0..20u32 {
            let id = format!("p{i}");
            writer
                .upsert("v", &id, &[i as f32, 1.0, 2.0, 3.0], &empty)
                .unwrap();
        }
        writer.checkpoint().unwrap();
    }

    let mut reopened = open(dir.path());
    let persisted_kind = reopened.descriptor("v").unwrap().index.kind;
    assert_eq!(persisted_kind, IndexKind::Vamana);

    let params = SearchParams::default();
    let hits = reopened
        .search("v", &[7.0, 1.0, 2.0, 3.0], &params)
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
3664
// ADR-0063: reopening a DiskVamana collection loads its frugal mmap base and
// replays the WAL tail instead of doing an O(N) full-RAM rebuild. The proof is
// structural: after the reopen the on-disk base still has the row count it had
// at the checkpoint, whereas a rebuild would have rewritten it to include the
// rows inserted after the checkpoint.
#[test]
fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
    let dir = tempfile::tempdir().unwrap();
    let empty = json!({});
    let params = SearchParams::default();

    // First session; yields the collection id for use after the reopen.
    let cid = {
        let mut db = open(dir.path());
        db.create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        for i in 0..100u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }
        // One search forces the deferred build, so the base file and the
        // checkpoint blob both exist.
        db.search("d", &[1.0, 0.0, 0.0, 0.0], &params).unwrap();
        db.checkpoint().unwrap();

        // 15 further rows after the checkpoint (< the 20% consolidation
        // threshold): they go to the in-memory delta and the WAL tail, not to
        // the base file.
        for i in 100..115u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }

        let cid = db.collections["d"].id;
        let codec = db.store.collection_codec_clone(cid).unwrap();
        let sealed = open_disk_index(&db.store, cid, codec).unwrap();
        assert_eq!(sealed.len(), 100, "base sealed at the checkpoint count");
        cid
    };

    let mut db = open(dir.path());

    // Both a base row and a post-checkpoint (WAL-tail-replayed) row are found.
    let base_hits = db.search("d", &[50.0, 0.0, 0.0, 0.0], &params).unwrap();
    assert_eq!(base_hits[0].id, "p50");
    let tail_hits = db.search("d", &[110.0, 0.0, 0.0, 0.0], &params).unwrap();
    assert_eq!(
        tail_hits[0].id, "p110",
        "post-checkpoint insert survived reopen via WAL-tail replay",
    );

    // Loaded rather than rebuilt: the base file still holds exactly the 100
    // checkpoint rows. A rebuild on open would have covered all 115 live rows.
    let codec = db.store.collection_codec_clone(cid).unwrap();
    let reloaded = open_disk_index(&db.store, cid, codec).unwrap();
    assert_eq!(
        reloaded.len(),
        100,
        "reopen loaded the base; it was not rebuilt"
    );
}
3740
// ADR-0063: correctness never depends on the durable artifact. When the base
// file is missing or torn, opening falls back to an authoritative rebuild that
// still answers queries.
#[test]
fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
    let dir = tempfile::tempdir().unwrap();
    let empty = json!({});
    let params = SearchParams::default();
    let probe = [25.0, 0.0, 0.0, 0.0];

    // Session 1: build and checkpoint the collection, remembering where its
    // base file lives.
    let base_path;
    {
        let mut db = open(dir.path());
        db.create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        for i in 0..60u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }
        db.search("d", &[1.0, 0.0, 0.0, 0.0], &params).unwrap();
        db.checkpoint().unwrap();
        let cid = db.collections["d"].id;
        base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
    }

    // Lost base: delete the file that the snapshot references.
    std::fs::remove_file(&base_path).unwrap();
    {
        let mut db = open(dir.path());
        let hits = db.search("d", &probe, &params).unwrap();
        assert_eq!(
            hits[0].id, "p25",
            "rebuild fallback still answers correctly after a lost base",
        );
        // The fallback rebuild re-seals the base; the checkpoint that follows
        // refreshes the snapshot blob to match it.
        let resealed = base_path.exists();
        assert!(resealed, "the fallback rebuild re-sealed the base file");
        db.checkpoint().unwrap();
    }

    // Torn base: cut the file to half its length. `DiskVamana::open`'s per-page
    // checks reject it, so the load falls back to a rebuild. The handle is
    // scoped so it is closed before the database is opened again.
    let full_len = std::fs::metadata(&base_path).unwrap().len();
    {
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(&base_path)
            .unwrap();
        file.set_len(full_len / 2).unwrap();
    }

    let mut db = open(dir.path());
    let hits = db.search("d", &probe, &params).unwrap();
    assert_eq!(
        hits[0].id, "p25",
        "rebuild fallback still answers correctly after a torn base",
    );
}
3804
// Once an IVF index has been built, a new insert and a delete are both applied
// without the collection being flagged `stale` (i.e. no rebuild is scheduled).
#[test]
fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let empty = json!({});
    let params = SearchParams::default();
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }

    // The first search is what builds (and trains) the IVF from the store.
    let _ = db.search("c", &[1.0, 0.0, 0.0, 0.0], &params).unwrap();
    let stale_after_build = db.collections["c"].stale;
    assert!(!stale_after_build, "the search built the index");

    // Incremental insert: the new outlier is found and no rebuild is scheduled.
    let outlier = [500.0, 0.0, 0.0, 0.0];
    db.upsert("c", "far", &outlier, &empty).unwrap();
    let stale_after_insert = db.collections["c"].stale;
    assert!(!stale_after_insert, "ivf insert stayed incremental");
    let hits = db.search("c", &outlier, &params).unwrap();
    assert_eq!(hits[0].id, "far");

    // Incremental delete: the point vanishes, again with no rebuild scheduled.
    let removed = db.delete("c", "far").unwrap();
    assert!(removed);
    let stale_after_delete = db.collections["c"].stale;
    assert!(!stale_after_delete, "ivf delete stayed incremental");
    let hits = db.search("c", &outlier, &params).unwrap();
    assert!(!hits.iter().any(|m| m.id == "far"), "deleted point is gone");
}
3843
// Updating an existing id in a built IVF index moves it: the point is found at
// its new position, is absent near its old one, and no rebuild is scheduled.
#[test]
fn ivf_incremental_update_replaces_the_vector() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let empty = json!({});
    let params = SearchParams::default();
    for i in 0..30u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }
    // Run one search before the update so the index is built first.
    let _ = db.search("c", &[0.0; 4], &params).unwrap();

    // Move p5 far away in place.
    let new_spot = [900.0, 0.0, 0.0, 0.0];
    db.upsert("c", "p5", &new_spot, &empty).unwrap();
    assert!(!db.collections["c"].stale);

    let at_new = db.search("c", &new_spot, &params).unwrap();
    assert_eq!(at_new[0].id, "p5", "p5 found at its new location");

    let at_old = db.search("c", &[5.0, 0.0, 0.0, 0.0], &params).unwrap();
    assert!(!at_old.iter().any(|m| m.id == "p5"), "stale vector is gone");
}
3873
// Deleting an id from a built IVF index and then upserting the same id again
// leaves it searchable, with neither step flagging the collection `stale`.
#[test]
fn ivf_reinsert_after_incremental_delete_is_found() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let empty = json!({});
    let params = SearchParams::default();
    for i in 0..20u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }
    // Run one search before the delete so the index is built first.
    let _ = db.search("c", &[0.0; 4], &params).unwrap();

    let removed = db.delete("c", "p3").unwrap();
    assert!(removed);
    assert!(!db.collections["c"].stale);

    // Re-insert the same id; it must be searchable again (the slot is reused).
    let spot = [3.0, 0.0, 0.0, 0.0];
    db.upsert("c", "p3", &spot, &empty).unwrap();
    assert!(!db.collections["c"].stale);

    let hits = db.search("c", &spot, &params).unwrap();
    assert_eq!(hits[0].id, "p3");
}
3901
// HNSW cannot update an id in place, so updating an existing id marks the
// collection `stale`; the next search then reflects the new vector.
#[test]
fn hnsw_in_place_update_falls_back_to_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("c", desc()).unwrap();

    let empty = json!({});
    let params = SearchParams::default();
    for i in 0..10u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }
    let _ = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!db.collections["c"].stale);

    // Overwrite an id that already exists.
    let moved_to = [42.0, 0.0, 0.0, 0.0];
    db.upsert("c", "p2", &moved_to, &empty).unwrap();
    let stale = db.collections["c"].stale;
    assert!(stale, "hnsw update schedules a rebuild");

    // The rebuild triggered by the next search picks up the new vector.
    let hits = db.search("c", &moved_to, &params).unwrap();
    assert_eq!(hits[0].id, "p2");
}
3928
// Creating a collection that pairs the Dot (inner-product) metric with an
// index kind that does not support it is refused with `Error::Unsupported`.
#[test]
fn unsupported_index_configurations_are_rejected() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    // Vamana/IVF do not support inner product, and neither does the
    // disk-resident index. Each case gets its own collection name.
    let cases = [("a", IndexKind::Vamana), ("b", IndexKind::DiskVamana)];
    for (name, kind) in cases {
        let descriptor =
            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
                kind,
                pq_subspaces: None,
            });
        let outcome = db.create_collection(name, descriptor);
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }
}
3953
// DCPE vector encryption is only accepted together with the L2 metric.
#[test]
fn dcpe_collections_require_the_l2_metric() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    // DCPE preserves Euclidean distance, so cosine and dot are both refused.
    let rejected_metrics = [DistanceMetric::Cosine, DistanceMetric::Dot];
    for metric in rejected_metrics {
        let descriptor = Descriptor::new(4, Dtype::F32, metric)
            .with_vector_encryption(VectorEncryption::Dcpe);
        let outcome = db.create_collection("bad", descriptor);
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }

    // L2 is accepted, and the encryption mode persists on the descriptor.
    let accepted = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
        .with_vector_encryption(VectorEncryption::Dcpe);
    db.create_collection("enc", accepted)
        .expect("l2 dcpe collection");
    let stored_mode = db.descriptor("enc").expect("descriptor").vector_encryption;
    assert_eq!(stored_mode, VectorEncryption::Dcpe);
}
3977
// A client-side-encrypted collection never gets a server-side index: ranked
// search is refused, while fetch (optionally filtered and limited), get, and
// delete keep working through the store.
#[test]
fn client_side_collections_are_fetch_only_and_reject_search() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    // Any metric is allowed here (the server never ranks), so there is no L2
    // restriction as there is for DCPE.
    let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_vector_encryption(VectorEncryption::ClientSide);
    db.create_collection("vault", descriptor)
        .expect("create client-side collection");

    // No server-side index is built for a client-side collection (ADR-0032).
    let index_is_none = matches!(db.collections["vault"].index, CollectionIndex::None);
    assert!(index_is_none);

    // Upsert opaque placeholder points: a zero vector plus a payload holding a
    // stand-in sealed vector blob and a cleartext, server-filterable field.
    // The first two points are "vip", the remaining three "std".
    for i in 0..5 {
        let tier = if i < 2 { "vip" } else { "std" };
        let id = format!("p{i}");
        let payload = serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier });
        db.upsert("vault", &id, &[0.0; 4], &payload)
            .expect("upsert");
    }
    assert_eq!(db.len("vault").unwrap(), 5);

    // Still no index after the writes — it never goes stale or rebuilds.
    let index_is_none = matches!(db.collections["vault"].index, CollectionIndex::None);
    assert!(index_is_none);

    // Ranked search is rejected: the server cannot rank opaque vectors.
    let ranked = db.search("vault", &[0.0; 4], &SearchParams::default());
    assert!(matches!(ranked, Err(Error::Unsupported(_))));

    // Fetch returns the entitled set; each payload carries the blob the client
    // would decrypt and rank locally, and vectors are omitted when not asked for.
    let everything = db.fetch("vault", None, 100, true, false).unwrap();
    assert_eq!(everything.len(), 5);
    for m in &everything {
        assert!(m.payload.is_some() && m.vector.is_none());
    }

    // A cleartext payload filter narrows the set server-side.
    let vip_only = Filter::Eq {
        field: String::from("tier"),
        value: serde_json::json!("vip"),
    };
    let vip = db
        .fetch("vault", Some(&vip_only), 100, false, false)
        .unwrap();
    assert_eq!(vip.len(), 2);

    // A limit bounds the returned set.
    let limited = db.fetch("vault", None, 2, false, false).unwrap();
    assert_eq!(limited.len(), 2);

    // get returns the stored placeholder + blob payload by id; delete works
    // through the store with no index to update.
    let first = db.get("vault", "p0").unwrap().unwrap();
    assert_eq!(first.id, "p0");
    let removed = db.delete("vault", "p0").unwrap();
    assert!(removed);
    assert_eq!(db.len("vault").unwrap(), 4);
}
4051
// Combining client-side vector encryption with a multi-vector collection is
// refused at creation time with `Error::Unsupported`.
#[test]
fn client_side_encryption_rejects_multivector() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_multivector(true)
        .with_vector_encryption(VectorEncryption::ClientSide);

    let outcome = db.create_collection("bad", descriptor);
    assert!(matches!(outcome, Err(Error::Unsupported(_))));
}
4064
// Walk `dir` depth-first and report whether any non-directory entry beneath it
// has a file name equal to `name`. A directory that cannot be read (including
// a `dir` that does not exist) counts as "not found"; entries that fail to
// read are skipped.
fn contains_file(dir: &Path, name: &str) -> bool {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return false;
    };
    for entry in entries.flatten() {
        let path = entry.path();
        let found = if path.is_dir() {
            // Descend into sub-directories; their own names are never matched.
            contains_file(&path, name)
        } else {
            path.file_name().is_some_and(|f| f == name)
        };
        if found {
            return true;
        }
    }
    false
}
4078
// A DiskVamana collection answers searches, leaves a `vamana.qvx` file
// somewhere under the database directory after a checkpoint, and keeps both
// its index kind and its search results across a reopen.
#[test]
fn disk_index_collection_searches_persists_and_writes_an_artifact() {
    let dir = tempfile::tempdir().unwrap();
    let empty = json!({});
    let params = SearchParams::default();
    let probe = [7.0, 0.0, 0.0, 0.0];

    // First session: fill, search, checkpoint; the handle drops at scope end.
    {
        let mut db = open(dir.path());
        db.create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        for i in 0..40u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }
        let hits = db.search("d", &probe, &params).unwrap();
        assert_eq!(hits[0].id, "p7");
        db.checkpoint().unwrap();
    }

    // The encrypted disk artifact was written under the collection's index dir.
    let artifact_written = contains_file(dir.path(), "vamana.qvx");
    assert!(artifact_written, "disk index file missing");

    // After reopening, the collection still reports the DiskVamana kind and
    // search still works.
    let mut db = open(dir.path());
    let persisted_kind = db.descriptor("d").unwrap().index.kind;
    assert_eq!(persisted_kind, IndexKind::DiskVamana);
    let hits = db.search("d", &probe, &params).unwrap();
    assert_eq!(hits[0].id, "p7");
}
4117
// FreshDiskANN incremental maintenance (ADR-0033): once the base graph has
// been built, inserts land in the delta, deletes are tombstoned, and updates
// move the point — with no rebuild and no reopen — for both the in-memory and
// the disk graph.
#[test]
fn graph_collections_maintain_writes_incrementally() {
    let empty = json!({});
    let params = SearchParams::default();

    for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
        let dir = tempfile::tempdir().unwrap();
        let mut db = open(dir.path());
        db.create_collection("c", desc_with(kind)).unwrap();
        for i in 0..40u32 {
            let id = format!("p{i}");
            db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
                .unwrap();
        }

        // The first search is what builds the read-only base graph.
        let at_seven = [7.0, 0.0, 0.0, 0.0];
        let base_hits = db.search("c", &at_seven, &params).unwrap();
        assert_eq!(base_hits[0].id, "p7", "{kind:?} base nearest");

        // A brand-new point goes to the in-memory delta and is findable
        // straight away — no rebuild, no reopen.
        db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &empty)
            .unwrap();
        let delta_hits = db.search("c", &[7.45, 0.0, 0.0, 0.0], &params).unwrap();
        assert_eq!(delta_hits[0].id, "p7b", "{kind:?} delta insert not found");

        // A delete tombstones the point: it is never returned again.
        let removed = db.delete("c", "p7").unwrap();
        assert!(removed);
        let after_delete = db.search("c", &at_seven, &params).unwrap();
        assert!(
            !after_delete.iter().any(|m| m.id == "p7"),
            "{kind:?} deleted id returned"
        );

        // An update moves a vector: it is found at the new position and no
        // longer dominates the old one.
        let new_spot = [500.0, 0.0, 0.0, 0.0];
        db.upsert("c", "p20", &new_spot, &empty).unwrap();
        let at_new = db.search("c", &new_spot, &params).unwrap();
        assert_eq!(
            at_new[0].id, "p20",
            "{kind:?} updated vector not at new spot"
        );
        let at_old = db.search("c", &[20.0, 0.0, 0.0, 0.0], &params).unwrap();
        assert_ne!(
            at_old[0].id, "p20",
            "{kind:?} stale copy still nearest old spot"
        );
    }
}
4178
// Churning past the 20% pending threshold forces a consolidation (a derived
// rebuild from the store). Results must stay correct throughout and across a
// reopen, because the store is the source of truth (ADR-0033).
#[test]
fn graph_consolidates_under_heavy_churn() {
    let dir = tempfile::tempdir().unwrap();
    let empty = json!({});
    let params = SearchParams::default();
    let origin_probe = [5.0, 0.0, 0.0, 0.0];
    let far_probe = [1007.0, 0.0, 0.0, 0.0];

    let mut db = open(dir.path());
    db.create_collection("c", desc_with(IndexKind::Vamana))
        .unwrap();
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }
    // Run one search before the churn starts.
    let _ = db.search("c", &[0.0; 4], &params).unwrap();

    // Churn: remove p0..p14 and add q0..q14 far out on the x-axis.
    let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
    for i in 0..15u32 {
        let gone = format!("p{i}");
        assert!(db.delete("c", &gone).unwrap());
        let added = format!("q{i}");
        db.upsert("c", &added, &[1000.0 + i as f32, 0.0, 0.0, 0.0], &empty)
            .unwrap();
    }

    let near_origin = db.search("c", &origin_probe, &params).unwrap();
    assert!(
        !near_origin.iter().any(|m| deleted.contains(&m.id)),
        "a churned-out id was returned"
    );
    let near_q = db.search("c", &far_probe, &params).unwrap();
    assert_eq!(near_q[0].id, "q7", "new point not found after churn");

    // Persist, drop the handle, and reopen: the same answers must hold.
    db.checkpoint().unwrap();
    drop(db);
    let mut db = open(dir.path());

    let near_q = db.search("c", &far_probe, &params).unwrap();
    assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
    let near_origin = db.search("c", &origin_probe, &params).unwrap();
    assert!(
        !near_origin.iter().any(|m| deleted.contains(&m.id)),
        "a churned-out id resurfaced after reopen"
    );
}
4238
// Token rows fold into the ANN index incrementally instead of going
// stale->rebuild (ADR-0034). The document API must stay correct under
// interleaved upsert / delete / re-upsert with no reopen, and a reopen (the
// authoritative rebuild) must produce the identical ranking.
//
// Documents lie on an arc of increasing angle to the query; the metric is
// cosine, so direction alone ranks them and the expected order is unambiguous.
// (Token-pool candidate generation only engages above the exact-scan
// threshold; the incremental token maintenance it relies on is the same code
// the single-vector index tests exercise.)
#[test]
fn multivector_writes_are_incremental_and_match_a_rebuild() {
    // Unit vector at angle `theta` in the x-y plane, and a two-token document
    // whose tokens both point that way.
    let unit = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
    let two_tokens = |theta: f32| vec![unit(theta), unit(theta)];

    let kinds = [
        IndexKind::Ivf,
        IndexKind::Hnsw,
        IndexKind::Vamana,
        IndexKind::Colbert,
    ];
    for kind in kinds {
        let scratch = tempfile::tempdir().unwrap();
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_index(IndexSpec {
                kind,
                pq_subspaces: None,
            });
        let mut db = open(scratch.path());
        db.create_collection("m", descriptor).unwrap();

        // d1 is most aligned with the query (smallest angle), d10 least.
        for i in 1..=10u32 {
            let id = format!("d{i}");
            let tokens = two_tokens(0.1 * i as f32);
            db.upsert_document("m", &id, &tokens, &json!({ "i": i }))
                .unwrap();
        }

        // Ids of the three best documents for a single-token query at angle 0.
        let query = vec![unit(0.0)];
        let top3 = SearchParams {
            k: 3,
            ..Default::default()
        };
        let top = |db: &mut Database| {
            let matches = db.search_multi_vector("m", &query, &top3).unwrap();
            matches.into_iter().map(|m| m.id).collect::<Vec<_>>()
        };
        assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");

        // Delete the top document — it is gone without a reopen.
        let removed = db.delete_document("m", "d1").unwrap();
        assert!(removed);
        assert_eq!(
            top(&mut db),
            vec!["d2", "d3", "d4"],
            "{kind:?} after delete"
        );

        // Re-upsert the least-aligned doc onto the query — the update is live.
        db.upsert_document("m", "d10", &two_tokens(0.0), &json!({ "i": 10 }))
            .unwrap();
        assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");

        // A new, near-aligned document is immediately findable, second only
        // to d10.
        db.upsert_document("m", "d11", &two_tokens(0.05), &json!({ "i": 11 }))
            .unwrap();
        let ranked = top(&mut db);
        assert_eq!(ranked[0], "d10", "{kind:?}");
        assert_eq!(ranked[1], "d11", "{kind:?} new doc not ranked");

        // A shorter re-upsert drops the trailing token row.
        db.upsert_document("m", "d8", &[unit(0.8)], &json!({ "i": 8 }))
            .unwrap();
        let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
        let token_count = d8.vectors.unwrap().len();
        assert_eq!(token_count, 1, "{kind:?} trailing token kept");

        // The incremental in-memory state matches an authoritative rebuild.
        let before = top(&mut db);
        drop(db);
        let mut db = open(scratch.path());
        assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
        let deleted_doc = db.get_document("m", "d1", false).unwrap();
        assert!(deleted_doc.is_none(), "{kind:?} deleted doc resurfaced");
    }
}
4330
4331    #[test]
4332    fn colbert_index_requires_multivector() {
4333        let tmp = tempfile::tempdir().unwrap();
4334        let mut db = open(tmp.path());
4335        // ColBERT is a late-interaction token-pool index — invalid for a
4336        // single-vector collection (ADR-0034).
4337        let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
4338            kind: IndexKind::Colbert,
4339            pq_subspaces: None,
4340        });
4341        assert!(matches!(
4342            db.create_collection("c", single),
4343            Err(Error::Unsupported(_))
4344        ));
4345        // ...but valid for a multi-vector collection.
4346        let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4347            .with_multivector(true)
4348            .with_index(IndexSpec {
4349                kind: IndexKind::Colbert,
4350                pq_subspaces: None,
4351            });
4352        assert!(db.create_collection("m", multi).is_ok());
4353    }
4354
4355    // ---- hybrid (pre-filtered) search ----
4356
4357    // A collection whose `city` (keyword) and `n` (numeric) payload fields are
4358    // declared filterable, so the planner can pre-filter on them.
4359    fn desc_filterable() -> Descriptor {
4360        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4361            FilterableField::keyword("city"),
4362            FilterableField::numeric("n"),
4363        ])
4364    }
4365
4366    // 30 points on the x-axis (distance to the origin grows with i), cycling
4367    // through three cities, each carrying its index as a numeric `n`.
4368    // Checkpointed so the rows live in a sealed segment's secondary index — the
4369    // pre-filter primitive — not only the active buffer.
4370    fn seed_cities(db: &mut Database) {
4371        const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4372        db.create_collection("c", desc_filterable()).unwrap();
4373        for i in 0..30u32 {
4374            db.upsert(
4375                "c",
4376                &format!("p{i}"),
4377                &[i as f32, 0.0, 0.0, 0.0],
4378                &json!({"city": CITIES[i as usize % 3], "n": i}),
4379            )
4380            .unwrap();
4381        }
4382        db.checkpoint().unwrap();
4383    }
4384
4385    #[test]
4386    fn hybrid_equality_prefilter_is_exact() {
4387        let tmp = tempfile::tempdir().unwrap();
4388        let mut db = open(tmp.path());
4389        seed_cities(&mut db);
4390        let params = SearchParams {
4391            k: 5,
4392            filter: Some(Filter::Eq {
4393                field: "city".into(),
4394                value: json!("lyon"),
4395            }),
4396            ..SearchParams::default()
4397        };
4398        let res = db.search("c", &[0.0; 4], &params).unwrap();
4399        assert!(!res.is_empty());
4400        // lyon is i % 3 == 1 → p1, p4, p7, …; nearest the origin is p1.
4401        assert_eq!(res[0].id, "p1");
4402        for m in &res {
4403            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
4404        }
4405    }
4406
4407    #[test]
4408    fn hybrid_numeric_range_prefilter_is_exact() {
4409        let tmp = tempfile::tempdir().unwrap();
4410        let mut db = open(tmp.path());
4411        seed_cities(&mut db);
4412        let params = SearchParams {
4413            k: 4,
4414            filter: Some(Filter::Gte {
4415                field: "n".into(),
4416                value: json!(10),
4417            }),
4418            ..SearchParams::default()
4419        };
4420        let res = db.search("c", &[0.0; 4], &params).unwrap();
4421        // Among n >= 10, the nearest the origin is n == 10 (p10).
4422        assert_eq!(res[0].id, "p10");
4423        for m in &res {
4424            assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
4425        }
4426    }
4427
4428    #[test]
4429    fn hybrid_unsatisfiable_filter_returns_empty() {
4430        let tmp = tempfile::tempdir().unwrap();
4431        let mut db = open(tmp.path());
4432        seed_cities(&mut db);
4433        // No row holds this city, so the pre-filter proves the result empty
4434        // without touching the vector index.
4435        let params = SearchParams {
4436            filter: Some(Filter::Eq {
4437                field: "city".into(),
4438                value: json!("atlantis"),
4439            }),
4440            ..SearchParams::default()
4441        };
4442        assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
4443    }
4444
4445    #[test]
4446    fn hybrid_and_or_composition_is_exact() {
4447        let tmp = tempfile::tempdir().unwrap();
4448        let mut db = open(tmp.path());
4449        seed_cities(&mut db);
4450        // (city in {paris, rome}) AND (n < 12): a keyword `in` intersected with a
4451        // numeric range, both pre-filterable.
4452        let params = SearchParams {
4453            k: 10,
4454            filter: Some(Filter::And(vec![
4455                Filter::In {
4456                    field: "city".into(),
4457                    values: vec![json!("paris"), json!("rome")],
4458                },
4459                Filter::Lt {
4460                    field: "n".into(),
4461                    value: json!(12),
4462                },
4463            ])),
4464            ..SearchParams::default()
4465        };
4466        let res = db.search("c", &[0.0; 4], &params).unwrap();
4467        // paris is i % 3 == 0 → the nearest qualifying point is p0.
4468        assert_eq!(res[0].id, "p0");
4469        for m in &res {
4470            let payload = m.payload.as_ref().unwrap();
4471            let city = payload["city"].as_str().unwrap();
4472            assert!(city == "paris" || city == "rome");
4473            assert!(payload["n"].as_u64().unwrap() < 12);
4474        }
4475    }
4476
4477    #[test]
4478    fn hybrid_rechecks_non_indexable_clause() {
4479        let tmp = tempfile::tempdir().unwrap();
4480        let mut db = open(tmp.path());
4481        seed_cities(&mut db);
4482        // city is pre-filterable; the Not(…) clause is not, so it is re-checked
4483        // exactly on the narrowed candidates — paris rows excluding p0.
4484        let params = SearchParams {
4485            k: 10,
4486            filter: Some(Filter::And(vec![
4487                Filter::Eq {
4488                    field: "city".into(),
4489                    value: json!("paris"),
4490                },
4491                Filter::Not(Box::new(Filter::Eq {
4492                    field: "n".into(),
4493                    value: json!(0),
4494                })),
4495            ])),
4496            ..SearchParams::default()
4497        };
4498        let res = db.search("c", &[0.0; 4], &params).unwrap();
4499        assert!(res.iter().all(|m| m.id != "p0"));
4500        // The nearest paris point after p0 is p3.
4501        assert_eq!(res[0].id, "p3");
4502        for m in &res {
4503            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
4504        }
4505    }
4506
4507    #[test]
4508    fn post_filter_fallback_on_undeclared_field_is_correct() {
4509        let tmp = tempfile::tempdir().unwrap();
4510        let mut db = open(tmp.path());
4511        // Only `city` is filterable; a filter on the undeclared `tier` cannot
4512        // pre-filter and falls back to ANN post-filtering — still exact.
4513        db.create_collection(
4514            "c",
4515            Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4516                .with_filterable(vec![FilterableField::keyword("city")]),
4517        )
4518        .unwrap();
4519        for i in 0..20u32 {
4520            let tier = if i % 2 == 0 { "gold" } else { "silver" };
4521            db.upsert(
4522                "c",
4523                &format!("p{i}"),
4524                &[i as f32, 0.0, 0.0, 0.0],
4525                &json!({"city": "paris", "tier": tier}),
4526            )
4527            .unwrap();
4528        }
4529        let params = SearchParams {
4530            k: 5,
4531            filter: Some(Filter::Eq {
4532                field: "tier".into(),
4533                value: json!("gold"),
4534            }),
4535            ..SearchParams::default()
4536        };
4537        let res = db.search("c", &[0.0; 4], &params).unwrap();
4538        assert!(!res.is_empty());
4539        for m in &res {
4540            assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
4541        }
4542    }
4543
4544    #[test]
4545    fn leaf_predicate_maps_only_indexable_filterable_leaves() {
4546        let fields = vec![
4547            FilterableField::keyword("city"),
4548            FilterableField::numeric("n"),
4549        ];
4550        // Keyword equality on a filterable field maps.
4551        assert_eq!(
4552            leaf_predicate(
4553                &Filter::Eq {
4554                    field: "city".into(),
4555                    value: json!("paris")
4556                },
4557                &fields
4558            ),
4559            Some(SecPredicate::Eq {
4560                field: "city".into(),
4561                value: SecValue::Keyword("paris".into())
4562            })
4563        );
4564        // A numeric comparison maps to a one-sided range.
4565        assert_eq!(
4566            leaf_predicate(
4567                &Filter::Gte {
4568                    field: "n".into(),
4569                    value: json!(3)
4570                },
4571                &fields
4572            ),
4573            Some(SecPredicate::Range {
4574                field: "n".into(),
4575                lo: Some(SecValue::Numeric(3.0)),
4576                hi: None,
4577                lo_inclusive: true,
4578                hi_inclusive: false,
4579            })
4580        );
4581        // Undeclared field, type mismatch, `ne`, and `exists` do not map.
4582        let undeclared = Filter::Eq {
4583            field: "tier".into(),
4584            value: json!("gold"),
4585        };
4586        let mismatch = Filter::Eq {
4587            field: "city".into(),
4588            value: json!(5),
4589        };
4590        let ne = Filter::Ne {
4591            field: "city".into(),
4592            value: json!("x"),
4593        };
4594        let exists = Filter::Exists {
4595            field: "city".into(),
4596        };
4597        assert!(leaf_predicate(&undeclared, &fields).is_none());
4598        assert!(leaf_predicate(&mismatch, &fields).is_none());
4599        assert!(leaf_predicate(&ne, &fields).is_none());
4600        assert!(leaf_predicate(&exists, &fields).is_none());
4601    }
4602
4603    // ----- durable IVF index recovery (ADR-0025) -----
4604
4605    // The first collection created in a fresh store has id 0.
4606    fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
4607        root.join("collections").join("0000000000").join("index")
4608    }
4609
4610    fn idx_snapshot_files(root: &Path) -> Vec<String> {
4611        let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4612            .map(|rd| {
4613                rd.filter_map(std::result::Result::ok)
4614                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4615                    .filter(|n| n.starts_with("idx-"))
4616                    .collect()
4617            })
4618            .unwrap_or_default();
4619        v.sort();
4620        v
4621    }
4622
4623    fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4624        db.search("c", q, &SearchParams::default())
4625            .unwrap()
4626            .into_iter()
4627            .map(|m| m.id)
4628            .collect()
4629    }
4630
4631    fn seed_ivf(db: &mut Database, n: u32) {
4632        db.create_collection("c", desc_with(IndexKind::Ivf))
4633            .unwrap();
4634        for i in 0..n {
4635            db.upsert(
4636                "c",
4637                &format!("p{i}"),
4638                &[i as f32, 0.0, 0.0, 0.0],
4639                &json!({}),
4640            )
4641            .unwrap();
4642        }
4643        // The first search builds (and trains) the IVF from the store.
4644        let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4645    }
4646
4647    #[test]
4648    fn ivf_snapshot_is_written_at_checkpoint() {
4649        let tmp = tempfile::tempdir().unwrap();
4650        let mut db = open(tmp.path());
4651        seed_ivf(&mut db, 40);
4652        db.checkpoint().unwrap();
4653        assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
4654    }
4655
4656    #[test]
4657    fn ivf_loads_from_snapshot_rather_than_rebuilding() {
4658        let tmp = tempfile::tempdir().unwrap();
4659        {
4660            let mut db = open(tmp.path());
4661            db.create_collection("c", desc_with(IndexKind::Ivf))
4662                .unwrap();
4663            db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
4664                .unwrap();
4665            db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
4666                .unwrap();
4667            // First search builds the IVF — int_to_ext is the sorted scan order.
4668            let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
4669            // Incremental upserts append in insertion order, diverging from sort.
4670            db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
4671                .unwrap();
4672            db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4673                .unwrap();
4674            db.checkpoint().unwrap();
4675            assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
4676        }
4677        let db = open(tmp.path());
4678        // Loaded from the snapshot: the insertion-order mapping is preserved. A
4679        // rebuild would have produced the sorted order ["a", "b", "m", "z"].
4680        assert_eq!(
4681            db.collections["c"].int_to_ext,
4682            ["a", "m", "z", "b"],
4683            "index was rebuilt, not loaded from the snapshot"
4684        );
4685    }
4686
4687    #[test]
4688    fn ivf_recovery_replays_post_checkpoint_upserts() {
4689        let tmp = tempfile::tempdir().unwrap();
4690        {
4691            let mut db = open(tmp.path());
4692            seed_ivf(&mut db, 30);
4693            db.checkpoint().unwrap();
4694            // Post-checkpoint upsert, no further checkpoint.
4695            db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4696                .unwrap();
4697        }
4698        let mut db = open(tmp.path());
4699        assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
4700        assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
4701    }
4702
4703    #[test]
4704    fn ivf_recovery_replays_post_checkpoint_deletes() {
4705        let tmp = tempfile::tempdir().unwrap();
4706        {
4707            let mut db = open(tmp.path());
4708            seed_ivf(&mut db, 30);
4709            db.checkpoint().unwrap();
4710            assert!(db.delete("c", "p7").unwrap());
4711        }
4712        let mut db = open(tmp.path());
4713        assert!(
4714            nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
4715                .iter()
4716                .all(|id| id != "p7")
4717        );
4718        assert!(db.get("c", "p7").unwrap().is_none());
4719        assert!(db.get("c", "p6").unwrap().is_some());
4720    }
4721
4722    #[test]
4723    fn ivf_recovery_replays_post_checkpoint_updates() {
4724        let tmp = tempfile::tempdir().unwrap();
4725        {
4726            let mut db = open(tmp.path());
4727            seed_ivf(&mut db, 30);
4728            db.checkpoint().unwrap();
4729            // Move p0 far away — an in-place update shadowing its sealed row.
4730            db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4731                .unwrap();
4732        }
4733        let mut db = open(tmp.path());
4734        assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4735        assert_ne!(
4736            nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4737            "p0",
4738            "the stale p0 vector survived the update"
4739        );
4740    }
4741
4742    #[test]
4743    fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4744        let tmp = tempfile::tempdir().unwrap();
4745        {
4746            let mut db = open(tmp.path());
4747            seed_ivf(&mut db, 30);
4748            db.checkpoint().unwrap();
4749        }
4750        // Corrupt the snapshot file; recovery must fall back to a rebuild.
4751        let files = idx_snapshot_files(tmp.path());
4752        assert_eq!(files.len(), 1);
4753        std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4754
4755        let mut db = open(tmp.path());
4756        assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4757    }
4758
4759    // ---- Multi-vector / late interaction (ColBERT, ADR-0028) ----
4760
4761    fn mv_desc() -> Descriptor {
4762        Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4763    }
4764
4765    // Brute-force MaxSim ranking over a corpus: the reference the pipeline must
4766    // reproduce (score desc, ties by id), using the same shared scorer.
4767    fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4768        let mut v: Vec<(String, f32)> = corpus
4769            .iter()
4770            .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4771            .collect();
4772        v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4773        v
4774    }
4775
4776    #[test]
4777    fn multivector_search_ranks_documents_by_maxsim() {
4778        let tmp = tempfile::tempdir().unwrap();
4779        let mut db = open(tmp.path());
4780        db.create_collection("docs", mv_desc()).unwrap();
4781        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4782            ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4783            ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4784            (
4785                "d_mix",
4786                vec![
4787                    vec![1.0, 1.0, 0.0],
4788                    vec![0.0, 0.0, 1.0],
4789                    vec![1.0, 0.0, 1.0],
4790                ],
4791            ),
4792        ];
4793        for (id, toks) in &corpus {
4794            db.upsert_document("docs", id, toks, &json!({ "id": id }))
4795                .unwrap();
4796        }
4797        assert_eq!(db.document_count("docs").unwrap(), 3);
4798
4799        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4800        let params = SearchParams {
4801            k: 3,
4802            with_payload: false,
4803            ..SearchParams::default()
4804        };
4805        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4806        let expected = bf_rank(&query, &corpus);
4807
4808        assert_eq!(got.len(), 3);
4809        for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4810            assert_eq!(&g.id, eid, "ranking matches brute force");
4811            assert!(
4812                (g.score - escore).abs() < 1e-5,
4813                "{} score {} vs {escore}",
4814                g.id,
4815                g.score
4816            );
4817        }
4818    }
4819
4820    #[test]
4821    fn multivector_search_truncates_to_k() {
4822        let tmp = tempfile::tempdir().unwrap();
4823        let mut db = open(tmp.path());
4824        db.create_collection("docs", mv_desc()).unwrap();
4825        for i in 0..5 {
4826            let v = vec![vec![1.0, i as f32, 0.0]];
4827            db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4828                .unwrap();
4829        }
4830        let params = SearchParams {
4831            k: 2,
4832            ..SearchParams::default()
4833        };
4834        let got = db
4835            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4836            .unwrap();
4837        assert_eq!(got.len(), 2);
4838    }
4839
4840    #[test]
4841    fn multivector_filter_selects_documents_exactly() {
4842        let tmp = tempfile::tempdir().unwrap();
4843        let mut db = open(tmp.path());
4844        db.create_collection("docs", mv_desc()).unwrap();
4845        // Identical token sets, so only the filter distinguishes the documents.
4846        db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4847            .unwrap();
4848        db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4849            .unwrap();
4850        let params = SearchParams {
4851            k: 10,
4852            filter: Some(Filter::Eq {
4853                field: "lang".into(),
4854                value: json!("fr"),
4855            }),
4856            ..SearchParams::default()
4857        };
4858        let got = db
4859            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4860            .unwrap();
4861        assert_eq!(got.len(), 1);
4862        assert_eq!(got[0].id, "b");
4863        assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
4864    }
4865
4866    #[test]
4867    fn multivector_reopen_rebuilds_grouping_and_ranking() {
4868        let tmp = tempfile::tempdir().unwrap();
4869        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4870        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4871            ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4872            ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
4873        ];
4874        {
4875            let mut db = open(tmp.path());
4876            db.create_collection("docs", mv_desc()).unwrap();
4877            for (id, toks) in &corpus {
4878                db.upsert_document("docs", id, toks, &json!({})).unwrap();
4879            }
4880            db.checkpoint().unwrap();
4881        }
4882        // Reopen: the document grouping is rebuilt from the live rows.
4883        let mut db = open(tmp.path());
4884        assert_eq!(db.document_count("docs").unwrap(), 2);
4885        let params = SearchParams {
4886            k: 2,
4887            ..SearchParams::default()
4888        };
4889        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4890        let expected = bf_rank(&query, &corpus);
4891        assert_eq!(
4892            got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
4893            expected
4894                .iter()
4895                .map(|(id, _)| id.clone())
4896                .collect::<Vec<_>>()
4897        );
4898    }
4899
4900    #[test]
4901    fn multivector_delete_document_removes_all_tokens() {
4902        let tmp = tempfile::tempdir().unwrap();
4903        let mut db = open(tmp.path());
4904        db.create_collection("docs", mv_desc()).unwrap();
4905        db.upsert_document(
4906            "docs",
4907            "a",
4908            &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
4909            &json!({}),
4910        )
4911        .unwrap();
4912        db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
4913            .unwrap();
4914        assert_eq!(db.document_count("docs").unwrap(), 2);
4915        assert_eq!(db.len("docs").unwrap(), 3);
4916
4917        assert!(db.delete_document("docs", "a").unwrap());
4918        assert_eq!(db.document_count("docs").unwrap(), 1);
4919        assert_eq!(db.len("docs").unwrap(), 1);
4920        assert!(db.get_document("docs", "a", false).unwrap().is_none());
4921        let params = SearchParams {
4922            k: 10,
4923            ..SearchParams::default()
4924        };
4925        let got = db
4926            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4927            .unwrap();
4928        assert!(got.iter().all(|m| m.id != "a"));
4929        assert!(!db.delete_document("docs", "a").unwrap());
4930    }
4931
4932    #[test]
4933    fn multivector_reupsert_replaces_tokens() {
4934        let tmp = tempfile::tempdir().unwrap();
4935        let mut db = open(tmp.path());
4936        db.create_collection("docs", mv_desc()).unwrap();
4937        db.upsert_document(
4938            "docs",
4939            "a",
4940            &[
4941                vec![1.0, 0.0, 0.0],
4942                vec![0.0, 1.0, 0.0],
4943                vec![0.0, 0.0, 1.0],
4944            ],
4945            &json!({"v":1}),
4946        )
4947        .unwrap();
4948        assert_eq!(db.len("docs").unwrap(), 3);
4949        // Re-upsert with a single token: the trailing two must be gone.
4950        db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
4951            .unwrap();
4952        assert_eq!(db.document_count("docs").unwrap(), 1);
4953        assert_eq!(db.len("docs").unwrap(), 1);
4954        let doc = db.get_document("docs", "a", true).unwrap().unwrap();
4955        assert_eq!(doc.payload, Some(json!({"v":2})));
4956        assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
4957    }
4958
4959    #[test]
4960    fn single_and_multi_vector_apis_are_mutually_exclusive() {
4961        let tmp = tempfile::tempdir().unwrap();
4962        let mut db = open(tmp.path());
4963        db.create_collection("mv", mv_desc()).unwrap();
4964        db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
4965            .unwrap();
4966        // Single-vector ops on a multi-vector collection are rejected.
4967        assert!(matches!(
4968            db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
4969            Err(Error::Unsupported(_))
4970        ));
4971        assert!(matches!(
4972            db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
4973            Err(Error::Unsupported(_))
4974        ));
4975        // Document ops on a single-vector collection are rejected.
4976        assert!(matches!(
4977            db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
4978            Err(Error::Unsupported(_))
4979        ));
4980        assert!(matches!(
4981            db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
4982            Err(Error::Unsupported(_))
4983        ));
4984        assert!(matches!(
4985            db.document_count("sv"),
4986            Err(Error::Unsupported(_))
4987        ));
4988    }
4989
4990    #[test]
4991    fn multivector_rejects_l2_metric_and_bad_documents() {
4992        let tmp = tempfile::tempdir().unwrap();
4993        let mut db = open(tmp.path());
4994        let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
4995        assert!(matches!(
4996            db.create_collection("bad", l2),
4997            Err(Error::Unsupported(_))
4998        ));
4999
5000        db.create_collection("docs", mv_desc()).unwrap();
5001        // A document id may not contain the reserved separator.
5002        assert!(matches!(
5003            db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
5004            Err(Error::Unsupported(_))
5005        ));
5006        // A document needs at least one vector, of the right dimensionality.
5007        assert!(matches!(
5008            db.upsert_document("docs", "a", &[], &json!({})),
5009            Err(Error::Unsupported(_))
5010        ));
5011        assert!(matches!(
5012            db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
5013            Err(Error::Unsupported(_))
5014        ));
5015    }
5016
5017    #[test]
5018    fn snapshot_then_open_reproduces_the_database() {
5019        let src = tempfile::tempdir().unwrap();
5020        let mut db = open(src.path());
5021        db.create_collection("kb", desc()).unwrap();
5022        db.create_collection("kb2", desc()).unwrap();
5023        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5024            .unwrap();
5025        db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
5026            .unwrap();
5027        db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
5028            .unwrap();
5029
5030        let dest = tempfile::tempdir().unwrap();
5031        let snap_dir = dest.path().join("snap");
5032        let info = db.snapshot(&snap_dir).unwrap();
5033        assert!(info.files > 0 && info.bytes > 0);
5034        assert_eq!(info.manifest_version, db.manifest_version());
5035
5036        // A write after the snapshot must not appear in the snapshot.
5037        db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
5038            .unwrap();
5039
5040        let restored = open(&snap_dir);
5041        let mut names = restored.collection_names();
5042        names.sort();
5043        assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
5044        assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
5045        assert_eq!(
5046            restored.get("kb", "a").unwrap().unwrap().payload,
5047            Some(json!({ "n": 1 }))
5048        );
5049        assert_eq!(restored.len("kb2").unwrap(), 1);
5050        assert!(restored.get("kb", "late").unwrap().is_none());
5051    }
5052
5053    #[test]
5054    fn snapshot_refuses_an_existing_destination() {
5055        let src = tempfile::tempdir().unwrap();
5056        let mut db = open(src.path());
5057        let dest = tempfile::tempdir().unwrap(); // already exists
5058        assert!(matches!(
5059            db.snapshot(dest.path()),
5060            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5061        ));
5062    }
5063
5064    #[test]
5065    fn restore_snapshot_roundtrips_and_guards() {
5066        let src = tempfile::tempdir().unwrap();
5067        let mut db = open(src.path());
5068        db.create_collection("kb", desc()).unwrap();
5069        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5070            .unwrap();
5071        let work = tempfile::tempdir().unwrap();
5072        let snap_dir = work.path().join("snap");
5073        db.snapshot(&snap_dir).unwrap();
5074
5075        // Restore into a fresh directory, then open it.
5076        let restored_dir = work.path().join("restored");
5077        let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
5078        assert!(info.files > 0);
5079        let restored = open(&restored_dir);
5080        assert_eq!(restored.len("kb").unwrap(), 1);
5081
5082        // Restoring over an existing directory is refused.
5083        assert!(matches!(
5084            restore_snapshot(&snap_dir, &restored_dir),
5085            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5086        ));
5087        // A directory that is not a snapshot (no CURRENT) is rejected.
5088        let not_snap = work.path().join("not-a-snapshot");
5089        std::fs::create_dir_all(&not_snap).unwrap();
5090        assert!(matches!(
5091            restore_snapshot(&not_snap, &work.path().join("out")),
5092            Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
5093        ));
5094    }
5095}