Skip to main content

quiver_embed/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The embeddable, in-process Quiver database handle.
3//!
4//! [`Database`] composes the storage engine ([`quiver_core::Store`]) with a
5//! per-collection vector index and payload filtering ([`quiver_query::Filter`])
6//! into one handle. It exposes the same logical operations the server speaks
7//! (`docs/api/wire-protocol.md`), so library mode and server mode exercise
8//! identical engine semantics — the server is a thin transport/policy shell.
9//!
10//! ## Index lifecycle
11//! The store is the source of truth. Each collection chooses its index via the
12//! descriptor's [`IndexSpec`] (default in-memory HNSW); the index is built from
13//! the store on open. HNSW applies new-id inserts incrementally; once an IVF
14//! index is built it applies inserts, in-place updates, and deletes
15//! incrementally with LIRE rebalancing (ADR-0023). The Vamana / disk graph
16//! family is maintained the FreshDiskANN way (ADR-0033): the batch-built graph
17//! is a read-only base, recent inserts land in an in-memory delta graph, and
18//! deletes are tombstoned, so writes are size-independent; when the pending work
19//! grows past a fixed fraction of the base the next access consolidates by
20//! rebuilding from the store. All indexes stay derived (rebuilt from the store
21//! on open), so the crash gate never sees an index write.
22//!
23//! ## Filtered (hybrid) search
24//! A search may carry a [`quiver_query::Filter`] over the payload. The planner
25//! decomposes it into the predicates the collection's secondary indexes can
26//! answer; when those narrow the query to a small candidate set it scans that
27//! set exactly (perfect recall, no filtered-ANN cliff), and otherwise it
28//! over-fetches from the ANN index and post-filters. Both arms re-check the full
29//! filter, so results are exact regardless of which path runs.
30//!
31//! ## Concurrency (ADR-0057 / ADR-0062)
32//! Single-writer. Writes take `&mut self`. Reads come in two flavors: the
33//! `&mut self` convenience methods (`search`, `hybrid_search`,
34//! `search_multi_vector`) rebuild a stale index in place and so give embedded,
35//! single-threaded callers read-your-writes; the `&self` `*_snapshot` methods
36//! read the current immutable snapshot and run **concurrently**, serving the
37//! *prior* snapshot when a write deferred a rebuild (snapshot-isolated, slightly
38//! stale). A server therefore serves concurrent reads behind a reader–writer lock,
39//! and rebuilds **off** the exclusive lock (ADR-0062): it captures the rebuild
40//! inputs under the shared lock ([`Database::snapshot_rebuild_inputs`]), builds the
41//! new index with no lock held ([`RebuildInputs::build`]), and installs it under a
42//! brief write lock ([`Database::commit_rebuild`]) — so a rebuild never stalls
43//! concurrent readers.
44
45use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52    ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53    Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54    ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64    Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65    VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69    BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70    query_term_ids, rrf_fuse, text_to_sparse,
71};
72
/// Errors returned by the embeddable database.
///
/// Variants that wrap another error type carry `#[from]`, so the `?` operator
/// converts the underlying error into [`Error`] without an explicit `map_err`.
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// An error from the storage engine (includes not-found / already-exists /
    /// invalid-argument from the catalog and write path).
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// An error from the vector index.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// An error from the disk-resident index (build, open, or query).
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be (de)serialized as JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// The named collection is not loaded in this database. Carries the
    /// collection name that was looked up.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The requested index / metric combination is not supported. Carries a
    /// static description of the rejected configuration.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A durable index snapshot could not be restored (ADR-0025); the caller
    /// falls back to rebuilding from the store, so this does not surface to users.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// An index snapshot envelope could not be (de)serialized.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
104
/// Result alias for database operations: `std::result::Result` with the error
/// type fixed to this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
107
/// What a [`Database::snapshot`] captured (ADR-0050): the catalog generation and
/// the number of files / bytes copied.
///
/// A plain value type: `Copy`, comparable, and serde-serializable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// The manifest version the snapshot reflects (its consistent LSN anchor).
    pub manifest_version: u64,
    /// Number of files copied into the snapshot directory.
    pub files: u64,
    /// Total bytes copied.
    pub bytes: u64,
}
119
/// A single search or fetch result.
///
/// `payload` and `vector` are `None` unless the caller asked for them; the
/// lock-free [`CollectionSnapshot::search`] path always leaves both `None`.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External id of the point.
    pub id: String,
    /// Distance / similarity under the collection metric (0 for a direct fetch).
    pub score: f32,
    /// The payload, if requested.
    pub payload: Option<Value>,
    /// The vector, if requested.
    pub vector: Option<Vec<f32>>,
}
132
/// A multi-vector (late-interaction / ColBERT) document result: a document id, its
/// MaxSim relevance, the payload, and — if requested — the document's token
/// vectors (ADR-0028).
///
/// The document-level counterpart of [`Match`]: one entry per document rather
/// than per stored vector.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// Document id.
    pub id: String,
    /// MaxSim relevance score (0 for a direct fetch).
    pub score: f32,
    /// The document payload, if requested / present (stored on the anchor token).
    pub payload: Option<Value>,
    /// The document's token vectors, if requested (one inner `Vec` per token).
    pub vectors: Option<Vec<Vec<f32>>>,
}
147
// A candidate document during multi-vector re-ranking, before it becomes a
// [`DocumentMatch`]: `(MaxSim score, document id, anchor payload, token vectors
// when requested)`. The tuple fields line up one-to-one with `DocumentMatch`'s
// `score`, `id`, `payload`, and `vectors`. Named so the re-rank buffer stays
// under clippy's type-complexity threshold.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
/// Parameters for a [`Database::search`].
///
/// `SearchParams::default()` gives `k = 10`, no filter, `ef_search = 64`,
/// payloads included, and vectors omitted; override individual fields with
/// struct-update syntax.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Number of results to return.
    pub k: usize,
    /// Optional payload predicate. The planner pre-filters through the
    /// collection's secondary indexes when the filter is selective enough, and
    /// otherwise post-filters the ANN candidates; either way the full predicate
    /// is re-checked, so results are exact.
    pub filter: Option<Filter>,
    /// Search beam width (recall/latency knob), clamped up to at least `k`.
    pub ef_search: usize,
    /// Include payloads in the results.
    pub with_payload: bool,
    /// Include vectors in the results.
    pub with_vector: bool,
}
171
172impl Default for SearchParams {
173    fn default() -> Self {
174        Self {
175            k: 10,
176            filter: None,
177            ef_search: 64,
178            with_payload: true,
179            with_vector: false,
180        }
181    }
182}
183
// How many extra candidates to pull before post-filtering, so a filtered query
// still has enough survivors to fill `k`.
const FILTER_OVERFETCH: usize = 8;

// Hybrid search (ADR-0043) pulls `k * RRF_CANDIDATE_FACTOR` (at least
// `MIN_RRF_CANDIDATES`) candidates from each side before Reciprocal Rank Fusion,
// so a document ranked outside the top `k` on one side can still surface via the
// other.
const RRF_CANDIDATE_FACTOR: usize = 10;
const MIN_RRF_CANDIDATES: usize = 100;

// Selectivity threshold for the hybrid planner. When a payload filter decomposes
// into secondary-index predicates that narrow a query to at most this many live
// candidate rows, those rows are scanned exactly (brute force) instead of going
// through the ANN index. At or below this size an exact scan is both cheaper and
// higher-recall than filtered ANN, which can return too few results when the
// filter is very selective (the "filtered-search recall cliff"). Qdrant calls
// the equivalent knob its full-scan threshold.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// Soft-deleted fraction at which a built HNSW is rebuilt from the store to
// reclaim its tombstoned graph nodes (ADR-0026). Below it, a delete is an O(1)
// soft-delete; at it, the next access rebuilds.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// Pending graph work — delta inserts plus tombstones — at which a FreshDiskANN
/// graph collection consolidates: the next access rebuilds the base from the
/// store, reclaiming tombstones and folding in the delta (ADR-0033). Below it,
/// inserts go to the in-memory delta and deletes are `O(1)` tombstones. Mirrors
/// [`HNSW_REBUILD_DELETED_FRACTION`] (both are 0.2, i.e. 20% churn).
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
// The vector index backing one collection. HNSW and (once built) IVF are
// maintained incrementally; the Vamana and disk graphs are batch-built from the
// store (the `Option` is `None` until first build) and then maintained
// incrementally the FreshDiskANN way — a read-only base graph plus an in-memory
// delta and deletion set, consolidated by a rebuild past a churn threshold
// (ADR-0033).
//
// Searching a variant whose `Option` is still `None` (or the `None` variant
// itself) yields no hits rather than an error — see `CollectionIndex::search`.
enum CollectionIndex {
    // A fetch-only collection with no server-side ANN index: a client-side-encrypted
    // collection (ADR-0032) stores opaque ciphertext the server never ranks, so the
    // client fetches points and ranks locally.
    None,
    // In-memory HNSW graph; always present (no `Option`), unlike the variants below.
    Hnsw(Hnsw),
    // In-memory Vamana graph with a FreshDiskANN delta; `None` until first build.
    Vamana(Option<FreshVamana>),
    // IVF index; `None` until first build.
    Ivf(Option<Ivf>),
    // The disk-resident DiskANN index: PQ codes in RAM, graph + full vectors on
    // (encrypted) SSD, exact re-rank (ADR-0019), with a FreshDiskANN in-memory
    // delta layered on top so the on-disk artifact stays immutable.
    Disk(Option<FreshDiskVamana>),
    // The ColBERTv2/PLAID compressed token-pool index for a multi-vector
    // collection (ADR-0034): centroid + residual-PQ codes with centroid-pruned
    // candidate generation.
    Colbert(Option<ColbertIndex>),
}
239
240impl CollectionIndex {
241    // Search, mapping the generic `ef` knob onto each index's search width.
242    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243        Ok(match self {
244            CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245            CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246            CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247            CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248            CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249            CollectionIndex::None
250            | CollectionIndex::Vamana(None)
251            | CollectionIndex::Ivf(None)
252            | CollectionIndex::Disk(None)
253            | CollectionIndex::Colbert(None) => Vec::new(),
254        })
255    }
256}
257
258// === Lock-free MVCC serving snapshot (ADR-0064, increment 1) ===
259//
260// The single writer mutates indexes in place under the write lock, so a reader
261// cannot share the live index by `Arc` and read it lock-free — that is a data
262// race. Instead, in MVCC mode the writer publishes an immutable
263// [`CollectionSnapshot`] (the base index as of the last rebuild + an [`Overlay`]
264// of writes since) into an [`ArcSwap`]; readers `load()` it without a lock. The
265// overlay is index-kind-agnostic and bounded by the rebuild cadence, so it never
266// rewrites the index and a republish costs O(overlay), not O(index). MVCC changes
267// **visibility, not durability**: the WAL-fsync acknowledgement and the `kill -9`
268// crash gate are untouched (the overlay is derived from the same WAL the store
269// already replays).
270
/// Per-collection lock-free serving snapshot pointer: the single writer
/// `store`s a new [`CollectionSnapshot`]; readers `load` one without a lock.
/// (`ArcSwap<T>` stores an `Arc<T>` internally, so this is one `Arc` per load.)
/// The outer `Arc` lets the cell itself be shared between handles.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// Writes accumulated since a [`CollectionSnapshot`]'s base index was published.
/// A lock-free read brute-scans these recent vectors and merges them with the
/// base-index search, dropping tombstoned ids. Cloned (O(overlay)) on each write
/// and reset to empty when a rebuild folds it into a fresh base.
#[derive(Default, Clone)]
struct Overlay {
    // (vector, external id) for points upserted since the base, in id order: the
    // j-th entry's internal id is `base_len + j`. Vectors are `Arc<[f32]>`, so the
    // per-write clone of the overlay copies pointers, not vector data.
    upserts: Vec<(Arc<[f32]>, String)>,
    // Internal ids (base or overlay) deleted or superseded since the base; their
    // hits are dropped from a search. Filled by `overlay_upsert` (the superseded
    // copy of an updated id) and `overlay_delete`.
    tombstones: HashSet<u64>,
}
289
/// An immutable, lock-free-readable view of a single-vector collection (ADR-0064):
/// the base index as of the last rebuild, the base id map, and the overlay of
/// writes since. Obtained via [`Database::collection_snapshot`] and read with
/// [`CollectionSnapshot::search`]; a read is snapshot-isolated — it sees one
/// consistent `(base, overlay)` pair, and a write that lands mid-read is simply
/// the next snapshot.
pub struct CollectionSnapshot {
    // The index as of the last rebuild; shared (not copied) across the snapshots
    // republished for each overlay write.
    base: Arc<CollectionIndex>,
    // Internal id -> external id for base points (indexed by internal id).
    base_int_to_ext: Arc<Vec<String>>,
    // Number of base ids; internal ids `>= base_len` address the overlay's upserts.
    base_len: u64,
    // Writes since the base was published.
    overlay: Arc<Overlay>,
    // The collection metric, used to convert between reported scores and the
    // "smaller is closer" ordering key during a search.
    metric: Metric,
}
303
304impl CollectionSnapshot {
305    // An empty snapshot for a freshly created/opened collection (no base yet); the
306    // writer publishes a real one at the first rebuild. Reading it yields no hits.
307    fn empty(metric: Metric) -> Self {
308        Self {
309            base: Arc::new(CollectionIndex::None),
310            base_int_to_ext: Arc::new(Vec::new()),
311            base_len: 0,
312            overlay: Arc::new(Overlay::default()),
313            metric,
314        }
315    }
316
317    // Map an internal id to its external id: base ids index the base map, overlay
318    // ids (`>= base_len`) index the overlay's upserts.
319    fn ext_id(&self, internal: u64) -> Option<&str> {
320        if internal < self.base_len {
321            self.base_int_to_ext
322                .get(internal as usize)
323                .map(String::as_str)
324        } else {
325            self.overlay
326                .upserts
327                .get((internal - self.base_len) as usize)
328                .map(|(_, e)| e.as_str())
329        }
330    }
331
332    /// Lock-free nearest-neighbor search over the base index merged with the
333    /// overlay (ADR-0064 increment 1) — **pure vector reads**: no payload filter
334    /// and no payload/vector fetch (those need the store and land in increment 2).
335    /// Returns the `k` nearest live points, closest first, scored in the true
336    /// collection metric — identical ordering and scores to the locked
337    /// [`Database::search_snapshot`] path for the same case.
338    ///
339    /// # Errors
340    /// Propagates an index search error.
341    pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342        if k == 0 {
343            return Ok(Vec::new());
344        }
345        // Collect candidates in "smaller is closer" ordering space (uniform across
346        // metrics — `score::ordering_distance`), dropping tombstoned ids.
347        let mut cands: Vec<(f32, u64)> = Vec::new();
348        for n in self.base.search(query, k, ef_search)? {
349            if !self.overlay.tombstones.contains(&n.id) {
350                // `Neighbor.distance` is the reported metric; `report_metric` is its
351                // own inverse (identity for L2, negation for similarities), so it
352                // maps the reported value back to the ordering key.
353                cands.push((report_metric(self.metric, n.distance), n.id));
354            }
355        }
356        for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357            let internal = self.base_len + j as u64;
358            if !self.overlay.tombstones.contains(&internal) {
359                cands.push((ordering_distance(self.metric, query, vector), internal));
360            }
361        }
362        cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363        cands.truncate(k);
364        let mut out = Vec::with_capacity(cands.len());
365        for (ordering, internal) in cands {
366            if let Some(ext) = self.ext_id(internal) {
367                out.push(Match {
368                    id: ext.to_owned(),
369                    score: report_metric(self.metric, ordering),
370                    payload: None,
371                    vector: None,
372                });
373            }
374        }
375        Ok(out)
376    }
377}
378
379// A fresh, empty snapshot cell for a collection's metric — the initial value of
380// `CollectionHandle::snapshot` (the writer publishes a real base at the first
381// rebuild when MVCC is on; left untouched, at one tiny allocation, when off).
382fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383    let metric = to_index_metric(descriptor.metric);
384    Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387// Whether a collection *kind* can be served by the lock-free MVCC snapshot path
388// (ADR-0064 increment 1), independent of the flag: single-vector, server-searchable,
389// and an **in-memory** index. Disk-resident indexes are excluded for now — their
390// `mmap` assumes an immutable file, which a superseded snapshot still referencing
391// the old file would violate when a rebuild seals a new one (a later increment
392// versions the file).
393fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394    !descriptor.multivector
395        && descriptor.vector_encryption != VectorEncryption::ClientSide
396        && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399// Whether a collection is *currently* served via the snapshot: eligible and the
400// flag is on.
401fn mvcc_served(handle: &CollectionHandle) -> bool {
402    handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405// Republish a collection's snapshot reusing the immutable base + id map, swapping
406// in a freshly-extended overlay (the per-write O(overlay) cost).
407fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408    handle.snapshot.store(Arc::new(CollectionSnapshot {
409        base: prior.base.clone(),
410        base_int_to_ext: prior.base_int_to_ext.clone(),
411        base_len: prior.base_len,
412        overlay,
413        metric: prior.metric,
414    }));
415}
416
417// Move a freshly rebuilt index out of `handle.index` into a new published
418// snapshot with an empty overlay (the base now absorbs all prior writes). Leaves
419// `handle.index` empty — in MVCC mode the live base lives in the snapshot.
420fn publish_base(handle: &mut CollectionHandle) {
421    let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422    let metric = to_index_metric(handle.descriptor.metric);
423    handle.snapshot.store(Arc::new(CollectionSnapshot {
424        base: Arc::new(base),
425        base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426        base_len: handle.int_to_ext.len() as u64,
427        overlay: Arc::new(Overlay::default()),
428        metric,
429    }));
430}
431
// Number of overlay upserts at which an MVCC write defers a consolidating
// rebuild: one fifth of the base (~20% churn, the same threshold that already
// triggers consolidation), but never less than 1024 so a tiny base does not
// rebuild on every write.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    const FLOOR: u64 = 1024;
    let fifth_of_base = base_len / 5;
    if fifth_of_base > FLOOR {
        fifth_of_base
    } else {
        FLOOR
    }
}
438
439// MVCC-mode single-vector upsert (ADR-0064): append to the published overlay
440// instead of mutating the immutable base, so lock-free readers stay race-free.
441// `ponytail`: republishes per write — O(overlay) each; coalesce per batch if
442// batched upserts under MVCC ever dominate.
443fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444    bump_write_gen(handle);
445    let cur = handle.snapshot.load_full();
446    let mut overlay = cur.overlay.as_ref().clone();
447    // An update supersedes the prior copy (in the base or the overlay): tombstone it.
448    if let Some(&old) = handle.ext_to_int.get(ext_id) {
449        overlay.tombstones.insert(old);
450    }
451    let internal = cur.base_len + overlay.upserts.len() as u64;
452    overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453    handle.ext_to_int.insert(ext_id.to_owned(), internal);
454    handle.int_to_ext.push(ext_id.to_owned());
455    let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456    publish_overlay(handle, &cur, Arc::new(overlay));
457    // Defer a consolidating rebuild once the overlay is large (the server runs it
458    // off-lock; the embedded `&mut` search rebuilds synchronously).
459    if crowded {
460        handle.stale = true;
461    }
462}
463
464// MVCC-mode single-vector delete (ADR-0064): tombstone the id in the published
465// overlay; the base stays immutable.
466fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467    bump_write_gen(handle);
468    let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469        return;
470    };
471    let cur = handle.snapshot.load_full();
472    let mut overlay = cur.overlay.as_ref().clone();
473    overlay.tombstones.insert(internal);
474    publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
/// On-disk envelope (ADR-0025) for a durable IVF snapshot: the `Ivf` bytes plus
/// the internal->external id mapping they are addressed by, postcard-encoded and
/// handed to the store as one opaque blob. On open the envelope is decoded, the
/// `Ivf` restored, and the post-checkpoint WAL tail replayed. A decode/version
/// error means "rebuild from the store" — the snapshot is only ever a fast path.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    // Envelope format version.
    version: u16,
    // Internal id -> external id, indexed by internal id.
    int_to_ext: Vec<String>,
    // The serialized `Ivf` snapshot, opaque at this layer.
    ivf: Vec<u8>,
}
488
// Envelope format version, independent of the product SemVer (and of the inner
// `Ivf` snapshot version); a mismatch falls back to a rebuild. Bump it whenever
// the envelope's serialized layout changes.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
/// On-disk envelope (ADR-0063) for a durable DiskVamana snapshot. Unlike the IVF
/// envelope, the bulk (graph + full vectors) stays in the immutable `mmap`-ed
/// base file (`vamana.qvx`); this blob carries only what ties that base to the
/// live state — the base point count (validated against the opened file), the
/// FreshDiskANN tombstones, and the id map. The delta vectors are *not* stored:
/// the delta ids are implied as `[base_row_count, int_to_ext.len())` and their
/// vectors re-fetched from the store on open, so the blob stays O(delta ids), not
/// O(N) vectors. A decode/version/validation error means "rebuild from the store".
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    // Envelope format version.
    version: u16,
    // Internal id -> external id for base and delta points alike.
    int_to_ext: Vec<String>,
    // Point count of the base file; ids at or past it belong to the delta.
    base_row_count: u64,
    // The FreshDiskANN tombstones (internal ids).
    deleted_ids: Vec<u64>,
}
508
// The in-memory state the database keeps for one loaded collection: its
// catalog identity, the derived vector index, the internal<->external id maps,
// and the bookkeeping for deferred rebuilds and lock-free MVCC serving.
struct CollectionHandle {
    // The collection's id in the store.
    id: CollectionId,
    // The collection's descriptor (metric, index spec, encryption, ...).
    descriptor: Descriptor,
    // The derived vector index. In MVCC mode `publish_base` moves the rebuilt
    // index into the snapshot and leaves an empty index here.
    index: CollectionIndex,
    // Internal id -> external id, indexed by internal id.
    int_to_ext: Vec<String>,
    // External id -> current internal id.
    ext_to_int: HashMap<String, u64>,
    // Whether a rebuild from the store is pending (set by `mark_stale`, by a
    // crowded MVCC overlay, and initially on open).
    stale: bool,
    // Monotonic per-collection write counter, bumped on every write via
    // `bump_write_gen` — both by writes that defer a rebuild (`mark_stale`) and by
    // writes that do not (e.g. the MVCC overlay upsert/delete). An off-lock
    // rebuild (ADR-0062) captures it before building and re-checks it at commit:
    // if it advanced, a write landed during the build, so the freshly built index
    // is already behind — the commit installs it (still newer than the prior
    // snapshot) but leaves the handle stale so the next rebuild catches up.
    // Wrapping is fine: equality, not ordering, is what the commit checks.
    write_gen: u64,
    // For a multi-vector (ColBERT) collection: each document id mapped to its
    // token count, so a re-rank can gather all of a document's token rows
    // (`<doc-id><US><ordinal>`) and `document_count` is O(1). `None` for a
    // single-vector collection. Maintained eagerly on document writes and rebuilt
    // authoritatively from the store on open / rebuild; never persisted (ADR-0028).
    docs: Option<BTreeMap<String, u32>>,
    // Derived inverted index over the collection's `__quiver_sparse__` payloads,
    // for the sparse half of hybrid search (ADR-0045). `Some` for a single-vector,
    // server-searchable collection; `None` for multi-vector and client-side-encrypted
    // collections (which never run hybrid search) — and as a backstop, when `None`
    // the sparse ranking falls back to a full store scan. Built on rebuild from the
    // store and maintained incrementally on upsert/delete; never persisted.
    sparse: Option<SparseInvertedIndex>,
    // Whether this collection is served via the lock-free MVCC snapshot (ADR-0064);
    // mirrors the database-wide flag, set at creation and by `set_mvcc_reads`. When
    // off, the `snapshot` cell is never republished and the in-place RwLock read
    // path is used — zero overhead.
    mvcc: bool,
    // The lock-free serving snapshot cell (ADR-0064). Initialized empty; the writer
    // publishes a real base at each rebuild and an extended overlay per write when
    // `mvcc` is on. Never persisted (rebuilt on open).
    snapshot: SnapshotCell,
}
547
548// Whether a collection should carry a derived sparse inverted index: only
549// single-vector, server-searchable collections run hybrid search (ADR-0045).
550fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551    !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554// Mark a collection's index stale: a write the index could not absorb in place
555// defers a full rebuild. Also bumps the write generation (every staleness-marking
556// write is a write).
557fn mark_stale(handle: &mut CollectionHandle) {
558    handle.stale = true;
559    bump_write_gen(handle);
560}
561
562// Bump a collection's monotonic write generation. Called on **every** write that
563// touches the store — including writes that land while the index is already stale,
564// which the incremental maintenance skips. The off-lock rebuild (ADR-0062) captures
565// this before scanning and re-checks it at commit: if it advanced, a write landed
566// during the build, so the freshly built index is already behind and the commit
567// leaves the collection stale for the next rebuild. Wrapping is fine — the commit
568// checks equality, not ordering — and over-counting is harmless (at worst one extra
569// rebuild); only *missing* a bump could lose a write.
570fn bump_write_gen(handle: &mut CollectionHandle) {
571    handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
/// An in-process Quiver database over one data directory.
///
/// Owns the storage engine handle plus one in-memory handle per loaded
/// collection; every index is derived state rebuilt from the store.
pub struct Database {
    // The storage engine: the source of truth for all collections.
    store: Store,
    // Loaded collections, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
    // Lock-free MVCC reads (ADR-0064), default off. Defaulted from
    // `QUIVER_MVCC_READS` at open and overridable via `set_mvcc_reads`. When off,
    // the proven in-place RwLock read path is used with zero overhead.
    mvcc: bool,
}
583
// Whether `QUIVER_MVCC_READS` requests lock-free MVCC reads (ADR-0064).
//
// The value is trimmed and compared ASCII-case-insensitively against `1`,
// `true`, `on`, and `yes`, so `True`, `ON`, or a value with stray whitespace
// enables the flag just like the lowercase spellings. Anything else — including
// an unset or non-UTF-8 variable — leaves MVCC reads off.
fn mvcc_from_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var("QUIVER_MVCC_READS")
        .map(|raw| {
            let flag = raw.trim();
            TRUTHY.iter().any(|on| flag.eq_ignore_ascii_case(on))
        })
        .unwrap_or(false)
}
590
591impl Database {
592    /// Open (creating if absent) the database at `dir` with encryption-at-rest
593    /// disabled, rebuilding each collection's index from the store.
594    pub fn open(dir: &Path) -> Result<Self> {
595        Self::from_store(Store::open(dir)?)
596    }
597
598    /// Open the database with a specific page codec — used to enable
599    /// encryption-at-rest by passing `quiver-crypto`'s AEAD codec. Mirrors
600    /// [`quiver_core::Store::open_with_codec`]; the codec seals both paged files
601    /// and the WAL, so no plaintext user data reaches the disk.
602    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603        Self::from_store(Store::open_with_codec(dir, codec)?)
604    }
605
606    /// Open the database with a [`KeyRing`], the seam that lets `quiver-crypto`'s
607    /// envelope key-ring seal each collection under its own data-encryption key
608    /// (enabling crypto-shredding). Mirrors
609    /// [`quiver_core::Store::open_with_keyring`].
610    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611        Self::from_store(Store::open_with_keyring(dir, keyring)?)
612    }
613
614    // Build the in-memory handles (and their HNSW indexes) over an opened store.
615    fn from_store(store: Store) -> Result<Self> {
616        let mvcc = mvcc_from_env();
617        let mut collections = HashMap::new();
618        for name in store.collection_names() {
619            let Some(id) = store.collection_id(&name) else {
620                continue;
621            };
622            let Some(descriptor) = store.descriptor(id).cloned() else {
623                continue;
624            };
625            let snapshot = empty_snapshot(&descriptor);
626            let mut handle = CollectionHandle {
627                id,
628                index: empty_index(&descriptor),
629                descriptor,
630                int_to_ext: Vec::new(),
631                ext_to_int: HashMap::new(),
632                stale: true,
633                write_gen: 0,
634                docs: None,
635                // Populated by `load_index` / `rebuild_index` from the store.
636                sparse: None,
637                mvcc,
638                snapshot,
639            };
640            load_index(&store, &mut handle)?;
641            // MVCC mode: the lock-free base lives in the snapshot, which `load_index`
642            // does not populate. Force a rebuild on first read so the base is
643            // published (a restored durable IVF would otherwise leave the snapshot
644            // empty); MVCC trades the durable-index fast reopen for the snapshot.
645            if mvcc_served(&handle) {
646                handle.stale = true;
647            }
648            collections.insert(name, handle);
649        }
650        Ok(Self {
651            store,
652            collections,
653            mvcc,
654        })
655    }
656
657    /// Create a collection. Errors if the name already exists, or if the index
658    /// specification is unsupported for the metric.
659    pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660        validate_index(&descriptor)?;
661        let id = self.store.create_collection(name, descriptor.clone())?;
662        let index = empty_index(&descriptor);
663        let docs = descriptor.multivector.then(BTreeMap::new);
664        // A fresh single-vector collection starts with an empty inverted index
665        // maintained incrementally from the first upsert (an empty index allocates
666        // nothing until a sparse vector arrives).
667        let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668        let snapshot = empty_snapshot(&descriptor);
669        let mvcc = self.mvcc;
670        self.collections.insert(
671            name.to_owned(),
672            CollectionHandle {
673                id,
674                descriptor,
675                index,
676                int_to_ext: Vec::new(),
677                ext_to_int: HashMap::new(),
678                stale: false,
679                write_gen: 0,
680                docs,
681                sparse,
682                mvcc,
683                snapshot,
684            },
685        );
686        Ok(())
687    }
688
689    /// Drop a collection and its data. Returns whether it existed.
690    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
691        let existed = self.store.drop_collection(name)?;
692        self.collections.remove(name);
693        Ok(existed)
694    }
695
696    /// Crypto-shred a collection: drop it and destroy its data-encryption key, so
697    /// its sealed data is unrecoverable even with the master key, then reclaim
698    /// its files. Mirrors [`quiver_core::Store::shred_collection`]; with an
699    /// envelope key-ring this is irreversible erasure, with a single-codec
700    /// key-ring it is `drop` plus a checkpoint. Returns whether it existed.
701    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
702        let existed = self.store.shred_collection(name)?;
703        self.collections.remove(name);
704        Ok(existed)
705    }
706
707    /// Install a replication commit observer, invoked with each committed
708    /// [`WalEntry`] in commit order (ADR-0030). The server uses this to drive a
709    /// leader's replication stream.
710    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
711        self.store.set_commit_observer(observer);
712    }
713
714    /// The operations that recreate the current logical state, for a replication
715    /// follower to bootstrap from (ADR-0030).
716    ///
717    /// # Errors
718    /// Propagates a store read error.
719    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
720        Ok(self.store.replication_snapshot()?)
721    }
722
723    /// Apply a replicated operation from a leader (ADR-0030): persist and apply it
724    /// to the store (preserving the leader's collection id), then reconcile the
725    /// in-memory index handles — register a new collection, drop a removed one, or
726    /// mark a touched collection's index stale so the next read rebuilds from the
727    /// replicated state.
728    ///
729    /// # Errors
730    /// Propagates a store apply error.
731    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732        let target = match &op {
733            WalOp::CreateCollection { collection_id, .. }
734            | WalOp::DropCollection { collection_id }
735            | WalOp::Upsert { collection_id, .. }
736            | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737            WalOp::Checkpoint { .. } => None,
738        };
739        let create_name = match &op {
740            WalOp::CreateCollection { name, .. } => Some(name.clone()),
741            _ => None,
742        };
743        let is_drop = matches!(op, WalOp::DropCollection { .. });
744        self.store.apply_replicated(op)?;
745
746        if let Some(name) = create_name {
747            // Register a fresh handle for the newly replicated collection.
748            if let Some(id) = target
749                && let Some(descriptor) = self.store.descriptor(id).cloned()
750            {
751                let docs = descriptor.multivector.then(BTreeMap::new);
752                let index = empty_index(&descriptor);
753                let snapshot = empty_snapshot(&descriptor);
754                let mvcc = self.mvcc;
755                // Replicated writes mark the handle stale, so the next read rebuilds
756                // the inverted index from the replicated store.
757                self.collections.insert(
758                    name,
759                    CollectionHandle {
760                        id,
761                        descriptor,
762                        index,
763                        int_to_ext: Vec::new(),
764                        ext_to_int: HashMap::new(),
765                        stale: false,
766                        write_gen: 0,
767                        docs,
768                        sparse: None,
769                        mvcc,
770                        snapshot,
771                    },
772                );
773            }
774        } else if is_drop {
775            if let Some(id) = target {
776                self.collections.retain(|_, h| h.id != id);
777            }
778        } else if let Some(id) = target
779            && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780        {
781            mark_stale(handle);
782        }
783        Ok(())
784    }
785
786    /// Names of all collections, sorted.
787    #[must_use]
788    pub fn collection_names(&self) -> Vec<String> {
789        self.store.collection_names()
790    }
791
792    /// The descriptor of a collection, if it exists.
793    #[must_use]
794    pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795        self.collections.get(name).map(|h| &h.descriptor)
796    }
797
798    /// Number of live points in a collection.
799    pub fn len(&self, name: &str) -> Result<usize> {
800        let handle = self.handle(name)?;
801        Ok(self.store.len(handle.id)?)
802    }
803
804    /// Whether a collection has no points.
805    pub fn is_empty(&self, name: &str) -> Result<bool> {
806        Ok(self.len(name)? == 0)
807    }
808
809    /// Insert or replace a point with a JSON payload.
810    pub fn upsert(
811        &mut self,
812        collection: &str,
813        id: &str,
814        vector: &[f32],
815        payload: &Value,
816    ) -> Result<()> {
817        let handle = self
818            .collections
819            .get_mut(collection)
820            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821        require_single_vector(handle)?;
822        let payload_bytes = serde_json::to_vec(payload)?;
823        self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824        // Client-side-encrypted collections have no server-side index to maintain
825        // (ADR-0032): the stored vector is an opaque placeholder the server never
826        // ranks. The point is durable in the store; nothing else to do.
827        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828            return Ok(());
829        }
830        // Maintain the in-memory index in place where the kind allows it, else
831        // defer to a lazy rebuild on the next search (ADR-0023/0026/0033).
832        index_upsert_point(handle, id, vector)?;
833        // Keep the derived sparse inverted index in step (ADR-0045).
834        sparse_index_upsert_point(handle, id, payload);
835        Ok(())
836    }
837
838    /// Build the [`WalOp`] for creating a collection — the same validated op
839    /// [`Database::create_collection`] would log — **without** applying it. The
840    /// per-shard Raft write path (ADR-0067) proposes this op through consensus and
841    /// applies it via [`Database::apply_replicated`] only after a quorum commits,
842    /// so a leader failover never loses an acknowledged write. The new id is
843    /// assigned here, on the leader (the caller serializes concurrent creates).
844    ///
845    /// # Errors
846    /// Propagates index validation and store preparation errors.
847    pub fn prepare_create_collection(&self, name: &str, descriptor: &Descriptor) -> Result<WalOp> {
848        validate_index(descriptor)?;
849        Ok(self.store.prepare_create_collection(name, descriptor)?)
850    }
851
852    /// Build the validated [`WalOp::Upsert`] [`Database::upsert`] would log, without
853    /// applying it (the Raft write path; see [`Database::prepare_create_collection`]).
854    ///
855    /// # Errors
856    /// [`Error::CollectionNotFound`] if the collection is not loaded, or a store
857    /// preparation error (e.g. a dimensionality mismatch).
858    pub fn prepare_upsert(
859        &self,
860        collection: &str,
861        id: &str,
862        vector: &[f32],
863        payload: &Value,
864    ) -> Result<WalOp> {
865        let handle = self
866            .collections
867            .get(collection)
868            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
869        require_single_vector(handle)?;
870        let payload_bytes = serde_json::to_vec(payload)?;
871        Ok(self
872            .store
873            .prepare_upsert(handle.id, id, vector, &payload_bytes)?)
874    }
875
876    /// Build the [`WalOp::Delete`] [`Database::delete`] would log, or `None` if the
877    /// point does not exist, without applying it (the Raft write path).
878    ///
879    /// # Errors
880    /// [`Error::CollectionNotFound`] if the collection is not loaded.
881    pub fn prepare_delete(&self, collection: &str, id: &str) -> Result<Option<WalOp>> {
882        let handle = self
883            .collections
884            .get(collection)
885            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
886        Ok(self.store.prepare_delete(handle.id, id)?)
887    }
888
889    /// Upsert a batch of points with a single WAL `fdatasync` (ADR-0038).
890    ///
891    /// `points` is `(id, vector, payload)` tuples.  The batch is committed
892    /// atomically — all points or none (from the client's perspective).  This
893    /// is the preferred path for the REST `POST /v1/collections/{c}/points`
894    /// handler which already delivers a batch per HTTP request.
895    pub fn upsert_batch(
896        &mut self,
897        collection: &str,
898        points: &[(&str, &[f32], &serde_json::Value)],
899    ) -> Result<u64> {
900        let handle = self
901            .collections
902            .get(collection)
903            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904        require_single_vector(handle)?;
905        let coll_id = handle.id;
906        let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
907
908        let payload_bytes: Vec<Vec<u8>> = points
909            .iter()
910            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
911            .collect::<Result<_>>()?;
912
913        let records: Vec<(&str, &[f32], &[u8])> = points
914            .iter()
915            .zip(payload_bytes.iter())
916            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
917            .collect();
918
919        self.store.upsert_batch(coll_id, &records)?;
920
921        if is_client_side {
922            return Ok(records.len() as u64);
923        }
924
925        for (id, vector, payload) in points {
926            let handle = self
927                .collections
928                .get_mut(collection)
929                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
930            index_upsert_point(handle, id, vector)?;
931            sparse_index_upsert_point(handle, id, payload);
932        }
933        Ok(records.len() as u64)
934    }
935
936    /// Upsert a large batch for a bulk load, deferring all index work to a single
937    /// rebuild pass (ADR-0045).
938    ///
939    /// Like [`upsert_batch`](Self::upsert_batch) the points are committed with one
940    /// WAL `fdatasync`, but instead of folding each point into the in-memory index
941    /// one at a time, the collection's index is marked **stale** so the next search
942    /// rebuilds it in a single pass over the whole collection — far cheaper for a
943    /// fresh load (one k-means for IVF, one graph build for Vamana, one inverted-index
944    /// scan) than N incremental inserts. Prefer `upsert_batch` for steady-state
945    /// writes where query-after-write latency matters.
946    pub fn upsert_bulk(
947        &mut self,
948        collection: &str,
949        points: &[(&str, &[f32], &serde_json::Value)],
950    ) -> Result<u64> {
951        let handle = self
952            .collections
953            .get_mut(collection)
954            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
955        require_single_vector(handle)?;
956        let coll_id = handle.id;
957
958        let payload_bytes: Vec<Vec<u8>> = points
959            .iter()
960            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
961            .collect::<Result<_>>()?;
962        let records: Vec<(&str, &[f32], &[u8])> = points
963            .iter()
964            .zip(payload_bytes.iter())
965            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
966            .collect();
967
968        self.store.upsert_batch(coll_id, &records)?;
969
970        // Defer the dense and sparse index maintenance to a single rebuild on the
971        // next read (a client-side-encrypted collection has no index to rebuild, but
972        // marking it stale is harmless — its rebuild produces the same no-op index).
973        let handle = self
974            .collections
975            .get_mut(collection)
976            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
977        mark_stale(handle);
978        Ok(records.len() as u64)
979    }
980
981    /// Delete a point by id. Returns whether it existed.
982    pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
983        let handle = self
984            .collections
985            .get_mut(collection)
986            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
987        require_single_vector(handle)?;
988        let existed = self.store.delete(handle.id, id)?;
989        if !existed {
990            return Ok(false);
991        }
992        // No server-side index to update for client-side-encrypted collections
993        // (ADR-0032); the store delete is authoritative.
994        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
995            return Ok(true);
996        }
997        // A built IVF removes in place (ADR-0023), a built HNSW soft-deletes
998        // (ADR-0026), and a built FreshDiskANN graph tombstones in its deletion set
999        // (ADR-0033); other kinds defer to a rebuild. The id->internal mapping is
1000        // kept so a later re-insert allocates afresh — a removed or soft-deleted
1001        // internal is simply never returned by the index.
1002        index_delete_point(handle, id);
1003        // Drop the point from the derived sparse inverted index too (ADR-0045).
1004        sparse_index_delete_point(handle, id);
1005        Ok(true)
1006    }
1007
1008    /// Fetch a single point by id, with its payload and vector.
1009    pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
1010        let handle = self.handle(collection)?;
1011        require_single_vector(handle)?;
1012        match self.store.get(handle.id, id)? {
1013            Some(record) => Ok(Some(Match {
1014                id: id.to_owned(),
1015                score: 0.0,
1016                payload: Some(serde_json::from_slice(&record.payload)?),
1017                vector: Some(record.vector),
1018            })),
1019            None => Ok(None),
1020        }
1021    }
1022
1023    /// Fetch points without ranking — an optional cleartext payload `filter`
1024    /// narrows the set and `limit` bounds it. This is the retrieval path for a
1025    /// client-side-encrypted collection (ADR-0032): the server returns the entitled
1026    /// set (each point's payload carries the sealed vector blob under the reserved
1027    /// `__quiver_vec__` key) and the client decrypts and ranks locally. It also
1028    /// serves as a general "list points" primitive for any single-vector collection.
1029    ///
1030    /// Results come in the store's scan order, not by relevance; the filter is
1031    /// re-checked exactly against each candidate (a selective filter could use the
1032    /// secondary index in future — today it scans).
1033    ///
1034    /// # Errors
1035    /// Errors if the collection does not exist or is multi-vector.
1036    pub fn fetch(
1037        &self,
1038        collection: &str,
1039        filter: Option<&Filter>,
1040        offset: usize,
1041        limit: usize,
1042        with_payload: bool,
1043        with_vector: bool,
1044    ) -> Result<Vec<Match>> {
1045        let handle = self.handle(collection)?;
1046        require_single_vector(handle)?;
1047        let mut out = Vec::new();
1048        // `offset` skips matching points already returned by a prior page, so a caller
1049        // can scroll the whole collection in `limit`-sized pages (the scan order is
1050        // stable for a quiescent collection). Applied after the filter so offset
1051        // counts matches, not raw rows.
1052        let mut skipped = 0usize;
1053        for (id, record) in self.store.scan(handle.id)? {
1054            if out.len() >= limit {
1055                break;
1056            }
1057            let payload: Value = serde_json::from_slice(&record.payload)?;
1058            if let Some(filter) = filter
1059                && !filter.matches(&payload)
1060            {
1061                continue;
1062            }
1063            if skipped < offset {
1064                skipped += 1;
1065                continue;
1066            }
1067            out.push(Match {
1068                id,
1069                score: 0.0,
1070                payload: with_payload.then_some(payload),
1071                vector: with_vector.then_some(record.vector),
1072            });
1073        }
1074        Ok(out)
1075    }
1076
1077    /// Rebuild a collection's index if a prior write deferred it (the `stale`
1078    /// flag), making the collection's read snapshot current. Idempotent and cheap
1079    /// when already fresh. Separating this `&mut self` maintenance from the `&self`
1080    /// `*_snapshot` reads is what lets a server serve concurrent reads behind a
1081    /// shared lock and take the exclusive lock only for the rare rebuild (ADR-0057).
1082    pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1083        if self.handle(collection)?.stale {
1084            let store = &self.store;
1085            let handle = self
1086                .collections
1087                .get_mut(collection)
1088                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1089            rebuild_index(store, handle)?;
1090            // MVCC mode: publish the freshly rebuilt base as the new lock-free
1091            // snapshot (an empty overlay; the base now absorbs all prior writes).
1092            // The sync rebuild holds `&mut self`, so no write races it.
1093            if mvcc_served(handle) {
1094                publish_base(handle);
1095            }
1096        }
1097        Ok(())
1098    }
1099
1100    /// Enable or disable lock-free MVCC reads (ADR-0064) at runtime — the server
1101    /// sets this from `QUIVER_MVCC_READS`. Default off: the proven RwLock read path
1102    /// stays the default until MVCC is loom- and benchmark-validated (increments
1103    /// 2–3). Applies to every loaded collection; a collection's first rebuild after
1104    /// enabling publishes its base snapshot.
1105    pub fn set_mvcc_reads(&mut self, on: bool) {
1106        self.mvcc = on;
1107        for handle in self.collections.values_mut() {
1108            handle.mvcc = on;
1109            // A toggle either direction forces a rebuild on the next read of an
1110            // eligible collection: turning on publishes the base snapshot; turning
1111            // off repopulates the in-place `handle.index` that MVCC mode left empty.
1112            if mvcc_eligible(&handle.descriptor) {
1113                handle.stale = true;
1114            }
1115        }
1116    }
1117
1118    /// Whether lock-free MVCC reads are enabled (ADR-0064).
1119    #[must_use]
1120    pub fn mvcc_reads(&self) -> bool {
1121        self.mvcc
1122    }
1123
1124    /// The lock-free serving snapshot cell for a collection (ADR-0064), for a
1125    /// reader to `load()` without taking any lock — the basis of reads that proceed
1126    /// during writes. Only republished for an MVCC-served collection (single-vector,
1127    /// server-searchable, in-memory index, with MVCC enabled); otherwise it stays
1128    /// the initial empty snapshot and callers use the locked [`Database::search`]
1129    /// path. The cell outlives `&self`, so a reader can hold and re-`load` it.
1130    ///
1131    /// # Errors
1132    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1133    pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1134        Ok(self.handle(collection)?.snapshot.clone())
1135    }
1136
1137    /// The lock-free serving snapshot cell **only** for a collection that is
1138    /// currently MVCC-served (the flag is on and the collection is single-vector,
1139    /// server-searchable, and in-memory); otherwise `None`. A server caches the
1140    /// returned cell once and `load()`s it to serve pure-vector reads with no lock
1141    /// (ADR-0064 increment 3) — the cell self-updates as the writer republishes, so
1142    /// it never needs re-fetching under the lock.
1143    ///
1144    /// # Errors
1145    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1146    pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1147        let handle = self.handle(collection)?;
1148        Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1149    }
1150
1151    /// Whether a collection's index is stale — a prior write deferred its rebuild.
1152    /// The server reads this to schedule an **off-lock** rebuild (ADR-0062) without
1153    /// holding the exclusive lock; embedded callers never need it (the `&mut self`
1154    /// searches rebuild synchronously via [`Database::ensure_indexed`]).
1155    pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1156        Ok(self.handle(collection)?.stale)
1157    }
1158
1159    /// Capture everything an off-lock rebuild needs (ADR-0062): under the shared
1160    /// read lock the caller already holds, scan the collection's live rows and
1161    /// record the write generation. Returns `None` when the index is already fresh
1162    /// (nothing to rebuild). The expensive build then runs with **no lock** via
1163    /// [`RebuildInputs::build`], and [`Database::commit_rebuild`] installs it.
1164    pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1165        let handle = self.handle(collection)?;
1166        if !handle.stale {
1167            return Ok(None);
1168        }
1169        let scan = scan_collection(&self.store, handle)?;
1170        Ok(Some(RebuildInputs {
1171            collection: collection.to_owned(),
1172            descriptor: handle.descriptor.clone(),
1173            scan,
1174            write_gen: handle.write_gen,
1175        }))
1176    }
1177
1178    /// Install an index built off-lock (ADR-0062) under the brief exclusive lock the
1179    /// caller holds. Returns whether the collection is **still** stale: if a write
1180    /// landed during the build (the write generation advanced), the fresh index is
1181    /// already behind, so it is installed (still newer than the prior snapshot) but
1182    /// the handle stays stale for the next rebuild. A collection dropped or replaced
1183    /// during the build is ignored (`Ok(false)`) — the build is discarded.
1184    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1185        let store = &self.store;
1186        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1187            return Ok(false);
1188        };
1189        match rebuilt.kind {
1190            RebuiltKind::Ready(index) => handle.index = *index,
1191            RebuiltKind::Disk { graph, pq } => {
1192                // Drop the prior index before sealing the artifact in place: its
1193                // `mmap` assumes an immutable file. Safe under the write lock — no
1194                // read can observe the momentary empty index.
1195                handle.index = empty_index(&handle.descriptor);
1196                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1197                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1198            }
1199        }
1200        handle.int_to_ext = rebuilt.int_to_ext;
1201        handle.ext_to_int = rebuilt.ext_to_int;
1202        handle.docs = rebuilt.docs;
1203        handle.sparse = rebuilt.sparse;
1204        // Clear stale only if no write landed since the inputs were captured;
1205        // otherwise leave it set so the driver rebuilds again for the newer write.
1206        let still_stale = handle.write_gen != rebuilt.write_gen;
1207        handle.stale = still_stale;
1208        // MVCC mode: publish the new base as the lock-free snapshot with an empty
1209        // overlay. If a write landed during the build (`still_stale`), it is briefly
1210        // invisible to snapshot reads until the next rebuild folds it in — the same
1211        // eventual-consistency window the locked off-lock path already serves
1212        // (ADR-0062), with no double-count (the base is the sole source). The
1213        // single-writer's exclusive lock makes the swap race-free.
1214        if mvcc_served(handle) {
1215            publish_base(handle);
1216        }
1217        Ok(still_stale)
1218    }
1219
1220    /// Search a collection for the nearest points to `query`, optionally
1221    /// post-filtered by payload predicate.
1222    pub fn search(
1223        &mut self,
1224        collection: &str,
1225        query: &[f32],
1226        params: &SearchParams,
1227    ) -> Result<Vec<Match>> {
1228        // Embedded read-your-writes: rebuild a deferred index first (synchronously),
1229        // then read the now-fresh snapshot. The server instead serves the prior
1230        // snapshot and rebuilds off-lock (ADR-0062), so concurrent readers never
1231        // block on a writer's rebuild.
1232        self.ensure_indexed(collection)?;
1233        self.search_snapshot(collection, query, params)
1234    }
1235
1236    /// Search a collection's **current immutable snapshot** for the nearest points
1237    /// to `query`, optionally post-filtered by payload predicate. Takes `&self`, so
1238    /// many readers run concurrently. When a prior write deferred this collection's
1239    /// rebuild, it serves the **prior** snapshot (a snapshot-isolated, slightly stale
1240    /// read — ADR-0062/0053); the caller schedules a rebuild via
1241    /// [`Database::needs_rebuild`].
1242    pub fn search_snapshot(
1243        &self,
1244        collection: &str,
1245        query: &[f32],
1246        params: &SearchParams,
1247    ) -> Result<Vec<Match>> {
1248        require_single_vector(self.handle(collection)?)?;
1249        require_server_searchable(self.handle(collection)?)?;
1250
1251        let handle = self.handle(collection)?;
1252
1253        // MVCC mode (ADR-0064): the immutable base index lives in the published
1254        // snapshot, not `handle.index`. Serve the read from the snapshot — the
1255        // base-index search merged lock-free with the overlay of recent writes —
1256        // reusing the same pre-filter planning and store-fetch enrichment as the
1257        // locked path; only the dense candidate source differs.
1258        if mvcc_served(handle) {
1259            return self.search_snapshot_mvcc(handle, query, params);
1260        }
1261
1262        // Hybrid planning: if the filter narrows to a small, secondary-indexed
1263        // candidate set, scan those rows exactly instead of post-filtering ANN
1264        // hits — exact, and immune to the filtered-ANN recall cliff.
1265        if let Some(filter) = &params.filter
1266            && let Some(candidates) = candidate_ids(
1267                &self.store,
1268                handle.id,
1269                filter,
1270                &handle.descriptor.filterable,
1271            )?
1272            && candidates.len() <= FULL_SCAN_THRESHOLD
1273        {
1274            return self.exact_filtered_search(
1275                handle.id,
1276                &handle.descriptor,
1277                query,
1278                params,
1279                filter,
1280                &candidates,
1281            );
1282        }
1283
1284        let fetch = if params.filter.is_some() {
1285            params
1286                .k
1287                .saturating_mul(FILTER_OVERFETCH)
1288                .max(params.ef_search)
1289        } else {
1290            params.k
1291        };
1292        let raw = handle.index.search(query, fetch, params.ef_search)?;
1293
1294        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1295        let mut out = Vec::with_capacity(params.k);
1296        for neighbor in raw {
1297            if out.len() >= params.k {
1298                break;
1299            }
1300            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1301                continue;
1302            };
1303            let record = if need_record {
1304                self.store.get(handle.id, ext_id)?
1305            } else {
1306                None
1307            };
1308            let payload_value: Option<Value> = match &record {
1309                Some(r) if params.filter.is_some() || params.with_payload => {
1310                    Some(serde_json::from_slice(&r.payload)?)
1311                }
1312                _ => None,
1313            };
1314            if let Some(filter) = &params.filter {
1315                let value = payload_value.as_ref().unwrap_or(&Value::Null);
1316                if !filter.matches(value) {
1317                    continue;
1318                }
1319            }
1320            out.push(Match {
1321                id: ext_id.clone(),
1322                score: neighbor.distance,
1323                payload: if params.with_payload {
1324                    payload_value
1325                } else {
1326                    None
1327                },
1328                vector: if params.with_vector {
1329                    record.map(|r| r.vector)
1330                } else {
1331                    None
1332                },
1333            });
1334        }
1335        Ok(out)
1336    }
1337
1338    // Serve `search_snapshot` for an MVCC-served collection (ADR-0064 increment 2):
1339    // the dense candidates come from the lock-free snapshot (base ⊕ overlay), and
1340    // everything else — the exact small-candidate pre-filter, post-filter, and
1341    // payload/vector enrichment — reuses the same store-only logic as the locked
1342    // path, so filtered and unfiltered reads are both exact.
1343    fn search_snapshot_mvcc(
1344        &self,
1345        handle: &CollectionHandle,
1346        query: &[f32],
1347        params: &SearchParams,
1348    ) -> Result<Vec<Match>> {
1349        // Exact pre-filter for a small, secondary-indexed candidate set (store-only,
1350        // index-independent — identical to the locked path).
1351        if let Some(filter) = &params.filter
1352            && let Some(candidates) = candidate_ids(
1353                &self.store,
1354                handle.id,
1355                filter,
1356                &handle.descriptor.filterable,
1357            )?
1358            && candidates.len() <= FULL_SCAN_THRESHOLD
1359        {
1360            return self.exact_filtered_search(
1361                handle.id,
1362                &handle.descriptor,
1363                query,
1364                params,
1365                filter,
1366                &candidates,
1367            );
1368        }
1369
1370        // Otherwise: dense candidates from the lock-free snapshot (overfetched when
1371        // filtering, to survive the post-filter), then post-filter + enrich by a
1372        // store fetch on the result ids.
1373        let fetch = if params.filter.is_some() {
1374            params
1375                .k
1376                .saturating_mul(FILTER_OVERFETCH)
1377                .max(params.ef_search)
1378        } else {
1379            params.k
1380        };
1381        let dense = handle
1382            .snapshot
1383            .load()
1384            .search(query, fetch, params.ef_search)?;
1385        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1386        let mut out = Vec::with_capacity(params.k);
1387        for m in dense {
1388            if out.len() >= params.k {
1389                break;
1390            }
1391            let record = if need_record {
1392                self.store.get(handle.id, &m.id)?
1393            } else {
1394                None
1395            };
1396            let payload_value: Option<Value> = match &record {
1397                Some(r) if params.filter.is_some() || params.with_payload => {
1398                    Some(serde_json::from_slice(&r.payload)?)
1399                }
1400                _ => None,
1401            };
1402            if let Some(filter) = &params.filter
1403                && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1404            {
1405                continue;
1406            }
1407            out.push(Match {
1408                id: m.id,
1409                score: m.score,
1410                payload: if params.with_payload {
1411                    payload_value
1412                } else {
1413                    None
1414                },
1415                vector: if params.with_vector {
1416                    record.map(|r| r.vector)
1417                } else {
1418                    None
1419                },
1420            });
1421        }
1422        Ok(out)
1423    }
1424
1425    /// Hybrid search (ADR-0043/0046): fuse up to three rankings with Reciprocal
1426    /// Rank Fusion — a dense ANN ranking, a sparse inverted-index dot-product
1427    /// ranking (`sparse_query`), and a BM25 full-text ranking (`text_query`, scored
1428    /// over the same inverted index). Any may be `None`; at least one is required,
1429    /// giving pure dense / sparse / lexical or any blend through the same path. The
1430    /// same payload `filter` is re-checked on every side, so results stay exact.
1431    /// `rrf_k0` is the RRF rank-bias constant ([`DEFAULT_RRF_K0`]).
1432    pub fn hybrid_search(
1433        &mut self,
1434        collection: &str,
1435        dense_query: Option<&[f32]>,
1436        sparse_query: Option<&SparseVector>,
1437        text_query: Option<&str>,
1438        params: &SearchParams,
1439        rrf_k0: f32,
1440    ) -> Result<Vec<Match>> {
1441        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1442        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1443        // off-lock — ADR-0062).
1444        self.ensure_indexed(collection)?;
1445        self.hybrid_search_snapshot(
1446            collection,
1447            dense_query,
1448            sparse_query,
1449            text_query,
1450            params,
1451            rrf_k0,
1452        )
1453    }
1454
1455    /// Hybrid search over the collection's current immutable snapshot (`&self`, so
1456    /// readers run concurrently). When a prior write deferred the rebuild, it serves
1457    /// the **prior** snapshot (snapshot-isolated, slightly stale — ADR-0062/0053);
1458    /// the caller schedules a rebuild via [`Database::needs_rebuild`].
1459    pub fn hybrid_search_snapshot(
1460        &self,
1461        collection: &str,
1462        dense_query: Option<&[f32]>,
1463        sparse_query: Option<&SparseVector>,
1464        text_query: Option<&str>,
1465        params: &SearchParams,
1466        rrf_k0: f32,
1467    ) -> Result<Vec<Match>> {
1468        require_single_vector(self.handle(collection)?)?;
1469        require_server_searchable(self.handle(collection)?)?;
1470        if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1471            return Err(Error::Unsupported(
1472                "hybrid_search requires a dense query, a sparse query, or a text query",
1473            ));
1474        }
1475        let handle = self.handle(collection)?;
1476
1477        // Pull a deep-enough candidate list from each side so the fusion is
1478        // meaningful, then RRF down to `k`.
1479        let depth = params
1480            .k
1481            .saturating_mul(RRF_CANDIDATE_FACTOR)
1482            .max(MIN_RRF_CANDIDATES);
1483        let filter = params.filter.as_ref();
1484        let mut lists: Vec<Vec<String>> = Vec::new();
1485        if let Some(q) = dense_query {
1486            lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1487        }
1488        if let Some(sp) = sparse_query {
1489            lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1490        }
1491        if let Some(text) = text_query {
1492            lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1493        }
1494        let fused = rrf_fuse(&lists, rrf_k0, params.k);
1495
1496        let mut out = Vec::with_capacity(fused.len());
1497        for (ext_id, score) in fused {
1498            let record = if params.with_payload || params.with_vector {
1499                self.store.get(handle.id, &ext_id)?
1500            } else {
1501                None
1502            };
1503            let payload = match (&record, params.with_payload) {
1504                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1505                _ => None,
1506            };
1507            out.push(Match {
1508                id: ext_id,
1509                score,
1510                payload,
1511                vector: if params.with_vector {
1512                    record.map(|r| r.vector)
1513                } else {
1514                    None
1515                },
1516            });
1517        }
1518        Ok(out)
1519    }
1520
1521    // Dense candidates as a ranked list of external ids (the filter re-checked),
1522    // for hybrid fusion.
1523    fn dense_ranked_ids(
1524        &self,
1525        handle: &CollectionHandle,
1526        query: &[f32],
1527        depth: usize,
1528        ef_search: usize,
1529        filter: Option<&Filter>,
1530    ) -> Result<Vec<String>> {
1531        let mut ids = Vec::new();
1532        // MVCC mode (ADR-0064): the dense candidates come from the lock-free
1533        // snapshot (base ⊕ overlay), with ext ids already resolved; the sparse/BM25
1534        // sides and the filter re-check are unchanged.
1535        if mvcc_served(handle) {
1536            for m in handle
1537                .snapshot
1538                .load()
1539                .search(query, depth, ef_search.max(depth))?
1540            {
1541                if !self.passes_filter(handle.id, &m.id, filter)? {
1542                    continue;
1543                }
1544                ids.push(m.id);
1545                if ids.len() >= depth {
1546                    break;
1547                }
1548            }
1549            return Ok(ids);
1550        }
1551        let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1552        for neighbor in raw {
1553            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1554                continue;
1555            };
1556            if !self.passes_filter(handle.id, ext_id, filter)? {
1557                continue;
1558            }
1559            ids.push(ext_id.clone());
1560            if ids.len() >= depth {
1561                break;
1562            }
1563        }
1564        Ok(ids)
1565    }
1566
1567    // Sparse candidates as a ranked list of external ids. With the derived inverted
1568    // index present (the common case), score only the query's nonzero dimensions via
1569    // the posting lists, then re-check the filter on the ranked ids until `depth` are
1570    // filled — so low-scored rows never load a payload (ADR-0045). When the index is
1571    // absent (a not-yet-rebuilt or client-side collection), fall back to the full
1572    // store scan, which stays correct under the incremental upsert/delete path.
1573    fn sparse_ranked_ids(
1574        &self,
1575        handle: &CollectionHandle,
1576        query: &SparseVector,
1577        depth: usize,
1578        filter: Option<&Filter>,
1579    ) -> Result<Vec<String>> {
1580        if let Some(idx) = handle.sparse.as_ref() {
1581            let mut ids = Vec::new();
1582            for (ext_id, _score) in idx.search(query) {
1583                if !self.passes_filter(handle.id, &ext_id, filter)? {
1584                    continue;
1585                }
1586                ids.push(ext_id);
1587                if ids.len() >= depth {
1588                    break;
1589                }
1590            }
1591            return Ok(ids);
1592        }
1593        self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1594    }
1595
1596    // The store-scan fallback for [`sparse_ranked_ids`]: load every row, score its
1597    // `__quiver_sparse__` vector by dot product against the query, re-check the
1598    // filter, and return the top `depth`. O(N-rows), but correct without an index.
1599    fn sparse_ranked_ids_by_scan(
1600        &self,
1601        cid: CollectionId,
1602        query: &SparseVector,
1603        depth: usize,
1604        filter: Option<&Filter>,
1605    ) -> Result<Vec<String>> {
1606        let qmap: HashMap<u32, f32> = query
1607            .indices
1608            .iter()
1609            .copied()
1610            .zip(query.values.iter().copied())
1611            .collect();
1612        let mut scored: Vec<(f32, String)> = Vec::new();
1613        for (ext_id, record) in self.store.scan(cid)? {
1614            if record.payload.is_empty() {
1615                continue;
1616            }
1617            let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1618                continue;
1619            };
1620            if let Some(filter) = filter
1621                && !filter.matches(&value)
1622            {
1623                continue;
1624            }
1625            let Some(raw) = value.get(SPARSE_KEY) else {
1626                continue;
1627            };
1628            let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1629                continue;
1630            };
1631            let mut score = 0.0f32;
1632            for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1633                if let Some(qw) = qmap.get(dim) {
1634                    score += qw * weight;
1635                }
1636            }
1637            if score > 0.0 {
1638                scored.push((score, ext_id));
1639            }
1640        }
1641        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1642        Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1643    }
1644
1645    // BM25 full-text candidates as a ranked list of external ids (ADR-0046): tokenize
1646    // the query text into term ids and score them with BM25 over the derived inverted
1647    // index, re-checking the filter on the ranked ids until `depth` are filled. BM25
1648    // needs the index's corpus statistics, so when a collection has no inverted index
1649    // (a not-yet-rebuilt or client-side collection — neither reachable here, since
1650    // hybrid rebuilds a stale handle and rejects client-side) there is nothing to
1651    // score and the list is empty; any dense side still contributes.
1652    fn bm25_ranked_ids(
1653        &self,
1654        handle: &CollectionHandle,
1655        query_text: &str,
1656        depth: usize,
1657        filter: Option<&Filter>,
1658    ) -> Result<Vec<String>> {
1659        let Some(idx) = handle.sparse.as_ref() else {
1660            return Ok(Vec::new());
1661        };
1662        let terms = query_term_ids(query_text);
1663        let mut ids = Vec::new();
1664        for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1665            if !self.passes_filter(handle.id, &ext_id, filter)? {
1666                continue;
1667            }
1668            ids.push(ext_id);
1669            if ids.len() >= depth {
1670                break;
1671            }
1672        }
1673        Ok(ids)
1674    }
1675
1676    // Re-check a payload filter against a row (loading its payload). `None` filter
1677    // always passes.
1678    fn passes_filter(
1679        &self,
1680        cid: CollectionId,
1681        ext_id: &str,
1682        filter: Option<&Filter>,
1683    ) -> Result<bool> {
1684        let Some(filter) = filter else {
1685            return Ok(true);
1686        };
1687        let value: Value = match self.store.get(cid, ext_id)? {
1688            Some(r) => serde_json::from_slice(&r.payload)?,
1689            None => Value::Null,
1690        };
1691        Ok(filter.matches(&value))
1692    }
1693
1694    // Exactly score `candidates` (a superset of the filter's matches that the
1695    // secondary indexes produced) against the query, re-check the full filter
1696    // for correctness, and return the top `k`. The pre-filter arm of the hybrid
1697    // planner: perfect recall over an already-narrowed set.
1698    fn exact_filtered_search(
1699        &self,
1700        cid: CollectionId,
1701        descriptor: &Descriptor,
1702        query: &[f32],
1703        params: &SearchParams,
1704        filter: &Filter,
1705        candidates: &BTreeSet<String>,
1706    ) -> Result<Vec<Match>> {
1707        let metric = to_index_metric(descriptor.metric);
1708        let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1709        for ext_id in candidates {
1710            let Some(record) = self.store.get(cid, ext_id)? else {
1711                continue;
1712            };
1713            let payload: Value = serde_json::from_slice(&record.payload)?;
1714            if !filter.matches(&payload) {
1715                continue;
1716            }
1717            let ordering = ordering_distance(metric, query, &record.vector);
1718            scored.push((ordering, ext_id.clone(), payload, record.vector));
1719        }
1720        scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1721        scored.truncate(params.k);
1722        Ok(scored
1723            .into_iter()
1724            .map(|(ordering, id, payload, vector)| Match {
1725                id,
1726                score: report_metric(metric, ordering),
1727                payload: params.with_payload.then_some(payload),
1728                vector: params.with_vector.then_some(vector),
1729            })
1730            .collect())
1731    }
1732
1733    /// Insert or replace a multi-vector (late-interaction / ColBERT) document: its
1734    /// `vectors` are stored as a group of token rows and its `payload` once on the
1735    /// anchor token (ADR-0028). Re-upserting a document first removes the tokens a
1736    /// shorter version would leave behind, so the document is replaced cleanly.
1737    ///
1738    /// # Errors
1739    /// Errors if the collection is single-vector, the document has no vectors, a
1740    /// vector's dimensionality is wrong, or the id contains the reserved separator.
1741    pub fn upsert_document(
1742        &mut self,
1743        collection: &str,
1744        doc_id: &str,
1745        vectors: &[Vec<f32>],
1746        payload: &Value,
1747    ) -> Result<()> {
1748        let handle = self
1749            .collections
1750            .get_mut(collection)
1751            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1752        require_multivector(handle)?;
1753        if doc_id.contains(DOC_TOKEN_SEP) {
1754            return Err(Error::Unsupported(
1755                "document id must not contain the reserved 0x1f separator",
1756            ));
1757        }
1758        if vectors.is_empty() {
1759            return Err(Error::Unsupported("a document needs at least one vector"));
1760        }
1761        let dim = handle.descriptor.dim as usize;
1762        if vectors.iter().any(|v| v.len() != dim) {
1763            return Err(Error::Unsupported(
1764                "every document vector must match the collection dimensionality",
1765            ));
1766        }
1767        // Remove only the trailing tokens a shorter re-upsert leaves behind; the
1768        // rest are overwritten by the upserts below.
1769        let previous = handle
1770            .docs
1771            .as_ref()
1772            .and_then(|d| d.get(doc_id))
1773            .copied()
1774            .unwrap_or(0) as usize;
1775        for j in vectors.len()..previous {
1776            self.store.delete(handle.id, &token_id(doc_id, j))?;
1777            // Tombstone the dropped token row in the ANN index too (ADR-0034).
1778            index_delete_point(handle, &token_id(doc_id, j));
1779        }
1780        let payload_bytes = serde_json::to_vec(payload)?;
1781        for (j, vector) in vectors.iter().enumerate() {
1782            // The payload is stored once, on the anchor token; the rest carry none.
1783            let bytes: &[u8] = if j == 0 {
1784                payload_bytes.as_slice()
1785            } else {
1786                &[]
1787            };
1788            self.store
1789                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1790            // Fold the token row into the ANN index incrementally instead of
1791            // marking the whole collection stale (ADR-0034); the underlying index
1792            // consolidates by a rebuild past its own churn threshold.
1793            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1794        }
1795        if let Some(docs) = handle.docs.as_mut() {
1796            docs.insert(doc_id.to_owned(), vectors.len() as u32);
1797        }
1798        Ok(())
1799    }
1800
1801    /// Search a multi-vector collection by a set of query token vectors, ranking
1802    /// documents by MaxSim late interaction (ADR-0028). At or below the exact-scan
1803    /// threshold every document is scored exactly; above it, candidates are
1804    /// generated by nearest-neighbour search over the token pool (recall tuned by
1805    /// `ef_search`) and re-ranked exactly. An optional `filter` is applied to each
1806    /// document's payload, exactly. A document has no single vector, so `with_payload`
1807    /// returns the anchor payload and `with_vector` returns the token vectors.
1808    pub fn search_multi_vector(
1809        &mut self,
1810        collection: &str,
1811        query_tokens: &[Vec<f32>],
1812        params: &SearchParams,
1813    ) -> Result<Vec<DocumentMatch>> {
1814        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1815        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1816        // off-lock — ADR-0062).
1817        self.ensure_indexed(collection)?;
1818        self.search_multi_vector_snapshot(collection, query_tokens, params)
1819    }
1820
1821    /// Multi-vector (late-interaction) search over the collection's current
1822    /// immutable snapshot (`&self`, so readers run concurrently). A small corpus is
1823    /// scored exactly; a large corpus draws candidates from the ANN index, serving
1824    /// the **prior** snapshot when a write deferred its rebuild (snapshot-isolated,
1825    /// slightly stale — ADR-0062/0053); the caller schedules the rebuild via
1826    /// [`Database::needs_rebuild`].
1827    pub fn search_multi_vector_snapshot(
1828        &self,
1829        collection: &str,
1830        query_tokens: &[Vec<f32>],
1831        params: &SearchParams,
1832    ) -> Result<Vec<DocumentMatch>> {
1833        require_multivector(self.handle(collection)?)?;
1834        let dim = self.handle(collection)?.descriptor.dim as usize;
1835        if query_tokens.is_empty() {
1836            return Ok(Vec::new());
1837        }
1838        if query_tokens.iter().any(|v| v.len() != dim) {
1839            return Err(Error::Unsupported(
1840                "every query token must match the collection dimensionality",
1841            ));
1842        }
1843
1844        let doc_count = self
1845            .handle(collection)?
1846            .docs
1847            .as_ref()
1848            .map_or(0, BTreeMap::len);
1849        let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1850            // Exact: score every document. No ANN index needed.
1851            self.handle(collection)?
1852                .docs
1853                .as_ref()
1854                .map(|d| d.keys().cloned().collect())
1855                .unwrap_or_default()
1856        } else {
1857            // Large corpus: generate candidates from the token pool. A prior write
1858            // may have deferred the ANN index rebuild; serve the prior snapshot (the
1859            // server schedules an off-lock rebuild — ADR-0062).
1860            let handle = self.handle(collection)?;
1861            let per_token_k = params
1862                .k
1863                .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1864                .max(params.ef_search);
1865            let mut set = BTreeSet::new();
1866            for token in query_tokens {
1867                for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1868                    if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1869                        && let Some((doc, _)) = parse_token_id(ext)
1870                    {
1871                        set.insert(doc.to_owned());
1872                    }
1873                }
1874            }
1875            set.into_iter().collect()
1876        };
1877
1878        // Re-rank the candidate documents by exact MaxSim over all their tokens.
1879        let handle = self.handle(collection)?;
1880        let cid = handle.id;
1881        let metric = to_index_metric(handle.descriptor.metric);
1882        let mut scored: Vec<ScoredDocument> = Vec::new();
1883        for doc in &candidates {
1884            let count = handle
1885                .docs
1886                .as_ref()
1887                .and_then(|d| d.get(doc))
1888                .copied()
1889                .unwrap_or(0) as usize;
1890            let (tokens, payload) = self.gather_document(cid, doc, count)?;
1891            if tokens.is_empty() {
1892                continue;
1893            }
1894            if let Some(filter) = &params.filter {
1895                let value = payload.clone().unwrap_or(Value::Null);
1896                if !filter.matches(&value) {
1897                    continue;
1898                }
1899            }
1900            let score = max_sim(metric, query_tokens, &tokens);
1901            let vectors = params.with_vector.then_some(tokens);
1902            scored.push((score, doc.clone(), payload, vectors));
1903        }
1904        // Higher MaxSim first; ties broken by id for a deterministic order.
1905        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1906        scored.truncate(params.k);
1907        Ok(scored
1908            .into_iter()
1909            .map(|(score, id, payload, vectors)| DocumentMatch {
1910                id,
1911                score,
1912                payload: params.with_payload.then_some(payload).flatten(),
1913                vectors,
1914            })
1915            .collect())
1916    }
1917
1918    /// Fetch a multi-vector document by id: its anchor payload and, if
1919    /// `with_vectors`, its token vectors. `None` if the document does not exist.
1920    pub fn get_document(
1921        &self,
1922        collection: &str,
1923        doc_id: &str,
1924        with_vectors: bool,
1925    ) -> Result<Option<DocumentMatch>> {
1926        let handle = self.handle(collection)?;
1927        require_multivector(handle)?;
1928        let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1929            return Ok(None);
1930        };
1931        let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1932        if tokens.is_empty() {
1933            return Ok(None);
1934        }
1935        Ok(Some(DocumentMatch {
1936            id: doc_id.to_owned(),
1937            score: 0.0,
1938            payload,
1939            vectors: with_vectors.then_some(tokens),
1940        }))
1941    }
1942
1943    /// Delete a multi-vector document and all of its token rows. Returns whether it
1944    /// existed.
1945    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1946        let handle = self
1947            .collections
1948            .get_mut(collection)
1949            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1950        require_multivector(handle)?;
1951        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1952            return Ok(false);
1953        };
1954        for j in 0..count as usize {
1955            self.store.delete(handle.id, &token_id(doc_id, j))?;
1956            // Tombstone each token row in the ANN index incrementally (ADR-0034).
1957            index_delete_point(handle, &token_id(doc_id, j));
1958        }
1959        if let Some(docs) = handle.docs.as_mut() {
1960            docs.remove(doc_id);
1961        }
1962        Ok(true)
1963    }
1964
1965    /// The number of documents in a multi-vector collection. Errors if the
1966    /// collection is single-vector.
1967    pub fn document_count(&self, collection: &str) -> Result<usize> {
1968        let handle = self.handle(collection)?;
1969        require_multivector(handle)?;
1970        Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1971    }
1972
1973    // Read a document's token vectors (in ordinal order) and its anchor payload
1974    // from the store. Missing token rows are skipped, so a torn document yields a
1975    // short token list the caller treats as empty.
1976    fn gather_document(
1977        &self,
1978        cid: CollectionId,
1979        doc_id: &str,
1980        count: usize,
1981    ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1982        let mut tokens = Vec::with_capacity(count);
1983        let mut payload: Option<Value> = None;
1984        for j in 0..count {
1985            let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1986                continue;
1987            };
1988            if j == 0 && !record.payload.is_empty() {
1989                payload = Some(serde_json::from_slice(&record.payload)?);
1990            }
1991            tokens.push(record.vector);
1992        }
1993        Ok((tokens, payload))
1994    }
1995
1996    /// Flush a durable checkpoint of all collections, capturing a durable
1997    /// snapshot of each built, up-to-date IVF index (ADR-0025) so it reloads on
1998    /// open instead of rebuilding. Other index kinds, and a stale or unbuilt IVF,
1999    /// are rebuilt on open.
2000    pub fn checkpoint(&mut self) -> Result<()> {
2001        let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
2002        for handle in self.collections.values() {
2003            if handle.stale {
2004                continue;
2005            }
2006            if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
2007                if ivf.is_empty() {
2008                    continue;
2009                }
2010                let envelope = IndexEnvelope {
2011                    version: INDEX_ENVELOPE_VERSION,
2012                    int_to_ext: handle.int_to_ext.clone(),
2013                    ivf: ivf.snapshot()?,
2014                };
2015                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
2016            } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
2017                // Durable DiskVamana (ADR-0063): the base file is already on disk
2018                // (sealed at build/consolidation); seal only the tiny tie-to-live
2019                // blob. The derived sparse index is not persisted (as for IVF); on
2020                // reopen the durable load leaves it `None` and hybrid search falls
2021                // back to the store scan until the next rebuild — correct, not fast.
2022                let envelope = DiskEnvelope {
2023                    version: INDEX_ENVELOPE_VERSION,
2024                    int_to_ext: handle.int_to_ext.clone(),
2025                    base_row_count: fresh.base_len() as u64,
2026                    deleted_ids: fresh.deleted_ids(),
2027                };
2028                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
2029            }
2030        }
2031        self.store.checkpoint_with_index_snapshots(&snapshots)?;
2032        Ok(())
2033    }
2034
2035    /// Compact every collection with reclaimable space, merging its sealed
2036    /// segments and dropping deleted/shadowed rows. Crash-safe; a no-op for
2037    /// collections with nothing to reclaim.
2038    pub fn compact(&mut self) -> Result<()> {
2039        Ok(self.store.compact()?)
2040    }
2041
2042    /// The manifest version — the catalog generation a snapshot captures
2043    /// (ADR-0050). Surfaced as snapshot-relevant status in `database_stats`.
2044    #[must_use]
2045    pub fn manifest_version(&self) -> u64 {
2046        self.store.manifest_version()
2047    }
2048
2049    /// Best-effort total on-disk size of the data directory, in bytes — what a
2050    /// full snapshot would copy (ADR-0050). Unreadable entries are skipped.
2051    #[must_use]
2052    pub fn disk_usage_bytes(&self) -> u64 {
2053        dir_size(self.store.dir())
2054    }
2055
2056    /// Take a consistent online snapshot of the whole database into `dest`
2057    /// (which must not already exist), returning what was captured (ADR-0050).
2058    ///
2059    /// The writer lock is held for the duration: `checkpoint` seals the active
2060    /// buffer into segments and advances the WAL floor to the head, then the
2061    /// data directory is byte-copied. Opening `dest` afterwards replays an empty
2062    /// WAL tail and reconstructs the database exactly as of this call.
2063    ///
2064    /// # Errors
2065    /// [`Error::Core`] if `dest` already exists, or on any I/O error during the
2066    /// checkpoint or the copy.
2067    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
2068        if dest.exists() {
2069            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2070                dest.display().to_string(),
2071            )));
2072        }
2073        // Consistency anchor: flush to segments and advance the WAL floor so the
2074        // copied tree opens with no replay (ADR-0050).
2075        self.checkpoint()?;
2076        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
2077        // ponytail: flush the snapshot root's metadata only; a backup target is
2078        // re-takeable, so per-file fsync isn't warranted. Add it if snapshots
2079        // must survive an immediate post-copy crash.
2080        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
2081        Ok(SnapshotInfo {
2082            manifest_version: self.store.manifest_version(),
2083            files,
2084            bytes,
2085        })
2086    }
2087
2088    fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2089        self.collections
2090            .get(name)
2091            .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2092    }
2093}
2094
2095/// Restore a snapshot directory `src` (produced by [`Database::snapshot`]) into a
2096/// fresh `dest` directory, leaving it ready for the caller to open with the same
2097/// keyring/codec the snapshot was written under (ADR-0050).
2098///
2099/// `dest` must not already exist — restore never overwrites a live data
2100/// directory. The real integrity check is the caller's subsequent open (which,
2101/// for an encrypted store, is the only party holding the key); this function
2102/// only verifies the source looks like a snapshot and copies it.
2103///
2104/// # Errors
2105/// [`Error::Core`] if `dest` exists, `src` is not a snapshot (no `CURRENT`), or
2106/// on any I/O error during the copy.
2107pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
2108    if dest.exists() {
2109        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2110            dest.display().to_string(),
2111        )));
2112    }
2113    if !src.join("CURRENT").exists() {
2114        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
2115            format!("{} is not a snapshot (no CURRENT)", src.display()),
2116        )));
2117    }
2118    let (files, bytes) = copy_tree(src, dest)?;
2119    Ok(SnapshotInfo {
2120        // The restored directory's manifest version is whatever the snapshot
2121        // carried; report 0 since we don't re-open here to read it (the caller's
2122        // open is authoritative). `files`/`bytes` reflect the copy.
2123        manifest_version: 0,
2124        files,
2125        bytes,
2126    })
2127}
2128
2129// Recursively copy `src` into `dst`, returning `(files, bytes)`. I/O errors are
2130// tagged with the offending path. The data directory contains only files and
2131// directories (no symlinks), so a plain walk is complete.
2132fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2133    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2134    let mut files = 0u64;
2135    let mut bytes = 0u64;
2136    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2137        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2138        let from = entry.path();
2139        let to = dst.join(entry.file_name());
2140        let ft = entry
2141            .file_type()
2142            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2143        if ft.is_dir() {
2144            let (f, b) = copy_tree(&from, &to)?;
2145            files += f;
2146            bytes += b;
2147        } else {
2148            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2149            files += 1;
2150            bytes += n;
2151        }
2152    }
2153    Ok((files, bytes))
2154}
2155
// Best-effort recursive size (bytes) of everything under `dir`. A directory
// that cannot be listed counts as 0 and unreadable entries are skipped, so a
// stats read never fails on a transient error.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2173
// Separator between a multi-vector document id and a token ordinal inside a
// token row's external id (`<doc-id><US><ordinal>`). It is the ASCII Unit
// Separator (0x1F), which user document ids are not allowed to contain
// (ADR-0028), so the last separator in a token id always precedes the ordinal.
const DOC_TOKEN_SEP: char = '\u{1f}';

// Document-count cut-over for multi-vector search: at or below this many
// documents every document is scored exactly; above it, nearest-neighbour
// candidate generation over the token pool kicks in (mirrors the single-vector
// planner's full-scan threshold).
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

// Per-query-token candidate breadth for the large-corpus path: each query token
// retrieves about `k × this` nearest token rows before the documents they
// belong to are unioned.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2187
2188// The external id of a multi-vector document's `ordinal`-th token row.
2189fn token_id(doc_id: &str, ordinal: usize) -> String {
2190    format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2191}
2192
2193// Split a token row's external id back into its document id and ordinal, or `None`
2194// if it is not a token id. Splits from the right, so a document id (which cannot
2195// contain the separator) is recovered intact.
2196fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2197    let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2198    Some((doc, ordinal.parse().ok()?))
2199}
2200
2201// Reject the single-vector API on a multi-vector collection.
2202fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2203    if handle.descriptor.multivector {
2204        Err(Error::Unsupported(
2205            "collection is multi-vector; use upsert_document / search_multi_vector",
2206        ))
2207    } else {
2208        Ok(())
2209    }
2210}
2211
2212// Reject the document API on a single-vector collection.
2213fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2214    if handle.descriptor.multivector {
2215        Ok(())
2216    } else {
2217        Err(Error::Unsupported(
2218            "collection is single-vector; use upsert / search",
2219        ))
2220    }
2221}
2222
2223// Reject server-side ranked search on a client-side-encrypted collection: the
2224// server holds only opaque ciphertext and cannot rank it. The client fetches the
2225// entitled set (see `Database::fetch`) and ranks locally (ADR-0032).
2226fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2227    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2228        Err(Error::Unsupported(
2229            "collection is client-side encrypted; the server cannot rank opaque vectors — \
2230             fetch points and rank client-side",
2231        ))
2232    } else {
2233        Ok(())
2234    }
2235}
2236
2237fn to_index_metric(metric: DistanceMetric) -> Metric {
2238    match metric {
2239        DistanceMetric::Dot => Metric::Dot,
2240        DistanceMetric::Cosine => Metric::Cosine,
2241        DistanceMetric::L2 => Metric::L2,
2242    }
2243}
2244
2245// Reject index/metric combinations the engine cannot serve.
2246fn validate_index(descriptor: &Descriptor) -> Result<()> {
2247    // Late interaction (MaxSim) is a similarity, so multi-vector collections need a
2248    // similarity metric.
2249    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
2250        return Err(Error::Unsupported(
2251            "multi-vector collections require a similarity metric (cosine or dot)",
2252        ));
2253    }
2254    // Client-side opaque encryption (ADR-0032) is searched by the client, not the
2255    // server, so it has no metric or index constraints — but it cannot combine with
2256    // the multi-vector document layout.
2257    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2258        if descriptor.multivector {
2259            return Err(Error::Unsupported(
2260                "client-side vector encryption is not supported for multi-vector collections",
2261            ));
2262        }
2263        return Ok(());
2264    }
2265    // DCPE (ADR-0031) preserves Euclidean distance comparison; the secret scaling
2266    // changes vector norms, so cosine and dot orderings are not preserved.
2267    if descriptor.vector_encryption == VectorEncryption::Dcpe
2268        && descriptor.metric != DistanceMetric::L2
2269    {
2270        return Err(Error::Unsupported(
2271            "dcpe-encrypted collections require the l2 metric",
2272        ));
2273    }
2274    // ColBERT is a late-interaction token-pool index (ADR-0034): valid only for a
2275    // multi-vector collection (which already requires a similarity metric).
2276    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
2277        return Err(Error::Unsupported(
2278            "the colbert index is only for multi-vector collections",
2279        ));
2280    }
2281    match descriptor.index.kind {
2282        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
2283            if descriptor.metric == DistanceMetric::Dot =>
2284        {
2285            Err(Error::Unsupported(
2286                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
2287            ))
2288        }
2289        _ => Ok(()),
2290    }
2291}
2292
2293// An empty index of the kind the descriptor selects. Batch kinds start unbuilt.
2294fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
2295    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2296        return CollectionIndex::None;
2297    }
2298    match descriptor.index.kind {
2299        IndexKind::Vamana => CollectionIndex::Vamana(None),
2300        IndexKind::DiskVamana => CollectionIndex::Disk(None),
2301        IndexKind::Ivf => CollectionIndex::Ivf(None),
2302        IndexKind::Colbert => CollectionIndex::Colbert(None),
2303        _ => CollectionIndex::Hnsw(Hnsw::new(
2304            descriptor.dim as usize,
2305            to_index_metric(descriptor.metric),
2306            HnswConfig::default(),
2307        )),
2308    }
2309}
2310
// Default product-quantization subspace count for `dim`: the largest divisor of
// `dim` that is at most `dim / 8` (roughly eight dimensions per subspace),
// falling back to 1 — a single whole-vector codebook — when nothing larger
// divides `dim`.
fn default_pq_m(dim: usize) -> usize {
    let mut m = (dim / 8).max(1);
    // Walk down from the target; 1 divides everything, so the loop stops there.
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
2320
2321// Build the descriptor's index over `ids` (internal 0..n) and their flat vectors.
2322// Fixed seed for codebook training, so a collection's disk index is reproducible.
2323const PQ_SEED: u64 = 0x5176_5044_5141_5453;
2324// The disk index artifact, overwritten in place each rebuild; the caller drops
2325// the previous handle (unmapping the file) first.
2326const DISK_INDEX_FILE: &str = "vamana.qvx";
2327
2328fn build_index(
2329    store: &Store,
2330    cid: CollectionId,
2331    descriptor: &Descriptor,
2332    ids: &[u64],
2333    flat: &[f32],
2334) -> Result<CollectionIndex> {
2335    Ok(match build_in_memory_index(descriptor, ids, flat)? {
2336        Some(index) => index,
2337        None => {
2338            let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2339            CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2340                store, cid, &graph, &pq,
2341            )?)?))
2342        }
2343    })
2344}
2345
2346// Build the in-memory index for a collection, or `None` for the disk-resident kind
2347// whose artifact must be sealed through the store. Pure given the vectors (no
2348// store, no I/O), so the off-lock rebuild (ADR-0062) calls it without a lock; the
2349// synchronous `build_index` and disk path layer the store I/O on top.
2350fn build_in_memory_index(
2351    descriptor: &Descriptor,
2352    ids: &[u64],
2353    flat: &[f32],
2354) -> Result<Option<CollectionIndex>> {
2355    // Client-side-encrypted collections have no server-side index (ADR-0032): the
2356    // server stores opaque ciphertext it never ranks.
2357    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2358        return Ok(Some(CollectionIndex::None));
2359    }
2360    let dim = descriptor.dim as usize;
2361    let metric = to_index_metric(descriptor.metric);
2362    Ok(Some(match descriptor.index.kind {
2363        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
2364            ids,
2365            flat,
2366            dim,
2367            metric,
2368            VamanaConfig::default(),
2369        )?)?)),
2370        // The disk-resident artifact is sealed through the store, not here.
2371        IndexKind::DiskVamana => return Ok(None),
2372        IndexKind::Ivf => {
2373            let cfg = IvfConfig {
2374                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
2375                ..IvfConfig::default()
2376            };
2377            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
2378        }
2379        IndexKind::Colbert => {
2380            // Coarse centroids scale ~√(tokens); probe a quarter of them (PLAID),
2381            // and PQ the residual with the same subspace default as the disk path.
2382            let n = ids.len();
2383            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
2384            let cfg = ColbertConfig {
2385                n_centroids,
2386                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
2387                pq_subspaces: descriptor
2388                    .index
2389                    .pq_subspaces
2390                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
2391                seed: PQ_SEED,
2392            };
2393            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
2394        }
2395        _ => {
2396            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
2397            for (i, &id) in ids.iter().enumerate() {
2398                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
2399            }
2400            CollectionIndex::Hnsw(h)
2401        }
2402    }))
2403}
2404
2405// Build the Vamana graph + PQ codebook for the disk-resident index. Pure (no
2406// store, no I/O), so the off-lock rebuild (ADR-0062) does the expensive graph build
2407// and PQ training without a lock; `write_disk_index` then seals the result.
2408fn build_disk_graph_pq(
2409    descriptor: &Descriptor,
2410    ids: &[u64],
2411    flat: &[f32],
2412) -> Result<(Vamana, ProductQuantizer)> {
2413    let dim = descriptor.dim as usize;
2414    let metric = to_index_metric(descriptor.metric);
2415    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2416    let m = descriptor
2417        .index
2418        .pq_subspaces
2419        .map_or_else(|| default_pq_m(dim), |x| x as usize);
2420    let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2421    Ok((graph, pq))
2422}
2423
2424// Seal a prebuilt disk artifact under the collection's index dir with the store's
2425// codec, and open it for queries. Overwrites the artifact in place, so the caller
2426// must drop any prior `DiskVamana` for this collection first (its mmap assumes an
2427// immutable file).
2428fn write_disk_index(
2429    store: &Store,
2430    cid: CollectionId,
2431    graph: &Vamana,
2432    pq: &ProductQuantizer,
2433) -> Result<DiskVamana> {
2434    let dir = store.index_dir(cid);
2435    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
2436    let path = dir.join(DISK_INDEX_FILE);
2437    // Seal the index artifact with the collection's own codec (its DEK under an
2438    // envelope key-ring), so a crypto-shred of the collection also makes its index
2439    // unreadable. The same owned handle writes and then mmap-opens it.
2440    let codec = store.collection_codec_clone(cid)?;
2441    // Atomic publish (ADR-0063): write to a temp file, fsync it (disk::write does
2442    // the file sync), rename over the live name, then fsync the dir. A crash mid
2443    // write leaves the previous complete base or none — never a torn file that the
2444    // durable load path could `mmap` and serve. The caller has already dropped any
2445    // prior `DiskVamana` (its mmap assumed the old inode), so the rename is safe.
2446    let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
2447    quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
2448    std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
2449    let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
2450    open_disk_index(store, cid, codec)
2451}
2452
2453// Open a collection's sealed disk base (`vamana.qvx`) for queries, decrypting with
2454// its codec. Errors (absent/torn/wrong-codec file) propagate so the durable load
2455// path can fall back to a rebuild (ADR-0063).
2456fn open_disk_index(
2457    store: &Store,
2458    cid: CollectionId,
2459    codec: Box<dyn PageCodec>,
2460) -> Result<DiskVamana> {
2461    let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2462    Ok(DiskVamana::open(&path, codec)?)
2463}
2464
2465// Load a collection's index on open: restore the durable IVF snapshot and replay
2466// the post-checkpoint tail (ADR-0025) when one is present and intact, otherwise
2467// rebuild from the store. The snapshot is only a fast path — any problem reading
2468// or restoring it falls back to the authoritative rebuild.
2469fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2470    // Multi-vector collections always rebuild on open, so the document grouping is
2471    // derived from the live rows; the snapshot fast-paths stay single-vector.
2472    if !handle.descriptor.multivector
2473        && handle.descriptor.index.kind == IndexKind::Ivf
2474        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2475        && restore_ivf_snapshot(store, handle, &blob).is_ok()
2476    {
2477        return Ok(());
2478    }
2479    // Durable DiskVamana (ADR-0063): mmap the frugal base instead of an O(N)
2480    // full-RAM rebuild. Any failure falls back to rebuild, so the artifact is never
2481    // load-bearing for correctness. The derived sparse index is left `None` (as for
2482    // IVF) and hybrid falls back to the store scan until the next rebuild.
2483    // `QUIVER_DISABLE_DURABLE_DISK_INDEX` is an ops kill switch: set it to force the
2484    // (always-correct) rebuild path if the durable load is ever suspected.
2485    if !handle.descriptor.multivector
2486        && handle.descriptor.index.kind == IndexKind::DiskVamana
2487        && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
2488        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2489        && restore_disk_snapshot(store, handle, &blob).is_ok()
2490    {
2491        return Ok(());
2492    }
2493    rebuild_index(store, handle)
2494}
2495
2496// Restore a DiskVamana from its durable snapshot (ADR-0063): `mmap` the immutable
2497// base file, rebuild the in-memory FreshDiskANN delta from the store by the implied
2498// delta ids, re-apply the tombstones, then replay the post-checkpoint WAL tail. Any
2499// problem — absent/torn/mismatched base, a missing delta row — returns an error so
2500// the caller falls back to an authoritative rebuild.
2501fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2502    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
2503    if envelope.version != INDEX_ENVELOPE_VERSION {
2504        return Err(Error::Unsupported(
2505            "unsupported disk index snapshot version",
2506        ));
2507    }
2508    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
2509    // The blob's base count must match the opened file, or the (base, blob) pair is
2510    // inconsistent (e.g. a base torn or replaced after the blob was sealed).
2511    if base.len() as u64 != envelope.base_row_count {
2512        return Err(Error::Unsupported(
2513            "disk base count disagrees with snapshot",
2514        ));
2515    }
2516    handle.ext_to_int = envelope
2517        .int_to_ext
2518        .iter()
2519        .enumerate()
2520        .map(|(i, ext)| (ext.clone(), i as u64))
2521        .collect();
2522    handle.int_to_ext = envelope.int_to_ext;
2523    let mut fresh = FreshDiskVamana::new(base)?;
2524    // Reconstruct the delta: the ids above the base were inserted since the last
2525    // consolidation; their vectors live in the store, fetched by id (a row that
2526    // died this window is simply absent and need not enter the delta).
2527    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
2528        let ext = &handle.int_to_ext[internal as usize];
2529        if let Some(record) = store.get(handle.id, ext)? {
2530            fresh.insert(internal, &record.vector)?;
2531        }
2532    }
2533    for id in envelope.deleted_ids {
2534        fresh.mark_deleted(id);
2535    }
2536    handle.index = CollectionIndex::Disk(Some(fresh));
2537    handle.stale = false;
2538    replay_recovery_tail(store, handle)
2539}
2540
2541// Catch a restored index up to the store's current state by replaying the
2542// post-checkpoint WAL tail through the shared incremental apply path (ADR-0025/0063):
2543// tombstones first, then active-buffer upserts, so a row shadowed this window ends
2544// with its new vector. Bounded by the checkpoint cadence, not the collection size.
2545fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2546    let tail = store.recovery_tail(handle.id)?;
2547    for ext in &tail.deleted {
2548        index_delete_point(handle, ext);
2549    }
2550    for (ext, record) in tail.upserts {
2551        index_upsert_point(handle, &ext, &record.vector)?;
2552    }
2553    Ok(())
2554}
2555
2556// Restore an IVF from its snapshot envelope and catch it up to the store's
2557// current state by replaying the post-checkpoint tail (ADR-0025). Tombstoned ids
2558// are removed before the active upserts are applied, so a row shadowed this
2559// window (present in both) ends with its new vector.
2560fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2561    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
2562    if envelope.version != INDEX_ENVELOPE_VERSION {
2563        return Err(Error::Unsupported(
2564            "unsupported index snapshot envelope version",
2565        ));
2566    }
2567    let ivf = Ivf::restore(&envelope.ivf)?;
2568    handle.ext_to_int = envelope
2569        .int_to_ext
2570        .iter()
2571        .enumerate()
2572        .map(|(i, ext)| (ext.clone(), i as u64))
2573        .collect();
2574    handle.int_to_ext = envelope.int_to_ext;
2575    handle.index = CollectionIndex::Ivf(Some(ivf));
2576    handle.stale = false;
2577
2578    let tail = store.recovery_tail(handle.id)?;
2579    for ext in &tail.deleted {
2580        let Some(&internal) = handle.ext_to_int.get(ext) else {
2581            continue;
2582        };
2583        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2584            ivf.remove(internal);
2585        }
2586    }
2587    for (ext, record) in tail.upserts {
2588        let internal = match handle.ext_to_int.get(&ext) {
2589            Some(&i) => i,
2590            None => {
2591                let i = handle.int_to_ext.len() as u64;
2592                handle.ext_to_int.insert(ext.clone(), i);
2593                handle.int_to_ext.push(ext);
2594                i
2595            }
2596        };
2597        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2598            ivf.insert(internal, &record.vector)?;
2599        }
2600    }
2601    Ok(())
2602}
2603
2604// Fold one point write (`ext_id` → `vector`) into a built index incrementally,
2605// or mark the handle stale to defer to a rebuild when the kind cannot absorb it
2606// in place. HNSW absorbs a brand-new id but cannot update one (a known id falls
2607// to a rebuild); a built/trained IVF inserts or replaces any id (ADR-0023); a
2608// built FreshDiskANN graph appends to its delta and tombstones the prior copy on
2609// an update, consolidating past its churn threshold (ADR-0033). Shared by the
2610// single-vector `upsert` and each token row of a multi-vector `upsert_document`
2611// (ADR-0034). A no-op once the handle is stale (a rebuild is already pending).
2612fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
2613    // MVCC mode: append to the published overlay instead of mutating the immutable
2614    // base index (ADR-0064), keeping lock-free readers race-free. Bumps the write
2615    // generation itself.
2616    if mvcc_served(handle) {
2617        overlay_upsert(handle, ext_id, vector);
2618        return Ok(());
2619    }
2620    // Every write bumps the generation, even one skipped here because a rebuild is
2621    // already pending — an in-flight off-lock rebuild must still notice it (ADR-0062).
2622    bump_write_gen(handle);
2623    if handle.stale {
2624        return Ok(());
2625    }
2626    let known = handle.ext_to_int.contains_key(ext_id);
2627    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2628    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
2629    let is_live_graph = matches!(
2630        handle.index,
2631        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2632    );
2633    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2634    if is_hnsw && !known {
2635        let internal = handle.int_to_ext.len() as u64;
2636        if let CollectionIndex::Hnsw(h) = &mut handle.index {
2637            h.insert(internal, vector)?;
2638        }
2639        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2640        handle.int_to_ext.push(ext_id.to_owned());
2641    } else if is_live_ivf {
2642        // Reuse the internal id for an in-place update; allocate a fresh, dense one
2643        // for a new id (so `int_to_ext` stays index-addressable).
2644        let internal = if known {
2645            handle.ext_to_int[ext_id]
2646        } else {
2647            let i = handle.int_to_ext.len() as u64;
2648            handle.ext_to_int.insert(ext_id.to_owned(), i);
2649            handle.int_to_ext.push(ext_id.to_owned());
2650            i
2651        };
2652        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2653            ivf.insert(internal, vector)?;
2654        }
2655    } else if is_live_graph {
2656        // A graph cannot update a node in place: append a new delta node under a
2657        // fresh internal id and tombstone the prior copy on an update (ADR-0033).
2658        let old = handle.ext_to_int.get(ext_id).copied();
2659        let internal = handle.int_to_ext.len() as u64;
2660        let mut pending = 0.0;
2661        match &mut handle.index {
2662            CollectionIndex::Vamana(Some(fresh)) => {
2663                if let Some(o) = old {
2664                    fresh.mark_deleted(o);
2665                }
2666                fresh.insert(internal, vector)?;
2667                pending = fresh.pending_fraction();
2668            }
2669            CollectionIndex::Disk(Some(fresh)) => {
2670                if let Some(o) = old {
2671                    fresh.mark_deleted(o);
2672                }
2673                fresh.insert(internal, vector)?;
2674                pending = fresh.pending_fraction();
2675            }
2676            _ => {}
2677        }
2678        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2679        handle.int_to_ext.push(ext_id.to_owned());
2680        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
2681            mark_stale(handle);
2682        }
2683    } else if is_live_colbert {
2684        // ColBERT appends a new token and tombstones the prior copy on an update —
2685        // its centroids are fixed until a rebuild (ADR-0034); the deletion fraction
2686        // drives consolidation, as for HNSW.
2687        let old = handle.ext_to_int.get(ext_id).copied();
2688        let internal = handle.int_to_ext.len() as u64;
2689        let mut crowded = false;
2690        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2691            if let Some(o) = old {
2692                c.mark_deleted(o);
2693            }
2694            c.insert(internal, vector)?;
2695            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2696        }
2697        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2698        handle.int_to_ext.push(ext_id.to_owned());
2699        if crowded {
2700            mark_stale(handle);
2701        }
2702    } else {
2703        mark_stale(handle);
2704    }
2705    Ok(())
2706}
2707
2708// Tombstone one point (`ext_id`) from a built index incrementally, or mark the
2709// handle stale to defer to a rebuild. A built IVF removes in place (ADR-0023), a
2710// built HNSW soft-deletes (ADR-0026), and a built FreshDiskANN graph tombstones in
2711// its deletion set (ADR-0033); each amortizes a rebuild when tombstones dominate.
2712// The id→internal mapping is kept so a later re-insert allocates afresh. Shared by
2713// the single-vector `delete` and each token row of `delete_document` (ADR-0034).
2714fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2715    // MVCC mode: tombstone the id in the published overlay (ADR-0064); the base
2716    // stays immutable. Bumps the write generation itself.
2717    if mvcc_served(handle) {
2718        overlay_delete(handle, ext_id);
2719        return;
2720    }
2721    // Every write bumps the generation, even one skipped here (see `index_upsert_point`).
2722    bump_write_gen(handle);
2723    if handle.stale {
2724        return;
2725    }
2726    let internal = handle.ext_to_int.get(ext_id).copied();
2727    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2728    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2729    let live_graph = matches!(
2730        handle.index,
2731        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2732    );
2733    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2734    match internal {
2735        Some(internal) if live_ivf => {
2736            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2737                ivf.remove(internal);
2738            }
2739        }
2740        Some(internal) if live_hnsw => {
2741            let mut crowded = false;
2742            if let CollectionIndex::Hnsw(h) = &mut handle.index {
2743                h.mark_deleted(internal as u32);
2744                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2745            }
2746            if crowded {
2747                mark_stale(handle);
2748            }
2749        }
2750        Some(internal) if live_graph => {
2751            let mut crowded = false;
2752            match &mut handle.index {
2753                CollectionIndex::Vamana(Some(fresh)) => {
2754                    fresh.mark_deleted(internal);
2755                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2756                }
2757                CollectionIndex::Disk(Some(fresh)) => {
2758                    fresh.mark_deleted(internal);
2759                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2760                }
2761                _ => {}
2762            }
2763            if crowded {
2764                mark_stale(handle);
2765            }
2766        }
2767        Some(internal) if live_colbert => {
2768            let mut crowded = false;
2769            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2770                c.mark_deleted(internal);
2771                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2772            }
2773            if crowded {
2774                mark_stale(handle);
2775            }
2776        }
2777        _ => mark_stale(handle),
2778    }
2779}
2780
2781// The product of scanning a collection's live rows: the dense id maps + contiguous
2782// vectors, the multi-vector document grouping, and the derived sparse inverted
2783// index — everything a rebuild needs from the store, owned so the build can then
2784// proceed with no store and no lock (ADR-0062).
2785struct RebuildScan {
2786    int_to_ext: Vec<String>,
2787    ext_to_int: HashMap<String, u64>,
2788    flat: Vec<f32>,
2789    docs: Option<BTreeMap<String, u32>>,
2790    sparse: Option<SparseInvertedIndex>,
2791}
2792
2793// Scan a collection's live rows into a [`RebuildScan`]. Rebuilds the derived sparse
2794// inverted index (ADR-0045) and, for a multi-vector collection, the document
2795// grouping (doc id → token count) authoritatively from those rows. `&self` on the
2796// store, so it runs under a shared read lock.
2797fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2798    let multivector = handle.descriptor.multivector;
2799    let mut int_to_ext = Vec::new();
2800    let mut ext_to_int = HashMap::new();
2801    let mut flat: Vec<f32> = Vec::new();
2802    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2803    // Only single-vector, server-searchable collections carry a sparse index, and
2804    // only non-empty payloads are parsed, so non-sparse collections pay nothing.
2805    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2806    for (ext_id, record) in store.scan(handle.id)? {
2807        let internal = int_to_ext.len() as u64;
2808        flat.extend_from_slice(&record.vector);
2809        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2810            *docs.entry(doc.to_owned()).or_insert(0) += 1;
2811        }
2812        if let Some(idx) = sparse.as_mut()
2813            && let Some(sv) = sparse_vector_from_payload(&record.payload)
2814        {
2815            idx.upsert(&ext_id, &sv);
2816        }
2817        ext_to_int.insert(ext_id.clone(), internal);
2818        int_to_ext.push(ext_id);
2819    }
2820    Ok(RebuildScan {
2821        int_to_ext,
2822        ext_to_int,
2823        flat,
2824        docs: multivector.then_some(docs),
2825        sparse,
2826    })
2827}
2828
2829// Rebuild a collection's index from the store's current live rows, synchronously
2830// under `&mut self` — the embedded/open path. The server's runtime path builds
2831// off-lock instead (ADR-0062: `snapshot_rebuild_inputs` → `build` → `commit_rebuild`).
2832fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2833    let scan = scan_collection(store, handle)?;
2834    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2835    // Drop the previous index before rebuilding: a disk index `mmap`s a file we
2836    // are about to overwrite in place, and the mapping assumes an immutable file.
2837    handle.index = empty_index(&handle.descriptor);
2838    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2839    handle.int_to_ext = scan.int_to_ext;
2840    handle.ext_to_int = scan.ext_to_int;
2841    handle.docs = scan.docs;
2842    handle.sparse = scan.sparse;
2843    handle.stale = false;
2844    Ok(())
2845}
2846
2847/// A captured, owned snapshot of everything an off-lock rebuild needs (ADR-0062):
2848/// the scanned rows, the collection's descriptor, and the write generation at
2849/// capture time. Produced under the shared read lock by
2850/// [`Database::snapshot_rebuild_inputs`]; [`RebuildInputs::build`] then constructs
2851/// the new index with no lock held.
2852pub struct RebuildInputs {
2853    collection: String,
2854    descriptor: Descriptor,
2855    scan: RebuildScan,
2856    write_gen: u64,
2857}
2858
2859// The built product of a [`RebuildInputs`], ready to install. In-memory indexes are
2860// fully built; the disk-resident artifact carries its prebuilt graph + PQ, which
2861// `commit_rebuild` seals through the store under the brief write lock (the file
2862// write must not race the prior index's `mmap`).
2863enum RebuiltKind {
2864    Ready(Box<CollectionIndex>),
2865    Disk {
2866        graph: Box<Vamana>,
2867        pq: Box<ProductQuantizer>,
2868    },
2869}
2870
2871/// A new index built off-lock from a [`RebuildInputs`], ready for
2872/// [`Database::commit_rebuild`] to install under the brief write lock (ADR-0062).
2873pub struct RebuiltIndex {
2874    collection: String,
2875    kind: RebuiltKind,
2876    int_to_ext: Vec<String>,
2877    ext_to_int: HashMap<String, u64>,
2878    docs: Option<BTreeMap<String, u32>>,
2879    sparse: Option<SparseInvertedIndex>,
2880    write_gen: u64,
2881}
2882
2883impl RebuildInputs {
2884    /// Build the new index from the captured rows with **no lock held** — the
2885    /// expensive CPU work (graph build, PQ training, HNSW insertion) of an off-lock
2886    /// rebuild (ADR-0062). The disk artifact is built in memory here; its file is
2887    /// sealed later by `commit_rebuild` under the write lock.
2888    pub fn build(self) -> Result<RebuiltIndex> {
2889        let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2890        let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2891            Some(index) => RebuiltKind::Ready(Box::new(index)),
2892            None => {
2893                let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2894                RebuiltKind::Disk {
2895                    graph: Box::new(graph),
2896                    pq: Box::new(pq),
2897                }
2898            }
2899        };
2900        Ok(RebuiltIndex {
2901            collection: self.collection,
2902            kind,
2903            int_to_ext: self.scan.int_to_ext,
2904            ext_to_int: self.scan.ext_to_int,
2905            docs: self.scan.docs,
2906            sparse: self.scan.sparse,
2907            write_gen: self.write_gen,
2908        })
2909    }
2910}
2911
2912// Extract a point's sparse vector from its serialized payload, if it carries one
2913// under `__quiver_sparse__` and the value deserializes. An empty payload or a
2914// missing/malformed sparse vector yields `None`.
2915fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2916    if payload.is_empty() {
2917        return None;
2918    }
2919    let value = serde_json::from_slice::<Value>(payload).ok()?;
2920    sparse_vector_from_value(&value)
2921}
2922
2923// As [`sparse_vector_from_payload`] but over an already-parsed payload `Value`
2924// (the upsert path has the payload before it is serialized). An explicit
2925// `__quiver_sparse__` vector wins; otherwise a `__quiver_text__` string is
2926// tokenized into a term-frequency vector (ADR-0046), so a point is searchable by
2927// BM25 over text alone.
2928fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2929    if let Some(raw) = payload.get(SPARSE_KEY) {
2930        return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2931    }
2932    let text = payload.get(TEXT_KEY)?.as_str()?;
2933    Some(text_to_sparse(text))
2934}
2935
2936// Maintain the derived sparse inverted index for one point write (ADR-0045): index
2937// the point's sparse vector, or drop any prior entry when the new payload no longer
2938// carries one (an update that removed it). A no-op when the collection has no
2939// inverted index, or when a rebuild is pending — the rebuild repopulates the index
2940// authoritatively from the store, exactly as it does the dense index.
2941fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2942    if handle.stale {
2943        return;
2944    }
2945    let Some(idx) = handle.sparse.as_mut() else {
2946        return;
2947    };
2948    match sparse_vector_from_value(payload) {
2949        Some(sv) => idx.upsert(ext_id, &sv),
2950        None => {
2951            idx.remove(ext_id);
2952        }
2953    }
2954}
2955
2956// Drop one point from the derived sparse inverted index. A no-op when the
2957// collection has no inverted index.
2958fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2959    if let Some(idx) = handle.sparse.as_mut() {
2960        idx.remove(ext_id);
2961    }
2962}
2963
2964// The set of live external ids guaranteed to contain every row the `filter`
2965// accepts (a sound superset), resolved through the collection's secondary
2966// indexes. `None` means the filter cannot be narrowed with the available indexes
2967// and the caller should post-filter instead; `Some(set)` is a candidate
2968// superset, and an empty set proves no row can match.
2969//
2970// Only indexable leaves — equality, `in`, and range comparisons on declared
2971// filterable fields of a matching type — constrain the set. Everything else
2972// (negation, existence, `ne`, non-indexed fields, type mismatches) is treated as
2973// unconstrained, which keeps the result a superset; exactness is restored when
2974// the caller re-checks the full `Filter` on each surviving candidate.
2975fn candidate_ids(
2976    store: &Store,
2977    cid: CollectionId,
2978    filter: &Filter,
2979    filterable: &[FilterableField],
2980) -> Result<Option<BTreeSet<String>>> {
2981    match filter {
2982        Filter::And(subs) => {
2983            // Intersect the constrained children; an unconstrained child (None)
2984            // is dropped, which only widens the set — still a superset.
2985            let mut acc: Option<BTreeSet<String>> = None;
2986            for sub in subs {
2987                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2988                    acc = Some(match acc {
2989                        Some(existing) => existing.intersection(&set).cloned().collect(),
2990                        None => set,
2991                    });
2992                }
2993            }
2994            Ok(acc)
2995        }
2996        Filter::Or(subs) => {
2997            // The union of the children — but one unconstrained child makes the
2998            // whole disjunction unconstrained (its rows could be anything).
2999            let mut acc = BTreeSet::new();
3000            for sub in subs {
3001                match candidate_ids(store, cid, sub, filterable)? {
3002                    Some(set) => acc.extend(set),
3003                    None => return Ok(None),
3004                }
3005            }
3006            Ok(Some(acc))
3007        }
3008        // A negation cannot be narrowed to a superset with these indexes.
3009        Filter::Not(_) => Ok(None),
3010        // A leaf: indexable ⇒ its matching ids; otherwise unconstrained.
3011        leaf => match leaf_predicate(leaf, filterable) {
3012            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
3013            None => Ok(None),
3014        },
3015    }
3016}
3017
3018// Map a single filter leaf to an indexable secondary-index predicate, if its
3019// field is declared filterable and the value types line up. Boolean nodes and
3020// predicates the secondary index cannot answer (`Ne`, `Exists`) return `None`.
3021fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
3022    let field_type = |field: &str| {
3023        filterable
3024            .iter()
3025            .find(|f| f.path == field)
3026            .map(|f| f.field_type)
3027    };
3028    match filter {
3029        Filter::Eq { field, value } => Some(SecPredicate::Eq {
3030            field: field.clone(),
3031            value: sec_value(field_type(field)?, value)?,
3032        }),
3033        Filter::In { field, values } => {
3034            let ft = field_type(field)?;
3035            // Every value must encode, or the `in` set would be understated and
3036            // the candidate superset unsound.
3037            let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
3038            Some(SecPredicate::In {
3039                field: field.clone(),
3040                values: values?,
3041            })
3042        }
3043        Filter::Lt { field, value } => {
3044            one_sided_range(field, field_type(field)?, value, false, false)
3045        }
3046        Filter::Lte { field, value } => {
3047            one_sided_range(field, field_type(field)?, value, false, true)
3048        }
3049        Filter::Gt { field, value } => {
3050            one_sided_range(field, field_type(field)?, value, true, false)
3051        }
3052        Filter::Gte { field, value } => {
3053            one_sided_range(field, field_type(field)?, value, true, true)
3054        }
3055        _ => None,
3056    }
3057}
3058
3059// Build a one-sided range predicate from a comparison leaf. `is_lower` selects
3060// whether `value` is the lower (`Gt`/`Gte`) or upper (`Lt`/`Lte`) bound;
3061// `inclusive` is that bound's inclusivity. `None` on a type mismatch.
3062fn one_sided_range(
3063    field: &str,
3064    field_type: FieldType,
3065    value: &Value,
3066    is_lower: bool,
3067    inclusive: bool,
3068) -> Option<SecPredicate> {
3069    let v = sec_value(field_type, value)?;
3070    let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3071        (Some(v), None, inclusive, false)
3072    } else {
3073        (None, Some(v), false, inclusive)
3074    };
3075    Some(SecPredicate::Range {
3076        field: field.to_owned(),
3077        lo,
3078        hi,
3079        lo_inclusive,
3080        hi_inclusive,
3081    })
3082}
3083
3084// Encode a filter value as a typed secondary-index value, or `None` when the
3085// JSON type does not match the field's declared type (so it cannot be
3086// pre-filtered and the planner falls back to post-filtering).
3087fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3088    match (field_type, value) {
3089        (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3090        (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3091        _ => None,
3092    }
3093}
3094
3095#[cfg(test)]
3096mod tests {
3097    use super::*;
3098    use serde_json::json;
3099
3100    fn desc() -> Descriptor {
3101        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3102    }
3103
3104    fn open(dir: &Path) -> Database {
3105        Database::open(dir).unwrap()
3106    }
3107
3108    #[test]
3109    fn prepare_then_apply_matches_a_direct_write() {
3110        // A direct-write engine and a prepare→apply engine fed the same operations
3111        // must reach identical logical state — the invariant the Raft write path
3112        // (ADR-0067) relies on: the leader proposes a *prepared* op, and every
3113        // member applies it via `apply_replicated` exactly as a direct write would.
3114        let dir_a = tempfile::tempdir().unwrap();
3115        let mut direct = open(dir_a.path());
3116        let dir_b = tempfile::tempdir().unwrap();
3117        let mut raft = open(dir_b.path());
3118
3119        let vector = [1.0, 2.0, 3.0, 4.0];
3120        let payload = json!({ "k": "v" });
3121
3122        // Direct path: write, then remove one point.
3123        direct.create_collection("docs", desc()).unwrap();
3124        direct.upsert("docs", "p1", &vector, &payload).unwrap();
3125        direct
3126            .upsert("docs", "p2", &[0.0, 1.0, 0.0, 0.0], &json!({}))
3127            .unwrap();
3128        assert!(direct.delete("docs", "p2").unwrap());
3129
3130        // Raft path: prepare each op, then apply it (as a quorum-committed entry).
3131        let create = raft.prepare_create_collection("docs", &desc()).unwrap();
3132        raft.apply_replicated(create).unwrap();
3133        let up1 = raft
3134            .prepare_upsert("docs", "p1", &vector, &payload)
3135            .unwrap();
3136        raft.apply_replicated(up1).unwrap();
3137        let up2 = raft
3138            .prepare_upsert("docs", "p2", &[0.0, 1.0, 0.0, 0.0], &json!({}))
3139            .unwrap();
3140        raft.apply_replicated(up2).unwrap();
3141        let del = raft
3142            .prepare_delete("docs", "p2")
3143            .unwrap()
3144            .expect("p2 present");
3145        raft.apply_replicated(del).unwrap();
3146
3147        // Identical logical state ⇒ identical ops to recreate it.
3148        assert_eq!(
3149            direct.replication_snapshot().unwrap(),
3150            raft.replication_snapshot().unwrap(),
3151        );
3152    }
3153
3154    #[test]
3155    fn prepare_validates_like_the_direct_path() {
3156        let dir = tempfile::tempdir().unwrap();
3157        let mut db = open(dir.path());
3158        db.create_collection("docs", desc()).unwrap();
3159        // A dimensionality mismatch is rejected at prepare time.
3160        assert!(
3161            db.prepare_upsert("docs", "p", &[1.0, 2.0], &json!({}))
3162                .is_err()
3163        );
3164        // A duplicate collection name is rejected at prepare time.
3165        assert!(db.prepare_create_collection("docs", &desc()).is_err());
3166        // An unknown collection is not found.
3167        assert!(
3168            db.prepare_upsert("nope", "p", &[1.0, 2.0, 3.0, 4.0], &json!({}))
3169                .is_err()
3170        );
3171        // Deleting a missing point prepares to nothing (no op); an unknown
3172        // collection is not found.
3173        assert!(db.prepare_delete("docs", "ghost").unwrap().is_none());
3174        assert!(db.prepare_delete("nope", "x").is_err());
3175    }
3176
3177    #[test]
3178    fn hybrid_search_fuses_dense_and_sparse() {
3179        let tmp = tempfile::tempdir().unwrap();
3180        let mut db = open(tmp.path());
3181        db.create_collection("kb", desc()).unwrap();
3182        // "a" is the nearest dense neighbour of the query; "b" shares the query's
3183        // sparse terms; "c" is good on neither.
3184        db.upsert(
3185            "kb",
3186            "a",
3187            &[1.0, 0.0, 0.0, 0.0],
3188            &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3189        )
3190        .unwrap();
3191        db.upsert(
3192            "kb",
3193            "b",
3194            &[0.0, 1.0, 0.0, 0.0],
3195            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3196        )
3197        .unwrap();
3198        db.upsert(
3199            "kb",
3200            "c",
3201            &[0.0, 0.0, 0.0, 1.0],
3202            &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3203        )
3204        .unwrap();
3205
3206        let dense_q = [1.0, 0.0, 0.0, 0.0];
3207        let sparse_q = SparseVector {
3208            indices: vec![1, 2],
3209            values: vec![1.0, 1.0],
3210        };
3211        let params = SearchParams {
3212            k: 3,
3213            ..SearchParams::default()
3214        };
3215
3216        // Hybrid: "a" (dense) and "b" (sparse) both rank above "c" (neither).
3217        let hits = db
3218            .hybrid_search(
3219                "kb",
3220                Some(&dense_q),
3221                Some(&sparse_q),
3222                None,
3223                &params,
3224                DEFAULT_RRF_K0,
3225            )
3226            .unwrap();
3227        let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3228        assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3229        assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3230
3231        // Pure sparse: only "b" shares the query's terms.
3232        let sparse_only = db
3233            .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
3234            .unwrap();
3235        assert_eq!(sparse_only[0].id, "b");
3236
3237        // Pure dense: "a" is nearest.
3238        let dense_only = db
3239            .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
3240            .unwrap();
3241        assert_eq!(dense_only[0].id, "a");
3242
3243        // Neither query is an error.
3244        assert!(
3245            db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
3246                .is_err()
3247        );
3248    }
3249
3250    // Pure-sparse hybrid result ids, in fused order.
3251    fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3252        let params = SearchParams {
3253            k: 10,
3254            ..SearchParams::default()
3255        };
3256        db.hybrid_search("kb", None, Some(q), None, &params, DEFAULT_RRF_K0)
3257            .unwrap()
3258            .into_iter()
3259            .map(|m| m.id)
3260            .collect()
3261    }
3262
3263    #[test]
3264    fn sparse_index_equals_the_store_scan_fallback() {
3265        let tmp = tempfile::tempdir().unwrap();
3266        let mut db = open(tmp.path());
3267        db.create_collection("kb", desc()).unwrap();
3268        let z = [0.0f32, 0.0, 0.0, 0.0];
3269        for (id, dims, vals) in [
3270            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
3271            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
3272            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
3273            ("d", vec![9u32], vec![1.0f32]), // shares no query term
3274        ] {
3275            db.upsert(
3276                "kb",
3277                id,
3278                &z,
3279                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
3280            )
3281            .unwrap();
3282        }
3283        let q = SparseVector {
3284            indices: vec![1, 2, 3],
3285            values: vec![1.0, 1.0, 1.0],
3286        };
3287
3288        // The derived index is present and used.
3289        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3290        let via_index = sparse_ids(&mut db, &q);
3291        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
3292
3293        // Drop the index (not stale, so no rebuild) → the store-scan fallback runs
3294        // and must return the identical ranking.
3295        db.collections.get_mut("kb").unwrap().sparse = None;
3296        let via_scan = sparse_ids(&mut db, &q);
3297        assert_eq!(via_index, via_scan);
3298    }
3299
3300    #[test]
3301    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
3302        let tmp = tempfile::tempdir().unwrap();
3303        let mut db = open(tmp.path());
3304        db.create_collection("kb", desc()).unwrap();
3305        let z = [0.0f32, 0.0, 0.0, 0.0];
3306        db.upsert(
3307            "kb",
3308            "a",
3309            &z,
3310            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3311        )
3312        .unwrap();
3313        db.upsert(
3314            "kb",
3315            "b",
3316            &z,
3317            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
3318        )
3319        .unwrap();
3320        db.upsert(
3321            "kb",
3322            "c",
3323            &z,
3324            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3325        )
3326        .unwrap();
3327        // Update "a" onto a disjoint term; delete "b".
3328        db.upsert(
3329            "kb",
3330            "a",
3331            &z,
3332            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
3333        )
3334        .unwrap();
3335        assert!(db.delete("kb", "b").unwrap());
3336
3337        let q = SparseVector {
3338            indices: vec![1, 2],
3339            values: vec![1.0, 1.0],
3340        };
3341        // Only "c" still shares a query term ("a" moved to 7, "b" is gone).
3342        let incremental = sparse_ids(&mut db, &q);
3343        assert_eq!(incremental, vec!["c".to_owned()]);
3344
3345        // A full rebuild from the store must agree with the incremental state.
3346        db.collections.get_mut("kb").unwrap().stale = true;
3347        let rebuilt = sparse_ids(&mut db, &q);
3348        assert_eq!(incremental, rebuilt);
3349    }
3350
3351    #[test]
3352    fn sparse_index_is_rebuilt_on_reopen() {
3353        let tmp = tempfile::tempdir().unwrap();
3354        {
3355            let mut db = open(tmp.path());
3356            db.create_collection("kb", desc()).unwrap();
3357            db.upsert(
3358                "kb",
3359                "a",
3360                &[0.0, 0.0, 0.0, 0.0],
3361                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
3362            )
3363            .unwrap();
3364        }
3365        let mut db = open(tmp.path());
3366        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3367        let q = SparseVector {
3368            indices: vec![1],
3369            values: vec![1.0],
3370        };
3371        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
3372    }
3373
3374    #[test]
3375    fn hybrid_sparse_honours_the_payload_filter() {
3376        let tmp = tempfile::tempdir().unwrap();
3377        let mut db = open(tmp.path());
3378        db.create_collection("kb", desc()).unwrap();
3379        let z = [0.0f32, 0.0, 0.0, 0.0];
3380        db.upsert(
3381            "kb",
3382            "a",
3383            &z,
3384            &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3385        )
3386        .unwrap();
3387        db.upsert(
3388            "kb",
3389            "b",
3390            &z,
3391            &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3392        )
3393        .unwrap();
3394        let q = SparseVector {
3395            indices: vec![1],
3396            values: vec![1.0],
3397        };
3398        let params = SearchParams {
3399            k: 10,
3400            filter: Some(Filter::Eq {
3401                field: "lang".to_owned(),
3402                value: json!("en"),
3403            }),
3404            ..SearchParams::default()
3405        };
3406        let hits: Vec<String> = db
3407            .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3408            .unwrap()
3409            .into_iter()
3410            .map(|m| m.id)
3411            .collect();
3412        // "b" scores higher but is filtered out by lang == "en".
3413        assert_eq!(hits, vec!["a".to_owned()]);
3414    }
3415
3416    #[test]
3417    fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3418        let tmp = tempfile::tempdir().unwrap();
3419        let mut db = open(tmp.path());
3420        db.create_collection("kb", desc()).unwrap();
3421        let z = [0.0f32, 0.0, 0.0, 0.0];
3422        // Points carry only a `__quiver_text__` string — tokenized at ingest.
3423        db.upsert(
3424            "kb",
3425            "cats",
3426            &z,
3427            &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3428        )
3429        .unwrap();
3430        db.upsert(
3431            "kb",
3432            "dogs",
3433            &z,
3434            &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3435        )
3436        .unwrap();
3437
3438        let params = SearchParams {
3439            k: 10,
3440            ..SearchParams::default()
3441        };
3442        // A text query (no dense, no explicit sparse) ranks the lexical match first;
3443        // "cat"/"cats" conflate via the stemmer.
3444        let hits: Vec<String> = db
3445            .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
3446            .unwrap()
3447            .into_iter()
3448            .map(|m| m.id)
3449            .collect();
3450        assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3451
3452        // A non-matching query returns nothing from the text side.
3453        assert!(
3454            db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
3455                .unwrap()
3456                .is_empty()
3457        );
3458
3459        // Text fuses with a dense query through the same RRF path.
3460        let dense_q = [1.0, 0.0, 0.0, 0.0];
3461        db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3462            .unwrap();
3463        let fused: Vec<String> = db
3464            .hybrid_search(
3465                "kb",
3466                Some(&dense_q),
3467                None,
3468                Some("dog"),
3469                &params,
3470                DEFAULT_RRF_K0,
3471            )
3472            .unwrap()
3473            .into_iter()
3474            .map(|m| m.id)
3475            .collect();
3476        assert!(
3477            fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3478            "dense match + lexical match both surface; got {fused:?}"
3479        );
3480    }
3481
3482    #[test]
3483    fn create_upsert_search_get_end_to_end() {
3484        let tmp = tempfile::tempdir().unwrap();
3485        let mut db = open(tmp.path());
3486        db.create_collection("items", desc()).unwrap();
3487        db.upsert(
3488            "items",
3489            "a",
3490            &[0.0, 0.0, 0.0, 0.0],
3491            &json!({"color": "red"}),
3492        )
3493        .unwrap();
3494        db.upsert(
3495            "items",
3496            "b",
3497            &[1.0, 0.0, 0.0, 0.0],
3498            &json!({"color": "blue"}),
3499        )
3500        .unwrap();
3501        db.upsert(
3502            "items",
3503            "c",
3504            &[5.0, 5.0, 5.0, 5.0],
3505            &json!({"color": "red"}),
3506        )
3507        .unwrap();
3508
3509        let near = db
3510            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
3511            .unwrap();
3512        assert_eq!(near[0].id, "a");
3513        assert_eq!(near[1].id, "b");
3514
3515        let got = db.get("items", "c").unwrap().unwrap();
3516        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
3517        assert_eq!(got.payload, Some(json!({"color": "red"})));
3518    }
3519
3520    #[test]
3521    fn upsert_batch_produces_same_search_results_as_sequential() {
3522        let tmp_seq = tempfile::tempdir().unwrap();
3523        let tmp_bat = tempfile::tempdir().unwrap();
3524
3525        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3526        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3527        let payload = json!({});
3528
3529        // Sequential upserts
3530        {
3531            let mut db = open(tmp_seq.path());
3532            db.create_collection("c", desc()).unwrap();
3533            for (id, vec) in ids.iter().zip(vectors.iter()) {
3534                db.upsert("c", id, vec, &payload).unwrap();
3535            }
3536        }
3537        // Batch upsert
3538        {
3539            let mut db = open(tmp_bat.path());
3540            db.create_collection("c", desc()).unwrap();
3541            let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3542                .iter()
3543                .zip(vectors.iter())
3544                .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3545                .collect();
3546            let n = db.upsert_batch("c", &pts).unwrap();
3547            assert_eq!(n, 20);
3548        }
3549
3550        let query = [10.0f32, 0.0, 0.0, 0.0];
3551        let params = SearchParams {
3552            k: 5,
3553            ..Default::default()
3554        };
3555
3556        let mut seq_db = open(tmp_seq.path());
3557        let mut bat_db = open(tmp_bat.path());
3558        let seq: Vec<String> = seq_db
3559            .search("c", &query, &params)
3560            .unwrap()
3561            .into_iter()
3562            .map(|m| m.id)
3563            .collect();
3564        let bat: Vec<String> = bat_db
3565            .search("c", &query, &params)
3566            .unwrap()
3567            .into_iter()
3568            .map(|m| m.id)
3569            .collect();
3570        assert_eq!(
3571            seq, bat,
3572            "batch and sequential produce different search results"
3573        );
3574    }
3575
3576    #[test]
3577    fn upsert_bulk_defers_the_index_then_searches_correctly() {
3578        let tmp = tempfile::tempdir().unwrap();
3579        let mut db = open(tmp.path());
3580        db.create_collection("c", desc()).unwrap();
3581        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3582        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3583        // Give one point a sparse vector so the deferred rebuild also rebuilds the
3584        // inverted index.
3585        let plain = json!({});
3586        let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3587        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3588            .iter()
3589            .zip(vectors.iter())
3590            .map(|(id, v)| {
3591                let payload = if id == "p3" { &sparse_payload } else { &plain };
3592                (id.as_str(), v.as_slice(), payload)
3593            })
3594            .collect();
3595        let n = db.upsert_bulk("c", &pts).unwrap();
3596        assert_eq!(n, 20);
3597
3598        // The bulk path defers index maintenance: the handle is stale until a read.
3599        assert!(db.collections.get("c").unwrap().stale);
3600
3601        // Dense search triggers the single rebuild and returns the nearest points.
3602        let query = [10.0f32, 0.0, 0.0, 0.0];
3603        let params = SearchParams {
3604            k: 5,
3605            ..Default::default()
3606        };
3607        let hits: Vec<String> = db
3608            .search("c", &query, &params)
3609            .unwrap()
3610            .into_iter()
3611            .map(|m| m.id)
3612            .collect();
3613        assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3614        assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3615
3616        // The sparse vector loaded via the bulk path is searchable after the rebuild.
3617        let q = SparseVector {
3618            indices: vec![7],
3619            values: vec![1.0],
3620        };
3621        let sparse_hits: Vec<String> = db
3622            .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3623            .unwrap()
3624            .into_iter()
3625            .map(|m| m.id)
3626            .collect();
3627        assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3628    }
3629
3630    #[test]
3631    fn filtered_search_only_returns_matching_payloads() {
3632        let tmp = tempfile::tempdir().unwrap();
3633        let mut db = open(tmp.path());
3634        db.create_collection("items", desc()).unwrap();
3635        for i in 0..20u32 {
3636            let color = if i % 2 == 0 { "red" } else { "blue" };
3637            db.upsert(
3638                "items",
3639                &format!("p{i}"),
3640                &[i as f32, 0.0, 0.0, 0.0],
3641                &json!({"color": color, "n": i}),
3642            )
3643            .unwrap();
3644        }
3645        let params = SearchParams {
3646            k: 5,
3647            filter: Some(Filter::Eq {
3648                field: "color".into(),
3649                value: json!("red"),
3650            }),
3651            ef_search: 64,
3652            with_payload: true,
3653            with_vector: false,
3654        };
3655        let results = db.search("items", &[0.0; 4], &params).unwrap();
3656        assert!(!results.is_empty());
3657        for m in &results {
3658            assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3659        }
3660    }
3661
3662    #[test]
3663    fn persists_and_rebuilds_index_on_reopen() {
3664        let tmp = tempfile::tempdir().unwrap();
3665        {
3666            let mut db = open(tmp.path());
3667            db.create_collection("items", desc()).unwrap();
3668            for i in 0..50u32 {
3669                db.upsert(
3670                    "items",
3671                    &format!("p{i}"),
3672                    &[i as f32, 1.0, 2.0, 3.0],
3673                    &json!({}),
3674                )
3675                .unwrap();
3676            }
3677            db.checkpoint().unwrap();
3678        }
3679        let mut db = open(tmp.path());
3680        assert_eq!(db.len("items").unwrap(), 50);
3681        let res = db
3682            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3683            .unwrap();
3684        assert_eq!(res[0].id, "p7");
3685    }
3686
3687    #[test]
3688    fn update_reflects_new_vector_after_rebuild() {
3689        let tmp = tempfile::tempdir().unwrap();
3690        let mut db = open(tmp.path());
3691        db.create_collection("items", desc()).unwrap();
3692        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3693        db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
3694            .unwrap();
3695        // Move "a" far away; querying near the origin should now prefer "b".
3696        db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
3697            .unwrap();
3698        let res = db
3699            .search("items", &[0.0; 4], &SearchParams::default())
3700            .unwrap();
3701        assert_eq!(res[0].id, "b");
3702        assert_eq!(
3703            db.get("items", "a").unwrap().unwrap().vector,
3704            Some(vec![100.0, 0.0, 0.0, 0.0])
3705        );
3706    }
3707
3708    #[test]
3709    fn delete_removes_from_search() {
3710        let tmp = tempfile::tempdir().unwrap();
3711        let mut db = open(tmp.path());
3712        db.create_collection("items", desc()).unwrap();
3713        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3714        db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3715            .unwrap();
3716        assert!(db.delete("items", "a").unwrap());
3717        let res = db
3718            .search("items", &[0.0; 4], &SearchParams::default())
3719            .unwrap();
3720        assert!(res.iter().all(|m| m.id != "a"));
3721        assert!(db.get("items", "a").unwrap().is_none());
3722    }
3723
3724    #[test]
3725    fn unknown_collection_errors() {
3726        let tmp = tempfile::tempdir().unwrap();
3727        let mut db = open(tmp.path());
3728        assert!(matches!(
3729            db.search("nope", &[0.0; 4], &SearchParams::default()),
3730            Err(Error::CollectionNotFound(_))
3731        ));
3732        db.create_collection("c", desc()).unwrap();
3733        assert!(matches!(
3734            db.create_collection("c", desc()),
3735            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3736        ));
3737    }
3738
3739    fn desc_with(kind: IndexKind) -> Descriptor {
3740        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3741            kind,
3742            pq_subspaces: None,
3743        })
3744    }
3745
3746    #[test]
3747    fn vamana_and_ivf_collections_find_the_nearest_point() {
3748        for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3749            let tmp = tempfile::tempdir().unwrap();
3750            let mut db = open(tmp.path());
3751            db.create_collection("c", desc_with(kind)).unwrap();
3752            for i in 0..40u32 {
3753                db.upsert(
3754                    "c",
3755                    &format!("p{i}"),
3756                    &[i as f32, 0.0, 0.0, 0.0],
3757                    &json!({}),
3758                )
3759                .unwrap();
3760            }
3761            // ef_search maps onto the index's search width (l_search / nprobe).
3762            let res = db
3763                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3764                .unwrap();
3765            assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3766        }
3767    }
3768
3769    #[test]
3770    fn index_kind_persists_and_rebuilds_on_reopen() {
3771        let tmp = tempfile::tempdir().unwrap();
3772        {
3773            let mut db = open(tmp.path());
3774            db.create_collection("v", desc_with(IndexKind::Vamana))
3775                .unwrap();
3776            for i in 0..20u32 {
3777                db.upsert(
3778                    "v",
3779                    &format!("p{i}"),
3780                    &[i as f32, 1.0, 2.0, 3.0],
3781                    &json!({}),
3782                )
3783                .unwrap();
3784            }
3785            db.checkpoint().unwrap();
3786        }
3787        let mut db = open(tmp.path());
3788        assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3789        let res = db
3790            .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3791            .unwrap();
3792        assert_eq!(res[0].id, "p7");
3793    }
3794
3795    // ADR-0063: a DiskVamana collection reopens by loading its frugal mmap base +
3796    // replaying the WAL tail, not by an O(N) full-RAM rebuild. Proven structurally:
3797    // the on-disk base keeps its checkpoint row count after reopen (a rebuild would
3798    // rewrite it to include the post-checkpoint inserts).
3799    #[test]
3800    fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
3801        let tmp = tempfile::tempdir().unwrap();
3802        let cid;
3803        {
3804            let mut db = open(tmp.path());
3805            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3806                .unwrap();
3807            for i in 0..100u32 {
3808                db.upsert(
3809                    "d",
3810                    &format!("p{i}"),
3811                    &[i as f32, 0.0, 0.0, 0.0],
3812                    &json!({}),
3813                )
3814                .unwrap();
3815            }
3816            // Force the deferred build so the base file + checkpoint blob exist.
3817            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3818                .unwrap();
3819            db.checkpoint().unwrap();
3820            // 15 more after the checkpoint (< the 20% consolidation threshold): they
3821            // land in the in-memory delta + the WAL tail, not the base file.
3822            for i in 100..115u32 {
3823                db.upsert(
3824                    "d",
3825                    &format!("p{i}"),
3826                    &[i as f32, 0.0, 0.0, 0.0],
3827                    &json!({}),
3828                )
3829                .unwrap();
3830            }
3831            cid = db.collections["d"].id;
3832            let base = open_disk_index(
3833                &db.store,
3834                cid,
3835                db.store.collection_codec_clone(cid).unwrap(),
3836            )
3837            .unwrap();
3838            assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
3839        }
3840
3841        let mut db = open(tmp.path());
3842        // A base point and a post-checkpoint (WAL-tail-replayed) point are both found.
3843        assert_eq!(
3844            db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
3845                .unwrap()[0]
3846                .id,
3847            "p50",
3848        );
3849        assert_eq!(
3850            db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
3851                .unwrap()[0]
3852                .id,
3853            "p110",
3854            "post-checkpoint insert survived reopen via WAL-tail replay",
3855        );
3856        // Loaded, not rebuilt: the base file still holds exactly the 100 checkpoint
3857        // rows. A rebuild on open would have rewritten it over all 115 live rows.
3858        let base = open_disk_index(
3859            &db.store,
3860            cid,
3861            db.store.collection_codec_clone(cid).unwrap(),
3862        )
3863        .unwrap();
3864        assert_eq!(
3865            base.len(),
3866            100,
3867            "reopen loaded the base; it was not rebuilt"
3868        );
3869    }
3870
3871    // ADR-0063: the durable artifact is never load-bearing for correctness — a
3872    // missing/torn base falls back to an authoritative rebuild that still answers.
3873    #[test]
3874    fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
3875        let tmp = tempfile::tempdir().unwrap();
3876        let base_path;
3877        {
3878            let mut db = open(tmp.path());
3879            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3880                .unwrap();
3881            for i in 0..60u32 {
3882                db.upsert(
3883                    "d",
3884                    &format!("p{i}"),
3885                    &[i as f32, 0.0, 0.0, 0.0],
3886                    &json!({}),
3887                )
3888                .unwrap();
3889            }
3890            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3891                .unwrap();
3892            db.checkpoint().unwrap();
3893            let cid = db.collections["d"].id;
3894            base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
3895        }
3896        // Simulate a lost base: remove the file the snapshot references.
3897        std::fs::remove_file(&base_path).unwrap();
3898        {
3899            let mut db = open(tmp.path());
3900            assert_eq!(
3901                db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3902                    .unwrap()[0]
3903                    .id,
3904                "p25",
3905                "rebuild fallback still answers correctly after a lost base",
3906            );
3907            // The fallback rebuild re-seals the base, and a subsequent checkpoint
3908            // refreshes the snapshot blob to match it.
3909            assert!(
3910                base_path.exists(),
3911                "the fallback rebuild re-sealed the base file"
3912            );
3913            db.checkpoint().unwrap();
3914        }
3915        // Simulate a torn base: truncate the file mid-way. `DiskVamana::open`'s
3916        // per-page checks reject it, so the load falls back to a rebuild.
3917        let len = std::fs::metadata(&base_path).unwrap().len();
3918        std::fs::OpenOptions::new()
3919            .write(true)
3920            .open(&base_path)
3921            .unwrap()
3922            .set_len(len / 2)
3923            .unwrap();
3924
3925        let mut db = open(tmp.path());
3926        assert_eq!(
3927            db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3928                .unwrap()[0]
3929                .id,
3930            "p25",
3931            "rebuild fallback still answers correctly after a torn base",
3932        );
3933    }
3934
3935    #[test]
3936    fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3937        let tmp = tempfile::tempdir().unwrap();
3938        let mut db = open(tmp.path());
3939        db.create_collection("c", desc_with(IndexKind::Ivf))
3940            .unwrap();
3941        for i in 0..50u32 {
3942            db.upsert(
3943                "c",
3944                &format!("p{i}"),
3945                &[i as f32, 0.0, 0.0, 0.0],
3946                &json!({}),
3947            )
3948            .unwrap();
3949        }
3950        // The first search builds (and trains) the IVF from the store.
3951        let _ = db
3952            .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3953            .unwrap();
3954        assert!(!db.collections["c"].stale, "the search built the index");
3955
3956        // Incremental insert: a new outlier is found, with no rebuild scheduled.
3957        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3958            .unwrap();
3959        assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3960        let res = db
3961            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3962            .unwrap();
3963        assert_eq!(res[0].id, "far");
3964
3965        // Incremental delete: the point disappears, still with no rebuild.
3966        assert!(db.delete("c", "far").unwrap());
3967        assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3968        let res = db
3969            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3970            .unwrap();
3971        assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3972    }
3973
3974    #[test]
3975    fn ivf_incremental_update_replaces_the_vector() {
3976        let tmp = tempfile::tempdir().unwrap();
3977        let mut db = open(tmp.path());
3978        db.create_collection("c", desc_with(IndexKind::Ivf))
3979            .unwrap();
3980        for i in 0..30u32 {
3981            db.upsert(
3982                "c",
3983                &format!("p{i}"),
3984                &[i as f32, 0.0, 0.0, 0.0],
3985                &json!({}),
3986            )
3987            .unwrap();
3988        }
3989        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3990        // Move p5 far away in place.
3991        db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3992            .unwrap();
3993        assert!(!db.collections["c"].stale);
3994        let at_new = db
3995            .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3996            .unwrap();
3997        assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3998        let at_old = db
3999            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4000            .unwrap();
4001        assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
4002    }
4003
4004    #[test]
4005    fn ivf_reinsert_after_incremental_delete_is_found() {
4006        let tmp = tempfile::tempdir().unwrap();
4007        let mut db = open(tmp.path());
4008        db.create_collection("c", desc_with(IndexKind::Ivf))
4009            .unwrap();
4010        for i in 0..20u32 {
4011            db.upsert(
4012                "c",
4013                &format!("p{i}"),
4014                &[i as f32, 0.0, 0.0, 0.0],
4015                &json!({}),
4016            )
4017            .unwrap();
4018        }
4019        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4020        assert!(db.delete("c", "p3").unwrap());
4021        assert!(!db.collections["c"].stale);
4022        // Re-insert the same id; it must be searchable again (the slot is reused).
4023        db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4024            .unwrap();
4025        assert!(!db.collections["c"].stale);
4026        let res = db
4027            .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
4028            .unwrap();
4029        assert_eq!(res[0].id, "p3");
4030    }
4031
4032    #[test]
4033    fn hnsw_in_place_update_falls_back_to_rebuild() {
4034        // HNSW cannot update an id in place, so an update marks the index stale.
4035        let tmp = tempfile::tempdir().unwrap();
4036        let mut db = open(tmp.path());
4037        db.create_collection("c", desc()).unwrap();
4038        for i in 0..10u32 {
4039            db.upsert(
4040                "c",
4041                &format!("p{i}"),
4042                &[i as f32, 0.0, 0.0, 0.0],
4043                &json!({}),
4044            )
4045            .unwrap();
4046        }
4047        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4048        assert!(!db.collections["c"].stale);
4049        db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
4050            .unwrap();
4051        assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
4052        // The rebuild on the next search reflects the new vector.
4053        let res = db
4054            .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
4055            .unwrap();
4056        assert_eq!(res[0].id, "p2");
4057    }
4058
4059    #[test]
4060    fn unsupported_index_configurations_are_rejected() {
4061        let tmp = tempfile::tempdir().unwrap();
4062        let mut db = open(tmp.path());
4063        // Vamana/IVF do not support inner product.
4064        let dot_vamana =
4065            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
4066                kind: IndexKind::Vamana,
4067                pq_subspaces: None,
4068            });
4069        assert!(matches!(
4070            db.create_collection("a", dot_vamana),
4071            Err(Error::Unsupported(_))
4072        ));
4073        // ...nor does the disk-resident index.
4074        let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
4075            kind: IndexKind::DiskVamana,
4076            pq_subspaces: None,
4077        });
4078        assert!(matches!(
4079            db.create_collection("b", dot_disk),
4080            Err(Error::Unsupported(_))
4081        ));
4082    }
4083
4084    #[test]
4085    fn dcpe_collections_require_the_l2_metric() {
4086        let tmp = tempfile::tempdir().unwrap();
4087        let mut db = open(tmp.path());
4088        // DCPE preserves Euclidean distance, so cosine/dot are rejected.
4089        for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
4090            let bad = Descriptor::new(4, Dtype::F32, metric)
4091                .with_vector_encryption(VectorEncryption::Dcpe);
4092            assert!(matches!(
4093                db.create_collection("bad", bad),
4094                Err(Error::Unsupported(_))
4095            ));
4096        }
4097        // L2 is accepted, and the flag persists on the descriptor.
4098        let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4099            .with_vector_encryption(VectorEncryption::Dcpe);
4100        db.create_collection("enc", good)
4101            .expect("l2 dcpe collection");
4102        assert_eq!(
4103            db.descriptor("enc").expect("descriptor").vector_encryption,
4104            VectorEncryption::Dcpe
4105        );
4106    }
4107
4108    #[test]
4109    fn client_side_collections_are_fetch_only_and_reject_search() {
4110        let tmp = tempfile::tempdir().unwrap();
4111        let mut db = open(tmp.path());
4112        // Any metric is allowed (the server never ranks), so there is no L2
4113        // restriction as there is for DCPE.
4114        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4115            .with_vector_encryption(VectorEncryption::ClientSide);
4116        db.create_collection("vault", desc)
4117            .expect("create client-side collection");
4118        // No server-side index is built for a client-side collection (ADR-0032).
4119        assert!(matches!(
4120            db.collections["vault"].index,
4121            CollectionIndex::None
4122        ));
4123
4124        // Upsert opaque placeholder points: a zero vector plus a payload carrying a
4125        // stand-in sealed vector blob and a cleartext, server-filterable field.
4126        for i in 0..5 {
4127            let tier = if i < 2 { "vip" } else { "std" };
4128            db.upsert(
4129                "vault",
4130                &format!("p{i}"),
4131                &[0.0; 4],
4132                &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
4133            )
4134            .expect("upsert");
4135        }
4136        assert_eq!(db.len("vault").unwrap(), 5);
4137        // Still no index after writes — it never goes stale or rebuilds.
4138        assert!(matches!(
4139            db.collections["vault"].index,
4140            CollectionIndex::None
4141        ));
4142
4143        // Ranked search is rejected: the server cannot rank opaque vectors.
4144        assert!(matches!(
4145            db.search("vault", &[0.0; 4], &SearchParams::default()),
4146            Err(Error::Unsupported(_))
4147        ));
4148
4149        // Fetch returns the entitled set; each payload carries the blob the client
4150        // would decrypt and rank locally, and vectors are omitted when not asked for.
4151        let all = db.fetch("vault", None, 0, 100, true, false).unwrap();
4152        assert_eq!(all.len(), 5);
4153        assert!(
4154            all.iter()
4155                .all(|m| m.payload.is_some() && m.vector.is_none())
4156        );
4157
4158        // A cleartext payload filter narrows the set server-side.
4159        let vip = db
4160            .fetch(
4161                "vault",
4162                Some(&Filter::Eq {
4163                    field: "tier".to_owned(),
4164                    value: serde_json::json!("vip"),
4165                }),
4166                0,
4167                100,
4168                false,
4169                false,
4170            )
4171            .unwrap();
4172        assert_eq!(vip.len(), 2);
4173        // A limit bounds the returned set.
4174        assert_eq!(
4175            db.fetch("vault", None, 0, 2, false, false).unwrap().len(),
4176            2
4177        );
4178        // An offset scrolls past earlier matches (paginated migration copy).
4179        assert_eq!(
4180            db.fetch("vault", None, 3, 100, false, false).unwrap().len(),
4181            2
4182        );
4183
4184        // get returns the stored placeholder + blob payload by id; delete works
4185        // through the store with no index to update.
4186        assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
4187        assert!(db.delete("vault", "p0").unwrap());
4188        assert_eq!(db.len("vault").unwrap(), 4);
4189    }
4190
4191    #[test]
4192    fn client_side_encryption_rejects_multivector() {
4193        let tmp = tempfile::tempdir().unwrap();
4194        let mut db = open(tmp.path());
4195        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4196            .with_multivector(true)
4197            .with_vector_encryption(VectorEncryption::ClientSide);
4198        assert!(matches!(
4199            db.create_collection("bad", desc),
4200            Err(Error::Unsupported(_))
4201        ));
4202    }
4203
4204    // Recursively check whether a file named `name` exists under `dir`.
4205    fn contains_file(dir: &Path, name: &str) -> bool {
4206        std::fs::read_dir(dir).is_ok_and(|rd| {
4207            rd.flatten().any(|e| {
4208                let p = e.path();
4209                if p.is_dir() {
4210                    contains_file(&p, name)
4211                } else {
4212                    p.file_name().is_some_and(|f| f == name)
4213                }
4214            })
4215        })
4216    }
4217
4218    #[test]
4219    fn disk_index_collection_searches_persists_and_writes_an_artifact() {
4220        let tmp = tempfile::tempdir().unwrap();
4221        {
4222            let mut db = open(tmp.path());
4223            db.create_collection("d", desc_with(IndexKind::DiskVamana))
4224                .unwrap();
4225            for i in 0..40u32 {
4226                db.upsert(
4227                    "d",
4228                    &format!("p{i}"),
4229                    &[i as f32, 0.0, 0.0, 0.0],
4230                    &json!({}),
4231                )
4232                .unwrap();
4233            }
4234            let res = db
4235                .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4236                .unwrap();
4237            assert_eq!(res[0].id, "p7");
4238            db.checkpoint().unwrap();
4239        }
4240        // The encrypted disk artifact was written under the collection's index dir.
4241        assert!(
4242            contains_file(tmp.path(), "vamana.qvx"),
4243            "disk index file missing"
4244        );
4245        // Reopening rebuilds and re-opens the disk index; search still works.
4246        let mut db = open(tmp.path());
4247        assert_eq!(
4248            db.descriptor("d").unwrap().index.kind,
4249            IndexKind::DiskVamana
4250        );
4251        let res = db
4252            .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4253            .unwrap();
4254        assert_eq!(res[0].id, "p7");
4255    }
4256
4257    #[test]
4258    fn graph_collections_maintain_writes_incrementally() {
4259        // FreshDiskANN incremental maintenance (ADR-0033): after the base graph is
4260        // built, inserts land in the delta, deletes tombstone, and updates move —
4261        // all without a rebuild or a reopen, for both the in-memory and disk graphs.
4262        for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
4263            let tmp = tempfile::tempdir().unwrap();
4264            let mut db = open(tmp.path());
4265            db.create_collection("c", desc_with(kind)).unwrap();
4266            for i in 0..40u32 {
4267                db.upsert(
4268                    "c",
4269                    &format!("p{i}"),
4270                    &[i as f32, 0.0, 0.0, 0.0],
4271                    &json!({}),
4272                )
4273                .unwrap();
4274            }
4275            // The first search builds the read-only base graph.
4276            let res = db
4277                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4278                .unwrap();
4279            assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
4280
4281            // A brand-new point lands in the in-memory delta and is immediately
4282            // findable — no rebuild, no reopen.
4283            db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
4284                .unwrap();
4285            let res = db
4286                .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
4287                .unwrap();
4288            assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
4289
4290            // A delete tombstones the point: it is never returned again.
4291            assert!(db.delete("c", "p7").unwrap());
4292            let res = db
4293                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4294                .unwrap();
4295            assert!(
4296                res.iter().all(|m| m.id != "p7"),
4297                "{kind:?} deleted id returned"
4298            );
4299
4300            // An update moves a vector: it is found at the new position and no
4301            // longer dominates the old one.
4302            db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4303                .unwrap();
4304            let res = db
4305                .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
4306                .unwrap();
4307            assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
4308            let res = db
4309                .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
4310                .unwrap();
4311            assert_ne!(
4312                res[0].id, "p20",
4313                "{kind:?} stale copy still nearest old spot"
4314            );
4315        }
4316    }
4317
4318    #[test]
4319    fn graph_consolidates_under_heavy_churn() {
4320        // Churn past the 20% pending threshold forces a consolidation (a derived
4321        // rebuild from the store); results stay correct throughout and across a
4322        // reopen, since the store is the source of truth (ADR-0033).
4323        let tmp = tempfile::tempdir().unwrap();
4324        let mut db = open(tmp.path());
4325        db.create_collection("c", desc_with(IndexKind::Vamana))
4326            .unwrap();
4327        for i in 0..50u32 {
4328            db.upsert(
4329                "c",
4330                &format!("p{i}"),
4331                &[i as f32, 0.0, 0.0, 0.0],
4332                &json!({}),
4333            )
4334            .unwrap();
4335        }
4336        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4337
4338        let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
4339        for i in 0..15u32 {
4340            assert!(db.delete("c", &format!("p{i}")).unwrap());
4341            db.upsert(
4342                "c",
4343                &format!("q{i}"),
4344                &[1000.0 + i as f32, 0.0, 0.0, 0.0],
4345                &json!({}),
4346            )
4347            .unwrap();
4348        }
4349
4350        let near_origin = db
4351            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4352            .unwrap();
4353        assert!(
4354            near_origin.iter().all(|m| !deleted.contains(&m.id)),
4355            "a churned-out id was returned"
4356        );
4357        let near_q = db
4358            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
4359            .unwrap();
4360        assert_eq!(near_q[0].id, "q7", "new point not found after churn");
4361
4362        db.checkpoint().unwrap();
4363        drop(db);
4364        let mut db = open(tmp.path());
4365        let near_q = db
4366            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
4367            .unwrap();
4368        assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
4369        let near_origin = db
4370            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4371            .unwrap();
4372        assert!(
4373            near_origin.iter().all(|m| !deleted.contains(&m.id)),
4374            "a churned-out id resurfaced after reopen"
4375        );
4376    }
4377
4378    #[test]
4379    fn multivector_writes_are_incremental_and_match_a_rebuild() {
4380        // Token rows fold into the ANN index incrementally instead of stale->rebuild
4381        // (ADR-0034); the document API stays correct under interleaved
4382        // upsert/delete/re-upsert with no reopen, and a reopen (the authoritative
4383        // rebuild) yields the identical ranking. Docs lie on an arc of increasing
4384        // angle to the query (cosine, so direction alone ranks them), making the
4385        // expected order unambiguous. (Token-pool candidate generation only engages
4386        // above the exact-scan threshold; the incremental token maintenance it
4387        // relies on is the same code the single-vector index tests exercise.)
4388        let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
4389        let doc = |theta: f32| vec![dir(theta), dir(theta)];
4390        for kind in [
4391            IndexKind::Ivf,
4392            IndexKind::Hnsw,
4393            IndexKind::Vamana,
4394            IndexKind::Colbert,
4395        ] {
4396            let tmp = tempfile::tempdir().unwrap();
4397            let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4398                .with_multivector(true)
4399                .with_index(IndexSpec {
4400                    kind,
4401                    pq_subspaces: None,
4402                });
4403            let mut db = open(tmp.path());
4404            db.create_collection("m", desc).unwrap();
4405            // d1 is most aligned with the query (smallest angle), d10 least.
4406            for i in 1..=10u32 {
4407                db.upsert_document(
4408                    "m",
4409                    &format!("d{i}"),
4410                    &doc(0.1 * i as f32),
4411                    &json!({ "i": i }),
4412                )
4413                .unwrap();
4414            }
4415            let q = vec![dir(0.0)];
4416            let top = |db: &mut Database| {
4417                db.search_multi_vector(
4418                    "m",
4419                    &q,
4420                    &SearchParams {
4421                        k: 3,
4422                        ..Default::default()
4423                    },
4424                )
4425                .unwrap()
4426                .into_iter()
4427                .map(|m| m.id)
4428                .collect::<Vec<_>>()
4429            };
4430            assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
4431
4432            // Delete the top document — gone without a reopen.
4433            assert!(db.delete_document("m", "d1").unwrap());
4434            assert_eq!(
4435                top(&mut db),
4436                vec!["d2", "d3", "d4"],
4437                "{kind:?} after delete"
4438            );
4439
4440            // Re-upsert the least-aligned doc onto the query — the update is live.
4441            db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
4442                .unwrap();
4443            assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
4444
4445            // A new, near-aligned document is immediately findable, second only to d10.
4446            db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
4447                .unwrap();
4448            let r = top(&mut db);
4449            assert_eq!(r[0], "d10", "{kind:?}");
4450            assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
4451
4452            // A shorter re-upsert drops the trailing token row.
4453            db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
4454                .unwrap();
4455            let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
4456            assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
4457
4458            // The incremental in-memory state matches an authoritative rebuild.
4459            let before = top(&mut db);
4460            drop(db);
4461            let mut db = open(tmp.path());
4462            assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
4463            assert!(
4464                db.get_document("m", "d1", false).unwrap().is_none(),
4465                "{kind:?} deleted doc resurfaced"
4466            );
4467        }
4468    }
4469
4470    #[test]
4471    fn colbert_index_requires_multivector() {
4472        let tmp = tempfile::tempdir().unwrap();
4473        let mut db = open(tmp.path());
4474        // ColBERT is a late-interaction token-pool index — invalid for a
4475        // single-vector collection (ADR-0034).
4476        let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
4477            kind: IndexKind::Colbert,
4478            pq_subspaces: None,
4479        });
4480        assert!(matches!(
4481            db.create_collection("c", single),
4482            Err(Error::Unsupported(_))
4483        ));
4484        // ...but valid for a multi-vector collection.
4485        let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4486            .with_multivector(true)
4487            .with_index(IndexSpec {
4488                kind: IndexKind::Colbert,
4489                pq_subspaces: None,
4490            });
4491        assert!(db.create_collection("m", multi).is_ok());
4492    }
4493
4494    // ---- hybrid (pre-filtered) search ----
4495
4496    // A collection whose `city` (keyword) and `n` (numeric) payload fields are
4497    // declared filterable, so the planner can pre-filter on them.
4498    fn desc_filterable() -> Descriptor {
4499        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4500            FilterableField::keyword("city"),
4501            FilterableField::numeric("n"),
4502        ])
4503    }
4504
4505    // 30 points on the x-axis (distance to the origin grows with i), cycling
4506    // through three cities, each carrying its index as a numeric `n`.
4507    // Checkpointed so the rows live in a sealed segment's secondary index — the
4508    // pre-filter primitive — not only the active buffer.
4509    fn seed_cities(db: &mut Database) {
4510        const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4511        db.create_collection("c", desc_filterable()).unwrap();
4512        for i in 0..30u32 {
4513            db.upsert(
4514                "c",
4515                &format!("p{i}"),
4516                &[i as f32, 0.0, 0.0, 0.0],
4517                &json!({"city": CITIES[i as usize % 3], "n": i}),
4518            )
4519            .unwrap();
4520        }
4521        db.checkpoint().unwrap();
4522    }
4523
4524    #[test]
4525    fn hybrid_equality_prefilter_is_exact() {
4526        let tmp = tempfile::tempdir().unwrap();
4527        let mut db = open(tmp.path());
4528        seed_cities(&mut db);
4529        let params = SearchParams {
4530            k: 5,
4531            filter: Some(Filter::Eq {
4532                field: "city".into(),
4533                value: json!("lyon"),
4534            }),
4535            ..SearchParams::default()
4536        };
4537        let res = db.search("c", &[0.0; 4], &params).unwrap();
4538        assert!(!res.is_empty());
4539        // lyon is i % 3 == 1 → p1, p4, p7, …; nearest the origin is p1.
4540        assert_eq!(res[0].id, "p1");
4541        for m in &res {
4542            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
4543        }
4544    }
4545
4546    #[test]
4547    fn hybrid_numeric_range_prefilter_is_exact() {
4548        let tmp = tempfile::tempdir().unwrap();
4549        let mut db = open(tmp.path());
4550        seed_cities(&mut db);
4551        let params = SearchParams {
4552            k: 4,
4553            filter: Some(Filter::Gte {
4554                field: "n".into(),
4555                value: json!(10),
4556            }),
4557            ..SearchParams::default()
4558        };
4559        let res = db.search("c", &[0.0; 4], &params).unwrap();
4560        // Among n >= 10, the nearest the origin is n == 10 (p10).
4561        assert_eq!(res[0].id, "p10");
4562        for m in &res {
4563            assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
4564        }
4565    }
4566
4567    #[test]
4568    fn hybrid_unsatisfiable_filter_returns_empty() {
4569        let tmp = tempfile::tempdir().unwrap();
4570        let mut db = open(tmp.path());
4571        seed_cities(&mut db);
4572        // No row holds this city, so the pre-filter proves the result empty
4573        // without touching the vector index.
4574        let params = SearchParams {
4575            filter: Some(Filter::Eq {
4576                field: "city".into(),
4577                value: json!("atlantis"),
4578            }),
4579            ..SearchParams::default()
4580        };
4581        assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
4582    }
4583
4584    #[test]
4585    fn hybrid_and_or_composition_is_exact() {
4586        let tmp = tempfile::tempdir().unwrap();
4587        let mut db = open(tmp.path());
4588        seed_cities(&mut db);
4589        // (city in {paris, rome}) AND (n < 12): a keyword `in` intersected with a
4590        // numeric range, both pre-filterable.
4591        let params = SearchParams {
4592            k: 10,
4593            filter: Some(Filter::And(vec![
4594                Filter::In {
4595                    field: "city".into(),
4596                    values: vec![json!("paris"), json!("rome")],
4597                },
4598                Filter::Lt {
4599                    field: "n".into(),
4600                    value: json!(12),
4601                },
4602            ])),
4603            ..SearchParams::default()
4604        };
4605        let res = db.search("c", &[0.0; 4], &params).unwrap();
4606        // paris is i % 3 == 0 → the nearest qualifying point is p0.
4607        assert_eq!(res[0].id, "p0");
4608        for m in &res {
4609            let payload = m.payload.as_ref().unwrap();
4610            let city = payload["city"].as_str().unwrap();
4611            assert!(city == "paris" || city == "rome");
4612            assert!(payload["n"].as_u64().unwrap() < 12);
4613        }
4614    }
4615
4616    #[test]
4617    fn hybrid_rechecks_non_indexable_clause() {
4618        let tmp = tempfile::tempdir().unwrap();
4619        let mut db = open(tmp.path());
4620        seed_cities(&mut db);
4621        // city is pre-filterable; the Not(…) clause is not, so it is re-checked
4622        // exactly on the narrowed candidates — paris rows excluding p0.
4623        let params = SearchParams {
4624            k: 10,
4625            filter: Some(Filter::And(vec![
4626                Filter::Eq {
4627                    field: "city".into(),
4628                    value: json!("paris"),
4629                },
4630                Filter::Not(Box::new(Filter::Eq {
4631                    field: "n".into(),
4632                    value: json!(0),
4633                })),
4634            ])),
4635            ..SearchParams::default()
4636        };
4637        let res = db.search("c", &[0.0; 4], &params).unwrap();
4638        assert!(res.iter().all(|m| m.id != "p0"));
4639        // The nearest paris point after p0 is p3.
4640        assert_eq!(res[0].id, "p3");
4641        for m in &res {
4642            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
4643        }
4644    }
4645
4646    #[test]
4647    fn post_filter_fallback_on_undeclared_field_is_correct() {
4648        let tmp = tempfile::tempdir().unwrap();
4649        let mut db = open(tmp.path());
4650        // Only `city` is filterable; a filter on the undeclared `tier` cannot
4651        // pre-filter and falls back to ANN post-filtering — still exact.
4652        db.create_collection(
4653            "c",
4654            Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4655                .with_filterable(vec![FilterableField::keyword("city")]),
4656        )
4657        .unwrap();
4658        for i in 0..20u32 {
4659            let tier = if i % 2 == 0 { "gold" } else { "silver" };
4660            db.upsert(
4661                "c",
4662                &format!("p{i}"),
4663                &[i as f32, 0.0, 0.0, 0.0],
4664                &json!({"city": "paris", "tier": tier}),
4665            )
4666            .unwrap();
4667        }
4668        let params = SearchParams {
4669            k: 5,
4670            filter: Some(Filter::Eq {
4671                field: "tier".into(),
4672                value: json!("gold"),
4673            }),
4674            ..SearchParams::default()
4675        };
4676        let res = db.search("c", &[0.0; 4], &params).unwrap();
4677        assert!(!res.is_empty());
4678        for m in &res {
4679            assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
4680        }
4681    }
4682
4683    #[test]
4684    fn leaf_predicate_maps_only_indexable_filterable_leaves() {
4685        let fields = vec![
4686            FilterableField::keyword("city"),
4687            FilterableField::numeric("n"),
4688        ];
4689        // Keyword equality on a filterable field maps.
4690        assert_eq!(
4691            leaf_predicate(
4692                &Filter::Eq {
4693                    field: "city".into(),
4694                    value: json!("paris")
4695                },
4696                &fields
4697            ),
4698            Some(SecPredicate::Eq {
4699                field: "city".into(),
4700                value: SecValue::Keyword("paris".into())
4701            })
4702        );
4703        // A numeric comparison maps to a one-sided range.
4704        assert_eq!(
4705            leaf_predicate(
4706                &Filter::Gte {
4707                    field: "n".into(),
4708                    value: json!(3)
4709                },
4710                &fields
4711            ),
4712            Some(SecPredicate::Range {
4713                field: "n".into(),
4714                lo: Some(SecValue::Numeric(3.0)),
4715                hi: None,
4716                lo_inclusive: true,
4717                hi_inclusive: false,
4718            })
4719        );
4720        // Undeclared field, type mismatch, `ne`, and `exists` do not map.
4721        let undeclared = Filter::Eq {
4722            field: "tier".into(),
4723            value: json!("gold"),
4724        };
4725        let mismatch = Filter::Eq {
4726            field: "city".into(),
4727            value: json!(5),
4728        };
4729        let ne = Filter::Ne {
4730            field: "city".into(),
4731            value: json!("x"),
4732        };
4733        let exists = Filter::Exists {
4734            field: "city".into(),
4735        };
4736        assert!(leaf_predicate(&undeclared, &fields).is_none());
4737        assert!(leaf_predicate(&mismatch, &fields).is_none());
4738        assert!(leaf_predicate(&ne, &fields).is_none());
4739        assert!(leaf_predicate(&exists, &fields).is_none());
4740    }
4741
4742    // ----- durable IVF index recovery (ADR-0025) -----
4743
4744    // The first collection created in a fresh store has id 0.
4745    fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
4746        root.join("collections").join("0000000000").join("index")
4747    }
4748
4749    fn idx_snapshot_files(root: &Path) -> Vec<String> {
4750        let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4751            .map(|rd| {
4752                rd.filter_map(std::result::Result::ok)
4753                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4754                    .filter(|n| n.starts_with("idx-"))
4755                    .collect()
4756            })
4757            .unwrap_or_default();
4758        v.sort();
4759        v
4760    }
4761
4762    fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4763        db.search("c", q, &SearchParams::default())
4764            .unwrap()
4765            .into_iter()
4766            .map(|m| m.id)
4767            .collect()
4768    }
4769
4770    fn seed_ivf(db: &mut Database, n: u32) {
4771        db.create_collection("c", desc_with(IndexKind::Ivf))
4772            .unwrap();
4773        for i in 0..n {
4774            db.upsert(
4775                "c",
4776                &format!("p{i}"),
4777                &[i as f32, 0.0, 0.0, 0.0],
4778                &json!({}),
4779            )
4780            .unwrap();
4781        }
4782        // The first search builds (and trains) the IVF from the store.
4783        let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4784    }
4785
4786    #[test]
4787    fn ivf_snapshot_is_written_at_checkpoint() {
4788        let tmp = tempfile::tempdir().unwrap();
4789        let mut db = open(tmp.path());
4790        seed_ivf(&mut db, 40);
4791        db.checkpoint().unwrap();
4792        assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
4793    }
4794
4795    #[test]
4796    fn ivf_loads_from_snapshot_rather_than_rebuilding() {
4797        let tmp = tempfile::tempdir().unwrap();
4798        {
4799            let mut db = open(tmp.path());
4800            db.create_collection("c", desc_with(IndexKind::Ivf))
4801                .unwrap();
4802            db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
4803                .unwrap();
4804            db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
4805                .unwrap();
4806            // First search builds the IVF — int_to_ext is the sorted scan order.
4807            let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
4808            // Incremental upserts append in insertion order, diverging from sort.
4809            db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
4810                .unwrap();
4811            db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4812                .unwrap();
4813            db.checkpoint().unwrap();
4814            assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
4815        }
4816        let db = open(tmp.path());
4817        // Loaded from the snapshot: the insertion-order mapping is preserved. A
4818        // rebuild would have produced the sorted order ["a", "b", "m", "z"].
4819        assert_eq!(
4820            db.collections["c"].int_to_ext,
4821            ["a", "m", "z", "b"],
4822            "index was rebuilt, not loaded from the snapshot"
4823        );
4824    }
4825
4826    #[test]
4827    fn ivf_recovery_replays_post_checkpoint_upserts() {
4828        let tmp = tempfile::tempdir().unwrap();
4829        {
4830            let mut db = open(tmp.path());
4831            seed_ivf(&mut db, 30);
4832            db.checkpoint().unwrap();
4833            // Post-checkpoint upsert, no further checkpoint.
4834            db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4835                .unwrap();
4836        }
4837        let mut db = open(tmp.path());
4838        assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
4839        assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
4840    }
4841
4842    #[test]
4843    fn ivf_recovery_replays_post_checkpoint_deletes() {
4844        let tmp = tempfile::tempdir().unwrap();
4845        {
4846            let mut db = open(tmp.path());
4847            seed_ivf(&mut db, 30);
4848            db.checkpoint().unwrap();
4849            assert!(db.delete("c", "p7").unwrap());
4850        }
4851        let mut db = open(tmp.path());
4852        assert!(
4853            nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
4854                .iter()
4855                .all(|id| id != "p7")
4856        );
4857        assert!(db.get("c", "p7").unwrap().is_none());
4858        assert!(db.get("c", "p6").unwrap().is_some());
4859    }
4860
4861    #[test]
4862    fn ivf_recovery_replays_post_checkpoint_updates() {
4863        let tmp = tempfile::tempdir().unwrap();
4864        {
4865            let mut db = open(tmp.path());
4866            seed_ivf(&mut db, 30);
4867            db.checkpoint().unwrap();
4868            // Move p0 far away — an in-place update shadowing its sealed row.
4869            db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4870                .unwrap();
4871        }
4872        let mut db = open(tmp.path());
4873        assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4874        assert_ne!(
4875            nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4876            "p0",
4877            "the stale p0 vector survived the update"
4878        );
4879    }
4880
4881    #[test]
4882    fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4883        let tmp = tempfile::tempdir().unwrap();
4884        {
4885            let mut db = open(tmp.path());
4886            seed_ivf(&mut db, 30);
4887            db.checkpoint().unwrap();
4888        }
4889        // Corrupt the snapshot file; recovery must fall back to a rebuild.
4890        let files = idx_snapshot_files(tmp.path());
4891        assert_eq!(files.len(), 1);
4892        std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4893
4894        let mut db = open(tmp.path());
4895        assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4896    }
4897
4898    // ---- Multi-vector / late interaction (ColBERT, ADR-0028) ----
4899
4900    fn mv_desc() -> Descriptor {
4901        Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4902    }
4903
4904    // Brute-force MaxSim ranking over a corpus: the reference the pipeline must
4905    // reproduce (score desc, ties by id), using the same shared scorer.
4906    fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4907        let mut v: Vec<(String, f32)> = corpus
4908            .iter()
4909            .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4910            .collect();
4911        v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4912        v
4913    }
4914
4915    #[test]
4916    fn multivector_search_ranks_documents_by_maxsim() {
4917        let tmp = tempfile::tempdir().unwrap();
4918        let mut db = open(tmp.path());
4919        db.create_collection("docs", mv_desc()).unwrap();
4920        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4921            ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4922            ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4923            (
4924                "d_mix",
4925                vec![
4926                    vec![1.0, 1.0, 0.0],
4927                    vec![0.0, 0.0, 1.0],
4928                    vec![1.0, 0.0, 1.0],
4929                ],
4930            ),
4931        ];
4932        for (id, toks) in &corpus {
4933            db.upsert_document("docs", id, toks, &json!({ "id": id }))
4934                .unwrap();
4935        }
4936        assert_eq!(db.document_count("docs").unwrap(), 3);
4937
4938        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4939        let params = SearchParams {
4940            k: 3,
4941            with_payload: false,
4942            ..SearchParams::default()
4943        };
4944        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4945        let expected = bf_rank(&query, &corpus);
4946
4947        assert_eq!(got.len(), 3);
4948        for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4949            assert_eq!(&g.id, eid, "ranking matches brute force");
4950            assert!(
4951                (g.score - escore).abs() < 1e-5,
4952                "{} score {} vs {escore}",
4953                g.id,
4954                g.score
4955            );
4956        }
4957    }
4958
4959    #[test]
4960    fn multivector_search_truncates_to_k() {
4961        let tmp = tempfile::tempdir().unwrap();
4962        let mut db = open(tmp.path());
4963        db.create_collection("docs", mv_desc()).unwrap();
4964        for i in 0..5 {
4965            let v = vec![vec![1.0, i as f32, 0.0]];
4966            db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4967                .unwrap();
4968        }
4969        let params = SearchParams {
4970            k: 2,
4971            ..SearchParams::default()
4972        };
4973        let got = db
4974            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4975            .unwrap();
4976        assert_eq!(got.len(), 2);
4977    }
4978
4979    #[test]
4980    fn multivector_filter_selects_documents_exactly() {
4981        let tmp = tempfile::tempdir().unwrap();
4982        let mut db = open(tmp.path());
4983        db.create_collection("docs", mv_desc()).unwrap();
4984        // Identical token sets, so only the filter distinguishes the documents.
4985        db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4986            .unwrap();
4987        db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4988            .unwrap();
4989        let params = SearchParams {
4990            k: 10,
4991            filter: Some(Filter::Eq {
4992                field: "lang".into(),
4993                value: json!("fr"),
4994            }),
4995            ..SearchParams::default()
4996        };
4997        let got = db
4998            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4999            .unwrap();
5000        assert_eq!(got.len(), 1);
5001        assert_eq!(got[0].id, "b");
5002        assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
5003    }
5004
5005    #[test]
5006    fn multivector_reopen_rebuilds_grouping_and_ranking() {
5007        let tmp = tempfile::tempdir().unwrap();
5008        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
5009        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
5010            ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
5011            ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
5012        ];
5013        {
5014            let mut db = open(tmp.path());
5015            db.create_collection("docs", mv_desc()).unwrap();
5016            for (id, toks) in &corpus {
5017                db.upsert_document("docs", id, toks, &json!({})).unwrap();
5018            }
5019            db.checkpoint().unwrap();
5020        }
5021        // Reopen: the document grouping is rebuilt from the live rows.
5022        let mut db = open(tmp.path());
5023        assert_eq!(db.document_count("docs").unwrap(), 2);
5024        let params = SearchParams {
5025            k: 2,
5026            ..SearchParams::default()
5027        };
5028        let got = db.search_multi_vector("docs", &query, &params).unwrap();
5029        let expected = bf_rank(&query, &corpus);
5030        assert_eq!(
5031            got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
5032            expected
5033                .iter()
5034                .map(|(id, _)| id.clone())
5035                .collect::<Vec<_>>()
5036        );
5037    }
5038
5039    #[test]
5040    fn multivector_delete_document_removes_all_tokens() {
5041        let tmp = tempfile::tempdir().unwrap();
5042        let mut db = open(tmp.path());
5043        db.create_collection("docs", mv_desc()).unwrap();
5044        db.upsert_document(
5045            "docs",
5046            "a",
5047            &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
5048            &json!({}),
5049        )
5050        .unwrap();
5051        db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
5052            .unwrap();
5053        assert_eq!(db.document_count("docs").unwrap(), 2);
5054        assert_eq!(db.len("docs").unwrap(), 3);
5055
5056        assert!(db.delete_document("docs", "a").unwrap());
5057        assert_eq!(db.document_count("docs").unwrap(), 1);
5058        assert_eq!(db.len("docs").unwrap(), 1);
5059        assert!(db.get_document("docs", "a", false).unwrap().is_none());
5060        let params = SearchParams {
5061            k: 10,
5062            ..SearchParams::default()
5063        };
5064        let got = db
5065            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
5066            .unwrap();
5067        assert!(got.iter().all(|m| m.id != "a"));
5068        assert!(!db.delete_document("docs", "a").unwrap());
5069    }
5070
5071    #[test]
5072    fn multivector_reupsert_replaces_tokens() {
5073        let tmp = tempfile::tempdir().unwrap();
5074        let mut db = open(tmp.path());
5075        db.create_collection("docs", mv_desc()).unwrap();
5076        db.upsert_document(
5077            "docs",
5078            "a",
5079            &[
5080                vec![1.0, 0.0, 0.0],
5081                vec![0.0, 1.0, 0.0],
5082                vec![0.0, 0.0, 1.0],
5083            ],
5084            &json!({"v":1}),
5085        )
5086        .unwrap();
5087        assert_eq!(db.len("docs").unwrap(), 3);
5088        // Re-upsert with a single token: the trailing two must be gone.
5089        db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
5090            .unwrap();
5091        assert_eq!(db.document_count("docs").unwrap(), 1);
5092        assert_eq!(db.len("docs").unwrap(), 1);
5093        let doc = db.get_document("docs", "a", true).unwrap().unwrap();
5094        assert_eq!(doc.payload, Some(json!({"v":2})));
5095        assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
5096    }
5097
5098    #[test]
5099    fn single_and_multi_vector_apis_are_mutually_exclusive() {
5100        let tmp = tempfile::tempdir().unwrap();
5101        let mut db = open(tmp.path());
5102        db.create_collection("mv", mv_desc()).unwrap();
5103        db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
5104            .unwrap();
5105        // Single-vector ops on a multi-vector collection are rejected.
5106        assert!(matches!(
5107            db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
5108            Err(Error::Unsupported(_))
5109        ));
5110        assert!(matches!(
5111            db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
5112            Err(Error::Unsupported(_))
5113        ));
5114        // Document ops on a single-vector collection are rejected.
5115        assert!(matches!(
5116            db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
5117            Err(Error::Unsupported(_))
5118        ));
5119        assert!(matches!(
5120            db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
5121            Err(Error::Unsupported(_))
5122        ));
5123        assert!(matches!(
5124            db.document_count("sv"),
5125            Err(Error::Unsupported(_))
5126        ));
5127    }
5128
5129    #[test]
5130    fn multivector_rejects_l2_metric_and_bad_documents() {
5131        let tmp = tempfile::tempdir().unwrap();
5132        let mut db = open(tmp.path());
5133        let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
5134        assert!(matches!(
5135            db.create_collection("bad", l2),
5136            Err(Error::Unsupported(_))
5137        ));
5138
5139        db.create_collection("docs", mv_desc()).unwrap();
5140        // A document id may not contain the reserved separator.
5141        assert!(matches!(
5142            db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
5143            Err(Error::Unsupported(_))
5144        ));
5145        // A document needs at least one vector, of the right dimensionality.
5146        assert!(matches!(
5147            db.upsert_document("docs", "a", &[], &json!({})),
5148            Err(Error::Unsupported(_))
5149        ));
5150        assert!(matches!(
5151            db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
5152            Err(Error::Unsupported(_))
5153        ));
5154    }
5155
5156    #[test]
5157    fn snapshot_then_open_reproduces_the_database() {
5158        let src = tempfile::tempdir().unwrap();
5159        let mut db = open(src.path());
5160        db.create_collection("kb", desc()).unwrap();
5161        db.create_collection("kb2", desc()).unwrap();
5162        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5163            .unwrap();
5164        db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
5165            .unwrap();
5166        db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
5167            .unwrap();
5168
5169        let dest = tempfile::tempdir().unwrap();
5170        let snap_dir = dest.path().join("snap");
5171        let info = db.snapshot(&snap_dir).unwrap();
5172        assert!(info.files > 0 && info.bytes > 0);
5173        assert_eq!(info.manifest_version, db.manifest_version());
5174
5175        // A write after the snapshot must not appear in the snapshot.
5176        db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
5177            .unwrap();
5178
5179        let restored = open(&snap_dir);
5180        let mut names = restored.collection_names();
5181        names.sort();
5182        assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
5183        assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
5184        assert_eq!(
5185            restored.get("kb", "a").unwrap().unwrap().payload,
5186            Some(json!({ "n": 1 }))
5187        );
5188        assert_eq!(restored.len("kb2").unwrap(), 1);
5189        assert!(restored.get("kb", "late").unwrap().is_none());
5190    }
5191
5192    #[test]
5193    fn snapshot_refuses_an_existing_destination() {
5194        let src = tempfile::tempdir().unwrap();
5195        let mut db = open(src.path());
5196        let dest = tempfile::tempdir().unwrap(); // already exists
5197        assert!(matches!(
5198            db.snapshot(dest.path()),
5199            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5200        ));
5201    }
5202
5203    #[test]
5204    fn restore_snapshot_roundtrips_and_guards() {
5205        let src = tempfile::tempdir().unwrap();
5206        let mut db = open(src.path());
5207        db.create_collection("kb", desc()).unwrap();
5208        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5209            .unwrap();
5210        let work = tempfile::tempdir().unwrap();
5211        let snap_dir = work.path().join("snap");
5212        db.snapshot(&snap_dir).unwrap();
5213
5214        // Restore into a fresh directory, then open it.
5215        let restored_dir = work.path().join("restored");
5216        let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
5217        assert!(info.files > 0);
5218        let restored = open(&restored_dir);
5219        assert_eq!(restored.len("kb").unwrap(), 1);
5220
5221        // Restoring over an existing directory is refused.
5222        assert!(matches!(
5223            restore_snapshot(&snap_dir, &restored_dir),
5224            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5225        ));
5226        // A directory that is not a snapshot (no CURRENT) is rejected.
5227        let not_snap = work.path().join("not-a-snapshot");
5228        std::fs::create_dir_all(&not_snap).unwrap();
5229        assert!(matches!(
5230            restore_snapshot(&not_snap, &work.path().join("out")),
5231            Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
5232        ));
5233    }
5234}