Skip to main content

quiver_embed/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The embeddable, in-process Quiver database handle.
3//!
4//! [`Database`] composes the storage engine ([`quiver_core::Store`]) with a
5//! per-collection vector index and payload filtering ([`quiver_query::Filter`])
6//! into one handle. It exposes the same logical operations the server speaks
7//! (`docs/api/wire-protocol.md`), so library mode and server mode exercise
8//! identical engine semantics — the server is a thin transport/policy shell.
9//!
10//! ## Index lifecycle
11//! The store is the source of truth. Each collection chooses its index via the
12//! descriptor's [`IndexSpec`] (default in-memory HNSW); the index is built from
13//! the store on open. HNSW applies new-id inserts incrementally; once an IVF
14//! index is built it applies inserts, in-place updates, and deletes
15//! incrementally with LIRE rebalancing (ADR-0023). The Vamana / disk graph
16//! family is maintained the FreshDiskANN way (ADR-0033): the batch-built graph
17//! is a read-only base, recent inserts land in an in-memory delta graph, and
18//! deletes are tombstoned, so writes are size-independent; when the pending work
19//! grows past a fixed fraction of the base the next access consolidates by
20//! rebuilding from the store. All indexes stay derived (rebuilt from the store
21//! on open), so the crash gate never sees an index write.
22//!
23//! ## Filtered (hybrid) search
24//! A search may carry a [`quiver_query::Filter`] over the payload. The planner
25//! decomposes it into the predicates the collection's secondary indexes can
26//! answer; when those narrow the query to a small candidate set it scans that
27//! set exactly (perfect recall, no filtered-ANN cliff), and otherwise it
28//! over-fetches from the ANN index and post-filters. Both arms re-check the full
29//! filter, so results are exact regardless of which path runs.
30//!
31//! ## Concurrency (ADR-0057 / ADR-0062)
32//! Single-writer. Writes take `&mut self`. Reads come in two flavors: the
33//! `&mut self` convenience methods (`search`, `hybrid_search`,
34//! `search_multi_vector`) rebuild a stale index in place and so give embedded,
35//! single-threaded callers read-your-writes; the `&self` `*_snapshot` methods
36//! read the current immutable snapshot and run **concurrently**, serving the
37//! *prior* snapshot when a write deferred a rebuild (snapshot-isolated, slightly
38//! stale). A server therefore serves concurrent reads behind a reader–writer lock,
39//! and rebuilds **off** the exclusive lock (ADR-0062): it captures the rebuild
40//! inputs under the shared lock ([`Database::snapshot_rebuild_inputs`]), builds the
41//! new index with no lock held ([`RebuildInputs::build`]), and installs it under a
42//! brief write lock ([`Database::commit_rebuild`]) — so a rebuild never stalls
43//! concurrent readers.
44
45use std::collections::{BTreeMap, BTreeSet, HashMap};
46use std::path::Path;
47
48use quiver_core::{SecPredicate, SecValue, Store};
49use quiver_index::{
50    ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
51    Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
52    ordering_distance, report_metric,
53};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use thiserror::Error;
57
58pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
59pub use quiver_core::page::PageCodec;
60pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
61pub use quiver_core::{
62    Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
63    VectorEncryption,
64};
65pub use quiver_query::Filter;
66pub use quiver_query::{
67    BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
68    query_term_ids, rrf_fuse, text_to_sparse,
69};
70
/// Errors returned by the embeddable database.
///
/// Every variant that wraps a lower-layer error carries `#[from]`, so `?`
/// converts those errors automatically. The enum is `#[non_exhaustive]`, so
/// code in other crates must include a wildcard arm when matching on it.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// An error from the storage engine (includes not-found / already-exists /
    /// invalid-argument from the catalog and write path).
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// An error from the vector index.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// An error from the disk-resident index (build, open, or query).
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be (de)serialized as JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// The named collection is not loaded in this database. Carries the
    /// collection name that was looked up.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The requested index / metric combination is not supported. Carries a
    /// static description of what was rejected.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A durable index snapshot could not be restored (ADR-0025); the caller
    /// falls back to rebuilding from the store, so this does not surface to users.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// An index snapshot envelope could not be (de)serialized.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
102
/// Result alias for database operations: `std::result::Result` with this
/// crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
105
/// What a [`Database::snapshot`] captured (ADR-0050): the catalog generation and
/// the number of files / bytes copied.
///
/// Plain `Copy` data made of three counters; it derives `Serialize` /
/// `Deserialize`, so it can be handed to any serde format as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// The manifest version the snapshot reflects (its consistent LSN anchor).
    pub manifest_version: u64,
    /// Number of files copied into the snapshot directory.
    pub files: u64,
    /// Total bytes copied.
    pub bytes: u64,
}
117
/// A single search or fetch result.
///
/// A direct fetch via [`Database::get`] fills in both `payload` and `vector`
/// and reports a `score` of `0.0`.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External id of the point.
    pub id: String,
    /// Distance / similarity under the collection metric (0 for a direct fetch).
    pub score: f32,
    /// The payload, if requested.
    pub payload: Option<Value>,
    /// The vector, if requested.
    pub vector: Option<Vec<f32>>,
}
130
/// A multi-vector (late-interaction / ColBERT) document result: a document id, its
/// MaxSim relevance, the payload, and — if requested — the document's token
/// vectors (ADR-0028).
///
/// The multi-vector counterpart of [`Match`]: `vectors` holds one inner `Vec`
/// per token instead of a single vector.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// Document id.
    pub id: String,
    /// MaxSim relevance score (0 for a direct fetch).
    pub score: f32,
    /// The document payload, if requested / present (stored on the anchor token).
    pub payload: Option<Value>,
    /// The document's token vectors, if requested.
    pub vectors: Option<Vec<Vec<f32>>>,
}
145
// A candidate document during multi-vector re-ranking, before it becomes a
// [`DocumentMatch`]. Tuple fields, in order:
//   .0  MaxSim score
//   .1  document id
//   .2  anchor payload
//   .3  token vectors (only when requested)
// Named so the re-rank buffer stays under clippy's type-complexity threshold.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
151
/// Parameters for a [`Database::search`].
///
/// `SearchParams::default()` gives `k = 10`, no filter, `ef_search = 64`,
/// payloads included and vectors omitted; use struct-update syntax to override
/// individual fields.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Number of results to return (default 10).
    pub k: usize,
    /// Optional payload predicate (default `None`). The planner pre-filters
    /// through the collection's secondary indexes when the filter is selective
    /// enough, and otherwise post-filters the ANN candidates; either way the
    /// full predicate is re-checked, so results are exact.
    pub filter: Option<Filter>,
    /// Search beam width (recall/latency knob), clamped up to at least `k`
    /// (default 64).
    pub ef_search: usize,
    /// Include payloads in the results (default `true`).
    pub with_payload: bool,
    /// Include vectors in the results (default `false`).
    pub with_vector: bool,
}
169
impl Default for SearchParams {
    /// An unfiltered top-10 search with a beam width of 64 that returns
    /// payloads but not vectors.
    fn default() -> Self {
        Self {
            k: 10,
            filter: None,
            ef_search: 64,
            with_payload: true,
            with_vector: false,
        }
    }
}
181
// How many extra candidates to pull before post-filtering, so a filtered query
// still has enough survivors to fill `k`.
const FILTER_OVERFETCH: usize = 8;

// Hybrid search (ADR-0043) pulls `k * RRF_CANDIDATE_FACTOR` (at least
// `MIN_RRF_CANDIDATES`) candidates from each side before Reciprocal Rank Fusion,
// so a document ranked outside the top `k` on one side can still surface via the
// other.
const RRF_CANDIDATE_FACTOR: usize = 10;
const MIN_RRF_CANDIDATES: usize = 100;

// Selectivity threshold for the hybrid planner. When a payload filter decomposes
// into secondary-index predicates that narrow a query to at most this many live
// candidate rows, those rows are scanned exactly (brute force) instead of going
// through the ANN index. At or below this size an exact scan is both cheaper and
// higher-recall than filtered ANN, which can return too few results when the
// filter is very selective (the "filtered-search recall cliff"). Qdrant calls
// the equivalent knob its full-scan threshold.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// Soft-deleted fraction (0.2 = 20 %) at which a built HNSW is rebuilt from the
// store to reclaim its tombstoned graph nodes (ADR-0026). Below it, a delete is
// an O(1) soft-delete; at it, the next access rebuilds.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// Pending graph work — delta inserts plus tombstones — at which a FreshDiskANN
/// graph collection consolidates: the next access rebuilds the base from the
/// store, reclaiming tombstones and folding in the delta (ADR-0033). Below it,
/// inserts go to the in-memory delta and deletes are `O(1)` tombstones. Mirrors
/// [`HNSW_REBUILD_DELETED_FRACTION`] (same 0.2 value, expressed as a fraction of
/// the base).
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
213
// The vector index backing one collection. HNSW and (once built) IVF are
// maintained incrementally; the Vamana and disk graphs are batch-built from the
// store (the `Option` is `None` until first build) and then maintained
// incrementally the FreshDiskANN way — a read-only base graph plus an in-memory
// delta and deletion set, consolidated by a rebuild past a churn threshold
// (ADR-0033).
//
// Searching a variant whose `Option` is still `None` (or the `None` variant
// itself) yields an empty result set rather than an error.
enum CollectionIndex {
    // A fetch-only collection with no server-side ANN index: a client-side-encrypted
    // collection (ADR-0032) stores opaque ciphertext the server never ranks, so the
    // client fetches points and ranks locally.
    None,
    // The HNSW graph. Not wrapped in an `Option`: the index value exists from
    // the moment the handle is created.
    Hnsw(Hnsw),
    // The Vamana graph in its FreshDiskANN wrapper; `None` until the first
    // batch build from the store.
    Vamana(Option<FreshVamana>),
    // The IVF index; `None` until it has been built.
    Ivf(Option<Ivf>),
    // The disk-resident DiskANN index: PQ codes in RAM, graph + full vectors on
    // (encrypted) SSD, exact re-rank (ADR-0019), with a FreshDiskANN in-memory
    // delta layered on top so the on-disk artifact stays immutable.
    Disk(Option<FreshDiskVamana>),
    // The ColBERTv2/PLAID compressed token-pool index for a multi-vector
    // collection (ADR-0034): centroid + residual-PQ codes with centroid-pruned
    // candidate generation. Presumably `None` until built, like the other
    // `Option` variants — confirm against the rebuild path.
    Colbert(Option<ColbertIndex>),
}
237
238impl CollectionIndex {
239    // Search, mapping the generic `ef` knob onto each index's search width.
240    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
241        Ok(match self {
242            CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
243            CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
244            CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
245            CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
246            CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
247            CollectionIndex::None
248            | CollectionIndex::Vamana(None)
249            | CollectionIndex::Ivf(None)
250            | CollectionIndex::Disk(None)
251            | CollectionIndex::Colbert(None) => Vec::new(),
252        })
253    }
254}
255
/// On-disk envelope (ADR-0025) for a durable IVF snapshot: the `Ivf` bytes plus
/// the internal->external id mapping they are addressed by, postcard-encoded and
/// handed to the store as one opaque blob. On open the envelope is decoded, the
/// `Ivf` restored, and the post-checkpoint WAL tail replayed. A decode/version
/// error means "rebuild from the store" — the snapshot is only ever a fast path.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    // Envelope format version; see `INDEX_ENVELOPE_VERSION`.
    version: u16,
    // The internal->external id mapping the `Ivf` bytes are addressed by.
    int_to_ext: Vec<String>,
    // The serialized `Ivf` snapshot, opaque to this envelope.
    ivf: Vec<u8>,
}

// Envelope format version, independent of the product SemVer (and of the inner
// `Ivf` snapshot version); a mismatch falls back to a rebuild.
const INDEX_ENVELOPE_VERSION: u16 = 1;
271
/// On-disk envelope (ADR-0063) for a durable DiskVamana snapshot. Unlike the IVF
/// envelope, the bulk (graph + full vectors) stays in the immutable `mmap`-ed
/// base file (`vamana.qvx`); this blob carries only what ties that base to the
/// live state — the base point count (validated against the opened file), the
/// FreshDiskANN tombstones, and the id map. The delta vectors are *not* stored:
/// the delta ids are implied as `[base_row_count, int_to_ext.len())` and their
/// vectors re-fetched from the store on open, so the blob stays O(delta ids), not
/// O(N) vectors. A decode/version/validation error means "rebuild from the store".
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    // Envelope format version; a mismatch means "rebuild from the store".
    version: u16,
    // The id map. Entries from `base_row_count` onward are the delta ids.
    int_to_ext: Vec<String>,
    // Point count of the base file, validated against the opened file.
    base_row_count: u64,
    // The FreshDiskANN tombstones.
    deleted_ids: Vec<u64>,
}
287
// In-memory state for one loaded collection: its catalog identity, its derived
// vector index, and the bookkeeping that keeps the index in step with the store.
struct CollectionHandle {
    // The collection's id in the store; every store call for this collection
    // is keyed by it.
    id: CollectionId,
    // The collection's descriptor (a clone of the store's copy when the handle
    // is built on open).
    descriptor: Descriptor,
    // The vector index backing this collection.
    index: CollectionIndex,
    // Internal -> external id mapping (presumably indexed by internal id —
    // confirm against the index-maintenance helpers).
    int_to_ext: Vec<String>,
    // External id -> internal id, the reverse of `int_to_ext`.
    ext_to_int: HashMap<String, u64>,
    // Set by `mark_stale` when a write could not be absorbed in place and a
    // full rebuild has been deferred. A handle built on open starts stale.
    stale: bool,
    // Monotonic per-collection write counter, advanced by `bump_write_gen`
    // (which `mark_stale` also calls). An off-lock rebuild (ADR-0062) captures
    // it before building and re-checks it at commit: if it advanced, a write
    // landed during the build, so the freshly built index is already behind —
    // the commit installs it (still newer than the prior snapshot) but leaves
    // the handle stale so the next rebuild catches up. Wrapping is fine:
    // equality, not ordering, is what the commit checks.
    write_gen: u64,
    // For a multi-vector (ColBERT) collection: each document id mapped to its
    // token count, so a re-rank can gather all of a document's token rows
    // (`<doc-id><US><ordinal>`) and `document_count` is O(1). `None` for a
    // single-vector collection. Maintained eagerly on document writes and rebuilt
    // authoritatively from the store on open / rebuild; never persisted (ADR-0028).
    docs: Option<BTreeMap<String, u32>>,
    // Derived inverted index over the collection's `__quiver_sparse__` payloads,
    // for the sparse half of hybrid search (ADR-0045). `Some` for a single-vector,
    // server-searchable collection; `None` for multi-vector and client-side-encrypted
    // collections (which never run hybrid search) — and as a backstop, when `None`
    // the sparse ranking falls back to a full store scan. Built on rebuild from the
    // store and maintained incrementally on upsert/delete; never persisted.
    sparse: Option<SparseInvertedIndex>,
}
317
318// Whether a collection should carry a derived sparse inverted index: only
319// single-vector, server-searchable collections run hybrid search (ADR-0045).
320fn uses_sparse_index(descriptor: &Descriptor) -> bool {
321    !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
322}
323
324// Mark a collection's index stale: a write the index could not absorb in place
325// defers a full rebuild. Also bumps the write generation (every staleness-marking
326// write is a write).
327fn mark_stale(handle: &mut CollectionHandle) {
328    handle.stale = true;
329    bump_write_gen(handle);
330}
331
332// Bump a collection's monotonic write generation. Called on **every** write that
333// touches the store — including writes that land while the index is already stale,
334// which the incremental maintenance skips. The off-lock rebuild (ADR-0062) captures
335// this before scanning and re-checks it at commit: if it advanced, a write landed
336// during the build, so the freshly built index is already behind and the commit
337// leaves the collection stale for the next rebuild. Wrapping is fine — the commit
338// checks equality, not ordering — and over-counting is harmless (at worst one extra
339// rebuild); only *missing* a bump could lose a write.
340fn bump_write_gen(handle: &mut CollectionHandle) {
341    handle.write_gen = handle.write_gen.wrapping_add(1);
342}
343
/// An in-process Quiver database over one data directory.
///
/// Pairs the storage engine with one in-memory handle per loaded collection.
pub struct Database {
    // The storage engine: the durable source of truth every index is derived from.
    store: Store,
    // In-memory handles (index + bookkeeping), keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
}
349
350impl Database {
351    /// Open (creating if absent) the database at `dir` with encryption-at-rest
352    /// disabled, rebuilding each collection's index from the store.
353    pub fn open(dir: &Path) -> Result<Self> {
354        Self::from_store(Store::open(dir)?)
355    }
356
357    /// Open the database with a specific page codec — used to enable
358    /// encryption-at-rest by passing `quiver-crypto`'s AEAD codec. Mirrors
359    /// [`quiver_core::Store::open_with_codec`]; the codec seals both paged files
360    /// and the WAL, so no plaintext user data reaches the disk.
361    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
362        Self::from_store(Store::open_with_codec(dir, codec)?)
363    }
364
365    /// Open the database with a [`KeyRing`], the seam that lets `quiver-crypto`'s
366    /// envelope key-ring seal each collection under its own data-encryption key
367    /// (enabling crypto-shredding). Mirrors
368    /// [`quiver_core::Store::open_with_keyring`].
369    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
370        Self::from_store(Store::open_with_keyring(dir, keyring)?)
371    }
372
373    // Build the in-memory handles (and their HNSW indexes) over an opened store.
374    fn from_store(store: Store) -> Result<Self> {
375        let mut collections = HashMap::new();
376        for name in store.collection_names() {
377            let Some(id) = store.collection_id(&name) else {
378                continue;
379            };
380            let Some(descriptor) = store.descriptor(id).cloned() else {
381                continue;
382            };
383            let mut handle = CollectionHandle {
384                id,
385                index: empty_index(&descriptor),
386                descriptor,
387                int_to_ext: Vec::new(),
388                ext_to_int: HashMap::new(),
389                stale: true,
390                write_gen: 0,
391                docs: None,
392                // Populated by `load_index` / `rebuild_index` from the store.
393                sparse: None,
394            };
395            load_index(&store, &mut handle)?;
396            collections.insert(name, handle);
397        }
398        Ok(Self { store, collections })
399    }
400
401    /// Create a collection. Errors if the name already exists, or if the index
402    /// specification is unsupported for the metric.
403    pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
404        validate_index(&descriptor)?;
405        let id = self.store.create_collection(name, descriptor.clone())?;
406        let index = empty_index(&descriptor);
407        let docs = descriptor.multivector.then(BTreeMap::new);
408        // A fresh single-vector collection starts with an empty inverted index
409        // maintained incrementally from the first upsert (an empty index allocates
410        // nothing until a sparse vector arrives).
411        let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
412        self.collections.insert(
413            name.to_owned(),
414            CollectionHandle {
415                id,
416                descriptor,
417                index,
418                int_to_ext: Vec::new(),
419                ext_to_int: HashMap::new(),
420                stale: false,
421                write_gen: 0,
422                docs,
423                sparse,
424            },
425        );
426        Ok(())
427    }
428
429    /// Drop a collection and its data. Returns whether it existed.
430    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
431        let existed = self.store.drop_collection(name)?;
432        self.collections.remove(name);
433        Ok(existed)
434    }
435
436    /// Crypto-shred a collection: drop it and destroy its data-encryption key, so
437    /// its sealed data is unrecoverable even with the master key, then reclaim
438    /// its files. Mirrors [`quiver_core::Store::shred_collection`]; with an
439    /// envelope key-ring this is irreversible erasure, with a single-codec
440    /// key-ring it is `drop` plus a checkpoint. Returns whether it existed.
441    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
442        let existed = self.store.shred_collection(name)?;
443        self.collections.remove(name);
444        Ok(existed)
445    }
446
    /// Install a replication commit observer, invoked with each committed
    /// [`WalEntry`] in commit order (ADR-0030). The server uses this to drive a
    /// leader's replication stream.
    ///
    /// The observer is handed straight to the underlying store; this handle
    /// keeps no copy of it.
    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
        self.store.set_commit_observer(observer);
    }
453
454    /// The operations that recreate the current logical state, for a replication
455    /// follower to bootstrap from (ADR-0030).
456    ///
457    /// # Errors
458    /// Propagates a store read error.
459    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
460        Ok(self.store.replication_snapshot()?)
461    }
462
463    /// Apply a replicated operation from a leader (ADR-0030): persist and apply it
464    /// to the store (preserving the leader's collection id), then reconcile the
465    /// in-memory index handles — register a new collection, drop a removed one, or
466    /// mark a touched collection's index stale so the next read rebuilds from the
467    /// replicated state.
468    ///
469    /// # Errors
470    /// Propagates a store apply error.
471    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
472        let target = match &op {
473            WalOp::CreateCollection { collection_id, .. }
474            | WalOp::DropCollection { collection_id }
475            | WalOp::Upsert { collection_id, .. }
476            | WalOp::Delete { collection_id, .. } => Some(*collection_id),
477            WalOp::Checkpoint { .. } => None,
478        };
479        let create_name = match &op {
480            WalOp::CreateCollection { name, .. } => Some(name.clone()),
481            _ => None,
482        };
483        let is_drop = matches!(op, WalOp::DropCollection { .. });
484        self.store.apply_replicated(op)?;
485
486        if let Some(name) = create_name {
487            // Register a fresh handle for the newly replicated collection.
488            if let Some(id) = target
489                && let Some(descriptor) = self.store.descriptor(id).cloned()
490            {
491                let docs = descriptor.multivector.then(BTreeMap::new);
492                let index = empty_index(&descriptor);
493                // Replicated writes mark the handle stale, so the next read rebuilds
494                // the inverted index from the replicated store.
495                self.collections.insert(
496                    name,
497                    CollectionHandle {
498                        id,
499                        descriptor,
500                        index,
501                        int_to_ext: Vec::new(),
502                        ext_to_int: HashMap::new(),
503                        stale: false,
504                        write_gen: 0,
505                        docs,
506                        sparse: None,
507                    },
508                );
509            }
510        } else if is_drop {
511            if let Some(id) = target {
512                self.collections.retain(|_, h| h.id != id);
513            }
514        } else if let Some(id) = target
515            && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
516        {
517            mark_stale(handle);
518        }
519        Ok(())
520    }
521
    /// Names of all collections, sorted.
    ///
    /// The list comes from the store's catalog rather than from the in-memory
    /// handle map.
    #[must_use]
    pub fn collection_names(&self) -> Vec<String> {
        self.store.collection_names()
    }
527
528    /// The descriptor of a collection, if it exists.
529    #[must_use]
530    pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
531        self.collections.get(name).map(|h| &h.descriptor)
532    }
533
534    /// Number of live points in a collection.
535    pub fn len(&self, name: &str) -> Result<usize> {
536        let handle = self.handle(name)?;
537        Ok(self.store.len(handle.id)?)
538    }
539
540    /// Whether a collection has no points.
541    pub fn is_empty(&self, name: &str) -> Result<bool> {
542        Ok(self.len(name)? == 0)
543    }
544
545    /// Insert or replace a point with a JSON payload.
546    pub fn upsert(
547        &mut self,
548        collection: &str,
549        id: &str,
550        vector: &[f32],
551        payload: &Value,
552    ) -> Result<()> {
553        let handle = self
554            .collections
555            .get_mut(collection)
556            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
557        require_single_vector(handle)?;
558        let payload_bytes = serde_json::to_vec(payload)?;
559        self.store.upsert(handle.id, id, vector, &payload_bytes)?;
560        // Client-side-encrypted collections have no server-side index to maintain
561        // (ADR-0032): the stored vector is an opaque placeholder the server never
562        // ranks. The point is durable in the store; nothing else to do.
563        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
564            return Ok(());
565        }
566        // Maintain the in-memory index in place where the kind allows it, else
567        // defer to a lazy rebuild on the next search (ADR-0023/0026/0033).
568        index_upsert_point(handle, id, vector)?;
569        // Keep the derived sparse inverted index in step (ADR-0045).
570        sparse_index_upsert_point(handle, id, payload);
571        Ok(())
572    }
573
574    /// Upsert a batch of points with a single WAL `fdatasync` (ADR-0038).
575    ///
576    /// `points` is `(id, vector, payload)` tuples.  The batch is committed
577    /// atomically — all points or none (from the client's perspective).  This
578    /// is the preferred path for the REST `POST /v1/collections/{c}/points`
579    /// handler which already delivers a batch per HTTP request.
580    pub fn upsert_batch(
581        &mut self,
582        collection: &str,
583        points: &[(&str, &[f32], &serde_json::Value)],
584    ) -> Result<u64> {
585        let handle = self
586            .collections
587            .get(collection)
588            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
589        require_single_vector(handle)?;
590        let coll_id = handle.id;
591        let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
592
593        let payload_bytes: Vec<Vec<u8>> = points
594            .iter()
595            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
596            .collect::<Result<_>>()?;
597
598        let records: Vec<(&str, &[f32], &[u8])> = points
599            .iter()
600            .zip(payload_bytes.iter())
601            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
602            .collect();
603
604        self.store.upsert_batch(coll_id, &records)?;
605
606        if is_client_side {
607            return Ok(records.len() as u64);
608        }
609
610        for (id, vector, payload) in points {
611            let handle = self
612                .collections
613                .get_mut(collection)
614                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
615            index_upsert_point(handle, id, vector)?;
616            sparse_index_upsert_point(handle, id, payload);
617        }
618        Ok(records.len() as u64)
619    }
620
621    /// Upsert a large batch for a bulk load, deferring all index work to a single
622    /// rebuild pass (ADR-0045).
623    ///
624    /// Like [`upsert_batch`](Self::upsert_batch) the points are committed with one
625    /// WAL `fdatasync`, but instead of folding each point into the in-memory index
626    /// one at a time, the collection's index is marked **stale** so the next search
627    /// rebuilds it in a single pass over the whole collection — far cheaper for a
628    /// fresh load (one k-means for IVF, one graph build for Vamana, one inverted-index
629    /// scan) than N incremental inserts. Prefer `upsert_batch` for steady-state
630    /// writes where query-after-write latency matters.
631    pub fn upsert_bulk(
632        &mut self,
633        collection: &str,
634        points: &[(&str, &[f32], &serde_json::Value)],
635    ) -> Result<u64> {
636        let handle = self
637            .collections
638            .get_mut(collection)
639            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
640        require_single_vector(handle)?;
641        let coll_id = handle.id;
642
643        let payload_bytes: Vec<Vec<u8>> = points
644            .iter()
645            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
646            .collect::<Result<_>>()?;
647        let records: Vec<(&str, &[f32], &[u8])> = points
648            .iter()
649            .zip(payload_bytes.iter())
650            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
651            .collect();
652
653        self.store.upsert_batch(coll_id, &records)?;
654
655        // Defer the dense and sparse index maintenance to a single rebuild on the
656        // next read (a client-side-encrypted collection has no index to rebuild, but
657        // marking it stale is harmless — its rebuild produces the same no-op index).
658        let handle = self
659            .collections
660            .get_mut(collection)
661            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
662        mark_stale(handle);
663        Ok(records.len() as u64)
664    }
665
666    /// Delete a point by id. Returns whether it existed.
667    pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
668        let handle = self
669            .collections
670            .get_mut(collection)
671            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
672        require_single_vector(handle)?;
673        let existed = self.store.delete(handle.id, id)?;
674        if !existed {
675            return Ok(false);
676        }
677        // No server-side index to update for client-side-encrypted collections
678        // (ADR-0032); the store delete is authoritative.
679        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
680            return Ok(true);
681        }
682        // A built IVF removes in place (ADR-0023), a built HNSW soft-deletes
683        // (ADR-0026), and a built FreshDiskANN graph tombstones in its deletion set
684        // (ADR-0033); other kinds defer to a rebuild. The id->internal mapping is
685        // kept so a later re-insert allocates afresh — a removed or soft-deleted
686        // internal is simply never returned by the index.
687        index_delete_point(handle, id);
688        // Drop the point from the derived sparse inverted index too (ADR-0045).
689        sparse_index_delete_point(handle, id);
690        Ok(true)
691    }
692
693    /// Fetch a single point by id, with its payload and vector.
694    pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
695        let handle = self.handle(collection)?;
696        require_single_vector(handle)?;
697        match self.store.get(handle.id, id)? {
698            Some(record) => Ok(Some(Match {
699                id: id.to_owned(),
700                score: 0.0,
701                payload: Some(serde_json::from_slice(&record.payload)?),
702                vector: Some(record.vector),
703            })),
704            None => Ok(None),
705        }
706    }
707
708    /// Fetch points without ranking — an optional cleartext payload `filter`
709    /// narrows the set and `limit` bounds it. This is the retrieval path for a
710    /// client-side-encrypted collection (ADR-0032): the server returns the entitled
711    /// set (each point's payload carries the sealed vector blob under the reserved
712    /// `__quiver_vec__` key) and the client decrypts and ranks locally. It also
713    /// serves as a general "list points" primitive for any single-vector collection.
714    ///
715    /// Results come in the store's scan order, not by relevance; the filter is
716    /// re-checked exactly against each candidate (a selective filter could use the
717    /// secondary index in future — today it scans).
718    ///
719    /// # Errors
720    /// Errors if the collection does not exist or is multi-vector.
721    pub fn fetch(
722        &self,
723        collection: &str,
724        filter: Option<&Filter>,
725        limit: usize,
726        with_payload: bool,
727        with_vector: bool,
728    ) -> Result<Vec<Match>> {
729        let handle = self.handle(collection)?;
730        require_single_vector(handle)?;
731        let mut out = Vec::new();
732        for (id, record) in self.store.scan(handle.id)? {
733            if out.len() >= limit {
734                break;
735            }
736            let payload: Value = serde_json::from_slice(&record.payload)?;
737            if let Some(filter) = filter
738                && !filter.matches(&payload)
739            {
740                continue;
741            }
742            out.push(Match {
743                id,
744                score: 0.0,
745                payload: with_payload.then_some(payload),
746                vector: with_vector.then_some(record.vector),
747            });
748        }
749        Ok(out)
750    }
751
752    /// Rebuild a collection's index if a prior write deferred it (the `stale`
753    /// flag), making the collection's read snapshot current. Idempotent and cheap
754    /// when already fresh. Separating this `&mut self` maintenance from the `&self`
755    /// `*_snapshot` reads is what lets a server serve concurrent reads behind a
756    /// shared lock and take the exclusive lock only for the rare rebuild (ADR-0057).
757    pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
758        if self.handle(collection)?.stale {
759            let store = &self.store;
760            let handle = self
761                .collections
762                .get_mut(collection)
763                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
764            rebuild_index(store, handle)?;
765        }
766        Ok(())
767    }
768
769    /// Whether a collection's index is stale — a prior write deferred its rebuild.
770    /// The server reads this to schedule an **off-lock** rebuild (ADR-0062) without
771    /// holding the exclusive lock; embedded callers never need it (the `&mut self`
772    /// searches rebuild synchronously via [`Database::ensure_indexed`]).
773    pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
774        Ok(self.handle(collection)?.stale)
775    }
776
777    /// Capture everything an off-lock rebuild needs (ADR-0062): under the shared
778    /// read lock the caller already holds, scan the collection's live rows and
779    /// record the write generation. Returns `None` when the index is already fresh
780    /// (nothing to rebuild). The expensive build then runs with **no lock** via
781    /// [`RebuildInputs::build`], and [`Database::commit_rebuild`] installs it.
782    pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
783        let handle = self.handle(collection)?;
784        if !handle.stale {
785            return Ok(None);
786        }
787        let scan = scan_collection(&self.store, handle)?;
788        Ok(Some(RebuildInputs {
789            collection: collection.to_owned(),
790            descriptor: handle.descriptor.clone(),
791            scan,
792            write_gen: handle.write_gen,
793        }))
794    }
795
796    /// Install an index built off-lock (ADR-0062) under the brief exclusive lock the
797    /// caller holds. Returns whether the collection is **still** stale: if a write
798    /// landed during the build (the write generation advanced), the fresh index is
799    /// already behind, so it is installed (still newer than the prior snapshot) but
800    /// the handle stays stale for the next rebuild. A collection dropped or replaced
801    /// during the build is ignored (`Ok(false)`) — the build is discarded.
802    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
803        let store = &self.store;
804        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
805            return Ok(false);
806        };
807        match rebuilt.kind {
808            RebuiltKind::Ready(index) => handle.index = *index,
809            RebuiltKind::Disk { graph, pq } => {
810                // Drop the prior index before sealing the artifact in place: its
811                // `mmap` assumes an immutable file. Safe under the write lock — no
812                // read can observe the momentary empty index.
813                handle.index = empty_index(&handle.descriptor);
814                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
815                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
816            }
817        }
818        handle.int_to_ext = rebuilt.int_to_ext;
819        handle.ext_to_int = rebuilt.ext_to_int;
820        handle.docs = rebuilt.docs;
821        handle.sparse = rebuilt.sparse;
822        // Clear stale only if no write landed since the inputs were captured;
823        // otherwise leave it set so the driver rebuilds again for the newer write.
824        let still_stale = handle.write_gen != rebuilt.write_gen;
825        handle.stale = still_stale;
826        Ok(still_stale)
827    }
828
829    /// Search a collection for the nearest points to `query`, optionally
830    /// post-filtered by payload predicate.
831    pub fn search(
832        &mut self,
833        collection: &str,
834        query: &[f32],
835        params: &SearchParams,
836    ) -> Result<Vec<Match>> {
837        // Embedded read-your-writes: rebuild a deferred index first (synchronously),
838        // then read the now-fresh snapshot. The server instead serves the prior
839        // snapshot and rebuilds off-lock (ADR-0062), so concurrent readers never
840        // block on a writer's rebuild.
841        self.ensure_indexed(collection)?;
842        self.search_snapshot(collection, query, params)
843    }
844
845    /// Search a collection's **current immutable snapshot** for the nearest points
846    /// to `query`, optionally post-filtered by payload predicate. Takes `&self`, so
847    /// many readers run concurrently. When a prior write deferred this collection's
848    /// rebuild, it serves the **prior** snapshot (a snapshot-isolated, slightly stale
849    /// read — ADR-0062/0053); the caller schedules a rebuild via
850    /// [`Database::needs_rebuild`].
851    pub fn search_snapshot(
852        &self,
853        collection: &str,
854        query: &[f32],
855        params: &SearchParams,
856    ) -> Result<Vec<Match>> {
857        require_single_vector(self.handle(collection)?)?;
858        require_server_searchable(self.handle(collection)?)?;
859
860        let handle = self.handle(collection)?;
861
862        // Hybrid planning: if the filter narrows to a small, secondary-indexed
863        // candidate set, scan those rows exactly instead of post-filtering ANN
864        // hits — exact, and immune to the filtered-ANN recall cliff.
865        if let Some(filter) = &params.filter
866            && let Some(candidates) = candidate_ids(
867                &self.store,
868                handle.id,
869                filter,
870                &handle.descriptor.filterable,
871            )?
872            && candidates.len() <= FULL_SCAN_THRESHOLD
873        {
874            return self.exact_filtered_search(
875                handle.id,
876                &handle.descriptor,
877                query,
878                params,
879                filter,
880                &candidates,
881            );
882        }
883
884        let fetch = if params.filter.is_some() {
885            params
886                .k
887                .saturating_mul(FILTER_OVERFETCH)
888                .max(params.ef_search)
889        } else {
890            params.k
891        };
892        let raw = handle.index.search(query, fetch, params.ef_search)?;
893
894        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
895        let mut out = Vec::with_capacity(params.k);
896        for neighbor in raw {
897            if out.len() >= params.k {
898                break;
899            }
900            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
901                continue;
902            };
903            let record = if need_record {
904                self.store.get(handle.id, ext_id)?
905            } else {
906                None
907            };
908            let payload_value: Option<Value> = match &record {
909                Some(r) if params.filter.is_some() || params.with_payload => {
910                    Some(serde_json::from_slice(&r.payload)?)
911                }
912                _ => None,
913            };
914            if let Some(filter) = &params.filter {
915                let value = payload_value.as_ref().unwrap_or(&Value::Null);
916                if !filter.matches(value) {
917                    continue;
918                }
919            }
920            out.push(Match {
921                id: ext_id.clone(),
922                score: neighbor.distance,
923                payload: if params.with_payload {
924                    payload_value
925                } else {
926                    None
927                },
928                vector: if params.with_vector {
929                    record.map(|r| r.vector)
930                } else {
931                    None
932                },
933            });
934        }
935        Ok(out)
936    }
937
938    /// Hybrid search (ADR-0043/0046): fuse up to three rankings with Reciprocal
939    /// Rank Fusion — a dense ANN ranking, a sparse inverted-index dot-product
940    /// ranking (`sparse_query`), and a BM25 full-text ranking (`text_query`, scored
941    /// over the same inverted index). Any may be `None`; at least one is required,
942    /// giving pure dense / sparse / lexical or any blend through the same path. The
943    /// same payload `filter` is re-checked on every side, so results stay exact.
944    /// `rrf_k0` is the RRF rank-bias constant ([`DEFAULT_RRF_K0`]).
945    pub fn hybrid_search(
946        &mut self,
947        collection: &str,
948        dense_query: Option<&[f32]>,
949        sparse_query: Option<&SparseVector>,
950        text_query: Option<&str>,
951        params: &SearchParams,
952        rrf_k0: f32,
953    ) -> Result<Vec<Match>> {
954        // Embedded read-your-writes: rebuild a deferred index synchronously, then
955        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
956        // off-lock — ADR-0062).
957        self.ensure_indexed(collection)?;
958        self.hybrid_search_snapshot(
959            collection,
960            dense_query,
961            sparse_query,
962            text_query,
963            params,
964            rrf_k0,
965        )
966    }
967
968    /// Hybrid search over the collection's current immutable snapshot (`&self`, so
969    /// readers run concurrently). When a prior write deferred the rebuild, it serves
970    /// the **prior** snapshot (snapshot-isolated, slightly stale — ADR-0062/0053);
971    /// the caller schedules a rebuild via [`Database::needs_rebuild`].
972    pub fn hybrid_search_snapshot(
973        &self,
974        collection: &str,
975        dense_query: Option<&[f32]>,
976        sparse_query: Option<&SparseVector>,
977        text_query: Option<&str>,
978        params: &SearchParams,
979        rrf_k0: f32,
980    ) -> Result<Vec<Match>> {
981        require_single_vector(self.handle(collection)?)?;
982        require_server_searchable(self.handle(collection)?)?;
983        if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
984            return Err(Error::Unsupported(
985                "hybrid_search requires a dense query, a sparse query, or a text query",
986            ));
987        }
988        let handle = self.handle(collection)?;
989
990        // Pull a deep-enough candidate list from each side so the fusion is
991        // meaningful, then RRF down to `k`.
992        let depth = params
993            .k
994            .saturating_mul(RRF_CANDIDATE_FACTOR)
995            .max(MIN_RRF_CANDIDATES);
996        let filter = params.filter.as_ref();
997        let mut lists: Vec<Vec<String>> = Vec::new();
998        if let Some(q) = dense_query {
999            lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1000        }
1001        if let Some(sp) = sparse_query {
1002            lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1003        }
1004        if let Some(text) = text_query {
1005            lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1006        }
1007        let fused = rrf_fuse(&lists, rrf_k0, params.k);
1008
1009        let mut out = Vec::with_capacity(fused.len());
1010        for (ext_id, score) in fused {
1011            let record = if params.with_payload || params.with_vector {
1012                self.store.get(handle.id, &ext_id)?
1013            } else {
1014                None
1015            };
1016            let payload = match (&record, params.with_payload) {
1017                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1018                _ => None,
1019            };
1020            out.push(Match {
1021                id: ext_id,
1022                score,
1023                payload,
1024                vector: if params.with_vector {
1025                    record.map(|r| r.vector)
1026                } else {
1027                    None
1028                },
1029            });
1030        }
1031        Ok(out)
1032    }
1033
1034    // Dense candidates as a ranked list of external ids (the filter re-checked),
1035    // for hybrid fusion.
1036    fn dense_ranked_ids(
1037        &self,
1038        handle: &CollectionHandle,
1039        query: &[f32],
1040        depth: usize,
1041        ef_search: usize,
1042        filter: Option<&Filter>,
1043    ) -> Result<Vec<String>> {
1044        let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1045        let mut ids = Vec::new();
1046        for neighbor in raw {
1047            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1048                continue;
1049            };
1050            if !self.passes_filter(handle.id, ext_id, filter)? {
1051                continue;
1052            }
1053            ids.push(ext_id.clone());
1054            if ids.len() >= depth {
1055                break;
1056            }
1057        }
1058        Ok(ids)
1059    }
1060
1061    // Sparse candidates as a ranked list of external ids. With the derived inverted
1062    // index present (the common case), score only the query's nonzero dimensions via
1063    // the posting lists, then re-check the filter on the ranked ids until `depth` are
1064    // filled — so low-scored rows never load a payload (ADR-0045). When the index is
1065    // absent (a not-yet-rebuilt or client-side collection), fall back to the full
1066    // store scan, which stays correct under the incremental upsert/delete path.
1067    fn sparse_ranked_ids(
1068        &self,
1069        handle: &CollectionHandle,
1070        query: &SparseVector,
1071        depth: usize,
1072        filter: Option<&Filter>,
1073    ) -> Result<Vec<String>> {
1074        if let Some(idx) = handle.sparse.as_ref() {
1075            let mut ids = Vec::new();
1076            for (ext_id, _score) in idx.search(query) {
1077                if !self.passes_filter(handle.id, &ext_id, filter)? {
1078                    continue;
1079                }
1080                ids.push(ext_id);
1081                if ids.len() >= depth {
1082                    break;
1083                }
1084            }
1085            return Ok(ids);
1086        }
1087        self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1088    }
1089
1090    // The store-scan fallback for [`sparse_ranked_ids`]: load every row, score its
1091    // `__quiver_sparse__` vector by dot product against the query, re-check the
1092    // filter, and return the top `depth`. O(N-rows), but correct without an index.
1093    fn sparse_ranked_ids_by_scan(
1094        &self,
1095        cid: CollectionId,
1096        query: &SparseVector,
1097        depth: usize,
1098        filter: Option<&Filter>,
1099    ) -> Result<Vec<String>> {
1100        let qmap: HashMap<u32, f32> = query
1101            .indices
1102            .iter()
1103            .copied()
1104            .zip(query.values.iter().copied())
1105            .collect();
1106        let mut scored: Vec<(f32, String)> = Vec::new();
1107        for (ext_id, record) in self.store.scan(cid)? {
1108            if record.payload.is_empty() {
1109                continue;
1110            }
1111            let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1112                continue;
1113            };
1114            if let Some(filter) = filter
1115                && !filter.matches(&value)
1116            {
1117                continue;
1118            }
1119            let Some(raw) = value.get(SPARSE_KEY) else {
1120                continue;
1121            };
1122            let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1123                continue;
1124            };
1125            let mut score = 0.0f32;
1126            for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1127                if let Some(qw) = qmap.get(dim) {
1128                    score += qw * weight;
1129                }
1130            }
1131            if score > 0.0 {
1132                scored.push((score, ext_id));
1133            }
1134        }
1135        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1136        Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1137    }
1138
1139    // BM25 full-text candidates as a ranked list of external ids (ADR-0046): tokenize
1140    // the query text into term ids and score them with BM25 over the derived inverted
1141    // index, re-checking the filter on the ranked ids until `depth` are filled. BM25
1142    // needs the index's corpus statistics, so when a collection has no inverted index
1143    // (a not-yet-rebuilt or client-side collection — neither reachable here, since
1144    // hybrid rebuilds a stale handle and rejects client-side) there is nothing to
1145    // score and the list is empty; any dense side still contributes.
1146    fn bm25_ranked_ids(
1147        &self,
1148        handle: &CollectionHandle,
1149        query_text: &str,
1150        depth: usize,
1151        filter: Option<&Filter>,
1152    ) -> Result<Vec<String>> {
1153        let Some(idx) = handle.sparse.as_ref() else {
1154            return Ok(Vec::new());
1155        };
1156        let terms = query_term_ids(query_text);
1157        let mut ids = Vec::new();
1158        for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1159            if !self.passes_filter(handle.id, &ext_id, filter)? {
1160                continue;
1161            }
1162            ids.push(ext_id);
1163            if ids.len() >= depth {
1164                break;
1165            }
1166        }
1167        Ok(ids)
1168    }
1169
1170    // Re-check a payload filter against a row (loading its payload). `None` filter
1171    // always passes.
1172    fn passes_filter(
1173        &self,
1174        cid: CollectionId,
1175        ext_id: &str,
1176        filter: Option<&Filter>,
1177    ) -> Result<bool> {
1178        let Some(filter) = filter else {
1179            return Ok(true);
1180        };
1181        let value: Value = match self.store.get(cid, ext_id)? {
1182            Some(r) => serde_json::from_slice(&r.payload)?,
1183            None => Value::Null,
1184        };
1185        Ok(filter.matches(&value))
1186    }
1187
1188    // Exactly score `candidates` (a superset of the filter's matches that the
1189    // secondary indexes produced) against the query, re-check the full filter
1190    // for correctness, and return the top `k`. The pre-filter arm of the hybrid
1191    // planner: perfect recall over an already-narrowed set.
1192    fn exact_filtered_search(
1193        &self,
1194        cid: CollectionId,
1195        descriptor: &Descriptor,
1196        query: &[f32],
1197        params: &SearchParams,
1198        filter: &Filter,
1199        candidates: &BTreeSet<String>,
1200    ) -> Result<Vec<Match>> {
1201        let metric = to_index_metric(descriptor.metric);
1202        let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1203        for ext_id in candidates {
1204            let Some(record) = self.store.get(cid, ext_id)? else {
1205                continue;
1206            };
1207            let payload: Value = serde_json::from_slice(&record.payload)?;
1208            if !filter.matches(&payload) {
1209                continue;
1210            }
1211            let ordering = ordering_distance(metric, query, &record.vector);
1212            scored.push((ordering, ext_id.clone(), payload, record.vector));
1213        }
1214        scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1215        scored.truncate(params.k);
1216        Ok(scored
1217            .into_iter()
1218            .map(|(ordering, id, payload, vector)| Match {
1219                id,
1220                score: report_metric(metric, ordering),
1221                payload: params.with_payload.then_some(payload),
1222                vector: params.with_vector.then_some(vector),
1223            })
1224            .collect())
1225    }
1226
1227    /// Insert or replace a multi-vector (late-interaction / ColBERT) document: its
1228    /// `vectors` are stored as a group of token rows and its `payload` once on the
1229    /// anchor token (ADR-0028). Re-upserting a document first removes the tokens a
1230    /// shorter version would leave behind, so the document is replaced cleanly.
1231    ///
1232    /// # Errors
1233    /// Errors if the collection is single-vector, the document has no vectors, a
1234    /// vector's dimensionality is wrong, or the id contains the reserved separator.
1235    pub fn upsert_document(
1236        &mut self,
1237        collection: &str,
1238        doc_id: &str,
1239        vectors: &[Vec<f32>],
1240        payload: &Value,
1241    ) -> Result<()> {
1242        let handle = self
1243            .collections
1244            .get_mut(collection)
1245            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1246        require_multivector(handle)?;
1247        if doc_id.contains(DOC_TOKEN_SEP) {
1248            return Err(Error::Unsupported(
1249                "document id must not contain the reserved 0x1f separator",
1250            ));
1251        }
1252        if vectors.is_empty() {
1253            return Err(Error::Unsupported("a document needs at least one vector"));
1254        }
1255        let dim = handle.descriptor.dim as usize;
1256        if vectors.iter().any(|v| v.len() != dim) {
1257            return Err(Error::Unsupported(
1258                "every document vector must match the collection dimensionality",
1259            ));
1260        }
1261        // Remove only the trailing tokens a shorter re-upsert leaves behind; the
1262        // rest are overwritten by the upserts below.
1263        let previous = handle
1264            .docs
1265            .as_ref()
1266            .and_then(|d| d.get(doc_id))
1267            .copied()
1268            .unwrap_or(0) as usize;
1269        for j in vectors.len()..previous {
1270            self.store.delete(handle.id, &token_id(doc_id, j))?;
1271            // Tombstone the dropped token row in the ANN index too (ADR-0034).
1272            index_delete_point(handle, &token_id(doc_id, j));
1273        }
1274        let payload_bytes = serde_json::to_vec(payload)?;
1275        for (j, vector) in vectors.iter().enumerate() {
1276            // The payload is stored once, on the anchor token; the rest carry none.
1277            let bytes: &[u8] = if j == 0 {
1278                payload_bytes.as_slice()
1279            } else {
1280                &[]
1281            };
1282            self.store
1283                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1284            // Fold the token row into the ANN index incrementally instead of
1285            // marking the whole collection stale (ADR-0034); the underlying index
1286            // consolidates by a rebuild past its own churn threshold.
1287            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1288        }
1289        if let Some(docs) = handle.docs.as_mut() {
1290            docs.insert(doc_id.to_owned(), vectors.len() as u32);
1291        }
1292        Ok(())
1293    }
1294
1295    /// Search a multi-vector collection by a set of query token vectors, ranking
1296    /// documents by MaxSim late interaction (ADR-0028). At or below the exact-scan
1297    /// threshold every document is scored exactly; above it, candidates are
1298    /// generated by nearest-neighbour search over the token pool (recall tuned by
1299    /// `ef_search`) and re-ranked exactly. An optional `filter` is applied to each
1300    /// document's payload, exactly. A document has no single vector, so `with_payload`
1301    /// returns the anchor payload and `with_vector` returns the token vectors.
1302    pub fn search_multi_vector(
1303        &mut self,
1304        collection: &str,
1305        query_tokens: &[Vec<f32>],
1306        params: &SearchParams,
1307    ) -> Result<Vec<DocumentMatch>> {
1308        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1309        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1310        // off-lock — ADR-0062).
1311        self.ensure_indexed(collection)?;
1312        self.search_multi_vector_snapshot(collection, query_tokens, params)
1313    }
1314
1315    /// Multi-vector (late-interaction) search over the collection's current
1316    /// immutable snapshot (`&self`, so readers run concurrently). A small corpus is
1317    /// scored exactly; a large corpus draws candidates from the ANN index, serving
1318    /// the **prior** snapshot when a write deferred its rebuild (snapshot-isolated,
1319    /// slightly stale — ADR-0062/0053); the caller schedules the rebuild via
1320    /// [`Database::needs_rebuild`].
1321    pub fn search_multi_vector_snapshot(
1322        &self,
1323        collection: &str,
1324        query_tokens: &[Vec<f32>],
1325        params: &SearchParams,
1326    ) -> Result<Vec<DocumentMatch>> {
1327        require_multivector(self.handle(collection)?)?;
1328        let dim = self.handle(collection)?.descriptor.dim as usize;
1329        if query_tokens.is_empty() {
1330            return Ok(Vec::new());
1331        }
1332        if query_tokens.iter().any(|v| v.len() != dim) {
1333            return Err(Error::Unsupported(
1334                "every query token must match the collection dimensionality",
1335            ));
1336        }
1337
1338        let doc_count = self
1339            .handle(collection)?
1340            .docs
1341            .as_ref()
1342            .map_or(0, BTreeMap::len);
1343        let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1344            // Exact: score every document. No ANN index needed.
1345            self.handle(collection)?
1346                .docs
1347                .as_ref()
1348                .map(|d| d.keys().cloned().collect())
1349                .unwrap_or_default()
1350        } else {
1351            // Large corpus: generate candidates from the token pool. A prior write
1352            // may have deferred the ANN index rebuild; serve the prior snapshot (the
1353            // server schedules an off-lock rebuild — ADR-0062).
1354            let handle = self.handle(collection)?;
1355            let per_token_k = params
1356                .k
1357                .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1358                .max(params.ef_search);
1359            let mut set = BTreeSet::new();
1360            for token in query_tokens {
1361                for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1362                    if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1363                        && let Some((doc, _)) = parse_token_id(ext)
1364                    {
1365                        set.insert(doc.to_owned());
1366                    }
1367                }
1368            }
1369            set.into_iter().collect()
1370        };
1371
1372        // Re-rank the candidate documents by exact MaxSim over all their tokens.
1373        let handle = self.handle(collection)?;
1374        let cid = handle.id;
1375        let metric = to_index_metric(handle.descriptor.metric);
1376        let mut scored: Vec<ScoredDocument> = Vec::new();
1377        for doc in &candidates {
1378            let count = handle
1379                .docs
1380                .as_ref()
1381                .and_then(|d| d.get(doc))
1382                .copied()
1383                .unwrap_or(0) as usize;
1384            let (tokens, payload) = self.gather_document(cid, doc, count)?;
1385            if tokens.is_empty() {
1386                continue;
1387            }
1388            if let Some(filter) = &params.filter {
1389                let value = payload.clone().unwrap_or(Value::Null);
1390                if !filter.matches(&value) {
1391                    continue;
1392                }
1393            }
1394            let score = max_sim(metric, query_tokens, &tokens);
1395            let vectors = params.with_vector.then_some(tokens);
1396            scored.push((score, doc.clone(), payload, vectors));
1397        }
1398        // Higher MaxSim first; ties broken by id for a deterministic order.
1399        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1400        scored.truncate(params.k);
1401        Ok(scored
1402            .into_iter()
1403            .map(|(score, id, payload, vectors)| DocumentMatch {
1404                id,
1405                score,
1406                payload: params.with_payload.then_some(payload).flatten(),
1407                vectors,
1408            })
1409            .collect())
1410    }
1411
1412    /// Fetch a multi-vector document by id: its anchor payload and, if
1413    /// `with_vectors`, its token vectors. `None` if the document does not exist.
1414    pub fn get_document(
1415        &self,
1416        collection: &str,
1417        doc_id: &str,
1418        with_vectors: bool,
1419    ) -> Result<Option<DocumentMatch>> {
1420        let handle = self.handle(collection)?;
1421        require_multivector(handle)?;
1422        let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1423            return Ok(None);
1424        };
1425        let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1426        if tokens.is_empty() {
1427            return Ok(None);
1428        }
1429        Ok(Some(DocumentMatch {
1430            id: doc_id.to_owned(),
1431            score: 0.0,
1432            payload,
1433            vectors: with_vectors.then_some(tokens),
1434        }))
1435    }
1436
1437    /// Delete a multi-vector document and all of its token rows. Returns whether it
1438    /// existed.
1439    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1440        let handle = self
1441            .collections
1442            .get_mut(collection)
1443            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1444        require_multivector(handle)?;
1445        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1446            return Ok(false);
1447        };
1448        for j in 0..count as usize {
1449            self.store.delete(handle.id, &token_id(doc_id, j))?;
1450            // Tombstone each token row in the ANN index incrementally (ADR-0034).
1451            index_delete_point(handle, &token_id(doc_id, j));
1452        }
1453        if let Some(docs) = handle.docs.as_mut() {
1454            docs.remove(doc_id);
1455        }
1456        Ok(true)
1457    }
1458
1459    /// The number of documents in a multi-vector collection. Errors if the
1460    /// collection is single-vector.
1461    pub fn document_count(&self, collection: &str) -> Result<usize> {
1462        let handle = self.handle(collection)?;
1463        require_multivector(handle)?;
1464        Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1465    }
1466
1467    // Read a document's token vectors (in ordinal order) and its anchor payload
1468    // from the store. Missing token rows are skipped, so a torn document yields a
1469    // short token list the caller treats as empty.
1470    fn gather_document(
1471        &self,
1472        cid: CollectionId,
1473        doc_id: &str,
1474        count: usize,
1475    ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1476        let mut tokens = Vec::with_capacity(count);
1477        let mut payload: Option<Value> = None;
1478        for j in 0..count {
1479            let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1480                continue;
1481            };
1482            if j == 0 && !record.payload.is_empty() {
1483                payload = Some(serde_json::from_slice(&record.payload)?);
1484            }
1485            tokens.push(record.vector);
1486        }
1487        Ok((tokens, payload))
1488    }
1489
1490    /// Flush a durable checkpoint of all collections, capturing a durable
1491    /// snapshot of each built, up-to-date IVF index (ADR-0025) so it reloads on
1492    /// open instead of rebuilding. Other index kinds, and a stale or unbuilt IVF,
1493    /// are rebuilt on open.
1494    pub fn checkpoint(&mut self) -> Result<()> {
1495        let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1496        for handle in self.collections.values() {
1497            if handle.stale {
1498                continue;
1499            }
1500            if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1501                if ivf.is_empty() {
1502                    continue;
1503                }
1504                let envelope = IndexEnvelope {
1505                    version: INDEX_ENVELOPE_VERSION,
1506                    int_to_ext: handle.int_to_ext.clone(),
1507                    ivf: ivf.snapshot()?,
1508                };
1509                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1510            } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1511                // Durable DiskVamana (ADR-0063): the base file is already on disk
1512                // (sealed at build/consolidation); seal only the tiny tie-to-live
1513                // blob. The derived sparse index is not persisted (as for IVF); on
1514                // reopen the durable load leaves it `None` and hybrid search falls
1515                // back to the store scan until the next rebuild — correct, not fast.
1516                let envelope = DiskEnvelope {
1517                    version: INDEX_ENVELOPE_VERSION,
1518                    int_to_ext: handle.int_to_ext.clone(),
1519                    base_row_count: fresh.base_len() as u64,
1520                    deleted_ids: fresh.deleted_ids(),
1521                };
1522                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1523            }
1524        }
1525        self.store.checkpoint_with_index_snapshots(&snapshots)?;
1526        Ok(())
1527    }
1528
    /// Compact every collection with reclaimable space, merging its sealed
    /// segments and dropping deleted/shadowed rows. Crash-safe; a no-op for
    /// collections with nothing to reclaim.
    ///
    /// This is a direct delegation to the store's `compact`.
    ///
    /// # Errors
    /// Propagates any error the store's compaction reports.
    pub fn compact(&mut self) -> Result<()> {
        Ok(self.store.compact()?)
    }
1535
    /// The manifest version — the catalog generation a snapshot captures
    /// (ADR-0050). Surfaced as snapshot-relevant status in `database_stats`.
    ///
    /// Read straight from the store; [`Database::snapshot`] reports the same
    /// value in its `SnapshotInfo`.
    #[must_use]
    pub fn manifest_version(&self) -> u64 {
        self.store.manifest_version()
    }
1542
    /// Best-effort total on-disk size of the data directory, in bytes — what a
    /// full snapshot would copy (ADR-0050). Unreadable entries are skipped.
    ///
    /// Computed by recursively walking the store's directory on every call, so
    /// the cost grows with the number of files; it never fails, and an unreadable
    /// root simply reports `0`.
    #[must_use]
    pub fn disk_usage_bytes(&self) -> u64 {
        dir_size(self.store.dir())
    }
1549
    /// Take a consistent online snapshot of the whole database into `dest`
    /// (which must not already exist), returning what was captured (ADR-0050).
    ///
    /// The writer lock is held for the duration: `checkpoint` seals the active
    /// buffer into segments and advances the WAL floor to the head, then the
    /// data directory is byte-copied. Opening `dest` afterwards replays an empty
    /// WAL tail and reconstructs the database exactly as of this call.
    ///
    /// The returned `SnapshotInfo` carries the store's manifest version (read
    /// after the copy) and the number of files and bytes copied.
    ///
    /// # Errors
    /// [`Error::Core`] if `dest` already exists, or on any I/O error during the
    /// checkpoint or the copy.
    ///
    /// NOTE(review): a copy that fails part-way returns the error without
    /// removing what was already written, so `dest` may be left partially
    /// populated and a retry then hits the "already exists" check.
    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
        // Refuse to write into an existing path; checked before any work is done.
        if dest.exists() {
            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
                dest.display().to_string(),
            )));
        }
        // Consistency anchor: flush to segments and advance the WAL floor so the
        // copied tree opens with no replay (ADR-0050).
        self.checkpoint()?;
        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
        // ponytail: flush the snapshot root's metadata only; a backup target is
        // re-takeable, so per-file fsync isn't warranted. Add it if snapshots
        // must survive an immediate post-copy crash.
        // Best-effort: the result of the directory sync is deliberately discarded.
        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
        Ok(SnapshotInfo {
            manifest_version: self.store.manifest_version(),
            files,
            bytes,
        })
    }
1581
1582    fn handle(&self, name: &str) -> Result<&CollectionHandle> {
1583        self.collections
1584            .get(name)
1585            .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
1586    }
1587}
1588
1589/// Restore a snapshot directory `src` (produced by [`Database::snapshot`]) into a
1590/// fresh `dest` directory, leaving it ready for the caller to open with the same
1591/// keyring/codec the snapshot was written under (ADR-0050).
1592///
1593/// `dest` must not already exist — restore never overwrites a live data
1594/// directory. The real integrity check is the caller's subsequent open (which,
1595/// for an encrypted store, is the only party holding the key); this function
1596/// only verifies the source looks like a snapshot and copies it.
1597///
1598/// # Errors
1599/// [`Error::Core`] if `dest` exists, `src` is not a snapshot (no `CURRENT`), or
1600/// on any I/O error during the copy.
1601pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
1602    if dest.exists() {
1603        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1604            dest.display().to_string(),
1605        )));
1606    }
1607    if !src.join("CURRENT").exists() {
1608        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
1609            format!("{} is not a snapshot (no CURRENT)", src.display()),
1610        )));
1611    }
1612    let (files, bytes) = copy_tree(src, dest)?;
1613    Ok(SnapshotInfo {
1614        // The restored directory's manifest version is whatever the snapshot
1615        // carried; report 0 since we don't re-open here to read it (the caller's
1616        // open is authoritative). `files`/`bytes` reflect the copy.
1617        manifest_version: 0,
1618        files,
1619        bytes,
1620    })
1621}
1622
1623// Recursively copy `src` into `dst`, returning `(files, bytes)`. I/O errors are
1624// tagged with the offending path. The data directory contains only files and
1625// directories (no symlinks), so a plain walk is complete.
1626fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
1627    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
1628    let mut files = 0u64;
1629    let mut bytes = 0u64;
1630    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
1631        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
1632        let from = entry.path();
1633        let to = dst.join(entry.file_name());
1634        let ft = entry
1635            .file_type()
1636            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
1637        if ft.is_dir() {
1638            let (f, b) = copy_tree(&from, &to)?;
1639            files += f;
1640            bytes += b;
1641        } else {
1642            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
1643            files += 1;
1644            bytes += n;
1645        }
1646    }
1647    Ok((files, bytes))
1648}
1649
// Best-effort recursive size of `dir` in bytes. Anything that cannot be read —
// the directory itself, an entry, its type, or its metadata — simply contributes
// nothing, so a stats read never fails on a transient error.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| {
            let kind = entry.file_type().ok()?;
            if kind.is_dir() {
                Some(dir_size(&entry.path()))
            } else {
                entry.metadata().ok().map(|meta| meta.len())
            }
        })
        .sum()
}
1667
// The character separating a multi-vector document id from a token ordinal in a
// token row's external id (`<doc-id><US><ordinal>`): the ASCII Unit Separator
// (U+001F), which is disallowed in user document ids (ADR-0028). `token_id`
// writes it and `parse_token_id` splits on its last occurrence.
const DOC_TOKEN_SEP: char = '\u{1f}';

// At or below this document count a multi-vector search scores every document
// exactly; above it, nearest-neighbour candidate generation over the token pool
// kicks in (mirrors the single-vector planner's full-scan threshold).
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

// Per-query-token candidate breadth for the large-corpus path: each query token
// retrieves about `k × this` nearest token rows before the documents are unioned.
// The search computes the breadth with a saturating multiply and never lets it
// drop below the query's `ef_search`.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
1681
1682// The external id of a multi-vector document's `ordinal`-th token row.
1683fn token_id(doc_id: &str, ordinal: usize) -> String {
1684    format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
1685}
1686
1687// Split a token row's external id back into its document id and ordinal, or `None`
1688// if it is not a token id. Splits from the right, so a document id (which cannot
1689// contain the separator) is recovered intact.
1690fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
1691    let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
1692    Some((doc, ordinal.parse().ok()?))
1693}
1694
1695// Reject the single-vector API on a multi-vector collection.
1696fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
1697    if handle.descriptor.multivector {
1698        Err(Error::Unsupported(
1699            "collection is multi-vector; use upsert_document / search_multi_vector",
1700        ))
1701    } else {
1702        Ok(())
1703    }
1704}
1705
1706// Reject the document API on a single-vector collection.
1707fn require_multivector(handle: &CollectionHandle) -> Result<()> {
1708    if handle.descriptor.multivector {
1709        Ok(())
1710    } else {
1711        Err(Error::Unsupported(
1712            "collection is single-vector; use upsert / search",
1713        ))
1714    }
1715}
1716
1717// Reject server-side ranked search on a client-side-encrypted collection: the
1718// server holds only opaque ciphertext and cannot rank it. The client fetches the
1719// entitled set (see `Database::fetch`) and ranks locally (ADR-0032).
1720fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
1721    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
1722        Err(Error::Unsupported(
1723            "collection is client-side encrypted; the server cannot rank opaque vectors — \
1724             fetch points and rank client-side",
1725        ))
1726    } else {
1727        Ok(())
1728    }
1729}
1730
// Translate a collection's `DistanceMetric` into the `Metric` the index
// constructors take. A one-to-one variant mapping; the match is exhaustive, so
// adding a `DistanceMetric` variant fails to compile here until it is mapped.
fn to_index_metric(metric: DistanceMetric) -> Metric {
    match metric {
        DistanceMetric::Dot => Metric::Dot,
        DistanceMetric::Cosine => Metric::Cosine,
        DistanceMetric::L2 => Metric::L2,
    }
}
1738
1739// Reject index/metric combinations the engine cannot serve.
1740fn validate_index(descriptor: &Descriptor) -> Result<()> {
1741    // Late interaction (MaxSim) is a similarity, so multi-vector collections need a
1742    // similarity metric.
1743    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
1744        return Err(Error::Unsupported(
1745            "multi-vector collections require a similarity metric (cosine or dot)",
1746        ));
1747    }
1748    // Client-side opaque encryption (ADR-0032) is searched by the client, not the
1749    // server, so it has no metric or index constraints — but it cannot combine with
1750    // the multi-vector document layout.
1751    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1752        if descriptor.multivector {
1753            return Err(Error::Unsupported(
1754                "client-side vector encryption is not supported for multi-vector collections",
1755            ));
1756        }
1757        return Ok(());
1758    }
1759    // DCPE (ADR-0031) preserves Euclidean distance comparison; the secret scaling
1760    // changes vector norms, so cosine and dot orderings are not preserved.
1761    if descriptor.vector_encryption == VectorEncryption::Dcpe
1762        && descriptor.metric != DistanceMetric::L2
1763    {
1764        return Err(Error::Unsupported(
1765            "dcpe-encrypted collections require the l2 metric",
1766        ));
1767    }
1768    // ColBERT is a late-interaction token-pool index (ADR-0034): valid only for a
1769    // multi-vector collection (which already requires a similarity metric).
1770    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
1771        return Err(Error::Unsupported(
1772            "the colbert index is only for multi-vector collections",
1773        ));
1774    }
1775    match descriptor.index.kind {
1776        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
1777            if descriptor.metric == DistanceMetric::Dot =>
1778        {
1779            Err(Error::Unsupported(
1780                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
1781            ))
1782        }
1783        _ => Ok(()),
1784    }
1785}
1786
1787// An empty index of the kind the descriptor selects. Batch kinds start unbuilt.
1788fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
1789    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1790        return CollectionIndex::None;
1791    }
1792    match descriptor.index.kind {
1793        IndexKind::Vamana => CollectionIndex::Vamana(None),
1794        IndexKind::DiskVamana => CollectionIndex::Disk(None),
1795        IndexKind::Ivf => CollectionIndex::Ivf(None),
1796        IndexKind::Colbert => CollectionIndex::Colbert(None),
1797        _ => CollectionIndex::Hnsw(Hnsw::new(
1798            descriptor.dim as usize,
1799            to_index_metric(descriptor.metric),
1800            HnswConfig::default(),
1801        )),
1802    }
1803}
1804
// Pick a default product-quantization subspace count for `dim`: the largest
// divisor of `dim` that is at most `dim / 8` (i.e. at least about eight
// dimensions per subspace), or `1` — a single whole-vector codebook — when no
// larger divisor fits.
fn default_pq_m(dim: usize) -> usize {
    let mut m = (dim / 8).max(1);
    // Walk down from the target; `1` divides everything, so this always stops.
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
1814
1815// Build the descriptor's index over `ids` (internal 0..n) and their flat vectors.
1816// Fixed seed for codebook training, so a collection's disk index is reproducible.
1817const PQ_SEED: u64 = 0x5176_5044_5141_5453;
1818// The disk index artifact, overwritten in place each rebuild; the caller drops
1819// the previous handle (unmapping the file) first.
1820const DISK_INDEX_FILE: &str = "vamana.qvx";
1821
1822fn build_index(
1823    store: &Store,
1824    cid: CollectionId,
1825    descriptor: &Descriptor,
1826    ids: &[u64],
1827    flat: &[f32],
1828) -> Result<CollectionIndex> {
1829    Ok(match build_in_memory_index(descriptor, ids, flat)? {
1830        Some(index) => index,
1831        None => {
1832            let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
1833            CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
1834                store, cid, &graph, &pq,
1835            )?)?))
1836        }
1837    })
1838}
1839
1840// Build the in-memory index for a collection, or `None` for the disk-resident kind
1841// whose artifact must be sealed through the store. Pure given the vectors (no
1842// store, no I/O), so the off-lock rebuild (ADR-0062) calls it without a lock; the
1843// synchronous `build_index` and disk path layer the store I/O on top.
1844fn build_in_memory_index(
1845    descriptor: &Descriptor,
1846    ids: &[u64],
1847    flat: &[f32],
1848) -> Result<Option<CollectionIndex>> {
1849    // Client-side-encrypted collections have no server-side index (ADR-0032): the
1850    // server stores opaque ciphertext it never ranks.
1851    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1852        return Ok(Some(CollectionIndex::None));
1853    }
1854    let dim = descriptor.dim as usize;
1855    let metric = to_index_metric(descriptor.metric);
1856    Ok(Some(match descriptor.index.kind {
1857        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
1858            ids,
1859            flat,
1860            dim,
1861            metric,
1862            VamanaConfig::default(),
1863        )?)?)),
1864        // The disk-resident artifact is sealed through the store, not here.
1865        IndexKind::DiskVamana => return Ok(None),
1866        IndexKind::Ivf => {
1867            let cfg = IvfConfig {
1868                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
1869                ..IvfConfig::default()
1870            };
1871            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
1872        }
1873        IndexKind::Colbert => {
1874            // Coarse centroids scale ~√(tokens); probe a quarter of them (PLAID),
1875            // and PQ the residual with the same subspace default as the disk path.
1876            let n = ids.len();
1877            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
1878            let cfg = ColbertConfig {
1879                n_centroids,
1880                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
1881                pq_subspaces: descriptor
1882                    .index
1883                    .pq_subspaces
1884                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
1885                seed: PQ_SEED,
1886            };
1887            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
1888        }
1889        _ => {
1890            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
1891            for (i, &id) in ids.iter().enumerate() {
1892                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
1893            }
1894            CollectionIndex::Hnsw(h)
1895        }
1896    }))
1897}
1898
1899// Build the Vamana graph + PQ codebook for the disk-resident index. Pure (no
1900// store, no I/O), so the off-lock rebuild (ADR-0062) does the expensive graph build
1901// and PQ training without a lock; `write_disk_index` then seals the result.
1902fn build_disk_graph_pq(
1903    descriptor: &Descriptor,
1904    ids: &[u64],
1905    flat: &[f32],
1906) -> Result<(Vamana, ProductQuantizer)> {
1907    let dim = descriptor.dim as usize;
1908    let metric = to_index_metric(descriptor.metric);
1909    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
1910    let m = descriptor
1911        .index
1912        .pq_subspaces
1913        .map_or_else(|| default_pq_m(dim), |x| x as usize);
1914    let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
1915    Ok((graph, pq))
1916}
1917
// Seal a prebuilt disk artifact under the collection's index dir with the store's
// codec, and open it for queries. The artifact is published by writing a temp
// file and renaming it over the live name, so the caller must drop any prior
// `DiskVamana` for this collection first (its mmap assumes an immutable file).
//
// Returns the freshly opened `DiskVamana`. Errors from creating the directory,
// cloning the codec, writing, renaming, or opening are propagated.
//
// NOTE(review): when the write or the rename fails, the function returns without
// removing the `.tmp` file; the next successful call writes over it.
fn write_disk_index(
    store: &Store,
    cid: CollectionId,
    graph: &Vamana,
    pq: &ProductQuantizer,
) -> Result<DiskVamana> {
    let dir = store.index_dir(cid);
    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
    let path = dir.join(DISK_INDEX_FILE);
    // Seal the index artifact with the collection's own codec (its DEK under an
    // envelope key-ring), so a crypto-shred of the collection also makes its index
    // unreadable. The same owned handle writes and then mmap-opens it.
    let codec = store.collection_codec_clone(cid)?;
    // Atomic publish (ADR-0063): write to a temp file, fsync it (disk::write does
    // the file sync), rename over the live name, then fsync the dir. A crash mid
    // write leaves the previous complete base or none — never a torn file that the
    // durable load path could `mmap` and serve. The caller has already dropped any
    // prior `DiskVamana` (its mmap assumed the old inode), so the rename is safe.
    let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
    quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
    std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
    // Best-effort directory sync: a failure here is deliberately ignored.
    let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
    // Hand the codec over to the reader that serves queries from the new base.
    open_disk_index(store, cid, codec)
}
1946
// Open a collection's sealed disk base (`vamana.qvx`) for queries, decrypting with
// its codec. Errors (absent/torn/wrong-codec file) propagate so the durable load
// path can fall back to a rebuild (ADR-0063).
//
// The path is `<store index dir for cid>/DISK_INDEX_FILE`; `codec` is consumed by
// the opened index.
fn open_disk_index(
    store: &Store,
    cid: CollectionId,
    codec: Box<dyn PageCodec>,
) -> Result<DiskVamana> {
    let path = store.index_dir(cid).join(DISK_INDEX_FILE);
    Ok(DiskVamana::open(&path, codec)?)
}
1958
// Load a collection's index on open. Two durable fast paths are tried, each only
// for a single-vector collection of the matching kind: restoring the IVF snapshot
// (ADR-0025) and restoring the DiskVamana base + envelope (ADR-0063); both then
// replay the post-checkpoint tail. A snapshot is only a fast path — a missing
// blob, a read error, or any failure while restoring falls through to the
// authoritative rebuild from the store.
//
// The conditions in each chain short-circuit left to right, so the snapshot is
// read (and the restore attempted) only when the cheaper checks before it pass.
fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
    // Multi-vector collections always rebuild on open, so the document grouping is
    // derived from the live rows; the snapshot fast-paths stay single-vector.
    if !handle.descriptor.multivector
        && handle.descriptor.index.kind == IndexKind::Ivf
        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
        && restore_ivf_snapshot(store, handle, &blob).is_ok()
    {
        return Ok(());
    }
    // Durable DiskVamana (ADR-0063): mmap the frugal base instead of an O(N)
    // full-RAM rebuild. Any failure falls back to rebuild, so the artifact is never
    // load-bearing for correctness. The derived sparse index is left `None` (as for
    // IVF) and hybrid falls back to the store scan until the next rebuild.
    // `QUIVER_DISABLE_DURABLE_DISK_INDEX` is an ops kill switch: set it to force the
    // (always-correct) rebuild path if the durable load is ever suspected. Only the
    // variable's presence is checked, not its value.
    if !handle.descriptor.multivector
        && handle.descriptor.index.kind == IndexKind::DiskVamana
        && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
        && restore_disk_snapshot(store, handle, &blob).is_ok()
    {
        return Ok(());
    }
    // No usable snapshot (or a failed restore): rebuild from the store.
    rebuild_index(store, handle)
}
1989
// Restore a DiskVamana from its durable snapshot (ADR-0063): `mmap` the immutable
// base file, rebuild the in-memory FreshDiskANN delta from the store by the implied
// delta ids, re-apply the tombstones, then replay the post-checkpoint WAL tail. Any
// problem — absent/torn/mismatched base, a failed store read or delta insert —
// returns an error so the caller falls back to an authoritative rebuild.
//
// `blob` is the postcard-encoded `DiskEnvelope` written by `checkpoint`.
//
// NOTE(review): the handle's id tables are overwritten before the remaining
// fallible steps run, so on error the handle is left partially restored; this
// relies on the caller's rebuild resetting it — confirm against `rebuild_index`.
fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported disk index snapshot version",
        ));
    }
    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
    // The blob's base count must match the opened file, or the (base, blob) pair is
    // inconsistent (e.g. a base torn or replaced after the blob was sealed).
    if base.len() as u64 != envelope.base_row_count {
        return Err(Error::Unsupported(
            "disk base count disagrees with snapshot",
        ));
    }
    // Rebuild the external→internal map as the inverse of the saved table: an
    // external id's internal id is its position in `int_to_ext`.
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    let mut fresh = FreshDiskVamana::new(base)?;
    // Reconstruct the delta: the ids above the base were inserted since the last
    // consolidation; their vectors live in the store, fetched by id (a row that
    // died this window is simply absent and need not enter the delta).
    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
        let ext = &handle.int_to_ext[internal as usize];
        if let Some(record) = store.get(handle.id, ext)? {
            fresh.insert(internal, &record.vector)?;
        }
    }
    // Re-apply the tombstones recorded at checkpoint time.
    for id in envelope.deleted_ids {
        fresh.mark_deleted(id);
    }
    handle.index = CollectionIndex::Disk(Some(fresh));
    // Clear `stale` before the replay: the incremental apply path skips a handle
    // that is already marked stale.
    handle.stale = false;
    replay_recovery_tail(store, handle)
}
2034
2035// Catch a restored index up to the store's current state by replaying the
2036// post-checkpoint WAL tail through the shared incremental apply path (ADR-0025/0063):
2037// tombstones first, then active-buffer upserts, so a row shadowed this window ends
2038// with its new vector. Bounded by the checkpoint cadence, not the collection size.
2039fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2040    let tail = store.recovery_tail(handle.id)?;
2041    for ext in &tail.deleted {
2042        index_delete_point(handle, ext);
2043    }
2044    for (ext, record) in tail.upserts {
2045        index_upsert_point(handle, &ext, &record.vector)?;
2046    }
2047    Ok(())
2048}
2049
// Restore an IVF from its snapshot envelope and catch it up to the store's
// current state by replaying the post-checkpoint tail (ADR-0025). Tombstoned ids
// are removed before the active upserts are applied, so a row shadowed this
// window (present in both) ends with its new vector.
//
// `blob` is the postcard-encoded `IndexEnvelope` written by `checkpoint`. Any
// error (undecodable blob, version mismatch, failed restore, failed tail read or
// insert) is returned so the caller can fall back to a rebuild.
//
// NOTE(review): the handle's id tables and index are replaced before the tail is
// replayed, so an error during the replay leaves the handle partially restored;
// this relies on the caller's rebuild resetting it — confirm.
fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported index snapshot envelope version",
        ));
    }
    let ivf = Ivf::restore(&envelope.ivf)?;
    // Rebuild the external→internal map as the inverse of the saved table: an
    // external id's internal id is its position in `int_to_ext`.
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    handle.index = CollectionIndex::Ivf(Some(ivf));
    handle.stale = false;

    let tail = store.recovery_tail(handle.id)?;
    // Deletions first. An external id the snapshot never knew has nothing to
    // remove; the id mapping itself is kept so a later upsert reuses the slot.
    for ext in &tail.deleted {
        let Some(&internal) = handle.ext_to_int.get(ext) else {
            continue;
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.remove(internal);
        }
    }
    // Then the upserts: reuse the internal id of a known external id, or append a
    // new one at the end of `int_to_ext` so the table stays index-addressable.
    for (ext, record) in tail.upserts {
        let internal = match handle.ext_to_int.get(&ext) {
            Some(&i) => i,
            None => {
                let i = handle.int_to_ext.len() as u64;
                handle.ext_to_int.insert(ext.clone(), i);
                handle.int_to_ext.push(ext);
                i
            }
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, &record.vector)?;
        }
    }
    Ok(())
}
2097
2098// Fold one point write (`ext_id` → `vector`) into a built index incrementally,
2099// or mark the handle stale to defer to a rebuild when the kind cannot absorb it
2100// in place. HNSW absorbs a brand-new id but cannot update one (a known id falls
2101// to a rebuild); a built/trained IVF inserts or replaces any id (ADR-0023); a
2102// built FreshDiskANN graph appends to its delta and tombstones the prior copy on
2103// an update, consolidating past its churn threshold (ADR-0033). Shared by the
2104// single-vector `upsert` and each token row of a multi-vector `upsert_document`
2105// (ADR-0034). A no-op once the handle is stale (a rebuild is already pending).
2106fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
2107    // Every write bumps the generation, even one skipped here because a rebuild is
2108    // already pending — an in-flight off-lock rebuild must still notice it (ADR-0062).
2109    bump_write_gen(handle);
2110    if handle.stale {
2111        return Ok(());
2112    }
2113    let known = handle.ext_to_int.contains_key(ext_id);
2114    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2115    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
2116    let is_live_graph = matches!(
2117        handle.index,
2118        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2119    );
2120    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2121    if is_hnsw && !known {
2122        let internal = handle.int_to_ext.len() as u64;
2123        if let CollectionIndex::Hnsw(h) = &mut handle.index {
2124            h.insert(internal, vector)?;
2125        }
2126        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2127        handle.int_to_ext.push(ext_id.to_owned());
2128    } else if is_live_ivf {
2129        // Reuse the internal id for an in-place update; allocate a fresh, dense one
2130        // for a new id (so `int_to_ext` stays index-addressable).
2131        let internal = if known {
2132            handle.ext_to_int[ext_id]
2133        } else {
2134            let i = handle.int_to_ext.len() as u64;
2135            handle.ext_to_int.insert(ext_id.to_owned(), i);
2136            handle.int_to_ext.push(ext_id.to_owned());
2137            i
2138        };
2139        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2140            ivf.insert(internal, vector)?;
2141        }
2142    } else if is_live_graph {
2143        // A graph cannot update a node in place: append a new delta node under a
2144        // fresh internal id and tombstone the prior copy on an update (ADR-0033).
2145        let old = handle.ext_to_int.get(ext_id).copied();
2146        let internal = handle.int_to_ext.len() as u64;
2147        let mut pending = 0.0;
2148        match &mut handle.index {
2149            CollectionIndex::Vamana(Some(fresh)) => {
2150                if let Some(o) = old {
2151                    fresh.mark_deleted(o);
2152                }
2153                fresh.insert(internal, vector)?;
2154                pending = fresh.pending_fraction();
2155            }
2156            CollectionIndex::Disk(Some(fresh)) => {
2157                if let Some(o) = old {
2158                    fresh.mark_deleted(o);
2159                }
2160                fresh.insert(internal, vector)?;
2161                pending = fresh.pending_fraction();
2162            }
2163            _ => {}
2164        }
2165        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2166        handle.int_to_ext.push(ext_id.to_owned());
2167        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
2168            mark_stale(handle);
2169        }
2170    } else if is_live_colbert {
2171        // ColBERT appends a new token and tombstones the prior copy on an update —
2172        // its centroids are fixed until a rebuild (ADR-0034); the deletion fraction
2173        // drives consolidation, as for HNSW.
2174        let old = handle.ext_to_int.get(ext_id).copied();
2175        let internal = handle.int_to_ext.len() as u64;
2176        let mut crowded = false;
2177        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2178            if let Some(o) = old {
2179                c.mark_deleted(o);
2180            }
2181            c.insert(internal, vector)?;
2182            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2183        }
2184        handle.ext_to_int.insert(ext_id.to_owned(), internal);
2185        handle.int_to_ext.push(ext_id.to_owned());
2186        if crowded {
2187            mark_stale(handle);
2188        }
2189    } else {
2190        mark_stale(handle);
2191    }
2192    Ok(())
2193}
2194
2195// Tombstone one point (`ext_id`) from a built index incrementally, or mark the
2196// handle stale to defer to a rebuild. A built IVF removes in place (ADR-0023), a
2197// built HNSW soft-deletes (ADR-0026), and a built FreshDiskANN graph tombstones in
2198// its deletion set (ADR-0033); each amortizes a rebuild when tombstones dominate.
2199// The id→internal mapping is kept so a later re-insert allocates afresh. Shared by
2200// the single-vector `delete` and each token row of `delete_document` (ADR-0034).
2201fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2202    // Every write bumps the generation, even one skipped here (see `index_upsert_point`).
2203    bump_write_gen(handle);
2204    if handle.stale {
2205        return;
2206    }
2207    let internal = handle.ext_to_int.get(ext_id).copied();
2208    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2209    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2210    let live_graph = matches!(
2211        handle.index,
2212        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2213    );
2214    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2215    match internal {
2216        Some(internal) if live_ivf => {
2217            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2218                ivf.remove(internal);
2219            }
2220        }
2221        Some(internal) if live_hnsw => {
2222            let mut crowded = false;
2223            if let CollectionIndex::Hnsw(h) = &mut handle.index {
2224                h.mark_deleted(internal as u32);
2225                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2226            }
2227            if crowded {
2228                mark_stale(handle);
2229            }
2230        }
2231        Some(internal) if live_graph => {
2232            let mut crowded = false;
2233            match &mut handle.index {
2234                CollectionIndex::Vamana(Some(fresh)) => {
2235                    fresh.mark_deleted(internal);
2236                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2237                }
2238                CollectionIndex::Disk(Some(fresh)) => {
2239                    fresh.mark_deleted(internal);
2240                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2241                }
2242                _ => {}
2243            }
2244            if crowded {
2245                mark_stale(handle);
2246            }
2247        }
2248        Some(internal) if live_colbert => {
2249            let mut crowded = false;
2250            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2251                c.mark_deleted(internal);
2252                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2253            }
2254            if crowded {
2255                mark_stale(handle);
2256            }
2257        }
2258        _ => mark_stale(handle),
2259    }
2260}
2261
// The product of scanning a collection's live rows: the dense id maps + contiguous
// vectors, the multi-vector document grouping, and the derived sparse inverted
// index — everything a rebuild needs from the store, owned so the build can then
// proceed with no store and no lock (ADR-0062).
struct RebuildScan {
    // External ids in scan order; a row's position here is its dense internal id.
    int_to_ext: Vec<String>,
    // Inverse of `int_to_ext`: external id → dense internal id.
    ext_to_int: HashMap<String, u64>,
    // Every scanned row's vector, concatenated in internal-id order.
    flat: Vec<f32>,
    // Document id → number of token rows; `Some` only for a multi-vector collection.
    docs: Option<BTreeMap<String, u32>>,
    // Derived sparse inverted index; `Some` only when the collection uses one.
    sparse: Option<SparseInvertedIndex>,
}
2273
2274// Scan a collection's live rows into a [`RebuildScan`]. Rebuilds the derived sparse
2275// inverted index (ADR-0045) and, for a multi-vector collection, the document
2276// grouping (doc id → token count) authoritatively from those rows. `&self` on the
2277// store, so it runs under a shared read lock.
2278fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2279    let multivector = handle.descriptor.multivector;
2280    let mut int_to_ext = Vec::new();
2281    let mut ext_to_int = HashMap::new();
2282    let mut flat: Vec<f32> = Vec::new();
2283    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2284    // Only single-vector, server-searchable collections carry a sparse index, and
2285    // only non-empty payloads are parsed, so non-sparse collections pay nothing.
2286    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2287    for (ext_id, record) in store.scan(handle.id)? {
2288        let internal = int_to_ext.len() as u64;
2289        flat.extend_from_slice(&record.vector);
2290        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2291            *docs.entry(doc.to_owned()).or_insert(0) += 1;
2292        }
2293        if let Some(idx) = sparse.as_mut()
2294            && let Some(sv) = sparse_vector_from_payload(&record.payload)
2295        {
2296            idx.upsert(&ext_id, &sv);
2297        }
2298        ext_to_int.insert(ext_id.clone(), internal);
2299        int_to_ext.push(ext_id);
2300    }
2301    Ok(RebuildScan {
2302        int_to_ext,
2303        ext_to_int,
2304        flat,
2305        docs: multivector.then_some(docs),
2306        sparse,
2307    })
2308}
2309
2310// Rebuild a collection's index from the store's current live rows, synchronously
2311// under `&mut self` — the embedded/open path. The server's runtime path builds
2312// off-lock instead (ADR-0062: `snapshot_rebuild_inputs` → `build` → `commit_rebuild`).
2313fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2314    let scan = scan_collection(store, handle)?;
2315    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2316    // Drop the previous index before rebuilding: a disk index `mmap`s a file we
2317    // are about to overwrite in place, and the mapping assumes an immutable file.
2318    handle.index = empty_index(&handle.descriptor);
2319    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2320    handle.int_to_ext = scan.int_to_ext;
2321    handle.ext_to_int = scan.ext_to_int;
2322    handle.docs = scan.docs;
2323    handle.sparse = scan.sparse;
2324    handle.stale = false;
2325    Ok(())
2326}
2327
/// A captured, owned snapshot of everything an off-lock rebuild needs (ADR-0062):
/// the scanned rows, the collection's descriptor, and the write generation at
/// capture time. Produced under the shared read lock by
/// [`Database::snapshot_rebuild_inputs`]; [`RebuildInputs::build`] then constructs
/// the new index with no lock held.
pub struct RebuildInputs {
    // Identifies the collection being rebuilt; carried through unchanged into the
    // `RebuiltIndex` (presumably the collection's name — confirm at the producer).
    collection: String,
    // The collection's descriptor; selects and parameterizes the index to build.
    descriptor: Descriptor,
    // The scanned rows: id maps, flat vectors, document grouping, sparse index.
    scan: RebuildScan,
    // Write generation at capture time; carried through unchanged by `build`.
    write_gen: u64,
}
2339
// The built product of a [`RebuildInputs`], ready to install. In-memory indexes are
// fully built; the disk-resident artifact carries its prebuilt graph + PQ, which
// `commit_rebuild` seals through the store under the brief write lock (the file
// write must not race the prior index's `mmap`).
enum RebuiltKind {
    // A fully built in-memory index (what `build_in_memory_index` returned).
    Ready(Box<CollectionIndex>),
    // The disk-resident kind: built in memory by `build_disk_graph_pq`, not yet
    // written to its file.
    Disk {
        // The prebuilt graph.
        graph: Box<Vamana>,
        // The product quantizer built alongside the graph.
        pq: Box<ProductQuantizer>,
    },
}
2351
/// A new index built off-lock from a [`RebuildInputs`], ready for
/// [`Database::commit_rebuild`] to install under the brief write lock (ADR-0062).
pub struct RebuiltIndex {
    // Identifies the target collection; copied from the `RebuildInputs`.
    collection: String,
    // The built index, or the prebuilt disk artifact still to be sealed.
    kind: RebuiltKind,
    // External ids by dense internal id, as captured by the scan.
    int_to_ext: Vec<String>,
    // External id → dense internal id, as captured by the scan.
    ext_to_int: HashMap<String, u64>,
    // Document id → token count for a multi-vector collection, else `None`.
    docs: Option<BTreeMap<String, u32>>,
    // The derived sparse inverted index captured by the scan, if any.
    sparse: Option<SparseInvertedIndex>,
    // Write generation captured with the inputs. NOTE(review): presumably compared
    // at commit time to detect writes that raced the build — confirm in
    // `commit_rebuild`.
    write_gen: u64,
}
2363
2364impl RebuildInputs {
2365    /// Build the new index from the captured rows with **no lock held** — the
2366    /// expensive CPU work (graph build, PQ training, HNSW insertion) of an off-lock
2367    /// rebuild (ADR-0062). The disk artifact is built in memory here; its file is
2368    /// sealed later by `commit_rebuild` under the write lock.
2369    pub fn build(self) -> Result<RebuiltIndex> {
2370        let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2371        let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2372            Some(index) => RebuiltKind::Ready(Box::new(index)),
2373            None => {
2374                let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2375                RebuiltKind::Disk {
2376                    graph: Box::new(graph),
2377                    pq: Box::new(pq),
2378                }
2379            }
2380        };
2381        Ok(RebuiltIndex {
2382            collection: self.collection,
2383            kind,
2384            int_to_ext: self.scan.int_to_ext,
2385            ext_to_int: self.scan.ext_to_int,
2386            docs: self.scan.docs,
2387            sparse: self.scan.sparse,
2388            write_gen: self.write_gen,
2389        })
2390    }
2391}
2392
2393// Extract a point's sparse vector from its serialized payload, if it carries one
2394// under `__quiver_sparse__` and the value deserializes. An empty payload or a
2395// missing/malformed sparse vector yields `None`.
2396fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2397    if payload.is_empty() {
2398        return None;
2399    }
2400    let value = serde_json::from_slice::<Value>(payload).ok()?;
2401    sparse_vector_from_value(&value)
2402}
2403
2404// As [`sparse_vector_from_payload`] but over an already-parsed payload `Value`
2405// (the upsert path has the payload before it is serialized). An explicit
2406// `__quiver_sparse__` vector wins; otherwise a `__quiver_text__` string is
2407// tokenized into a term-frequency vector (ADR-0046), so a point is searchable by
2408// BM25 over text alone.
2409fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2410    if let Some(raw) = payload.get(SPARSE_KEY) {
2411        return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2412    }
2413    let text = payload.get(TEXT_KEY)?.as_str()?;
2414    Some(text_to_sparse(text))
2415}
2416
2417// Maintain the derived sparse inverted index for one point write (ADR-0045): index
2418// the point's sparse vector, or drop any prior entry when the new payload no longer
2419// carries one (an update that removed it). A no-op when the collection has no
2420// inverted index, or when a rebuild is pending — the rebuild repopulates the index
2421// authoritatively from the store, exactly as it does the dense index.
2422fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2423    if handle.stale {
2424        return;
2425    }
2426    let Some(idx) = handle.sparse.as_mut() else {
2427        return;
2428    };
2429    match sparse_vector_from_value(payload) {
2430        Some(sv) => idx.upsert(ext_id, &sv),
2431        None => {
2432            idx.remove(ext_id);
2433        }
2434    }
2435}
2436
2437// Drop one point from the derived sparse inverted index. A no-op when the
2438// collection has no inverted index.
2439fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2440    if let Some(idx) = handle.sparse.as_mut() {
2441        idx.remove(ext_id);
2442    }
2443}
2444
2445// The set of live external ids guaranteed to contain every row the `filter`
2446// accepts (a sound superset), resolved through the collection's secondary
2447// indexes. `None` means the filter cannot be narrowed with the available indexes
2448// and the caller should post-filter instead; `Some(set)` is a candidate
2449// superset, and an empty set proves no row can match.
2450//
2451// Only indexable leaves — equality, `in`, and range comparisons on declared
2452// filterable fields of a matching type — constrain the set. Everything else
2453// (negation, existence, `ne`, non-indexed fields, type mismatches) is treated as
2454// unconstrained, which keeps the result a superset; exactness is restored when
2455// the caller re-checks the full `Filter` on each surviving candidate.
2456fn candidate_ids(
2457    store: &Store,
2458    cid: CollectionId,
2459    filter: &Filter,
2460    filterable: &[FilterableField],
2461) -> Result<Option<BTreeSet<String>>> {
2462    match filter {
2463        Filter::And(subs) => {
2464            // Intersect the constrained children; an unconstrained child (None)
2465            // is dropped, which only widens the set — still a superset.
2466            let mut acc: Option<BTreeSet<String>> = None;
2467            for sub in subs {
2468                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2469                    acc = Some(match acc {
2470                        Some(existing) => existing.intersection(&set).cloned().collect(),
2471                        None => set,
2472                    });
2473                }
2474            }
2475            Ok(acc)
2476        }
2477        Filter::Or(subs) => {
2478            // The union of the children — but one unconstrained child makes the
2479            // whole disjunction unconstrained (its rows could be anything).
2480            let mut acc = BTreeSet::new();
2481            for sub in subs {
2482                match candidate_ids(store, cid, sub, filterable)? {
2483                    Some(set) => acc.extend(set),
2484                    None => return Ok(None),
2485                }
2486            }
2487            Ok(Some(acc))
2488        }
2489        // A negation cannot be narrowed to a superset with these indexes.
2490        Filter::Not(_) => Ok(None),
2491        // A leaf: indexable ⇒ its matching ids; otherwise unconstrained.
2492        leaf => match leaf_predicate(leaf, filterable) {
2493            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2494            None => Ok(None),
2495        },
2496    }
2497}
2498
2499// Map a single filter leaf to an indexable secondary-index predicate, if its
2500// field is declared filterable and the value types line up. Boolean nodes and
2501// predicates the secondary index cannot answer (`Ne`, `Exists`) return `None`.
2502fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2503    let field_type = |field: &str| {
2504        filterable
2505            .iter()
2506            .find(|f| f.path == field)
2507            .map(|f| f.field_type)
2508    };
2509    match filter {
2510        Filter::Eq { field, value } => Some(SecPredicate::Eq {
2511            field: field.clone(),
2512            value: sec_value(field_type(field)?, value)?,
2513        }),
2514        Filter::In { field, values } => {
2515            let ft = field_type(field)?;
2516            // Every value must encode, or the `in` set would be understated and
2517            // the candidate superset unsound.
2518            let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2519            Some(SecPredicate::In {
2520                field: field.clone(),
2521                values: values?,
2522            })
2523        }
2524        Filter::Lt { field, value } => {
2525            one_sided_range(field, field_type(field)?, value, false, false)
2526        }
2527        Filter::Lte { field, value } => {
2528            one_sided_range(field, field_type(field)?, value, false, true)
2529        }
2530        Filter::Gt { field, value } => {
2531            one_sided_range(field, field_type(field)?, value, true, false)
2532        }
2533        Filter::Gte { field, value } => {
2534            one_sided_range(field, field_type(field)?, value, true, true)
2535        }
2536        _ => None,
2537    }
2538}
2539
2540// Build a one-sided range predicate from a comparison leaf. `is_lower` selects
2541// whether `value` is the lower (`Gt`/`Gte`) or upper (`Lt`/`Lte`) bound;
2542// `inclusive` is that bound's inclusivity. `None` on a type mismatch.
2543fn one_sided_range(
2544    field: &str,
2545    field_type: FieldType,
2546    value: &Value,
2547    is_lower: bool,
2548    inclusive: bool,
2549) -> Option<SecPredicate> {
2550    let v = sec_value(field_type, value)?;
2551    let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
2552        (Some(v), None, inclusive, false)
2553    } else {
2554        (None, Some(v), false, inclusive)
2555    };
2556    Some(SecPredicate::Range {
2557        field: field.to_owned(),
2558        lo,
2559        hi,
2560        lo_inclusive,
2561        hi_inclusive,
2562    })
2563}
2564
2565// Encode a filter value as a typed secondary-index value, or `None` when the
2566// JSON type does not match the field's declared type (so it cannot be
2567// pre-filtered and the planner falls back to post-filtering).
2568fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
2569    match (field_type, value) {
2570        (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
2571        (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
2572        _ => None,
2573    }
2574}
2575
2576#[cfg(test)]
2577mod tests {
2578    use super::*;
2579    use serde_json::json;
2580
2581    fn desc() -> Descriptor {
2582        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
2583    }
2584
2585    fn open(dir: &Path) -> Database {
2586        Database::open(dir).unwrap()
2587    }
2588
2589    #[test]
2590    fn hybrid_search_fuses_dense_and_sparse() {
2591        let tmp = tempfile::tempdir().unwrap();
2592        let mut db = open(tmp.path());
2593        db.create_collection("kb", desc()).unwrap();
2594        // "a" is the nearest dense neighbour of the query; "b" shares the query's
2595        // sparse terms; "c" is good on neither.
2596        db.upsert(
2597            "kb",
2598            "a",
2599            &[1.0, 0.0, 0.0, 0.0],
2600            &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
2601        )
2602        .unwrap();
2603        db.upsert(
2604            "kb",
2605            "b",
2606            &[0.0, 1.0, 0.0, 0.0],
2607            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
2608        )
2609        .unwrap();
2610        db.upsert(
2611            "kb",
2612            "c",
2613            &[0.0, 0.0, 0.0, 1.0],
2614            &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
2615        )
2616        .unwrap();
2617
2618        let dense_q = [1.0, 0.0, 0.0, 0.0];
2619        let sparse_q = SparseVector {
2620            indices: vec![1, 2],
2621            values: vec![1.0, 1.0],
2622        };
2623        let params = SearchParams {
2624            k: 3,
2625            ..SearchParams::default()
2626        };
2627
2628        // Hybrid: "a" (dense) and "b" (sparse) both rank above "c" (neither).
2629        let hits = db
2630            .hybrid_search(
2631                "kb",
2632                Some(&dense_q),
2633                Some(&sparse_q),
2634                None,
2635                &params,
2636                DEFAULT_RRF_K0,
2637            )
2638            .unwrap();
2639        let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
2640        assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
2641        assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
2642
2643        // Pure sparse: only "b" shares the query's terms.
2644        let sparse_only = db
2645            .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
2646            .unwrap();
2647        assert_eq!(sparse_only[0].id, "b");
2648
2649        // Pure dense: "a" is nearest.
2650        let dense_only = db
2651            .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
2652            .unwrap();
2653        assert_eq!(dense_only[0].id, "a");
2654
2655        // Neither query is an error.
2656        assert!(
2657            db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
2658                .is_err()
2659        );
2660    }
2661
2662    // Pure-sparse hybrid result ids, in fused order.
2663    fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
2664        let params = SearchParams {
2665            k: 10,
2666            ..SearchParams::default()
2667        };
2668        db.hybrid_search("kb", None, Some(q), None, &params, DEFAULT_RRF_K0)
2669            .unwrap()
2670            .into_iter()
2671            .map(|m| m.id)
2672            .collect()
2673    }
2674
2675    #[test]
2676    fn sparse_index_equals_the_store_scan_fallback() {
2677        let tmp = tempfile::tempdir().unwrap();
2678        let mut db = open(tmp.path());
2679        db.create_collection("kb", desc()).unwrap();
2680        let z = [0.0f32, 0.0, 0.0, 0.0];
2681        for (id, dims, vals) in [
2682            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
2683            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
2684            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
2685            ("d", vec![9u32], vec![1.0f32]), // shares no query term
2686        ] {
2687            db.upsert(
2688                "kb",
2689                id,
2690                &z,
2691                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
2692            )
2693            .unwrap();
2694        }
2695        let q = SparseVector {
2696            indices: vec![1, 2, 3],
2697            values: vec![1.0, 1.0, 1.0],
2698        };
2699
2700        // The derived index is present and used.
2701        assert!(db.collections.get("kb").unwrap().sparse.is_some());
2702        let via_index = sparse_ids(&mut db, &q);
2703        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
2704
2705        // Drop the index (not stale, so no rebuild) → the store-scan fallback runs
2706        // and must return the identical ranking.
2707        db.collections.get_mut("kb").unwrap().sparse = None;
2708        let via_scan = sparse_ids(&mut db, &q);
2709        assert_eq!(via_index, via_scan);
2710    }
2711
2712    #[test]
2713    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
2714        let tmp = tempfile::tempdir().unwrap();
2715        let mut db = open(tmp.path());
2716        db.create_collection("kb", desc()).unwrap();
2717        let z = [0.0f32, 0.0, 0.0, 0.0];
2718        db.upsert(
2719            "kb",
2720            "a",
2721            &z,
2722            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
2723        )
2724        .unwrap();
2725        db.upsert(
2726            "kb",
2727            "b",
2728            &z,
2729            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
2730        )
2731        .unwrap();
2732        db.upsert(
2733            "kb",
2734            "c",
2735            &z,
2736            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
2737        )
2738        .unwrap();
2739        // Update "a" onto a disjoint term; delete "b".
2740        db.upsert(
2741            "kb",
2742            "a",
2743            &z,
2744            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
2745        )
2746        .unwrap();
2747        assert!(db.delete("kb", "b").unwrap());
2748
2749        let q = SparseVector {
2750            indices: vec![1, 2],
2751            values: vec![1.0, 1.0],
2752        };
2753        // Only "c" still shares a query term ("a" moved to 7, "b" is gone).
2754        let incremental = sparse_ids(&mut db, &q);
2755        assert_eq!(incremental, vec!["c".to_owned()]);
2756
2757        // A full rebuild from the store must agree with the incremental state.
2758        db.collections.get_mut("kb").unwrap().stale = true;
2759        let rebuilt = sparse_ids(&mut db, &q);
2760        assert_eq!(incremental, rebuilt);
2761    }
2762
2763    #[test]
2764    fn sparse_index_is_rebuilt_on_reopen() {
2765        let tmp = tempfile::tempdir().unwrap();
2766        {
2767            let mut db = open(tmp.path());
2768            db.create_collection("kb", desc()).unwrap();
2769            db.upsert(
2770                "kb",
2771                "a",
2772                &[0.0, 0.0, 0.0, 0.0],
2773                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
2774            )
2775            .unwrap();
2776        }
2777        let mut db = open(tmp.path());
2778        assert!(db.collections.get("kb").unwrap().sparse.is_some());
2779        let q = SparseVector {
2780            indices: vec![1],
2781            values: vec![1.0],
2782        };
2783        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
2784    }
2785
2786    #[test]
2787    fn hybrid_sparse_honours_the_payload_filter() {
2788        let tmp = tempfile::tempdir().unwrap();
2789        let mut db = open(tmp.path());
2790        db.create_collection("kb", desc()).unwrap();
2791        let z = [0.0f32, 0.0, 0.0, 0.0];
2792        db.upsert(
2793            "kb",
2794            "a",
2795            &z,
2796            &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
2797        )
2798        .unwrap();
2799        db.upsert(
2800            "kb",
2801            "b",
2802            &z,
2803            &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
2804        )
2805        .unwrap();
2806        let q = SparseVector {
2807            indices: vec![1],
2808            values: vec![1.0],
2809        };
2810        let params = SearchParams {
2811            k: 10,
2812            filter: Some(Filter::Eq {
2813                field: "lang".to_owned(),
2814                value: json!("en"),
2815            }),
2816            ..SearchParams::default()
2817        };
2818        let hits: Vec<String> = db
2819            .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
2820            .unwrap()
2821            .into_iter()
2822            .map(|m| m.id)
2823            .collect();
2824        // "b" scores higher but is filtered out by lang == "en".
2825        assert_eq!(hits, vec!["a".to_owned()]);
2826    }
2827
2828    #[test]
2829    fn hybrid_text_search_indexes_and_ranks_by_bm25() {
2830        let tmp = tempfile::tempdir().unwrap();
2831        let mut db = open(tmp.path());
2832        db.create_collection("kb", desc()).unwrap();
2833        let z = [0.0f32, 0.0, 0.0, 0.0];
2834        // Points carry only a `__quiver_text__` string — tokenized at ingest.
2835        db.upsert(
2836            "kb",
2837            "cats",
2838            &z,
2839            &json!({ "__quiver_text__": "the quick brown cat jumps" }),
2840        )
2841        .unwrap();
2842        db.upsert(
2843            "kb",
2844            "dogs",
2845            &z,
2846            &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
2847        )
2848        .unwrap();
2849
2850        let params = SearchParams {
2851            k: 10,
2852            ..SearchParams::default()
2853        };
2854        // A text query (no dense, no explicit sparse) ranks the lexical match first;
2855        // "cat"/"cats" conflate via the stemmer.
2856        let hits: Vec<String> = db
2857            .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
2858            .unwrap()
2859            .into_iter()
2860            .map(|m| m.id)
2861            .collect();
2862        assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
2863
2864        // A non-matching query returns nothing from the text side.
2865        assert!(
2866            db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
2867                .unwrap()
2868                .is_empty()
2869        );
2870
2871        // Text fuses with a dense query through the same RRF path.
2872        let dense_q = [1.0, 0.0, 0.0, 0.0];
2873        db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
2874            .unwrap();
2875        let fused: Vec<String> = db
2876            .hybrid_search(
2877                "kb",
2878                Some(&dense_q),
2879                None,
2880                Some("dog"),
2881                &params,
2882                DEFAULT_RRF_K0,
2883            )
2884            .unwrap()
2885            .into_iter()
2886            .map(|m| m.id)
2887            .collect();
2888        assert!(
2889            fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
2890            "dense match + lexical match both surface; got {fused:?}"
2891        );
2892    }
2893
2894    #[test]
2895    fn create_upsert_search_get_end_to_end() {
2896        let tmp = tempfile::tempdir().unwrap();
2897        let mut db = open(tmp.path());
2898        db.create_collection("items", desc()).unwrap();
2899        db.upsert(
2900            "items",
2901            "a",
2902            &[0.0, 0.0, 0.0, 0.0],
2903            &json!({"color": "red"}),
2904        )
2905        .unwrap();
2906        db.upsert(
2907            "items",
2908            "b",
2909            &[1.0, 0.0, 0.0, 0.0],
2910            &json!({"color": "blue"}),
2911        )
2912        .unwrap();
2913        db.upsert(
2914            "items",
2915            "c",
2916            &[5.0, 5.0, 5.0, 5.0],
2917            &json!({"color": "red"}),
2918        )
2919        .unwrap();
2920
2921        let near = db
2922            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
2923            .unwrap();
2924        assert_eq!(near[0].id, "a");
2925        assert_eq!(near[1].id, "b");
2926
2927        let got = db.get("items", "c").unwrap().unwrap();
2928        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
2929        assert_eq!(got.payload, Some(json!({"color": "red"})));
2930    }
2931
2932    #[test]
2933    fn upsert_batch_produces_same_search_results_as_sequential() {
2934        let tmp_seq = tempfile::tempdir().unwrap();
2935        let tmp_bat = tempfile::tempdir().unwrap();
2936
2937        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
2938        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
2939        let payload = json!({});
2940
2941        // Sequential upserts
2942        {
2943            let mut db = open(tmp_seq.path());
2944            db.create_collection("c", desc()).unwrap();
2945            for (id, vec) in ids.iter().zip(vectors.iter()) {
2946                db.upsert("c", id, vec, &payload).unwrap();
2947            }
2948        }
2949        // Batch upsert
2950        {
2951            let mut db = open(tmp_bat.path());
2952            db.create_collection("c", desc()).unwrap();
2953            let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
2954                .iter()
2955                .zip(vectors.iter())
2956                .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
2957                .collect();
2958            let n = db.upsert_batch("c", &pts).unwrap();
2959            assert_eq!(n, 20);
2960        }
2961
2962        let query = [10.0f32, 0.0, 0.0, 0.0];
2963        let params = SearchParams {
2964            k: 5,
2965            ..Default::default()
2966        };
2967
2968        let mut seq_db = open(tmp_seq.path());
2969        let mut bat_db = open(tmp_bat.path());
2970        let seq: Vec<String> = seq_db
2971            .search("c", &query, &params)
2972            .unwrap()
2973            .into_iter()
2974            .map(|m| m.id)
2975            .collect();
2976        let bat: Vec<String> = bat_db
2977            .search("c", &query, &params)
2978            .unwrap()
2979            .into_iter()
2980            .map(|m| m.id)
2981            .collect();
2982        assert_eq!(
2983            seq, bat,
2984            "batch and sequential produce different search results"
2985        );
2986    }
2987
2988    #[test]
2989    fn upsert_bulk_defers_the_index_then_searches_correctly() {
2990        let tmp = tempfile::tempdir().unwrap();
2991        let mut db = open(tmp.path());
2992        db.create_collection("c", desc()).unwrap();
2993        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
2994        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
2995        // Give one point a sparse vector so the deferred rebuild also rebuilds the
2996        // inverted index.
2997        let plain = json!({});
2998        let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
2999        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3000            .iter()
3001            .zip(vectors.iter())
3002            .map(|(id, v)| {
3003                let payload = if id == "p3" { &sparse_payload } else { &plain };
3004                (id.as_str(), v.as_slice(), payload)
3005            })
3006            .collect();
3007        let n = db.upsert_bulk("c", &pts).unwrap();
3008        assert_eq!(n, 20);
3009
3010        // The bulk path defers index maintenance: the handle is stale until a read.
3011        assert!(db.collections.get("c").unwrap().stale);
3012
3013        // Dense search triggers the single rebuild and returns the nearest points.
3014        let query = [10.0f32, 0.0, 0.0, 0.0];
3015        let params = SearchParams {
3016            k: 5,
3017            ..Default::default()
3018        };
3019        let hits: Vec<String> = db
3020            .search("c", &query, &params)
3021            .unwrap()
3022            .into_iter()
3023            .map(|m| m.id)
3024            .collect();
3025        assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3026        assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3027
3028        // The sparse vector loaded via the bulk path is searchable after the rebuild.
3029        let q = SparseVector {
3030            indices: vec![7],
3031            values: vec![1.0],
3032        };
3033        let sparse_hits: Vec<String> = db
3034            .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
3035            .unwrap()
3036            .into_iter()
3037            .map(|m| m.id)
3038            .collect();
3039        assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3040    }
3041
3042    #[test]
3043    fn filtered_search_only_returns_matching_payloads() {
3044        let tmp = tempfile::tempdir().unwrap();
3045        let mut db = open(tmp.path());
3046        db.create_collection("items", desc()).unwrap();
3047        for i in 0..20u32 {
3048            let color = if i % 2 == 0 { "red" } else { "blue" };
3049            db.upsert(
3050                "items",
3051                &format!("p{i}"),
3052                &[i as f32, 0.0, 0.0, 0.0],
3053                &json!({"color": color, "n": i}),
3054            )
3055            .unwrap();
3056        }
3057        let params = SearchParams {
3058            k: 5,
3059            filter: Some(Filter::Eq {
3060                field: "color".into(),
3061                value: json!("red"),
3062            }),
3063            ef_search: 64,
3064            with_payload: true,
3065            with_vector: false,
3066        };
3067        let results = db.search("items", &[0.0; 4], &params).unwrap();
3068        assert!(!results.is_empty());
3069        for m in &results {
3070            assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3071        }
3072    }
3073
3074    #[test]
3075    fn persists_and_rebuilds_index_on_reopen() {
3076        let tmp = tempfile::tempdir().unwrap();
3077        {
3078            let mut db = open(tmp.path());
3079            db.create_collection("items", desc()).unwrap();
3080            for i in 0..50u32 {
3081                db.upsert(
3082                    "items",
3083                    &format!("p{i}"),
3084                    &[i as f32, 1.0, 2.0, 3.0],
3085                    &json!({}),
3086                )
3087                .unwrap();
3088            }
3089            db.checkpoint().unwrap();
3090        }
3091        let mut db = open(tmp.path());
3092        assert_eq!(db.len("items").unwrap(), 50);
3093        let res = db
3094            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3095            .unwrap();
3096        assert_eq!(res[0].id, "p7");
3097    }
3098
3099    #[test]
3100    fn update_reflects_new_vector_after_rebuild() {
3101        let tmp = tempfile::tempdir().unwrap();
3102        let mut db = open(tmp.path());
3103        db.create_collection("items", desc()).unwrap();
3104        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3105        db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
3106            .unwrap();
3107        // Move "a" far away; querying near the origin should now prefer "b".
3108        db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
3109            .unwrap();
3110        let res = db
3111            .search("items", &[0.0; 4], &SearchParams::default())
3112            .unwrap();
3113        assert_eq!(res[0].id, "b");
3114        assert_eq!(
3115            db.get("items", "a").unwrap().unwrap().vector,
3116            Some(vec![100.0, 0.0, 0.0, 0.0])
3117        );
3118    }
3119
3120    #[test]
3121    fn delete_removes_from_search() {
3122        let tmp = tempfile::tempdir().unwrap();
3123        let mut db = open(tmp.path());
3124        db.create_collection("items", desc()).unwrap();
3125        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3126        db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3127            .unwrap();
3128        assert!(db.delete("items", "a").unwrap());
3129        let res = db
3130            .search("items", &[0.0; 4], &SearchParams::default())
3131            .unwrap();
3132        assert!(res.iter().all(|m| m.id != "a"));
3133        assert!(db.get("items", "a").unwrap().is_none());
3134    }
3135
3136    #[test]
3137    fn unknown_collection_errors() {
3138        let tmp = tempfile::tempdir().unwrap();
3139        let mut db = open(tmp.path());
3140        assert!(matches!(
3141            db.search("nope", &[0.0; 4], &SearchParams::default()),
3142            Err(Error::CollectionNotFound(_))
3143        ));
3144        db.create_collection("c", desc()).unwrap();
3145        assert!(matches!(
3146            db.create_collection("c", desc()),
3147            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3148        ));
3149    }
3150
3151    fn desc_with(kind: IndexKind) -> Descriptor {
3152        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3153            kind,
3154            pq_subspaces: None,
3155        })
3156    }
3157
3158    #[test]
3159    fn vamana_and_ivf_collections_find_the_nearest_point() {
3160        for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3161            let tmp = tempfile::tempdir().unwrap();
3162            let mut db = open(tmp.path());
3163            db.create_collection("c", desc_with(kind)).unwrap();
3164            for i in 0..40u32 {
3165                db.upsert(
3166                    "c",
3167                    &format!("p{i}"),
3168                    &[i as f32, 0.0, 0.0, 0.0],
3169                    &json!({}),
3170                )
3171                .unwrap();
3172            }
3173            // ef_search maps onto the index's search width (l_search / nprobe).
3174            let res = db
3175                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3176                .unwrap();
3177            assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3178        }
3179    }
3180
3181    #[test]
3182    fn index_kind_persists_and_rebuilds_on_reopen() {
3183        let tmp = tempfile::tempdir().unwrap();
3184        {
3185            let mut db = open(tmp.path());
3186            db.create_collection("v", desc_with(IndexKind::Vamana))
3187                .unwrap();
3188            for i in 0..20u32 {
3189                db.upsert(
3190                    "v",
3191                    &format!("p{i}"),
3192                    &[i as f32, 1.0, 2.0, 3.0],
3193                    &json!({}),
3194                )
3195                .unwrap();
3196            }
3197            db.checkpoint().unwrap();
3198        }
3199        let mut db = open(tmp.path());
3200        assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3201        let res = db
3202            .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3203            .unwrap();
3204        assert_eq!(res[0].id, "p7");
3205    }
3206
3207    // ADR-0063: a DiskVamana collection reopens by loading its frugal mmap base +
3208    // replaying the WAL tail, not by an O(N) full-RAM rebuild. Proven structurally:
3209    // the on-disk base keeps its checkpoint row count after reopen (a rebuild would
3210    // rewrite it to include the post-checkpoint inserts).
3211    #[test]
3212    fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
3213        let tmp = tempfile::tempdir().unwrap();
3214        let cid;
3215        {
3216            let mut db = open(tmp.path());
3217            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3218                .unwrap();
3219            for i in 0..100u32 {
3220                db.upsert(
3221                    "d",
3222                    &format!("p{i}"),
3223                    &[i as f32, 0.0, 0.0, 0.0],
3224                    &json!({}),
3225                )
3226                .unwrap();
3227            }
3228            // Force the deferred build so the base file + checkpoint blob exist.
3229            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3230                .unwrap();
3231            db.checkpoint().unwrap();
3232            // 15 more after the checkpoint (< the 20% consolidation threshold): they
3233            // land in the in-memory delta + the WAL tail, not the base file.
3234            for i in 100..115u32 {
3235                db.upsert(
3236                    "d",
3237                    &format!("p{i}"),
3238                    &[i as f32, 0.0, 0.0, 0.0],
3239                    &json!({}),
3240                )
3241                .unwrap();
3242            }
3243            cid = db.collections["d"].id;
3244            let base = open_disk_index(
3245                &db.store,
3246                cid,
3247                db.store.collection_codec_clone(cid).unwrap(),
3248            )
3249            .unwrap();
3250            assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
3251        }
3252
3253        let mut db = open(tmp.path());
3254        // A base point and a post-checkpoint (WAL-tail-replayed) point are both found.
3255        assert_eq!(
3256            db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
3257                .unwrap()[0]
3258                .id,
3259            "p50",
3260        );
3261        assert_eq!(
3262            db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
3263                .unwrap()[0]
3264                .id,
3265            "p110",
3266            "post-checkpoint insert survived reopen via WAL-tail replay",
3267        );
3268        // Loaded, not rebuilt: the base file still holds exactly the 100 checkpoint
3269        // rows. A rebuild on open would have rewritten it over all 115 live rows.
3270        let base = open_disk_index(
3271            &db.store,
3272            cid,
3273            db.store.collection_codec_clone(cid).unwrap(),
3274        )
3275        .unwrap();
3276        assert_eq!(
3277            base.len(),
3278            100,
3279            "reopen loaded the base; it was not rebuilt"
3280        );
3281    }
3282
3283    // ADR-0063: the durable artifact is never load-bearing for correctness — a
3284    // missing/torn base falls back to an authoritative rebuild that still answers.
3285    #[test]
3286    fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
3287        let tmp = tempfile::tempdir().unwrap();
3288        let base_path;
3289        {
3290            let mut db = open(tmp.path());
3291            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3292                .unwrap();
3293            for i in 0..60u32 {
3294                db.upsert(
3295                    "d",
3296                    &format!("p{i}"),
3297                    &[i as f32, 0.0, 0.0, 0.0],
3298                    &json!({}),
3299                )
3300                .unwrap();
3301            }
3302            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3303                .unwrap();
3304            db.checkpoint().unwrap();
3305            let cid = db.collections["d"].id;
3306            base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
3307        }
3308        // Simulate a lost base: remove the file the snapshot references.
3309        std::fs::remove_file(&base_path).unwrap();
3310        {
3311            let mut db = open(tmp.path());
3312            assert_eq!(
3313                db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3314                    .unwrap()[0]
3315                    .id,
3316                "p25",
3317                "rebuild fallback still answers correctly after a lost base",
3318            );
3319            // The fallback rebuild re-seals the base, and a subsequent checkpoint
3320            // refreshes the snapshot blob to match it.
3321            assert!(
3322                base_path.exists(),
3323                "the fallback rebuild re-sealed the base file"
3324            );
3325            db.checkpoint().unwrap();
3326        }
3327        // Simulate a torn base: truncate the file mid-way. `DiskVamana::open`'s
3328        // per-page checks reject it, so the load falls back to a rebuild.
3329        let len = std::fs::metadata(&base_path).unwrap().len();
3330        std::fs::OpenOptions::new()
3331            .write(true)
3332            .open(&base_path)
3333            .unwrap()
3334            .set_len(len / 2)
3335            .unwrap();
3336
3337        let mut db = open(tmp.path());
3338        assert_eq!(
3339            db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3340                .unwrap()[0]
3341                .id,
3342            "p25",
3343            "rebuild fallback still answers correctly after a torn base",
3344        );
3345    }
3346
3347    #[test]
3348    fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3349        let tmp = tempfile::tempdir().unwrap();
3350        let mut db = open(tmp.path());
3351        db.create_collection("c", desc_with(IndexKind::Ivf))
3352            .unwrap();
3353        for i in 0..50u32 {
3354            db.upsert(
3355                "c",
3356                &format!("p{i}"),
3357                &[i as f32, 0.0, 0.0, 0.0],
3358                &json!({}),
3359            )
3360            .unwrap();
3361        }
3362        // The first search builds (and trains) the IVF from the store.
3363        let _ = db
3364            .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3365            .unwrap();
3366        assert!(!db.collections["c"].stale, "the search built the index");
3367
3368        // Incremental insert: a new outlier is found, with no rebuild scheduled.
3369        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3370            .unwrap();
3371        assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3372        let res = db
3373            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3374            .unwrap();
3375        assert_eq!(res[0].id, "far");
3376
3377        // Incremental delete: the point disappears, still with no rebuild.
3378        assert!(db.delete("c", "far").unwrap());
3379        assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3380        let res = db
3381            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3382            .unwrap();
3383        assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3384    }
3385
3386    #[test]
3387    fn ivf_incremental_update_replaces_the_vector() {
3388        let tmp = tempfile::tempdir().unwrap();
3389        let mut db = open(tmp.path());
3390        db.create_collection("c", desc_with(IndexKind::Ivf))
3391            .unwrap();
3392        for i in 0..30u32 {
3393            db.upsert(
3394                "c",
3395                &format!("p{i}"),
3396                &[i as f32, 0.0, 0.0, 0.0],
3397                &json!({}),
3398            )
3399            .unwrap();
3400        }
3401        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3402        // Move p5 far away in place.
3403        db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3404            .unwrap();
3405        assert!(!db.collections["c"].stale);
3406        let at_new = db
3407            .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3408            .unwrap();
3409        assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3410        let at_old = db
3411            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3412            .unwrap();
3413        assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
3414    }
3415
3416    #[test]
3417    fn ivf_reinsert_after_incremental_delete_is_found() {
3418        let tmp = tempfile::tempdir().unwrap();
3419        let mut db = open(tmp.path());
3420        db.create_collection("c", desc_with(IndexKind::Ivf))
3421            .unwrap();
3422        for i in 0..20u32 {
3423            db.upsert(
3424                "c",
3425                &format!("p{i}"),
3426                &[i as f32, 0.0, 0.0, 0.0],
3427                &json!({}),
3428            )
3429            .unwrap();
3430        }
3431        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3432        assert!(db.delete("c", "p3").unwrap());
3433        assert!(!db.collections["c"].stale);
3434        // Re-insert the same id; it must be searchable again (the slot is reused).
3435        db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3436            .unwrap();
3437        assert!(!db.collections["c"].stale);
3438        let res = db
3439            .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
3440            .unwrap();
3441        assert_eq!(res[0].id, "p3");
3442    }
3443
3444    #[test]
3445    fn hnsw_in_place_update_falls_back_to_rebuild() {
3446        // HNSW cannot update an id in place, so an update marks the index stale.
3447        let tmp = tempfile::tempdir().unwrap();
3448        let mut db = open(tmp.path());
3449        db.create_collection("c", desc()).unwrap();
3450        for i in 0..10u32 {
3451            db.upsert(
3452                "c",
3453                &format!("p{i}"),
3454                &[i as f32, 0.0, 0.0, 0.0],
3455                &json!({}),
3456            )
3457            .unwrap();
3458        }
3459        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3460        assert!(!db.collections["c"].stale);
3461        db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
3462            .unwrap();
3463        assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
3464        // The rebuild on the next search reflects the new vector.
3465        let res = db
3466            .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
3467            .unwrap();
3468        assert_eq!(res[0].id, "p2");
3469    }
3470
3471    #[test]
3472    fn unsupported_index_configurations_are_rejected() {
3473        let tmp = tempfile::tempdir().unwrap();
3474        let mut db = open(tmp.path());
3475        // Vamana/IVF do not support inner product.
3476        let dot_vamana =
3477            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3478                kind: IndexKind::Vamana,
3479                pq_subspaces: None,
3480            });
3481        assert!(matches!(
3482            db.create_collection("a", dot_vamana),
3483            Err(Error::Unsupported(_))
3484        ));
3485        // ...nor does the disk-resident index.
3486        let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3487            kind: IndexKind::DiskVamana,
3488            pq_subspaces: None,
3489        });
3490        assert!(matches!(
3491            db.create_collection("b", dot_disk),
3492            Err(Error::Unsupported(_))
3493        ));
3494    }
3495
3496    #[test]
3497    fn dcpe_collections_require_the_l2_metric() {
3498        let tmp = tempfile::tempdir().unwrap();
3499        let mut db = open(tmp.path());
3500        // DCPE preserves Euclidean distance, so cosine/dot are rejected.
3501        for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
3502            let bad = Descriptor::new(4, Dtype::F32, metric)
3503                .with_vector_encryption(VectorEncryption::Dcpe);
3504            assert!(matches!(
3505                db.create_collection("bad", bad),
3506                Err(Error::Unsupported(_))
3507            ));
3508        }
3509        // L2 is accepted, and the flag persists on the descriptor.
3510        let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3511            .with_vector_encryption(VectorEncryption::Dcpe);
3512        db.create_collection("enc", good)
3513            .expect("l2 dcpe collection");
3514        assert_eq!(
3515            db.descriptor("enc").expect("descriptor").vector_encryption,
3516            VectorEncryption::Dcpe
3517        );
3518    }
3519
3520    #[test]
3521    fn client_side_collections_are_fetch_only_and_reject_search() {
3522        let tmp = tempfile::tempdir().unwrap();
3523        let mut db = open(tmp.path());
3524        // Any metric is allowed (the server never ranks), so there is no L2
3525        // restriction as there is for DCPE.
3526        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3527            .with_vector_encryption(VectorEncryption::ClientSide);
3528        db.create_collection("vault", desc)
3529            .expect("create client-side collection");
3530        // No server-side index is built for a client-side collection (ADR-0032).
3531        assert!(matches!(
3532            db.collections["vault"].index,
3533            CollectionIndex::None
3534        ));
3535
3536        // Upsert opaque placeholder points: a zero vector plus a payload carrying a
3537        // stand-in sealed vector blob and a cleartext, server-filterable field.
3538        for i in 0..5 {
3539            let tier = if i < 2 { "vip" } else { "std" };
3540            db.upsert(
3541                "vault",
3542                &format!("p{i}"),
3543                &[0.0; 4],
3544                &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
3545            )
3546            .expect("upsert");
3547        }
3548        assert_eq!(db.len("vault").unwrap(), 5);
3549        // Still no index after writes — it never goes stale or rebuilds.
3550        assert!(matches!(
3551            db.collections["vault"].index,
3552            CollectionIndex::None
3553        ));
3554
3555        // Ranked search is rejected: the server cannot rank opaque vectors.
3556        assert!(matches!(
3557            db.search("vault", &[0.0; 4], &SearchParams::default()),
3558            Err(Error::Unsupported(_))
3559        ));
3560
3561        // Fetch returns the entitled set; each payload carries the blob the client
3562        // would decrypt and rank locally, and vectors are omitted when not asked for.
3563        let all = db.fetch("vault", None, 100, true, false).unwrap();
3564        assert_eq!(all.len(), 5);
3565        assert!(
3566            all.iter()
3567                .all(|m| m.payload.is_some() && m.vector.is_none())
3568        );
3569
3570        // A cleartext payload filter narrows the set server-side.
3571        let vip = db
3572            .fetch(
3573                "vault",
3574                Some(&Filter::Eq {
3575                    field: "tier".to_owned(),
3576                    value: serde_json::json!("vip"),
3577                }),
3578                100,
3579                false,
3580                false,
3581            )
3582            .unwrap();
3583        assert_eq!(vip.len(), 2);
3584        // A limit bounds the returned set.
3585        assert_eq!(db.fetch("vault", None, 2, false, false).unwrap().len(), 2);
3586
3587        // get returns the stored placeholder + blob payload by id; delete works
3588        // through the store with no index to update.
3589        assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
3590        assert!(db.delete("vault", "p0").unwrap());
3591        assert_eq!(db.len("vault").unwrap(), 4);
3592    }
3593
3594    #[test]
3595    fn client_side_encryption_rejects_multivector() {
3596        let tmp = tempfile::tempdir().unwrap();
3597        let mut db = open(tmp.path());
3598        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3599            .with_multivector(true)
3600            .with_vector_encryption(VectorEncryption::ClientSide);
3601        assert!(matches!(
3602            db.create_collection("bad", desc),
3603            Err(Error::Unsupported(_))
3604        ));
3605    }
3606
3607    // Recursively check whether a file named `name` exists under `dir`.
3608    fn contains_file(dir: &Path, name: &str) -> bool {
3609        std::fs::read_dir(dir).is_ok_and(|rd| {
3610            rd.flatten().any(|e| {
3611                let p = e.path();
3612                if p.is_dir() {
3613                    contains_file(&p, name)
3614                } else {
3615                    p.file_name().is_some_and(|f| f == name)
3616                }
3617            })
3618        })
3619    }
3620
3621    #[test]
3622    fn disk_index_collection_searches_persists_and_writes_an_artifact() {
3623        let tmp = tempfile::tempdir().unwrap();
3624        {
3625            let mut db = open(tmp.path());
3626            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3627                .unwrap();
3628            for i in 0..40u32 {
3629                db.upsert(
3630                    "d",
3631                    &format!("p{i}"),
3632                    &[i as f32, 0.0, 0.0, 0.0],
3633                    &json!({}),
3634                )
3635                .unwrap();
3636            }
3637            let res = db
3638                .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3639                .unwrap();
3640            assert_eq!(res[0].id, "p7");
3641            db.checkpoint().unwrap();
3642        }
3643        // The encrypted disk artifact was written under the collection's index dir.
3644        assert!(
3645            contains_file(tmp.path(), "vamana.qvx"),
3646            "disk index file missing"
3647        );
3648        // Reopening rebuilds and re-opens the disk index; search still works.
3649        let mut db = open(tmp.path());
3650        assert_eq!(
3651            db.descriptor("d").unwrap().index.kind,
3652            IndexKind::DiskVamana
3653        );
3654        let res = db
3655            .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3656            .unwrap();
3657        assert_eq!(res[0].id, "p7");
3658    }
3659
3660    #[test]
3661    fn graph_collections_maintain_writes_incrementally() {
3662        // FreshDiskANN incremental maintenance (ADR-0033): after the base graph is
3663        // built, inserts land in the delta, deletes tombstone, and updates move —
3664        // all without a rebuild or a reopen, for both the in-memory and disk graphs.
3665        for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
3666            let tmp = tempfile::tempdir().unwrap();
3667            let mut db = open(tmp.path());
3668            db.create_collection("c", desc_with(kind)).unwrap();
3669            for i in 0..40u32 {
3670                db.upsert(
3671                    "c",
3672                    &format!("p{i}"),
3673                    &[i as f32, 0.0, 0.0, 0.0],
3674                    &json!({}),
3675                )
3676                .unwrap();
3677            }
3678            // The first search builds the read-only base graph.
3679            let res = db
3680                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3681                .unwrap();
3682            assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
3683
3684            // A brand-new point lands in the in-memory delta and is immediately
3685            // findable — no rebuild, no reopen.
3686            db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
3687                .unwrap();
3688            let res = db
3689                .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
3690                .unwrap();
3691            assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
3692
3693            // A delete tombstones the point: it is never returned again.
3694            assert!(db.delete("c", "p7").unwrap());
3695            let res = db
3696                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3697                .unwrap();
3698            assert!(
3699                res.iter().all(|m| m.id != "p7"),
3700                "{kind:?} deleted id returned"
3701            );
3702
3703            // An update moves a vector: it is found at the new position and no
3704            // longer dominates the old one.
3705            db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3706                .unwrap();
3707            let res = db
3708                .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3709                .unwrap();
3710            assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
3711            let res = db
3712                .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
3713                .unwrap();
3714            assert_ne!(
3715                res[0].id, "p20",
3716                "{kind:?} stale copy still nearest old spot"
3717            );
3718        }
3719    }
3720
3721    #[test]
3722    fn graph_consolidates_under_heavy_churn() {
3723        // Churn past the 20% pending threshold forces a consolidation (a derived
3724        // rebuild from the store); results stay correct throughout and across a
3725        // reopen, since the store is the source of truth (ADR-0033).
3726        let tmp = tempfile::tempdir().unwrap();
3727        let mut db = open(tmp.path());
3728        db.create_collection("c", desc_with(IndexKind::Vamana))
3729            .unwrap();
3730        for i in 0..50u32 {
3731            db.upsert(
3732                "c",
3733                &format!("p{i}"),
3734                &[i as f32, 0.0, 0.0, 0.0],
3735                &json!({}),
3736            )
3737            .unwrap();
3738        }
3739        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3740
3741        let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
3742        for i in 0..15u32 {
3743            assert!(db.delete("c", &format!("p{i}")).unwrap());
3744            db.upsert(
3745                "c",
3746                &format!("q{i}"),
3747                &[1000.0 + i as f32, 0.0, 0.0, 0.0],
3748                &json!({}),
3749            )
3750            .unwrap();
3751        }
3752
3753        let near_origin = db
3754            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3755            .unwrap();
3756        assert!(
3757            near_origin.iter().all(|m| !deleted.contains(&m.id)),
3758            "a churned-out id was returned"
3759        );
3760        let near_q = db
3761            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3762            .unwrap();
3763        assert_eq!(near_q[0].id, "q7", "new point not found after churn");
3764
3765        db.checkpoint().unwrap();
3766        drop(db);
3767        let mut db = open(tmp.path());
3768        let near_q = db
3769            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3770            .unwrap();
3771        assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
3772        let near_origin = db
3773            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3774            .unwrap();
3775        assert!(
3776            near_origin.iter().all(|m| !deleted.contains(&m.id)),
3777            "a churned-out id resurfaced after reopen"
3778        );
3779    }
3780
3781    #[test]
3782    fn multivector_writes_are_incremental_and_match_a_rebuild() {
3783        // Token rows fold into the ANN index incrementally instead of stale->rebuild
3784        // (ADR-0034); the document API stays correct under interleaved
3785        // upsert/delete/re-upsert with no reopen, and a reopen (the authoritative
3786        // rebuild) yields the identical ranking. Docs lie on an arc of increasing
3787        // angle to the query (cosine, so direction alone ranks them), making the
3788        // expected order unambiguous. (Token-pool candidate generation only engages
3789        // above the exact-scan threshold; the incremental token maintenance it
3790        // relies on is the same code the single-vector index tests exercise.)
3791        let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
3792        let doc = |theta: f32| vec![dir(theta), dir(theta)];
3793        for kind in [
3794            IndexKind::Ivf,
3795            IndexKind::Hnsw,
3796            IndexKind::Vamana,
3797            IndexKind::Colbert,
3798        ] {
3799            let tmp = tempfile::tempdir().unwrap();
3800            let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3801                .with_multivector(true)
3802                .with_index(IndexSpec {
3803                    kind,
3804                    pq_subspaces: None,
3805                });
3806            let mut db = open(tmp.path());
3807            db.create_collection("m", desc).unwrap();
3808            // d1 is most aligned with the query (smallest angle), d10 least.
3809            for i in 1..=10u32 {
3810                db.upsert_document(
3811                    "m",
3812                    &format!("d{i}"),
3813                    &doc(0.1 * i as f32),
3814                    &json!({ "i": i }),
3815                )
3816                .unwrap();
3817            }
3818            let q = vec![dir(0.0)];
3819            let top = |db: &mut Database| {
3820                db.search_multi_vector(
3821                    "m",
3822                    &q,
3823                    &SearchParams {
3824                        k: 3,
3825                        ..Default::default()
3826                    },
3827                )
3828                .unwrap()
3829                .into_iter()
3830                .map(|m| m.id)
3831                .collect::<Vec<_>>()
3832            };
3833            assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
3834
3835            // Delete the top document — gone without a reopen.
3836            assert!(db.delete_document("m", "d1").unwrap());
3837            assert_eq!(
3838                top(&mut db),
3839                vec!["d2", "d3", "d4"],
3840                "{kind:?} after delete"
3841            );
3842
3843            // Re-upsert the least-aligned doc onto the query — the update is live.
3844            db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
3845                .unwrap();
3846            assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
3847
3848            // A new, near-aligned document is immediately findable, second only to d10.
3849            db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
3850                .unwrap();
3851            let r = top(&mut db);
3852            assert_eq!(r[0], "d10", "{kind:?}");
3853            assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
3854
3855            // A shorter re-upsert drops the trailing token row.
3856            db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
3857                .unwrap();
3858            let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
3859            assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
3860
3861            // The incremental in-memory state matches an authoritative rebuild.
3862            let before = top(&mut db);
3863            drop(db);
3864            let mut db = open(tmp.path());
3865            assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
3866            assert!(
3867                db.get_document("m", "d1", false).unwrap().is_none(),
3868                "{kind:?} deleted doc resurfaced"
3869            );
3870        }
3871    }
3872
3873    #[test]
3874    fn colbert_index_requires_multivector() {
3875        let tmp = tempfile::tempdir().unwrap();
3876        let mut db = open(tmp.path());
3877        // ColBERT is a late-interaction token-pool index — invalid for a
3878        // single-vector collection (ADR-0034).
3879        let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
3880            kind: IndexKind::Colbert,
3881            pq_subspaces: None,
3882        });
3883        assert!(matches!(
3884            db.create_collection("c", single),
3885            Err(Error::Unsupported(_))
3886        ));
3887        // ...but valid for a multi-vector collection.
3888        let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3889            .with_multivector(true)
3890            .with_index(IndexSpec {
3891                kind: IndexKind::Colbert,
3892                pq_subspaces: None,
3893            });
3894        assert!(db.create_collection("m", multi).is_ok());
3895    }
3896
3897    // ---- hybrid (pre-filtered) search ----
3898
3899    // A collection whose `city` (keyword) and `n` (numeric) payload fields are
3900    // declared filterable, so the planner can pre-filter on them.
3901    fn desc_filterable() -> Descriptor {
3902        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
3903            FilterableField::keyword("city"),
3904            FilterableField::numeric("n"),
3905        ])
3906    }
3907
3908    // 30 points on the x-axis (distance to the origin grows with i), cycling
3909    // through three cities, each carrying its index as a numeric `n`.
3910    // Checkpointed so the rows live in a sealed segment's secondary index — the
3911    // pre-filter primitive — not only the active buffer.
3912    fn seed_cities(db: &mut Database) {
3913        const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
3914        db.create_collection("c", desc_filterable()).unwrap();
3915        for i in 0..30u32 {
3916            db.upsert(
3917                "c",
3918                &format!("p{i}"),
3919                &[i as f32, 0.0, 0.0, 0.0],
3920                &json!({"city": CITIES[i as usize % 3], "n": i}),
3921            )
3922            .unwrap();
3923        }
3924        db.checkpoint().unwrap();
3925    }
3926
3927    #[test]
3928    fn hybrid_equality_prefilter_is_exact() {
3929        let tmp = tempfile::tempdir().unwrap();
3930        let mut db = open(tmp.path());
3931        seed_cities(&mut db);
3932        let params = SearchParams {
3933            k: 5,
3934            filter: Some(Filter::Eq {
3935                field: "city".into(),
3936                value: json!("lyon"),
3937            }),
3938            ..SearchParams::default()
3939        };
3940        let res = db.search("c", &[0.0; 4], &params).unwrap();
3941        assert!(!res.is_empty());
3942        // lyon is i % 3 == 1 → p1, p4, p7, …; nearest the origin is p1.
3943        assert_eq!(res[0].id, "p1");
3944        for m in &res {
3945            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
3946        }
3947    }
3948
3949    #[test]
3950    fn hybrid_numeric_range_prefilter_is_exact() {
3951        let tmp = tempfile::tempdir().unwrap();
3952        let mut db = open(tmp.path());
3953        seed_cities(&mut db);
3954        let params = SearchParams {
3955            k: 4,
3956            filter: Some(Filter::Gte {
3957                field: "n".into(),
3958                value: json!(10),
3959            }),
3960            ..SearchParams::default()
3961        };
3962        let res = db.search("c", &[0.0; 4], &params).unwrap();
3963        // Among n >= 10, the nearest the origin is n == 10 (p10).
3964        assert_eq!(res[0].id, "p10");
3965        for m in &res {
3966            assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
3967        }
3968    }
3969
3970    #[test]
3971    fn hybrid_unsatisfiable_filter_returns_empty() {
3972        let tmp = tempfile::tempdir().unwrap();
3973        let mut db = open(tmp.path());
3974        seed_cities(&mut db);
3975        // No row holds this city, so the pre-filter proves the result empty
3976        // without touching the vector index.
3977        let params = SearchParams {
3978            filter: Some(Filter::Eq {
3979                field: "city".into(),
3980                value: json!("atlantis"),
3981            }),
3982            ..SearchParams::default()
3983        };
3984        assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
3985    }
3986
3987    #[test]
3988    fn hybrid_and_or_composition_is_exact() {
3989        let tmp = tempfile::tempdir().unwrap();
3990        let mut db = open(tmp.path());
3991        seed_cities(&mut db);
3992        // (city in {paris, rome}) AND (n < 12): a keyword `in` intersected with a
3993        // numeric range, both pre-filterable.
3994        let params = SearchParams {
3995            k: 10,
3996            filter: Some(Filter::And(vec![
3997                Filter::In {
3998                    field: "city".into(),
3999                    values: vec![json!("paris"), json!("rome")],
4000                },
4001                Filter::Lt {
4002                    field: "n".into(),
4003                    value: json!(12),
4004                },
4005            ])),
4006            ..SearchParams::default()
4007        };
4008        let res = db.search("c", &[0.0; 4], &params).unwrap();
4009        // paris is i % 3 == 0 → the nearest qualifying point is p0.
4010        assert_eq!(res[0].id, "p0");
4011        for m in &res {
4012            let payload = m.payload.as_ref().unwrap();
4013            let city = payload["city"].as_str().unwrap();
4014            assert!(city == "paris" || city == "rome");
4015            assert!(payload["n"].as_u64().unwrap() < 12);
4016        }
4017    }
4018
4019    #[test]
4020    fn hybrid_rechecks_non_indexable_clause() {
4021        let tmp = tempfile::tempdir().unwrap();
4022        let mut db = open(tmp.path());
4023        seed_cities(&mut db);
4024        // city is pre-filterable; the Not(…) clause is not, so it is re-checked
4025        // exactly on the narrowed candidates — paris rows excluding p0.
4026        let params = SearchParams {
4027            k: 10,
4028            filter: Some(Filter::And(vec![
4029                Filter::Eq {
4030                    field: "city".into(),
4031                    value: json!("paris"),
4032                },
4033                Filter::Not(Box::new(Filter::Eq {
4034                    field: "n".into(),
4035                    value: json!(0),
4036                })),
4037            ])),
4038            ..SearchParams::default()
4039        };
4040        let res = db.search("c", &[0.0; 4], &params).unwrap();
4041        assert!(res.iter().all(|m| m.id != "p0"));
4042        // The nearest paris point after p0 is p3.
4043        assert_eq!(res[0].id, "p3");
4044        for m in &res {
4045            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
4046        }
4047    }
4048
4049    #[test]
4050    fn post_filter_fallback_on_undeclared_field_is_correct() {
4051        let tmp = tempfile::tempdir().unwrap();
4052        let mut db = open(tmp.path());
4053        // Only `city` is filterable; a filter on the undeclared `tier` cannot
4054        // pre-filter and falls back to ANN post-filtering — still exact.
4055        db.create_collection(
4056            "c",
4057            Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4058                .with_filterable(vec![FilterableField::keyword("city")]),
4059        )
4060        .unwrap();
4061        for i in 0..20u32 {
4062            let tier = if i % 2 == 0 { "gold" } else { "silver" };
4063            db.upsert(
4064                "c",
4065                &format!("p{i}"),
4066                &[i as f32, 0.0, 0.0, 0.0],
4067                &json!({"city": "paris", "tier": tier}),
4068            )
4069            .unwrap();
4070        }
4071        let params = SearchParams {
4072            k: 5,
4073            filter: Some(Filter::Eq {
4074                field: "tier".into(),
4075                value: json!("gold"),
4076            }),
4077            ..SearchParams::default()
4078        };
4079        let res = db.search("c", &[0.0; 4], &params).unwrap();
4080        assert!(!res.is_empty());
4081        for m in &res {
4082            assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
4083        }
4084    }
4085
4086    #[test]
4087    fn leaf_predicate_maps_only_indexable_filterable_leaves() {
4088        let fields = vec![
4089            FilterableField::keyword("city"),
4090            FilterableField::numeric("n"),
4091        ];
4092        // Keyword equality on a filterable field maps.
4093        assert_eq!(
4094            leaf_predicate(
4095                &Filter::Eq {
4096                    field: "city".into(),
4097                    value: json!("paris")
4098                },
4099                &fields
4100            ),
4101            Some(SecPredicate::Eq {
4102                field: "city".into(),
4103                value: SecValue::Keyword("paris".into())
4104            })
4105        );
4106        // A numeric comparison maps to a one-sided range.
4107        assert_eq!(
4108            leaf_predicate(
4109                &Filter::Gte {
4110                    field: "n".into(),
4111                    value: json!(3)
4112                },
4113                &fields
4114            ),
4115            Some(SecPredicate::Range {
4116                field: "n".into(),
4117                lo: Some(SecValue::Numeric(3.0)),
4118                hi: None,
4119                lo_inclusive: true,
4120                hi_inclusive: false,
4121            })
4122        );
4123        // Undeclared field, type mismatch, `ne`, and `exists` do not map.
4124        let undeclared = Filter::Eq {
4125            field: "tier".into(),
4126            value: json!("gold"),
4127        };
4128        let mismatch = Filter::Eq {
4129            field: "city".into(),
4130            value: json!(5),
4131        };
4132        let ne = Filter::Ne {
4133            field: "city".into(),
4134            value: json!("x"),
4135        };
4136        let exists = Filter::Exists {
4137            field: "city".into(),
4138        };
4139        assert!(leaf_predicate(&undeclared, &fields).is_none());
4140        assert!(leaf_predicate(&mismatch, &fields).is_none());
4141        assert!(leaf_predicate(&ne, &fields).is_none());
4142        assert!(leaf_predicate(&exists, &fields).is_none());
4143    }
4144
4145    // ----- durable IVF index recovery (ADR-0025) -----
4146
4147    // The first collection created in a fresh store has id 0.
4148    fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
4149        root.join("collections").join("0000000000").join("index")
4150    }
4151
4152    fn idx_snapshot_files(root: &Path) -> Vec<String> {
4153        let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4154            .map(|rd| {
4155                rd.filter_map(std::result::Result::ok)
4156                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4157                    .filter(|n| n.starts_with("idx-"))
4158                    .collect()
4159            })
4160            .unwrap_or_default();
4161        v.sort();
4162        v
4163    }
4164
4165    fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4166        db.search("c", q, &SearchParams::default())
4167            .unwrap()
4168            .into_iter()
4169            .map(|m| m.id)
4170            .collect()
4171    }
4172
4173    fn seed_ivf(db: &mut Database, n: u32) {
4174        db.create_collection("c", desc_with(IndexKind::Ivf))
4175            .unwrap();
4176        for i in 0..n {
4177            db.upsert(
4178                "c",
4179                &format!("p{i}"),
4180                &[i as f32, 0.0, 0.0, 0.0],
4181                &json!({}),
4182            )
4183            .unwrap();
4184        }
4185        // The first search builds (and trains) the IVF from the store.
4186        let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4187    }
4188
4189    #[test]
4190    fn ivf_snapshot_is_written_at_checkpoint() {
4191        let tmp = tempfile::tempdir().unwrap();
4192        let mut db = open(tmp.path());
4193        seed_ivf(&mut db, 40);
4194        db.checkpoint().unwrap();
4195        assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
4196    }
4197
4198    #[test]
4199    fn ivf_loads_from_snapshot_rather_than_rebuilding() {
4200        let tmp = tempfile::tempdir().unwrap();
4201        {
4202            let mut db = open(tmp.path());
4203            db.create_collection("c", desc_with(IndexKind::Ivf))
4204                .unwrap();
4205            db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
4206                .unwrap();
4207            db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
4208                .unwrap();
4209            // First search builds the IVF — int_to_ext is the sorted scan order.
4210            let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
4211            // Incremental upserts append in insertion order, diverging from sort.
4212            db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
4213                .unwrap();
4214            db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4215                .unwrap();
4216            db.checkpoint().unwrap();
4217            assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
4218        }
4219        let db = open(tmp.path());
4220        // Loaded from the snapshot: the insertion-order mapping is preserved. A
4221        // rebuild would have produced the sorted order ["a", "b", "m", "z"].
4222        assert_eq!(
4223            db.collections["c"].int_to_ext,
4224            ["a", "m", "z", "b"],
4225            "index was rebuilt, not loaded from the snapshot"
4226        );
4227    }
4228
4229    #[test]
4230    fn ivf_recovery_replays_post_checkpoint_upserts() {
4231        let tmp = tempfile::tempdir().unwrap();
4232        {
4233            let mut db = open(tmp.path());
4234            seed_ivf(&mut db, 30);
4235            db.checkpoint().unwrap();
4236            // Post-checkpoint upsert, no further checkpoint.
4237            db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4238                .unwrap();
4239        }
4240        let mut db = open(tmp.path());
4241        assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
4242        assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
4243    }
4244
4245    #[test]
4246    fn ivf_recovery_replays_post_checkpoint_deletes() {
4247        let tmp = tempfile::tempdir().unwrap();
4248        {
4249            let mut db = open(tmp.path());
4250            seed_ivf(&mut db, 30);
4251            db.checkpoint().unwrap();
4252            assert!(db.delete("c", "p7").unwrap());
4253        }
4254        let mut db = open(tmp.path());
4255        assert!(
4256            nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
4257                .iter()
4258                .all(|id| id != "p7")
4259        );
4260        assert!(db.get("c", "p7").unwrap().is_none());
4261        assert!(db.get("c", "p6").unwrap().is_some());
4262    }
4263
4264    #[test]
4265    fn ivf_recovery_replays_post_checkpoint_updates() {
4266        let tmp = tempfile::tempdir().unwrap();
4267        {
4268            let mut db = open(tmp.path());
4269            seed_ivf(&mut db, 30);
4270            db.checkpoint().unwrap();
4271            // Move p0 far away — an in-place update shadowing its sealed row.
4272            db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4273                .unwrap();
4274        }
4275        let mut db = open(tmp.path());
4276        assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4277        assert_ne!(
4278            nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4279            "p0",
4280            "the stale p0 vector survived the update"
4281        );
4282    }
4283
4284    #[test]
4285    fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4286        let tmp = tempfile::tempdir().unwrap();
4287        {
4288            let mut db = open(tmp.path());
4289            seed_ivf(&mut db, 30);
4290            db.checkpoint().unwrap();
4291        }
4292        // Corrupt the snapshot file; recovery must fall back to a rebuild.
4293        let files = idx_snapshot_files(tmp.path());
4294        assert_eq!(files.len(), 1);
4295        std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4296
4297        let mut db = open(tmp.path());
4298        assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4299    }
4300
4301    // ---- Multi-vector / late interaction (ColBERT, ADR-0028) ----
4302
4303    fn mv_desc() -> Descriptor {
4304        Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4305    }
4306
4307    // Brute-force MaxSim ranking over a corpus: the reference the pipeline must
4308    // reproduce (score desc, ties by id), using the same shared scorer.
4309    fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4310        let mut v: Vec<(String, f32)> = corpus
4311            .iter()
4312            .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4313            .collect();
4314        v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4315        v
4316    }
4317
4318    #[test]
4319    fn multivector_search_ranks_documents_by_maxsim() {
4320        let tmp = tempfile::tempdir().unwrap();
4321        let mut db = open(tmp.path());
4322        db.create_collection("docs", mv_desc()).unwrap();
4323        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4324            ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4325            ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4326            (
4327                "d_mix",
4328                vec![
4329                    vec![1.0, 1.0, 0.0],
4330                    vec![0.0, 0.0, 1.0],
4331                    vec![1.0, 0.0, 1.0],
4332                ],
4333            ),
4334        ];
4335        for (id, toks) in &corpus {
4336            db.upsert_document("docs", id, toks, &json!({ "id": id }))
4337                .unwrap();
4338        }
4339        assert_eq!(db.document_count("docs").unwrap(), 3);
4340
4341        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4342        let params = SearchParams {
4343            k: 3,
4344            with_payload: false,
4345            ..SearchParams::default()
4346        };
4347        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4348        let expected = bf_rank(&query, &corpus);
4349
4350        assert_eq!(got.len(), 3);
4351        for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4352            assert_eq!(&g.id, eid, "ranking matches brute force");
4353            assert!(
4354                (g.score - escore).abs() < 1e-5,
4355                "{} score {} vs {escore}",
4356                g.id,
4357                g.score
4358            );
4359        }
4360    }
4361
4362    #[test]
4363    fn multivector_search_truncates_to_k() {
4364        let tmp = tempfile::tempdir().unwrap();
4365        let mut db = open(tmp.path());
4366        db.create_collection("docs", mv_desc()).unwrap();
4367        for i in 0..5 {
4368            let v = vec![vec![1.0, i as f32, 0.0]];
4369            db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4370                .unwrap();
4371        }
4372        let params = SearchParams {
4373            k: 2,
4374            ..SearchParams::default()
4375        };
4376        let got = db
4377            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4378            .unwrap();
4379        assert_eq!(got.len(), 2);
4380    }
4381
4382    #[test]
4383    fn multivector_filter_selects_documents_exactly() {
4384        let tmp = tempfile::tempdir().unwrap();
4385        let mut db = open(tmp.path());
4386        db.create_collection("docs", mv_desc()).unwrap();
4387        // Identical token sets, so only the filter distinguishes the documents.
4388        db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4389            .unwrap();
4390        db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4391            .unwrap();
4392        let params = SearchParams {
4393            k: 10,
4394            filter: Some(Filter::Eq {
4395                field: "lang".into(),
4396                value: json!("fr"),
4397            }),
4398            ..SearchParams::default()
4399        };
4400        let got = db
4401            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4402            .unwrap();
4403        assert_eq!(got.len(), 1);
4404        assert_eq!(got[0].id, "b");
4405        assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
4406    }
4407
4408    #[test]
4409    fn multivector_reopen_rebuilds_grouping_and_ranking() {
4410        let tmp = tempfile::tempdir().unwrap();
4411        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4412        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4413            ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4414            ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
4415        ];
4416        {
4417            let mut db = open(tmp.path());
4418            db.create_collection("docs", mv_desc()).unwrap();
4419            for (id, toks) in &corpus {
4420                db.upsert_document("docs", id, toks, &json!({})).unwrap();
4421            }
4422            db.checkpoint().unwrap();
4423        }
4424        // Reopen: the document grouping is rebuilt from the live rows.
4425        let mut db = open(tmp.path());
4426        assert_eq!(db.document_count("docs").unwrap(), 2);
4427        let params = SearchParams {
4428            k: 2,
4429            ..SearchParams::default()
4430        };
4431        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4432        let expected = bf_rank(&query, &corpus);
4433        assert_eq!(
4434            got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
4435            expected
4436                .iter()
4437                .map(|(id, _)| id.clone())
4438                .collect::<Vec<_>>()
4439        );
4440    }
4441
4442    #[test]
4443    fn multivector_delete_document_removes_all_tokens() {
4444        let tmp = tempfile::tempdir().unwrap();
4445        let mut db = open(tmp.path());
4446        db.create_collection("docs", mv_desc()).unwrap();
4447        db.upsert_document(
4448            "docs",
4449            "a",
4450            &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
4451            &json!({}),
4452        )
4453        .unwrap();
4454        db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
4455            .unwrap();
4456        assert_eq!(db.document_count("docs").unwrap(), 2);
4457        assert_eq!(db.len("docs").unwrap(), 3);
4458
4459        assert!(db.delete_document("docs", "a").unwrap());
4460        assert_eq!(db.document_count("docs").unwrap(), 1);
4461        assert_eq!(db.len("docs").unwrap(), 1);
4462        assert!(db.get_document("docs", "a", false).unwrap().is_none());
4463        let params = SearchParams {
4464            k: 10,
4465            ..SearchParams::default()
4466        };
4467        let got = db
4468            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4469            .unwrap();
4470        assert!(got.iter().all(|m| m.id != "a"));
4471        assert!(!db.delete_document("docs", "a").unwrap());
4472    }
4473
4474    #[test]
4475    fn multivector_reupsert_replaces_tokens() {
4476        let tmp = tempfile::tempdir().unwrap();
4477        let mut db = open(tmp.path());
4478        db.create_collection("docs", mv_desc()).unwrap();
4479        db.upsert_document(
4480            "docs",
4481            "a",
4482            &[
4483                vec![1.0, 0.0, 0.0],
4484                vec![0.0, 1.0, 0.0],
4485                vec![0.0, 0.0, 1.0],
4486            ],
4487            &json!({"v":1}),
4488        )
4489        .unwrap();
4490        assert_eq!(db.len("docs").unwrap(), 3);
4491        // Re-upsert with a single token: the trailing two must be gone.
4492        db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
4493            .unwrap();
4494        assert_eq!(db.document_count("docs").unwrap(), 1);
4495        assert_eq!(db.len("docs").unwrap(), 1);
4496        let doc = db.get_document("docs", "a", true).unwrap().unwrap();
4497        assert_eq!(doc.payload, Some(json!({"v":2})));
4498        assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
4499    }
4500
4501    #[test]
4502    fn single_and_multi_vector_apis_are_mutually_exclusive() {
4503        let tmp = tempfile::tempdir().unwrap();
4504        let mut db = open(tmp.path());
4505        db.create_collection("mv", mv_desc()).unwrap();
4506        db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
4507            .unwrap();
4508        // Single-vector ops on a multi-vector collection are rejected.
4509        assert!(matches!(
4510            db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
4511            Err(Error::Unsupported(_))
4512        ));
4513        assert!(matches!(
4514            db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
4515            Err(Error::Unsupported(_))
4516        ));
4517        // Document ops on a single-vector collection are rejected.
4518        assert!(matches!(
4519            db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
4520            Err(Error::Unsupported(_))
4521        ));
4522        assert!(matches!(
4523            db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
4524            Err(Error::Unsupported(_))
4525        ));
4526        assert!(matches!(
4527            db.document_count("sv"),
4528            Err(Error::Unsupported(_))
4529        ));
4530    }
4531
4532    #[test]
4533    fn multivector_rejects_l2_metric_and_bad_documents() {
4534        let tmp = tempfile::tempdir().unwrap();
4535        let mut db = open(tmp.path());
4536        let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
4537        assert!(matches!(
4538            db.create_collection("bad", l2),
4539            Err(Error::Unsupported(_))
4540        ));
4541
4542        db.create_collection("docs", mv_desc()).unwrap();
4543        // A document id may not contain the reserved separator.
4544        assert!(matches!(
4545            db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
4546            Err(Error::Unsupported(_))
4547        ));
4548        // A document needs at least one vector, of the right dimensionality.
4549        assert!(matches!(
4550            db.upsert_document("docs", "a", &[], &json!({})),
4551            Err(Error::Unsupported(_))
4552        ));
4553        assert!(matches!(
4554            db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
4555            Err(Error::Unsupported(_))
4556        ));
4557    }
4558
4559    #[test]
4560    fn snapshot_then_open_reproduces_the_database() {
4561        let src = tempfile::tempdir().unwrap();
4562        let mut db = open(src.path());
4563        db.create_collection("kb", desc()).unwrap();
4564        db.create_collection("kb2", desc()).unwrap();
4565        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4566            .unwrap();
4567        db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
4568            .unwrap();
4569        db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
4570            .unwrap();
4571
4572        let dest = tempfile::tempdir().unwrap();
4573        let snap_dir = dest.path().join("snap");
4574        let info = db.snapshot(&snap_dir).unwrap();
4575        assert!(info.files > 0 && info.bytes > 0);
4576        assert_eq!(info.manifest_version, db.manifest_version());
4577
4578        // A write after the snapshot must not appear in the snapshot.
4579        db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
4580            .unwrap();
4581
4582        let restored = open(&snap_dir);
4583        let mut names = restored.collection_names();
4584        names.sort();
4585        assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
4586        assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
4587        assert_eq!(
4588            restored.get("kb", "a").unwrap().unwrap().payload,
4589            Some(json!({ "n": 1 }))
4590        );
4591        assert_eq!(restored.len("kb2").unwrap(), 1);
4592        assert!(restored.get("kb", "late").unwrap().is_none());
4593    }
4594
4595    #[test]
4596    fn snapshot_refuses_an_existing_destination() {
4597        let src = tempfile::tempdir().unwrap();
4598        let mut db = open(src.path());
4599        let dest = tempfile::tempdir().unwrap(); // already exists
4600        assert!(matches!(
4601            db.snapshot(dest.path()),
4602            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4603        ));
4604    }
4605
4606    #[test]
4607    fn restore_snapshot_roundtrips_and_guards() {
4608        let src = tempfile::tempdir().unwrap();
4609        let mut db = open(src.path());
4610        db.create_collection("kb", desc()).unwrap();
4611        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4612            .unwrap();
4613        let work = tempfile::tempdir().unwrap();
4614        let snap_dir = work.path().join("snap");
4615        db.snapshot(&snap_dir).unwrap();
4616
4617        // Restore into a fresh directory, then open it.
4618        let restored_dir = work.path().join("restored");
4619        let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
4620        assert!(info.files > 0);
4621        let restored = open(&restored_dir);
4622        assert_eq!(restored.len("kb").unwrap(), 1);
4623
4624        // Restoring over an existing directory is refused.
4625        assert!(matches!(
4626            restore_snapshot(&snap_dir, &restored_dir),
4627            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4628        ));
4629        // A directory that is not a snapshot (no CURRENT) is rejected.
4630        let not_snap = work.path().join("not-a-snapshot");
4631        std::fs::create_dir_all(&not_snap).unwrap();
4632        assert!(matches!(
4633            restore_snapshot(&not_snap, &work.path().join("out")),
4634            Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
4635        ));
4636    }
4637}