Skip to main content

quiver_embed/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The embeddable, in-process Quiver database handle.
3//!
4//! [`Database`] composes the storage engine ([`quiver_core::Store`]) with a
5//! per-collection vector index and payload filtering ([`quiver_query::Filter`])
6//! into one handle. It exposes the same logical operations the server speaks
7//! (`docs/api/wire-protocol.md`), so library mode and server mode exercise
8//! identical engine semantics — the server is a thin transport/policy shell.
9//!
10//! ## Index lifecycle
11//! The store is the source of truth. Each collection chooses its index via the
12//! descriptor's [`IndexSpec`] (default in-memory HNSW); the index is built from
13//! the store on open. HNSW applies new-id inserts incrementally; once an IVF
14//! index is built it applies inserts, in-place updates, and deletes
15//! incrementally with LIRE rebalancing (ADR-0023). The Vamana / disk graph
16//! family is maintained the FreshDiskANN way (ADR-0033): the batch-built graph
17//! is a read-only base, recent inserts land in an in-memory delta graph, and
18//! deletes are tombstoned, so writes are size-independent; when the pending work
19//! grows past a fixed fraction of the base the next access consolidates by
20//! rebuilding from the store. All indexes stay derived (rebuilt from the store
21//! on open), so the crash gate never sees an index write.
22//!
23//! ## Filtered (hybrid) search
24//! A search may carry a [`quiver_query::Filter`] over the payload. The planner
25//! decomposes it into the predicates the collection's secondary indexes can
26//! answer; when those narrow the query to a small candidate set it scans that
27//! set exactly (perfect recall, no filtered-ANN cliff), and otherwise it
28//! over-fetches from the ANN index and post-filters. Both arms re-check the full
29//! filter, so results are exact regardless of which path runs.
30//!
31//! ## Concurrency (ADR-0057 / ADR-0062)
32//! Single-writer. Writes take `&mut self`. Reads come in two flavors: the
33//! `&mut self` convenience methods (`search`, `hybrid_search`,
34//! `search_multi_vector`) rebuild a stale index in place and so give embedded,
35//! single-threaded callers read-your-writes; the `&self` `*_snapshot` methods
36//! read the current immutable snapshot and run **concurrently**, serving the
37//! *prior* snapshot when a write deferred a rebuild (snapshot-isolated, slightly
38//! stale). A server therefore serves concurrent reads behind a reader–writer lock,
39//! and rebuilds **off** the exclusive lock (ADR-0062): it captures the rebuild
40//! inputs under the shared lock ([`Database::snapshot_rebuild_inputs`]), builds the
41//! new index with no lock held ([`RebuildInputs::build`]), and installs it under a
42//! brief write lock ([`Database::commit_rebuild`]) — so a rebuild never stalls
43//! concurrent readers.
44
45use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52    ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53    Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54    ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64    Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65    VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69    BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70    query_term_ids, rrf_fuse, text_to_sparse,
71};
72
/// Errors returned by the embeddable database.
///
/// The enum is `#[non_exhaustive]`, so a downstream `match` needs a wildcard
/// arm. Every variant whose field is tagged `#[from]` gets a `From` conversion
/// from the wrapped error, which is what lets `?` lift those errors into this
/// type.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// An error from the storage engine (includes not-found / already-exists /
    /// invalid-argument from the catalog and write path).
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// An error from the vector index.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// An error from the disk-resident index (build, open, or query).
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be (de)serialized as JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// The named collection is not loaded in this database. Carries the
    /// collection name that was looked up.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The requested index / metric combination is not supported. Carries a
    /// static description of what was rejected.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A durable index snapshot could not be restored (ADR-0025); the caller
    /// falls back to rebuilding from the store, so this does not surface to users.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// An index snapshot envelope could not be (de)serialized.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
104
/// Result alias for database operations: `std::result::Result` with the error
/// type fixed to this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
107
/// What a [`Database::snapshot`] captured (ADR-0050): the catalog generation and
/// the number of files / bytes copied.
///
/// A small `Copy` value of three counters; it derives serde's `Serialize` /
/// `Deserialize` as well as equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// The manifest version the snapshot reflects (its consistent LSN anchor).
    pub manifest_version: u64,
    /// Number of files copied into the snapshot directory.
    pub files: u64,
    /// Total bytes copied.
    pub bytes: u64,
}
119
/// A single search or fetch result.
///
/// `payload` and `vector` are opt-in: each is `None` unless the caller asked
/// for it (the lock-free [`CollectionSnapshot::search`] path always leaves both
/// `None`).
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External id of the point.
    pub id: String,
    /// Distance / similarity under the collection metric (0 for a direct fetch).
    pub score: f32,
    /// The payload, if requested.
    pub payload: Option<Value>,
    /// The vector, if requested.
    pub vector: Option<Vec<f32>>,
}
132
/// A multi-vector (late-interaction / ColBERT) document result: a document id, its
/// MaxSim relevance, the payload, and — if requested — the document's token
/// vectors (ADR-0028).
///
/// The multi-vector counterpart of [`Match`]: one entry per document rather
/// than per point, with one inner `Vec<f32>` per token in `vectors`.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// Document id.
    pub id: String,
    /// MaxSim relevance score (0 for a direct fetch).
    pub score: f32,
    /// The document payload, if requested / present (stored on the anchor token).
    pub payload: Option<Value>,
    /// The document's token vectors, if requested.
    pub vectors: Option<Vec<Vec<f32>>>,
}
147
// A candidate document during multi-vector re-ranking, before it becomes a
// [`DocumentMatch`]: `(MaxSim score, document id, anchor payload, token vectors
// when requested)`. The tuple positions line up with `DocumentMatch`'s
// `score`, `id`, `payload` and `vectors` fields. Named so the re-rank buffer
// stays under clippy's type-complexity threshold.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
/// Parameters for a [`Database::search`].
///
/// Implements `Default`, so callers can override just the fields they need
/// with struct-update syntax (`SearchParams { k: 5, ..Default::default() }`).
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Number of results to return.
    pub k: usize,
    /// Optional payload predicate. The planner pre-filters through the
    /// collection's secondary indexes when the filter is selective enough, and
    /// otherwise post-filters the ANN candidates; either way the full predicate
    /// is re-checked, so results are exact.
    pub filter: Option<Filter>,
    /// Search beam width (recall/latency knob), clamped up to at least `k`.
    pub ef_search: usize,
    /// Include payloads in the results.
    pub with_payload: bool,
    /// Include vectors in the results.
    pub with_vector: bool,
}
171
172impl Default for SearchParams {
173    fn default() -> Self {
174        Self {
175            k: 10,
176            filter: None,
177            ef_search: 64,
178            with_payload: true,
179            with_vector: false,
180        }
181    }
182}
183
// How many extra candidates to pull before post-filtering, so a filtered query
// still has enough survivors to fill `k`.
// NOTE(review): whether this scales `k` or is added to it is decided at the use
// site, which is not in this part of the file — confirm before tuning.
const FILTER_OVERFETCH: usize = 8;

// Hybrid search (ADR-0043) pulls `k * RRF_CANDIDATE_FACTOR` (at least
// `MIN_RRF_CANDIDATES`) candidates from each side before Reciprocal Rank Fusion,
// so a document ranked outside the top `k` on one side can still surface via the
// other.
const RRF_CANDIDATE_FACTOR: usize = 10;
// Floor on the per-side candidate count, so a small `k` still fuses a useful pool.
const MIN_RRF_CANDIDATES: usize = 100;

// Selectivity threshold for the hybrid planner. When a payload filter decomposes
// into secondary-index predicates that narrow a query to at most this many live
// candidate rows, those rows are scanned exactly (brute force) instead of going
// through the ANN index. At or below this size an exact scan is both cheaper and
// higher-recall than filtered ANN, which can return too few results when the
// filter is very selective (the "filtered-search recall cliff"). Qdrant calls
// the equivalent knob its full-scan threshold.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// Soft-deleted fraction at which a built HNSW is rebuilt from the store to
// reclaim its tombstoned graph nodes (ADR-0026). Below it, a delete is an O(1)
// soft-delete; at it, the next access rebuilds.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// Pending graph work — delta inserts plus tombstones — at which a FreshDiskANN
/// graph collection consolidates: the next access rebuilds the base from the
/// store, reclaiming tombstones and folding in the delta (ADR-0033). Below it,
/// inserts go to the in-memory delta and deletes are `O(1)` tombstones. Mirrors
/// [`HNSW_REBUILD_DELETED_FRACTION`] (both are 0.2, i.e. 20%).
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
// The vector index backing one collection. HNSW and (once built) IVF are
// maintained incrementally; the Vamana and disk graphs are batch-built from the
// store (the `Option` is `None` until first build) and then maintained
// incrementally the FreshDiskANN way — a read-only base graph plus an in-memory
// delta and deletion set, consolidated by a rebuild past a churn threshold
// (ADR-0033).
//
// Searching a variant whose `Option` is still `None` (or the `None` variant
// itself) yields no hits rather than an error — see `CollectionIndex::search`.
enum CollectionIndex {
    // A fetch-only collection with no server-side ANN index: a client-side-encrypted
    // collection (ADR-0032) stores opaque ciphertext the server never ranks, so the
    // client fetches points and ranks locally.
    None,
    // The HNSW index. The only searchable variant with no `Option`: it exists
    // (possibly empty) from the moment the handle is created.
    Hnsw(Hnsw),
    // The in-memory Vamana graph with its FreshDiskANN delta; `None` until the
    // first batch build.
    Vamana(Option<FreshVamana>),
    // The IVF index; `None` until it is first built.
    Ivf(Option<Ivf>),
    // The disk-resident DiskANN index: PQ codes in RAM, graph + full vectors on
    // (encrypted) SSD, exact re-rank (ADR-0019), with a FreshDiskANN in-memory
    // delta layered on top so the on-disk artifact stays immutable.
    Disk(Option<FreshDiskVamana>),
    // The ColBERTv2/PLAID compressed token-pool index for a multi-vector
    // collection (ADR-0034): centroid + residual-PQ codes with centroid-pruned
    // candidate generation.
    Colbert(Option<ColbertIndex>),
}
239
240impl CollectionIndex {
241    // Search, mapping the generic `ef` knob onto each index's search width.
242    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243        Ok(match self {
244            CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245            CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246            CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247            CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248            CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249            CollectionIndex::None
250            | CollectionIndex::Vamana(None)
251            | CollectionIndex::Ivf(None)
252            | CollectionIndex::Disk(None)
253            | CollectionIndex::Colbert(None) => Vec::new(),
254        })
255    }
256}
257
258// === Lock-free MVCC serving snapshot (ADR-0064, increment 1) ===
259//
260// The single writer mutates indexes in place under the write lock, so a reader
261// cannot share the live index by `Arc` and read it lock-free — that is a data
262// race. Instead, in MVCC mode the writer publishes an immutable
263// [`CollectionSnapshot`] (the base index as of the last rebuild + an [`Overlay`]
264// of writes since) into an [`ArcSwap`]; readers `load()` it without a lock. The
265// overlay is index-kind-agnostic and bounded by the rebuild cadence, so it never
266// rewrites the index and a republish costs O(overlay), not O(index). MVCC changes
267// **visibility, not durability**: the WAL-fsync acknowledgement and the `kill -9`
268// crash gate are untouched (the overlay is derived from the same WAL the store
269// already replays).
270
/// Per-collection lock-free serving snapshot pointer: the single writer
/// `store`s a new [`CollectionSnapshot`]; readers `load` one without a lock.
/// (`ArcSwap<T>` stores an `Arc<T>` internally, so this is one `Arc` per load.)
/// The outer `Arc` is what lets the cell itself be shared between the handle
/// and whoever reads from it.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// Writes accumulated since a [`CollectionSnapshot`]'s base index was published.
/// A lock-free read brute-scans these recent vectors and merges them with the
/// base-index search, dropping tombstoned ids. Cloned (O(overlay)) on each write
/// and reset to empty when a rebuild folds it into a fresh base.
///
/// The `Default` value is the empty overlay (no upserts, no tombstones).
#[derive(Default, Clone)]
struct Overlay {
    // (vector, external id) for points upserted since the base, in id order: the
    // j-th entry's internal id is `base_len + j`. The vector is held in an `Arc`
    // so cloning the overlay copies pointers, not vector data.
    upserts: Vec<(Arc<[f32]>, String)>,
    // Internal ids (base or overlay) deleted or superseded since the base; their
    // hits are dropped from a search.
    tombstones: HashSet<u64>,
}
289
/// An immutable, lock-free-readable view of a single-vector collection (ADR-0064):
/// the base index as of the last rebuild, the base id map, and the overlay of
/// writes since. Obtained via [`Database::collection_snapshot`] and read with
/// [`CollectionSnapshot::search`]; a read is snapshot-isolated — it sees one
/// consistent `(base, overlay)` pair, and a write that lands mid-read is simply
/// the next snapshot.
pub struct CollectionSnapshot {
    // The index as of the last rebuild. Shared by `Arc` so a per-write republish
    // reuses it instead of copying it.
    base: Arc<CollectionIndex>,
    // External id for each base internal id, indexed by internal id.
    base_int_to_ext: Arc<Vec<String>>,
    // Number of internal ids the base covers; internal ids `>= base_len` belong
    // to the overlay.
    base_len: u64,
    // Writes since the base was published (upserts + tombstones).
    overlay: Arc<Overlay>,
    // The collection metric, used to score overlay vectors and to convert
    // between reported scores and the ordering key.
    metric: Metric,
}
303
304impl CollectionSnapshot {
305    // An empty snapshot for a freshly created/opened collection (no base yet); the
306    // writer publishes a real one at the first rebuild. Reading it yields no hits.
307    fn empty(metric: Metric) -> Self {
308        Self {
309            base: Arc::new(CollectionIndex::None),
310            base_int_to_ext: Arc::new(Vec::new()),
311            base_len: 0,
312            overlay: Arc::new(Overlay::default()),
313            metric,
314        }
315    }
316
317    // Map an internal id to its external id: base ids index the base map, overlay
318    // ids (`>= base_len`) index the overlay's upserts.
319    fn ext_id(&self, internal: u64) -> Option<&str> {
320        if internal < self.base_len {
321            self.base_int_to_ext
322                .get(internal as usize)
323                .map(String::as_str)
324        } else {
325            self.overlay
326                .upserts
327                .get((internal - self.base_len) as usize)
328                .map(|(_, e)| e.as_str())
329        }
330    }
331
332    /// Lock-free nearest-neighbor search over the base index merged with the
333    /// overlay (ADR-0064 increment 1) — **pure vector reads**: no payload filter
334    /// and no payload/vector fetch (those need the store and land in increment 2).
335    /// Returns the `k` nearest live points, closest first, scored in the true
336    /// collection metric — identical ordering and scores to the locked
337    /// [`Database::search_snapshot`] path for the same case.
338    ///
339    /// # Errors
340    /// Propagates an index search error.
341    pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342        if k == 0 {
343            return Ok(Vec::new());
344        }
345        // Collect candidates in "smaller is closer" ordering space (uniform across
346        // metrics — `score::ordering_distance`), dropping tombstoned ids.
347        let mut cands: Vec<(f32, u64)> = Vec::new();
348        for n in self.base.search(query, k, ef_search)? {
349            if !self.overlay.tombstones.contains(&n.id) {
350                // `Neighbor.distance` is the reported metric; `report_metric` is its
351                // own inverse (identity for L2, negation for similarities), so it
352                // maps the reported value back to the ordering key.
353                cands.push((report_metric(self.metric, n.distance), n.id));
354            }
355        }
356        for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357            let internal = self.base_len + j as u64;
358            if !self.overlay.tombstones.contains(&internal) {
359                cands.push((ordering_distance(self.metric, query, vector), internal));
360            }
361        }
362        cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363        cands.truncate(k);
364        let mut out = Vec::with_capacity(cands.len());
365        for (ordering, internal) in cands {
366            if let Some(ext) = self.ext_id(internal) {
367                out.push(Match {
368                    id: ext.to_owned(),
369                    score: report_metric(self.metric, ordering),
370                    payload: None,
371                    vector: None,
372                });
373            }
374        }
375        Ok(out)
376    }
377}
378
379// A fresh, empty snapshot cell for a collection's metric — the initial value of
380// `CollectionHandle::snapshot` (the writer publishes a real base at the first
381// rebuild when MVCC is on; left untouched, at one tiny allocation, when off).
382fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383    let metric = to_index_metric(descriptor.metric);
384    Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387// Whether a collection *kind* can be served by the lock-free MVCC snapshot path
388// (ADR-0064 increment 1), independent of the flag: single-vector, server-searchable,
389// and an **in-memory** index. Disk-resident indexes are excluded for now — their
390// `mmap` assumes an immutable file, which a superseded snapshot still referencing
391// the old file would violate when a rebuild seals a new one (a later increment
392// versions the file).
393fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394    !descriptor.multivector
395        && descriptor.vector_encryption != VectorEncryption::ClientSide
396        && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399// Whether a collection is *currently* served via the snapshot: eligible and the
400// flag is on.
401fn mvcc_served(handle: &CollectionHandle) -> bool {
402    handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405// Republish a collection's snapshot reusing the immutable base + id map, swapping
406// in a freshly-extended overlay (the per-write O(overlay) cost).
407fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408    handle.snapshot.store(Arc::new(CollectionSnapshot {
409        base: prior.base.clone(),
410        base_int_to_ext: prior.base_int_to_ext.clone(),
411        base_len: prior.base_len,
412        overlay,
413        metric: prior.metric,
414    }));
415}
416
417// Move a freshly rebuilt index out of `handle.index` into a new published
418// snapshot with an empty overlay (the base now absorbs all prior writes). Leaves
419// `handle.index` empty — in MVCC mode the live base lives in the snapshot.
420fn publish_base(handle: &mut CollectionHandle) {
421    let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422    let metric = to_index_metric(handle.descriptor.metric);
423    handle.snapshot.store(Arc::new(CollectionSnapshot {
424        base: Arc::new(base),
425        base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426        base_len: handle.int_to_ext.len() as u64,
427        overlay: Arc::new(Overlay::default()),
428        metric,
429    }));
430}
431
// Number of overlay upserts at which an MVCC write flags the collection for a
// consolidating rebuild: one fifth of the base (~20% churn, the same level that
// already triggers consolidation), but never less than 1024 so a tiny base does
// not rebuild on every write.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    const FLOOR: u64 = 1024;
    let churn = base_len / 5;
    if churn > FLOOR {
        churn
    } else {
        FLOOR
    }
}
438
439// MVCC-mode single-vector upsert (ADR-0064): append to the published overlay
440// instead of mutating the immutable base, so lock-free readers stay race-free.
441// `ponytail`: republishes per write — O(overlay) each; coalesce per batch if
442// batched upserts under MVCC ever dominate.
443fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444    bump_write_gen(handle);
445    let cur = handle.snapshot.load_full();
446    let mut overlay = cur.overlay.as_ref().clone();
447    // An update supersedes the prior copy (in the base or the overlay): tombstone it.
448    if let Some(&old) = handle.ext_to_int.get(ext_id) {
449        overlay.tombstones.insert(old);
450    }
451    let internal = cur.base_len + overlay.upserts.len() as u64;
452    overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453    handle.ext_to_int.insert(ext_id.to_owned(), internal);
454    handle.int_to_ext.push(ext_id.to_owned());
455    let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456    publish_overlay(handle, &cur, Arc::new(overlay));
457    // Defer a consolidating rebuild once the overlay is large (the server runs it
458    // off-lock; the embedded `&mut` search rebuilds synchronously).
459    if crowded {
460        handle.stale = true;
461    }
462}
463
464// MVCC-mode single-vector delete (ADR-0064): tombstone the id in the published
465// overlay; the base stays immutable.
466fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467    bump_write_gen(handle);
468    let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469        return;
470    };
471    let cur = handle.snapshot.load_full();
472    let mut overlay = cur.overlay.as_ref().clone();
473    overlay.tombstones.insert(internal);
474    publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
/// On-disk envelope (ADR-0025) for a durable IVF snapshot: the `Ivf` bytes plus
/// the internal->external id mapping they are addressed by, postcard-encoded and
/// handed to the store as one opaque blob. On open the envelope is decoded, the
/// `Ivf` restored, and the post-checkpoint WAL tail replayed. A decode/version
/// error means "rebuild from the store" — the snapshot is only ever a fast path.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    // Envelope format version; a mismatch is treated as "rebuild from the store".
    version: u16,
    // External id for each internal id the `Ivf` bytes are addressed by.
    int_to_ext: Vec<String>,
    // The serialized `Ivf` index, opaque at this layer.
    ivf: Vec<u8>,
}
488
// Envelope format version, independent of the product SemVer (and of the inner
// `Ivf` snapshot version); a mismatch falls back to a rebuild. Same width
// (`u16`) as the envelopes' `version` field.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
/// On-disk envelope (ADR-0063) for a durable DiskVamana snapshot. Unlike the IVF
/// envelope, the bulk (graph + full vectors) stays in the immutable `mmap`-ed
/// base file (`vamana.qvx`); this blob carries only what ties that base to the
/// live state — the base point count (validated against the opened file), the
/// FreshDiskANN tombstones, and the id map. The delta vectors are *not* stored:
/// the delta ids are implied as `[base_row_count, int_to_ext.len())` and their
/// vectors re-fetched from the store on open, so the blob stays O(delta ids), not
/// O(N) vectors. A decode/version/validation error means "rebuild from the store".
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    // Envelope format version; a mismatch is treated as "rebuild from the store".
    version: u16,
    // External id for each internal id, base rows first and delta ids after.
    int_to_ext: Vec<String>,
    // Number of points in the base file; checked against the file when opened.
    base_row_count: u64,
    // Internal ids tombstoned since the base was built.
    deleted_ids: Vec<u64>,
}
508
// The in-memory state for one loaded collection: its catalog identity, its
// derived vector index, the id maps that address that index, and the
// bookkeeping for deferred rebuilds and lock-free (MVCC) serving. Nothing here
// is the source of truth — the store is.
struct CollectionHandle {
    // The store's id for this collection.
    id: CollectionId,
    // The collection's descriptor (metric, index spec, encryption, ...).
    descriptor: Descriptor,
    // The derived vector index. In MVCC mode this is left empty after a rebuild
    // because the live base is moved into the published snapshot.
    index: CollectionIndex,
    // External id for each internal id, indexed by internal id.
    int_to_ext: Vec<String>,
    // External id -> its current internal id.
    ext_to_int: HashMap<String, u64>,
    // Set when a rebuild from the store has been deferred and is still pending.
    stale: bool,
    // Monotonic per-collection write counter, bumped by `bump_write_gen` on every
    // write — both writes that defer a rebuild (`mark_stale`) and MVCC overlay
    // writes. An off-lock rebuild (ADR-0062) captures it before
    // building and re-checks it at commit: if it advanced, a write landed during
    // the build, so the freshly built index is already behind — the commit installs
    // it (still newer than the prior snapshot) but leaves the handle stale so the
    // next rebuild catches up. Wrapping is fine: equality, not ordering, is what the
    // commit checks.
    write_gen: u64,
    // For a multi-vector (ColBERT) collection: each document id mapped to its
    // token count, so a re-rank can gather all of a document's token rows
    // (`<doc-id><US><ordinal>`) and `document_count` is O(1). `None` for a
    // single-vector collection. Maintained eagerly on document writes and rebuilt
    // authoritatively from the store on open / rebuild; never persisted (ADR-0028).
    docs: Option<BTreeMap<String, u32>>,
    // Derived inverted index over the collection's `__quiver_sparse__` payloads,
    // for the sparse half of hybrid search (ADR-0045). `Some` for a single-vector,
    // server-searchable collection; `None` for multi-vector and client-side-encrypted
    // collections (which never run hybrid search) — and as a backstop, when `None`
    // the sparse ranking falls back to a full store scan. Built on rebuild from the
    // store and maintained incrementally on upsert/delete; never persisted.
    sparse: Option<SparseInvertedIndex>,
    // Whether this collection is served via the lock-free MVCC snapshot (ADR-0064);
    // mirrors the database-wide flag, set at creation and by `set_mvcc_reads`. When
    // off, the `snapshot` cell is never republished and the in-place RwLock read
    // path is used — zero overhead.
    mvcc: bool,
    // The lock-free serving snapshot cell (ADR-0064). Initialized empty; the writer
    // publishes a real base at each rebuild and an extended overlay per write when
    // `mvcc` is on. Never persisted (rebuilt on open).
    snapshot: SnapshotCell,
}
547
548// Whether a collection should carry a derived sparse inverted index: only
549// single-vector, server-searchable collections run hybrid search (ADR-0045).
550fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551    !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554// Mark a collection's index stale: a write the index could not absorb in place
555// defers a full rebuild. Also bumps the write generation (every staleness-marking
556// write is a write).
557fn mark_stale(handle: &mut CollectionHandle) {
558    handle.stale = true;
559    bump_write_gen(handle);
560}
561
562// Bump a collection's monotonic write generation. Called on **every** write that
563// touches the store — including writes that land while the index is already stale,
564// which the incremental maintenance skips. The off-lock rebuild (ADR-0062) captures
565// this before scanning and re-checks it at commit: if it advanced, a write landed
566// during the build, so the freshly built index is already behind and the commit
567// leaves the collection stale for the next rebuild. Wrapping is fine — the commit
568// checks equality, not ordering — and over-counting is harmless (at worst one extra
569// rebuild); only *missing* a bump could lose a write.
570fn bump_write_gen(handle: &mut CollectionHandle) {
571    handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
/// An in-process Quiver database over one data directory.
///
/// Owns the storage engine plus one in-memory handle (derived index and id
/// maps) per loaded collection.
pub struct Database {
    // The storage engine; the source of truth for all collection data.
    store: Store,
    // Loaded collections, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
    // Lock-free MVCC reads (ADR-0064), default off. Defaulted from
    // `QUIVER_MVCC_READS` at open and overridable via `set_mvcc_reads`. When off,
    // the proven in-place RwLock read path is used with zero overhead.
    mvcc: bool,
}
583
// Whether `QUIVER_MVCC_READS` requests lock-free MVCC reads (ADR-0064).
//
// Returns `true` when the variable is set to `1`, `true`, `on` or `yes`,
// compared case-insensitively and ignoring surrounding whitespace (so `True`,
// `ON` and ` yes ` all count). Anything else — including an unset variable or
// one that is not valid Unicode — leaves MVCC reads off.
fn mvcc_from_env() -> bool {
    std::env::var("QUIVER_MVCC_READS")
        .map(|raw| mvcc_flag_enabled(&raw))
        .unwrap_or(false)
}

// Whether a raw flag value spells "enabled": one of `1` / `true` / `on` / `yes`,
// ASCII-case-insensitive, after trimming whitespace.
fn mvcc_flag_enabled(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "on", "yes"]
        .iter()
        .any(|truthy| value.eq_ignore_ascii_case(truthy))
}
590
591impl Database {
592    /// Open (creating if absent) the database at `dir` with encryption-at-rest
593    /// disabled, rebuilding each collection's index from the store.
594    pub fn open(dir: &Path) -> Result<Self> {
595        Self::from_store(Store::open(dir)?)
596    }
597
598    /// Open the database with a specific page codec — used to enable
599    /// encryption-at-rest by passing `quiver-crypto`'s AEAD codec. Mirrors
600    /// [`quiver_core::Store::open_with_codec`]; the codec seals both paged files
601    /// and the WAL, so no plaintext user data reaches the disk.
602    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603        Self::from_store(Store::open_with_codec(dir, codec)?)
604    }
605
606    /// Open the database with a [`KeyRing`], the seam that lets `quiver-crypto`'s
607    /// envelope key-ring seal each collection under its own data-encryption key
608    /// (enabling crypto-shredding). Mirrors
609    /// [`quiver_core::Store::open_with_keyring`].
610    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611        Self::from_store(Store::open_with_keyring(dir, keyring)?)
612    }
613
614    // Build the in-memory handles (and their HNSW indexes) over an opened store.
615    fn from_store(store: Store) -> Result<Self> {
616        let mvcc = mvcc_from_env();
617        let mut collections = HashMap::new();
618        for name in store.collection_names() {
619            let Some(id) = store.collection_id(&name) else {
620                continue;
621            };
622            let Some(descriptor) = store.descriptor(id).cloned() else {
623                continue;
624            };
625            let snapshot = empty_snapshot(&descriptor);
626            let mut handle = CollectionHandle {
627                id,
628                index: empty_index(&descriptor),
629                descriptor,
630                int_to_ext: Vec::new(),
631                ext_to_int: HashMap::new(),
632                stale: true,
633                write_gen: 0,
634                docs: None,
635                // Populated by `load_index` / `rebuild_index` from the store.
636                sparse: None,
637                mvcc,
638                snapshot,
639            };
640            load_index(&store, &mut handle)?;
641            // MVCC mode: the lock-free base lives in the snapshot, which `load_index`
642            // does not populate. Force a rebuild on first read so the base is
643            // published (a restored durable IVF would otherwise leave the snapshot
644            // empty); MVCC trades the durable-index fast reopen for the snapshot.
645            if mvcc_served(&handle) {
646                handle.stale = true;
647            }
648            collections.insert(name, handle);
649        }
650        Ok(Self {
651            store,
652            collections,
653            mvcc,
654        })
655    }
656
657    /// Create a collection. Errors if the name already exists, or if the index
658    /// specification is unsupported for the metric.
659    pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660        validate_index(&descriptor)?;
661        let id = self.store.create_collection(name, descriptor.clone())?;
662        let index = empty_index(&descriptor);
663        let docs = descriptor.multivector.then(BTreeMap::new);
664        // A fresh single-vector collection starts with an empty inverted index
665        // maintained incrementally from the first upsert (an empty index allocates
666        // nothing until a sparse vector arrives).
667        let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668        let snapshot = empty_snapshot(&descriptor);
669        let mvcc = self.mvcc;
670        self.collections.insert(
671            name.to_owned(),
672            CollectionHandle {
673                id,
674                descriptor,
675                index,
676                int_to_ext: Vec::new(),
677                ext_to_int: HashMap::new(),
678                stale: false,
679                write_gen: 0,
680                docs,
681                sparse,
682                mvcc,
683                snapshot,
684            },
685        );
686        Ok(())
687    }
688
689    /// Drop a collection and its data. Returns whether it existed.
690    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
691        let existed = self.store.drop_collection(name)?;
692        self.collections.remove(name);
693        Ok(existed)
694    }
695
696    /// Crypto-shred a collection: drop it and destroy its data-encryption key, so
697    /// its sealed data is unrecoverable even with the master key, then reclaim
698    /// its files. Mirrors [`quiver_core::Store::shred_collection`]; with an
699    /// envelope key-ring this is irreversible erasure, with a single-codec
700    /// key-ring it is `drop` plus a checkpoint. Returns whether it existed.
701    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
702        let existed = self.store.shred_collection(name)?;
703        self.collections.remove(name);
704        Ok(existed)
705    }
706
707    /// Install a replication commit observer, invoked with each committed
708    /// [`WalEntry`] in commit order (ADR-0030). The server uses this to drive a
709    /// leader's replication stream.
710    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
711        self.store.set_commit_observer(observer);
712    }
713
714    /// The operations that recreate the current logical state, for a replication
715    /// follower to bootstrap from (ADR-0030).
716    ///
717    /// # Errors
718    /// Propagates a store read error.
719    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
720        Ok(self.store.replication_snapshot()?)
721    }
722
723    /// Apply a replicated operation from a leader (ADR-0030): persist and apply it
724    /// to the store (preserving the leader's collection id), then reconcile the
725    /// in-memory index handles — register a new collection, drop a removed one, or
726    /// mark a touched collection's index stale so the next read rebuilds from the
727    /// replicated state.
728    ///
729    /// # Errors
730    /// Propagates a store apply error.
731    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732        let target = match &op {
733            WalOp::CreateCollection { collection_id, .. }
734            | WalOp::DropCollection { collection_id }
735            | WalOp::Upsert { collection_id, .. }
736            | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737            WalOp::Checkpoint { .. } => None,
738        };
739        let create_name = match &op {
740            WalOp::CreateCollection { name, .. } => Some(name.clone()),
741            _ => None,
742        };
743        let is_drop = matches!(op, WalOp::DropCollection { .. });
744        self.store.apply_replicated(op)?;
745
746        if let Some(name) = create_name {
747            // Register a fresh handle for the newly replicated collection.
748            if let Some(id) = target
749                && let Some(descriptor) = self.store.descriptor(id).cloned()
750            {
751                let docs = descriptor.multivector.then(BTreeMap::new);
752                let index = empty_index(&descriptor);
753                let snapshot = empty_snapshot(&descriptor);
754                let mvcc = self.mvcc;
755                // Replicated writes mark the handle stale, so the next read rebuilds
756                // the inverted index from the replicated store.
757                self.collections.insert(
758                    name,
759                    CollectionHandle {
760                        id,
761                        descriptor,
762                        index,
763                        int_to_ext: Vec::new(),
764                        ext_to_int: HashMap::new(),
765                        stale: false,
766                        write_gen: 0,
767                        docs,
768                        sparse: None,
769                        mvcc,
770                        snapshot,
771                    },
772                );
773            }
774        } else if is_drop {
775            if let Some(id) = target {
776                self.collections.retain(|_, h| h.id != id);
777            }
778        } else if let Some(id) = target
779            && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780        {
781            mark_stale(handle);
782        }
783        Ok(())
784    }
785
786    /// Names of all collections, sorted.
787    #[must_use]
788    pub fn collection_names(&self) -> Vec<String> {
789        self.store.collection_names()
790    }
791
792    /// The descriptor of a collection, if it exists.
793    #[must_use]
794    pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795        self.collections.get(name).map(|h| &h.descriptor)
796    }
797
798    /// Number of live points in a collection.
799    pub fn len(&self, name: &str) -> Result<usize> {
800        let handle = self.handle(name)?;
801        Ok(self.store.len(handle.id)?)
802    }
803
804    /// Whether a collection has no points.
805    pub fn is_empty(&self, name: &str) -> Result<bool> {
806        Ok(self.len(name)? == 0)
807    }
808
809    /// Insert or replace a point with a JSON payload.
810    pub fn upsert(
811        &mut self,
812        collection: &str,
813        id: &str,
814        vector: &[f32],
815        payload: &Value,
816    ) -> Result<()> {
817        let handle = self
818            .collections
819            .get_mut(collection)
820            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821        require_single_vector(handle)?;
822        let payload_bytes = serde_json::to_vec(payload)?;
823        self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824        // Client-side-encrypted collections have no server-side index to maintain
825        // (ADR-0032): the stored vector is an opaque placeholder the server never
826        // ranks. The point is durable in the store; nothing else to do.
827        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828            return Ok(());
829        }
830        // Maintain the in-memory index in place where the kind allows it, else
831        // defer to a lazy rebuild on the next search (ADR-0023/0026/0033).
832        index_upsert_point(handle, id, vector)?;
833        // Keep the derived sparse inverted index in step (ADR-0045).
834        sparse_index_upsert_point(handle, id, payload);
835        Ok(())
836    }
837
838    /// Upsert a batch of points with a single WAL `fdatasync` (ADR-0038).
839    ///
840    /// `points` is `(id, vector, payload)` tuples.  The batch is committed
841    /// atomically — all points or none (from the client's perspective).  This
842    /// is the preferred path for the REST `POST /v1/collections/{c}/points`
843    /// handler which already delivers a batch per HTTP request.
844    pub fn upsert_batch(
845        &mut self,
846        collection: &str,
847        points: &[(&str, &[f32], &serde_json::Value)],
848    ) -> Result<u64> {
849        let handle = self
850            .collections
851            .get(collection)
852            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
853        require_single_vector(handle)?;
854        let coll_id = handle.id;
855        let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
856
857        let payload_bytes: Vec<Vec<u8>> = points
858            .iter()
859            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
860            .collect::<Result<_>>()?;
861
862        let records: Vec<(&str, &[f32], &[u8])> = points
863            .iter()
864            .zip(payload_bytes.iter())
865            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
866            .collect();
867
868        self.store.upsert_batch(coll_id, &records)?;
869
870        if is_client_side {
871            return Ok(records.len() as u64);
872        }
873
874        for (id, vector, payload) in points {
875            let handle = self
876                .collections
877                .get_mut(collection)
878                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
879            index_upsert_point(handle, id, vector)?;
880            sparse_index_upsert_point(handle, id, payload);
881        }
882        Ok(records.len() as u64)
883    }
884
885    /// Upsert a large batch for a bulk load, deferring all index work to a single
886    /// rebuild pass (ADR-0045).
887    ///
888    /// Like [`upsert_batch`](Self::upsert_batch) the points are committed with one
889    /// WAL `fdatasync`, but instead of folding each point into the in-memory index
890    /// one at a time, the collection's index is marked **stale** so the next search
891    /// rebuilds it in a single pass over the whole collection — far cheaper for a
892    /// fresh load (one k-means for IVF, one graph build for Vamana, one inverted-index
893    /// scan) than N incremental inserts. Prefer `upsert_batch` for steady-state
894    /// writes where query-after-write latency matters.
895    pub fn upsert_bulk(
896        &mut self,
897        collection: &str,
898        points: &[(&str, &[f32], &serde_json::Value)],
899    ) -> Result<u64> {
900        let handle = self
901            .collections
902            .get_mut(collection)
903            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904        require_single_vector(handle)?;
905        let coll_id = handle.id;
906
907        let payload_bytes: Vec<Vec<u8>> = points
908            .iter()
909            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
910            .collect::<Result<_>>()?;
911        let records: Vec<(&str, &[f32], &[u8])> = points
912            .iter()
913            .zip(payload_bytes.iter())
914            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
915            .collect();
916
917        self.store.upsert_batch(coll_id, &records)?;
918
919        // Defer the dense and sparse index maintenance to a single rebuild on the
920        // next read (a client-side-encrypted collection has no index to rebuild, but
921        // marking it stale is harmless — its rebuild produces the same no-op index).
922        let handle = self
923            .collections
924            .get_mut(collection)
925            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
926        mark_stale(handle);
927        Ok(records.len() as u64)
928    }
929
930    /// Delete a point by id. Returns whether it existed.
931    pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
932        let handle = self
933            .collections
934            .get_mut(collection)
935            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
936        require_single_vector(handle)?;
937        let existed = self.store.delete(handle.id, id)?;
938        if !existed {
939            return Ok(false);
940        }
941        // No server-side index to update for client-side-encrypted collections
942        // (ADR-0032); the store delete is authoritative.
943        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
944            return Ok(true);
945        }
946        // A built IVF removes in place (ADR-0023), a built HNSW soft-deletes
947        // (ADR-0026), and a built FreshDiskANN graph tombstones in its deletion set
948        // (ADR-0033); other kinds defer to a rebuild. The id->internal mapping is
949        // kept so a later re-insert allocates afresh — a removed or soft-deleted
950        // internal is simply never returned by the index.
951        index_delete_point(handle, id);
952        // Drop the point from the derived sparse inverted index too (ADR-0045).
953        sparse_index_delete_point(handle, id);
954        Ok(true)
955    }
956
957    /// Fetch a single point by id, with its payload and vector.
958    pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
959        let handle = self.handle(collection)?;
960        require_single_vector(handle)?;
961        match self.store.get(handle.id, id)? {
962            Some(record) => Ok(Some(Match {
963                id: id.to_owned(),
964                score: 0.0,
965                payload: Some(serde_json::from_slice(&record.payload)?),
966                vector: Some(record.vector),
967            })),
968            None => Ok(None),
969        }
970    }
971
972    /// Fetch points without ranking — an optional cleartext payload `filter`
973    /// narrows the set and `limit` bounds it. This is the retrieval path for a
974    /// client-side-encrypted collection (ADR-0032): the server returns the entitled
975    /// set (each point's payload carries the sealed vector blob under the reserved
976    /// `__quiver_vec__` key) and the client decrypts and ranks locally. It also
977    /// serves as a general "list points" primitive for any single-vector collection.
978    ///
979    /// Results come in the store's scan order, not by relevance; the filter is
980    /// re-checked exactly against each candidate (a selective filter could use the
981    /// secondary index in future — today it scans).
982    ///
983    /// # Errors
984    /// Errors if the collection does not exist or is multi-vector.
985    pub fn fetch(
986        &self,
987        collection: &str,
988        filter: Option<&Filter>,
989        offset: usize,
990        limit: usize,
991        with_payload: bool,
992        with_vector: bool,
993    ) -> Result<Vec<Match>> {
994        let handle = self.handle(collection)?;
995        require_single_vector(handle)?;
996        let mut out = Vec::new();
997        // `offset` skips matching points already returned by a prior page, so a caller
998        // can scroll the whole collection in `limit`-sized pages (the scan order is
999        // stable for a quiescent collection). Applied after the filter so offset
1000        // counts matches, not raw rows.
1001        let mut skipped = 0usize;
1002        for (id, record) in self.store.scan(handle.id)? {
1003            if out.len() >= limit {
1004                break;
1005            }
1006            let payload: Value = serde_json::from_slice(&record.payload)?;
1007            if let Some(filter) = filter
1008                && !filter.matches(&payload)
1009            {
1010                continue;
1011            }
1012            if skipped < offset {
1013                skipped += 1;
1014                continue;
1015            }
1016            out.push(Match {
1017                id,
1018                score: 0.0,
1019                payload: with_payload.then_some(payload),
1020                vector: with_vector.then_some(record.vector),
1021            });
1022        }
1023        Ok(out)
1024    }
1025
1026    /// Rebuild a collection's index if a prior write deferred it (the `stale`
1027    /// flag), making the collection's read snapshot current. Idempotent and cheap
1028    /// when already fresh. Separating this `&mut self` maintenance from the `&self`
1029    /// `*_snapshot` reads is what lets a server serve concurrent reads behind a
1030    /// shared lock and take the exclusive lock only for the rare rebuild (ADR-0057).
1031    pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1032        if self.handle(collection)?.stale {
1033            let store = &self.store;
1034            let handle = self
1035                .collections
1036                .get_mut(collection)
1037                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1038            rebuild_index(store, handle)?;
1039            // MVCC mode: publish the freshly rebuilt base as the new lock-free
1040            // snapshot (an empty overlay; the base now absorbs all prior writes).
1041            // The sync rebuild holds `&mut self`, so no write races it.
1042            if mvcc_served(handle) {
1043                publish_base(handle);
1044            }
1045        }
1046        Ok(())
1047    }
1048
1049    /// Enable or disable lock-free MVCC reads (ADR-0064) at runtime — the server
1050    /// sets this from `QUIVER_MVCC_READS`. Default off: the proven RwLock read path
1051    /// stays the default until MVCC is loom- and benchmark-validated (increments
1052    /// 2–3). Applies to every loaded collection; a collection's first rebuild after
1053    /// enabling publishes its base snapshot.
1054    pub fn set_mvcc_reads(&mut self, on: bool) {
1055        self.mvcc = on;
1056        for handle in self.collections.values_mut() {
1057            handle.mvcc = on;
1058            // A toggle either direction forces a rebuild on the next read of an
1059            // eligible collection: turning on publishes the base snapshot; turning
1060            // off repopulates the in-place `handle.index` that MVCC mode left empty.
1061            if mvcc_eligible(&handle.descriptor) {
1062                handle.stale = true;
1063            }
1064        }
1065    }
1066
1067    /// Whether lock-free MVCC reads are enabled (ADR-0064).
1068    #[must_use]
1069    pub fn mvcc_reads(&self) -> bool {
1070        self.mvcc
1071    }
1072
1073    /// The lock-free serving snapshot cell for a collection (ADR-0064), for a
1074    /// reader to `load()` without taking any lock — the basis of reads that proceed
1075    /// during writes. Only republished for an MVCC-served collection (single-vector,
1076    /// server-searchable, in-memory index, with MVCC enabled); otherwise it stays
1077    /// the initial empty snapshot and callers use the locked [`Database::search`]
1078    /// path. The cell outlives `&self`, so a reader can hold and re-`load` it.
1079    ///
1080    /// # Errors
1081    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1082    pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1083        Ok(self.handle(collection)?.snapshot.clone())
1084    }
1085
1086    /// The lock-free serving snapshot cell **only** for a collection that is
1087    /// currently MVCC-served (the flag is on and the collection is single-vector,
1088    /// server-searchable, and in-memory); otherwise `None`. A server caches the
1089    /// returned cell once and `load()`s it to serve pure-vector reads with no lock
1090    /// (ADR-0064 increment 3) — the cell self-updates as the writer republishes, so
1091    /// it never needs re-fetching under the lock.
1092    ///
1093    /// # Errors
1094    /// Returns [`Error::CollectionNotFound`] if the collection is not loaded.
1095    pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1096        let handle = self.handle(collection)?;
1097        Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1098    }
1099
1100    /// Whether a collection's index is stale — a prior write deferred its rebuild.
1101    /// The server reads this to schedule an **off-lock** rebuild (ADR-0062) without
1102    /// holding the exclusive lock; embedded callers never need it (the `&mut self`
1103    /// searches rebuild synchronously via [`Database::ensure_indexed`]).
1104    pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1105        Ok(self.handle(collection)?.stale)
1106    }
1107
1108    /// Capture everything an off-lock rebuild needs (ADR-0062): under the shared
1109    /// read lock the caller already holds, scan the collection's live rows and
1110    /// record the write generation. Returns `None` when the index is already fresh
1111    /// (nothing to rebuild). The expensive build then runs with **no lock** via
1112    /// [`RebuildInputs::build`], and [`Database::commit_rebuild`] installs it.
1113    pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1114        let handle = self.handle(collection)?;
1115        if !handle.stale {
1116            return Ok(None);
1117        }
1118        let scan = scan_collection(&self.store, handle)?;
1119        Ok(Some(RebuildInputs {
1120            collection: collection.to_owned(),
1121            descriptor: handle.descriptor.clone(),
1122            scan,
1123            write_gen: handle.write_gen,
1124        }))
1125    }
1126
1127    /// Install an index built off-lock (ADR-0062) under the brief exclusive lock the
1128    /// caller holds. Returns whether the collection is **still** stale: if a write
1129    /// landed during the build (the write generation advanced), the fresh index is
1130    /// already behind, so it is installed (still newer than the prior snapshot) but
1131    /// the handle stays stale for the next rebuild. A collection dropped or replaced
1132    /// during the build is ignored (`Ok(false)`) — the build is discarded.
1133    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1134        let store = &self.store;
1135        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1136            return Ok(false);
1137        };
1138        match rebuilt.kind {
1139            RebuiltKind::Ready(index) => handle.index = *index,
1140            RebuiltKind::Disk { graph, pq } => {
1141                // Drop the prior index before sealing the artifact in place: its
1142                // `mmap` assumes an immutable file. Safe under the write lock — no
1143                // read can observe the momentary empty index.
1144                handle.index = empty_index(&handle.descriptor);
1145                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1146                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1147            }
1148        }
1149        handle.int_to_ext = rebuilt.int_to_ext;
1150        handle.ext_to_int = rebuilt.ext_to_int;
1151        handle.docs = rebuilt.docs;
1152        handle.sparse = rebuilt.sparse;
1153        // Clear stale only if no write landed since the inputs were captured;
1154        // otherwise leave it set so the driver rebuilds again for the newer write.
1155        let still_stale = handle.write_gen != rebuilt.write_gen;
1156        handle.stale = still_stale;
1157        // MVCC mode: publish the new base as the lock-free snapshot with an empty
1158        // overlay. If a write landed during the build (`still_stale`), it is briefly
1159        // invisible to snapshot reads until the next rebuild folds it in — the same
1160        // eventual-consistency window the locked off-lock path already serves
1161        // (ADR-0062), with no double-count (the base is the sole source). The
1162        // single-writer's exclusive lock makes the swap race-free.
1163        if mvcc_served(handle) {
1164            publish_base(handle);
1165        }
1166        Ok(still_stale)
1167    }
1168
1169    /// Search a collection for the nearest points to `query`, optionally
1170    /// post-filtered by payload predicate.
1171    pub fn search(
1172        &mut self,
1173        collection: &str,
1174        query: &[f32],
1175        params: &SearchParams,
1176    ) -> Result<Vec<Match>> {
1177        // Embedded read-your-writes: rebuild a deferred index first (synchronously),
1178        // then read the now-fresh snapshot. The server instead serves the prior
1179        // snapshot and rebuilds off-lock (ADR-0062), so concurrent readers never
1180        // block on a writer's rebuild.
1181        self.ensure_indexed(collection)?;
1182        self.search_snapshot(collection, query, params)
1183    }
1184
1185    /// Search a collection's **current immutable snapshot** for the nearest points
1186    /// to `query`, optionally post-filtered by payload predicate. Takes `&self`, so
1187    /// many readers run concurrently. When a prior write deferred this collection's
1188    /// rebuild, it serves the **prior** snapshot (a snapshot-isolated, slightly stale
1189    /// read — ADR-0062/0053); the caller schedules a rebuild via
1190    /// [`Database::needs_rebuild`].
1191    pub fn search_snapshot(
1192        &self,
1193        collection: &str,
1194        query: &[f32],
1195        params: &SearchParams,
1196    ) -> Result<Vec<Match>> {
1197        require_single_vector(self.handle(collection)?)?;
1198        require_server_searchable(self.handle(collection)?)?;
1199
1200        let handle = self.handle(collection)?;
1201
1202        // MVCC mode (ADR-0064): the immutable base index lives in the published
1203        // snapshot, not `handle.index`. Serve the read from the snapshot — the
1204        // base-index search merged lock-free with the overlay of recent writes —
1205        // reusing the same pre-filter planning and store-fetch enrichment as the
1206        // locked path; only the dense candidate source differs.
1207        if mvcc_served(handle) {
1208            return self.search_snapshot_mvcc(handle, query, params);
1209        }
1210
1211        // Hybrid planning: if the filter narrows to a small, secondary-indexed
1212        // candidate set, scan those rows exactly instead of post-filtering ANN
1213        // hits — exact, and immune to the filtered-ANN recall cliff.
1214        if let Some(filter) = &params.filter
1215            && let Some(candidates) = candidate_ids(
1216                &self.store,
1217                handle.id,
1218                filter,
1219                &handle.descriptor.filterable,
1220            )?
1221            && candidates.len() <= FULL_SCAN_THRESHOLD
1222        {
1223            return self.exact_filtered_search(
1224                handle.id,
1225                &handle.descriptor,
1226                query,
1227                params,
1228                filter,
1229                &candidates,
1230            );
1231        }
1232
1233        let fetch = if params.filter.is_some() {
1234            params
1235                .k
1236                .saturating_mul(FILTER_OVERFETCH)
1237                .max(params.ef_search)
1238        } else {
1239            params.k
1240        };
1241        let raw = handle.index.search(query, fetch, params.ef_search)?;
1242
1243        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1244        let mut out = Vec::with_capacity(params.k);
1245        for neighbor in raw {
1246            if out.len() >= params.k {
1247                break;
1248            }
1249            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1250                continue;
1251            };
1252            let record = if need_record {
1253                self.store.get(handle.id, ext_id)?
1254            } else {
1255                None
1256            };
1257            let payload_value: Option<Value> = match &record {
1258                Some(r) if params.filter.is_some() || params.with_payload => {
1259                    Some(serde_json::from_slice(&r.payload)?)
1260                }
1261                _ => None,
1262            };
1263            if let Some(filter) = &params.filter {
1264                let value = payload_value.as_ref().unwrap_or(&Value::Null);
1265                if !filter.matches(value) {
1266                    continue;
1267                }
1268            }
1269            out.push(Match {
1270                id: ext_id.clone(),
1271                score: neighbor.distance,
1272                payload: if params.with_payload {
1273                    payload_value
1274                } else {
1275                    None
1276                },
1277                vector: if params.with_vector {
1278                    record.map(|r| r.vector)
1279                } else {
1280                    None
1281                },
1282            });
1283        }
1284        Ok(out)
1285    }
1286
1287    // Serve `search_snapshot` for an MVCC-served collection (ADR-0064 increment 2):
1288    // the dense candidates come from the lock-free snapshot (base ⊕ overlay), and
1289    // everything else — the exact small-candidate pre-filter, post-filter, and
1290    // payload/vector enrichment — reuses the same store-only logic as the locked
1291    // path, so filtered and unfiltered reads are both exact.
1292    fn search_snapshot_mvcc(
1293        &self,
1294        handle: &CollectionHandle,
1295        query: &[f32],
1296        params: &SearchParams,
1297    ) -> Result<Vec<Match>> {
1298        // Exact pre-filter for a small, secondary-indexed candidate set (store-only,
1299        // index-independent — identical to the locked path).
1300        if let Some(filter) = &params.filter
1301            && let Some(candidates) = candidate_ids(
1302                &self.store,
1303                handle.id,
1304                filter,
1305                &handle.descriptor.filterable,
1306            )?
1307            && candidates.len() <= FULL_SCAN_THRESHOLD
1308        {
1309            return self.exact_filtered_search(
1310                handle.id,
1311                &handle.descriptor,
1312                query,
1313                params,
1314                filter,
1315                &candidates,
1316            );
1317        }
1318
1319        // Otherwise: dense candidates from the lock-free snapshot (overfetched when
1320        // filtering, to survive the post-filter), then post-filter + enrich by a
1321        // store fetch on the result ids.
1322        let fetch = if params.filter.is_some() {
1323            params
1324                .k
1325                .saturating_mul(FILTER_OVERFETCH)
1326                .max(params.ef_search)
1327        } else {
1328            params.k
1329        };
1330        let dense = handle
1331            .snapshot
1332            .load()
1333            .search(query, fetch, params.ef_search)?;
1334        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1335        let mut out = Vec::with_capacity(params.k);
1336        for m in dense {
1337            if out.len() >= params.k {
1338                break;
1339            }
1340            let record = if need_record {
1341                self.store.get(handle.id, &m.id)?
1342            } else {
1343                None
1344            };
1345            let payload_value: Option<Value> = match &record {
1346                Some(r) if params.filter.is_some() || params.with_payload => {
1347                    Some(serde_json::from_slice(&r.payload)?)
1348                }
1349                _ => None,
1350            };
1351            if let Some(filter) = &params.filter
1352                && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1353            {
1354                continue;
1355            }
1356            out.push(Match {
1357                id: m.id,
1358                score: m.score,
1359                payload: if params.with_payload {
1360                    payload_value
1361                } else {
1362                    None
1363                },
1364                vector: if params.with_vector {
1365                    record.map(|r| r.vector)
1366                } else {
1367                    None
1368                },
1369            });
1370        }
1371        Ok(out)
1372    }
1373
1374    /// Hybrid search (ADR-0043/0046): fuse up to three rankings with Reciprocal
1375    /// Rank Fusion — a dense ANN ranking, a sparse inverted-index dot-product
1376    /// ranking (`sparse_query`), and a BM25 full-text ranking (`text_query`, scored
1377    /// over the same inverted index). Any may be `None`; at least one is required,
1378    /// giving pure dense / sparse / lexical or any blend through the same path. The
1379    /// same payload `filter` is re-checked on every side, so results stay exact.
1380    /// `rrf_k0` is the RRF rank-bias constant ([`DEFAULT_RRF_K0`]).
1381    pub fn hybrid_search(
1382        &mut self,
1383        collection: &str,
1384        dense_query: Option<&[f32]>,
1385        sparse_query: Option<&SparseVector>,
1386        text_query: Option<&str>,
1387        params: &SearchParams,
1388        rrf_k0: f32,
1389    ) -> Result<Vec<Match>> {
1390        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1391        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1392        // off-lock — ADR-0062).
1393        self.ensure_indexed(collection)?;
1394        self.hybrid_search_snapshot(
1395            collection,
1396            dense_query,
1397            sparse_query,
1398            text_query,
1399            params,
1400            rrf_k0,
1401        )
1402    }
1403
1404    /// Hybrid search over the collection's current immutable snapshot (`&self`, so
1405    /// readers run concurrently). When a prior write deferred the rebuild, it serves
1406    /// the **prior** snapshot (snapshot-isolated, slightly stale — ADR-0062/0053);
1407    /// the caller schedules a rebuild via [`Database::needs_rebuild`].
1408    pub fn hybrid_search_snapshot(
1409        &self,
1410        collection: &str,
1411        dense_query: Option<&[f32]>,
1412        sparse_query: Option<&SparseVector>,
1413        text_query: Option<&str>,
1414        params: &SearchParams,
1415        rrf_k0: f32,
1416    ) -> Result<Vec<Match>> {
1417        require_single_vector(self.handle(collection)?)?;
1418        require_server_searchable(self.handle(collection)?)?;
1419        if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1420            return Err(Error::Unsupported(
1421                "hybrid_search requires a dense query, a sparse query, or a text query",
1422            ));
1423        }
1424        let handle = self.handle(collection)?;
1425
1426        // Pull a deep-enough candidate list from each side so the fusion is
1427        // meaningful, then RRF down to `k`.
1428        let depth = params
1429            .k
1430            .saturating_mul(RRF_CANDIDATE_FACTOR)
1431            .max(MIN_RRF_CANDIDATES);
1432        let filter = params.filter.as_ref();
1433        let mut lists: Vec<Vec<String>> = Vec::new();
1434        if let Some(q) = dense_query {
1435            lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1436        }
1437        if let Some(sp) = sparse_query {
1438            lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1439        }
1440        if let Some(text) = text_query {
1441            lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1442        }
1443        let fused = rrf_fuse(&lists, rrf_k0, params.k);
1444
1445        let mut out = Vec::with_capacity(fused.len());
1446        for (ext_id, score) in fused {
1447            let record = if params.with_payload || params.with_vector {
1448                self.store.get(handle.id, &ext_id)?
1449            } else {
1450                None
1451            };
1452            let payload = match (&record, params.with_payload) {
1453                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1454                _ => None,
1455            };
1456            out.push(Match {
1457                id: ext_id,
1458                score,
1459                payload,
1460                vector: if params.with_vector {
1461                    record.map(|r| r.vector)
1462                } else {
1463                    None
1464                },
1465            });
1466        }
1467        Ok(out)
1468    }
1469
1470    // Dense candidates as a ranked list of external ids (the filter re-checked),
1471    // for hybrid fusion.
1472    fn dense_ranked_ids(
1473        &self,
1474        handle: &CollectionHandle,
1475        query: &[f32],
1476        depth: usize,
1477        ef_search: usize,
1478        filter: Option<&Filter>,
1479    ) -> Result<Vec<String>> {
1480        let mut ids = Vec::new();
1481        // MVCC mode (ADR-0064): the dense candidates come from the lock-free
1482        // snapshot (base ⊕ overlay), with ext ids already resolved; the sparse/BM25
1483        // sides and the filter re-check are unchanged.
1484        if mvcc_served(handle) {
1485            for m in handle
1486                .snapshot
1487                .load()
1488                .search(query, depth, ef_search.max(depth))?
1489            {
1490                if !self.passes_filter(handle.id, &m.id, filter)? {
1491                    continue;
1492                }
1493                ids.push(m.id);
1494                if ids.len() >= depth {
1495                    break;
1496                }
1497            }
1498            return Ok(ids);
1499        }
1500        let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1501        for neighbor in raw {
1502            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1503                continue;
1504            };
1505            if !self.passes_filter(handle.id, ext_id, filter)? {
1506                continue;
1507            }
1508            ids.push(ext_id.clone());
1509            if ids.len() >= depth {
1510                break;
1511            }
1512        }
1513        Ok(ids)
1514    }
1515
1516    // Sparse candidates as a ranked list of external ids. With the derived inverted
1517    // index present (the common case), score only the query's nonzero dimensions via
1518    // the posting lists, then re-check the filter on the ranked ids until `depth` are
1519    // filled — so low-scored rows never load a payload (ADR-0045). When the index is
1520    // absent (a not-yet-rebuilt or client-side collection), fall back to the full
1521    // store scan, which stays correct under the incremental upsert/delete path.
1522    fn sparse_ranked_ids(
1523        &self,
1524        handle: &CollectionHandle,
1525        query: &SparseVector,
1526        depth: usize,
1527        filter: Option<&Filter>,
1528    ) -> Result<Vec<String>> {
1529        if let Some(idx) = handle.sparse.as_ref() {
1530            let mut ids = Vec::new();
1531            for (ext_id, _score) in idx.search(query) {
1532                if !self.passes_filter(handle.id, &ext_id, filter)? {
1533                    continue;
1534                }
1535                ids.push(ext_id);
1536                if ids.len() >= depth {
1537                    break;
1538                }
1539            }
1540            return Ok(ids);
1541        }
1542        self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1543    }
1544
1545    // The store-scan fallback for [`sparse_ranked_ids`]: load every row, score its
1546    // `__quiver_sparse__` vector by dot product against the query, re-check the
1547    // filter, and return the top `depth`. O(N-rows), but correct without an index.
1548    fn sparse_ranked_ids_by_scan(
1549        &self,
1550        cid: CollectionId,
1551        query: &SparseVector,
1552        depth: usize,
1553        filter: Option<&Filter>,
1554    ) -> Result<Vec<String>> {
1555        let qmap: HashMap<u32, f32> = query
1556            .indices
1557            .iter()
1558            .copied()
1559            .zip(query.values.iter().copied())
1560            .collect();
1561        let mut scored: Vec<(f32, String)> = Vec::new();
1562        for (ext_id, record) in self.store.scan(cid)? {
1563            if record.payload.is_empty() {
1564                continue;
1565            }
1566            let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1567                continue;
1568            };
1569            if let Some(filter) = filter
1570                && !filter.matches(&value)
1571            {
1572                continue;
1573            }
1574            let Some(raw) = value.get(SPARSE_KEY) else {
1575                continue;
1576            };
1577            let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1578                continue;
1579            };
1580            let mut score = 0.0f32;
1581            for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1582                if let Some(qw) = qmap.get(dim) {
1583                    score += qw * weight;
1584                }
1585            }
1586            if score > 0.0 {
1587                scored.push((score, ext_id));
1588            }
1589        }
1590        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1591        Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1592    }
1593
1594    // BM25 full-text candidates as a ranked list of external ids (ADR-0046): tokenize
1595    // the query text into term ids and score them with BM25 over the derived inverted
1596    // index, re-checking the filter on the ranked ids until `depth` are filled. BM25
1597    // needs the index's corpus statistics, so when a collection has no inverted index
1598    // (a not-yet-rebuilt or client-side collection — neither reachable here, since
1599    // hybrid rebuilds a stale handle and rejects client-side) there is nothing to
1600    // score and the list is empty; any dense side still contributes.
1601    fn bm25_ranked_ids(
1602        &self,
1603        handle: &CollectionHandle,
1604        query_text: &str,
1605        depth: usize,
1606        filter: Option<&Filter>,
1607    ) -> Result<Vec<String>> {
1608        let Some(idx) = handle.sparse.as_ref() else {
1609            return Ok(Vec::new());
1610        };
1611        let terms = query_term_ids(query_text);
1612        let mut ids = Vec::new();
1613        for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1614            if !self.passes_filter(handle.id, &ext_id, filter)? {
1615                continue;
1616            }
1617            ids.push(ext_id);
1618            if ids.len() >= depth {
1619                break;
1620            }
1621        }
1622        Ok(ids)
1623    }
1624
1625    // Re-check a payload filter against a row (loading its payload). `None` filter
1626    // always passes.
1627    fn passes_filter(
1628        &self,
1629        cid: CollectionId,
1630        ext_id: &str,
1631        filter: Option<&Filter>,
1632    ) -> Result<bool> {
1633        let Some(filter) = filter else {
1634            return Ok(true);
1635        };
1636        let value: Value = match self.store.get(cid, ext_id)? {
1637            Some(r) => serde_json::from_slice(&r.payload)?,
1638            None => Value::Null,
1639        };
1640        Ok(filter.matches(&value))
1641    }
1642
1643    // Exactly score `candidates` (a superset of the filter's matches that the
1644    // secondary indexes produced) against the query, re-check the full filter
1645    // for correctness, and return the top `k`. The pre-filter arm of the hybrid
1646    // planner: perfect recall over an already-narrowed set.
1647    fn exact_filtered_search(
1648        &self,
1649        cid: CollectionId,
1650        descriptor: &Descriptor,
1651        query: &[f32],
1652        params: &SearchParams,
1653        filter: &Filter,
1654        candidates: &BTreeSet<String>,
1655    ) -> Result<Vec<Match>> {
1656        let metric = to_index_metric(descriptor.metric);
1657        let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1658        for ext_id in candidates {
1659            let Some(record) = self.store.get(cid, ext_id)? else {
1660                continue;
1661            };
1662            let payload: Value = serde_json::from_slice(&record.payload)?;
1663            if !filter.matches(&payload) {
1664                continue;
1665            }
1666            let ordering = ordering_distance(metric, query, &record.vector);
1667            scored.push((ordering, ext_id.clone(), payload, record.vector));
1668        }
1669        scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1670        scored.truncate(params.k);
1671        Ok(scored
1672            .into_iter()
1673            .map(|(ordering, id, payload, vector)| Match {
1674                id,
1675                score: report_metric(metric, ordering),
1676                payload: params.with_payload.then_some(payload),
1677                vector: params.with_vector.then_some(vector),
1678            })
1679            .collect())
1680    }
1681
1682    /// Insert or replace a multi-vector (late-interaction / ColBERT) document: its
1683    /// `vectors` are stored as a group of token rows and its `payload` once on the
1684    /// anchor token (ADR-0028). Re-upserting a document first removes the tokens a
1685    /// shorter version would leave behind, so the document is replaced cleanly.
1686    ///
1687    /// # Errors
1688    /// Errors if the collection is single-vector, the document has no vectors, a
1689    /// vector's dimensionality is wrong, or the id contains the reserved separator.
1690    pub fn upsert_document(
1691        &mut self,
1692        collection: &str,
1693        doc_id: &str,
1694        vectors: &[Vec<f32>],
1695        payload: &Value,
1696    ) -> Result<()> {
1697        let handle = self
1698            .collections
1699            .get_mut(collection)
1700            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1701        require_multivector(handle)?;
1702        if doc_id.contains(DOC_TOKEN_SEP) {
1703            return Err(Error::Unsupported(
1704                "document id must not contain the reserved 0x1f separator",
1705            ));
1706        }
1707        if vectors.is_empty() {
1708            return Err(Error::Unsupported("a document needs at least one vector"));
1709        }
1710        let dim = handle.descriptor.dim as usize;
1711        if vectors.iter().any(|v| v.len() != dim) {
1712            return Err(Error::Unsupported(
1713                "every document vector must match the collection dimensionality",
1714            ));
1715        }
1716        // Remove only the trailing tokens a shorter re-upsert leaves behind; the
1717        // rest are overwritten by the upserts below.
1718        let previous = handle
1719            .docs
1720            .as_ref()
1721            .and_then(|d| d.get(doc_id))
1722            .copied()
1723            .unwrap_or(0) as usize;
1724        for j in vectors.len()..previous {
1725            self.store.delete(handle.id, &token_id(doc_id, j))?;
1726            // Tombstone the dropped token row in the ANN index too (ADR-0034).
1727            index_delete_point(handle, &token_id(doc_id, j));
1728        }
1729        let payload_bytes = serde_json::to_vec(payload)?;
1730        for (j, vector) in vectors.iter().enumerate() {
1731            // The payload is stored once, on the anchor token; the rest carry none.
1732            let bytes: &[u8] = if j == 0 {
1733                payload_bytes.as_slice()
1734            } else {
1735                &[]
1736            };
1737            self.store
1738                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1739            // Fold the token row into the ANN index incrementally instead of
1740            // marking the whole collection stale (ADR-0034); the underlying index
1741            // consolidates by a rebuild past its own churn threshold.
1742            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1743        }
1744        if let Some(docs) = handle.docs.as_mut() {
1745            docs.insert(doc_id.to_owned(), vectors.len() as u32);
1746        }
1747        Ok(())
1748    }
1749
1750    /// Search a multi-vector collection by a set of query token vectors, ranking
1751    /// documents by MaxSim late interaction (ADR-0028). At or below the exact-scan
1752    /// threshold every document is scored exactly; above it, candidates are
1753    /// generated by nearest-neighbour search over the token pool (recall tuned by
1754    /// `ef_search`) and re-ranked exactly. An optional `filter` is applied to each
1755    /// document's payload, exactly. A document has no single vector, so `with_payload`
1756    /// returns the anchor payload and `with_vector` returns the token vectors.
1757    pub fn search_multi_vector(
1758        &mut self,
1759        collection: &str,
1760        query_tokens: &[Vec<f32>],
1761        params: &SearchParams,
1762    ) -> Result<Vec<DocumentMatch>> {
1763        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1764        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1765        // off-lock — ADR-0062).
1766        self.ensure_indexed(collection)?;
1767        self.search_multi_vector_snapshot(collection, query_tokens, params)
1768    }
1769
1770    /// Multi-vector (late-interaction) search over the collection's current
1771    /// immutable snapshot (`&self`, so readers run concurrently). A small corpus is
1772    /// scored exactly; a large corpus draws candidates from the ANN index, serving
1773    /// the **prior** snapshot when a write deferred its rebuild (snapshot-isolated,
1774    /// slightly stale — ADR-0062/0053); the caller schedules the rebuild via
1775    /// [`Database::needs_rebuild`].
1776    pub fn search_multi_vector_snapshot(
1777        &self,
1778        collection: &str,
1779        query_tokens: &[Vec<f32>],
1780        params: &SearchParams,
1781    ) -> Result<Vec<DocumentMatch>> {
1782        require_multivector(self.handle(collection)?)?;
1783        let dim = self.handle(collection)?.descriptor.dim as usize;
1784        if query_tokens.is_empty() {
1785            return Ok(Vec::new());
1786        }
1787        if query_tokens.iter().any(|v| v.len() != dim) {
1788            return Err(Error::Unsupported(
1789                "every query token must match the collection dimensionality",
1790            ));
1791        }
1792
1793        let doc_count = self
1794            .handle(collection)?
1795            .docs
1796            .as_ref()
1797            .map_or(0, BTreeMap::len);
1798        let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1799            // Exact: score every document. No ANN index needed.
1800            self.handle(collection)?
1801                .docs
1802                .as_ref()
1803                .map(|d| d.keys().cloned().collect())
1804                .unwrap_or_default()
1805        } else {
1806            // Large corpus: generate candidates from the token pool. A prior write
1807            // may have deferred the ANN index rebuild; serve the prior snapshot (the
1808            // server schedules an off-lock rebuild — ADR-0062).
1809            let handle = self.handle(collection)?;
1810            let per_token_k = params
1811                .k
1812                .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1813                .max(params.ef_search);
1814            let mut set = BTreeSet::new();
1815            for token in query_tokens {
1816                for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1817                    if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1818                        && let Some((doc, _)) = parse_token_id(ext)
1819                    {
1820                        set.insert(doc.to_owned());
1821                    }
1822                }
1823            }
1824            set.into_iter().collect()
1825        };
1826
1827        // Re-rank the candidate documents by exact MaxSim over all their tokens.
1828        let handle = self.handle(collection)?;
1829        let cid = handle.id;
1830        let metric = to_index_metric(handle.descriptor.metric);
1831        let mut scored: Vec<ScoredDocument> = Vec::new();
1832        for doc in &candidates {
1833            let count = handle
1834                .docs
1835                .as_ref()
1836                .and_then(|d| d.get(doc))
1837                .copied()
1838                .unwrap_or(0) as usize;
1839            let (tokens, payload) = self.gather_document(cid, doc, count)?;
1840            if tokens.is_empty() {
1841                continue;
1842            }
1843            if let Some(filter) = &params.filter {
1844                let value = payload.clone().unwrap_or(Value::Null);
1845                if !filter.matches(&value) {
1846                    continue;
1847                }
1848            }
1849            let score = max_sim(metric, query_tokens, &tokens);
1850            let vectors = params.with_vector.then_some(tokens);
1851            scored.push((score, doc.clone(), payload, vectors));
1852        }
1853        // Higher MaxSim first; ties broken by id for a deterministic order.
1854        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1855        scored.truncate(params.k);
1856        Ok(scored
1857            .into_iter()
1858            .map(|(score, id, payload, vectors)| DocumentMatch {
1859                id,
1860                score,
1861                payload: params.with_payload.then_some(payload).flatten(),
1862                vectors,
1863            })
1864            .collect())
1865    }
1866
1867    /// Fetch a multi-vector document by id: its anchor payload and, if
1868    /// `with_vectors`, its token vectors. `None` if the document does not exist.
1869    pub fn get_document(
1870        &self,
1871        collection: &str,
1872        doc_id: &str,
1873        with_vectors: bool,
1874    ) -> Result<Option<DocumentMatch>> {
1875        let handle = self.handle(collection)?;
1876        require_multivector(handle)?;
1877        let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1878            return Ok(None);
1879        };
1880        let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1881        if tokens.is_empty() {
1882            return Ok(None);
1883        }
1884        Ok(Some(DocumentMatch {
1885            id: doc_id.to_owned(),
1886            score: 0.0,
1887            payload,
1888            vectors: with_vectors.then_some(tokens),
1889        }))
1890    }
1891
1892    /// Delete a multi-vector document and all of its token rows. Returns whether it
1893    /// existed.
1894    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1895        let handle = self
1896            .collections
1897            .get_mut(collection)
1898            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1899        require_multivector(handle)?;
1900        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1901            return Ok(false);
1902        };
1903        for j in 0..count as usize {
1904            self.store.delete(handle.id, &token_id(doc_id, j))?;
1905            // Tombstone each token row in the ANN index incrementally (ADR-0034).
1906            index_delete_point(handle, &token_id(doc_id, j));
1907        }
1908        if let Some(docs) = handle.docs.as_mut() {
1909            docs.remove(doc_id);
1910        }
1911        Ok(true)
1912    }
1913
1914    /// The number of documents in a multi-vector collection. Errors if the
1915    /// collection is single-vector.
1916    pub fn document_count(&self, collection: &str) -> Result<usize> {
1917        let handle = self.handle(collection)?;
1918        require_multivector(handle)?;
1919        Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1920    }
1921
1922    // Read a document's token vectors (in ordinal order) and its anchor payload
1923    // from the store. Missing token rows are skipped, so a torn document yields a
1924    // short token list the caller treats as empty.
1925    fn gather_document(
1926        &self,
1927        cid: CollectionId,
1928        doc_id: &str,
1929        count: usize,
1930    ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1931        let mut tokens = Vec::with_capacity(count);
1932        let mut payload: Option<Value> = None;
1933        for j in 0..count {
1934            let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1935                continue;
1936            };
1937            if j == 0 && !record.payload.is_empty() {
1938                payload = Some(serde_json::from_slice(&record.payload)?);
1939            }
1940            tokens.push(record.vector);
1941        }
1942        Ok((tokens, payload))
1943    }
1944
1945    /// Flush a durable checkpoint of all collections, capturing a durable
1946    /// snapshot of each built, up-to-date IVF index (ADR-0025) so it reloads on
1947    /// open instead of rebuilding. Other index kinds, and a stale or unbuilt IVF,
1948    /// are rebuilt on open.
1949    pub fn checkpoint(&mut self) -> Result<()> {
1950        let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1951        for handle in self.collections.values() {
1952            if handle.stale {
1953                continue;
1954            }
1955            if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1956                if ivf.is_empty() {
1957                    continue;
1958                }
1959                let envelope = IndexEnvelope {
1960                    version: INDEX_ENVELOPE_VERSION,
1961                    int_to_ext: handle.int_to_ext.clone(),
1962                    ivf: ivf.snapshot()?,
1963                };
1964                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1965            } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1966                // Durable DiskVamana (ADR-0063): the base file is already on disk
1967                // (sealed at build/consolidation); seal only the tiny tie-to-live
1968                // blob. The derived sparse index is not persisted (as for IVF); on
1969                // reopen the durable load leaves it `None` and hybrid search falls
1970                // back to the store scan until the next rebuild — correct, not fast.
1971                let envelope = DiskEnvelope {
1972                    version: INDEX_ENVELOPE_VERSION,
1973                    int_to_ext: handle.int_to_ext.clone(),
1974                    base_row_count: fresh.base_len() as u64,
1975                    deleted_ids: fresh.deleted_ids(),
1976                };
1977                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1978            }
1979        }
1980        self.store.checkpoint_with_index_snapshots(&snapshots)?;
1981        Ok(())
1982    }
1983
1984    /// Compact every collection with reclaimable space, merging its sealed
1985    /// segments and dropping deleted/shadowed rows. Crash-safe; a no-op for
1986    /// collections with nothing to reclaim.
1987    pub fn compact(&mut self) -> Result<()> {
1988        Ok(self.store.compact()?)
1989    }
1990
1991    /// The manifest version — the catalog generation a snapshot captures
1992    /// (ADR-0050). Surfaced as snapshot-relevant status in `database_stats`.
1993    #[must_use]
1994    pub fn manifest_version(&self) -> u64 {
1995        self.store.manifest_version()
1996    }
1997
1998    /// Best-effort total on-disk size of the data directory, in bytes — what a
1999    /// full snapshot would copy (ADR-0050). Unreadable entries are skipped.
2000    #[must_use]
2001    pub fn disk_usage_bytes(&self) -> u64 {
2002        dir_size(self.store.dir())
2003    }
2004
2005    /// Take a consistent online snapshot of the whole database into `dest`
2006    /// (which must not already exist), returning what was captured (ADR-0050).
2007    ///
2008    /// The writer lock is held for the duration: `checkpoint` seals the active
2009    /// buffer into segments and advances the WAL floor to the head, then the
2010    /// data directory is byte-copied. Opening `dest` afterwards replays an empty
2011    /// WAL tail and reconstructs the database exactly as of this call.
2012    ///
2013    /// # Errors
2014    /// [`Error::Core`] if `dest` already exists, or on any I/O error during the
2015    /// checkpoint or the copy.
2016    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
2017        if dest.exists() {
2018            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2019                dest.display().to_string(),
2020            )));
2021        }
2022        // Consistency anchor: flush to segments and advance the WAL floor so the
2023        // copied tree opens with no replay (ADR-0050).
2024        self.checkpoint()?;
2025        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
2026        // ponytail: flush the snapshot root's metadata only; a backup target is
2027        // re-takeable, so per-file fsync isn't warranted. Add it if snapshots
2028        // must survive an immediate post-copy crash.
2029        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
2030        Ok(SnapshotInfo {
2031            manifest_version: self.store.manifest_version(),
2032            files,
2033            bytes,
2034        })
2035    }
2036
2037    fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2038        self.collections
2039            .get(name)
2040            .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2041    }
2042}
2043
2044/// Restore a snapshot directory `src` (produced by [`Database::snapshot`]) into a
2045/// fresh `dest` directory, leaving it ready for the caller to open with the same
2046/// keyring/codec the snapshot was written under (ADR-0050).
2047///
2048/// `dest` must not already exist — restore never overwrites a live data
2049/// directory. The real integrity check is the caller's subsequent open (which,
2050/// for an encrypted store, is the only party holding the key); this function
2051/// only verifies the source looks like a snapshot and copies it.
2052///
2053/// # Errors
2054/// [`Error::Core`] if `dest` exists, `src` is not a snapshot (no `CURRENT`), or
2055/// on any I/O error during the copy.
2056pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
2057    if dest.exists() {
2058        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2059            dest.display().to_string(),
2060        )));
2061    }
2062    if !src.join("CURRENT").exists() {
2063        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
2064            format!("{} is not a snapshot (no CURRENT)", src.display()),
2065        )));
2066    }
2067    let (files, bytes) = copy_tree(src, dest)?;
2068    Ok(SnapshotInfo {
2069        // The restored directory's manifest version is whatever the snapshot
2070        // carried; report 0 since we don't re-open here to read it (the caller's
2071        // open is authoritative). `files`/`bytes` reflect the copy.
2072        manifest_version: 0,
2073        files,
2074        bytes,
2075    })
2076}
2077
2078// Recursively copy `src` into `dst`, returning `(files, bytes)`. I/O errors are
2079// tagged with the offending path. The data directory contains only files and
2080// directories (no symlinks), so a plain walk is complete.
2081fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2082    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2083    let mut files = 0u64;
2084    let mut bytes = 0u64;
2085    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2086        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2087        let from = entry.path();
2088        let to = dst.join(entry.file_name());
2089        let ft = entry
2090            .file_type()
2091            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2092        if ft.is_dir() {
2093            let (f, b) = copy_tree(&from, &to)?;
2094            files += f;
2095            bytes += b;
2096        } else {
2097            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2098            files += 1;
2099            bytes += n;
2100        }
2101    }
2102    Ok((files, bytes))
2103}
2104
// Best-effort total on-disk size (bytes) of everything under `dir`, recursing
// into sub-directories. Anything that cannot be read — the directory itself, an
// entry, its type, or its metadata — contributes 0 instead of failing, so a
// stats read never fails on a transient error.
fn dir_size(dir: &Path) -> u64 {
    let entries = match std::fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(_) => return 0,
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2122
// Separator between a multi-vector document id and a token ordinal in a token
// row's external id (`<doc-id><US><ordinal>`). It is U+001F, the ASCII Unit
// Separator, which is disallowed in user document ids (ADR-0028) — so splitting a
// token id at its last separator always recovers the document id intact.
const DOC_TOKEN_SEP: char = '\u{1f}';

// Document-count cut-over for multi-vector search: at or below this many
// documents every document is scored exactly; above it, nearest-neighbour
// candidate generation over the token pool kicks in (mirrors the single-vector
// planner's full-scan threshold).
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

// Per-query-token candidate breadth for the large-corpus path: each query token
// retrieves about `k × this` nearest token rows before the documents are unioned.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2136
2137// The external id of a multi-vector document's `ordinal`-th token row.
2138fn token_id(doc_id: &str, ordinal: usize) -> String {
2139    format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2140}
2141
2142// Split a token row's external id back into its document id and ordinal, or `None`
2143// if it is not a token id. Splits from the right, so a document id (which cannot
2144// contain the separator) is recovered intact.
2145fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2146    let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2147    Some((doc, ordinal.parse().ok()?))
2148}
2149
2150// Reject the single-vector API on a multi-vector collection.
2151fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2152    if handle.descriptor.multivector {
2153        Err(Error::Unsupported(
2154            "collection is multi-vector; use upsert_document / search_multi_vector",
2155        ))
2156    } else {
2157        Ok(())
2158    }
2159}
2160
2161// Reject the document API on a single-vector collection.
2162fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2163    if handle.descriptor.multivector {
2164        Ok(())
2165    } else {
2166        Err(Error::Unsupported(
2167            "collection is single-vector; use upsert / search",
2168        ))
2169    }
2170}
2171
2172// Reject server-side ranked search on a client-side-encrypted collection: the
2173// server holds only opaque ciphertext and cannot rank it. The client fetches the
2174// entitled set (see `Database::fetch`) and ranks locally (ADR-0032).
2175fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2176    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2177        Err(Error::Unsupported(
2178            "collection is client-side encrypted; the server cannot rank opaque vectors — \
2179             fetch points and rank client-side",
2180        ))
2181    } else {
2182        Ok(())
2183    }
2184}
2185
2186fn to_index_metric(metric: DistanceMetric) -> Metric {
2187    match metric {
2188        DistanceMetric::Dot => Metric::Dot,
2189        DistanceMetric::Cosine => Metric::Cosine,
2190        DistanceMetric::L2 => Metric::L2,
2191    }
2192}
2193
2194// Reject index/metric combinations the engine cannot serve.
2195fn validate_index(descriptor: &Descriptor) -> Result<()> {
2196    // Late interaction (MaxSim) is a similarity, so multi-vector collections need a
2197    // similarity metric.
2198    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
2199        return Err(Error::Unsupported(
2200            "multi-vector collections require a similarity metric (cosine or dot)",
2201        ));
2202    }
2203    // Client-side opaque encryption (ADR-0032) is searched by the client, not the
2204    // server, so it has no metric or index constraints — but it cannot combine with
2205    // the multi-vector document layout.
2206    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2207        if descriptor.multivector {
2208            return Err(Error::Unsupported(
2209                "client-side vector encryption is not supported for multi-vector collections",
2210            ));
2211        }
2212        return Ok(());
2213    }
2214    // DCPE (ADR-0031) preserves Euclidean distance comparison; the secret scaling
2215    // changes vector norms, so cosine and dot orderings are not preserved.
2216    if descriptor.vector_encryption == VectorEncryption::Dcpe
2217        && descriptor.metric != DistanceMetric::L2
2218    {
2219        return Err(Error::Unsupported(
2220            "dcpe-encrypted collections require the l2 metric",
2221        ));
2222    }
2223    // ColBERT is a late-interaction token-pool index (ADR-0034): valid only for a
2224    // multi-vector collection (which already requires a similarity metric).
2225    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
2226        return Err(Error::Unsupported(
2227            "the colbert index is only for multi-vector collections",
2228        ));
2229    }
2230    match descriptor.index.kind {
2231        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
2232            if descriptor.metric == DistanceMetric::Dot =>
2233        {
2234            Err(Error::Unsupported(
2235                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
2236            ))
2237        }
2238        _ => Ok(()),
2239    }
2240}
2241
2242// An empty index of the kind the descriptor selects. Batch kinds start unbuilt.
2243fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
2244    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2245        return CollectionIndex::None;
2246    }
2247    match descriptor.index.kind {
2248        IndexKind::Vamana => CollectionIndex::Vamana(None),
2249        IndexKind::DiskVamana => CollectionIndex::Disk(None),
2250        IndexKind::Ivf => CollectionIndex::Ivf(None),
2251        IndexKind::Colbert => CollectionIndex::Colbert(None),
2252        _ => CollectionIndex::Hnsw(Hnsw::new(
2253            descriptor.dim as usize,
2254            to_index_metric(descriptor.metric),
2255            HnswConfig::default(),
2256        )),
2257    }
2258}
2259
// Default product-quantization subspace count for vectors of `dim` dimensions:
// the largest divisor of `dim` that does not exceed `dim / 8` (so each subspace
// spans roughly eight dimensions or more). When nothing larger divides `dim` —
// including `dim < 16` — the result is 1, i.e. one whole-vector codebook.
fn default_pq_m(dim: usize) -> usize {
    let widest = std::cmp::max(dim / 8, 1);
    for subspaces in (1..=widest).rev() {
        if dim.is_multiple_of(subspaces) {
            return subspaces;
        }
    }
    // Unreachable in practice (1 divides everything); kept as the safe fallback.
    1
}
2269
// Constants shared by the index builders that follow; `build_index` builds the
// descriptor's index over `ids` (internal 0..n) and their flat vectors.
//
// Fixed seed for codebook training (the disk index's product quantizer and the
// ColBERT config), so a collection's disk index is reproducible.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
// File name of the disk index artifact inside a collection's index dir. Each
// rebuild replaces it (`write_disk_index` stages a `.tmp` sibling and renames it
// over this name); the caller drops the previous handle (unmapping the file)
// first.
const DISK_INDEX_FILE: &str = "vamana.qvx";
2276
2277fn build_index(
2278    store: &Store,
2279    cid: CollectionId,
2280    descriptor: &Descriptor,
2281    ids: &[u64],
2282    flat: &[f32],
2283) -> Result<CollectionIndex> {
2284    Ok(match build_in_memory_index(descriptor, ids, flat)? {
2285        Some(index) => index,
2286        None => {
2287            let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2288            CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2289                store, cid, &graph, &pq,
2290            )?)?))
2291        }
2292    })
2293}
2294
2295// Build the in-memory index for a collection, or `None` for the disk-resident kind
2296// whose artifact must be sealed through the store. Pure given the vectors (no
2297// store, no I/O), so the off-lock rebuild (ADR-0062) calls it without a lock; the
2298// synchronous `build_index` and disk path layer the store I/O on top.
2299fn build_in_memory_index(
2300    descriptor: &Descriptor,
2301    ids: &[u64],
2302    flat: &[f32],
2303) -> Result<Option<CollectionIndex>> {
2304    // Client-side-encrypted collections have no server-side index (ADR-0032): the
2305    // server stores opaque ciphertext it never ranks.
2306    if descriptor.vector_encryption == VectorEncryption::ClientSide {
2307        return Ok(Some(CollectionIndex::None));
2308    }
2309    let dim = descriptor.dim as usize;
2310    let metric = to_index_metric(descriptor.metric);
2311    Ok(Some(match descriptor.index.kind {
2312        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
2313            ids,
2314            flat,
2315            dim,
2316            metric,
2317            VamanaConfig::default(),
2318        )?)?)),
2319        // The disk-resident artifact is sealed through the store, not here.
2320        IndexKind::DiskVamana => return Ok(None),
2321        IndexKind::Ivf => {
2322            let cfg = IvfConfig {
2323                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
2324                ..IvfConfig::default()
2325            };
2326            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
2327        }
2328        IndexKind::Colbert => {
2329            // Coarse centroids scale ~√(tokens); probe a quarter of them (PLAID),
2330            // and PQ the residual with the same subspace default as the disk path.
2331            let n = ids.len();
2332            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
2333            let cfg = ColbertConfig {
2334                n_centroids,
2335                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
2336                pq_subspaces: descriptor
2337                    .index
2338                    .pq_subspaces
2339                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
2340                seed: PQ_SEED,
2341            };
2342            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
2343        }
2344        _ => {
2345            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
2346            for (i, &id) in ids.iter().enumerate() {
2347                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
2348            }
2349            CollectionIndex::Hnsw(h)
2350        }
2351    }))
2352}
2353
2354// Build the Vamana graph + PQ codebook for the disk-resident index. Pure (no
2355// store, no I/O), so the off-lock rebuild (ADR-0062) does the expensive graph build
2356// and PQ training without a lock; `write_disk_index` then seals the result.
2357fn build_disk_graph_pq(
2358    descriptor: &Descriptor,
2359    ids: &[u64],
2360    flat: &[f32],
2361) -> Result<(Vamana, ProductQuantizer)> {
2362    let dim = descriptor.dim as usize;
2363    let metric = to_index_metric(descriptor.metric);
2364    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2365    let m = descriptor
2366        .index
2367        .pq_subspaces
2368        .map_or_else(|| default_pq_m(dim), |x| x as usize);
2369    let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2370    Ok((graph, pq))
2371}
2372
2373// Seal a prebuilt disk artifact under the collection's index dir with the store's
2374// codec, and open it for queries. Overwrites the artifact in place, so the caller
2375// must drop any prior `DiskVamana` for this collection first (its mmap assumes an
2376// immutable file).
2377fn write_disk_index(
2378    store: &Store,
2379    cid: CollectionId,
2380    graph: &Vamana,
2381    pq: &ProductQuantizer,
2382) -> Result<DiskVamana> {
2383    let dir = store.index_dir(cid);
2384    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
2385    let path = dir.join(DISK_INDEX_FILE);
2386    // Seal the index artifact with the collection's own codec (its DEK under an
2387    // envelope key-ring), so a crypto-shred of the collection also makes its index
2388    // unreadable. The same owned handle writes and then mmap-opens it.
2389    let codec = store.collection_codec_clone(cid)?;
2390    // Atomic publish (ADR-0063): write to a temp file, fsync it (disk::write does
2391    // the file sync), rename over the live name, then fsync the dir. A crash mid
2392    // write leaves the previous complete base or none — never a torn file that the
2393    // durable load path could `mmap` and serve. The caller has already dropped any
2394    // prior `DiskVamana` (its mmap assumed the old inode), so the rename is safe.
2395    let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
2396    quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
2397    std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
2398    let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
2399    open_disk_index(store, cid, codec)
2400}
2401
2402// Open a collection's sealed disk base (`vamana.qvx`) for queries, decrypting with
2403// its codec. Errors (absent/torn/wrong-codec file) propagate so the durable load
2404// path can fall back to a rebuild (ADR-0063).
2405fn open_disk_index(
2406    store: &Store,
2407    cid: CollectionId,
2408    codec: Box<dyn PageCodec>,
2409) -> Result<DiskVamana> {
2410    let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2411    Ok(DiskVamana::open(&path, codec)?)
2412}
2413
2414// Load a collection's index on open: restore the durable IVF snapshot and replay
2415// the post-checkpoint tail (ADR-0025) when one is present and intact, otherwise
2416// rebuild from the store. The snapshot is only a fast path — any problem reading
2417// or restoring it falls back to the authoritative rebuild.
2418fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2419    // Multi-vector collections always rebuild on open, so the document grouping is
2420    // derived from the live rows; the snapshot fast-paths stay single-vector.
2421    if !handle.descriptor.multivector
2422        && handle.descriptor.index.kind == IndexKind::Ivf
2423        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2424        && restore_ivf_snapshot(store, handle, &blob).is_ok()
2425    {
2426        return Ok(());
2427    }
2428    // Durable DiskVamana (ADR-0063): mmap the frugal base instead of an O(N)
2429    // full-RAM rebuild. Any failure falls back to rebuild, so the artifact is never
2430    // load-bearing for correctness. The derived sparse index is left `None` (as for
2431    // IVF) and hybrid falls back to the store scan until the next rebuild.
2432    // `QUIVER_DISABLE_DURABLE_DISK_INDEX` is an ops kill switch: set it to force the
2433    // (always-correct) rebuild path if the durable load is ever suspected.
2434    if !handle.descriptor.multivector
2435        && handle.descriptor.index.kind == IndexKind::DiskVamana
2436        && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
2437        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2438        && restore_disk_snapshot(store, handle, &blob).is_ok()
2439    {
2440        return Ok(());
2441    }
2442    rebuild_index(store, handle)
2443}
2444
2445// Restore a DiskVamana from its durable snapshot (ADR-0063): `mmap` the immutable
2446// base file, rebuild the in-memory FreshDiskANN delta from the store by the implied
2447// delta ids, re-apply the tombstones, then replay the post-checkpoint WAL tail. Any
2448// problem — absent/torn/mismatched base, a missing delta row — returns an error so
2449// the caller falls back to an authoritative rebuild.
2450fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2451    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
2452    if envelope.version != INDEX_ENVELOPE_VERSION {
2453        return Err(Error::Unsupported(
2454            "unsupported disk index snapshot version",
2455        ));
2456    }
2457    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
2458    // The blob's base count must match the opened file, or the (base, blob) pair is
2459    // inconsistent (e.g. a base torn or replaced after the blob was sealed).
2460    if base.len() as u64 != envelope.base_row_count {
2461        return Err(Error::Unsupported(
2462            "disk base count disagrees with snapshot",
2463        ));
2464    }
2465    handle.ext_to_int = envelope
2466        .int_to_ext
2467        .iter()
2468        .enumerate()
2469        .map(|(i, ext)| (ext.clone(), i as u64))
2470        .collect();
2471    handle.int_to_ext = envelope.int_to_ext;
2472    let mut fresh = FreshDiskVamana::new(base)?;
2473    // Reconstruct the delta: the ids above the base were inserted since the last
2474    // consolidation; their vectors live in the store, fetched by id (a row that
2475    // died this window is simply absent and need not enter the delta).
2476    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
2477        let ext = &handle.int_to_ext[internal as usize];
2478        if let Some(record) = store.get(handle.id, ext)? {
2479            fresh.insert(internal, &record.vector)?;
2480        }
2481    }
2482    for id in envelope.deleted_ids {
2483        fresh.mark_deleted(id);
2484    }
2485    handle.index = CollectionIndex::Disk(Some(fresh));
2486    handle.stale = false;
2487    replay_recovery_tail(store, handle)
2488}
2489
2490// Catch a restored index up to the store's current state by replaying the
2491// post-checkpoint WAL tail through the shared incremental apply path (ADR-0025/0063):
2492// tombstones first, then active-buffer upserts, so a row shadowed this window ends
2493// with its new vector. Bounded by the checkpoint cadence, not the collection size.
2494fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2495    let tail = store.recovery_tail(handle.id)?;
2496    for ext in &tail.deleted {
2497        index_delete_point(handle, ext);
2498    }
2499    for (ext, record) in tail.upserts {
2500        index_upsert_point(handle, &ext, &record.vector)?;
2501    }
2502    Ok(())
2503}
2504
2505// Restore an IVF from its snapshot envelope and catch it up to the store's
2506// current state by replaying the post-checkpoint tail (ADR-0025). Tombstoned ids
2507// are removed before the active upserts are applied, so a row shadowed this
2508// window (present in both) ends with its new vector.
2509fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2510    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
2511    if envelope.version != INDEX_ENVELOPE_VERSION {
2512        return Err(Error::Unsupported(
2513            "unsupported index snapshot envelope version",
2514        ));
2515    }
2516    let ivf = Ivf::restore(&envelope.ivf)?;
2517    handle.ext_to_int = envelope
2518        .int_to_ext
2519        .iter()
2520        .enumerate()
2521        .map(|(i, ext)| (ext.clone(), i as u64))
2522        .collect();
2523    handle.int_to_ext = envelope.int_to_ext;
2524    handle.index = CollectionIndex::Ivf(Some(ivf));
2525    handle.stale = false;
2526
2527    let tail = store.recovery_tail(handle.id)?;
2528    for ext in &tail.deleted {
2529        let Some(&internal) = handle.ext_to_int.get(ext) else {
2530            continue;
2531        };
2532        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2533            ivf.remove(internal);
2534        }
2535    }
2536    for (ext, record) in tail.upserts {
2537        let internal = match handle.ext_to_int.get(&ext) {
2538            Some(&i) => i,
2539            None => {
2540                let i = handle.int_to_ext.len() as u64;
2541                handle.ext_to_int.insert(ext.clone(), i);
2542                handle.int_to_ext.push(ext);
2543                i
2544            }
2545        };
2546        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2547            ivf.insert(internal, &record.vector)?;
2548        }
2549    }
2550    Ok(())
2551}
2552
// Fold one point write (`ext_id` → `vector`) into a built index incrementally,
// or mark the handle stale to defer to a rebuild when the kind cannot absorb it
// in place. HNSW absorbs a brand-new id but cannot update one (a known id falls
// to a rebuild); a built/trained IVF inserts or replaces any id (ADR-0023); a
// built FreshDiskANN graph appends to its delta and tombstones the prior copy on
// an update, consolidating past its churn threshold (ADR-0033). Shared by the
// single-vector `upsert` and each token row of a multi-vector `upsert_document`
// (ADR-0034). A no-op once the handle is stale (a rebuild is already pending).
//
// Returns an error only when the underlying index rejects the insert; deferring
// to a rebuild is reported as `Ok(())`.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    // MVCC mode: append to the published overlay instead of mutating the immutable
    // base index (ADR-0064), keeping lock-free readers race-free. Bumps the write
    // generation itself.
    if mvcc_served(handle) {
        overlay_upsert(handle, ext_id, vector);
        return Ok(());
    }
    // Every write bumps the generation, even one skipped here because a rebuild is
    // already pending — an in-flight off-lock rebuild must still notice it (ADR-0062).
    bump_write_gen(handle);
    if handle.stale {
        return Ok(());
    }
    // Classify the live index up front as plain booleans, so the branches below can
    // re-borrow `handle.index` mutably without holding a match on it.
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    // An IVF only counts as live once it is built AND holds at least one vector.
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        // Brand-new id on HNSW: allocate the next dense internal id and insert. The
        // id maps are only updated after the insert succeeded.
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        // Reuse the internal id for an in-place update; allocate a fresh, dense one
        // for a new id (so `int_to_ext` stays index-addressable).
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // A graph cannot update a node in place: append a new delta node under a
        // fresh internal id and tombstone the prior copy on an update (ADR-0033).
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            // Not reachable while `is_live_graph` holds; present for exhaustiveness.
            _ => {}
        }
        // Re-point the external id at the new node; the old internal id keeps its
        // `int_to_ext` slot, so the list gains a second entry for the same id.
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        // Too much pending work relative to the base: hand over to a rebuild.
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        // ColBERT appends a new token and tombstones the prior copy on an update —
        // its centroids are fixed until a rebuild (ADR-0034); the deletion fraction
        // drives consolidation, as for HNSW.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // Everything else — an HNSW update of a known id, an unbuilt or empty IVF,
        // an unbuilt graph/ColBERT, or no index at all — defers to a rebuild.
        mark_stale(handle);
    }
    Ok(())
}
2656
// Tombstone one point (`ext_id`) from a built index incrementally, or mark the
// handle stale to defer to a rebuild. A built IVF removes in place (ADR-0023), a
// built HNSW soft-deletes (ADR-0026), and a built FreshDiskANN graph tombstones in
// its deletion set (ADR-0033); each amortizes a rebuild when tombstones dominate.
// The id→internal mapping is kept so a later re-insert allocates afresh. Shared by
// the single-vector `delete` and each token row of `delete_document` (ADR-0034).
// Infallible: anything that cannot be applied in place just marks the handle stale.
fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
    // MVCC mode: tombstone the id in the published overlay (ADR-0064); the base
    // stays immutable. Bumps the write generation itself.
    if mvcc_served(handle) {
        overlay_delete(handle, ext_id);
        return;
    }
    // Every write bumps the generation, even one skipped here (see `index_upsert_point`).
    bump_write_gen(handle);
    if handle.stale {
        return;
    }
    // Resolve the internal id and classify the live index up front, so the arms
    // below can re-borrow `handle.index` mutably.
    let internal = handle.ext_to_int.get(ext_id).copied();
    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    match internal {
        // IVF removes the id in place; no rebuild threshold is checked here.
        Some(internal) if live_ivf => {
            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
                ivf.remove(internal);
            }
        }
        // HNSW soft-deletes, then defers to a rebuild once the deleted fraction
        // reaches `HNSW_REBUILD_DELETED_FRACTION`.
        Some(internal) if live_hnsw => {
            let mut crowded = false;
            if let CollectionIndex::Hnsw(h) = &mut handle.index {
                // NOTE(review): the internal id is narrowed to `u32` here — presumably
                // HNSW ids always fit; confirm against `Hnsw::mark_deleted`.
                h.mark_deleted(internal as u32);
                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // Graph kinds tombstone the id, then defer to a rebuild once the pending
        // fraction reaches `GRAPH_REBUILD_PENDING_FRACTION`.
        Some(internal) if live_graph => {
            let mut crowded = false;
            match &mut handle.index {
                CollectionIndex::Vamana(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                CollectionIndex::Disk(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                // Not reachable while `live_graph` holds; present for exhaustiveness.
                _ => {}
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // ColBERT tombstones the token and shares the HNSW deleted-fraction threshold.
        Some(internal) if live_colbert => {
            let mut crowded = false;
            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
                c.mark_deleted(internal);
                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // An id with no internal mapping, or a known id with no live index of a
        // kind handled above, defers to a rebuild.
        _ => mark_stale(handle),
    }
}
2729
// The product of scanning a collection's live rows: the dense id maps + contiguous
// vectors, the multi-vector document grouping, and the derived sparse inverted
// index — everything a rebuild needs from the store, owned so the build can then
// proceed with no store and no lock (ADR-0062).
struct RebuildScan {
    // Internal id → external id; a row's internal id is its position in scan order.
    int_to_ext: Vec<String>,
    // External id → internal id (the inverse of `int_to_ext`).
    ext_to_int: HashMap<String, u64>,
    // Every row's vector, concatenated in internal-id order.
    flat: Vec<f32>,
    // Multi-vector collections only (`None` otherwise): document id → number of
    // token rows seen for it.
    docs: Option<BTreeMap<String, u32>>,
    // The sparse inverted index rebuilt from row payloads; `None` when the
    // collection does not use a sparse index.
    sparse: Option<SparseInvertedIndex>,
}
2741
2742// Scan a collection's live rows into a [`RebuildScan`]. Rebuilds the derived sparse
2743// inverted index (ADR-0045) and, for a multi-vector collection, the document
2744// grouping (doc id → token count) authoritatively from those rows. `&self` on the
2745// store, so it runs under a shared read lock.
2746fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2747    let multivector = handle.descriptor.multivector;
2748    let mut int_to_ext = Vec::new();
2749    let mut ext_to_int = HashMap::new();
2750    let mut flat: Vec<f32> = Vec::new();
2751    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2752    // Only single-vector, server-searchable collections carry a sparse index, and
2753    // only non-empty payloads are parsed, so non-sparse collections pay nothing.
2754    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2755    for (ext_id, record) in store.scan(handle.id)? {
2756        let internal = int_to_ext.len() as u64;
2757        flat.extend_from_slice(&record.vector);
2758        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2759            *docs.entry(doc.to_owned()).or_insert(0) += 1;
2760        }
2761        if let Some(idx) = sparse.as_mut()
2762            && let Some(sv) = sparse_vector_from_payload(&record.payload)
2763        {
2764            idx.upsert(&ext_id, &sv);
2765        }
2766        ext_to_int.insert(ext_id.clone(), internal);
2767        int_to_ext.push(ext_id);
2768    }
2769    Ok(RebuildScan {
2770        int_to_ext,
2771        ext_to_int,
2772        flat,
2773        docs: multivector.then_some(docs),
2774        sparse,
2775    })
2776}
2777
2778// Rebuild a collection's index from the store's current live rows, synchronously
2779// under `&mut self` — the embedded/open path. The server's runtime path builds
2780// off-lock instead (ADR-0062: `snapshot_rebuild_inputs` → `build` → `commit_rebuild`).
2781fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2782    let scan = scan_collection(store, handle)?;
2783    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2784    // Drop the previous index before rebuilding: a disk index `mmap`s a file we
2785    // are about to overwrite in place, and the mapping assumes an immutable file.
2786    handle.index = empty_index(&handle.descriptor);
2787    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2788    handle.int_to_ext = scan.int_to_ext;
2789    handle.ext_to_int = scan.ext_to_int;
2790    handle.docs = scan.docs;
2791    handle.sparse = scan.sparse;
2792    handle.stale = false;
2793    Ok(())
2794}
2795
2796/// A captured, owned snapshot of everything an off-lock rebuild needs (ADR-0062):
2797/// the scanned rows, the collection's descriptor, and the write generation at
2798/// capture time. Produced under the shared read lock by
2799/// [`Database::snapshot_rebuild_inputs`]; [`RebuildInputs::build`] then constructs
2800/// the new index with no lock held.
2801pub struct RebuildInputs {
2802    collection: String,
2803    descriptor: Descriptor,
2804    scan: RebuildScan,
2805    write_gen: u64,
2806}
2807
2808// The built product of a [`RebuildInputs`], ready to install. In-memory indexes are
2809// fully built; the disk-resident artifact carries its prebuilt graph + PQ, which
2810// `commit_rebuild` seals through the store under the brief write lock (the file
2811// write must not race the prior index's `mmap`).
2812enum RebuiltKind {
2813    Ready(Box<CollectionIndex>),
2814    Disk {
2815        graph: Box<Vamana>,
2816        pq: Box<ProductQuantizer>,
2817    },
2818}
2819
2820/// A new index built off-lock from a [`RebuildInputs`], ready for
2821/// [`Database::commit_rebuild`] to install under the brief write lock (ADR-0062).
2822pub struct RebuiltIndex {
2823    collection: String,
2824    kind: RebuiltKind,
2825    int_to_ext: Vec<String>,
2826    ext_to_int: HashMap<String, u64>,
2827    docs: Option<BTreeMap<String, u32>>,
2828    sparse: Option<SparseInvertedIndex>,
2829    write_gen: u64,
2830}
2831
2832impl RebuildInputs {
2833    /// Build the new index from the captured rows with **no lock held** — the
2834    /// expensive CPU work (graph build, PQ training, HNSW insertion) of an off-lock
2835    /// rebuild (ADR-0062). The disk artifact is built in memory here; its file is
2836    /// sealed later by `commit_rebuild` under the write lock.
2837    pub fn build(self) -> Result<RebuiltIndex> {
2838        let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2839        let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2840            Some(index) => RebuiltKind::Ready(Box::new(index)),
2841            None => {
2842                let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2843                RebuiltKind::Disk {
2844                    graph: Box::new(graph),
2845                    pq: Box::new(pq),
2846                }
2847            }
2848        };
2849        Ok(RebuiltIndex {
2850            collection: self.collection,
2851            kind,
2852            int_to_ext: self.scan.int_to_ext,
2853            ext_to_int: self.scan.ext_to_int,
2854            docs: self.scan.docs,
2855            sparse: self.scan.sparse,
2856            write_gen: self.write_gen,
2857        })
2858    }
2859}
2860
2861// Extract a point's sparse vector from its serialized payload, if it carries one
2862// under `__quiver_sparse__` and the value deserializes. An empty payload or a
2863// missing/malformed sparse vector yields `None`.
2864fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2865    if payload.is_empty() {
2866        return None;
2867    }
2868    let value = serde_json::from_slice::<Value>(payload).ok()?;
2869    sparse_vector_from_value(&value)
2870}
2871
2872// As [`sparse_vector_from_payload`] but over an already-parsed payload `Value`
2873// (the upsert path has the payload before it is serialized). An explicit
2874// `__quiver_sparse__` vector wins; otherwise a `__quiver_text__` string is
2875// tokenized into a term-frequency vector (ADR-0046), so a point is searchable by
2876// BM25 over text alone.
2877fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2878    if let Some(raw) = payload.get(SPARSE_KEY) {
2879        return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2880    }
2881    let text = payload.get(TEXT_KEY)?.as_str()?;
2882    Some(text_to_sparse(text))
2883}
2884
2885// Maintain the derived sparse inverted index for one point write (ADR-0045): index
2886// the point's sparse vector, or drop any prior entry when the new payload no longer
2887// carries one (an update that removed it). A no-op when the collection has no
2888// inverted index, or when a rebuild is pending — the rebuild repopulates the index
2889// authoritatively from the store, exactly as it does the dense index.
2890fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2891    if handle.stale {
2892        return;
2893    }
2894    let Some(idx) = handle.sparse.as_mut() else {
2895        return;
2896    };
2897    match sparse_vector_from_value(payload) {
2898        Some(sv) => idx.upsert(ext_id, &sv),
2899        None => {
2900            idx.remove(ext_id);
2901        }
2902    }
2903}
2904
2905// Drop one point from the derived sparse inverted index. A no-op when the
2906// collection has no inverted index.
2907fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2908    if let Some(idx) = handle.sparse.as_mut() {
2909        idx.remove(ext_id);
2910    }
2911}
2912
2913// The set of live external ids guaranteed to contain every row the `filter`
2914// accepts (a sound superset), resolved through the collection's secondary
2915// indexes. `None` means the filter cannot be narrowed with the available indexes
2916// and the caller should post-filter instead; `Some(set)` is a candidate
2917// superset, and an empty set proves no row can match.
2918//
2919// Only indexable leaves — equality, `in`, and range comparisons on declared
2920// filterable fields of a matching type — constrain the set. Everything else
2921// (negation, existence, `ne`, non-indexed fields, type mismatches) is treated as
2922// unconstrained, which keeps the result a superset; exactness is restored when
2923// the caller re-checks the full `Filter` on each surviving candidate.
2924fn candidate_ids(
2925    store: &Store,
2926    cid: CollectionId,
2927    filter: &Filter,
2928    filterable: &[FilterableField],
2929) -> Result<Option<BTreeSet<String>>> {
2930    match filter {
2931        Filter::And(subs) => {
2932            // Intersect the constrained children; an unconstrained child (None)
2933            // is dropped, which only widens the set — still a superset.
2934            let mut acc: Option<BTreeSet<String>> = None;
2935            for sub in subs {
2936                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2937                    acc = Some(match acc {
2938                        Some(existing) => existing.intersection(&set).cloned().collect(),
2939                        None => set,
2940                    });
2941                }
2942            }
2943            Ok(acc)
2944        }
2945        Filter::Or(subs) => {
2946            // The union of the children — but one unconstrained child makes the
2947            // whole disjunction unconstrained (its rows could be anything).
2948            let mut acc = BTreeSet::new();
2949            for sub in subs {
2950                match candidate_ids(store, cid, sub, filterable)? {
2951                    Some(set) => acc.extend(set),
2952                    None => return Ok(None),
2953                }
2954            }
2955            Ok(Some(acc))
2956        }
2957        // A negation cannot be narrowed to a superset with these indexes.
2958        Filter::Not(_) => Ok(None),
2959        // A leaf: indexable ⇒ its matching ids; otherwise unconstrained.
2960        leaf => match leaf_predicate(leaf, filterable) {
2961            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2962            None => Ok(None),
2963        },
2964    }
2965}
2966
2967// Map a single filter leaf to an indexable secondary-index predicate, if its
2968// field is declared filterable and the value types line up. Boolean nodes and
2969// predicates the secondary index cannot answer (`Ne`, `Exists`) return `None`.
2970fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2971    let field_type = |field: &str| {
2972        filterable
2973            .iter()
2974            .find(|f| f.path == field)
2975            .map(|f| f.field_type)
2976    };
2977    match filter {
2978        Filter::Eq { field, value } => Some(SecPredicate::Eq {
2979            field: field.clone(),
2980            value: sec_value(field_type(field)?, value)?,
2981        }),
2982        Filter::In { field, values } => {
2983            let ft = field_type(field)?;
2984            // Every value must encode, or the `in` set would be understated and
2985            // the candidate superset unsound.
2986            let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2987            Some(SecPredicate::In {
2988                field: field.clone(),
2989                values: values?,
2990            })
2991        }
2992        Filter::Lt { field, value } => {
2993            one_sided_range(field, field_type(field)?, value, false, false)
2994        }
2995        Filter::Lte { field, value } => {
2996            one_sided_range(field, field_type(field)?, value, false, true)
2997        }
2998        Filter::Gt { field, value } => {
2999            one_sided_range(field, field_type(field)?, value, true, false)
3000        }
3001        Filter::Gte { field, value } => {
3002            one_sided_range(field, field_type(field)?, value, true, true)
3003        }
3004        _ => None,
3005    }
3006}
3007
3008// Build a one-sided range predicate from a comparison leaf. `is_lower` selects
3009// whether `value` is the lower (`Gt`/`Gte`) or upper (`Lt`/`Lte`) bound;
3010// `inclusive` is that bound's inclusivity. `None` on a type mismatch.
3011fn one_sided_range(
3012    field: &str,
3013    field_type: FieldType,
3014    value: &Value,
3015    is_lower: bool,
3016    inclusive: bool,
3017) -> Option<SecPredicate> {
3018    let v = sec_value(field_type, value)?;
3019    let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3020        (Some(v), None, inclusive, false)
3021    } else {
3022        (None, Some(v), false, inclusive)
3023    };
3024    Some(SecPredicate::Range {
3025        field: field.to_owned(),
3026        lo,
3027        hi,
3028        lo_inclusive,
3029        hi_inclusive,
3030    })
3031}
3032
3033// Encode a filter value as a typed secondary-index value, or `None` when the
3034// JSON type does not match the field's declared type (so it cannot be
3035// pre-filtered and the planner falls back to post-filtering).
3036fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3037    match (field_type, value) {
3038        (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3039        (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3040        _ => None,
3041    }
3042}
3043
3044#[cfg(test)]
3045mod tests {
3046    use super::*;
3047    use serde_json::json;
3048
3049    fn desc() -> Descriptor {
3050        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3051    }
3052
3053    fn open(dir: &Path) -> Database {
3054        Database::open(dir).unwrap()
3055    }
3056
3057    #[test]
3058    fn hybrid_search_fuses_dense_and_sparse() {
3059        let tmp = tempfile::tempdir().unwrap();
3060        let mut db = open(tmp.path());
3061        db.create_collection("kb", desc()).unwrap();
3062        // "a" is the nearest dense neighbour of the query; "b" shares the query's
3063        // sparse terms; "c" is good on neither.
3064        db.upsert(
3065            "kb",
3066            "a",
3067            &[1.0, 0.0, 0.0, 0.0],
3068            &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3069        )
3070        .unwrap();
3071        db.upsert(
3072            "kb",
3073            "b",
3074            &[0.0, 1.0, 0.0, 0.0],
3075            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3076        )
3077        .unwrap();
3078        db.upsert(
3079            "kb",
3080            "c",
3081            &[0.0, 0.0, 0.0, 1.0],
3082            &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3083        )
3084        .unwrap();
3085
3086        let dense_q = [1.0, 0.0, 0.0, 0.0];
3087        let sparse_q = SparseVector {
3088            indices: vec![1, 2],
3089            values: vec![1.0, 1.0],
3090        };
3091        let params = SearchParams {
3092            k: 3,
3093            ..SearchParams::default()
3094        };
3095
3096        // Hybrid: "a" (dense) and "b" (sparse) both rank above "c" (neither).
3097        let hits = db
3098            .hybrid_search(
3099                "kb",
3100                Some(&dense_q),
3101                Some(&sparse_q),
3102                None,
3103                &params,
3104                DEFAULT_RRF_K0,
3105            )
3106            .unwrap();
3107        let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3108        assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3109        assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3110
3111        // Pure sparse: only "b" shares the query's terms.
3112        let sparse_only = db
3113            .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
3114            .unwrap();
3115        assert_eq!(sparse_only[0].id, "b");
3116
3117        // Pure dense: "a" is nearest.
3118        let dense_only = db
3119            .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
3120            .unwrap();
3121        assert_eq!(dense_only[0].id, "a");
3122
3123        // Neither query is an error.
3124        assert!(
3125            db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
3126                .is_err()
3127        );
3128    }
3129
3130    // Pure-sparse hybrid result ids, in fused order.
3131    fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3132        let params = SearchParams {
3133            k: 10,
3134            ..SearchParams::default()
3135        };
3136        db.hybrid_search("kb", None, Some(q), None, &params, DEFAULT_RRF_K0)
3137            .unwrap()
3138            .into_iter()
3139            .map(|m| m.id)
3140            .collect()
3141    }
3142
3143    #[test]
3144    fn sparse_index_equals_the_store_scan_fallback() {
3145        let tmp = tempfile::tempdir().unwrap();
3146        let mut db = open(tmp.path());
3147        db.create_collection("kb", desc()).unwrap();
3148        let z = [0.0f32, 0.0, 0.0, 0.0];
3149        for (id, dims, vals) in [
3150            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
3151            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
3152            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
3153            ("d", vec![9u32], vec![1.0f32]), // shares no query term
3154        ] {
3155            db.upsert(
3156                "kb",
3157                id,
3158                &z,
3159                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
3160            )
3161            .unwrap();
3162        }
3163        let q = SparseVector {
3164            indices: vec![1, 2, 3],
3165            values: vec![1.0, 1.0, 1.0],
3166        };
3167
3168        // The derived index is present and used.
3169        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3170        let via_index = sparse_ids(&mut db, &q);
3171        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
3172
3173        // Drop the index (not stale, so no rebuild) → the store-scan fallback runs
3174        // and must return the identical ranking.
3175        db.collections.get_mut("kb").unwrap().sparse = None;
3176        let via_scan = sparse_ids(&mut db, &q);
3177        assert_eq!(via_index, via_scan);
3178    }
3179
3180    #[test]
3181    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
3182        let tmp = tempfile::tempdir().unwrap();
3183        let mut db = open(tmp.path());
3184        db.create_collection("kb", desc()).unwrap();
3185        let z = [0.0f32, 0.0, 0.0, 0.0];
3186        db.upsert(
3187            "kb",
3188            "a",
3189            &z,
3190            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3191        )
3192        .unwrap();
3193        db.upsert(
3194            "kb",
3195            "b",
3196            &z,
3197            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
3198        )
3199        .unwrap();
3200        db.upsert(
3201            "kb",
3202            "c",
3203            &z,
3204            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3205        )
3206        .unwrap();
3207        // Update "a" onto a disjoint term; delete "b".
3208        db.upsert(
3209            "kb",
3210            "a",
3211            &z,
3212            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
3213        )
3214        .unwrap();
3215        assert!(db.delete("kb", "b").unwrap());
3216
3217        let q = SparseVector {
3218            indices: vec![1, 2],
3219            values: vec![1.0, 1.0],
3220        };
3221        // Only "c" still shares a query term ("a" moved to 7, "b" is gone).
3222        let incremental = sparse_ids(&mut db, &q);
3223        assert_eq!(incremental, vec!["c".to_owned()]);
3224
3225        // A full rebuild from the store must agree with the incremental state.
3226        db.collections.get_mut("kb").unwrap().stale = true;
3227        let rebuilt = sparse_ids(&mut db, &q);
3228        assert_eq!(incremental, rebuilt);
3229    }
3230
3231    #[test]
3232    fn sparse_index_is_rebuilt_on_reopen() {
3233        let tmp = tempfile::tempdir().unwrap();
3234        {
3235            let mut db = open(tmp.path());
3236            db.create_collection("kb", desc()).unwrap();
3237            db.upsert(
3238                "kb",
3239                "a",
3240                &[0.0, 0.0, 0.0, 0.0],
3241                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
3242            )
3243            .unwrap();
3244        }
3245        let mut db = open(tmp.path());
3246        assert!(db.collections.get("kb").unwrap().sparse.is_some());
3247        let q = SparseVector {
3248            indices: vec![1],
3249            values: vec![1.0],
3250        };
3251        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
3252    }
3253
3254    #[test]
3255    fn hybrid_sparse_honours_the_payload_filter() {
3256        let tmp = tempfile::tempdir().unwrap();
3257        let mut db = open(tmp.path());
3258        db.create_collection("kb", desc()).unwrap();
3259        let z = [0.0f32, 0.0, 0.0, 0.0];
3260        db.upsert(
3261            "kb",
3262            "a",
3263            &z,
3264            &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3265        )
3266        .unwrap();
3267        db.upsert(
3268            "kb",
3269            "b",
3270            &z,
3271            &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3272        )
3273        .unwrap();
3274        let q = SparseVector {
3275            indices: vec![1],
3276            values: vec![1.0],
3277        };
3278        let params = SearchParams {
3279            k: 10,
3280            filter: Some(Filter::Eq {
3281                field: "lang".to_owned(),
3282                value: json!("en"),
3283            }),
3284            ..SearchParams::default()
3285        };
3286        let hits: Vec<String> = db
3287            .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3288            .unwrap()
3289            .into_iter()
3290            .map(|m| m.id)
3291            .collect();
3292        // "b" scores higher but is filtered out by lang == "en".
3293        assert_eq!(hits, vec!["a".to_owned()]);
3294    }
3295
3296    #[test]
3297    fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3298        let tmp = tempfile::tempdir().unwrap();
3299        let mut db = open(tmp.path());
3300        db.create_collection("kb", desc()).unwrap();
3301        let z = [0.0f32, 0.0, 0.0, 0.0];
3302        // Points carry only a `__quiver_text__` string — tokenized at ingest.
3303        db.upsert(
3304            "kb",
3305            "cats",
3306            &z,
3307            &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3308        )
3309        .unwrap();
3310        db.upsert(
3311            "kb",
3312            "dogs",
3313            &z,
3314            &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3315        )
3316        .unwrap();
3317
3318        let params = SearchParams {
3319            k: 10,
3320            ..SearchParams::default()
3321        };
3322        // A text query (no dense, no explicit sparse) ranks the lexical match first;
3323        // "cat"/"cats" conflate via the stemmer.
3324        let hits: Vec<String> = db
3325            .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
3326            .unwrap()
3327            .into_iter()
3328            .map(|m| m.id)
3329            .collect();
3330        assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3331
3332        // A non-matching query returns nothing from the text side.
3333        assert!(
3334            db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
3335                .unwrap()
3336                .is_empty()
3337        );
3338
3339        // Text fuses with a dense query through the same RRF path.
3340        let dense_q = [1.0, 0.0, 0.0, 0.0];
3341        db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3342            .unwrap();
3343        let fused: Vec<String> = db
3344            .hybrid_search(
3345                "kb",
3346                Some(&dense_q),
3347                None,
3348                Some("dog"),
3349                &params,
3350                DEFAULT_RRF_K0,
3351            )
3352            .unwrap()
3353            .into_iter()
3354            .map(|m| m.id)
3355            .collect();
3356        assert!(
3357            fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3358            "dense match + lexical match both surface; got {fused:?}"
3359        );
3360    }
3361
3362    #[test]
3363    fn create_upsert_search_get_end_to_end() {
3364        let tmp = tempfile::tempdir().unwrap();
3365        let mut db = open(tmp.path());
3366        db.create_collection("items", desc()).unwrap();
3367        db.upsert(
3368            "items",
3369            "a",
3370            &[0.0, 0.0, 0.0, 0.0],
3371            &json!({"color": "red"}),
3372        )
3373        .unwrap();
3374        db.upsert(
3375            "items",
3376            "b",
3377            &[1.0, 0.0, 0.0, 0.0],
3378            &json!({"color": "blue"}),
3379        )
3380        .unwrap();
3381        db.upsert(
3382            "items",
3383            "c",
3384            &[5.0, 5.0, 5.0, 5.0],
3385            &json!({"color": "red"}),
3386        )
3387        .unwrap();
3388
3389        let near = db
3390            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
3391            .unwrap();
3392        assert_eq!(near[0].id, "a");
3393        assert_eq!(near[1].id, "b");
3394
3395        let got = db.get("items", "c").unwrap().unwrap();
3396        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
3397        assert_eq!(got.payload, Some(json!({"color": "red"})));
3398    }
3399
3400    #[test]
3401    fn upsert_batch_produces_same_search_results_as_sequential() {
3402        let tmp_seq = tempfile::tempdir().unwrap();
3403        let tmp_bat = tempfile::tempdir().unwrap();
3404
3405        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3406        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3407        let payload = json!({});
3408
3409        // Sequential upserts
3410        {
3411            let mut db = open(tmp_seq.path());
3412            db.create_collection("c", desc()).unwrap();
3413            for (id, vec) in ids.iter().zip(vectors.iter()) {
3414                db.upsert("c", id, vec, &payload).unwrap();
3415            }
3416        }
3417        // Batch upsert
3418        {
3419            let mut db = open(tmp_bat.path());
3420            db.create_collection("c", desc()).unwrap();
3421            let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3422                .iter()
3423                .zip(vectors.iter())
3424                .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3425                .collect();
3426            let n = db.upsert_batch("c", &pts).unwrap();
3427            assert_eq!(n, 20);
3428        }
3429
3430        let query = [10.0f32, 0.0, 0.0, 0.0];
3431        let params = SearchParams {
3432            k: 5,
3433            ..Default::default()
3434        };
3435
3436        let mut seq_db = open(tmp_seq.path());
3437        let mut bat_db = open(tmp_bat.path());
3438        let seq: Vec<String> = seq_db
3439            .search("c", &query, &params)
3440            .unwrap()
3441            .into_iter()
3442            .map(|m| m.id)
3443            .collect();
3444        let bat: Vec<String> = bat_db
3445            .search("c", &query, &params)
3446            .unwrap()
3447            .into_iter()
3448            .map(|m| m.id)
3449            .collect();
3450        assert_eq!(
3451            seq, bat,
3452            "batch and sequential produce different search results"
3453        );
3454    }
3455
3456    #[test]
3457    fn upsert_bulk_defers_the_index_then_searches_correctly() {
3458        let tmp = tempfile::tempdir().unwrap();
3459        let mut db = open(tmp.path());
3460        db.create_collection("c", desc()).unwrap();
3461        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3462        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3463        // Give one point a sparse vector so the deferred rebuild also rebuilds the
3464        // inverted index.
3465        let plain = json!({});
3466        let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3467        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3468            .iter()
3469            .zip(vectors.iter())
3470            .map(|(id, v)| {
3471                let payload = if id == "p3" { &sparse_payload } else { &plain };
3472                (id.as_str(), v.as_slice(), payload)
3473            })
3474            .collect();
3475        let n = db.upsert_bulk("c", &pts).unwrap();
3476        assert_eq!(n, 20);
3477
3478        // The bulk path defers index maintenance: the handle is stale until a read.
3479        assert!(db.collections.get("c").unwrap().stale);
3480
3481        // Dense search triggers the single rebuild and returns the nearest points.
3482        let query = [10.0f32, 0.0, 0.0, 0.0];
3483        let params = SearchParams {
3484            k: 5,
3485            ..Default::default()
3486        };
3487        let hits: Vec<String> = db
3488            .search("c", &query, &params)
3489            .unwrap()
3490            .into_iter()
3491            .map(|m| m.id)
3492            .collect();
3493        assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3494        assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3495
3496        // The sparse vector loaded via the bulk path is searchable after the rebuild.
3497        let q = SparseVector {
3498            indices: vec![7],
3499            values: vec![1.0],
3500        };
3501        let sparse_hits: Vec<String> = db
3502            .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3503            .unwrap()
3504            .into_iter()
3505            .map(|m| m.id)
3506            .collect();
3507        assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3508    }
3509
3510    #[test]
3511    fn filtered_search_only_returns_matching_payloads() {
3512        let tmp = tempfile::tempdir().unwrap();
3513        let mut db = open(tmp.path());
3514        db.create_collection("items", desc()).unwrap();
3515        for i in 0..20u32 {
3516            let color = if i % 2 == 0 { "red" } else { "blue" };
3517            db.upsert(
3518                "items",
3519                &format!("p{i}"),
3520                &[i as f32, 0.0, 0.0, 0.0],
3521                &json!({"color": color, "n": i}),
3522            )
3523            .unwrap();
3524        }
3525        let params = SearchParams {
3526            k: 5,
3527            filter: Some(Filter::Eq {
3528                field: "color".into(),
3529                value: json!("red"),
3530            }),
3531            ef_search: 64,
3532            with_payload: true,
3533            with_vector: false,
3534        };
3535        let results = db.search("items", &[0.0; 4], &params).unwrap();
3536        assert!(!results.is_empty());
3537        for m in &results {
3538            assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3539        }
3540    }
3541
3542    #[test]
3543    fn persists_and_rebuilds_index_on_reopen() {
3544        let tmp = tempfile::tempdir().unwrap();
3545        {
3546            let mut db = open(tmp.path());
3547            db.create_collection("items", desc()).unwrap();
3548            for i in 0..50u32 {
3549                db.upsert(
3550                    "items",
3551                    &format!("p{i}"),
3552                    &[i as f32, 1.0, 2.0, 3.0],
3553                    &json!({}),
3554                )
3555                .unwrap();
3556            }
3557            db.checkpoint().unwrap();
3558        }
3559        let mut db = open(tmp.path());
3560        assert_eq!(db.len("items").unwrap(), 50);
3561        let res = db
3562            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3563            .unwrap();
3564        assert_eq!(res[0].id, "p7");
3565    }
3566
3567    #[test]
3568    fn update_reflects_new_vector_after_rebuild() {
3569        let tmp = tempfile::tempdir().unwrap();
3570        let mut db = open(tmp.path());
3571        db.create_collection("items", desc()).unwrap();
3572        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3573        db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
3574            .unwrap();
3575        // Move "a" far away; querying near the origin should now prefer "b".
3576        db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
3577            .unwrap();
3578        let res = db
3579            .search("items", &[0.0; 4], &SearchParams::default())
3580            .unwrap();
3581        assert_eq!(res[0].id, "b");
3582        assert_eq!(
3583            db.get("items", "a").unwrap().unwrap().vector,
3584            Some(vec![100.0, 0.0, 0.0, 0.0])
3585        );
3586    }
3587
3588    #[test]
3589    fn delete_removes_from_search() {
3590        let tmp = tempfile::tempdir().unwrap();
3591        let mut db = open(tmp.path());
3592        db.create_collection("items", desc()).unwrap();
3593        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3594        db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3595            .unwrap();
3596        assert!(db.delete("items", "a").unwrap());
3597        let res = db
3598            .search("items", &[0.0; 4], &SearchParams::default())
3599            .unwrap();
3600        assert!(res.iter().all(|m| m.id != "a"));
3601        assert!(db.get("items", "a").unwrap().is_none());
3602    }
3603
3604    #[test]
3605    fn unknown_collection_errors() {
3606        let tmp = tempfile::tempdir().unwrap();
3607        let mut db = open(tmp.path());
3608        assert!(matches!(
3609            db.search("nope", &[0.0; 4], &SearchParams::default()),
3610            Err(Error::CollectionNotFound(_))
3611        ));
3612        db.create_collection("c", desc()).unwrap();
3613        assert!(matches!(
3614            db.create_collection("c", desc()),
3615            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3616        ));
3617    }
3618
3619    fn desc_with(kind: IndexKind) -> Descriptor {
3620        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3621            kind,
3622            pq_subspaces: None,
3623        })
3624    }
3625
3626    #[test]
3627    fn vamana_and_ivf_collections_find_the_nearest_point() {
3628        for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3629            let tmp = tempfile::tempdir().unwrap();
3630            let mut db = open(tmp.path());
3631            db.create_collection("c", desc_with(kind)).unwrap();
3632            for i in 0..40u32 {
3633                db.upsert(
3634                    "c",
3635                    &format!("p{i}"),
3636                    &[i as f32, 0.0, 0.0, 0.0],
3637                    &json!({}),
3638                )
3639                .unwrap();
3640            }
3641            // ef_search maps onto the index's search width (l_search / nprobe).
3642            let res = db
3643                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3644                .unwrap();
3645            assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3646        }
3647    }
3648
3649    #[test]
3650    fn index_kind_persists_and_rebuilds_on_reopen() {
3651        let tmp = tempfile::tempdir().unwrap();
3652        {
3653            let mut db = open(tmp.path());
3654            db.create_collection("v", desc_with(IndexKind::Vamana))
3655                .unwrap();
3656            for i in 0..20u32 {
3657                db.upsert(
3658                    "v",
3659                    &format!("p{i}"),
3660                    &[i as f32, 1.0, 2.0, 3.0],
3661                    &json!({}),
3662                )
3663                .unwrap();
3664            }
3665            db.checkpoint().unwrap();
3666        }
3667        let mut db = open(tmp.path());
3668        assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3669        let res = db
3670            .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3671            .unwrap();
3672        assert_eq!(res[0].id, "p7");
3673    }
3674
3675    // ADR-0063: a DiskVamana collection reopens by loading its frugal mmap base +
3676    // replaying the WAL tail, not by an O(N) full-RAM rebuild. Proven structurally:
3677    // the on-disk base keeps its checkpoint row count after reopen (a rebuild would
3678    // rewrite it to include the post-checkpoint inserts).
3679    #[test]
3680    fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
3681        let tmp = tempfile::tempdir().unwrap();
3682        let cid;
3683        {
3684            let mut db = open(tmp.path());
3685            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3686                .unwrap();
3687            for i in 0..100u32 {
3688                db.upsert(
3689                    "d",
3690                    &format!("p{i}"),
3691                    &[i as f32, 0.0, 0.0, 0.0],
3692                    &json!({}),
3693                )
3694                .unwrap();
3695            }
3696            // Force the deferred build so the base file + checkpoint blob exist.
3697            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3698                .unwrap();
3699            db.checkpoint().unwrap();
3700            // 15 more after the checkpoint (< the 20% consolidation threshold): they
3701            // land in the in-memory delta + the WAL tail, not the base file.
3702            for i in 100..115u32 {
3703                db.upsert(
3704                    "d",
3705                    &format!("p{i}"),
3706                    &[i as f32, 0.0, 0.0, 0.0],
3707                    &json!({}),
3708                )
3709                .unwrap();
3710            }
3711            cid = db.collections["d"].id;
3712            let base = open_disk_index(
3713                &db.store,
3714                cid,
3715                db.store.collection_codec_clone(cid).unwrap(),
3716            )
3717            .unwrap();
3718            assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
3719        }
3720
3721        let mut db = open(tmp.path());
3722        // A base point and a post-checkpoint (WAL-tail-replayed) point are both found.
3723        assert_eq!(
3724            db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
3725                .unwrap()[0]
3726                .id,
3727            "p50",
3728        );
3729        assert_eq!(
3730            db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
3731                .unwrap()[0]
3732                .id,
3733            "p110",
3734            "post-checkpoint insert survived reopen via WAL-tail replay",
3735        );
3736        // Loaded, not rebuilt: the base file still holds exactly the 100 checkpoint
3737        // rows. A rebuild on open would have rewritten it over all 115 live rows.
3738        let base = open_disk_index(
3739            &db.store,
3740            cid,
3741            db.store.collection_codec_clone(cid).unwrap(),
3742        )
3743        .unwrap();
3744        assert_eq!(
3745            base.len(),
3746            100,
3747            "reopen loaded the base; it was not rebuilt"
3748        );
3749    }
3750
3751    // ADR-0063: the durable artifact is never load-bearing for correctness — a
3752    // missing/torn base falls back to an authoritative rebuild that still answers.
3753    #[test]
3754    fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
3755        let tmp = tempfile::tempdir().unwrap();
3756        let base_path;
3757        {
3758            let mut db = open(tmp.path());
3759            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3760                .unwrap();
3761            for i in 0..60u32 {
3762                db.upsert(
3763                    "d",
3764                    &format!("p{i}"),
3765                    &[i as f32, 0.0, 0.0, 0.0],
3766                    &json!({}),
3767                )
3768                .unwrap();
3769            }
3770            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3771                .unwrap();
3772            db.checkpoint().unwrap();
3773            let cid = db.collections["d"].id;
3774            base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
3775        }
3776        // Simulate a lost base: remove the file the snapshot references.
3777        std::fs::remove_file(&base_path).unwrap();
3778        {
3779            let mut db = open(tmp.path());
3780            assert_eq!(
3781                db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3782                    .unwrap()[0]
3783                    .id,
3784                "p25",
3785                "rebuild fallback still answers correctly after a lost base",
3786            );
3787            // The fallback rebuild re-seals the base, and a subsequent checkpoint
3788            // refreshes the snapshot blob to match it.
3789            assert!(
3790                base_path.exists(),
3791                "the fallback rebuild re-sealed the base file"
3792            );
3793            db.checkpoint().unwrap();
3794        }
3795        // Simulate a torn base: truncate the file mid-way. `DiskVamana::open`'s
3796        // per-page checks reject it, so the load falls back to a rebuild.
3797        let len = std::fs::metadata(&base_path).unwrap().len();
3798        std::fs::OpenOptions::new()
3799            .write(true)
3800            .open(&base_path)
3801            .unwrap()
3802            .set_len(len / 2)
3803            .unwrap();
3804
3805        let mut db = open(tmp.path());
3806        assert_eq!(
3807            db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3808                .unwrap()[0]
3809                .id,
3810            "p25",
3811            "rebuild fallback still answers correctly after a torn base",
3812        );
3813    }
3814
3815    #[test]
3816    fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3817        let tmp = tempfile::tempdir().unwrap();
3818        let mut db = open(tmp.path());
3819        db.create_collection("c", desc_with(IndexKind::Ivf))
3820            .unwrap();
3821        for i in 0..50u32 {
3822            db.upsert(
3823                "c",
3824                &format!("p{i}"),
3825                &[i as f32, 0.0, 0.0, 0.0],
3826                &json!({}),
3827            )
3828            .unwrap();
3829        }
3830        // The first search builds (and trains) the IVF from the store.
3831        let _ = db
3832            .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3833            .unwrap();
3834        assert!(!db.collections["c"].stale, "the search built the index");
3835
3836        // Incremental insert: a new outlier is found, with no rebuild scheduled.
3837        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3838            .unwrap();
3839        assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3840        let res = db
3841            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3842            .unwrap();
3843        assert_eq!(res[0].id, "far");
3844
3845        // Incremental delete: the point disappears, still with no rebuild.
3846        assert!(db.delete("c", "far").unwrap());
3847        assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3848        let res = db
3849            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3850            .unwrap();
3851        assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3852    }
3853
3854    #[test]
3855    fn ivf_incremental_update_replaces_the_vector() {
3856        let tmp = tempfile::tempdir().unwrap();
3857        let mut db = open(tmp.path());
3858        db.create_collection("c", desc_with(IndexKind::Ivf))
3859            .unwrap();
3860        for i in 0..30u32 {
3861            db.upsert(
3862                "c",
3863                &format!("p{i}"),
3864                &[i as f32, 0.0, 0.0, 0.0],
3865                &json!({}),
3866            )
3867            .unwrap();
3868        }
3869        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3870        // Move p5 far away in place.
3871        db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3872            .unwrap();
3873        assert!(!db.collections["c"].stale);
3874        let at_new = db
3875            .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3876            .unwrap();
3877        assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3878        let at_old = db
3879            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3880            .unwrap();
3881        assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
3882    }
3883
3884    #[test]
3885    fn ivf_reinsert_after_incremental_delete_is_found() {
3886        let tmp = tempfile::tempdir().unwrap();
3887        let mut db = open(tmp.path());
3888        db.create_collection("c", desc_with(IndexKind::Ivf))
3889            .unwrap();
3890        for i in 0..20u32 {
3891            db.upsert(
3892                "c",
3893                &format!("p{i}"),
3894                &[i as f32, 0.0, 0.0, 0.0],
3895                &json!({}),
3896            )
3897            .unwrap();
3898        }
3899        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3900        assert!(db.delete("c", "p3").unwrap());
3901        assert!(!db.collections["c"].stale);
3902        // Re-insert the same id; it must be searchable again (the slot is reused).
3903        db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3904            .unwrap();
3905        assert!(!db.collections["c"].stale);
3906        let res = db
3907            .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
3908            .unwrap();
3909        assert_eq!(res[0].id, "p3");
3910    }
3911
3912    #[test]
3913    fn hnsw_in_place_update_falls_back_to_rebuild() {
3914        // HNSW cannot update an id in place, so an update marks the index stale.
3915        let tmp = tempfile::tempdir().unwrap();
3916        let mut db = open(tmp.path());
3917        db.create_collection("c", desc()).unwrap();
3918        for i in 0..10u32 {
3919            db.upsert(
3920                "c",
3921                &format!("p{i}"),
3922                &[i as f32, 0.0, 0.0, 0.0],
3923                &json!({}),
3924            )
3925            .unwrap();
3926        }
3927        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3928        assert!(!db.collections["c"].stale);
3929        db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
3930            .unwrap();
3931        assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
3932        // The rebuild on the next search reflects the new vector.
3933        let res = db
3934            .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
3935            .unwrap();
3936        assert_eq!(res[0].id, "p2");
3937    }
3938
3939    #[test]
3940    fn unsupported_index_configurations_are_rejected() {
3941        let tmp = tempfile::tempdir().unwrap();
3942        let mut db = open(tmp.path());
3943        // Vamana/IVF do not support inner product.
3944        let dot_vamana =
3945            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3946                kind: IndexKind::Vamana,
3947                pq_subspaces: None,
3948            });
3949        assert!(matches!(
3950            db.create_collection("a", dot_vamana),
3951            Err(Error::Unsupported(_))
3952        ));
3953        // ...nor does the disk-resident index.
3954        let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3955            kind: IndexKind::DiskVamana,
3956            pq_subspaces: None,
3957        });
3958        assert!(matches!(
3959            db.create_collection("b", dot_disk),
3960            Err(Error::Unsupported(_))
3961        ));
3962    }
3963
3964    #[test]
3965    fn dcpe_collections_require_the_l2_metric() {
3966        let tmp = tempfile::tempdir().unwrap();
3967        let mut db = open(tmp.path());
3968        // DCPE preserves Euclidean distance, so cosine/dot are rejected.
3969        for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
3970            let bad = Descriptor::new(4, Dtype::F32, metric)
3971                .with_vector_encryption(VectorEncryption::Dcpe);
3972            assert!(matches!(
3973                db.create_collection("bad", bad),
3974                Err(Error::Unsupported(_))
3975            ));
3976        }
3977        // L2 is accepted, and the flag persists on the descriptor.
3978        let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3979            .with_vector_encryption(VectorEncryption::Dcpe);
3980        db.create_collection("enc", good)
3981            .expect("l2 dcpe collection");
3982        assert_eq!(
3983            db.descriptor("enc").expect("descriptor").vector_encryption,
3984            VectorEncryption::Dcpe
3985        );
3986    }
3987
3988    #[test]
3989    fn client_side_collections_are_fetch_only_and_reject_search() {
3990        let tmp = tempfile::tempdir().unwrap();
3991        let mut db = open(tmp.path());
3992        // Any metric is allowed (the server never ranks), so there is no L2
3993        // restriction as there is for DCPE.
3994        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3995            .with_vector_encryption(VectorEncryption::ClientSide);
3996        db.create_collection("vault", desc)
3997            .expect("create client-side collection");
3998        // No server-side index is built for a client-side collection (ADR-0032).
3999        assert!(matches!(
4000            db.collections["vault"].index,
4001            CollectionIndex::None
4002        ));
4003
4004        // Upsert opaque placeholder points: a zero vector plus a payload carrying a
4005        // stand-in sealed vector blob and a cleartext, server-filterable field.
4006        for i in 0..5 {
4007            let tier = if i < 2 { "vip" } else { "std" };
4008            db.upsert(
4009                "vault",
4010                &format!("p{i}"),
4011                &[0.0; 4],
4012                &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
4013            )
4014            .expect("upsert");
4015        }
4016        assert_eq!(db.len("vault").unwrap(), 5);
4017        // Still no index after writes — it never goes stale or rebuilds.
4018        assert!(matches!(
4019            db.collections["vault"].index,
4020            CollectionIndex::None
4021        ));
4022
4023        // Ranked search is rejected: the server cannot rank opaque vectors.
4024        assert!(matches!(
4025            db.search("vault", &[0.0; 4], &SearchParams::default()),
4026            Err(Error::Unsupported(_))
4027        ));
4028
4029        // Fetch returns the entitled set; each payload carries the blob the client
4030        // would decrypt and rank locally, and vectors are omitted when not asked for.
4031        let all = db.fetch("vault", None, 0, 100, true, false).unwrap();
4032        assert_eq!(all.len(), 5);
4033        assert!(
4034            all.iter()
4035                .all(|m| m.payload.is_some() && m.vector.is_none())
4036        );
4037
4038        // A cleartext payload filter narrows the set server-side.
4039        let vip = db
4040            .fetch(
4041                "vault",
4042                Some(&Filter::Eq {
4043                    field: "tier".to_owned(),
4044                    value: serde_json::json!("vip"),
4045                }),
4046                0,
4047                100,
4048                false,
4049                false,
4050            )
4051            .unwrap();
4052        assert_eq!(vip.len(), 2);
4053        // A limit bounds the returned set.
4054        assert_eq!(
4055            db.fetch("vault", None, 0, 2, false, false).unwrap().len(),
4056            2
4057        );
4058        // An offset scrolls past earlier matches (paginated migration copy).
4059        assert_eq!(
4060            db.fetch("vault", None, 3, 100, false, false).unwrap().len(),
4061            2
4062        );
4063
4064        // get returns the stored placeholder + blob payload by id; delete works
4065        // through the store with no index to update.
4066        assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
4067        assert!(db.delete("vault", "p0").unwrap());
4068        assert_eq!(db.len("vault").unwrap(), 4);
4069    }
4070
4071    #[test]
4072    fn client_side_encryption_rejects_multivector() {
4073        let tmp = tempfile::tempdir().unwrap();
4074        let mut db = open(tmp.path());
4075        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4076            .with_multivector(true)
4077            .with_vector_encryption(VectorEncryption::ClientSide);
4078        assert!(matches!(
4079            db.create_collection("bad", desc),
4080            Err(Error::Unsupported(_))
4081        ));
4082    }
4083
4084    // Recursively check whether a file named `name` exists under `dir`.
4085    fn contains_file(dir: &Path, name: &str) -> bool {
4086        std::fs::read_dir(dir).is_ok_and(|rd| {
4087            rd.flatten().any(|e| {
4088                let p = e.path();
4089                if p.is_dir() {
4090                    contains_file(&p, name)
4091                } else {
4092                    p.file_name().is_some_and(|f| f == name)
4093                }
4094            })
4095        })
4096    }
4097
4098    #[test]
4099    fn disk_index_collection_searches_persists_and_writes_an_artifact() {
4100        let tmp = tempfile::tempdir().unwrap();
4101        {
4102            let mut db = open(tmp.path());
4103            db.create_collection("d", desc_with(IndexKind::DiskVamana))
4104                .unwrap();
4105            for i in 0..40u32 {
4106                db.upsert(
4107                    "d",
4108                    &format!("p{i}"),
4109                    &[i as f32, 0.0, 0.0, 0.0],
4110                    &json!({}),
4111                )
4112                .unwrap();
4113            }
4114            let res = db
4115                .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4116                .unwrap();
4117            assert_eq!(res[0].id, "p7");
4118            db.checkpoint().unwrap();
4119        }
4120        // The encrypted disk artifact was written under the collection's index dir.
4121        assert!(
4122            contains_file(tmp.path(), "vamana.qvx"),
4123            "disk index file missing"
4124        );
4125        // Reopening rebuilds and re-opens the disk index; search still works.
4126        let mut db = open(tmp.path());
4127        assert_eq!(
4128            db.descriptor("d").unwrap().index.kind,
4129            IndexKind::DiskVamana
4130        );
4131        let res = db
4132            .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4133            .unwrap();
4134        assert_eq!(res[0].id, "p7");
4135    }
4136
4137    #[test]
4138    fn graph_collections_maintain_writes_incrementally() {
4139        // FreshDiskANN incremental maintenance (ADR-0033): after the base graph is
4140        // built, inserts land in the delta, deletes tombstone, and updates move —
4141        // all without a rebuild or a reopen, for both the in-memory and disk graphs.
4142        for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
4143            let tmp = tempfile::tempdir().unwrap();
4144            let mut db = open(tmp.path());
4145            db.create_collection("c", desc_with(kind)).unwrap();
4146            for i in 0..40u32 {
4147                db.upsert(
4148                    "c",
4149                    &format!("p{i}"),
4150                    &[i as f32, 0.0, 0.0, 0.0],
4151                    &json!({}),
4152                )
4153                .unwrap();
4154            }
4155            // The first search builds the read-only base graph.
4156            let res = db
4157                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4158                .unwrap();
4159            assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
4160
4161            // A brand-new point lands in the in-memory delta and is immediately
4162            // findable — no rebuild, no reopen.
4163            db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
4164                .unwrap();
4165            let res = db
4166                .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
4167                .unwrap();
4168            assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
4169
4170            // A delete tombstones the point: it is never returned again.
4171            assert!(db.delete("c", "p7").unwrap());
4172            let res = db
4173                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4174                .unwrap();
4175            assert!(
4176                res.iter().all(|m| m.id != "p7"),
4177                "{kind:?} deleted id returned"
4178            );
4179
4180            // An update moves a vector: it is found at the new position and no
4181            // longer dominates the old one.
4182            db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4183                .unwrap();
4184            let res = db
4185                .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
4186                .unwrap();
4187            assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
4188            let res = db
4189                .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
4190                .unwrap();
4191            assert_ne!(
4192                res[0].id, "p20",
4193                "{kind:?} stale copy still nearest old spot"
4194            );
4195        }
4196    }
4197
4198    #[test]
4199    fn graph_consolidates_under_heavy_churn() {
4200        // Churn past the 20% pending threshold forces a consolidation (a derived
4201        // rebuild from the store); results stay correct throughout and across a
4202        // reopen, since the store is the source of truth (ADR-0033).
4203        let tmp = tempfile::tempdir().unwrap();
4204        let mut db = open(tmp.path());
4205        db.create_collection("c", desc_with(IndexKind::Vamana))
4206            .unwrap();
4207        for i in 0..50u32 {
4208            db.upsert(
4209                "c",
4210                &format!("p{i}"),
4211                &[i as f32, 0.0, 0.0, 0.0],
4212                &json!({}),
4213            )
4214            .unwrap();
4215        }
4216        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4217
4218        let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
4219        for i in 0..15u32 {
4220            assert!(db.delete("c", &format!("p{i}")).unwrap());
4221            db.upsert(
4222                "c",
4223                &format!("q{i}"),
4224                &[1000.0 + i as f32, 0.0, 0.0, 0.0],
4225                &json!({}),
4226            )
4227            .unwrap();
4228        }
4229
4230        let near_origin = db
4231            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4232            .unwrap();
4233        assert!(
4234            near_origin.iter().all(|m| !deleted.contains(&m.id)),
4235            "a churned-out id was returned"
4236        );
4237        let near_q = db
4238            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
4239            .unwrap();
4240        assert_eq!(near_q[0].id, "q7", "new point not found after churn");
4241
4242        db.checkpoint().unwrap();
4243        drop(db);
4244        let mut db = open(tmp.path());
4245        let near_q = db
4246            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
4247            .unwrap();
4248        assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
4249        let near_origin = db
4250            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4251            .unwrap();
4252        assert!(
4253            near_origin.iter().all(|m| !deleted.contains(&m.id)),
4254            "a churned-out id resurfaced after reopen"
4255        );
4256    }
4257
4258    #[test]
4259    fn multivector_writes_are_incremental_and_match_a_rebuild() {
4260        // Token rows fold into the ANN index incrementally instead of stale->rebuild
4261        // (ADR-0034); the document API stays correct under interleaved
4262        // upsert/delete/re-upsert with no reopen, and a reopen (the authoritative
4263        // rebuild) yields the identical ranking. Docs lie on an arc of increasing
4264        // angle to the query (cosine, so direction alone ranks them), making the
4265        // expected order unambiguous. (Token-pool candidate generation only engages
4266        // above the exact-scan threshold; the incremental token maintenance it
4267        // relies on is the same code the single-vector index tests exercise.)
4268        let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
4269        let doc = |theta: f32| vec![dir(theta), dir(theta)];
4270        for kind in [
4271            IndexKind::Ivf,
4272            IndexKind::Hnsw,
4273            IndexKind::Vamana,
4274            IndexKind::Colbert,
4275        ] {
4276            let tmp = tempfile::tempdir().unwrap();
4277            let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4278                .with_multivector(true)
4279                .with_index(IndexSpec {
4280                    kind,
4281                    pq_subspaces: None,
4282                });
4283            let mut db = open(tmp.path());
4284            db.create_collection("m", desc).unwrap();
4285            // d1 is most aligned with the query (smallest angle), d10 least.
4286            for i in 1..=10u32 {
4287                db.upsert_document(
4288                    "m",
4289                    &format!("d{i}"),
4290                    &doc(0.1 * i as f32),
4291                    &json!({ "i": i }),
4292                )
4293                .unwrap();
4294            }
4295            let q = vec![dir(0.0)];
4296            let top = |db: &mut Database| {
4297                db.search_multi_vector(
4298                    "m",
4299                    &q,
4300                    &SearchParams {
4301                        k: 3,
4302                        ..Default::default()
4303                    },
4304                )
4305                .unwrap()
4306                .into_iter()
4307                .map(|m| m.id)
4308                .collect::<Vec<_>>()
4309            };
4310            assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
4311
4312            // Delete the top document — gone without a reopen.
4313            assert!(db.delete_document("m", "d1").unwrap());
4314            assert_eq!(
4315                top(&mut db),
4316                vec!["d2", "d3", "d4"],
4317                "{kind:?} after delete"
4318            );
4319
4320            // Re-upsert the least-aligned doc onto the query — the update is live.
4321            db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
4322                .unwrap();
4323            assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
4324
4325            // A new, near-aligned document is immediately findable, second only to d10.
4326            db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
4327                .unwrap();
4328            let r = top(&mut db);
4329            assert_eq!(r[0], "d10", "{kind:?}");
4330            assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
4331
4332            // A shorter re-upsert drops the trailing token row.
4333            db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
4334                .unwrap();
4335            let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
4336            assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
4337
4338            // The incremental in-memory state matches an authoritative rebuild.
4339            let before = top(&mut db);
4340            drop(db);
4341            let mut db = open(tmp.path());
4342            assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
4343            assert!(
4344                db.get_document("m", "d1", false).unwrap().is_none(),
4345                "{kind:?} deleted doc resurfaced"
4346            );
4347        }
4348    }
4349
4350    #[test]
4351    fn colbert_index_requires_multivector() {
4352        let tmp = tempfile::tempdir().unwrap();
4353        let mut db = open(tmp.path());
4354        // ColBERT is a late-interaction token-pool index — invalid for a
4355        // single-vector collection (ADR-0034).
4356        let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
4357            kind: IndexKind::Colbert,
4358            pq_subspaces: None,
4359        });
4360        assert!(matches!(
4361            db.create_collection("c", single),
4362            Err(Error::Unsupported(_))
4363        ));
4364        // ...but valid for a multi-vector collection.
4365        let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4366            .with_multivector(true)
4367            .with_index(IndexSpec {
4368                kind: IndexKind::Colbert,
4369                pq_subspaces: None,
4370            });
4371        assert!(db.create_collection("m", multi).is_ok());
4372    }
4373
4374    // ---- hybrid (pre-filtered) search ----
4375
4376    // A collection whose `city` (keyword) and `n` (numeric) payload fields are
4377    // declared filterable, so the planner can pre-filter on them.
4378    fn desc_filterable() -> Descriptor {
4379        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4380            FilterableField::keyword("city"),
4381            FilterableField::numeric("n"),
4382        ])
4383    }
4384
4385    // 30 points on the x-axis (distance to the origin grows with i), cycling
4386    // through three cities, each carrying its index as a numeric `n`.
4387    // Checkpointed so the rows live in a sealed segment's secondary index — the
4388    // pre-filter primitive — not only the active buffer.
4389    fn seed_cities(db: &mut Database) {
4390        const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4391        db.create_collection("c", desc_filterable()).unwrap();
4392        for i in 0..30u32 {
4393            db.upsert(
4394                "c",
4395                &format!("p{i}"),
4396                &[i as f32, 0.0, 0.0, 0.0],
4397                &json!({"city": CITIES[i as usize % 3], "n": i}),
4398            )
4399            .unwrap();
4400        }
4401        db.checkpoint().unwrap();
4402    }
4403
4404    #[test]
4405    fn hybrid_equality_prefilter_is_exact() {
4406        let tmp = tempfile::tempdir().unwrap();
4407        let mut db = open(tmp.path());
4408        seed_cities(&mut db);
4409        let params = SearchParams {
4410            k: 5,
4411            filter: Some(Filter::Eq {
4412                field: "city".into(),
4413                value: json!("lyon"),
4414            }),
4415            ..SearchParams::default()
4416        };
4417        let res = db.search("c", &[0.0; 4], &params).unwrap();
4418        assert!(!res.is_empty());
4419        // lyon is i % 3 == 1 → p1, p4, p7, …; nearest the origin is p1.
4420        assert_eq!(res[0].id, "p1");
4421        for m in &res {
4422            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
4423        }
4424    }
4425
4426    #[test]
4427    fn hybrid_numeric_range_prefilter_is_exact() {
4428        let tmp = tempfile::tempdir().unwrap();
4429        let mut db = open(tmp.path());
4430        seed_cities(&mut db);
4431        let params = SearchParams {
4432            k: 4,
4433            filter: Some(Filter::Gte {
4434                field: "n".into(),
4435                value: json!(10),
4436            }),
4437            ..SearchParams::default()
4438        };
4439        let res = db.search("c", &[0.0; 4], &params).unwrap();
4440        // Among n >= 10, the nearest the origin is n == 10 (p10).
4441        assert_eq!(res[0].id, "p10");
4442        for m in &res {
4443            assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
4444        }
4445    }
4446
4447    #[test]
4448    fn hybrid_unsatisfiable_filter_returns_empty() {
4449        let tmp = tempfile::tempdir().unwrap();
4450        let mut db = open(tmp.path());
4451        seed_cities(&mut db);
4452        // No row holds this city, so the pre-filter proves the result empty
4453        // without touching the vector index.
4454        let params = SearchParams {
4455            filter: Some(Filter::Eq {
4456                field: "city".into(),
4457                value: json!("atlantis"),
4458            }),
4459            ..SearchParams::default()
4460        };
4461        assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
4462    }
4463
4464    #[test]
4465    fn hybrid_and_or_composition_is_exact() {
4466        let tmp = tempfile::tempdir().unwrap();
4467        let mut db = open(tmp.path());
4468        seed_cities(&mut db);
4469        // (city in {paris, rome}) AND (n < 12): a keyword `in` intersected with a
4470        // numeric range, both pre-filterable.
4471        let params = SearchParams {
4472            k: 10,
4473            filter: Some(Filter::And(vec![
4474                Filter::In {
4475                    field: "city".into(),
4476                    values: vec![json!("paris"), json!("rome")],
4477                },
4478                Filter::Lt {
4479                    field: "n".into(),
4480                    value: json!(12),
4481                },
4482            ])),
4483            ..SearchParams::default()
4484        };
4485        let res = db.search("c", &[0.0; 4], &params).unwrap();
4486        // paris is i % 3 == 0 → the nearest qualifying point is p0.
4487        assert_eq!(res[0].id, "p0");
4488        for m in &res {
4489            let payload = m.payload.as_ref().unwrap();
4490            let city = payload["city"].as_str().unwrap();
4491            assert!(city == "paris" || city == "rome");
4492            assert!(payload["n"].as_u64().unwrap() < 12);
4493        }
4494    }
4495
4496    #[test]
4497    fn hybrid_rechecks_non_indexable_clause() {
4498        let tmp = tempfile::tempdir().unwrap();
4499        let mut db = open(tmp.path());
4500        seed_cities(&mut db);
4501        // city is pre-filterable; the Not(…) clause is not, so it is re-checked
4502        // exactly on the narrowed candidates — paris rows excluding p0.
4503        let params = SearchParams {
4504            k: 10,
4505            filter: Some(Filter::And(vec![
4506                Filter::Eq {
4507                    field: "city".into(),
4508                    value: json!("paris"),
4509                },
4510                Filter::Not(Box::new(Filter::Eq {
4511                    field: "n".into(),
4512                    value: json!(0),
4513                })),
4514            ])),
4515            ..SearchParams::default()
4516        };
4517        let res = db.search("c", &[0.0; 4], &params).unwrap();
4518        assert!(res.iter().all(|m| m.id != "p0"));
4519        // The nearest paris point after p0 is p3.
4520        assert_eq!(res[0].id, "p3");
4521        for m in &res {
4522            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
4523        }
4524    }
4525
4526    #[test]
4527    fn post_filter_fallback_on_undeclared_field_is_correct() {
4528        let tmp = tempfile::tempdir().unwrap();
4529        let mut db = open(tmp.path());
4530        // Only `city` is filterable; a filter on the undeclared `tier` cannot
4531        // pre-filter and falls back to ANN post-filtering — still exact.
4532        db.create_collection(
4533            "c",
4534            Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4535                .with_filterable(vec![FilterableField::keyword("city")]),
4536        )
4537        .unwrap();
4538        for i in 0..20u32 {
4539            let tier = if i % 2 == 0 { "gold" } else { "silver" };
4540            db.upsert(
4541                "c",
4542                &format!("p{i}"),
4543                &[i as f32, 0.0, 0.0, 0.0],
4544                &json!({"city": "paris", "tier": tier}),
4545            )
4546            .unwrap();
4547        }
4548        let params = SearchParams {
4549            k: 5,
4550            filter: Some(Filter::Eq {
4551                field: "tier".into(),
4552                value: json!("gold"),
4553            }),
4554            ..SearchParams::default()
4555        };
4556        let res = db.search("c", &[0.0; 4], &params).unwrap();
4557        assert!(!res.is_empty());
4558        for m in &res {
4559            assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
4560        }
4561    }
4562
4563    #[test]
4564    fn leaf_predicate_maps_only_indexable_filterable_leaves() {
4565        let fields = vec![
4566            FilterableField::keyword("city"),
4567            FilterableField::numeric("n"),
4568        ];
4569        // Keyword equality on a filterable field maps.
4570        assert_eq!(
4571            leaf_predicate(
4572                &Filter::Eq {
4573                    field: "city".into(),
4574                    value: json!("paris")
4575                },
4576                &fields
4577            ),
4578            Some(SecPredicate::Eq {
4579                field: "city".into(),
4580                value: SecValue::Keyword("paris".into())
4581            })
4582        );
4583        // A numeric comparison maps to a one-sided range.
4584        assert_eq!(
4585            leaf_predicate(
4586                &Filter::Gte {
4587                    field: "n".into(),
4588                    value: json!(3)
4589                },
4590                &fields
4591            ),
4592            Some(SecPredicate::Range {
4593                field: "n".into(),
4594                lo: Some(SecValue::Numeric(3.0)),
4595                hi: None,
4596                lo_inclusive: true,
4597                hi_inclusive: false,
4598            })
4599        );
4600        // Undeclared field, type mismatch, `ne`, and `exists` do not map.
4601        let undeclared = Filter::Eq {
4602            field: "tier".into(),
4603            value: json!("gold"),
4604        };
4605        let mismatch = Filter::Eq {
4606            field: "city".into(),
4607            value: json!(5),
4608        };
4609        let ne = Filter::Ne {
4610            field: "city".into(),
4611            value: json!("x"),
4612        };
4613        let exists = Filter::Exists {
4614            field: "city".into(),
4615        };
4616        assert!(leaf_predicate(&undeclared, &fields).is_none());
4617        assert!(leaf_predicate(&mismatch, &fields).is_none());
4618        assert!(leaf_predicate(&ne, &fields).is_none());
4619        assert!(leaf_predicate(&exists, &fields).is_none());
4620    }
4621
4622    // ----- durable IVF index recovery (ADR-0025) -----
4623
4624    // The first collection created in a fresh store has id 0.
4625    fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
4626        root.join("collections").join("0000000000").join("index")
4627    }
4628
4629    fn idx_snapshot_files(root: &Path) -> Vec<String> {
4630        let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4631            .map(|rd| {
4632                rd.filter_map(std::result::Result::ok)
4633                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4634                    .filter(|n| n.starts_with("idx-"))
4635                    .collect()
4636            })
4637            .unwrap_or_default();
4638        v.sort();
4639        v
4640    }
4641
4642    fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4643        db.search("c", q, &SearchParams::default())
4644            .unwrap()
4645            .into_iter()
4646            .map(|m| m.id)
4647            .collect()
4648    }
4649
4650    fn seed_ivf(db: &mut Database, n: u32) {
4651        db.create_collection("c", desc_with(IndexKind::Ivf))
4652            .unwrap();
4653        for i in 0..n {
4654            db.upsert(
4655                "c",
4656                &format!("p{i}"),
4657                &[i as f32, 0.0, 0.0, 0.0],
4658                &json!({}),
4659            )
4660            .unwrap();
4661        }
4662        // The first search builds (and trains) the IVF from the store.
4663        let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4664    }
4665
4666    #[test]
4667    fn ivf_snapshot_is_written_at_checkpoint() {
4668        let tmp = tempfile::tempdir().unwrap();
4669        let mut db = open(tmp.path());
4670        seed_ivf(&mut db, 40);
4671        db.checkpoint().unwrap();
4672        assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
4673    }
4674
4675    #[test]
4676    fn ivf_loads_from_snapshot_rather_than_rebuilding() {
4677        let tmp = tempfile::tempdir().unwrap();
4678        {
4679            let mut db = open(tmp.path());
4680            db.create_collection("c", desc_with(IndexKind::Ivf))
4681                .unwrap();
4682            db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
4683                .unwrap();
4684            db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
4685                .unwrap();
4686            // First search builds the IVF — int_to_ext is the sorted scan order.
4687            let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
4688            // Incremental upserts append in insertion order, diverging from sort.
4689            db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
4690                .unwrap();
4691            db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4692                .unwrap();
4693            db.checkpoint().unwrap();
4694            assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
4695        }
4696        let db = open(tmp.path());
4697        // Loaded from the snapshot: the insertion-order mapping is preserved. A
4698        // rebuild would have produced the sorted order ["a", "b", "m", "z"].
4699        assert_eq!(
4700            db.collections["c"].int_to_ext,
4701            ["a", "m", "z", "b"],
4702            "index was rebuilt, not loaded from the snapshot"
4703        );
4704    }
4705
4706    #[test]
4707    fn ivf_recovery_replays_post_checkpoint_upserts() {
4708        let tmp = tempfile::tempdir().unwrap();
4709        {
4710            let mut db = open(tmp.path());
4711            seed_ivf(&mut db, 30);
4712            db.checkpoint().unwrap();
4713            // Post-checkpoint upsert, no further checkpoint.
4714            db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4715                .unwrap();
4716        }
4717        let mut db = open(tmp.path());
4718        assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
4719        assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
4720    }
4721
4722    #[test]
4723    fn ivf_recovery_replays_post_checkpoint_deletes() {
4724        let tmp = tempfile::tempdir().unwrap();
4725        {
4726            let mut db = open(tmp.path());
4727            seed_ivf(&mut db, 30);
4728            db.checkpoint().unwrap();
4729            assert!(db.delete("c", "p7").unwrap());
4730        }
4731        let mut db = open(tmp.path());
4732        assert!(
4733            nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
4734                .iter()
4735                .all(|id| id != "p7")
4736        );
4737        assert!(db.get("c", "p7").unwrap().is_none());
4738        assert!(db.get("c", "p6").unwrap().is_some());
4739    }
4740
4741    #[test]
4742    fn ivf_recovery_replays_post_checkpoint_updates() {
4743        let tmp = tempfile::tempdir().unwrap();
4744        {
4745            let mut db = open(tmp.path());
4746            seed_ivf(&mut db, 30);
4747            db.checkpoint().unwrap();
4748            // Move p0 far away — an in-place update shadowing its sealed row.
4749            db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4750                .unwrap();
4751        }
4752        let mut db = open(tmp.path());
4753        assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4754        assert_ne!(
4755            nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4756            "p0",
4757            "the stale p0 vector survived the update"
4758        );
4759    }
4760
4761    #[test]
4762    fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4763        let tmp = tempfile::tempdir().unwrap();
4764        {
4765            let mut db = open(tmp.path());
4766            seed_ivf(&mut db, 30);
4767            db.checkpoint().unwrap();
4768        }
4769        // Corrupt the snapshot file; recovery must fall back to a rebuild.
4770        let files = idx_snapshot_files(tmp.path());
4771        assert_eq!(files.len(), 1);
4772        std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4773
4774        let mut db = open(tmp.path());
4775        assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4776    }
4777
4778    // ---- Multi-vector / late interaction (ColBERT, ADR-0028) ----
4779
4780    fn mv_desc() -> Descriptor {
4781        Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4782    }
4783
4784    // Brute-force MaxSim ranking over a corpus: the reference the pipeline must
4785    // reproduce (score desc, ties by id), using the same shared scorer.
4786    fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4787        let mut v: Vec<(String, f32)> = corpus
4788            .iter()
4789            .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4790            .collect();
4791        v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4792        v
4793    }
4794
4795    #[test]
4796    fn multivector_search_ranks_documents_by_maxsim() {
4797        let tmp = tempfile::tempdir().unwrap();
4798        let mut db = open(tmp.path());
4799        db.create_collection("docs", mv_desc()).unwrap();
4800        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4801            ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4802            ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4803            (
4804                "d_mix",
4805                vec![
4806                    vec![1.0, 1.0, 0.0],
4807                    vec![0.0, 0.0, 1.0],
4808                    vec![1.0, 0.0, 1.0],
4809                ],
4810            ),
4811        ];
4812        for (id, toks) in &corpus {
4813            db.upsert_document("docs", id, toks, &json!({ "id": id }))
4814                .unwrap();
4815        }
4816        assert_eq!(db.document_count("docs").unwrap(), 3);
4817
4818        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4819        let params = SearchParams {
4820            k: 3,
4821            with_payload: false,
4822            ..SearchParams::default()
4823        };
4824        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4825        let expected = bf_rank(&query, &corpus);
4826
4827        assert_eq!(got.len(), 3);
4828        for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4829            assert_eq!(&g.id, eid, "ranking matches brute force");
4830            assert!(
4831                (g.score - escore).abs() < 1e-5,
4832                "{} score {} vs {escore}",
4833                g.id,
4834                g.score
4835            );
4836        }
4837    }
4838
4839    #[test]
4840    fn multivector_search_truncates_to_k() {
4841        let tmp = tempfile::tempdir().unwrap();
4842        let mut db = open(tmp.path());
4843        db.create_collection("docs", mv_desc()).unwrap();
4844        for i in 0..5 {
4845            let v = vec![vec![1.0, i as f32, 0.0]];
4846            db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4847                .unwrap();
4848        }
4849        let params = SearchParams {
4850            k: 2,
4851            ..SearchParams::default()
4852        };
4853        let got = db
4854            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4855            .unwrap();
4856        assert_eq!(got.len(), 2);
4857    }
4858
4859    #[test]
4860    fn multivector_filter_selects_documents_exactly() {
4861        let tmp = tempfile::tempdir().unwrap();
4862        let mut db = open(tmp.path());
4863        db.create_collection("docs", mv_desc()).unwrap();
4864        // Identical token sets, so only the filter distinguishes the documents.
4865        db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4866            .unwrap();
4867        db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4868            .unwrap();
4869        let params = SearchParams {
4870            k: 10,
4871            filter: Some(Filter::Eq {
4872                field: "lang".into(),
4873                value: json!("fr"),
4874            }),
4875            ..SearchParams::default()
4876        };
4877        let got = db
4878            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4879            .unwrap();
4880        assert_eq!(got.len(), 1);
4881        assert_eq!(got[0].id, "b");
4882        assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
4883    }
4884
4885    #[test]
4886    fn multivector_reopen_rebuilds_grouping_and_ranking() {
4887        let tmp = tempfile::tempdir().unwrap();
4888        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4889        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4890            ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4891            ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
4892        ];
4893        {
4894            let mut db = open(tmp.path());
4895            db.create_collection("docs", mv_desc()).unwrap();
4896            for (id, toks) in &corpus {
4897                db.upsert_document("docs", id, toks, &json!({})).unwrap();
4898            }
4899            db.checkpoint().unwrap();
4900        }
4901        // Reopen: the document grouping is rebuilt from the live rows.
4902        let mut db = open(tmp.path());
4903        assert_eq!(db.document_count("docs").unwrap(), 2);
4904        let params = SearchParams {
4905            k: 2,
4906            ..SearchParams::default()
4907        };
4908        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4909        let expected = bf_rank(&query, &corpus);
4910        assert_eq!(
4911            got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
4912            expected
4913                .iter()
4914                .map(|(id, _)| id.clone())
4915                .collect::<Vec<_>>()
4916        );
4917    }
4918
4919    #[test]
4920    fn multivector_delete_document_removes_all_tokens() {
4921        let tmp = tempfile::tempdir().unwrap();
4922        let mut db = open(tmp.path());
4923        db.create_collection("docs", mv_desc()).unwrap();
4924        db.upsert_document(
4925            "docs",
4926            "a",
4927            &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
4928            &json!({}),
4929        )
4930        .unwrap();
4931        db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
4932            .unwrap();
4933        assert_eq!(db.document_count("docs").unwrap(), 2);
4934        assert_eq!(db.len("docs").unwrap(), 3);
4935
4936        assert!(db.delete_document("docs", "a").unwrap());
4937        assert_eq!(db.document_count("docs").unwrap(), 1);
4938        assert_eq!(db.len("docs").unwrap(), 1);
4939        assert!(db.get_document("docs", "a", false).unwrap().is_none());
4940        let params = SearchParams {
4941            k: 10,
4942            ..SearchParams::default()
4943        };
4944        let got = db
4945            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4946            .unwrap();
4947        assert!(got.iter().all(|m| m.id != "a"));
4948        assert!(!db.delete_document("docs", "a").unwrap());
4949    }
4950
4951    #[test]
4952    fn multivector_reupsert_replaces_tokens() {
4953        let tmp = tempfile::tempdir().unwrap();
4954        let mut db = open(tmp.path());
4955        db.create_collection("docs", mv_desc()).unwrap();
4956        db.upsert_document(
4957            "docs",
4958            "a",
4959            &[
4960                vec![1.0, 0.0, 0.0],
4961                vec![0.0, 1.0, 0.0],
4962                vec![0.0, 0.0, 1.0],
4963            ],
4964            &json!({"v":1}),
4965        )
4966        .unwrap();
4967        assert_eq!(db.len("docs").unwrap(), 3);
4968        // Re-upsert with a single token: the trailing two must be gone.
4969        db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
4970            .unwrap();
4971        assert_eq!(db.document_count("docs").unwrap(), 1);
4972        assert_eq!(db.len("docs").unwrap(), 1);
4973        let doc = db.get_document("docs", "a", true).unwrap().unwrap();
4974        assert_eq!(doc.payload, Some(json!({"v":2})));
4975        assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
4976    }
4977
4978    #[test]
4979    fn single_and_multi_vector_apis_are_mutually_exclusive() {
4980        let tmp = tempfile::tempdir().unwrap();
4981        let mut db = open(tmp.path());
4982        db.create_collection("mv", mv_desc()).unwrap();
4983        db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
4984            .unwrap();
4985        // Single-vector ops on a multi-vector collection are rejected.
4986        assert!(matches!(
4987            db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
4988            Err(Error::Unsupported(_))
4989        ));
4990        assert!(matches!(
4991            db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
4992            Err(Error::Unsupported(_))
4993        ));
4994        // Document ops on a single-vector collection are rejected.
4995        assert!(matches!(
4996            db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
4997            Err(Error::Unsupported(_))
4998        ));
4999        assert!(matches!(
5000            db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
5001            Err(Error::Unsupported(_))
5002        ));
5003        assert!(matches!(
5004            db.document_count("sv"),
5005            Err(Error::Unsupported(_))
5006        ));
5007    }
5008
5009    #[test]
5010    fn multivector_rejects_l2_metric_and_bad_documents() {
5011        let tmp = tempfile::tempdir().unwrap();
5012        let mut db = open(tmp.path());
5013        let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
5014        assert!(matches!(
5015            db.create_collection("bad", l2),
5016            Err(Error::Unsupported(_))
5017        ));
5018
5019        db.create_collection("docs", mv_desc()).unwrap();
5020        // A document id may not contain the reserved separator.
5021        assert!(matches!(
5022            db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
5023            Err(Error::Unsupported(_))
5024        ));
5025        // A document needs at least one vector, of the right dimensionality.
5026        assert!(matches!(
5027            db.upsert_document("docs", "a", &[], &json!({})),
5028            Err(Error::Unsupported(_))
5029        ));
5030        assert!(matches!(
5031            db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
5032            Err(Error::Unsupported(_))
5033        ));
5034    }
5035
5036    #[test]
5037    fn snapshot_then_open_reproduces_the_database() {
5038        let src = tempfile::tempdir().unwrap();
5039        let mut db = open(src.path());
5040        db.create_collection("kb", desc()).unwrap();
5041        db.create_collection("kb2", desc()).unwrap();
5042        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5043            .unwrap();
5044        db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
5045            .unwrap();
5046        db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
5047            .unwrap();
5048
5049        let dest = tempfile::tempdir().unwrap();
5050        let snap_dir = dest.path().join("snap");
5051        let info = db.snapshot(&snap_dir).unwrap();
5052        assert!(info.files > 0 && info.bytes > 0);
5053        assert_eq!(info.manifest_version, db.manifest_version());
5054
5055        // A write after the snapshot must not appear in the snapshot.
5056        db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
5057            .unwrap();
5058
5059        let restored = open(&snap_dir);
5060        let mut names = restored.collection_names();
5061        names.sort();
5062        assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
5063        assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
5064        assert_eq!(
5065            restored.get("kb", "a").unwrap().unwrap().payload,
5066            Some(json!({ "n": 1 }))
5067        );
5068        assert_eq!(restored.len("kb2").unwrap(), 1);
5069        assert!(restored.get("kb", "late").unwrap().is_none());
5070    }
5071
5072    #[test]
5073    fn snapshot_refuses_an_existing_destination() {
5074        let src = tempfile::tempdir().unwrap();
5075        let mut db = open(src.path());
5076        let dest = tempfile::tempdir().unwrap(); // already exists
5077        assert!(matches!(
5078            db.snapshot(dest.path()),
5079            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5080        ));
5081    }
5082
5083    #[test]
5084    fn restore_snapshot_roundtrips_and_guards() {
5085        let src = tempfile::tempdir().unwrap();
5086        let mut db = open(src.path());
5087        db.create_collection("kb", desc()).unwrap();
5088        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
5089            .unwrap();
5090        let work = tempfile::tempdir().unwrap();
5091        let snap_dir = work.path().join("snap");
5092        db.snapshot(&snap_dir).unwrap();
5093
5094        // Restore into a fresh directory, then open it.
5095        let restored_dir = work.path().join("restored");
5096        let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
5097        assert!(info.files > 0);
5098        let restored = open(&restored_dir);
5099        assert_eq!(restored.len("kb").unwrap(), 1);
5100
5101        // Restoring over an existing directory is refused.
5102        assert!(matches!(
5103            restore_snapshot(&snap_dir, &restored_dir),
5104            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
5105        ));
5106        // A directory that is not a snapshot (no CURRENT) is rejected.
5107        let not_snap = work.path().join("not-a-snapshot");
5108        std::fs::create_dir_all(&not_snap).unwrap();
5109        assert!(matches!(
5110            restore_snapshot(&not_snap, &work.path().join("out")),
5111            Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
5112        ));
5113    }
5114}