Skip to main content

quiver_embed/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2//! The embeddable, in-process Quiver database handle.
3//!
4//! [`Database`] composes the storage engine ([`quiver_core::Store`]) with a
5//! per-collection vector index and payload filtering ([`quiver_query::Filter`])
6//! into one handle. It exposes the same logical operations the server speaks
7//! (`docs/api/wire-protocol.md`), so library mode and server mode exercise
8//! identical engine semantics — the server is a thin transport/policy shell.
9//!
10//! ## Index lifecycle
11//! The store is the source of truth. Each collection chooses its index via the
12//! descriptor's [`IndexSpec`] (default in-memory HNSW); the index is built from
13//! the store on open. HNSW applies new-id inserts incrementally; once an IVF
14//! index is built it applies inserts, in-place updates, and deletes
15//! incrementally with LIRE rebalancing (ADR-0023). The Vamana / disk graph
16//! family is maintained the FreshDiskANN way (ADR-0033): the batch-built graph
17//! is a read-only base, recent inserts land in an in-memory delta graph, and
18//! deletes are tombstoned, so writes are size-independent; when the pending work
19//! grows past a fixed fraction of the base the next access consolidates by
20//! rebuilding from the store. All indexes stay derived (rebuilt from the store
21//! on open), so the crash gate never sees an index write.
22//!
23//! ## Filtered (hybrid) search
24//! A search may carry a [`quiver_query::Filter`] over the payload. The planner
25//! decomposes it into the predicates the collection's secondary indexes can
26//! answer; when those narrow the query to a small candidate set it scans that
27//! set exactly (perfect recall, no filtered-ANN cliff), and otherwise it
28//! over-fetches from the ANN index and post-filters. Both arms re-check the full
29//! filter, so results are exact regardless of which path runs.
30//!
31//! ## Concurrency (ADR-0057 / ADR-0062)
32//! Single-writer. Writes take `&mut self`. Reads come in two flavors: the
33//! `&mut self` convenience methods (`search`, `hybrid_search`,
34//! `search_multi_vector`) rebuild a stale index in place and so give embedded,
35//! single-threaded callers read-your-writes; the `&self` `*_snapshot` methods
36//! read the current immutable snapshot and run **concurrently**, serving the
37//! *prior* snapshot when a write deferred a rebuild (snapshot-isolated, slightly
38//! stale). A server therefore serves concurrent reads behind a reader–writer lock,
39//! and rebuilds **off** the exclusive lock (ADR-0062): it captures the rebuild
40//! inputs under the shared lock ([`Database::snapshot_rebuild_inputs`]), builds the
41//! new index with no lock held ([`RebuildInputs::build`]), and installs it under a
42//! brief write lock ([`Database::commit_rebuild`]) — so a rebuild never stalls
43//! concurrent readers.
44
45use std::collections::{BTreeMap, BTreeSet, HashMap};
46use std::path::Path;
47
48use quiver_core::{SecPredicate, SecValue, Store};
49use quiver_index::{
50    ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
51    Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
52    ordering_distance, report_metric,
53};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use thiserror::Error;
57
58pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
59pub use quiver_core::page::PageCodec;
60pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
61pub use quiver_core::{
62    Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
63    VectorEncryption,
64};
65pub use quiver_query::Filter;
66pub use quiver_query::{
67    BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
68    query_term_ids, rrf_fuse, text_to_sparse,
69};
70
71/// Errors returned by the embeddable database.
72#[derive(Debug, Error)]
73#[non_exhaustive]
74pub enum Error {
75    /// An error from the storage engine (includes not-found / already-exists /
76    /// invalid-argument from the catalog and write path).
77    #[error(transparent)]
78    Core(#[from] quiver_core::CoreError),
79    /// An error from the vector index.
80    #[error(transparent)]
81    Index(#[from] quiver_index::IndexError),
82    /// An error from the disk-resident index (build, open, or query).
83    #[error(transparent)]
84    Disk(#[from] quiver_index::DiskError),
85    /// A payload could not be (de)serialized as JSON.
86    #[error("payload json error: {0}")]
87    Json(#[from] serde_json::Error),
88    /// The named collection is not loaded in this database.
89    #[error("collection not found: {0}")]
90    CollectionNotFound(String),
91    /// The requested index / metric combination is not supported.
92    #[error("unsupported configuration: {0}")]
93    Unsupported(&'static str),
94    /// A durable index snapshot could not be restored (ADR-0025); the caller
95    /// falls back to rebuilding from the store, so this does not surface to users.
96    #[error(transparent)]
97    IndexSnapshot(#[from] quiver_index::SnapshotError),
98    /// An index snapshot envelope could not be (de)serialized.
99    #[error("index snapshot envelope: {0}")]
100    Envelope(#[from] postcard::Error),
101}
102
103/// Result alias for database operations.
104pub type Result<T> = std::result::Result<T, Error>;
105
106/// What a [`Database::snapshot`] captured (ADR-0050): the catalog generation and
107/// the number of files / bytes copied.
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
109pub struct SnapshotInfo {
110    /// The manifest version the snapshot reflects (its consistent LSN anchor).
111    pub manifest_version: u64,
112    /// Number of files copied into the snapshot directory.
113    pub files: u64,
114    /// Total bytes copied.
115    pub bytes: u64,
116}
117
118/// A single search or fetch result.
119#[derive(Debug, Clone, PartialEq)]
120pub struct Match {
121    /// External id of the point.
122    pub id: String,
123    /// Distance / similarity under the collection metric (0 for a direct fetch).
124    pub score: f32,
125    /// The payload, if requested.
126    pub payload: Option<Value>,
127    /// The vector, if requested.
128    pub vector: Option<Vec<f32>>,
129}
130
131/// A multi-vector (late-interaction / ColBERT) document result: a document id, its
132/// MaxSim relevance, the payload, and — if requested — the document's token
133/// vectors (ADR-0028).
134#[derive(Debug, Clone, PartialEq)]
135pub struct DocumentMatch {
136    /// Document id.
137    pub id: String,
138    /// MaxSim relevance score (0 for a direct fetch).
139    pub score: f32,
140    /// The document payload, if requested / present (stored on the anchor token).
141    pub payload: Option<Value>,
142    /// The document's token vectors, if requested.
143    pub vectors: Option<Vec<Vec<f32>>>,
144}
145
146// A candidate document during multi-vector re-ranking, before it becomes a
147// [`DocumentMatch`]: `(MaxSim score, document id, anchor payload, token vectors
148// when requested)`. Named so the re-rank buffer stays under clippy's
149// type-complexity threshold.
150type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
151
152/// Parameters for a [`Database::search`].
153#[derive(Debug, Clone)]
154pub struct SearchParams {
155    /// Number of results to return.
156    pub k: usize,
157    /// Optional payload predicate. The planner pre-filters through the
158    /// collection's secondary indexes when the filter is selective enough, and
159    /// otherwise post-filters the ANN candidates; either way the full predicate
160    /// is re-checked, so results are exact.
161    pub filter: Option<Filter>,
162    /// Search beam width (recall/latency knob), clamped up to at least `k`.
163    pub ef_search: usize,
164    /// Include payloads in the results.
165    pub with_payload: bool,
166    /// Include vectors in the results.
167    pub with_vector: bool,
168}
169
170impl Default for SearchParams {
171    fn default() -> Self {
172        Self {
173            k: 10,
174            filter: None,
175            ef_search: 64,
176            with_payload: true,
177            with_vector: false,
178        }
179    }
180}
181
182// How many extra candidates to pull before post-filtering, so a filtered query
183// still has enough survivors to fill `k`.
184const FILTER_OVERFETCH: usize = 8;
185
186// Hybrid search (ADR-0043) pulls `k * RRF_CANDIDATE_FACTOR` (at least
187// `MIN_RRF_CANDIDATES`) candidates from each side before Reciprocal Rank Fusion,
188// so a document ranked outside the top `k` on one side can still surface via the
189// other.
190const RRF_CANDIDATE_FACTOR: usize = 10;
191const MIN_RRF_CANDIDATES: usize = 100;
192
193// Selectivity threshold for the hybrid planner. When a payload filter decomposes
194// into secondary-index predicates that narrow a query to at most this many live
195// candidate rows, those rows are scanned exactly (brute force) instead of going
196// through the ANN index. Below this size an exact scan is both cheaper and
197// higher-recall than filtered ANN, which can return too few results when the
198// filter is very selective (the "filtered-search recall cliff"). Qdrant calls
199// the equivalent knob its full-scan threshold.
200const FULL_SCAN_THRESHOLD: usize = 10_000;
201
202// Soft-deleted fraction at which a built HNSW is rebuilt from the store to
203// reclaim its tombstoned graph nodes (ADR-0026). Below it, a delete is an O(1)
204// soft-delete; at it, the next access rebuilds.
205const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;
206
207/// Pending graph work — delta inserts plus tombstones — at which a FreshDiskANN
208/// graph collection consolidates: the next access rebuilds the base from the
209/// store, reclaiming tombstones and folding in the delta (ADR-0033). Below it,
210/// inserts go to the in-memory delta and deletes are `O(1)` tombstones. Mirrors
211/// [`HNSW_REBUILD_DELETED_FRACTION`].
212const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
213
214// The vector index backing one collection. HNSW and (once built) IVF are
215// maintained incrementally; the Vamana and disk graphs are batch-built from the
216// store (the `Option` is `None` until first build) and then maintained
217// incrementally the FreshDiskANN way — a read-only base graph plus an in-memory
218// delta and deletion set, consolidated by a rebuild past a churn threshold
219// (ADR-0033).
220enum CollectionIndex {
221    // A fetch-only collection with no server-side ANN index: a client-side-encrypted
222    // collection (ADR-0032) stores opaque ciphertext the server never ranks, so the
223    // client fetches points and ranks locally.
224    None,
225    Hnsw(Hnsw),
226    Vamana(Option<FreshVamana>),
227    Ivf(Option<Ivf>),
228    // The disk-resident DiskANN index: PQ codes in RAM, graph + full vectors on
229    // (encrypted) SSD, exact re-rank (ADR-0019), with a FreshDiskANN in-memory
230    // delta layered on top so the on-disk artifact stays immutable.
231    Disk(Option<FreshDiskVamana>),
232    // The ColBERTv2/PLAID compressed token-pool index for a multi-vector
233    // collection (ADR-0034): centroid + residual-PQ codes with centroid-pruned
234    // candidate generation.
235    Colbert(Option<ColbertIndex>),
236}
237
238impl CollectionIndex {
239    // Search, mapping the generic `ef` knob onto each index's search width.
240    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
241        Ok(match self {
242            CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
243            CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
244            CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
245            CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
246            CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
247            CollectionIndex::None
248            | CollectionIndex::Vamana(None)
249            | CollectionIndex::Ivf(None)
250            | CollectionIndex::Disk(None)
251            | CollectionIndex::Colbert(None) => Vec::new(),
252        })
253    }
254}
255
256/// On-disk envelope (ADR-0025) for a durable IVF snapshot: the `Ivf` bytes plus
257/// the internal->external id mapping they are addressed by, postcard-encoded and
258/// handed to the store as one opaque blob. On open the envelope is decoded, the
259/// `Ivf` restored, and the post-checkpoint WAL tail replayed. A decode/version
260/// error means "rebuild from the store" — the snapshot is only ever a fast path.
261#[derive(Serialize, Deserialize)]
262struct IndexEnvelope {
263    version: u16,
264    int_to_ext: Vec<String>,
265    ivf: Vec<u8>,
266}
267
268// Envelope format version, independent of the product SemVer (and of the inner
269// `Ivf` snapshot version); a mismatch falls back to a rebuild.
270const INDEX_ENVELOPE_VERSION: u16 = 1;
271
272struct CollectionHandle {
273    id: CollectionId,
274    descriptor: Descriptor,
275    index: CollectionIndex,
276    int_to_ext: Vec<String>,
277    ext_to_int: HashMap<String, u64>,
278    stale: bool,
279    // Monotonic per-collection write counter, bumped every time a write defers a
280    // rebuild (`mark_stale`). An off-lock rebuild (ADR-0062) captures it before
281    // building and re-checks it at commit: if it advanced, a write landed during
282    // the build, so the freshly built index is already behind — the commit installs
283    // it (still newer than the prior snapshot) but leaves the handle stale so the
284    // next rebuild catches up. Wrapping is fine: equality, not ordering, is what the
285    // commit checks.
286    write_gen: u64,
287    // For a multi-vector (ColBERT) collection: each document id mapped to its
288    // token count, so a re-rank can gather all of a document's token rows
289    // (`<doc-id><US><ordinal>`) and `document_count` is O(1). `None` for a
290    // single-vector collection. Maintained eagerly on document writes and rebuilt
291    // authoritatively from the store on open / rebuild; never persisted (ADR-0028).
292    docs: Option<BTreeMap<String, u32>>,
293    // Derived inverted index over the collection's `__quiver_sparse__` payloads,
294    // for the sparse half of hybrid search (ADR-0045). `Some` for a single-vector,
295    // server-searchable collection; `None` for multi-vector and client-side-encrypted
296    // collections (which never run hybrid search) — and as a backstop, when `None`
297    // the sparse ranking falls back to a full store scan. Built on rebuild from the
298    // store and maintained incrementally on upsert/delete; never persisted.
299    sparse: Option<SparseInvertedIndex>,
300}
301
302// Whether a collection should carry a derived sparse inverted index: only
303// single-vector, server-searchable collections run hybrid search (ADR-0045).
304fn uses_sparse_index(descriptor: &Descriptor) -> bool {
305    !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
306}
307
308// Mark a collection's index stale: a write the index could not absorb in place
309// defers a full rebuild. Also bumps the write generation (every staleness-marking
310// write is a write).
311fn mark_stale(handle: &mut CollectionHandle) {
312    handle.stale = true;
313    bump_write_gen(handle);
314}
315
316// Bump a collection's monotonic write generation. Called on **every** write that
317// touches the store — including writes that land while the index is already stale,
318// which the incremental maintenance skips. The off-lock rebuild (ADR-0062) captures
319// this before scanning and re-checks it at commit: if it advanced, a write landed
320// during the build, so the freshly built index is already behind and the commit
321// leaves the collection stale for the next rebuild. Wrapping is fine — the commit
322// checks equality, not ordering — and over-counting is harmless (at worst one extra
323// rebuild); only *missing* a bump could lose a write.
324fn bump_write_gen(handle: &mut CollectionHandle) {
325    handle.write_gen = handle.write_gen.wrapping_add(1);
326}
327
328/// An in-process Quiver database over one data directory.
329pub struct Database {
330    store: Store,
331    collections: HashMap<String, CollectionHandle>,
332}
333
334impl Database {
335    /// Open (creating if absent) the database at `dir` with encryption-at-rest
336    /// disabled, rebuilding each collection's index from the store.
337    pub fn open(dir: &Path) -> Result<Self> {
338        Self::from_store(Store::open(dir)?)
339    }
340
341    /// Open the database with a specific page codec — used to enable
342    /// encryption-at-rest by passing `quiver-crypto`'s AEAD codec. Mirrors
343    /// [`quiver_core::Store::open_with_codec`]; the codec seals both paged files
344    /// and the WAL, so no plaintext user data reaches the disk.
345    pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
346        Self::from_store(Store::open_with_codec(dir, codec)?)
347    }
348
349    /// Open the database with a [`KeyRing`], the seam that lets `quiver-crypto`'s
350    /// envelope key-ring seal each collection under its own data-encryption key
351    /// (enabling crypto-shredding). Mirrors
352    /// [`quiver_core::Store::open_with_keyring`].
353    pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
354        Self::from_store(Store::open_with_keyring(dir, keyring)?)
355    }
356
357    // Build the in-memory handles (and their HNSW indexes) over an opened store.
358    fn from_store(store: Store) -> Result<Self> {
359        let mut collections = HashMap::new();
360        for name in store.collection_names() {
361            let Some(id) = store.collection_id(&name) else {
362                continue;
363            };
364            let Some(descriptor) = store.descriptor(id).cloned() else {
365                continue;
366            };
367            let mut handle = CollectionHandle {
368                id,
369                index: empty_index(&descriptor),
370                descriptor,
371                int_to_ext: Vec::new(),
372                ext_to_int: HashMap::new(),
373                stale: true,
374                write_gen: 0,
375                docs: None,
376                // Populated by `load_index` / `rebuild_index` from the store.
377                sparse: None,
378            };
379            load_index(&store, &mut handle)?;
380            collections.insert(name, handle);
381        }
382        Ok(Self { store, collections })
383    }
384
385    /// Create a collection. Errors if the name already exists, or if the index
386    /// specification is unsupported for the metric.
387    pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
388        validate_index(&descriptor)?;
389        let id = self.store.create_collection(name, descriptor.clone())?;
390        let index = empty_index(&descriptor);
391        let docs = descriptor.multivector.then(BTreeMap::new);
392        // A fresh single-vector collection starts with an empty inverted index
393        // maintained incrementally from the first upsert (an empty index allocates
394        // nothing until a sparse vector arrives).
395        let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
396        self.collections.insert(
397            name.to_owned(),
398            CollectionHandle {
399                id,
400                descriptor,
401                index,
402                int_to_ext: Vec::new(),
403                ext_to_int: HashMap::new(),
404                stale: false,
405                write_gen: 0,
406                docs,
407                sparse,
408            },
409        );
410        Ok(())
411    }
412
413    /// Drop a collection and its data. Returns whether it existed.
414    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
415        let existed = self.store.drop_collection(name)?;
416        self.collections.remove(name);
417        Ok(existed)
418    }
419
420    /// Crypto-shred a collection: drop it and destroy its data-encryption key, so
421    /// its sealed data is unrecoverable even with the master key, then reclaim
422    /// its files. Mirrors [`quiver_core::Store::shred_collection`]; with an
423    /// envelope key-ring this is irreversible erasure, with a single-codec
424    /// key-ring it is `drop` plus a checkpoint. Returns whether it existed.
425    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
426        let existed = self.store.shred_collection(name)?;
427        self.collections.remove(name);
428        Ok(existed)
429    }
430
431    /// Install a replication commit observer, invoked with each committed
432    /// [`WalEntry`] in commit order (ADR-0030). The server uses this to drive a
433    /// leader's replication stream.
434    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
435        self.store.set_commit_observer(observer);
436    }
437
438    /// The operations that recreate the current logical state, for a replication
439    /// follower to bootstrap from (ADR-0030).
440    ///
441    /// # Errors
442    /// Propagates a store read error.
443    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
444        Ok(self.store.replication_snapshot()?)
445    }
446
447    /// Apply a replicated operation from a leader (ADR-0030): persist and apply it
448    /// to the store (preserving the leader's collection id), then reconcile the
449    /// in-memory index handles — register a new collection, drop a removed one, or
450    /// mark a touched collection's index stale so the next read rebuilds from the
451    /// replicated state.
452    ///
453    /// # Errors
454    /// Propagates a store apply error.
455    pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
456        let target = match &op {
457            WalOp::CreateCollection { collection_id, .. }
458            | WalOp::DropCollection { collection_id }
459            | WalOp::Upsert { collection_id, .. }
460            | WalOp::Delete { collection_id, .. } => Some(*collection_id),
461            WalOp::Checkpoint { .. } => None,
462        };
463        let create_name = match &op {
464            WalOp::CreateCollection { name, .. } => Some(name.clone()),
465            _ => None,
466        };
467        let is_drop = matches!(op, WalOp::DropCollection { .. });
468        self.store.apply_replicated(op)?;
469
470        if let Some(name) = create_name {
471            // Register a fresh handle for the newly replicated collection.
472            if let Some(id) = target
473                && let Some(descriptor) = self.store.descriptor(id).cloned()
474            {
475                let docs = descriptor.multivector.then(BTreeMap::new);
476                let index = empty_index(&descriptor);
477                // Replicated writes mark the handle stale, so the next read rebuilds
478                // the inverted index from the replicated store.
479                self.collections.insert(
480                    name,
481                    CollectionHandle {
482                        id,
483                        descriptor,
484                        index,
485                        int_to_ext: Vec::new(),
486                        ext_to_int: HashMap::new(),
487                        stale: false,
488                        write_gen: 0,
489                        docs,
490                        sparse: None,
491                    },
492                );
493            }
494        } else if is_drop {
495            if let Some(id) = target {
496                self.collections.retain(|_, h| h.id != id);
497            }
498        } else if let Some(id) = target
499            && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
500        {
501            mark_stale(handle);
502        }
503        Ok(())
504    }
505
506    /// Names of all collections, sorted.
507    #[must_use]
508    pub fn collection_names(&self) -> Vec<String> {
509        self.store.collection_names()
510    }
511
512    /// The descriptor of a collection, if it exists.
513    #[must_use]
514    pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
515        self.collections.get(name).map(|h| &h.descriptor)
516    }
517
518    /// Number of live points in a collection.
519    pub fn len(&self, name: &str) -> Result<usize> {
520        let handle = self.handle(name)?;
521        Ok(self.store.len(handle.id)?)
522    }
523
524    /// Whether a collection has no points.
525    pub fn is_empty(&self, name: &str) -> Result<bool> {
526        Ok(self.len(name)? == 0)
527    }
528
529    /// Insert or replace a point with a JSON payload.
530    pub fn upsert(
531        &mut self,
532        collection: &str,
533        id: &str,
534        vector: &[f32],
535        payload: &Value,
536    ) -> Result<()> {
537        let handle = self
538            .collections
539            .get_mut(collection)
540            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
541        require_single_vector(handle)?;
542        let payload_bytes = serde_json::to_vec(payload)?;
543        self.store.upsert(handle.id, id, vector, &payload_bytes)?;
544        // Client-side-encrypted collections have no server-side index to maintain
545        // (ADR-0032): the stored vector is an opaque placeholder the server never
546        // ranks. The point is durable in the store; nothing else to do.
547        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
548            return Ok(());
549        }
550        // Maintain the in-memory index in place where the kind allows it, else
551        // defer to a lazy rebuild on the next search (ADR-0023/0026/0033).
552        index_upsert_point(handle, id, vector)?;
553        // Keep the derived sparse inverted index in step (ADR-0045).
554        sparse_index_upsert_point(handle, id, payload);
555        Ok(())
556    }
557
558    /// Upsert a batch of points with a single WAL `fdatasync` (ADR-0038).
559    ///
560    /// `points` is `(id, vector, payload)` tuples.  The batch is committed
561    /// atomically — all points or none (from the client's perspective).  This
562    /// is the preferred path for the REST `POST /v1/collections/{c}/points`
563    /// handler which already delivers a batch per HTTP request.
564    pub fn upsert_batch(
565        &mut self,
566        collection: &str,
567        points: &[(&str, &[f32], &serde_json::Value)],
568    ) -> Result<u64> {
569        let handle = self
570            .collections
571            .get(collection)
572            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
573        require_single_vector(handle)?;
574        let coll_id = handle.id;
575        let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
576
577        let payload_bytes: Vec<Vec<u8>> = points
578            .iter()
579            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
580            .collect::<Result<_>>()?;
581
582        let records: Vec<(&str, &[f32], &[u8])> = points
583            .iter()
584            .zip(payload_bytes.iter())
585            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
586            .collect();
587
588        self.store.upsert_batch(coll_id, &records)?;
589
590        if is_client_side {
591            return Ok(records.len() as u64);
592        }
593
594        for (id, vector, payload) in points {
595            let handle = self
596                .collections
597                .get_mut(collection)
598                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
599            index_upsert_point(handle, id, vector)?;
600            sparse_index_upsert_point(handle, id, payload);
601        }
602        Ok(records.len() as u64)
603    }
604
605    /// Upsert a large batch for a bulk load, deferring all index work to a single
606    /// rebuild pass (ADR-0045).
607    ///
608    /// Like [`upsert_batch`](Self::upsert_batch) the points are committed with one
609    /// WAL `fdatasync`, but instead of folding each point into the in-memory index
610    /// one at a time, the collection's index is marked **stale** so the next search
611    /// rebuilds it in a single pass over the whole collection — far cheaper for a
612    /// fresh load (one k-means for IVF, one graph build for Vamana, one inverted-index
613    /// scan) than N incremental inserts. Prefer `upsert_batch` for steady-state
614    /// writes where query-after-write latency matters.
615    pub fn upsert_bulk(
616        &mut self,
617        collection: &str,
618        points: &[(&str, &[f32], &serde_json::Value)],
619    ) -> Result<u64> {
620        let handle = self
621            .collections
622            .get_mut(collection)
623            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
624        require_single_vector(handle)?;
625        let coll_id = handle.id;
626
627        let payload_bytes: Vec<Vec<u8>> = points
628            .iter()
629            .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
630            .collect::<Result<_>>()?;
631        let records: Vec<(&str, &[f32], &[u8])> = points
632            .iter()
633            .zip(payload_bytes.iter())
634            .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
635            .collect();
636
637        self.store.upsert_batch(coll_id, &records)?;
638
639        // Defer the dense and sparse index maintenance to a single rebuild on the
640        // next read (a client-side-encrypted collection has no index to rebuild, but
641        // marking it stale is harmless — its rebuild produces the same no-op index).
642        let handle = self
643            .collections
644            .get_mut(collection)
645            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
646        mark_stale(handle);
647        Ok(records.len() as u64)
648    }
649
650    /// Delete a point by id. Returns whether it existed.
651    pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
652        let handle = self
653            .collections
654            .get_mut(collection)
655            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
656        require_single_vector(handle)?;
657        let existed = self.store.delete(handle.id, id)?;
658        if !existed {
659            return Ok(false);
660        }
661        // No server-side index to update for client-side-encrypted collections
662        // (ADR-0032); the store delete is authoritative.
663        if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
664            return Ok(true);
665        }
666        // A built IVF removes in place (ADR-0023), a built HNSW soft-deletes
667        // (ADR-0026), and a built FreshDiskANN graph tombstones in its deletion set
668        // (ADR-0033); other kinds defer to a rebuild. The id->internal mapping is
669        // kept so a later re-insert allocates afresh — a removed or soft-deleted
670        // internal is simply never returned by the index.
671        index_delete_point(handle, id);
672        // Drop the point from the derived sparse inverted index too (ADR-0045).
673        sparse_index_delete_point(handle, id);
674        Ok(true)
675    }
676
677    /// Fetch a single point by id, with its payload and vector.
678    pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
679        let handle = self.handle(collection)?;
680        require_single_vector(handle)?;
681        match self.store.get(handle.id, id)? {
682            Some(record) => Ok(Some(Match {
683                id: id.to_owned(),
684                score: 0.0,
685                payload: Some(serde_json::from_slice(&record.payload)?),
686                vector: Some(record.vector),
687            })),
688            None => Ok(None),
689        }
690    }
691
692    /// Fetch points without ranking — an optional cleartext payload `filter`
693    /// narrows the set and `limit` bounds it. This is the retrieval path for a
694    /// client-side-encrypted collection (ADR-0032): the server returns the entitled
695    /// set (each point's payload carries the sealed vector blob under the reserved
696    /// `__quiver_vec__` key) and the client decrypts and ranks locally. It also
697    /// serves as a general "list points" primitive for any single-vector collection.
698    ///
699    /// Results come in the store's scan order, not by relevance; the filter is
700    /// re-checked exactly against each candidate (a selective filter could use the
701    /// secondary index in future — today it scans).
702    ///
703    /// # Errors
704    /// Errors if the collection does not exist or is multi-vector.
705    pub fn fetch(
706        &self,
707        collection: &str,
708        filter: Option<&Filter>,
709        limit: usize,
710        with_payload: bool,
711        with_vector: bool,
712    ) -> Result<Vec<Match>> {
713        let handle = self.handle(collection)?;
714        require_single_vector(handle)?;
715        let mut out = Vec::new();
716        for (id, record) in self.store.scan(handle.id)? {
717            if out.len() >= limit {
718                break;
719            }
720            let payload: Value = serde_json::from_slice(&record.payload)?;
721            if let Some(filter) = filter
722                && !filter.matches(&payload)
723            {
724                continue;
725            }
726            out.push(Match {
727                id,
728                score: 0.0,
729                payload: with_payload.then_some(payload),
730                vector: with_vector.then_some(record.vector),
731            });
732        }
733        Ok(out)
734    }
735
736    /// Rebuild a collection's index if a prior write deferred it (the `stale`
737    /// flag), making the collection's read snapshot current. Idempotent and cheap
738    /// when already fresh. Separating this `&mut self` maintenance from the `&self`
739    /// `*_snapshot` reads is what lets a server serve concurrent reads behind a
740    /// shared lock and take the exclusive lock only for the rare rebuild (ADR-0057).
741    pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
742        if self.handle(collection)?.stale {
743            let store = &self.store;
744            let handle = self
745                .collections
746                .get_mut(collection)
747                .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
748            rebuild_index(store, handle)?;
749        }
750        Ok(())
751    }
752
753    /// Whether a collection's index is stale — a prior write deferred its rebuild.
754    /// The server reads this to schedule an **off-lock** rebuild (ADR-0062) without
755    /// holding the exclusive lock; embedded callers never need it (the `&mut self`
756    /// searches rebuild synchronously via [`Database::ensure_indexed`]).
757    pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
758        Ok(self.handle(collection)?.stale)
759    }
760
761    /// Capture everything an off-lock rebuild needs (ADR-0062): under the shared
762    /// read lock the caller already holds, scan the collection's live rows and
763    /// record the write generation. Returns `None` when the index is already fresh
764    /// (nothing to rebuild). The expensive build then runs with **no lock** via
765    /// [`RebuildInputs::build`], and [`Database::commit_rebuild`] installs it.
766    pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
767        let handle = self.handle(collection)?;
768        if !handle.stale {
769            return Ok(None);
770        }
771        let scan = scan_collection(&self.store, handle)?;
772        Ok(Some(RebuildInputs {
773            collection: collection.to_owned(),
774            descriptor: handle.descriptor.clone(),
775            scan,
776            write_gen: handle.write_gen,
777        }))
778    }
779
    /// Install an index built off-lock (ADR-0062) under the brief exclusive lock the
    /// caller holds. This replaces the handle's index, id mappings, document map, and
    /// sparse index with the rebuilt ones.
    ///
    /// Returns whether the collection is **still** stale. If a write landed during
    /// the build, the handle's write generation no longer equals the one captured
    /// with the inputs. The fresh index is then already behind: it is installed
    /// (it is still newer than the prior snapshot), but the handle stays stale for
    /// the next rebuild.
    ///
    /// If no handle exists under `rebuilt.collection` (the collection was dropped
    /// during the build), the build is discarded and `Ok(false)` is returned.
    /// NOTE(review): the lookup is by name only. A collection dropped and re-created
    /// under the same name during the build is not distinguished here; confirm that
    /// `write_gen` (or the caller) rules that case out.
    ///
    /// # Errors
    /// Only the disk path can fail: writing the sealed artifact or opening it. On
    /// that error the handle keeps the empty placeholder index, and its mappings and
    /// `stale` flag are left untouched.
    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
        // Borrow the store on its own so the handle below can be borrowed mutably.
        let store = &self.store;
        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
            return Ok(false);
        };
        match rebuilt.kind {
            RebuiltKind::Ready(index) => handle.index = *index,
            RebuiltKind::Disk { graph, pq } => {
                // Drop the prior index before sealing the artifact in place: its
                // `mmap` assumes an immutable file. Safe under the write lock — no
                // read can observe the momentary empty index.
                handle.index = empty_index(&handle.descriptor);
                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
            }
        }
        // Swap in the id mappings and derived side structures built with the index.
        handle.int_to_ext = rebuilt.int_to_ext;
        handle.ext_to_int = rebuilt.ext_to_int;
        handle.docs = rebuilt.docs;
        handle.sparse = rebuilt.sparse;
        // Clear stale only if no write landed since the inputs were captured;
        // otherwise leave it set so the driver rebuilds again for the newer write.
        let still_stale = handle.write_gen != rebuilt.write_gen;
        handle.stale = still_stale;
        Ok(still_stale)
    }
812
813    /// Search a collection for the nearest points to `query`, optionally
814    /// post-filtered by payload predicate.
815    pub fn search(
816        &mut self,
817        collection: &str,
818        query: &[f32],
819        params: &SearchParams,
820    ) -> Result<Vec<Match>> {
821        // Embedded read-your-writes: rebuild a deferred index first (synchronously),
822        // then read the now-fresh snapshot. The server instead serves the prior
823        // snapshot and rebuilds off-lock (ADR-0062), so concurrent readers never
824        // block on a writer's rebuild.
825        self.ensure_indexed(collection)?;
826        self.search_snapshot(collection, query, params)
827    }
828
829    /// Search a collection's **current immutable snapshot** for the nearest points
830    /// to `query`, optionally post-filtered by payload predicate. Takes `&self`, so
831    /// many readers run concurrently. When a prior write deferred this collection's
832    /// rebuild, it serves the **prior** snapshot (a snapshot-isolated, slightly stale
833    /// read — ADR-0062/0053); the caller schedules a rebuild via
834    /// [`Database::needs_rebuild`].
835    pub fn search_snapshot(
836        &self,
837        collection: &str,
838        query: &[f32],
839        params: &SearchParams,
840    ) -> Result<Vec<Match>> {
841        require_single_vector(self.handle(collection)?)?;
842        require_server_searchable(self.handle(collection)?)?;
843
844        let handle = self.handle(collection)?;
845
846        // Hybrid planning: if the filter narrows to a small, secondary-indexed
847        // candidate set, scan those rows exactly instead of post-filtering ANN
848        // hits — exact, and immune to the filtered-ANN recall cliff.
849        if let Some(filter) = &params.filter
850            && let Some(candidates) = candidate_ids(
851                &self.store,
852                handle.id,
853                filter,
854                &handle.descriptor.filterable,
855            )?
856            && candidates.len() <= FULL_SCAN_THRESHOLD
857        {
858            return self.exact_filtered_search(
859                handle.id,
860                &handle.descriptor,
861                query,
862                params,
863                filter,
864                &candidates,
865            );
866        }
867
868        let fetch = if params.filter.is_some() {
869            params
870                .k
871                .saturating_mul(FILTER_OVERFETCH)
872                .max(params.ef_search)
873        } else {
874            params.k
875        };
876        let raw = handle.index.search(query, fetch, params.ef_search)?;
877
878        let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
879        let mut out = Vec::with_capacity(params.k);
880        for neighbor in raw {
881            if out.len() >= params.k {
882                break;
883            }
884            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
885                continue;
886            };
887            let record = if need_record {
888                self.store.get(handle.id, ext_id)?
889            } else {
890                None
891            };
892            let payload_value: Option<Value> = match &record {
893                Some(r) if params.filter.is_some() || params.with_payload => {
894                    Some(serde_json::from_slice(&r.payload)?)
895                }
896                _ => None,
897            };
898            if let Some(filter) = &params.filter {
899                let value = payload_value.as_ref().unwrap_or(&Value::Null);
900                if !filter.matches(value) {
901                    continue;
902                }
903            }
904            out.push(Match {
905                id: ext_id.clone(),
906                score: neighbor.distance,
907                payload: if params.with_payload {
908                    payload_value
909                } else {
910                    None
911                },
912                vector: if params.with_vector {
913                    record.map(|r| r.vector)
914                } else {
915                    None
916                },
917            });
918        }
919        Ok(out)
920    }
921
922    /// Hybrid search (ADR-0043/0046): fuse up to three rankings with Reciprocal
923    /// Rank Fusion — a dense ANN ranking, a sparse inverted-index dot-product
924    /// ranking (`sparse_query`), and a BM25 full-text ranking (`text_query`, scored
925    /// over the same inverted index). Any may be `None`; at least one is required,
926    /// giving pure dense / sparse / lexical or any blend through the same path. The
927    /// same payload `filter` is re-checked on every side, so results stay exact.
928    /// `rrf_k0` is the RRF rank-bias constant ([`DEFAULT_RRF_K0`]).
929    pub fn hybrid_search(
930        &mut self,
931        collection: &str,
932        dense_query: Option<&[f32]>,
933        sparse_query: Option<&SparseVector>,
934        text_query: Option<&str>,
935        params: &SearchParams,
936        rrf_k0: f32,
937    ) -> Result<Vec<Match>> {
938        // Embedded read-your-writes: rebuild a deferred index synchronously, then
939        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
940        // off-lock — ADR-0062).
941        self.ensure_indexed(collection)?;
942        self.hybrid_search_snapshot(
943            collection,
944            dense_query,
945            sparse_query,
946            text_query,
947            params,
948            rrf_k0,
949        )
950    }
951
952    /// Hybrid search over the collection's current immutable snapshot (`&self`, so
953    /// readers run concurrently). When a prior write deferred the rebuild, it serves
954    /// the **prior** snapshot (snapshot-isolated, slightly stale — ADR-0062/0053);
955    /// the caller schedules a rebuild via [`Database::needs_rebuild`].
956    pub fn hybrid_search_snapshot(
957        &self,
958        collection: &str,
959        dense_query: Option<&[f32]>,
960        sparse_query: Option<&SparseVector>,
961        text_query: Option<&str>,
962        params: &SearchParams,
963        rrf_k0: f32,
964    ) -> Result<Vec<Match>> {
965        require_single_vector(self.handle(collection)?)?;
966        require_server_searchable(self.handle(collection)?)?;
967        if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
968            return Err(Error::Unsupported(
969                "hybrid_search requires a dense query, a sparse query, or a text query",
970            ));
971        }
972        let handle = self.handle(collection)?;
973
974        // Pull a deep-enough candidate list from each side so the fusion is
975        // meaningful, then RRF down to `k`.
976        let depth = params
977            .k
978            .saturating_mul(RRF_CANDIDATE_FACTOR)
979            .max(MIN_RRF_CANDIDATES);
980        let filter = params.filter.as_ref();
981        let mut lists: Vec<Vec<String>> = Vec::new();
982        if let Some(q) = dense_query {
983            lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
984        }
985        if let Some(sp) = sparse_query {
986            lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
987        }
988        if let Some(text) = text_query {
989            lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
990        }
991        let fused = rrf_fuse(&lists, rrf_k0, params.k);
992
993        let mut out = Vec::with_capacity(fused.len());
994        for (ext_id, score) in fused {
995            let record = if params.with_payload || params.with_vector {
996                self.store.get(handle.id, &ext_id)?
997            } else {
998                None
999            };
1000            let payload = match (&record, params.with_payload) {
1001                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1002                _ => None,
1003            };
1004            out.push(Match {
1005                id: ext_id,
1006                score,
1007                payload,
1008                vector: if params.with_vector {
1009                    record.map(|r| r.vector)
1010                } else {
1011                    None
1012                },
1013            });
1014        }
1015        Ok(out)
1016    }
1017
1018    // Dense candidates as a ranked list of external ids (the filter re-checked),
1019    // for hybrid fusion.
1020    fn dense_ranked_ids(
1021        &self,
1022        handle: &CollectionHandle,
1023        query: &[f32],
1024        depth: usize,
1025        ef_search: usize,
1026        filter: Option<&Filter>,
1027    ) -> Result<Vec<String>> {
1028        let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1029        let mut ids = Vec::new();
1030        for neighbor in raw {
1031            let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1032                continue;
1033            };
1034            if !self.passes_filter(handle.id, ext_id, filter)? {
1035                continue;
1036            }
1037            ids.push(ext_id.clone());
1038            if ids.len() >= depth {
1039                break;
1040            }
1041        }
1042        Ok(ids)
1043    }
1044
1045    // Sparse candidates as a ranked list of external ids. With the derived inverted
1046    // index present (the common case), score only the query's nonzero dimensions via
1047    // the posting lists, then re-check the filter on the ranked ids until `depth` are
1048    // filled — so low-scored rows never load a payload (ADR-0045). When the index is
1049    // absent (a not-yet-rebuilt or client-side collection), fall back to the full
1050    // store scan, which stays correct under the incremental upsert/delete path.
1051    fn sparse_ranked_ids(
1052        &self,
1053        handle: &CollectionHandle,
1054        query: &SparseVector,
1055        depth: usize,
1056        filter: Option<&Filter>,
1057    ) -> Result<Vec<String>> {
1058        if let Some(idx) = handle.sparse.as_ref() {
1059            let mut ids = Vec::new();
1060            for (ext_id, _score) in idx.search(query) {
1061                if !self.passes_filter(handle.id, &ext_id, filter)? {
1062                    continue;
1063                }
1064                ids.push(ext_id);
1065                if ids.len() >= depth {
1066                    break;
1067                }
1068            }
1069            return Ok(ids);
1070        }
1071        self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1072    }
1073
1074    // The store-scan fallback for [`sparse_ranked_ids`]: load every row, score its
1075    // `__quiver_sparse__` vector by dot product against the query, re-check the
1076    // filter, and return the top `depth`. O(N-rows), but correct without an index.
1077    fn sparse_ranked_ids_by_scan(
1078        &self,
1079        cid: CollectionId,
1080        query: &SparseVector,
1081        depth: usize,
1082        filter: Option<&Filter>,
1083    ) -> Result<Vec<String>> {
1084        let qmap: HashMap<u32, f32> = query
1085            .indices
1086            .iter()
1087            .copied()
1088            .zip(query.values.iter().copied())
1089            .collect();
1090        let mut scored: Vec<(f32, String)> = Vec::new();
1091        for (ext_id, record) in self.store.scan(cid)? {
1092            if record.payload.is_empty() {
1093                continue;
1094            }
1095            let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1096                continue;
1097            };
1098            if let Some(filter) = filter
1099                && !filter.matches(&value)
1100            {
1101                continue;
1102            }
1103            let Some(raw) = value.get(SPARSE_KEY) else {
1104                continue;
1105            };
1106            let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1107                continue;
1108            };
1109            let mut score = 0.0f32;
1110            for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1111                if let Some(qw) = qmap.get(dim) {
1112                    score += qw * weight;
1113                }
1114            }
1115            if score > 0.0 {
1116                scored.push((score, ext_id));
1117            }
1118        }
1119        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1120        Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1121    }
1122
1123    // BM25 full-text candidates as a ranked list of external ids (ADR-0046): tokenize
1124    // the query text into term ids and score them with BM25 over the derived inverted
1125    // index, re-checking the filter on the ranked ids until `depth` are filled. BM25
1126    // needs the index's corpus statistics, so when a collection has no inverted index
1127    // (a not-yet-rebuilt or client-side collection — neither reachable here, since
1128    // hybrid rebuilds a stale handle and rejects client-side) there is nothing to
1129    // score and the list is empty; any dense side still contributes.
1130    fn bm25_ranked_ids(
1131        &self,
1132        handle: &CollectionHandle,
1133        query_text: &str,
1134        depth: usize,
1135        filter: Option<&Filter>,
1136    ) -> Result<Vec<String>> {
1137        let Some(idx) = handle.sparse.as_ref() else {
1138            return Ok(Vec::new());
1139        };
1140        let terms = query_term_ids(query_text);
1141        let mut ids = Vec::new();
1142        for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1143            if !self.passes_filter(handle.id, &ext_id, filter)? {
1144                continue;
1145            }
1146            ids.push(ext_id);
1147            if ids.len() >= depth {
1148                break;
1149            }
1150        }
1151        Ok(ids)
1152    }
1153
1154    // Re-check a payload filter against a row (loading its payload). `None` filter
1155    // always passes.
1156    fn passes_filter(
1157        &self,
1158        cid: CollectionId,
1159        ext_id: &str,
1160        filter: Option<&Filter>,
1161    ) -> Result<bool> {
1162        let Some(filter) = filter else {
1163            return Ok(true);
1164        };
1165        let value: Value = match self.store.get(cid, ext_id)? {
1166            Some(r) => serde_json::from_slice(&r.payload)?,
1167            None => Value::Null,
1168        };
1169        Ok(filter.matches(&value))
1170    }
1171
1172    // Exactly score `candidates` (a superset of the filter's matches that the
1173    // secondary indexes produced) against the query, re-check the full filter
1174    // for correctness, and return the top `k`. The pre-filter arm of the hybrid
1175    // planner: perfect recall over an already-narrowed set.
1176    fn exact_filtered_search(
1177        &self,
1178        cid: CollectionId,
1179        descriptor: &Descriptor,
1180        query: &[f32],
1181        params: &SearchParams,
1182        filter: &Filter,
1183        candidates: &BTreeSet<String>,
1184    ) -> Result<Vec<Match>> {
1185        let metric = to_index_metric(descriptor.metric);
1186        let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1187        for ext_id in candidates {
1188            let Some(record) = self.store.get(cid, ext_id)? else {
1189                continue;
1190            };
1191            let payload: Value = serde_json::from_slice(&record.payload)?;
1192            if !filter.matches(&payload) {
1193                continue;
1194            }
1195            let ordering = ordering_distance(metric, query, &record.vector);
1196            scored.push((ordering, ext_id.clone(), payload, record.vector));
1197        }
1198        scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1199        scored.truncate(params.k);
1200        Ok(scored
1201            .into_iter()
1202            .map(|(ordering, id, payload, vector)| Match {
1203                id,
1204                score: report_metric(metric, ordering),
1205                payload: params.with_payload.then_some(payload),
1206                vector: params.with_vector.then_some(vector),
1207            })
1208            .collect())
1209    }
1210
1211    /// Insert or replace a multi-vector (late-interaction / ColBERT) document: its
1212    /// `vectors` are stored as a group of token rows and its `payload` once on the
1213    /// anchor token (ADR-0028). Re-upserting a document first removes the tokens a
1214    /// shorter version would leave behind, so the document is replaced cleanly.
1215    ///
1216    /// # Errors
1217    /// Errors if the collection is single-vector, the document has no vectors, a
1218    /// vector's dimensionality is wrong, or the id contains the reserved separator.
1219    pub fn upsert_document(
1220        &mut self,
1221        collection: &str,
1222        doc_id: &str,
1223        vectors: &[Vec<f32>],
1224        payload: &Value,
1225    ) -> Result<()> {
1226        let handle = self
1227            .collections
1228            .get_mut(collection)
1229            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1230        require_multivector(handle)?;
1231        if doc_id.contains(DOC_TOKEN_SEP) {
1232            return Err(Error::Unsupported(
1233                "document id must not contain the reserved 0x1f separator",
1234            ));
1235        }
1236        if vectors.is_empty() {
1237            return Err(Error::Unsupported("a document needs at least one vector"));
1238        }
1239        let dim = handle.descriptor.dim as usize;
1240        if vectors.iter().any(|v| v.len() != dim) {
1241            return Err(Error::Unsupported(
1242                "every document vector must match the collection dimensionality",
1243            ));
1244        }
1245        // Remove only the trailing tokens a shorter re-upsert leaves behind; the
1246        // rest are overwritten by the upserts below.
1247        let previous = handle
1248            .docs
1249            .as_ref()
1250            .and_then(|d| d.get(doc_id))
1251            .copied()
1252            .unwrap_or(0) as usize;
1253        for j in vectors.len()..previous {
1254            self.store.delete(handle.id, &token_id(doc_id, j))?;
1255            // Tombstone the dropped token row in the ANN index too (ADR-0034).
1256            index_delete_point(handle, &token_id(doc_id, j));
1257        }
1258        let payload_bytes = serde_json::to_vec(payload)?;
1259        for (j, vector) in vectors.iter().enumerate() {
1260            // The payload is stored once, on the anchor token; the rest carry none.
1261            let bytes: &[u8] = if j == 0 {
1262                payload_bytes.as_slice()
1263            } else {
1264                &[]
1265            };
1266            self.store
1267                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1268            // Fold the token row into the ANN index incrementally instead of
1269            // marking the whole collection stale (ADR-0034); the underlying index
1270            // consolidates by a rebuild past its own churn threshold.
1271            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1272        }
1273        if let Some(docs) = handle.docs.as_mut() {
1274            docs.insert(doc_id.to_owned(), vectors.len() as u32);
1275        }
1276        Ok(())
1277    }
1278
1279    /// Search a multi-vector collection by a set of query token vectors, ranking
1280    /// documents by MaxSim late interaction (ADR-0028). At or below the exact-scan
1281    /// threshold every document is scored exactly; above it, candidates are
1282    /// generated by nearest-neighbour search over the token pool (recall tuned by
1283    /// `ef_search`) and re-ranked exactly. An optional `filter` is applied to each
1284    /// document's payload, exactly. A document has no single vector, so `with_payload`
1285    /// returns the anchor payload and `with_vector` returns the token vectors.
1286    pub fn search_multi_vector(
1287        &mut self,
1288        collection: &str,
1289        query_tokens: &[Vec<f32>],
1290        params: &SearchParams,
1291    ) -> Result<Vec<DocumentMatch>> {
1292        // Embedded read-your-writes: rebuild a deferred index synchronously, then
1293        // read the fresh snapshot (the server serves the prior snapshot and rebuilds
1294        // off-lock — ADR-0062).
1295        self.ensure_indexed(collection)?;
1296        self.search_multi_vector_snapshot(collection, query_tokens, params)
1297    }
1298
1299    /// Multi-vector (late-interaction) search over the collection's current
1300    /// immutable snapshot (`&self`, so readers run concurrently). A small corpus is
1301    /// scored exactly; a large corpus draws candidates from the ANN index, serving
1302    /// the **prior** snapshot when a write deferred its rebuild (snapshot-isolated,
1303    /// slightly stale — ADR-0062/0053); the caller schedules the rebuild via
1304    /// [`Database::needs_rebuild`].
1305    pub fn search_multi_vector_snapshot(
1306        &self,
1307        collection: &str,
1308        query_tokens: &[Vec<f32>],
1309        params: &SearchParams,
1310    ) -> Result<Vec<DocumentMatch>> {
1311        require_multivector(self.handle(collection)?)?;
1312        let dim = self.handle(collection)?.descriptor.dim as usize;
1313        if query_tokens.is_empty() {
1314            return Ok(Vec::new());
1315        }
1316        if query_tokens.iter().any(|v| v.len() != dim) {
1317            return Err(Error::Unsupported(
1318                "every query token must match the collection dimensionality",
1319            ));
1320        }
1321
1322        let doc_count = self
1323            .handle(collection)?
1324            .docs
1325            .as_ref()
1326            .map_or(0, BTreeMap::len);
1327        let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1328            // Exact: score every document. No ANN index needed.
1329            self.handle(collection)?
1330                .docs
1331                .as_ref()
1332                .map(|d| d.keys().cloned().collect())
1333                .unwrap_or_default()
1334        } else {
1335            // Large corpus: generate candidates from the token pool. A prior write
1336            // may have deferred the ANN index rebuild; serve the prior snapshot (the
1337            // server schedules an off-lock rebuild — ADR-0062).
1338            let handle = self.handle(collection)?;
1339            let per_token_k = params
1340                .k
1341                .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1342                .max(params.ef_search);
1343            let mut set = BTreeSet::new();
1344            for token in query_tokens {
1345                for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1346                    if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1347                        && let Some((doc, _)) = parse_token_id(ext)
1348                    {
1349                        set.insert(doc.to_owned());
1350                    }
1351                }
1352            }
1353            set.into_iter().collect()
1354        };
1355
1356        // Re-rank the candidate documents by exact MaxSim over all their tokens.
1357        let handle = self.handle(collection)?;
1358        let cid = handle.id;
1359        let metric = to_index_metric(handle.descriptor.metric);
1360        let mut scored: Vec<ScoredDocument> = Vec::new();
1361        for doc in &candidates {
1362            let count = handle
1363                .docs
1364                .as_ref()
1365                .and_then(|d| d.get(doc))
1366                .copied()
1367                .unwrap_or(0) as usize;
1368            let (tokens, payload) = self.gather_document(cid, doc, count)?;
1369            if tokens.is_empty() {
1370                continue;
1371            }
1372            if let Some(filter) = &params.filter {
1373                let value = payload.clone().unwrap_or(Value::Null);
1374                if !filter.matches(&value) {
1375                    continue;
1376                }
1377            }
1378            let score = max_sim(metric, query_tokens, &tokens);
1379            let vectors = params.with_vector.then_some(tokens);
1380            scored.push((score, doc.clone(), payload, vectors));
1381        }
1382        // Higher MaxSim first; ties broken by id for a deterministic order.
1383        scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1384        scored.truncate(params.k);
1385        Ok(scored
1386            .into_iter()
1387            .map(|(score, id, payload, vectors)| DocumentMatch {
1388                id,
1389                score,
1390                payload: params.with_payload.then_some(payload).flatten(),
1391                vectors,
1392            })
1393            .collect())
1394    }
1395
1396    /// Fetch a multi-vector document by id: its anchor payload and, if
1397    /// `with_vectors`, its token vectors. `None` if the document does not exist.
1398    pub fn get_document(
1399        &self,
1400        collection: &str,
1401        doc_id: &str,
1402        with_vectors: bool,
1403    ) -> Result<Option<DocumentMatch>> {
1404        let handle = self.handle(collection)?;
1405        require_multivector(handle)?;
1406        let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1407            return Ok(None);
1408        };
1409        let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1410        if tokens.is_empty() {
1411            return Ok(None);
1412        }
1413        Ok(Some(DocumentMatch {
1414            id: doc_id.to_owned(),
1415            score: 0.0,
1416            payload,
1417            vectors: with_vectors.then_some(tokens),
1418        }))
1419    }
1420
1421    /// Delete a multi-vector document and all of its token rows. Returns whether it
1422    /// existed.
1423    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1424        let handle = self
1425            .collections
1426            .get_mut(collection)
1427            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1428        require_multivector(handle)?;
1429        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1430            return Ok(false);
1431        };
1432        for j in 0..count as usize {
1433            self.store.delete(handle.id, &token_id(doc_id, j))?;
1434            // Tombstone each token row in the ANN index incrementally (ADR-0034).
1435            index_delete_point(handle, &token_id(doc_id, j));
1436        }
1437        if let Some(docs) = handle.docs.as_mut() {
1438            docs.remove(doc_id);
1439        }
1440        Ok(true)
1441    }
1442
1443    /// The number of documents in a multi-vector collection. Errors if the
1444    /// collection is single-vector.
1445    pub fn document_count(&self, collection: &str) -> Result<usize> {
1446        let handle = self.handle(collection)?;
1447        require_multivector(handle)?;
1448        Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1449    }
1450
1451    // Read a document's token vectors (in ordinal order) and its anchor payload
1452    // from the store. Missing token rows are skipped, so a torn document yields a
1453    // short token list the caller treats as empty.
1454    fn gather_document(
1455        &self,
1456        cid: CollectionId,
1457        doc_id: &str,
1458        count: usize,
1459    ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1460        let mut tokens = Vec::with_capacity(count);
1461        let mut payload: Option<Value> = None;
1462        for j in 0..count {
1463            let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1464                continue;
1465            };
1466            if j == 0 && !record.payload.is_empty() {
1467                payload = Some(serde_json::from_slice(&record.payload)?);
1468            }
1469            tokens.push(record.vector);
1470        }
1471        Ok((tokens, payload))
1472    }
1473
1474    /// Flush a durable checkpoint of all collections, capturing a durable
1475    /// snapshot of each built, up-to-date IVF index (ADR-0025) so it reloads on
1476    /// open instead of rebuilding. Other index kinds, and a stale or unbuilt IVF,
1477    /// are rebuilt on open.
1478    pub fn checkpoint(&mut self) -> Result<()> {
1479        let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1480        for handle in self.collections.values() {
1481            if handle.stale {
1482                continue;
1483            }
1484            if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1485                if ivf.is_empty() {
1486                    continue;
1487                }
1488                let envelope = IndexEnvelope {
1489                    version: INDEX_ENVELOPE_VERSION,
1490                    int_to_ext: handle.int_to_ext.clone(),
1491                    ivf: ivf.snapshot()?,
1492                };
1493                snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1494            }
1495        }
1496        self.store.checkpoint_with_index_snapshots(&snapshots)?;
1497        Ok(())
1498    }
1499
1500    /// Compact every collection with reclaimable space, merging its sealed
1501    /// segments and dropping deleted/shadowed rows. Crash-safe; a no-op for
1502    /// collections with nothing to reclaim.
1503    pub fn compact(&mut self) -> Result<()> {
1504        Ok(self.store.compact()?)
1505    }
1506
1507    /// The manifest version — the catalog generation a snapshot captures
1508    /// (ADR-0050). Surfaced as snapshot-relevant status in `database_stats`.
1509    #[must_use]
1510    pub fn manifest_version(&self) -> u64 {
1511        self.store.manifest_version()
1512    }
1513
1514    /// Best-effort total on-disk size of the data directory, in bytes — what a
1515    /// full snapshot would copy (ADR-0050). Unreadable entries are skipped.
1516    #[must_use]
1517    pub fn disk_usage_bytes(&self) -> u64 {
1518        dir_size(self.store.dir())
1519    }
1520
1521    /// Take a consistent online snapshot of the whole database into `dest`
1522    /// (which must not already exist), returning what was captured (ADR-0050).
1523    ///
1524    /// The writer lock is held for the duration: `checkpoint` seals the active
1525    /// buffer into segments and advances the WAL floor to the head, then the
1526    /// data directory is byte-copied. Opening `dest` afterwards replays an empty
1527    /// WAL tail and reconstructs the database exactly as of this call.
1528    ///
1529    /// # Errors
1530    /// [`Error::Core`] if `dest` already exists, or on any I/O error during the
1531    /// checkpoint or the copy.
1532    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
1533        if dest.exists() {
1534            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1535                dest.display().to_string(),
1536            )));
1537        }
1538        // Consistency anchor: flush to segments and advance the WAL floor so the
1539        // copied tree opens with no replay (ADR-0050).
1540        self.checkpoint()?;
1541        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
1542        // ponytail: flush the snapshot root's metadata only; a backup target is
1543        // re-takeable, so per-file fsync isn't warranted. Add it if snapshots
1544        // must survive an immediate post-copy crash.
1545        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
1546        Ok(SnapshotInfo {
1547            manifest_version: self.store.manifest_version(),
1548            files,
1549            bytes,
1550        })
1551    }
1552
1553    fn handle(&self, name: &str) -> Result<&CollectionHandle> {
1554        self.collections
1555            .get(name)
1556            .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
1557    }
1558}
1559
1560/// Restore a snapshot directory `src` (produced by [`Database::snapshot`]) into a
1561/// fresh `dest` directory, leaving it ready for the caller to open with the same
1562/// keyring/codec the snapshot was written under (ADR-0050).
1563///
1564/// `dest` must not already exist — restore never overwrites a live data
1565/// directory. The real integrity check is the caller's subsequent open (which,
1566/// for an encrypted store, is the only party holding the key); this function
1567/// only verifies the source looks like a snapshot and copies it.
1568///
1569/// # Errors
1570/// [`Error::Core`] if `dest` exists, `src` is not a snapshot (no `CURRENT`), or
1571/// on any I/O error during the copy.
1572pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
1573    if dest.exists() {
1574        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1575            dest.display().to_string(),
1576        )));
1577    }
1578    if !src.join("CURRENT").exists() {
1579        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
1580            format!("{} is not a snapshot (no CURRENT)", src.display()),
1581        )));
1582    }
1583    let (files, bytes) = copy_tree(src, dest)?;
1584    Ok(SnapshotInfo {
1585        // The restored directory's manifest version is whatever the snapshot
1586        // carried; report 0 since we don't re-open here to read it (the caller's
1587        // open is authoritative). `files`/`bytes` reflect the copy.
1588        manifest_version: 0,
1589        files,
1590        bytes,
1591    })
1592}
1593
1594// Recursively copy `src` into `dst`, returning `(files, bytes)`. I/O errors are
1595// tagged with the offending path. The data directory contains only files and
1596// directories (no symlinks), so a plain walk is complete.
1597fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
1598    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
1599    let mut files = 0u64;
1600    let mut bytes = 0u64;
1601    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
1602        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
1603        let from = entry.path();
1604        let to = dst.join(entry.file_name());
1605        let ft = entry
1606            .file_type()
1607            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
1608        if ft.is_dir() {
1609            let (f, b) = copy_tree(&from, &to)?;
1610            files += f;
1611            bytes += b;
1612        } else {
1613            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
1614            files += 1;
1615            bytes += n;
1616        }
1617    }
1618    Ok((files, bytes))
1619}
1620
// Total size in bytes of the regular files under `dir`, recursing into
// subdirectories. Best-effort by design: an unreadable directory, entry, file
// type or metadata contributes 0 instead of failing, so a stats read never errors
// on a transient problem. Directory entries themselves add no size.
fn dir_size(dir: &Path) -> u64 {
    std::fs::read_dir(dir).map_or(0, |entries| {
        entries
            .flatten()
            .map(|entry| match entry.file_type() {
                Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
                Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
                Err(_) => 0,
            })
            .sum()
    })
}
1638
// Separator between a multi-vector document id and a token ordinal inside a token
// row's external id (`<doc-id><US><ordinal>`). It is the ASCII Unit Separator
// (0x1F), which user document ids may not contain (ADR-0028).
const DOC_TOKEN_SEP: char = '\x1f';

// Document-count cutoff for multi-vector search. Up to and including this many
// documents, every document is scored exactly. Beyond it, candidates come from a
// nearest-neighbour search over the token pool (mirroring the single-vector
// planner's full-scan threshold).
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

// Candidate breadth per query token on the large-corpus path: roughly
// `k × MULTIVECTOR_CANDIDATE_FACTOR` nearest token rows are fetched per query
// token before their documents are unioned.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
1652
1653// The external id of a multi-vector document's `ordinal`-th token row.
1654fn token_id(doc_id: &str, ordinal: usize) -> String {
1655    format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
1656}
1657
1658// Split a token row's external id back into its document id and ordinal, or `None`
1659// if it is not a token id. Splits from the right, so a document id (which cannot
1660// contain the separator) is recovered intact.
1661fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
1662    let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
1663    Some((doc, ordinal.parse().ok()?))
1664}
1665
1666// Reject the single-vector API on a multi-vector collection.
1667fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
1668    if handle.descriptor.multivector {
1669        Err(Error::Unsupported(
1670            "collection is multi-vector; use upsert_document / search_multi_vector",
1671        ))
1672    } else {
1673        Ok(())
1674    }
1675}
1676
1677// Reject the document API on a single-vector collection.
1678fn require_multivector(handle: &CollectionHandle) -> Result<()> {
1679    if handle.descriptor.multivector {
1680        Ok(())
1681    } else {
1682        Err(Error::Unsupported(
1683            "collection is single-vector; use upsert / search",
1684        ))
1685    }
1686}
1687
1688// Reject server-side ranked search on a client-side-encrypted collection: the
1689// server holds only opaque ciphertext and cannot rank it. The client fetches the
1690// entitled set (see `Database::fetch`) and ranks locally (ADR-0032).
1691fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
1692    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
1693        Err(Error::Unsupported(
1694            "collection is client-side encrypted; the server cannot rank opaque vectors — \
1695             fetch points and rank client-side",
1696        ))
1697    } else {
1698        Ok(())
1699    }
1700}
1701
1702fn to_index_metric(metric: DistanceMetric) -> Metric {
1703    match metric {
1704        DistanceMetric::Dot => Metric::Dot,
1705        DistanceMetric::Cosine => Metric::Cosine,
1706        DistanceMetric::L2 => Metric::L2,
1707    }
1708}
1709
1710// Reject index/metric combinations the engine cannot serve.
1711fn validate_index(descriptor: &Descriptor) -> Result<()> {
1712    // Late interaction (MaxSim) is a similarity, so multi-vector collections need a
1713    // similarity metric.
1714    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
1715        return Err(Error::Unsupported(
1716            "multi-vector collections require a similarity metric (cosine or dot)",
1717        ));
1718    }
1719    // Client-side opaque encryption (ADR-0032) is searched by the client, not the
1720    // server, so it has no metric or index constraints — but it cannot combine with
1721    // the multi-vector document layout.
1722    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1723        if descriptor.multivector {
1724            return Err(Error::Unsupported(
1725                "client-side vector encryption is not supported for multi-vector collections",
1726            ));
1727        }
1728        return Ok(());
1729    }
1730    // DCPE (ADR-0031) preserves Euclidean distance comparison; the secret scaling
1731    // changes vector norms, so cosine and dot orderings are not preserved.
1732    if descriptor.vector_encryption == VectorEncryption::Dcpe
1733        && descriptor.metric != DistanceMetric::L2
1734    {
1735        return Err(Error::Unsupported(
1736            "dcpe-encrypted collections require the l2 metric",
1737        ));
1738    }
1739    // ColBERT is a late-interaction token-pool index (ADR-0034): valid only for a
1740    // multi-vector collection (which already requires a similarity metric).
1741    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
1742        return Err(Error::Unsupported(
1743            "the colbert index is only for multi-vector collections",
1744        ));
1745    }
1746    match descriptor.index.kind {
1747        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
1748            if descriptor.metric == DistanceMetric::Dot =>
1749        {
1750            Err(Error::Unsupported(
1751                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
1752            ))
1753        }
1754        _ => Ok(()),
1755    }
1756}
1757
1758// An empty index of the kind the descriptor selects. Batch kinds start unbuilt.
1759fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
1760    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1761        return CollectionIndex::None;
1762    }
1763    match descriptor.index.kind {
1764        IndexKind::Vamana => CollectionIndex::Vamana(None),
1765        IndexKind::DiskVamana => CollectionIndex::Disk(None),
1766        IndexKind::Ivf => CollectionIndex::Ivf(None),
1767        IndexKind::Colbert => CollectionIndex::Colbert(None),
1768        _ => CollectionIndex::Hnsw(Hnsw::new(
1769            descriptor.dim as usize,
1770            to_index_metric(descriptor.metric),
1771            HnswConfig::default(),
1772        )),
1773    }
1774}
1775
// Default product-quantization subspace count for `dim`-dimensional vectors: the
// largest divisor of `dim` that is at most `max(dim / 8, 1)` (about eight
// dimensions per subspace). It bottoms out at 1, a single whole-vector codebook,
// which always divides.
fn default_pq_m(dim: usize) -> usize {
    let mut subspaces = std::cmp::max(dim / 8, 1);
    while subspaces > 1 && !dim.is_multiple_of(subspaces) {
        subspaces -= 1;
    }
    subspaces
}
1785
1786// Build the descriptor's index over `ids` (internal 0..n) and their flat vectors.
1787// Fixed seed for codebook training, so a collection's disk index is reproducible.
1788const PQ_SEED: u64 = 0x5176_5044_5141_5453;
1789// The disk index artifact, overwritten in place each rebuild; the caller drops
1790// the previous handle (unmapping the file) first.
1791const DISK_INDEX_FILE: &str = "vamana.qvx";
1792
1793fn build_index(
1794    store: &Store,
1795    cid: CollectionId,
1796    descriptor: &Descriptor,
1797    ids: &[u64],
1798    flat: &[f32],
1799) -> Result<CollectionIndex> {
1800    Ok(match build_in_memory_index(descriptor, ids, flat)? {
1801        Some(index) => index,
1802        None => {
1803            let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
1804            CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
1805                store, cid, &graph, &pq,
1806            )?)?))
1807        }
1808    })
1809}
1810
1811// Build the in-memory index for a collection, or `None` for the disk-resident kind
1812// whose artifact must be sealed through the store. Pure given the vectors (no
1813// store, no I/O), so the off-lock rebuild (ADR-0062) calls it without a lock; the
1814// synchronous `build_index` and disk path layer the store I/O on top.
1815fn build_in_memory_index(
1816    descriptor: &Descriptor,
1817    ids: &[u64],
1818    flat: &[f32],
1819) -> Result<Option<CollectionIndex>> {
1820    // Client-side-encrypted collections have no server-side index (ADR-0032): the
1821    // server stores opaque ciphertext it never ranks.
1822    if descriptor.vector_encryption == VectorEncryption::ClientSide {
1823        return Ok(Some(CollectionIndex::None));
1824    }
1825    let dim = descriptor.dim as usize;
1826    let metric = to_index_metric(descriptor.metric);
1827    Ok(Some(match descriptor.index.kind {
1828        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
1829            ids,
1830            flat,
1831            dim,
1832            metric,
1833            VamanaConfig::default(),
1834        )?)?)),
1835        // The disk-resident artifact is sealed through the store, not here.
1836        IndexKind::DiskVamana => return Ok(None),
1837        IndexKind::Ivf => {
1838            let cfg = IvfConfig {
1839                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
1840                ..IvfConfig::default()
1841            };
1842            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
1843        }
1844        IndexKind::Colbert => {
1845            // Coarse centroids scale ~√(tokens); probe a quarter of them (PLAID),
1846            // and PQ the residual with the same subspace default as the disk path.
1847            let n = ids.len();
1848            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
1849            let cfg = ColbertConfig {
1850                n_centroids,
1851                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
1852                pq_subspaces: descriptor
1853                    .index
1854                    .pq_subspaces
1855                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
1856                seed: PQ_SEED,
1857            };
1858            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
1859        }
1860        _ => {
1861            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
1862            for (i, &id) in ids.iter().enumerate() {
1863                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
1864            }
1865            CollectionIndex::Hnsw(h)
1866        }
1867    }))
1868}
1869
1870// Build the Vamana graph + PQ codebook for the disk-resident index. Pure (no
1871// store, no I/O), so the off-lock rebuild (ADR-0062) does the expensive graph build
1872// and PQ training without a lock; `write_disk_index` then seals the result.
1873fn build_disk_graph_pq(
1874    descriptor: &Descriptor,
1875    ids: &[u64],
1876    flat: &[f32],
1877) -> Result<(Vamana, ProductQuantizer)> {
1878    let dim = descriptor.dim as usize;
1879    let metric = to_index_metric(descriptor.metric);
1880    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
1881    let m = descriptor
1882        .index
1883        .pq_subspaces
1884        .map_or_else(|| default_pq_m(dim), |x| x as usize);
1885    let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
1886    Ok((graph, pq))
1887}
1888
1889// Seal a prebuilt disk artifact under the collection's index dir with the store's
1890// codec, and open it for queries. Overwrites the artifact in place, so the caller
1891// must drop any prior `DiskVamana` for this collection first (its mmap assumes an
1892// immutable file).
1893fn write_disk_index(
1894    store: &Store,
1895    cid: CollectionId,
1896    graph: &Vamana,
1897    pq: &ProductQuantizer,
1898) -> Result<DiskVamana> {
1899    let dir = store.index_dir(cid);
1900    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
1901    let path = dir.join(DISK_INDEX_FILE);
1902    // Seal the index artifact with the collection's own codec (its DEK under an
1903    // envelope key-ring), so a crypto-shred of the collection also makes its index
1904    // unreadable. The same owned handle writes and then mmap-opens it.
1905    let codec = store.collection_codec_clone(cid)?;
1906    quiver_index::disk::write(&path, graph, pq, codec.as_ref())?;
1907    Ok(DiskVamana::open(&path, codec)?)
1908}
1909
1910// Load a collection's index on open: restore the durable IVF snapshot and replay
1911// the post-checkpoint tail (ADR-0025) when one is present and intact, otherwise
1912// rebuild from the store. The snapshot is only a fast path — any problem reading
1913// or restoring it falls back to the authoritative rebuild.
1914fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
1915    // Multi-vector collections always rebuild on open, so the document grouping is
1916    // derived from the live rows; the IVF snapshot fast-path stays single-vector.
1917    if !handle.descriptor.multivector
1918        && handle.descriptor.index.kind == IndexKind::Ivf
1919        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
1920        && restore_ivf_snapshot(store, handle, &blob).is_ok()
1921    {
1922        return Ok(());
1923    }
1924    rebuild_index(store, handle)
1925}
1926
1927// Restore an IVF from its snapshot envelope and catch it up to the store's
1928// current state by replaying the post-checkpoint tail (ADR-0025). Tombstoned ids
1929// are removed before the active upserts are applied, so a row shadowed this
1930// window (present in both) ends with its new vector.
1931fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
1932    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
1933    if envelope.version != INDEX_ENVELOPE_VERSION {
1934        return Err(Error::Unsupported(
1935            "unsupported index snapshot envelope version",
1936        ));
1937    }
1938    let ivf = Ivf::restore(&envelope.ivf)?;
1939    handle.ext_to_int = envelope
1940        .int_to_ext
1941        .iter()
1942        .enumerate()
1943        .map(|(i, ext)| (ext.clone(), i as u64))
1944        .collect();
1945    handle.int_to_ext = envelope.int_to_ext;
1946    handle.index = CollectionIndex::Ivf(Some(ivf));
1947    handle.stale = false;
1948
1949    let tail = store.recovery_tail(handle.id)?;
1950    for ext in &tail.deleted {
1951        let Some(&internal) = handle.ext_to_int.get(ext) else {
1952            continue;
1953        };
1954        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
1955            ivf.remove(internal);
1956        }
1957    }
1958    for (ext, record) in tail.upserts {
1959        let internal = match handle.ext_to_int.get(&ext) {
1960            Some(&i) => i,
1961            None => {
1962                let i = handle.int_to_ext.len() as u64;
1963                handle.ext_to_int.insert(ext.clone(), i);
1964                handle.int_to_ext.push(ext);
1965                i
1966            }
1967        };
1968        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
1969            ivf.insert(internal, &record.vector)?;
1970        }
1971    }
1972    Ok(())
1973}
1974
// Fold one point write (`ext_id` → `vector`) into a built index incrementally,
// or mark the handle stale to defer to a rebuild when the kind cannot absorb it
// in place. Per kind:
// - HNSW absorbs a brand-new id but cannot update one (a known id falls to a
//   rebuild);
// - a built, non-empty IVF inserts or replaces any id in place, reusing the
//   internal id on an update (ADR-0023);
// - a built FreshDiskANN graph (Vamana or disk) appends to its delta under a fresh
//   internal id, tombstones the prior copy on an update, and goes stale once its
//   pending fraction reaches `GRAPH_REBUILD_PENDING_FRACTION` (ADR-0033);
// - a built ColBERT index appends and tombstones the same way (ADR-0034), going
//   stale once its deleted fraction reaches `HNSW_REBUILD_DELETED_FRACTION`.
// Any other state (an unbuilt index, an empty IVF, no index) marks the handle
// stale. Shared by the single-vector `upsert` and each token row of a multi-vector
// `upsert_document` (ADR-0034). A no-op once the handle is stale (a rebuild is
// already pending), apart from the write-generation bump.
//
// Errors: propagates a failed index insert. In the HNSW, graph and ColBERT arms
// the id maps are only extended after the insert succeeds.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    // Every write bumps the generation, even one skipped here because a rebuild is
    // already pending — an in-flight off-lock rebuild must still notice it (ADR-0062).
    bump_write_gen(handle);
    if handle.stale {
        return Ok(());
    }
    // Classify the write up front. The variants are mutually exclusive, so at most
    // one of the live-index flags is set.
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        // New id: the next dense internal id is the current length of `int_to_ext`.
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        // Reuse the internal id for an in-place update; allocate a fresh, dense one
        // for a new id (so `int_to_ext` stays index-addressable).
        // NOTE(review): unlike the other arms, the id maps are extended *before*
        // `ivf.insert`, so an insert error leaves a mapping with no index entry.
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // A graph cannot update a node in place: append a new delta node under a
        // fresh internal id and tombstone the prior copy on an update (ADR-0033).
        // NOTE(review): the tombstone happens before `insert`; if `insert` fails,
        // the prior copy is already hidden while `ext_to_int` still points at it.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            _ => {}
        }
        // Remap the external id to the new node; the superseded internal id stays
        // in `int_to_ext` (tombstoned in the index).
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        // ColBERT appends a new token and tombstones the prior copy on an update —
        // its centroids are fixed until a rebuild (ADR-0034); the deletion fraction
        // drives consolidation, as for HNSW.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // HNSW update of a known id, an unbuilt index, an empty IVF, or no index.
        mark_stale(handle);
    }
    Ok(())
}
2071
2072// Tombstone one point (`ext_id`) from a built index incrementally, or mark the
2073// handle stale to defer to a rebuild. A built IVF removes in place (ADR-0023), a
2074// built HNSW soft-deletes (ADR-0026), and a built FreshDiskANN graph tombstones in
2075// its deletion set (ADR-0033); each amortizes a rebuild when tombstones dominate.
2076// The id→internal mapping is kept so a later re-insert allocates afresh. Shared by
2077// the single-vector `delete` and each token row of `delete_document` (ADR-0034).
2078fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2079    // Every write bumps the generation, even one skipped here (see `index_upsert_point`).
2080    bump_write_gen(handle);
2081    if handle.stale {
2082        return;
2083    }
2084    let internal = handle.ext_to_int.get(ext_id).copied();
2085    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2086    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2087    let live_graph = matches!(
2088        handle.index,
2089        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2090    );
2091    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2092    match internal {
2093        Some(internal) if live_ivf => {
2094            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2095                ivf.remove(internal);
2096            }
2097        }
2098        Some(internal) if live_hnsw => {
2099            let mut crowded = false;
2100            if let CollectionIndex::Hnsw(h) = &mut handle.index {
2101                h.mark_deleted(internal as u32);
2102                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2103            }
2104            if crowded {
2105                mark_stale(handle);
2106            }
2107        }
2108        Some(internal) if live_graph => {
2109            let mut crowded = false;
2110            match &mut handle.index {
2111                CollectionIndex::Vamana(Some(fresh)) => {
2112                    fresh.mark_deleted(internal);
2113                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2114                }
2115                CollectionIndex::Disk(Some(fresh)) => {
2116                    fresh.mark_deleted(internal);
2117                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2118                }
2119                _ => {}
2120            }
2121            if crowded {
2122                mark_stale(handle);
2123            }
2124        }
2125        Some(internal) if live_colbert => {
2126            let mut crowded = false;
2127            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2128                c.mark_deleted(internal);
2129                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2130            }
2131            if crowded {
2132                mark_stale(handle);
2133            }
2134        }
2135        _ => mark_stale(handle),
2136    }
2137}
2138
2139// The product of scanning a collection's live rows: the dense id maps + contiguous
2140// vectors, the multi-vector document grouping, and the derived sparse inverted
2141// index — everything a rebuild needs from the store, owned so the build can then
2142// proceed with no store and no lock (ADR-0062).
2143struct RebuildScan {
2144    int_to_ext: Vec<String>,
2145    ext_to_int: HashMap<String, u64>,
2146    flat: Vec<f32>,
2147    docs: Option<BTreeMap<String, u32>>,
2148    sparse: Option<SparseInvertedIndex>,
2149}
2150
2151// Scan a collection's live rows into a [`RebuildScan`]. Rebuilds the derived sparse
2152// inverted index (ADR-0045) and, for a multi-vector collection, the document
2153// grouping (doc id → token count) authoritatively from those rows. `&self` on the
2154// store, so it runs under a shared read lock.
2155fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2156    let multivector = handle.descriptor.multivector;
2157    let mut int_to_ext = Vec::new();
2158    let mut ext_to_int = HashMap::new();
2159    let mut flat: Vec<f32> = Vec::new();
2160    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2161    // Only single-vector, server-searchable collections carry a sparse index, and
2162    // only non-empty payloads are parsed, so non-sparse collections pay nothing.
2163    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2164    for (ext_id, record) in store.scan(handle.id)? {
2165        let internal = int_to_ext.len() as u64;
2166        flat.extend_from_slice(&record.vector);
2167        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2168            *docs.entry(doc.to_owned()).or_insert(0) += 1;
2169        }
2170        if let Some(idx) = sparse.as_mut()
2171            && let Some(sv) = sparse_vector_from_payload(&record.payload)
2172        {
2173            idx.upsert(&ext_id, &sv);
2174        }
2175        ext_to_int.insert(ext_id.clone(), internal);
2176        int_to_ext.push(ext_id);
2177    }
2178    Ok(RebuildScan {
2179        int_to_ext,
2180        ext_to_int,
2181        flat,
2182        docs: multivector.then_some(docs),
2183        sparse,
2184    })
2185}
2186
2187// Rebuild a collection's index from the store's current live rows, synchronously
2188// under `&mut self` — the embedded/open path. The server's runtime path builds
2189// off-lock instead (ADR-0062: `snapshot_rebuild_inputs` → `build` → `commit_rebuild`).
2190fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2191    let scan = scan_collection(store, handle)?;
2192    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2193    // Drop the previous index before rebuilding: a disk index `mmap`s a file we
2194    // are about to overwrite in place, and the mapping assumes an immutable file.
2195    handle.index = empty_index(&handle.descriptor);
2196    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2197    handle.int_to_ext = scan.int_to_ext;
2198    handle.ext_to_int = scan.ext_to_int;
2199    handle.docs = scan.docs;
2200    handle.sparse = scan.sparse;
2201    handle.stale = false;
2202    Ok(())
2203}
2204
2205/// A captured, owned snapshot of everything an off-lock rebuild needs (ADR-0062):
2206/// the scanned rows, the collection's descriptor, and the write generation at
2207/// capture time. Produced under the shared read lock by
2208/// [`Database::snapshot_rebuild_inputs`]; [`RebuildInputs::build`] then constructs
2209/// the new index with no lock held.
2210pub struct RebuildInputs {
2211    collection: String,
2212    descriptor: Descriptor,
2213    scan: RebuildScan,
2214    write_gen: u64,
2215}
2216
2217// The built product of a [`RebuildInputs`], ready to install. In-memory indexes are
2218// fully built; the disk-resident artifact carries its prebuilt graph + PQ, which
2219// `commit_rebuild` seals through the store under the brief write lock (the file
2220// write must not race the prior index's `mmap`).
2221enum RebuiltKind {
2222    Ready(Box<CollectionIndex>),
2223    Disk {
2224        graph: Box<Vamana>,
2225        pq: Box<ProductQuantizer>,
2226    },
2227}
2228
2229/// A new index built off-lock from a [`RebuildInputs`], ready for
2230/// [`Database::commit_rebuild`] to install under the brief write lock (ADR-0062).
2231pub struct RebuiltIndex {
2232    collection: String,
2233    kind: RebuiltKind,
2234    int_to_ext: Vec<String>,
2235    ext_to_int: HashMap<String, u64>,
2236    docs: Option<BTreeMap<String, u32>>,
2237    sparse: Option<SparseInvertedIndex>,
2238    write_gen: u64,
2239}
2240
2241impl RebuildInputs {
2242    /// Build the new index from the captured rows with **no lock held** — the
2243    /// expensive CPU work (graph build, PQ training, HNSW insertion) of an off-lock
2244    /// rebuild (ADR-0062). The disk artifact is built in memory here; its file is
2245    /// sealed later by `commit_rebuild` under the write lock.
2246    pub fn build(self) -> Result<RebuiltIndex> {
2247        let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2248        let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2249            Some(index) => RebuiltKind::Ready(Box::new(index)),
2250            None => {
2251                let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2252                RebuiltKind::Disk {
2253                    graph: Box::new(graph),
2254                    pq: Box::new(pq),
2255                }
2256            }
2257        };
2258        Ok(RebuiltIndex {
2259            collection: self.collection,
2260            kind,
2261            int_to_ext: self.scan.int_to_ext,
2262            ext_to_int: self.scan.ext_to_int,
2263            docs: self.scan.docs,
2264            sparse: self.scan.sparse,
2265            write_gen: self.write_gen,
2266        })
2267    }
2268}
2269
2270// Extract a point's sparse vector from its serialized payload, if it carries one
2271// under `__quiver_sparse__` and the value deserializes. An empty payload or a
2272// missing/malformed sparse vector yields `None`.
2273fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2274    if payload.is_empty() {
2275        return None;
2276    }
2277    let value = serde_json::from_slice::<Value>(payload).ok()?;
2278    sparse_vector_from_value(&value)
2279}
2280
2281// As [`sparse_vector_from_payload`] but over an already-parsed payload `Value`
2282// (the upsert path has the payload before it is serialized). An explicit
2283// `__quiver_sparse__` vector wins; otherwise a `__quiver_text__` string is
2284// tokenized into a term-frequency vector (ADR-0046), so a point is searchable by
2285// BM25 over text alone.
2286fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2287    if let Some(raw) = payload.get(SPARSE_KEY) {
2288        return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2289    }
2290    let text = payload.get(TEXT_KEY)?.as_str()?;
2291    Some(text_to_sparse(text))
2292}
2293
2294// Maintain the derived sparse inverted index for one point write (ADR-0045): index
2295// the point's sparse vector, or drop any prior entry when the new payload no longer
2296// carries one (an update that removed it). A no-op when the collection has no
2297// inverted index, or when a rebuild is pending — the rebuild repopulates the index
2298// authoritatively from the store, exactly as it does the dense index.
2299fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2300    if handle.stale {
2301        return;
2302    }
2303    let Some(idx) = handle.sparse.as_mut() else {
2304        return;
2305    };
2306    match sparse_vector_from_value(payload) {
2307        Some(sv) => idx.upsert(ext_id, &sv),
2308        None => {
2309            idx.remove(ext_id);
2310        }
2311    }
2312}
2313
2314// Drop one point from the derived sparse inverted index. A no-op when the
2315// collection has no inverted index.
2316fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2317    if let Some(idx) = handle.sparse.as_mut() {
2318        idx.remove(ext_id);
2319    }
2320}
2321
2322// The set of live external ids guaranteed to contain every row the `filter`
2323// accepts (a sound superset), resolved through the collection's secondary
2324// indexes. `None` means the filter cannot be narrowed with the available indexes
2325// and the caller should post-filter instead; `Some(set)` is a candidate
2326// superset, and an empty set proves no row can match.
2327//
2328// Only indexable leaves — equality, `in`, and range comparisons on declared
2329// filterable fields of a matching type — constrain the set. Everything else
2330// (negation, existence, `ne`, non-indexed fields, type mismatches) is treated as
2331// unconstrained, which keeps the result a superset; exactness is restored when
2332// the caller re-checks the full `Filter` on each surviving candidate.
2333fn candidate_ids(
2334    store: &Store,
2335    cid: CollectionId,
2336    filter: &Filter,
2337    filterable: &[FilterableField],
2338) -> Result<Option<BTreeSet<String>>> {
2339    match filter {
2340        Filter::And(subs) => {
2341            // Intersect the constrained children; an unconstrained child (None)
2342            // is dropped, which only widens the set — still a superset.
2343            let mut acc: Option<BTreeSet<String>> = None;
2344            for sub in subs {
2345                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2346                    acc = Some(match acc {
2347                        Some(existing) => existing.intersection(&set).cloned().collect(),
2348                        None => set,
2349                    });
2350                }
2351            }
2352            Ok(acc)
2353        }
2354        Filter::Or(subs) => {
2355            // The union of the children — but one unconstrained child makes the
2356            // whole disjunction unconstrained (its rows could be anything).
2357            let mut acc = BTreeSet::new();
2358            for sub in subs {
2359                match candidate_ids(store, cid, sub, filterable)? {
2360                    Some(set) => acc.extend(set),
2361                    None => return Ok(None),
2362                }
2363            }
2364            Ok(Some(acc))
2365        }
2366        // A negation cannot be narrowed to a superset with these indexes.
2367        Filter::Not(_) => Ok(None),
2368        // A leaf: indexable ⇒ its matching ids; otherwise unconstrained.
2369        leaf => match leaf_predicate(leaf, filterable) {
2370            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2371            None => Ok(None),
2372        },
2373    }
2374}
2375
2376// Map a single filter leaf to an indexable secondary-index predicate, if its
2377// field is declared filterable and the value types line up. Boolean nodes and
2378// predicates the secondary index cannot answer (`Ne`, `Exists`) return `None`.
2379fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2380    let field_type = |field: &str| {
2381        filterable
2382            .iter()
2383            .find(|f| f.path == field)
2384            .map(|f| f.field_type)
2385    };
2386    match filter {
2387        Filter::Eq { field, value } => Some(SecPredicate::Eq {
2388            field: field.clone(),
2389            value: sec_value(field_type(field)?, value)?,
2390        }),
2391        Filter::In { field, values } => {
2392            let ft = field_type(field)?;
2393            // Every value must encode, or the `in` set would be understated and
2394            // the candidate superset unsound.
2395            let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2396            Some(SecPredicate::In {
2397                field: field.clone(),
2398                values: values?,
2399            })
2400        }
2401        Filter::Lt { field, value } => {
2402            one_sided_range(field, field_type(field)?, value, false, false)
2403        }
2404        Filter::Lte { field, value } => {
2405            one_sided_range(field, field_type(field)?, value, false, true)
2406        }
2407        Filter::Gt { field, value } => {
2408            one_sided_range(field, field_type(field)?, value, true, false)
2409        }
2410        Filter::Gte { field, value } => {
2411            one_sided_range(field, field_type(field)?, value, true, true)
2412        }
2413        _ => None,
2414    }
2415}
2416
2417// Build a one-sided range predicate from a comparison leaf. `is_lower` selects
2418// whether `value` is the lower (`Gt`/`Gte`) or upper (`Lt`/`Lte`) bound;
2419// `inclusive` is that bound's inclusivity. `None` on a type mismatch.
2420fn one_sided_range(
2421    field: &str,
2422    field_type: FieldType,
2423    value: &Value,
2424    is_lower: bool,
2425    inclusive: bool,
2426) -> Option<SecPredicate> {
2427    let v = sec_value(field_type, value)?;
2428    let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
2429        (Some(v), None, inclusive, false)
2430    } else {
2431        (None, Some(v), false, inclusive)
2432    };
2433    Some(SecPredicate::Range {
2434        field: field.to_owned(),
2435        lo,
2436        hi,
2437        lo_inclusive,
2438        hi_inclusive,
2439    })
2440}
2441
2442// Encode a filter value as a typed secondary-index value, or `None` when the
2443// JSON type does not match the field's declared type (so it cannot be
2444// pre-filtered and the planner falls back to post-filtering).
2445fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
2446    match (field_type, value) {
2447        (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
2448        (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
2449        _ => None,
2450    }
2451}
2452
2453#[cfg(test)]
2454mod tests {
2455    use super::*;
2456    use serde_json::json;
2457
2458    fn desc() -> Descriptor {
2459        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
2460    }
2461
2462    fn open(dir: &Path) -> Database {
2463        Database::open(dir).unwrap()
2464    }
2465
2466    #[test]
2467    fn hybrid_search_fuses_dense_and_sparse() {
2468        let tmp = tempfile::tempdir().unwrap();
2469        let mut db = open(tmp.path());
2470        db.create_collection("kb", desc()).unwrap();
2471        // "a" is the nearest dense neighbour of the query; "b" shares the query's
2472        // sparse terms; "c" is good on neither.
2473        db.upsert(
2474            "kb",
2475            "a",
2476            &[1.0, 0.0, 0.0, 0.0],
2477            &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
2478        )
2479        .unwrap();
2480        db.upsert(
2481            "kb",
2482            "b",
2483            &[0.0, 1.0, 0.0, 0.0],
2484            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
2485        )
2486        .unwrap();
2487        db.upsert(
2488            "kb",
2489            "c",
2490            &[0.0, 0.0, 0.0, 1.0],
2491            &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
2492        )
2493        .unwrap();
2494
2495        let dense_q = [1.0, 0.0, 0.0, 0.0];
2496        let sparse_q = SparseVector {
2497            indices: vec![1, 2],
2498            values: vec![1.0, 1.0],
2499        };
2500        let params = SearchParams {
2501            k: 3,
2502            ..SearchParams::default()
2503        };
2504
2505        // Hybrid: "a" (dense) and "b" (sparse) both rank above "c" (neither).
2506        let hits = db
2507            .hybrid_search(
2508                "kb",
2509                Some(&dense_q),
2510                Some(&sparse_q),
2511                None,
2512                &params,
2513                DEFAULT_RRF_K0,
2514            )
2515            .unwrap();
2516        let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
2517        assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
2518        assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
2519
2520        // Pure sparse: only "b" shares the query's terms.
2521        let sparse_only = db
2522            .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
2523            .unwrap();
2524        assert_eq!(sparse_only[0].id, "b");
2525
2526        // Pure dense: "a" is nearest.
2527        let dense_only = db
2528            .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
2529            .unwrap();
2530        assert_eq!(dense_only[0].id, "a");
2531
2532        // Neither query is an error.
2533        assert!(
2534            db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
2535                .is_err()
2536        );
2537    }
2538
2539    // Pure-sparse hybrid result ids, in fused order.
2540    fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
2541        let params = SearchParams {
2542            k: 10,
2543            ..SearchParams::default()
2544        };
2545        db.hybrid_search("kb", None, Some(q), None, &params, DEFAULT_RRF_K0)
2546            .unwrap()
2547            .into_iter()
2548            .map(|m| m.id)
2549            .collect()
2550    }
2551
2552    #[test]
2553    fn sparse_index_equals_the_store_scan_fallback() {
2554        let tmp = tempfile::tempdir().unwrap();
2555        let mut db = open(tmp.path());
2556        db.create_collection("kb", desc()).unwrap();
2557        let z = [0.0f32, 0.0, 0.0, 0.0];
2558        for (id, dims, vals) in [
2559            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
2560            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
2561            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
2562            ("d", vec![9u32], vec![1.0f32]), // shares no query term
2563        ] {
2564            db.upsert(
2565                "kb",
2566                id,
2567                &z,
2568                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
2569            )
2570            .unwrap();
2571        }
2572        let q = SparseVector {
2573            indices: vec![1, 2, 3],
2574            values: vec![1.0, 1.0, 1.0],
2575        };
2576
2577        // The derived index is present and used.
2578        assert!(db.collections.get("kb").unwrap().sparse.is_some());
2579        let via_index = sparse_ids(&mut db, &q);
2580        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
2581
2582        // Drop the index (not stale, so no rebuild) → the store-scan fallback runs
2583        // and must return the identical ranking.
2584        db.collections.get_mut("kb").unwrap().sparse = None;
2585        let via_scan = sparse_ids(&mut db, &q);
2586        assert_eq!(via_index, via_scan);
2587    }
2588
2589    #[test]
2590    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
2591        let tmp = tempfile::tempdir().unwrap();
2592        let mut db = open(tmp.path());
2593        db.create_collection("kb", desc()).unwrap();
2594        let z = [0.0f32, 0.0, 0.0, 0.0];
2595        db.upsert(
2596            "kb",
2597            "a",
2598            &z,
2599            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
2600        )
2601        .unwrap();
2602        db.upsert(
2603            "kb",
2604            "b",
2605            &z,
2606            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
2607        )
2608        .unwrap();
2609        db.upsert(
2610            "kb",
2611            "c",
2612            &z,
2613            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
2614        )
2615        .unwrap();
2616        // Update "a" onto a disjoint term; delete "b".
2617        db.upsert(
2618            "kb",
2619            "a",
2620            &z,
2621            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
2622        )
2623        .unwrap();
2624        assert!(db.delete("kb", "b").unwrap());
2625
2626        let q = SparseVector {
2627            indices: vec![1, 2],
2628            values: vec![1.0, 1.0],
2629        };
2630        // Only "c" still shares a query term ("a" moved to 7, "b" is gone).
2631        let incremental = sparse_ids(&mut db, &q);
2632        assert_eq!(incremental, vec!["c".to_owned()]);
2633
2634        // A full rebuild from the store must agree with the incremental state.
2635        db.collections.get_mut("kb").unwrap().stale = true;
2636        let rebuilt = sparse_ids(&mut db, &q);
2637        assert_eq!(incremental, rebuilt);
2638    }
2639
2640    #[test]
2641    fn sparse_index_is_rebuilt_on_reopen() {
2642        let tmp = tempfile::tempdir().unwrap();
2643        {
2644            let mut db = open(tmp.path());
2645            db.create_collection("kb", desc()).unwrap();
2646            db.upsert(
2647                "kb",
2648                "a",
2649                &[0.0, 0.0, 0.0, 0.0],
2650                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
2651            )
2652            .unwrap();
2653        }
2654        let mut db = open(tmp.path());
2655        assert!(db.collections.get("kb").unwrap().sparse.is_some());
2656        let q = SparseVector {
2657            indices: vec![1],
2658            values: vec![1.0],
2659        };
2660        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
2661    }
2662
2663    #[test]
2664    fn hybrid_sparse_honours_the_payload_filter() {
2665        let tmp = tempfile::tempdir().unwrap();
2666        let mut db = open(tmp.path());
2667        db.create_collection("kb", desc()).unwrap();
2668        let z = [0.0f32, 0.0, 0.0, 0.0];
2669        db.upsert(
2670            "kb",
2671            "a",
2672            &z,
2673            &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
2674        )
2675        .unwrap();
2676        db.upsert(
2677            "kb",
2678            "b",
2679            &z,
2680            &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
2681        )
2682        .unwrap();
2683        let q = SparseVector {
2684            indices: vec![1],
2685            values: vec![1.0],
2686        };
2687        let params = SearchParams {
2688            k: 10,
2689            filter: Some(Filter::Eq {
2690                field: "lang".to_owned(),
2691                value: json!("en"),
2692            }),
2693            ..SearchParams::default()
2694        };
2695        let hits: Vec<String> = db
2696            .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
2697            .unwrap()
2698            .into_iter()
2699            .map(|m| m.id)
2700            .collect();
2701        // "b" scores higher but is filtered out by lang == "en".
2702        assert_eq!(hits, vec!["a".to_owned()]);
2703    }
2704
2705    #[test]
2706    fn hybrid_text_search_indexes_and_ranks_by_bm25() {
2707        let tmp = tempfile::tempdir().unwrap();
2708        let mut db = open(tmp.path());
2709        db.create_collection("kb", desc()).unwrap();
2710        let z = [0.0f32, 0.0, 0.0, 0.0];
2711        // Points carry only a `__quiver_text__` string — tokenized at ingest.
2712        db.upsert(
2713            "kb",
2714            "cats",
2715            &z,
2716            &json!({ "__quiver_text__": "the quick brown cat jumps" }),
2717        )
2718        .unwrap();
2719        db.upsert(
2720            "kb",
2721            "dogs",
2722            &z,
2723            &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
2724        )
2725        .unwrap();
2726
2727        let params = SearchParams {
2728            k: 10,
2729            ..SearchParams::default()
2730        };
2731        // A text query (no dense, no explicit sparse) ranks the lexical match first;
2732        // "cat"/"cats" conflate via the stemmer.
2733        let hits: Vec<String> = db
2734            .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
2735            .unwrap()
2736            .into_iter()
2737            .map(|m| m.id)
2738            .collect();
2739        assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
2740
2741        // A non-matching query returns nothing from the text side.
2742        assert!(
2743            db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
2744                .unwrap()
2745                .is_empty()
2746        );
2747
2748        // Text fuses with a dense query through the same RRF path.
2749        let dense_q = [1.0, 0.0, 0.0, 0.0];
2750        db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
2751            .unwrap();
2752        let fused: Vec<String> = db
2753            .hybrid_search(
2754                "kb",
2755                Some(&dense_q),
2756                None,
2757                Some("dog"),
2758                &params,
2759                DEFAULT_RRF_K0,
2760            )
2761            .unwrap()
2762            .into_iter()
2763            .map(|m| m.id)
2764            .collect();
2765        assert!(
2766            fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
2767            "dense match + lexical match both surface; got {fused:?}"
2768        );
2769    }
2770
2771    #[test]
2772    fn create_upsert_search_get_end_to_end() {
2773        let tmp = tempfile::tempdir().unwrap();
2774        let mut db = open(tmp.path());
2775        db.create_collection("items", desc()).unwrap();
2776        db.upsert(
2777            "items",
2778            "a",
2779            &[0.0, 0.0, 0.0, 0.0],
2780            &json!({"color": "red"}),
2781        )
2782        .unwrap();
2783        db.upsert(
2784            "items",
2785            "b",
2786            &[1.0, 0.0, 0.0, 0.0],
2787            &json!({"color": "blue"}),
2788        )
2789        .unwrap();
2790        db.upsert(
2791            "items",
2792            "c",
2793            &[5.0, 5.0, 5.0, 5.0],
2794            &json!({"color": "red"}),
2795        )
2796        .unwrap();
2797
2798        let near = db
2799            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
2800            .unwrap();
2801        assert_eq!(near[0].id, "a");
2802        assert_eq!(near[1].id, "b");
2803
2804        let got = db.get("items", "c").unwrap().unwrap();
2805        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
2806        assert_eq!(got.payload, Some(json!({"color": "red"})));
2807    }
2808
2809    #[test]
2810    fn upsert_batch_produces_same_search_results_as_sequential() {
2811        let tmp_seq = tempfile::tempdir().unwrap();
2812        let tmp_bat = tempfile::tempdir().unwrap();
2813
2814        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
2815        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
2816        let payload = json!({});
2817
2818        // Sequential upserts
2819        {
2820            let mut db = open(tmp_seq.path());
2821            db.create_collection("c", desc()).unwrap();
2822            for (id, vec) in ids.iter().zip(vectors.iter()) {
2823                db.upsert("c", id, vec, &payload).unwrap();
2824            }
2825        }
2826        // Batch upsert
2827        {
2828            let mut db = open(tmp_bat.path());
2829            db.create_collection("c", desc()).unwrap();
2830            let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
2831                .iter()
2832                .zip(vectors.iter())
2833                .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
2834                .collect();
2835            let n = db.upsert_batch("c", &pts).unwrap();
2836            assert_eq!(n, 20);
2837        }
2838
2839        let query = [10.0f32, 0.0, 0.0, 0.0];
2840        let params = SearchParams {
2841            k: 5,
2842            ..Default::default()
2843        };
2844
2845        let mut seq_db = open(tmp_seq.path());
2846        let mut bat_db = open(tmp_bat.path());
2847        let seq: Vec<String> = seq_db
2848            .search("c", &query, &params)
2849            .unwrap()
2850            .into_iter()
2851            .map(|m| m.id)
2852            .collect();
2853        let bat: Vec<String> = bat_db
2854            .search("c", &query, &params)
2855            .unwrap()
2856            .into_iter()
2857            .map(|m| m.id)
2858            .collect();
2859        assert_eq!(
2860            seq, bat,
2861            "batch and sequential produce different search results"
2862        );
2863    }
2864
2865    #[test]
2866    fn upsert_bulk_defers_the_index_then_searches_correctly() {
2867        let tmp = tempfile::tempdir().unwrap();
2868        let mut db = open(tmp.path());
2869        db.create_collection("c", desc()).unwrap();
2870        let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
2871        let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
2872        // Give one point a sparse vector so the deferred rebuild also rebuilds the
2873        // inverted index.
2874        let plain = json!({});
2875        let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
2876        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
2877            .iter()
2878            .zip(vectors.iter())
2879            .map(|(id, v)| {
2880                let payload = if id == "p3" { &sparse_payload } else { &plain };
2881                (id.as_str(), v.as_slice(), payload)
2882            })
2883            .collect();
2884        let n = db.upsert_bulk("c", &pts).unwrap();
2885        assert_eq!(n, 20);
2886
2887        // The bulk path defers index maintenance: the handle is stale until a read.
2888        assert!(db.collections.get("c").unwrap().stale);
2889
2890        // Dense search triggers the single rebuild and returns the nearest points.
2891        let query = [10.0f32, 0.0, 0.0, 0.0];
2892        let params = SearchParams {
2893            k: 5,
2894            ..Default::default()
2895        };
2896        let hits: Vec<String> = db
2897            .search("c", &query, &params)
2898            .unwrap()
2899            .into_iter()
2900            .map(|m| m.id)
2901            .collect();
2902        assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
2903        assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
2904
2905        // The sparse vector loaded via the bulk path is searchable after the rebuild.
2906        let q = SparseVector {
2907            indices: vec![7],
2908            values: vec![1.0],
2909        };
2910        let sparse_hits: Vec<String> = db
2911            .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
2912            .unwrap()
2913            .into_iter()
2914            .map(|m| m.id)
2915            .collect();
2916        assert_eq!(sparse_hits, vec!["p3".to_owned()]);
2917    }
2918
2919    #[test]
2920    fn filtered_search_only_returns_matching_payloads() {
2921        let tmp = tempfile::tempdir().unwrap();
2922        let mut db = open(tmp.path());
2923        db.create_collection("items", desc()).unwrap();
2924        for i in 0..20u32 {
2925            let color = if i % 2 == 0 { "red" } else { "blue" };
2926            db.upsert(
2927                "items",
2928                &format!("p{i}"),
2929                &[i as f32, 0.0, 0.0, 0.0],
2930                &json!({"color": color, "n": i}),
2931            )
2932            .unwrap();
2933        }
2934        let params = SearchParams {
2935            k: 5,
2936            filter: Some(Filter::Eq {
2937                field: "color".into(),
2938                value: json!("red"),
2939            }),
2940            ef_search: 64,
2941            with_payload: true,
2942            with_vector: false,
2943        };
2944        let results = db.search("items", &[0.0; 4], &params).unwrap();
2945        assert!(!results.is_empty());
2946        for m in &results {
2947            assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
2948        }
2949    }
2950
2951    #[test]
2952    fn persists_and_rebuilds_index_on_reopen() {
2953        let tmp = tempfile::tempdir().unwrap();
2954        {
2955            let mut db = open(tmp.path());
2956            db.create_collection("items", desc()).unwrap();
2957            for i in 0..50u32 {
2958                db.upsert(
2959                    "items",
2960                    &format!("p{i}"),
2961                    &[i as f32, 1.0, 2.0, 3.0],
2962                    &json!({}),
2963                )
2964                .unwrap();
2965            }
2966            db.checkpoint().unwrap();
2967        }
2968        let mut db = open(tmp.path());
2969        assert_eq!(db.len("items").unwrap(), 50);
2970        let res = db
2971            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
2972            .unwrap();
2973        assert_eq!(res[0].id, "p7");
2974    }
2975
2976    #[test]
2977    fn update_reflects_new_vector_after_rebuild() {
2978        let tmp = tempfile::tempdir().unwrap();
2979        let mut db = open(tmp.path());
2980        db.create_collection("items", desc()).unwrap();
2981        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
2982        db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
2983            .unwrap();
2984        // Move "a" far away; querying near the origin should now prefer "b".
2985        db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
2986            .unwrap();
2987        let res = db
2988            .search("items", &[0.0; 4], &SearchParams::default())
2989            .unwrap();
2990        assert_eq!(res[0].id, "b");
2991        assert_eq!(
2992            db.get("items", "a").unwrap().unwrap().vector,
2993            Some(vec![100.0, 0.0, 0.0, 0.0])
2994        );
2995    }
2996
2997    #[test]
2998    fn delete_removes_from_search() {
2999        let tmp = tempfile::tempdir().unwrap();
3000        let mut db = open(tmp.path());
3001        db.create_collection("items", desc()).unwrap();
3002        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3003        db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3004            .unwrap();
3005        assert!(db.delete("items", "a").unwrap());
3006        let res = db
3007            .search("items", &[0.0; 4], &SearchParams::default())
3008            .unwrap();
3009        assert!(res.iter().all(|m| m.id != "a"));
3010        assert!(db.get("items", "a").unwrap().is_none());
3011    }
3012
3013    #[test]
3014    fn unknown_collection_errors() {
3015        let tmp = tempfile::tempdir().unwrap();
3016        let mut db = open(tmp.path());
3017        assert!(matches!(
3018            db.search("nope", &[0.0; 4], &SearchParams::default()),
3019            Err(Error::CollectionNotFound(_))
3020        ));
3021        db.create_collection("c", desc()).unwrap();
3022        assert!(matches!(
3023            db.create_collection("c", desc()),
3024            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3025        ));
3026    }
3027
3028    fn desc_with(kind: IndexKind) -> Descriptor {
3029        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3030            kind,
3031            pq_subspaces: None,
3032        })
3033    }
3034
3035    #[test]
3036    fn vamana_and_ivf_collections_find_the_nearest_point() {
3037        for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3038            let tmp = tempfile::tempdir().unwrap();
3039            let mut db = open(tmp.path());
3040            db.create_collection("c", desc_with(kind)).unwrap();
3041            for i in 0..40u32 {
3042                db.upsert(
3043                    "c",
3044                    &format!("p{i}"),
3045                    &[i as f32, 0.0, 0.0, 0.0],
3046                    &json!({}),
3047                )
3048                .unwrap();
3049            }
3050            // ef_search maps onto the index's search width (l_search / nprobe).
3051            let res = db
3052                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3053                .unwrap();
3054            assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3055        }
3056    }
3057
3058    #[test]
3059    fn index_kind_persists_and_rebuilds_on_reopen() {
3060        let tmp = tempfile::tempdir().unwrap();
3061        {
3062            let mut db = open(tmp.path());
3063            db.create_collection("v", desc_with(IndexKind::Vamana))
3064                .unwrap();
3065            for i in 0..20u32 {
3066                db.upsert(
3067                    "v",
3068                    &format!("p{i}"),
3069                    &[i as f32, 1.0, 2.0, 3.0],
3070                    &json!({}),
3071                )
3072                .unwrap();
3073            }
3074            db.checkpoint().unwrap();
3075        }
3076        let mut db = open(tmp.path());
3077        assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3078        let res = db
3079            .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3080            .unwrap();
3081        assert_eq!(res[0].id, "p7");
3082    }
3083
3084    #[test]
3085    fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3086        let tmp = tempfile::tempdir().unwrap();
3087        let mut db = open(tmp.path());
3088        db.create_collection("c", desc_with(IndexKind::Ivf))
3089            .unwrap();
3090        for i in 0..50u32 {
3091            db.upsert(
3092                "c",
3093                &format!("p{i}"),
3094                &[i as f32, 0.0, 0.0, 0.0],
3095                &json!({}),
3096            )
3097            .unwrap();
3098        }
3099        // The first search builds (and trains) the IVF from the store.
3100        let _ = db
3101            .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3102            .unwrap();
3103        assert!(!db.collections["c"].stale, "the search built the index");
3104
3105        // Incremental insert: a new outlier is found, with no rebuild scheduled.
3106        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3107            .unwrap();
3108        assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3109        let res = db
3110            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3111            .unwrap();
3112        assert_eq!(res[0].id, "far");
3113
3114        // Incremental delete: the point disappears, still with no rebuild.
3115        assert!(db.delete("c", "far").unwrap());
3116        assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3117        let res = db
3118            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3119            .unwrap();
3120        assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3121    }
3122
3123    #[test]
3124    fn ivf_incremental_update_replaces_the_vector() {
3125        let tmp = tempfile::tempdir().unwrap();
3126        let mut db = open(tmp.path());
3127        db.create_collection("c", desc_with(IndexKind::Ivf))
3128            .unwrap();
3129        for i in 0..30u32 {
3130            db.upsert(
3131                "c",
3132                &format!("p{i}"),
3133                &[i as f32, 0.0, 0.0, 0.0],
3134                &json!({}),
3135            )
3136            .unwrap();
3137        }
3138        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3139        // Move p5 far away in place.
3140        db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3141            .unwrap();
3142        assert!(!db.collections["c"].stale);
3143        let at_new = db
3144            .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3145            .unwrap();
3146        assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3147        let at_old = db
3148            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3149            .unwrap();
3150        assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
3151    }
3152
3153    #[test]
3154    fn ivf_reinsert_after_incremental_delete_is_found() {
3155        let tmp = tempfile::tempdir().unwrap();
3156        let mut db = open(tmp.path());
3157        db.create_collection("c", desc_with(IndexKind::Ivf))
3158            .unwrap();
3159        for i in 0..20u32 {
3160            db.upsert(
3161                "c",
3162                &format!("p{i}"),
3163                &[i as f32, 0.0, 0.0, 0.0],
3164                &json!({}),
3165            )
3166            .unwrap();
3167        }
3168        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3169        assert!(db.delete("c", "p3").unwrap());
3170        assert!(!db.collections["c"].stale);
3171        // Re-insert the same id; it must be searchable again (the slot is reused).
3172        db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3173            .unwrap();
3174        assert!(!db.collections["c"].stale);
3175        let res = db
3176            .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
3177            .unwrap();
3178        assert_eq!(res[0].id, "p3");
3179    }
3180
3181    #[test]
3182    fn hnsw_in_place_update_falls_back_to_rebuild() {
3183        // HNSW cannot update an id in place, so an update marks the index stale.
3184        let tmp = tempfile::tempdir().unwrap();
3185        let mut db = open(tmp.path());
3186        db.create_collection("c", desc()).unwrap();
3187        for i in 0..10u32 {
3188            db.upsert(
3189                "c",
3190                &format!("p{i}"),
3191                &[i as f32, 0.0, 0.0, 0.0],
3192                &json!({}),
3193            )
3194            .unwrap();
3195        }
3196        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3197        assert!(!db.collections["c"].stale);
3198        db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
3199            .unwrap();
3200        assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
3201        // The rebuild on the next search reflects the new vector.
3202        let res = db
3203            .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
3204            .unwrap();
3205        assert_eq!(res[0].id, "p2");
3206    }
3207
3208    #[test]
3209    fn unsupported_index_configurations_are_rejected() {
3210        let tmp = tempfile::tempdir().unwrap();
3211        let mut db = open(tmp.path());
3212        // Vamana/IVF do not support inner product.
3213        let dot_vamana =
3214            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3215                kind: IndexKind::Vamana,
3216                pq_subspaces: None,
3217            });
3218        assert!(matches!(
3219            db.create_collection("a", dot_vamana),
3220            Err(Error::Unsupported(_))
3221        ));
3222        // ...nor does the disk-resident index.
3223        let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3224            kind: IndexKind::DiskVamana,
3225            pq_subspaces: None,
3226        });
3227        assert!(matches!(
3228            db.create_collection("b", dot_disk),
3229            Err(Error::Unsupported(_))
3230        ));
3231    }
3232
3233    #[test]
3234    fn dcpe_collections_require_the_l2_metric() {
3235        let tmp = tempfile::tempdir().unwrap();
3236        let mut db = open(tmp.path());
3237        // DCPE preserves Euclidean distance, so cosine/dot are rejected.
3238        for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
3239            let bad = Descriptor::new(4, Dtype::F32, metric)
3240                .with_vector_encryption(VectorEncryption::Dcpe);
3241            assert!(matches!(
3242                db.create_collection("bad", bad),
3243                Err(Error::Unsupported(_))
3244            ));
3245        }
3246        // L2 is accepted, and the flag persists on the descriptor.
3247        let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3248            .with_vector_encryption(VectorEncryption::Dcpe);
3249        db.create_collection("enc", good)
3250            .expect("l2 dcpe collection");
3251        assert_eq!(
3252            db.descriptor("enc").expect("descriptor").vector_encryption,
3253            VectorEncryption::Dcpe
3254        );
3255    }
3256
3257    #[test]
3258    fn client_side_collections_are_fetch_only_and_reject_search() {
3259        let tmp = tempfile::tempdir().unwrap();
3260        let mut db = open(tmp.path());
3261        // Any metric is allowed (the server never ranks), so there is no L2
3262        // restriction as there is for DCPE.
3263        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3264            .with_vector_encryption(VectorEncryption::ClientSide);
3265        db.create_collection("vault", desc)
3266            .expect("create client-side collection");
3267        // No server-side index is built for a client-side collection (ADR-0032).
3268        assert!(matches!(
3269            db.collections["vault"].index,
3270            CollectionIndex::None
3271        ));
3272
3273        // Upsert opaque placeholder points: a zero vector plus a payload carrying a
3274        // stand-in sealed vector blob and a cleartext, server-filterable field.
3275        for i in 0..5 {
3276            let tier = if i < 2 { "vip" } else { "std" };
3277            db.upsert(
3278                "vault",
3279                &format!("p{i}"),
3280                &[0.0; 4],
3281                &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
3282            )
3283            .expect("upsert");
3284        }
3285        assert_eq!(db.len("vault").unwrap(), 5);
3286        // Still no index after writes — it never goes stale or rebuilds.
3287        assert!(matches!(
3288            db.collections["vault"].index,
3289            CollectionIndex::None
3290        ));
3291
3292        // Ranked search is rejected: the server cannot rank opaque vectors.
3293        assert!(matches!(
3294            db.search("vault", &[0.0; 4], &SearchParams::default()),
3295            Err(Error::Unsupported(_))
3296        ));
3297
3298        // Fetch returns the entitled set; each payload carries the blob the client
3299        // would decrypt and rank locally, and vectors are omitted when not asked for.
3300        let all = db.fetch("vault", None, 100, true, false).unwrap();
3301        assert_eq!(all.len(), 5);
3302        assert!(
3303            all.iter()
3304                .all(|m| m.payload.is_some() && m.vector.is_none())
3305        );
3306
3307        // A cleartext payload filter narrows the set server-side.
3308        let vip = db
3309            .fetch(
3310                "vault",
3311                Some(&Filter::Eq {
3312                    field: "tier".to_owned(),
3313                    value: serde_json::json!("vip"),
3314                }),
3315                100,
3316                false,
3317                false,
3318            )
3319            .unwrap();
3320        assert_eq!(vip.len(), 2);
3321        // A limit bounds the returned set.
3322        assert_eq!(db.fetch("vault", None, 2, false, false).unwrap().len(), 2);
3323
3324        // get returns the stored placeholder + blob payload by id; delete works
3325        // through the store with no index to update.
3326        assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
3327        assert!(db.delete("vault", "p0").unwrap());
3328        assert_eq!(db.len("vault").unwrap(), 4);
3329    }
3330
3331    #[test]
3332    fn client_side_encryption_rejects_multivector() {
3333        let tmp = tempfile::tempdir().unwrap();
3334        let mut db = open(tmp.path());
3335        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3336            .with_multivector(true)
3337            .with_vector_encryption(VectorEncryption::ClientSide);
3338        assert!(matches!(
3339            db.create_collection("bad", desc),
3340            Err(Error::Unsupported(_))
3341        ));
3342    }
3343
3344    // Recursively check whether a file named `name` exists under `dir`.
3345    fn contains_file(dir: &Path, name: &str) -> bool {
3346        std::fs::read_dir(dir).is_ok_and(|rd| {
3347            rd.flatten().any(|e| {
3348                let p = e.path();
3349                if p.is_dir() {
3350                    contains_file(&p, name)
3351                } else {
3352                    p.file_name().is_some_and(|f| f == name)
3353                }
3354            })
3355        })
3356    }
3357
3358    #[test]
3359    fn disk_index_collection_searches_persists_and_writes_an_artifact() {
3360        let tmp = tempfile::tempdir().unwrap();
3361        {
3362            let mut db = open(tmp.path());
3363            db.create_collection("d", desc_with(IndexKind::DiskVamana))
3364                .unwrap();
3365            for i in 0..40u32 {
3366                db.upsert(
3367                    "d",
3368                    &format!("p{i}"),
3369                    &[i as f32, 0.0, 0.0, 0.0],
3370                    &json!({}),
3371                )
3372                .unwrap();
3373            }
3374            let res = db
3375                .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3376                .unwrap();
3377            assert_eq!(res[0].id, "p7");
3378            db.checkpoint().unwrap();
3379        }
3380        // The encrypted disk artifact was written under the collection's index dir.
3381        assert!(
3382            contains_file(tmp.path(), "vamana.qvx"),
3383            "disk index file missing"
3384        );
3385        // Reopening rebuilds and re-opens the disk index; search still works.
3386        let mut db = open(tmp.path());
3387        assert_eq!(
3388            db.descriptor("d").unwrap().index.kind,
3389            IndexKind::DiskVamana
3390        );
3391        let res = db
3392            .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3393            .unwrap();
3394        assert_eq!(res[0].id, "p7");
3395    }
3396
3397    #[test]
3398    fn graph_collections_maintain_writes_incrementally() {
3399        // FreshDiskANN incremental maintenance (ADR-0033): after the base graph is
3400        // built, inserts land in the delta, deletes tombstone, and updates move —
3401        // all without a rebuild or a reopen, for both the in-memory and disk graphs.
3402        for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
3403            let tmp = tempfile::tempdir().unwrap();
3404            let mut db = open(tmp.path());
3405            db.create_collection("c", desc_with(kind)).unwrap();
3406            for i in 0..40u32 {
3407                db.upsert(
3408                    "c",
3409                    &format!("p{i}"),
3410                    &[i as f32, 0.0, 0.0, 0.0],
3411                    &json!({}),
3412                )
3413                .unwrap();
3414            }
3415            // The first search builds the read-only base graph.
3416            let res = db
3417                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3418                .unwrap();
3419            assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
3420
3421            // A brand-new point lands in the in-memory delta and is immediately
3422            // findable — no rebuild, no reopen.
3423            db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
3424                .unwrap();
3425            let res = db
3426                .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
3427                .unwrap();
3428            assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
3429
3430            // A delete tombstones the point: it is never returned again.
3431            assert!(db.delete("c", "p7").unwrap());
3432            let res = db
3433                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3434                .unwrap();
3435            assert!(
3436                res.iter().all(|m| m.id != "p7"),
3437                "{kind:?} deleted id returned"
3438            );
3439
3440            // An update moves a vector: it is found at the new position and no
3441            // longer dominates the old one.
3442            db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3443                .unwrap();
3444            let res = db
3445                .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3446                .unwrap();
3447            assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
3448            let res = db
3449                .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
3450                .unwrap();
3451            assert_ne!(
3452                res[0].id, "p20",
3453                "{kind:?} stale copy still nearest old spot"
3454            );
3455        }
3456    }
3457
3458    #[test]
3459    fn graph_consolidates_under_heavy_churn() {
3460        // Churn past the 20% pending threshold forces a consolidation (a derived
3461        // rebuild from the store); results stay correct throughout and across a
3462        // reopen, since the store is the source of truth (ADR-0033).
3463        let tmp = tempfile::tempdir().unwrap();
3464        let mut db = open(tmp.path());
3465        db.create_collection("c", desc_with(IndexKind::Vamana))
3466            .unwrap();
3467        for i in 0..50u32 {
3468            db.upsert(
3469                "c",
3470                &format!("p{i}"),
3471                &[i as f32, 0.0, 0.0, 0.0],
3472                &json!({}),
3473            )
3474            .unwrap();
3475        }
3476        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3477
3478        let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
3479        for i in 0..15u32 {
3480            assert!(db.delete("c", &format!("p{i}")).unwrap());
3481            db.upsert(
3482                "c",
3483                &format!("q{i}"),
3484                &[1000.0 + i as f32, 0.0, 0.0, 0.0],
3485                &json!({}),
3486            )
3487            .unwrap();
3488        }
3489
3490        let near_origin = db
3491            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3492            .unwrap();
3493        assert!(
3494            near_origin.iter().all(|m| !deleted.contains(&m.id)),
3495            "a churned-out id was returned"
3496        );
3497        let near_q = db
3498            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3499            .unwrap();
3500        assert_eq!(near_q[0].id, "q7", "new point not found after churn");
3501
3502        db.checkpoint().unwrap();
3503        drop(db);
3504        let mut db = open(tmp.path());
3505        let near_q = db
3506            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3507            .unwrap();
3508        assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
3509        let near_origin = db
3510            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3511            .unwrap();
3512        assert!(
3513            near_origin.iter().all(|m| !deleted.contains(&m.id)),
3514            "a churned-out id resurfaced after reopen"
3515        );
3516    }
3517
3518    #[test]
3519    fn multivector_writes_are_incremental_and_match_a_rebuild() {
3520        // Token rows fold into the ANN index incrementally instead of stale->rebuild
3521        // (ADR-0034); the document API stays correct under interleaved
3522        // upsert/delete/re-upsert with no reopen, and a reopen (the authoritative
3523        // rebuild) yields the identical ranking. Docs lie on an arc of increasing
3524        // angle to the query (cosine, so direction alone ranks them), making the
3525        // expected order unambiguous. (Token-pool candidate generation only engages
3526        // above the exact-scan threshold; the incremental token maintenance it
3527        // relies on is the same code the single-vector index tests exercise.)
3528        let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
3529        let doc = |theta: f32| vec![dir(theta), dir(theta)];
3530        for kind in [
3531            IndexKind::Ivf,
3532            IndexKind::Hnsw,
3533            IndexKind::Vamana,
3534            IndexKind::Colbert,
3535        ] {
3536            let tmp = tempfile::tempdir().unwrap();
3537            let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3538                .with_multivector(true)
3539                .with_index(IndexSpec {
3540                    kind,
3541                    pq_subspaces: None,
3542                });
3543            let mut db = open(tmp.path());
3544            db.create_collection("m", desc).unwrap();
3545            // d1 is most aligned with the query (smallest angle), d10 least.
3546            for i in 1..=10u32 {
3547                db.upsert_document(
3548                    "m",
3549                    &format!("d{i}"),
3550                    &doc(0.1 * i as f32),
3551                    &json!({ "i": i }),
3552                )
3553                .unwrap();
3554            }
3555            let q = vec![dir(0.0)];
3556            let top = |db: &mut Database| {
3557                db.search_multi_vector(
3558                    "m",
3559                    &q,
3560                    &SearchParams {
3561                        k: 3,
3562                        ..Default::default()
3563                    },
3564                )
3565                .unwrap()
3566                .into_iter()
3567                .map(|m| m.id)
3568                .collect::<Vec<_>>()
3569            };
3570            assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
3571
3572            // Delete the top document — gone without a reopen.
3573            assert!(db.delete_document("m", "d1").unwrap());
3574            assert_eq!(
3575                top(&mut db),
3576                vec!["d2", "d3", "d4"],
3577                "{kind:?} after delete"
3578            );
3579
3580            // Re-upsert the least-aligned doc onto the query — the update is live.
3581            db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
3582                .unwrap();
3583            assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
3584
3585            // A new, near-aligned document is immediately findable, second only to d10.
3586            db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
3587                .unwrap();
3588            let r = top(&mut db);
3589            assert_eq!(r[0], "d10", "{kind:?}");
3590            assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
3591
3592            // A shorter re-upsert drops the trailing token row.
3593            db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
3594                .unwrap();
3595            let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
3596            assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
3597
3598            // The incremental in-memory state matches an authoritative rebuild.
3599            let before = top(&mut db);
3600            drop(db);
3601            let mut db = open(tmp.path());
3602            assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
3603            assert!(
3604                db.get_document("m", "d1", false).unwrap().is_none(),
3605                "{kind:?} deleted doc resurfaced"
3606            );
3607        }
3608    }
3609
3610    #[test]
3611    fn colbert_index_requires_multivector() {
3612        let tmp = tempfile::tempdir().unwrap();
3613        let mut db = open(tmp.path());
3614        // ColBERT is a late-interaction token-pool index — invalid for a
3615        // single-vector collection (ADR-0034).
3616        let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
3617            kind: IndexKind::Colbert,
3618            pq_subspaces: None,
3619        });
3620        assert!(matches!(
3621            db.create_collection("c", single),
3622            Err(Error::Unsupported(_))
3623        ));
3624        // ...but valid for a multi-vector collection.
3625        let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3626            .with_multivector(true)
3627            .with_index(IndexSpec {
3628                kind: IndexKind::Colbert,
3629                pq_subspaces: None,
3630            });
3631        assert!(db.create_collection("m", multi).is_ok());
3632    }
3633
3634    // ---- hybrid (pre-filtered) search ----
3635
3636    // A collection whose `city` (keyword) and `n` (numeric) payload fields are
3637    // declared filterable, so the planner can pre-filter on them.
3638    fn desc_filterable() -> Descriptor {
3639        Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
3640            FilterableField::keyword("city"),
3641            FilterableField::numeric("n"),
3642        ])
3643    }
3644
3645    // 30 points on the x-axis (distance to the origin grows with i), cycling
3646    // through three cities, each carrying its index as a numeric `n`.
3647    // Checkpointed so the rows live in a sealed segment's secondary index — the
3648    // pre-filter primitive — not only the active buffer.
3649    fn seed_cities(db: &mut Database) {
3650        const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
3651        db.create_collection("c", desc_filterable()).unwrap();
3652        for i in 0..30u32 {
3653            db.upsert(
3654                "c",
3655                &format!("p{i}"),
3656                &[i as f32, 0.0, 0.0, 0.0],
3657                &json!({"city": CITIES[i as usize % 3], "n": i}),
3658            )
3659            .unwrap();
3660        }
3661        db.checkpoint().unwrap();
3662    }
3663
3664    #[test]
3665    fn hybrid_equality_prefilter_is_exact() {
3666        let tmp = tempfile::tempdir().unwrap();
3667        let mut db = open(tmp.path());
3668        seed_cities(&mut db);
3669        let params = SearchParams {
3670            k: 5,
3671            filter: Some(Filter::Eq {
3672                field: "city".into(),
3673                value: json!("lyon"),
3674            }),
3675            ..SearchParams::default()
3676        };
3677        let res = db.search("c", &[0.0; 4], &params).unwrap();
3678        assert!(!res.is_empty());
3679        // lyon is i % 3 == 1 → p1, p4, p7, …; nearest the origin is p1.
3680        assert_eq!(res[0].id, "p1");
3681        for m in &res {
3682            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
3683        }
3684    }
3685
3686    #[test]
3687    fn hybrid_numeric_range_prefilter_is_exact() {
3688        let tmp = tempfile::tempdir().unwrap();
3689        let mut db = open(tmp.path());
3690        seed_cities(&mut db);
3691        let params = SearchParams {
3692            k: 4,
3693            filter: Some(Filter::Gte {
3694                field: "n".into(),
3695                value: json!(10),
3696            }),
3697            ..SearchParams::default()
3698        };
3699        let res = db.search("c", &[0.0; 4], &params).unwrap();
3700        // Among n >= 10, the nearest the origin is n == 10 (p10).
3701        assert_eq!(res[0].id, "p10");
3702        for m in &res {
3703            assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
3704        }
3705    }
3706
3707    #[test]
3708    fn hybrid_unsatisfiable_filter_returns_empty() {
3709        let tmp = tempfile::tempdir().unwrap();
3710        let mut db = open(tmp.path());
3711        seed_cities(&mut db);
3712        // No row holds this city, so the pre-filter proves the result empty
3713        // without touching the vector index.
3714        let params = SearchParams {
3715            filter: Some(Filter::Eq {
3716                field: "city".into(),
3717                value: json!("atlantis"),
3718            }),
3719            ..SearchParams::default()
3720        };
3721        assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
3722    }
3723
3724    #[test]
3725    fn hybrid_and_or_composition_is_exact() {
3726        let tmp = tempfile::tempdir().unwrap();
3727        let mut db = open(tmp.path());
3728        seed_cities(&mut db);
3729        // (city in {paris, rome}) AND (n < 12): a keyword `in` intersected with a
3730        // numeric range, both pre-filterable.
3731        let params = SearchParams {
3732            k: 10,
3733            filter: Some(Filter::And(vec![
3734                Filter::In {
3735                    field: "city".into(),
3736                    values: vec![json!("paris"), json!("rome")],
3737                },
3738                Filter::Lt {
3739                    field: "n".into(),
3740                    value: json!(12),
3741                },
3742            ])),
3743            ..SearchParams::default()
3744        };
3745        let res = db.search("c", &[0.0; 4], &params).unwrap();
3746        // paris is i % 3 == 0 → the nearest qualifying point is p0.
3747        assert_eq!(res[0].id, "p0");
3748        for m in &res {
3749            let payload = m.payload.as_ref().unwrap();
3750            let city = payload["city"].as_str().unwrap();
3751            assert!(city == "paris" || city == "rome");
3752            assert!(payload["n"].as_u64().unwrap() < 12);
3753        }
3754    }
3755
3756    #[test]
3757    fn hybrid_rechecks_non_indexable_clause() {
3758        let tmp = tempfile::tempdir().unwrap();
3759        let mut db = open(tmp.path());
3760        seed_cities(&mut db);
3761        // city is pre-filterable; the Not(…) clause is not, so it is re-checked
3762        // exactly on the narrowed candidates — paris rows excluding p0.
3763        let params = SearchParams {
3764            k: 10,
3765            filter: Some(Filter::And(vec![
3766                Filter::Eq {
3767                    field: "city".into(),
3768                    value: json!("paris"),
3769                },
3770                Filter::Not(Box::new(Filter::Eq {
3771                    field: "n".into(),
3772                    value: json!(0),
3773                })),
3774            ])),
3775            ..SearchParams::default()
3776        };
3777        let res = db.search("c", &[0.0; 4], &params).unwrap();
3778        assert!(res.iter().all(|m| m.id != "p0"));
3779        // The nearest paris point after p0 is p3.
3780        assert_eq!(res[0].id, "p3");
3781        for m in &res {
3782            assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
3783        }
3784    }
3785
3786    #[test]
3787    fn post_filter_fallback_on_undeclared_field_is_correct() {
3788        let tmp = tempfile::tempdir().unwrap();
3789        let mut db = open(tmp.path());
3790        // Only `city` is filterable; a filter on the undeclared `tier` cannot
3791        // pre-filter and falls back to ANN post-filtering — still exact.
3792        db.create_collection(
3793            "c",
3794            Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3795                .with_filterable(vec![FilterableField::keyword("city")]),
3796        )
3797        .unwrap();
3798        for i in 0..20u32 {
3799            let tier = if i % 2 == 0 { "gold" } else { "silver" };
3800            db.upsert(
3801                "c",
3802                &format!("p{i}"),
3803                &[i as f32, 0.0, 0.0, 0.0],
3804                &json!({"city": "paris", "tier": tier}),
3805            )
3806            .unwrap();
3807        }
3808        let params = SearchParams {
3809            k: 5,
3810            filter: Some(Filter::Eq {
3811                field: "tier".into(),
3812                value: json!("gold"),
3813            }),
3814            ..SearchParams::default()
3815        };
3816        let res = db.search("c", &[0.0; 4], &params).unwrap();
3817        assert!(!res.is_empty());
3818        for m in &res {
3819            assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
3820        }
3821    }
3822
3823    #[test]
3824    fn leaf_predicate_maps_only_indexable_filterable_leaves() {
3825        let fields = vec![
3826            FilterableField::keyword("city"),
3827            FilterableField::numeric("n"),
3828        ];
3829        // Keyword equality on a filterable field maps.
3830        assert_eq!(
3831            leaf_predicate(
3832                &Filter::Eq {
3833                    field: "city".into(),
3834                    value: json!("paris")
3835                },
3836                &fields
3837            ),
3838            Some(SecPredicate::Eq {
3839                field: "city".into(),
3840                value: SecValue::Keyword("paris".into())
3841            })
3842        );
3843        // A numeric comparison maps to a one-sided range.
3844        assert_eq!(
3845            leaf_predicate(
3846                &Filter::Gte {
3847                    field: "n".into(),
3848                    value: json!(3)
3849                },
3850                &fields
3851            ),
3852            Some(SecPredicate::Range {
3853                field: "n".into(),
3854                lo: Some(SecValue::Numeric(3.0)),
3855                hi: None,
3856                lo_inclusive: true,
3857                hi_inclusive: false,
3858            })
3859        );
3860        // Undeclared field, type mismatch, `ne`, and `exists` do not map.
3861        let undeclared = Filter::Eq {
3862            field: "tier".into(),
3863            value: json!("gold"),
3864        };
3865        let mismatch = Filter::Eq {
3866            field: "city".into(),
3867            value: json!(5),
3868        };
3869        let ne = Filter::Ne {
3870            field: "city".into(),
3871            value: json!("x"),
3872        };
3873        let exists = Filter::Exists {
3874            field: "city".into(),
3875        };
3876        assert!(leaf_predicate(&undeclared, &fields).is_none());
3877        assert!(leaf_predicate(&mismatch, &fields).is_none());
3878        assert!(leaf_predicate(&ne, &fields).is_none());
3879        assert!(leaf_predicate(&exists, &fields).is_none());
3880    }
3881
3882    // ----- durable IVF index recovery (ADR-0025) -----
3883
3884    // The first collection created in a fresh store has id 0.
3885    fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
3886        root.join("collections").join("0000000000").join("index")
3887    }
3888
3889    fn idx_snapshot_files(root: &Path) -> Vec<String> {
3890        let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
3891            .map(|rd| {
3892                rd.filter_map(std::result::Result::ok)
3893                    .filter_map(|e| e.file_name().to_str().map(str::to_owned))
3894                    .filter(|n| n.starts_with("idx-"))
3895                    .collect()
3896            })
3897            .unwrap_or_default();
3898        v.sort();
3899        v
3900    }
3901
3902    fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
3903        db.search("c", q, &SearchParams::default())
3904            .unwrap()
3905            .into_iter()
3906            .map(|m| m.id)
3907            .collect()
3908    }
3909
3910    fn seed_ivf(db: &mut Database, n: u32) {
3911        db.create_collection("c", desc_with(IndexKind::Ivf))
3912            .unwrap();
3913        for i in 0..n {
3914            db.upsert(
3915                "c",
3916                &format!("p{i}"),
3917                &[i as f32, 0.0, 0.0, 0.0],
3918                &json!({}),
3919            )
3920            .unwrap();
3921        }
3922        // The first search builds (and trains) the IVF from the store.
3923        let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
3924    }
3925
3926    #[test]
3927    fn ivf_snapshot_is_written_at_checkpoint() {
3928        let tmp = tempfile::tempdir().unwrap();
3929        let mut db = open(tmp.path());
3930        seed_ivf(&mut db, 40);
3931        db.checkpoint().unwrap();
3932        assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
3933    }
3934
3935    #[test]
3936    fn ivf_loads_from_snapshot_rather_than_rebuilding() {
3937        let tmp = tempfile::tempdir().unwrap();
3938        {
3939            let mut db = open(tmp.path());
3940            db.create_collection("c", desc_with(IndexKind::Ivf))
3941                .unwrap();
3942            db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
3943                .unwrap();
3944            db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3945                .unwrap();
3946            // First search builds the IVF — int_to_ext is the sorted scan order.
3947            let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
3948            // Incremental upserts append in insertion order, diverging from sort.
3949            db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
3950                .unwrap();
3951            db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3952                .unwrap();
3953            db.checkpoint().unwrap();
3954            assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
3955        }
3956        let db = open(tmp.path());
3957        // Loaded from the snapshot: the insertion-order mapping is preserved. A
3958        // rebuild would have produced the sorted order ["a", "b", "m", "z"].
3959        assert_eq!(
3960            db.collections["c"].int_to_ext,
3961            ["a", "m", "z", "b"],
3962            "index was rebuilt, not loaded from the snapshot"
3963        );
3964    }
3965
3966    #[test]
3967    fn ivf_recovery_replays_post_checkpoint_upserts() {
3968        let tmp = tempfile::tempdir().unwrap();
3969        {
3970            let mut db = open(tmp.path());
3971            seed_ivf(&mut db, 30);
3972            db.checkpoint().unwrap();
3973            // Post-checkpoint upsert, no further checkpoint.
3974            db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3975                .unwrap();
3976        }
3977        let mut db = open(tmp.path());
3978        assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
3979        assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
3980    }
3981
3982    #[test]
3983    fn ivf_recovery_replays_post_checkpoint_deletes() {
3984        let tmp = tempfile::tempdir().unwrap();
3985        {
3986            let mut db = open(tmp.path());
3987            seed_ivf(&mut db, 30);
3988            db.checkpoint().unwrap();
3989            assert!(db.delete("c", "p7").unwrap());
3990        }
3991        let mut db = open(tmp.path());
3992        assert!(
3993            nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
3994                .iter()
3995                .all(|id| id != "p7")
3996        );
3997        assert!(db.get("c", "p7").unwrap().is_none());
3998        assert!(db.get("c", "p6").unwrap().is_some());
3999    }
4000
4001    #[test]
4002    fn ivf_recovery_replays_post_checkpoint_updates() {
4003        let tmp = tempfile::tempdir().unwrap();
4004        {
4005            let mut db = open(tmp.path());
4006            seed_ivf(&mut db, 30);
4007            db.checkpoint().unwrap();
4008            // Move p0 far away — an in-place update shadowing its sealed row.
4009            db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4010                .unwrap();
4011        }
4012        let mut db = open(tmp.path());
4013        assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4014        assert_ne!(
4015            nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4016            "p0",
4017            "the stale p0 vector survived the update"
4018        );
4019    }
4020
4021    #[test]
4022    fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4023        let tmp = tempfile::tempdir().unwrap();
4024        {
4025            let mut db = open(tmp.path());
4026            seed_ivf(&mut db, 30);
4027            db.checkpoint().unwrap();
4028        }
4029        // Corrupt the snapshot file; recovery must fall back to a rebuild.
4030        let files = idx_snapshot_files(tmp.path());
4031        assert_eq!(files.len(), 1);
4032        std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4033
4034        let mut db = open(tmp.path());
4035        assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4036    }
4037
4038    // ---- Multi-vector / late interaction (ColBERT, ADR-0028) ----
4039
4040    fn mv_desc() -> Descriptor {
4041        Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4042    }
4043
4044    // Brute-force MaxSim ranking over a corpus: the reference the pipeline must
4045    // reproduce (score desc, ties by id), using the same shared scorer.
4046    fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4047        let mut v: Vec<(String, f32)> = corpus
4048            .iter()
4049            .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4050            .collect();
4051        v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4052        v
4053    }
4054
4055    #[test]
4056    fn multivector_search_ranks_documents_by_maxsim() {
4057        let tmp = tempfile::tempdir().unwrap();
4058        let mut db = open(tmp.path());
4059        db.create_collection("docs", mv_desc()).unwrap();
4060        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4061            ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4062            ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4063            (
4064                "d_mix",
4065                vec![
4066                    vec![1.0, 1.0, 0.0],
4067                    vec![0.0, 0.0, 1.0],
4068                    vec![1.0, 0.0, 1.0],
4069                ],
4070            ),
4071        ];
4072        for (id, toks) in &corpus {
4073            db.upsert_document("docs", id, toks, &json!({ "id": id }))
4074                .unwrap();
4075        }
4076        assert_eq!(db.document_count("docs").unwrap(), 3);
4077
4078        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4079        let params = SearchParams {
4080            k: 3,
4081            with_payload: false,
4082            ..SearchParams::default()
4083        };
4084        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4085        let expected = bf_rank(&query, &corpus);
4086
4087        assert_eq!(got.len(), 3);
4088        for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4089            assert_eq!(&g.id, eid, "ranking matches brute force");
4090            assert!(
4091                (g.score - escore).abs() < 1e-5,
4092                "{} score {} vs {escore}",
4093                g.id,
4094                g.score
4095            );
4096        }
4097    }
4098
4099    #[test]
4100    fn multivector_search_truncates_to_k() {
4101        let tmp = tempfile::tempdir().unwrap();
4102        let mut db = open(tmp.path());
4103        db.create_collection("docs", mv_desc()).unwrap();
4104        for i in 0..5 {
4105            let v = vec![vec![1.0, i as f32, 0.0]];
4106            db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4107                .unwrap();
4108        }
4109        let params = SearchParams {
4110            k: 2,
4111            ..SearchParams::default()
4112        };
4113        let got = db
4114            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4115            .unwrap();
4116        assert_eq!(got.len(), 2);
4117    }
4118
4119    #[test]
4120    fn multivector_filter_selects_documents_exactly() {
4121        let tmp = tempfile::tempdir().unwrap();
4122        let mut db = open(tmp.path());
4123        db.create_collection("docs", mv_desc()).unwrap();
4124        // Identical token sets, so only the filter distinguishes the documents.
4125        db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4126            .unwrap();
4127        db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4128            .unwrap();
4129        let params = SearchParams {
4130            k: 10,
4131            filter: Some(Filter::Eq {
4132                field: "lang".into(),
4133                value: json!("fr"),
4134            }),
4135            ..SearchParams::default()
4136        };
4137        let got = db
4138            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4139            .unwrap();
4140        assert_eq!(got.len(), 1);
4141        assert_eq!(got[0].id, "b");
4142        assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
4143    }
4144
4145    #[test]
4146    fn multivector_reopen_rebuilds_grouping_and_ranking() {
4147        let tmp = tempfile::tempdir().unwrap();
4148        let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4149        let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4150            ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4151            ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
4152        ];
4153        {
4154            let mut db = open(tmp.path());
4155            db.create_collection("docs", mv_desc()).unwrap();
4156            for (id, toks) in &corpus {
4157                db.upsert_document("docs", id, toks, &json!({})).unwrap();
4158            }
4159            db.checkpoint().unwrap();
4160        }
4161        // Reopen: the document grouping is rebuilt from the live rows.
4162        let mut db = open(tmp.path());
4163        assert_eq!(db.document_count("docs").unwrap(), 2);
4164        let params = SearchParams {
4165            k: 2,
4166            ..SearchParams::default()
4167        };
4168        let got = db.search_multi_vector("docs", &query, &params).unwrap();
4169        let expected = bf_rank(&query, &corpus);
4170        assert_eq!(
4171            got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
4172            expected
4173                .iter()
4174                .map(|(id, _)| id.clone())
4175                .collect::<Vec<_>>()
4176        );
4177    }
4178
4179    #[test]
4180    fn multivector_delete_document_removes_all_tokens() {
4181        let tmp = tempfile::tempdir().unwrap();
4182        let mut db = open(tmp.path());
4183        db.create_collection("docs", mv_desc()).unwrap();
4184        db.upsert_document(
4185            "docs",
4186            "a",
4187            &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
4188            &json!({}),
4189        )
4190        .unwrap();
4191        db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
4192            .unwrap();
4193        assert_eq!(db.document_count("docs").unwrap(), 2);
4194        assert_eq!(db.len("docs").unwrap(), 3);
4195
4196        assert!(db.delete_document("docs", "a").unwrap());
4197        assert_eq!(db.document_count("docs").unwrap(), 1);
4198        assert_eq!(db.len("docs").unwrap(), 1);
4199        assert!(db.get_document("docs", "a", false).unwrap().is_none());
4200        let params = SearchParams {
4201            k: 10,
4202            ..SearchParams::default()
4203        };
4204        let got = db
4205            .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
4206            .unwrap();
4207        assert!(got.iter().all(|m| m.id != "a"));
4208        assert!(!db.delete_document("docs", "a").unwrap());
4209    }
4210
4211    #[test]
4212    fn multivector_reupsert_replaces_tokens() {
4213        let tmp = tempfile::tempdir().unwrap();
4214        let mut db = open(tmp.path());
4215        db.create_collection("docs", mv_desc()).unwrap();
4216        db.upsert_document(
4217            "docs",
4218            "a",
4219            &[
4220                vec![1.0, 0.0, 0.0],
4221                vec![0.0, 1.0, 0.0],
4222                vec![0.0, 0.0, 1.0],
4223            ],
4224            &json!({"v":1}),
4225        )
4226        .unwrap();
4227        assert_eq!(db.len("docs").unwrap(), 3);
4228        // Re-upsert with a single token: the trailing two must be gone.
4229        db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
4230            .unwrap();
4231        assert_eq!(db.document_count("docs").unwrap(), 1);
4232        assert_eq!(db.len("docs").unwrap(), 1);
4233        let doc = db.get_document("docs", "a", true).unwrap().unwrap();
4234        assert_eq!(doc.payload, Some(json!({"v":2})));
4235        assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
4236    }
4237
4238    #[test]
4239    fn single_and_multi_vector_apis_are_mutually_exclusive() {
4240        let tmp = tempfile::tempdir().unwrap();
4241        let mut db = open(tmp.path());
4242        db.create_collection("mv", mv_desc()).unwrap();
4243        db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
4244            .unwrap();
4245        // Single-vector ops on a multi-vector collection are rejected.
4246        assert!(matches!(
4247            db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
4248            Err(Error::Unsupported(_))
4249        ));
4250        assert!(matches!(
4251            db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
4252            Err(Error::Unsupported(_))
4253        ));
4254        // Document ops on a single-vector collection are rejected.
4255        assert!(matches!(
4256            db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
4257            Err(Error::Unsupported(_))
4258        ));
4259        assert!(matches!(
4260            db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
4261            Err(Error::Unsupported(_))
4262        ));
4263        assert!(matches!(
4264            db.document_count("sv"),
4265            Err(Error::Unsupported(_))
4266        ));
4267    }
4268
4269    #[test]
4270    fn multivector_rejects_l2_metric_and_bad_documents() {
4271        let tmp = tempfile::tempdir().unwrap();
4272        let mut db = open(tmp.path());
4273        let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
4274        assert!(matches!(
4275            db.create_collection("bad", l2),
4276            Err(Error::Unsupported(_))
4277        ));
4278
4279        db.create_collection("docs", mv_desc()).unwrap();
4280        // A document id may not contain the reserved separator.
4281        assert!(matches!(
4282            db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
4283            Err(Error::Unsupported(_))
4284        ));
4285        // A document needs at least one vector, of the right dimensionality.
4286        assert!(matches!(
4287            db.upsert_document("docs", "a", &[], &json!({})),
4288            Err(Error::Unsupported(_))
4289        ));
4290        assert!(matches!(
4291            db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
4292            Err(Error::Unsupported(_))
4293        ));
4294    }
4295
4296    #[test]
4297    fn snapshot_then_open_reproduces_the_database() {
4298        let src = tempfile::tempdir().unwrap();
4299        let mut db = open(src.path());
4300        db.create_collection("kb", desc()).unwrap();
4301        db.create_collection("kb2", desc()).unwrap();
4302        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4303            .unwrap();
4304        db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
4305            .unwrap();
4306        db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
4307            .unwrap();
4308
4309        let dest = tempfile::tempdir().unwrap();
4310        let snap_dir = dest.path().join("snap");
4311        let info = db.snapshot(&snap_dir).unwrap();
4312        assert!(info.files > 0 && info.bytes > 0);
4313        assert_eq!(info.manifest_version, db.manifest_version());
4314
4315        // A write after the snapshot must not appear in the snapshot.
4316        db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
4317            .unwrap();
4318
4319        let restored = open(&snap_dir);
4320        let mut names = restored.collection_names();
4321        names.sort();
4322        assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
4323        assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
4324        assert_eq!(
4325            restored.get("kb", "a").unwrap().unwrap().payload,
4326            Some(json!({ "n": 1 }))
4327        );
4328        assert_eq!(restored.len("kb2").unwrap(), 1);
4329        assert!(restored.get("kb", "late").unwrap().is_none());
4330    }
4331
4332    #[test]
4333    fn snapshot_refuses_an_existing_destination() {
4334        let src = tempfile::tempdir().unwrap();
4335        let mut db = open(src.path());
4336        let dest = tempfile::tempdir().unwrap(); // already exists
4337        assert!(matches!(
4338            db.snapshot(dest.path()),
4339            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4340        ));
4341    }
4342
4343    #[test]
4344    fn restore_snapshot_roundtrips_and_guards() {
4345        let src = tempfile::tempdir().unwrap();
4346        let mut db = open(src.path());
4347        db.create_collection("kb", desc()).unwrap();
4348        db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4349            .unwrap();
4350        let work = tempfile::tempdir().unwrap();
4351        let snap_dir = work.path().join("snap");
4352        db.snapshot(&snap_dir).unwrap();
4353
4354        // Restore into a fresh directory, then open it.
4355        let restored_dir = work.path().join("restored");
4356        let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
4357        assert!(info.files > 0);
4358        let restored = open(&restored_dir);
4359        assert_eq!(restored.len("kb").unwrap(), 1);
4360
4361        // Restoring over an existing directory is refused.
4362        assert!(matches!(
4363            restore_snapshot(&snap_dir, &restored_dir),
4364            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4365        ));
4366        // A directory that is not a snapshot (no CURRENT) is rejected.
4367        let not_snap = work.path().join("not-a-snapshot");
4368        std::fs::create_dir_all(&not_snap).unwrap();
4369        assert!(matches!(
4370            restore_snapshot(&not_snap, &work.path().join("out")),
4371            Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
4372        ));
4373    }
4374}