use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use arc_swap::ArcSwap;
use quiver_core::{SecPredicate, SecValue, Store};
use quiver_index::{
ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
ordering_distance, report_metric,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
pub use quiver_core::page::PageCodec;
pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
pub use quiver_core::{
Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
VectorEncryption,
};
pub use quiver_query::Filter;
pub use quiver_query::{
BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
query_term_ids, rrf_fuse, text_to_sparse,
};
/// Errors returned by [`Database`] operations.
///
/// Variants carrying `#[from]` are produced automatically when `?` is applied to the
/// wrapped error type.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// Error reported by the storage layer (`quiver_core`).
#[error(transparent)]
Core(#[from] quiver_core::CoreError),
/// Error reported by an in-memory index in `quiver_index`.
#[error(transparent)]
Index(#[from] quiver_index::IndexError),
/// Error reported by `quiver_index` disk-index operations.
#[error(transparent)]
Disk(#[from] quiver_index::DiskError),
/// A JSON payload failed to serialize or deserialize.
#[error("payload json error: {0}")]
Json(#[from] serde_json::Error),
/// No collection with the given name is registered.
#[error("collection not found: {0}")]
CollectionNotFound(String),
/// The request is not valid for this collection; the static message says why.
#[error("unsupported configuration: {0}")]
Unsupported(&'static str),
/// Error from `quiver_index` snapshot handling (e.g. `Ivf::snapshot` in `checkpoint`).
#[error(transparent)]
IndexSnapshot(#[from] quiver_index::SnapshotError),
/// Error encoding or decoding the postcard envelope around an index snapshot.
#[error("index snapshot envelope: {0}")]
Envelope(#[from] postcard::Error),
}
/// Result alias whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Summary of a snapshot: the manifest version it corresponds to plus file and byte counts.
///
/// NOTE(review): not constructed in this part of the file; presumably `files`/`bytes` count
/// what the snapshot wrote — confirm at the producer. Field order is part of the serde
/// representation; do not reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
/// Manifest version captured by the snapshot.
pub manifest_version: u64,
/// Number of files in the snapshot.
pub files: u64,
/// Total size of the snapshot in bytes.
pub bytes: u64,
}
/// One single-vector result, as returned by searches, `get`, and `fetch`.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
/// External (caller-supplied) point id.
pub id: String,
/// Result score: the index distance/similarity for dense search, the fused RRF score for
/// hybrid search, and `0.0` for `get`/`fetch`.
pub score: f32,
/// Decoded JSON payload when requested (`get` always fills it).
pub payload: Option<Value>,
/// Stored vector when requested (`get` always fills it).
pub vector: Option<Vec<f32>>,
}
/// One multi-vector document result, as returned by multi-vector search and `get_document`.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
/// Document id (without the per-token suffix).
pub id: String,
/// `max_sim` score for searches; `0.0` for `get_document`.
pub score: f32,
/// Payload stored on the document's first token, if any.
pub payload: Option<Value>,
/// The document's token vectors when requested.
pub vectors: Option<Vec<Vec<f32>>>,
}
/// Intermediate ranking row: `(score, doc_id, payload, vectors)`.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
/// Tuning and projection options shared by the search entry points.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Maximum number of results to return.
    pub k: usize,
    /// Optional payload filter; only matching points are returned.
    pub filter: Option<Filter>,
    /// Search-time beam/probe width handed to the ANN index.
    pub ef_search: usize,
    /// Whether to decode and return payloads.
    pub with_payload: bool,
    /// Whether to return stored vectors.
    pub with_vector: bool,
}
impl Default for SearchParams {
    /// Ten results, `ef_search = 64`, no filter, payloads on, vectors off.
    fn default() -> Self {
        let k = 10;
        let ef_search = 64;
        Self {
            k,
            ef_search,
            filter: None,
            with_payload: true,
            with_vector: false,
        }
    }
}
/// Multiplier on `k` for filtered ANN searches, because the filter is applied to the
/// candidates after the index returns them (the fetch is also at least `ef_search`).
const FILTER_OVERFETCH: usize = 8;
/// Each hybrid-search ranked list is `k * RRF_CANDIDATE_FACTOR` deep...
const RRF_CANDIDATE_FACTOR: usize = 10;
/// ...but never shallower than this.
const MIN_RRF_CANDIDATES: usize = 100;
/// Filtered searches whose `candidate_ids` set has at most this many ids are answered by
/// exact scoring instead of ANN search.
const FULL_SCAN_THRESHOLD: usize = 10_000;
/// Not referenced in this part of the file; presumably the deleted fraction that triggers
/// an HNSW rebuild — TODO confirm at its use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;
/// Not referenced in this part of the file; presumably the pending-insert fraction that
/// triggers a graph-index rebuild — TODO confirm at its use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
/// The vector index backing one collection.
///
/// An inner `None` means "this kind of index, not built yet"; searching it yields no hits.
enum CollectionIndex {
/// No index at all.
None,
/// In-memory HNSW graph.
Hnsw(Hnsw),
/// In-memory Vamana graph.
Vamana(Option<FreshVamana>),
/// Inverted-file index.
Ivf(Option<Ivf>),
/// Disk-resident Vamana graph.
Disk(Option<FreshDiskVamana>),
/// ColBERT-style multi-vector index.
Colbert(Option<ColbertIndex>),
}
impl CollectionIndex {
    /// Runs a k-nearest-neighbour query against whichever index variant is active.
    ///
    /// Variants without a built index (`None`, or an inner `Option` that is `None`)
    /// return an empty list rather than an error.
    ///
    /// # Errors
    /// Propagates the underlying index's search error, converted into [`Error`].
    fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
        let neighbors = match self {
            Self::Hnsw(hnsw) => hnsw.search(query, k, ef)?,
            Self::Vamana(Some(graph)) => graph.search(query, k, ef)?,
            Self::Ivf(Some(ivf)) => ivf.search(query, k, ef)?,
            Self::Disk(Some(disk)) => disk.search(query, k, ef)?,
            Self::Colbert(Some(colbert)) => colbert.search(query, k, ef)?,
            Self::None
            | Self::Vamana(None)
            | Self::Ivf(None)
            | Self::Disk(None)
            | Self::Colbert(None) => return Ok(Vec::new()),
        };
        Ok(neighbors)
    }
}
/// Shared, atomically swappable pointer to a collection's current [`CollectionSnapshot`]:
/// readers `load` it, writers `store` a replacement.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
/// Writes layered on top of a snapshot's base index since that base was published.
///
/// Writers clone the current overlay, modify the copy and publish it in a new snapshot.
#[derive(Default, Clone)]
struct Overlay {
/// Upserted vectors with their external ids; entry `j` has internal id `base_len + j`.
upserts: Vec<(Arc<[f32]>, String)>,
/// Internal ids (base or overlay range) hidden from search results.
tombstones: HashSet<u64>,
}
/// Immutable read view of a collection used by MVCC reads: a base index plus an overlay.
pub struct CollectionSnapshot {
/// Index built at the last publish.
base: Arc<CollectionIndex>,
/// Internal-to-external id map for the base index.
base_int_to_ext: Arc<Vec<String>>,
/// Number of base ids; overlay ids start here.
base_len: u64,
/// Upserts and tombstones applied since the base was published.
overlay: Arc<Overlay>,
/// Distance metric used to score overlay points and report scores.
metric: Metric,
}
impl CollectionSnapshot {
    /// Builds a snapshot with no base index, no id mapping and an empty overlay.
    fn empty(metric: Metric) -> Self {
        Self {
            base: Arc::new(CollectionIndex::None),
            base_int_to_ext: Arc::new(Vec::new()),
            base_len: 0,
            overlay: Arc::new(Overlay::default()),
            metric,
        }
    }

    /// Resolves an internal id to its external id.
    ///
    /// Ids below `base_len` index `base_int_to_ext`. Larger ids index the overlay's
    /// `upserts` at `internal - base_len`. Returns `None` for ids outside both ranges.
    fn ext_id(&self, internal: u64) -> Option<&str> {
        if internal < self.base_len {
            self.base_int_to_ext
                .get(internal as usize)
                .map(String::as_str)
        } else {
            self.overlay
                .upserts
                .get((internal - self.base_len) as usize)
                .map(|(_, e)| e.as_str())
        }
    }

    /// Queries the base index for up to `k` hits that are not tombstoned.
    ///
    /// Hits are returned as `(ordering, internal_id)` pairs. The base index knows
    /// nothing about overlay tombstones, so asking it for exactly `k` hits comes back
    /// short whenever some of them have since been deleted or re-upserted.
    ///
    /// The fetch size therefore doubles until one of these holds:
    /// - `k` live hits are found;
    /// - the index returns fewer hits than requested (it ran out of points);
    /// - the fetch covers `k` plus every base-range tombstone. Assuming the index
    ///   returns its best `fetch` points, at most that many hits can be hidden.
    fn live_base_hits(
        &self,
        query: &[f32],
        k: usize,
        ef_search: usize,
    ) -> Result<Vec<(f32, u64)>> {
        let tombstones = &self.overlay.tombstones;
        // Only tombstones inside the base id range can hide base hits.
        let hidden = tombstones.iter().filter(|&&id| id < self.base_len).count();
        let max_fetch = k.saturating_add(hidden);
        let mut fetch = k;
        loop {
            let hits = self.base.search(query, fetch, ef_search)?;
            let exhausted = hits.len() < fetch;
            let live: Vec<(f32, u64)> = hits
                .into_iter()
                .filter(|n| !tombstones.contains(&n.id))
                // NOTE(review): base distances are mapped into ordering space with
                // `report_metric`. This assumes that function is its own inverse —
                // TODO confirm.
                .map(|n| (report_metric(self.metric, n.distance), n.id))
                .collect();
            if live.len() >= k || exhausted || fetch >= max_fetch {
                return Ok(live);
            }
            fetch = fetch.saturating_mul(2).min(max_fetch);
        }
    }

    /// Returns up to `k` nearest live points across the base index and the overlay.
    ///
    /// Overlay upserts are scored exactly against `query`. Tombstoned ids from either
    /// source are skipped. Returned matches carry only `id` and `score`; `payload` and
    /// `vector` are always `None`.
    ///
    /// # Errors
    /// Propagates errors from the base index search.
    pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
        if k == 0 {
            return Ok(Vec::new());
        }
        let mut cands = self.live_base_hits(query, k, ef_search)?;
        for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
            let internal = self.base_len + j as u64;
            if !self.overlay.tombstones.contains(&internal) {
                cands.push((ordering_distance(self.metric, query, vector), internal));
            }
        }
        // Smaller ordering values rank first.
        cands.sort_by(|a, b| a.0.total_cmp(&b.0));
        cands.truncate(k);
        let mut out = Vec::with_capacity(cands.len());
        for (ordering, internal) in cands {
            if let Some(ext) = self.ext_id(internal) {
                out.push(Match {
                    id: ext.to_owned(),
                    score: report_metric(self.metric, ordering),
                    payload: None,
                    vector: None,
                });
            }
        }
        Ok(out)
    }
}
/// Creates a snapshot cell holding an empty snapshot for the descriptor's metric.
fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
    let initial = CollectionSnapshot::empty(to_index_metric(descriptor.metric));
    Arc::new(ArcSwap::from_pointee(initial))
}
/// Whether a collection can be served from MVCC snapshots.
///
/// Multi-vector collections, client-side-encrypted vectors and disk-resident Vamana
/// indexes are excluded.
fn mvcc_eligible(descriptor: &Descriptor) -> bool {
    let client_side = descriptor.vector_encryption == VectorEncryption::ClientSide;
    let disk_resident = descriptor.index.kind == IndexKind::DiskVamana;
    !(descriptor.multivector || client_side || disk_resident)
}
/// Whether reads for this handle currently go through its snapshot cell: MVCC is
/// switched on for the handle and the collection is eligible.
fn mvcc_served(handle: &CollectionHandle) -> bool {
    if !handle.mvcc {
        return false;
    }
    mvcc_eligible(&handle.descriptor)
}
/// Publishes a snapshot that keeps `prior`'s base but carries a new overlay.
fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
    let next = CollectionSnapshot {
        base: Arc::clone(&prior.base),
        base_int_to_ext: Arc::clone(&prior.base_int_to_ext),
        base_len: prior.base_len,
        metric: prior.metric,
        overlay,
    };
    handle.snapshot.store(Arc::new(next));
}
/// Moves the handle's freshly built index into a new snapshot base with an empty overlay.
///
/// `handle.index` is left as an empty index of the same kind. From then on, reads for
/// this collection go through the snapshot.
fn publish_base(handle: &mut CollectionHandle) {
    let metric = to_index_metric(handle.descriptor.metric);
    let placeholder = empty_index(&handle.descriptor);
    let built = std::mem::replace(&mut handle.index, placeholder);
    let ids = handle.int_to_ext.clone();
    let base_len = ids.len() as u64;
    handle.snapshot.store(Arc::new(CollectionSnapshot {
        base: Arc::new(built),
        base_int_to_ext: Arc::new(ids),
        base_len,
        overlay: Arc::new(Overlay::default()),
        metric,
    }));
}
/// Number of overlay upserts after which the collection is flagged for a rebuild.
///
/// The threshold is one fifth of the base size, but never less than 1024.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    const FLOOR: u64 = 1024;
    let fifth = base_len / 5;
    if fifth > FLOOR { fifth } else { FLOOR }
}
/// Records an upsert in the MVCC overlay and publishes the new snapshot.
///
/// Any previous internal id for `ext_id` is tombstoned. The new vector gets the next
/// overlay id (`base_len + upserts.len()`), which is also appended to `int_to_ext`.
/// Once the overlay reaches `overlay_rebuild_threshold`, the handle is marked stale.
fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
    bump_write_gen(handle);
    let snap = handle.snapshot.load_full();
    let mut next = (*snap.overlay).clone();
    if let Some(&old_id) = handle.ext_to_int.get(ext_id) {
        next.tombstones.insert(old_id);
    }
    let internal = snap.base_len + next.upserts.len() as u64;
    next.upserts.push((Arc::from(vector), ext_id.to_owned()));
    handle.ext_to_int.insert(ext_id.to_owned(), internal);
    handle.int_to_ext.push(ext_id.to_owned());
    let threshold = overlay_rebuild_threshold(snap.base_len);
    let crowded = next.upserts.len() as u64 >= threshold;
    publish_overlay(handle, &snap, Arc::new(next));
    if crowded {
        handle.stale = true;
    }
}
/// Tombstones `ext_id` in the MVCC overlay and publishes the new snapshot.
///
/// The write generation is bumped even when `ext_id` is unknown.
/// NOTE(review): the `ext_to_int` entry is left in place; confirm nothing relies on its
/// absence after a delete.
fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
    bump_write_gen(handle);
    let internal = match handle.ext_to_int.get(ext_id) {
        Some(&id) => id,
        None => return,
    };
    let snap = handle.snapshot.load_full();
    let mut next = (*snap.overlay).clone();
    next.tombstones.insert(internal);
    publish_overlay(handle, &snap, Arc::new(next));
}
/// Postcard-encoded form of a persisted IVF index, written by `checkpoint`.
/// Field order is part of the wire format; do not reorder.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
/// Envelope format version (`INDEX_ENVELOPE_VERSION` when written).
version: u16,
/// Internal-to-external id map at snapshot time.
int_to_ext: Vec<String>,
/// Bytes produced by `Ivf::snapshot`.
ivf: Vec<u8>,
}
/// Current envelope version; `checkpoint` writes it into both envelope kinds.
const INDEX_ENVELOPE_VERSION: u16 = 1;
/// Postcard-encoded metadata for a disk-resident index, written by `checkpoint`.
/// Field order is part of the wire format; do not reorder.
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
/// Envelope format version (`INDEX_ENVELOPE_VERSION` when written).
version: u16,
/// Internal-to-external id map at snapshot time.
int_to_ext: Vec<String>,
/// `FreshDiskVamana::base_len()` at snapshot time.
base_row_count: u64,
/// `FreshDiskVamana::deleted_ids()` at snapshot time.
deleted_ids: Vec<u64>,
}
/// In-memory state for one collection.
struct CollectionHandle {
/// Store-assigned collection id.
id: CollectionId,
/// Collection configuration.
descriptor: Descriptor,
/// Live index for non-MVCC reads; `publish_base` moves it into the snapshot.
index: CollectionIndex,
/// Internal id -> external id.
int_to_ext: Vec<String>,
/// External id -> internal id.
ext_to_int: HashMap<String, u64>,
/// Index is out of date; `ensure_indexed` rebuilds it.
stale: bool,
/// Bumped on writes; `commit_rebuild` compares it to detect concurrent writes.
write_gen: u64,
/// Multi-vector collections only: document id -> token count.
docs: Option<BTreeMap<String, u32>>,
/// Sparse/BM25 index; when `None`, sparse queries scan the store and BM25 returns nothing.
sparse: Option<SparseInvertedIndex>,
/// Per-collection copy of the MVCC-reads flag.
mvcc: bool,
/// Snapshot cell used for MVCC reads.
snapshot: SnapshotCell,
}
/// Whether a freshly created collection gets an in-memory sparse/BM25 index: only
/// single-vector collections whose vectors are not client-side encrypted.
fn uses_sparse_index(descriptor: &Descriptor) -> bool {
    if descriptor.multivector {
        return false;
    }
    descriptor.vector_encryption != VectorEncryption::ClientSide
}
/// Records a write that the index has not absorbed: bumps the write generation and
/// flags the handle for a rebuild.
fn mark_stale(handle: &mut CollectionHandle) {
    bump_write_gen(handle);
    handle.stale = true;
}
/// Advances the handle's write generation, wrapping on overflow.
fn bump_write_gen(handle: &mut CollectionHandle) {
    let next = handle.write_gen.wrapping_add(1);
    handle.write_gen = next;
}
/// A collection-oriented vector database layered over a [`Store`].
pub struct Database {
/// Durable storage for records, descriptors and checkpoints.
store: Store,
/// In-memory handles keyed by collection name.
collections: HashMap<String, CollectionHandle>,
/// Default MVCC-reads flag applied to new handles.
mvcc: bool,
}
/// Reads the `QUIVER_MVCC_READS` environment variable.
///
/// Returns `true` only for the exact values `1`, `true`, `TRUE`, `on` or `yes`. An unset
/// or non-Unicode variable means `false`.
fn mvcc_from_env() -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "TRUE", "on", "yes"];
    match std::env::var("QUIVER_MVCC_READS") {
        Ok(raw) => TRUTHY.contains(&raw.as_str()),
        Err(_) => false,
    }
}
impl Database {
/// Opens the store rooted at `dir` and loads a handle for every collection in it.
///
/// # Errors
/// Fails if the store cannot be opened or an index cannot be loaded.
pub fn open(dir: &Path) -> Result<Self> {
    let store = Store::open(dir)?;
    Self::from_store(store)
}
/// Like [`Database::open`], but pages go through the given `codec`.
///
/// # Errors
/// Fails if the store cannot be opened or an index cannot be loaded.
pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
    let store = Store::open_with_codec(dir, codec)?;
    Self::from_store(store)
}
/// Like [`Database::open`], but the store uses the given `keyring`.
///
/// # Errors
/// Fails if the store cannot be opened or an index cannot be loaded.
pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
    let store = Store::open_with_keyring(dir, keyring)?;
    Self::from_store(store)
}
/// Builds a database over an already opened store.
///
/// Every collection starts stale with an empty index. `load_index` presumably restores
/// a persisted index snapshot (not visible here). MVCC-served collections are forced
/// stale so the first indexed read publishes a snapshot base. Collections lacking an id
/// or descriptor are skipped.
fn from_store(store: Store) -> Result<Self> {
    let mvcc = mvcc_from_env();
    let mut collections = HashMap::new();
    for name in store.collection_names() {
        let (id, descriptor) = match store.collection_id(&name) {
            Some(id) => match store.descriptor(id) {
                Some(descriptor) => (id, descriptor.clone()),
                None => continue,
            },
            None => continue,
        };
        let mut handle = CollectionHandle {
            id,
            index: empty_index(&descriptor),
            snapshot: empty_snapshot(&descriptor),
            descriptor,
            int_to_ext: Vec::new(),
            ext_to_int: HashMap::new(),
            stale: true,
            write_gen: 0,
            docs: None,
            sparse: None,
            mvcc,
        };
        load_index(&store, &mut handle)?;
        if mvcc_served(&handle) {
            handle.stale = true;
        }
        collections.insert(name, handle);
    }
    Ok(Self {
        store,
        collections,
        mvcc,
    })
}
/// Validates `descriptor`, persists a new collection and registers its empty handle.
///
/// The handle starts non-stale with an empty index. Multi-vector collections get a
/// document map, and eligible collections get an empty sparse/BM25 index.
///
/// # Errors
/// Fails if the index configuration is invalid or the store rejects the collection.
pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
    validate_index(&descriptor)?;
    let id = self.store.create_collection(name, descriptor.clone())?;
    let handle = CollectionHandle {
        id,
        index: empty_index(&descriptor),
        docs: descriptor.multivector.then(BTreeMap::new),
        sparse: uses_sparse_index(&descriptor).then(SparseInvertedIndex::new),
        snapshot: empty_snapshot(&descriptor),
        mvcc: self.mvcc,
        int_to_ext: Vec::new(),
        ext_to_int: HashMap::new(),
        stale: false,
        write_gen: 0,
        descriptor,
    };
    self.collections.insert(name.to_owned(), handle);
    Ok(())
}
/// Drops `name` from the store and forgets its in-memory handle.
///
/// Returns whether the store had such a collection.
///
/// # Errors
/// Propagates store errors; the handle is kept in that case.
pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
    match self.store.drop_collection(name) {
        Ok(existed) => {
            self.collections.remove(name);
            Ok(existed)
        }
        Err(err) => Err(err.into()),
    }
}
/// Shreds `name` in the store (presumably a destructive erase — see `Store`) and forgets
/// its in-memory handle.
///
/// Returns whether the store had such a collection.
///
/// # Errors
/// Propagates store errors; the handle is kept in that case.
pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
    match self.store.shred_collection(name) {
        Ok(existed) => {
            self.collections.remove(name);
            Ok(existed)
        }
        Err(err) => Err(err.into()),
    }
}
/// Installs the store's commit observer.
pub fn set_commit_observer(&mut self, observer: CommitObserver) {
    let store = &mut self.store;
    store.set_commit_observer(observer);
}
pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
Ok(self.store.replication_snapshot()?)
}
/// Applies one replicated WAL operation to the store and mirrors it in memory.
///
/// The in-memory side depends on the operation:
/// - create: registers a fresh, non-stale handle;
/// - drop: forgets every handle with that id;
/// - upsert/delete: marks the affected handle stale, so the next indexed read rebuilds;
/// - checkpoint: nothing beyond the store call.
///
/// # Errors
/// Propagates the store's error; in-memory state is untouched in that case.
pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
    // Capture what the in-memory bookkeeping needs before the store consumes `op`.
    let (target, create_name, is_drop) = match &op {
        WalOp::CreateCollection {
            collection_id,
            name,
            ..
        } => (Some(*collection_id), Some(name.clone()), false),
        WalOp::DropCollection { collection_id } => (Some(*collection_id), None, true),
        WalOp::Upsert { collection_id, .. } | WalOp::Delete { collection_id, .. } => {
            (Some(*collection_id), None, false)
        }
        WalOp::Checkpoint { .. } => (None, None, false),
    };
    self.store.apply_replicated(op)?;
    if let Some(name) = create_name {
        let created = target.and_then(|id| self.store.descriptor(id).cloned().map(|d| (id, d)));
        if let Some((id, descriptor)) = created {
            // NOTE(review): unlike `create_collection`, replicas start with
            // `sparse: None`. Sparse queries then scan the store and BM25 returns nothing
            // until something sets `handle.sparse` — confirm this is intentional.
            let handle = CollectionHandle {
                id,
                index: empty_index(&descriptor),
                snapshot: empty_snapshot(&descriptor),
                docs: descriptor.multivector.then(BTreeMap::new),
                int_to_ext: Vec::new(),
                ext_to_int: HashMap::new(),
                stale: false,
                write_gen: 0,
                sparse: None,
                mvcc: self.mvcc,
                descriptor,
            };
            self.collections.insert(name, handle);
        }
        return Ok(());
    }
    let Some(id) = target else {
        return Ok(());
    };
    if is_drop {
        self.collections.retain(|_, h| h.id != id);
    } else if let Some(handle) = self.collections.values_mut().find(|h| h.id == id) {
        mark_stale(handle);
    }
    Ok(())
}
/// Names of all collections known to the store.
#[must_use]
pub fn collection_names(&self) -> Vec<String> {
    let store = &self.store;
    store.collection_names()
}
/// Descriptor of the in-memory collection `name`, if one is registered.
#[must_use]
pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
    let handle = self.collections.get(name)?;
    Some(&handle.descriptor)
}
pub fn len(&self, name: &str) -> Result<usize> {
let handle = self.handle(name)?;
Ok(self.store.len(handle.id)?)
}
/// Whether collection `name` holds no records.
///
/// # Errors
/// Same as [`Database::len`].
pub fn is_empty(&self, name: &str) -> Result<bool> {
    self.len(name).map(|count| count == 0)
}
pub fn upsert(
&mut self,
collection: &str,
id: &str,
vector: &[f32],
payload: &Value,
) -> Result<()> {
let handle = self
.collections
.get_mut(collection)
.ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
require_single_vector(handle)?;
let payload_bytes = serde_json::to_vec(payload)?;
self.store.upsert(handle.id, id, vector, &payload_bytes)?;
if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
return Ok(());
}
index_upsert_point(handle, id, vector)?;
sparse_index_upsert_point(handle, id, payload);
Ok(())
}
/// Writes a batch of points in one store call, then indexes each one incrementally.
///
/// Indexing is skipped for client-side-encrypted collections. Returns the number of
/// records written.
///
/// # Errors
/// Fails for unknown or multi-vector collections, payload serialization errors, store
/// errors, or index update errors. If indexing fails midway, the store already holds the
/// whole batch.
pub fn upsert_batch(
    &mut self,
    collection: &str,
    points: &[(&str, &[f32], &serde_json::Value)],
) -> Result<u64> {
    // Look the handle up once; the original re-ran this hash lookup for every point.
    let handle = self
        .collections
        .get_mut(collection)
        .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
    require_single_vector(handle)?;
    let payload_bytes: Vec<Vec<u8>> = points
        .iter()
        .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
        .collect::<Result<_>>()?;
    let records: Vec<(&str, &[f32], &[u8])> = points
        .iter()
        .zip(payload_bytes.iter())
        .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
        .collect();
    self.store.upsert_batch(handle.id, &records)?;
    let written = records.len() as u64;
    if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
        return Ok(written);
    }
    for (id, vector, payload) in points {
        index_upsert_point(handle, id, vector)?;
        sparse_index_upsert_point(handle, id, payload);
    }
    Ok(written)
}
pub fn upsert_bulk(
&mut self,
collection: &str,
points: &[(&str, &[f32], &serde_json::Value)],
) -> Result<u64> {
let handle = self
.collections
.get_mut(collection)
.ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
require_single_vector(handle)?;
let coll_id = handle.id;
let payload_bytes: Vec<Vec<u8>> = points
.iter()
.map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
.collect::<Result<_>>()?;
let records: Vec<(&str, &[f32], &[u8])> = points
.iter()
.zip(payload_bytes.iter())
.map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
.collect();
self.store.upsert_batch(coll_id, &records)?;
let handle = self
.collections
.get_mut(collection)
.ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
mark_stale(handle);
Ok(records.len() as u64)
}
/// Deletes one point.
///
/// Returns `false` when the store had no such point. The in-memory indexes are updated
/// unless vectors are client-side encrypted.
///
/// # Errors
/// Fails for unknown or multi-vector collections, or on store errors.
pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
    let handle = self
        .collections
        .get_mut(collection)
        .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
    require_single_vector(handle)?;
    if !self.store.delete(handle.id, id)? {
        return Ok(false);
    }
    let indexed_server_side = handle.descriptor.vector_encryption != VectorEncryption::ClientSide;
    if indexed_server_side {
        index_delete_point(handle, id);
        sparse_index_delete_point(handle, id);
    }
    Ok(true)
}
pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
let handle = self.handle(collection)?;
require_single_vector(handle)?;
match self.store.get(handle.id, id)? {
Some(record) => Ok(Some(Match {
id: id.to_owned(),
score: 0.0,
payload: Some(serde_json::from_slice(&record.payload)?),
vector: Some(record.vector),
})),
None => Ok(None),
}
}
/// Scans the collection in store order and returns up to `limit` points matching `filter`.
///
/// Every scanned payload is decoded, even when it is not returned, because the filter
/// may need it.
///
/// # Errors
/// Fails for unknown or multi-vector collections, store errors, or an undecodable payload.
pub fn fetch(
    &self,
    collection: &str,
    filter: Option<&Filter>,
    limit: usize,
    with_payload: bool,
    with_vector: bool,
) -> Result<Vec<Match>> {
    let handle = self.handle(collection)?;
    require_single_vector(handle)?;
    let mut matches = Vec::new();
    for (id, record) in self.store.scan(handle.id)? {
        if matches.len() >= limit {
            break;
        }
        let payload: Value = serde_json::from_slice(&record.payload)?;
        if !filter.is_none_or(|f| f.matches(&payload)) {
            continue;
        }
        matches.push(Match {
            id,
            score: 0.0,
            payload: with_payload.then_some(payload),
            vector: with_vector.then_some(record.vector),
        });
    }
    Ok(matches)
}
pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
if self.handle(collection)?.stale {
let store = &self.store;
let handle = self
.collections
.get_mut(collection)
.ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
rebuild_index(store, handle)?;
if mvcc_served(handle) {
publish_base(handle);
}
}
Ok(())
}
/// Turns MVCC snapshot reads on or off for the database and every open collection.
///
/// Eligible collections are marked stale, because their index must be rebuilt and,
/// when MVCC is on, republished.
pub fn set_mvcc_reads(&mut self, on: bool) {
    self.mvcc = on;
    self.collections.values_mut().for_each(|handle| {
        handle.mvcc = on;
        handle.stale |= mvcc_eligible(&handle.descriptor);
    });
}
#[must_use]
pub fn mvcc_reads(&self) -> bool {
self.mvcc
}
pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
Ok(self.handle(collection)?.snapshot.clone())
}
pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
let handle = self.handle(collection)?;
Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
}
/// Whether the collection's index is stale.
///
/// # Errors
/// Fails if the collection is unknown.
pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
    let handle = self.handle(collection)?;
    Ok(handle.stale)
}
/// Captures everything needed to rebuild a stale collection's index off the write path.
///
/// Returns `None` when the collection is not stale. The captured `write_gen` lets
/// `commit_rebuild` detect writes that landed in between.
///
/// # Errors
/// Fails if the collection is unknown or the scan fails.
pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
    let handle = self.handle(collection)?;
    if handle.stale {
        let scan = scan_collection(&self.store, handle)?;
        Ok(Some(RebuildInputs {
            collection: collection.to_owned(),
            descriptor: handle.descriptor.clone(),
            scan,
            write_gen: handle.write_gen,
        }))
    } else {
        Ok(None)
    }
}
pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
let store = &self.store;
let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
return Ok(false);
};
match rebuilt.kind {
RebuiltKind::Ready(index) => handle.index = *index,
RebuiltKind::Disk { graph, pq } => {
handle.index = empty_index(&handle.descriptor);
let disk = write_disk_index(store, handle.id, &graph, &pq)?;
handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
}
}
handle.int_to_ext = rebuilt.int_to_ext;
handle.ext_to_int = rebuilt.ext_to_int;
handle.docs = rebuilt.docs;
handle.sparse = rebuilt.sparse;
let still_stale = handle.write_gen != rebuilt.write_gen;
handle.stale = still_stale;
if mvcc_served(handle) {
publish_base(handle);
}
Ok(still_stale)
}
/// Brings the index up to date, then runs [`Database::search_snapshot`].
///
/// # Errors
/// Propagates rebuild and search errors.
pub fn search(
    &mut self,
    collection: &str,
    query: &[f32],
    params: &SearchParams,
) -> Result<Vec<Match>> {
    self.ensure_indexed(collection)
        .and_then(|()| self.search_snapshot(collection, query, params))
}
/// Dense k-NN search against the current index, without triggering a rebuild.
///
/// The search takes one of three paths:
/// - MVCC-served collections are delegated to `search_snapshot_mvcc`.
/// - Filtered queries whose `candidate_ids` set has at most `FULL_SCAN_THRESHOLD` ids
///   are scored exactly.
/// - Otherwise the ANN index is queried, over-fetching when a filter is present. Hits
///   are hydrated from the store as needed and filtered.
///
/// # Errors
/// Fails for unknown, multi-vector or non-server-searchable collections, and on index,
/// store or payload-decoding errors.
pub fn search_snapshot(
    &self,
    collection: &str,
    query: &[f32],
    params: &SearchParams,
) -> Result<Vec<Match>> {
    let handle = self.handle(collection)?;
    require_single_vector(handle)?;
    require_server_searchable(handle)?;
    if mvcc_served(handle) {
        return self.search_snapshot_mvcc(handle, query, params);
    }
    if let Some(filter) = params.filter.as_ref()
        && let Some(candidates) = candidate_ids(
            &self.store,
            handle.id,
            filter,
            &handle.descriptor.filterable,
        )?
        && candidates.len() <= FULL_SCAN_THRESHOLD
    {
        return self.exact_filtered_search(
            handle.id,
            &handle.descriptor,
            query,
            params,
            filter,
            &candidates,
        );
    }
    // Post-filtering discards hits, so ask the index for more than `k`.
    let fetch = if params.filter.is_some() {
        params
            .k
            .saturating_mul(FILTER_OVERFETCH)
            .max(params.ef_search)
    } else {
        params.k
    };
    let raw = handle.index.search(query, fetch, params.ef_search)?;
    let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
    let mut out = Vec::with_capacity(params.k);
    for neighbor in raw {
        if out.len() >= params.k {
            break;
        }
        let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
            continue;
        };
        let record = if need_record {
            self.store.get(handle.id, ext_id)?
        } else {
            None
        };
        let payload_value: Option<Value> = match &record {
            Some(r) if params.filter.is_some() || params.with_payload => {
                Some(serde_json::from_slice(&r.payload)?)
            }
            _ => None,
        };
        if let Some(filter) = params.filter.as_ref() {
            // A missing record is filtered as a `null` payload.
            let value = payload_value.as_ref().unwrap_or(&Value::Null);
            if !filter.matches(value) {
                continue;
            }
        }
        out.push(Match {
            id: ext_id.clone(),
            score: neighbor.distance,
            payload: if params.with_payload {
                payload_value
            } else {
                None
            },
            vector: if params.with_vector {
                record.map(|r| r.vector)
            } else {
                None
            },
        });
    }
    Ok(out)
}
/// MVCC variant of `search_snapshot`: the dense step reads the collection's published
/// snapshot (base index plus overlay) instead of `handle.index`.
///
/// Small filtered candidate sets still take the exact path. Hits are hydrated from the
/// store as needed and filtered.
///
/// # Errors
/// Propagates index, store and payload-decoding errors.
fn search_snapshot_mvcc(
    &self,
    handle: &CollectionHandle,
    query: &[f32],
    params: &SearchParams,
) -> Result<Vec<Match>> {
    if let Some(filter) = params.filter.as_ref()
        && let Some(candidates) = candidate_ids(
            &self.store,
            handle.id,
            filter,
            &handle.descriptor.filterable,
        )?
        && candidates.len() <= FULL_SCAN_THRESHOLD
    {
        return self.exact_filtered_search(
            handle.id,
            &handle.descriptor,
            query,
            params,
            filter,
            &candidates,
        );
    }
    // Post-filtering discards hits, so ask the snapshot for more than `k`.
    let fetch = if params.filter.is_some() {
        params
            .k
            .saturating_mul(FILTER_OVERFETCH)
            .max(params.ef_search)
    } else {
        params.k
    };
    let dense = handle
        .snapshot
        .load()
        .search(query, fetch, params.ef_search)?;
    let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
    let mut out = Vec::with_capacity(params.k);
    for m in dense {
        if out.len() >= params.k {
            break;
        }
        let record = if need_record {
            self.store.get(handle.id, &m.id)?
        } else {
            None
        };
        let payload_value: Option<Value> = match &record {
            Some(r) if params.filter.is_some() || params.with_payload => {
                Some(serde_json::from_slice(&r.payload)?)
            }
            _ => None,
        };
        // A missing record is filtered as a `null` payload.
        if let Some(filter) = params.filter.as_ref()
            && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
        {
            continue;
        }
        out.push(Match {
            id: m.id,
            score: m.score,
            payload: if params.with_payload {
                payload_value
            } else {
                None
            },
            vector: if params.with_vector {
                record.map(|r| r.vector)
            } else {
                None
            },
        });
    }
    Ok(out)
}
/// Brings the index up to date, then runs [`Database::hybrid_search_snapshot`].
///
/// # Errors
/// Propagates rebuild and search errors.
pub fn hybrid_search(
    &mut self,
    collection: &str,
    dense_query: Option<&[f32]>,
    sparse_query: Option<&SparseVector>,
    text_query: Option<&str>,
    params: &SearchParams,
    rrf_k0: f32,
) -> Result<Vec<Match>> {
    self.ensure_indexed(collection).and_then(|()| {
        self.hybrid_search_snapshot(
            collection,
            dense_query,
            sparse_query,
            text_query,
            params,
            rrf_k0,
        )
    })
}
/// Hybrid search: fuses dense, sparse and BM25 ranked lists with reciprocal rank fusion.
///
/// Each supplied query produces one filtered ranked list, at least
/// `MIN_RRF_CANDIDATES` deep. `rrf_fuse` keeps the top `params.k`. Payloads and
/// vectors are loaded from the store only when requested.
///
/// # Errors
/// Fails for unknown, multi-vector or non-server-searchable collections, when no query
/// is given, and on index, store or payload-decoding errors.
pub fn hybrid_search_snapshot(
    &self,
    collection: &str,
    dense_query: Option<&[f32]>,
    sparse_query: Option<&SparseVector>,
    text_query: Option<&str>,
    params: &SearchParams,
    rrf_k0: f32,
) -> Result<Vec<Match>> {
    let handle = self.handle(collection)?;
    require_single_vector(handle)?;
    require_server_searchable(handle)?;
    if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
        return Err(Error::Unsupported(
            "hybrid_search requires a dense query, a sparse query, or a text query",
        ));
    }
    let depth = params
        .k
        .saturating_mul(RRF_CANDIDATE_FACTOR)
        .max(MIN_RRF_CANDIDATES);
    let filter = params.filter.as_ref();
    let mut lists: Vec<Vec<String>> = Vec::with_capacity(3);
    if let Some(q) = dense_query {
        lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
    }
    if let Some(sp) = sparse_query {
        lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
    }
    if let Some(text) = text_query {
        lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
    }
    let want_record = params.with_payload || params.with_vector;
    rrf_fuse(&lists, rrf_k0, params.k)
        .into_iter()
        .map(|(ext_id, score)| -> Result<Match> {
            let record = if want_record {
                self.store.get(handle.id, &ext_id)?
            } else {
                None
            };
            let payload = match (&record, params.with_payload) {
                (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
                _ => None,
            };
            Ok(Match {
                id: ext_id,
                score,
                payload,
                vector: if params.with_vector {
                    record.map(|r| r.vector)
                } else {
                    None
                },
            })
        })
        .collect()
}
/// Dense ranked list for hybrid search: up to `depth` external ids passing `filter`.
///
/// Uses the MVCC snapshot when the collection is MVCC-served, and the live index
/// otherwise. The index is queried with `ef = max(ef_search, depth)`.
///
/// # Errors
/// Propagates index, store and payload-decoding errors.
fn dense_ranked_ids(
    &self,
    handle: &CollectionHandle,
    query: &[f32],
    depth: usize,
    ef_search: usize,
    filter: Option<&Filter>,
) -> Result<Vec<String>> {
    let ef = ef_search.max(depth);
    let ranked: Vec<String> = if mvcc_served(handle) {
        handle
            .snapshot
            .load()
            .search(query, depth, ef)?
            .into_iter()
            .map(|m| m.id)
            .collect()
    } else {
        handle
            .index
            .search(query, depth, ef)?
            .into_iter()
            .filter_map(|n| handle.int_to_ext.get(n.id as usize).cloned())
            .collect()
    };
    let mut ids = Vec::new();
    for ext_id in ranked {
        if !self.passes_filter(handle.id, &ext_id, filter)? {
            continue;
        }
        ids.push(ext_id);
        if ids.len() >= depth {
            break;
        }
    }
    Ok(ids)
}
/// Sparse ranked list for hybrid search: up to `depth` external ids passing `filter`.
///
/// Uses the in-memory sparse index when present; otherwise scores by scanning the store.
///
/// # Errors
/// Propagates store and payload-decoding errors.
fn sparse_ranked_ids(
    &self,
    handle: &CollectionHandle,
    query: &SparseVector,
    depth: usize,
    filter: Option<&Filter>,
) -> Result<Vec<String>> {
    let Some(index) = handle.sparse.as_ref() else {
        return self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter);
    };
    let mut ids = Vec::new();
    for (ext_id, _) in index.search(query) {
        if self.passes_filter(handle.id, &ext_id, filter)? {
            ids.push(ext_id);
            if ids.len() >= depth {
                break;
            }
        }
    }
    Ok(ids)
}
/// Brute-force sparse scoring: dot product of `query` with each record's
/// `SPARSE_KEY` payload field.
///
/// Records with empty or non-JSON payloads, failing `filter`, or lacking a parsable
/// sparse vector are skipped. Only positive scores are kept. Results are ordered by
/// score descending, ties broken by id ascending, and truncated to `depth`.
///
/// # Errors
/// Propagates store scan errors.
fn sparse_ranked_ids_by_scan(
    &self,
    cid: CollectionId,
    query: &SparseVector,
    depth: usize,
    filter: Option<&Filter>,
) -> Result<Vec<String>> {
    let weights: HashMap<u32, f32> = query
        .indices
        .iter()
        .copied()
        .zip(query.values.iter().copied())
        .collect();
    let mut hits: Vec<(f32, String)> = Vec::new();
    for (ext_id, record) in self.store.scan(cid)? {
        if record.payload.is_empty() {
            continue;
        }
        let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
            continue;
        };
        if filter.is_some_and(|f| !f.matches(&value)) {
            continue;
        }
        let Some(sv) = value
            .get(SPARSE_KEY)
            .and_then(|raw| serde_json::from_value::<SparseVector>(raw.clone()).ok())
        else {
            continue;
        };
        let score = sv
            .indices
            .iter()
            .zip(&sv.values)
            .filter_map(|(dim, w)| weights.get(dim).map(|qw| qw * w))
            .fold(0.0f32, |acc, term| acc + term);
        if score > 0.0 {
            hits.push((score, ext_id));
        }
    }
    hits.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
    hits.truncate(depth);
    Ok(hits.into_iter().map(|(_, id)| id).collect())
}
/// BM25 ranked list for hybrid search: up to `depth` external ids passing `filter`.
///
/// Returns an empty list when the collection has no sparse index.
///
/// # Errors
/// Propagates store and payload-decoding errors from the filter check.
fn bm25_ranked_ids(
    &self,
    handle: &CollectionHandle,
    query_text: &str,
    depth: usize,
    filter: Option<&Filter>,
) -> Result<Vec<String>> {
    let index = match handle.sparse.as_ref() {
        Some(index) => index,
        None => return Ok(Vec::new()),
    };
    let terms = query_term_ids(query_text);
    let mut ids = Vec::new();
    for (ext_id, _) in index.bm25_search(&terms, BM25_K1, BM25_B) {
        if self.passes_filter(handle.id, &ext_id, filter)? {
            ids.push(ext_id);
            if ids.len() >= depth {
                break;
            }
        }
    }
    Ok(ids)
}
/// Whether `ext_id`'s stored payload satisfies `filter`.
///
/// With no filter every id passes. A missing record is tested as a `null` payload.
///
/// # Errors
/// Propagates store errors and payload-decoding errors.
fn passes_filter(
    &self,
    cid: CollectionId,
    ext_id: &str,
    filter: Option<&Filter>,
) -> Result<bool> {
    match filter {
        None => Ok(true),
        Some(filter) => {
            let value = match self.store.get(cid, ext_id)? {
                None => Value::Null,
                Some(record) => serde_json::from_slice::<Value>(&record.payload)?,
            };
            Ok(filter.matches(&value))
        }
    }
}
/// Exact k-NN over a pre-filtered candidate id set.
///
/// Each candidate is loaded from the store, re-checked against `filter`, and scored
/// with the collection metric. The best `params.k` are returned in ascending ordering
/// distance, with scores converted by `report_metric`.
///
/// # Errors
/// Propagates store errors and payload-decoding errors.
fn exact_filtered_search(
    &self,
    cid: CollectionId,
    descriptor: &Descriptor,
    query: &[f32],
    params: &SearchParams,
    filter: &Filter,
    candidates: &BTreeSet<String>,
) -> Result<Vec<Match>> {
    let metric = to_index_metric(descriptor.metric);
    let mut ranked: Vec<(f32, String, Value, Vec<f32>)> = Vec::with_capacity(candidates.len());
    for ext_id in candidates {
        let record = match self.store.get(cid, ext_id)? {
            Some(record) => record,
            None => continue,
        };
        let payload: Value = serde_json::from_slice(&record.payload)?;
        if filter.matches(&payload) {
            let ordering = ordering_distance(metric, query, &record.vector);
            ranked.push((ordering, ext_id.clone(), payload, record.vector));
        }
    }
    ranked.sort_by(|a, b| a.0.total_cmp(&b.0));
    ranked.truncate(params.k);
    let matches = ranked
        .into_iter()
        .map(|(ordering, id, payload, vector)| Match {
            id,
            score: report_metric(metric, ordering),
            payload: params.with_payload.then_some(payload),
            vector: params.with_vector.then_some(vector),
        })
        .collect();
    Ok(matches)
}
/// Inserts or replaces a multi-vector document.
///
/// Each vector is stored as its own token record (`token_id(doc_id, j)`). The JSON
/// payload lives on token 0; later tokens have an empty payload. Trailing tokens from a
/// longer previous version are deleted first.
///
/// NOTE(review): the previous token count comes from `handle.docs`. When that is `None`,
/// no trailing-token cleanup happens — confirm `docs` is always loaded for multi-vector
/// collections.
///
/// # Errors
/// Fails for unknown or single-vector collections, ids containing the reserved
/// separator, empty or wrong-dimension vectors, and store, serialization or index errors.
pub fn upsert_document(
    &mut self,
    collection: &str,
    doc_id: &str,
    vectors: &[Vec<f32>],
    payload: &Value,
) -> Result<()> {
    let handle = self
        .collections
        .get_mut(collection)
        .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
    require_multivector(handle)?;
    if doc_id.contains(DOC_TOKEN_SEP) {
        return Err(Error::Unsupported(
            "document id must not contain the reserved 0x1f separator",
        ));
    }
    if vectors.is_empty() {
        return Err(Error::Unsupported("a document needs at least one vector"));
    }
    let dim = handle.descriptor.dim as usize;
    if !vectors.iter().all(|v| v.len() == dim) {
        return Err(Error::Unsupported(
            "every document vector must match the collection dimensionality",
        ));
    }
    let previous = handle
        .docs
        .as_ref()
        .and_then(|d| d.get(doc_id))
        .map_or(0, |&n| n as usize);
    for j in vectors.len()..previous {
        let token = token_id(doc_id, j);
        self.store.delete(handle.id, &token)?;
        index_delete_point(handle, &token);
    }
    let payload_bytes = serde_json::to_vec(payload)?;
    for (j, vector) in vectors.iter().enumerate() {
        let token = token_id(doc_id, j);
        let bytes: &[u8] = if j == 0 { &payload_bytes } else { &[] };
        self.store.upsert(handle.id, &token, vector, bytes)?;
        index_upsert_point(handle, &token, vector)?;
    }
    if let Some(docs) = handle.docs.as_mut() {
        docs.insert(doc_id.to_owned(), vectors.len() as u32);
    }
    Ok(())
}
/// Brings the index up to date, then runs [`Database::search_multi_vector_snapshot`].
///
/// # Errors
/// Propagates rebuild and search errors.
pub fn search_multi_vector(
    &mut self,
    collection: &str,
    query_tokens: &[Vec<f32>],
    params: &SearchParams,
) -> Result<Vec<DocumentMatch>> {
    self.ensure_indexed(collection)
        .and_then(|()| self.search_multi_vector_snapshot(collection, query_tokens, params))
}
/// Late-interaction (max-sim) search over multi-vector documents, without a rebuild.
///
/// Candidate documents are chosen in one of two ways:
/// - With at most `MULTIVECTOR_EXACT_DOC_THRESHOLD` documents, every document is a
///   candidate.
/// - Otherwise, the union of the documents owning each query token's ANN neighbours.
///
/// Each candidate is gathered from the store, filtered on its token-0 payload, and
/// scored with `max_sim`. The top `params.k` are returned by score descending, ties
/// broken by id.
///
/// # Errors
/// Fails for unknown or single-vector collections, wrong-dimension query tokens, and
/// index, store or payload-decoding errors.
pub fn search_multi_vector_snapshot(
    &self,
    collection: &str,
    query_tokens: &[Vec<f32>],
    params: &SearchParams,
) -> Result<Vec<DocumentMatch>> {
    let handle = self.handle(collection)?;
    require_multivector(handle)?;
    let dim = handle.descriptor.dim as usize;
    if query_tokens.is_empty() {
        return Ok(Vec::new());
    }
    if query_tokens.iter().any(|v| v.len() != dim) {
        return Err(Error::Unsupported(
            "every query token must match the collection dimensionality",
        ));
    }
    let docs = handle.docs.as_ref();
    let doc_count = docs.map_or(0, BTreeMap::len);
    let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
        docs.map(|d| d.keys().cloned().collect())
            .unwrap_or_default()
    } else {
        let per_token_k = params
            .k
            .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
            .max(params.ef_search);
        let mut set = BTreeSet::new();
        for token in query_tokens {
            for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
                if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
                    && let Some((doc, _)) = parse_token_id(ext)
                {
                    set.insert(doc.to_owned());
                }
            }
        }
        set.into_iter().collect()
    };
    let cid = handle.id;
    let metric = to_index_metric(handle.descriptor.metric);
    let mut scored: Vec<ScoredDocument> = Vec::new();
    for doc in &candidates {
        let count = docs.and_then(|d| d.get(doc)).copied().unwrap_or(0) as usize;
        let (tokens, payload) = self.gather_document(cid, doc, count)?;
        if tokens.is_empty() {
            continue;
        }
        // A document without a payload is filtered as `null`; no clone needed.
        if let Some(filter) = params.filter.as_ref()
            && !filter.matches(payload.as_ref().unwrap_or(&Value::Null))
        {
            continue;
        }
        let score = max_sim(metric, query_tokens, &tokens);
        let vectors = params.with_vector.then_some(tokens);
        scored.push((score, doc.clone(), payload, vectors));
    }
    scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
    scored.truncate(params.k);
    Ok(scored
        .into_iter()
        .map(|(score, id, payload, vectors)| DocumentMatch {
            id,
            score,
            payload: params.with_payload.then_some(payload).flatten(),
            vectors,
        })
        .collect())
}
/// Loads a multi-vector document by id, with its token-0 payload and, optionally, its
/// token vectors.
///
/// Returns `None` if the document is unknown or none of its tokens are in the store.
///
/// # Errors
/// Fails for unknown or single-vector collections, store errors, or an undecodable payload.
pub fn get_document(
    &self,
    collection: &str,
    doc_id: &str,
    with_vectors: bool,
) -> Result<Option<DocumentMatch>> {
    let handle = self.handle(collection)?;
    require_multivector(handle)?;
    let count = match handle.docs.as_ref().and_then(|d| d.get(doc_id)) {
        Some(&n) => n as usize,
        None => return Ok(None),
    };
    let (tokens, payload) = self.gather_document(handle.id, doc_id, count)?;
    if tokens.is_empty() {
        return Ok(None);
    }
    Ok(Some(DocumentMatch {
        id: doc_id.to_owned(),
        score: 0.0,
        payload,
        vectors: with_vectors.then_some(tokens),
    }))
}
/// Deletes every token record of a multi-vector document and forgets its token count.
///
/// Returns `false` if the document is unknown.
///
/// # Errors
/// Fails for unknown or single-vector collections, or on store errors.
pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
    let handle = self
        .collections
        .get_mut(collection)
        .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
    require_multivector(handle)?;
    let count = match handle.docs.as_ref().and_then(|d| d.get(doc_id)) {
        Some(&n) => n as usize,
        None => return Ok(false),
    };
    for j in 0..count {
        let token = token_id(doc_id, j);
        self.store.delete(handle.id, &token)?;
        index_delete_point(handle, &token);
    }
    if let Some(docs) = handle.docs.as_mut() {
        docs.remove(doc_id);
    }
    Ok(true)
}
/// Number of documents tracked in a multi-vector collection's document map.
///
/// # Errors
/// `CollectionNotFound`, or `Unsupported` for single-vector collections.
pub fn document_count(&self, collection: &str) -> Result<usize> {
    let handle = self.handle(collection)?;
    require_multivector(handle)?;
    let count = match &handle.docs {
        Some(docs) => docs.len(),
        None => 0,
    };
    Ok(count)
}
/// Reads a document's token records `0..count` from the store.
///
/// Missing token records are skipped, so fewer than `count` vectors may come
/// back. The payload is decoded only from token 0, and only when that record's
/// payload bytes are non-empty.
///
/// # Errors
/// Store read failures and JSON decoding failures of token 0's payload.
fn gather_document(
    &self,
    cid: CollectionId,
    doc_id: &str,
    count: usize,
) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
    let mut vectors = Vec::with_capacity(count);
    let mut payload: Option<Value> = None;
    for ordinal in 0..count {
        let key = token_id(doc_id, ordinal);
        if let Some(record) = self.store.get(cid, &key)? {
            if ordinal == 0 && !record.payload.is_empty() {
                payload = Some(serde_json::from_slice(&record.payload)?);
            }
            vectors.push(record.vector);
        }
    }
    Ok((vectors, payload))
}
/// Checkpoints the store, attaching serialized index snapshots where possible.
///
/// A snapshot blob is produced only for non-stale collections whose index is
/// one of these:
/// * a built, non-empty IVF index, serialized as an `IndexEnvelope`;
/// * a built disk Vamana index, serialized as a `DiskEnvelope` with the base row
///   count and tombstoned ids.
///
/// Every other collection contributes nothing. The collected blobs go to
/// `Store::checkpoint_with_index_snapshots`, keyed by collection id.
///
/// # Errors
/// IVF snapshot, envelope encoding, or store checkpoint failures.
pub fn checkpoint(&mut self) -> Result<()> {
    let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
    for handle in self.collections.values().filter(|h| !h.stale) {
        let blob = match &handle.index {
            CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty() => {
                postcard::to_allocvec(&IndexEnvelope {
                    version: INDEX_ENVELOPE_VERSION,
                    int_to_ext: handle.int_to_ext.clone(),
                    ivf: ivf.snapshot()?,
                })?
            }
            CollectionIndex::Disk(Some(fresh)) => postcard::to_allocvec(&DiskEnvelope {
                version: INDEX_ENVELOPE_VERSION,
                int_to_ext: handle.int_to_ext.clone(),
                base_row_count: fresh.base_len() as u64,
                deleted_ids: fresh.deleted_ids(),
            })?,
            _ => continue,
        };
        snapshots.insert(handle.id, blob);
    }
    self.store.checkpoint_with_index_snapshots(&snapshots)?;
    Ok(())
}
/// Delegates to the underlying store's `compact`.
///
/// # Errors
/// Any failure reported by the store's compaction.
pub fn compact(&mut self) -> Result<()> {
    self.store.compact()?;
    Ok(())
}
/// Manifest version currently reported by the underlying store.
#[must_use]
pub fn manifest_version(&self) -> u64 {
    let store = &self.store;
    store.manifest_version()
}
/// Total size in bytes of all non-directory entries under the store directory,
/// summed recursively. Unreadable entries count as zero (see `dir_size`).
#[must_use]
pub fn disk_usage_bytes(&self) -> u64 {
    let root = self.store.dir();
    dir_size(root)
}
/// Writes a copy of the database directory to `dest`, which must not exist yet.
///
/// The method first runs `checkpoint`, then copies the store directory tree
/// with `copy_tree`. Finally it makes a best-effort attempt to fsync `dest`;
/// errors from that step are ignored. The result reports the files and bytes
/// copied and the store's manifest version read after the copy.
///
/// # Errors
/// `AlreadyExists` if `dest` exists, plus any checkpoint or copy failure.
pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
    if dest.exists() {
        let taken = dest.display().to_string();
        return Err(quiver_core::CoreError::AlreadyExists(taken).into());
    }
    self.checkpoint()?;
    let (files, bytes) = copy_tree(self.store.dir(), dest)?;
    if let Ok(dir) = std::fs::File::open(dest) {
        let _ = dir.sync_all();
    }
    let manifest_version = self.store.manifest_version();
    Ok(SnapshotInfo {
        manifest_version,
        files,
        bytes,
    })
}
fn handle(&self, name: &str) -> Result<&CollectionHandle> {
self.collections
.get(name)
.ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
}
}
/// Copies a snapshot directory (such as one written by `snapshot`) to `dest`.
///
/// `src` must contain a `CURRENT` file, and `dest` must not exist yet. The
/// snapshot is only copied, never opened, so the returned `manifest_version`
/// is always 0. `files` and `bytes` describe what was copied.
///
/// # Errors
/// * `AlreadyExists` if `dest` exists.
/// * `InvalidArgument` if `src` has no `CURRENT` file.
/// * Any I/O failure from the copy.
pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
    if dest.exists() {
        let err = quiver_core::CoreError::AlreadyExists(dest.display().to_string());
        return Err(err.into());
    }
    let marker = src.join("CURRENT");
    if !marker.exists() {
        let err = quiver_core::CoreError::InvalidArgument(format!(
            "{} is not a snapshot (no CURRENT)",
            src.display()
        ));
        return Err(err.into());
    }
    let (files, bytes) = copy_tree(src, dest)?;
    Ok(SnapshotInfo {
        manifest_version: 0,
        files,
        bytes,
    })
}
/// Recursively copies the directory tree at `src` into `dst`, creating `dst`
/// (and any missing parents) first.
///
/// Returns `(files, bytes)`: the number of non-directory entries copied and the
/// total byte count reported by `std::fs::copy`.
///
/// Two safeguards go beyond a plain recursive copy:
/// * If `dst` lies inside `src`, the walk never descends into `dst` itself.
///   Because `dst` is created before `src` is listed, it would otherwise show up
///   in the listing. The copy would then recurse into its own output until the
///   path grew too long, leaving a deep partial tree behind.
/// * Each copied file, and each directory once populated, is fsynced on a
///   best-effort basis, with errors ignored as for the directory syncs elsewhere
///   in this file. This is needed because `std::fs::copy` does not flush data
///   to stable storage.
///
/// # Errors
/// Any I/O failure while creating directories, resolving `dst`, listing
/// entries, or copying a file. Each is wrapped by `CoreError::io` with the
/// offending path.
fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
    std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
    let root = std::fs::canonicalize(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
    copy_tree_skipping(src, dst, &root)
}

/// Worker for `copy_tree`. It copies `src` into the existing directory `dst`,
/// skipping any source directory whose canonical path is `root`, the top-level
/// destination.
fn copy_tree_skipping(src: &Path, dst: &Path, root: &Path) -> Result<(u64, u64)> {
    let mut files = 0u64;
    let mut bytes = 0u64;
    for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
        let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
        let from = entry.path();
        let to = dst.join(entry.file_name());
        let ft = entry
            .file_type()
            .map_err(|e| quiver_core::CoreError::io(&from, e))?;
        if ft.is_dir() {
            // Never copy the destination into itself.
            if std::fs::canonicalize(&from).is_ok_and(|p| p.as_path() == root) {
                continue;
            }
            std::fs::create_dir_all(&to).map_err(|e| quiver_core::CoreError::io(&to, e))?;
            let (f, b) = copy_tree_skipping(&from, &to, root)?;
            files += f;
            bytes += b;
        } else {
            let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
            // Best effort: flush the copied data.
            let _ = std::fs::File::open(&to).and_then(|f| f.sync_all());
            files += 1;
            bytes += n;
        }
    }
    // Best effort: persist this directory's entries now that it is populated.
    let _ = std::fs::File::open(dst).and_then(|d| d.sync_all());
    Ok((files, bytes))
}
/// Recursively sums the sizes, in bytes, of all non-directory entries under `dir`.
///
/// This is best effort. A `dir` that cannot be listed yields 0, and an entry
/// whose type or metadata cannot be read contributes 0 instead of failing.
/// Entry types and sizes come from the `DirEntry` itself, so symlinks are
/// measured as links rather than followed.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
/// Separator between a document id and a token ordinal in per-token point ids
/// (the ASCII unit separator, U+001F).
const DOC_TOKEN_SEP: char = '\u{1f}';
/// Multi-vector search tuning knob, used outside this section.
/// NOTE(review): the meaning is inferred from the name (a document-count cutoff
/// below which search is exact); confirm at the use site.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;
/// Multi-vector search tuning knob, used outside this section.
/// NOTE(review): presumably an over-fetch multiplier for candidate generation; confirm.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
/// Builds the point id of token `ordinal` of document `doc_id`:
/// `"<doc_id>\u{1f}<ordinal>"`.
fn token_id(doc_id: &str, ordinal: usize) -> String {
    let ordinal = ordinal.to_string();
    let mut id = String::with_capacity(doc_id.len() + DOC_TOKEN_SEP.len_utf8() + ordinal.len());
    id.push_str(doc_id);
    id.push(DOC_TOKEN_SEP);
    id.push_str(&ordinal);
    id
}
/// Splits a token point id into `(doc_id, ordinal)`.
///
/// The split happens at the last separator, so a document id may itself
/// contain the separator. Returns `None` if there is no separator or the suffix
/// is not a `u32`.
fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
    ext.rsplit_once(DOC_TOKEN_SEP)
        .and_then(|(doc, ordinal)| ordinal.parse::<u32>().ok().map(|n| (doc, n)))
}
/// Guards single-vector APIs.
///
/// # Errors
/// `Error::Unsupported` when the collection is multi-vector.
fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
    if !handle.descriptor.multivector {
        return Ok(());
    }
    Err(Error::Unsupported(
        "collection is multi-vector; use upsert_document / search_multi_vector",
    ))
}
fn require_multivector(handle: &CollectionHandle) -> Result<()> {
if handle.descriptor.multivector {
Ok(())
} else {
Err(Error::Unsupported(
"collection is single-vector; use upsert / search",
))
}
}
/// Refuses server-side ranking for collections whose vectors are encrypted
/// client-side. Every other encryption mode passes.
///
/// # Errors
/// `Error::Unsupported` when `vector_encryption` is `ClientSide`.
fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
    let opaque = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
    if !opaque {
        return Ok(());
    }
    Err(Error::Unsupported(
        "collection is client-side encrypted; the server cannot rank opaque vectors — \
         fetch points and rank client-side",
    ))
}
/// Maps the descriptor-level distance metric onto the index crate's `Metric`
/// (one-to-one).
fn to_index_metric(metric: DistanceMetric) -> Metric {
    match metric {
        DistanceMetric::L2 => Metric::L2,
        DistanceMetric::Cosine => Metric::Cosine,
        DistanceMetric::Dot => Metric::Dot,
    }
}
fn validate_index(descriptor: &Descriptor) -> Result<()> {
if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
return Err(Error::Unsupported(
"multi-vector collections require a similarity metric (cosine or dot)",
));
}
if descriptor.vector_encryption == VectorEncryption::ClientSide {
if descriptor.multivector {
return Err(Error::Unsupported(
"client-side vector encryption is not supported for multi-vector collections",
));
}
return Ok(());
}
if descriptor.vector_encryption == VectorEncryption::Dcpe
&& descriptor.metric != DistanceMetric::L2
{
return Err(Error::Unsupported(
"dcpe-encrypted collections require the l2 metric",
));
}
if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
return Err(Error::Unsupported(
"the colbert index is only for multi-vector collections",
));
}
match descriptor.index.kind {
IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
if descriptor.metric == DistanceMetric::Dot =>
{
Err(Error::Unsupported(
"vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
))
}
_ => Ok(()),
}
}
/// Unbuilt index placeholder for a collection.
///
/// Returns:
/// * `CollectionIndex::None` for client-side encrypted collections;
/// * the `None` state of the configured kind for Vamana, disk Vamana, IVF, and
///   ColBERT;
/// * a fresh, empty HNSW graph for every other kind.
fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
    let client_side = descriptor.vector_encryption == VectorEncryption::ClientSide;
    match descriptor.index.kind {
        _ if client_side => CollectionIndex::None,
        IndexKind::Vamana => CollectionIndex::Vamana(None),
        IndexKind::DiskVamana => CollectionIndex::Disk(None),
        IndexKind::Ivf => CollectionIndex::Ivf(None),
        IndexKind::Colbert => CollectionIndex::Colbert(None),
        _ => {
            let dim = descriptor.dim as usize;
            let metric = to_index_metric(descriptor.metric);
            CollectionIndex::Hnsw(Hnsw::new(dim, metric, HnswConfig::default()))
        }
    }
}
/// Default number of product-quantizer subspaces for `dim`-dimensional vectors.
///
/// This is the largest divisor of `dim` that does not exceed `dim / 8`. The
/// result is always at least 1, including for `dim < 16` and `dim == 0`.
fn default_pq_m(dim: usize) -> usize {
    let mut m = (dim / 8).max(1);
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
/// Fixed RNG seed passed to `ProductQuantizer::train` and to the ColBERT index
/// config (`ColbertConfig::seed`). Presumably the point is that rebuilding from
/// the same data yields the same codebooks. TODO confirm the trainers are
/// otherwise deterministic.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
/// File name of the disk Vamana index inside the collection's
/// `store.index_dir(..)`. `write_disk_index` writes it through a `.tmp` sibling
/// followed by a rename.
const DISK_INDEX_FILE: &str = "vamana.qvx";
/// Builds the collection's index over `ids` / `flat`.
///
/// In-memory kinds are returned as built by `build_in_memory_index`. The disk
/// Vamana kind is handled in three steps: the graph and product quantizer are
/// built in memory, written under the store's index directory, and the reopened
/// file is wrapped in a `FreshDiskVamana`.
///
/// # Errors
/// Build, PQ training, disk write, or reopen failures.
fn build_index(
    store: &Store,
    cid: CollectionId,
    descriptor: &Descriptor,
    ids: &[u64],
    flat: &[f32],
) -> Result<CollectionIndex> {
    if let Some(ready) = build_in_memory_index(descriptor, ids, flat)? {
        return Ok(ready);
    }
    let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
    let on_disk = write_disk_index(store, cid, &graph, &pq)?;
    Ok(CollectionIndex::Disk(Some(FreshDiskVamana::new(on_disk)?)))
}
/// Builds the in-memory index selected by `descriptor` over the rows of `flat`.
///
/// Row `i` of `flat` (`descriptor.dim` floats) belongs to `ids[i]`. The return
/// value depends on the collection:
/// * `Some(CollectionIndex::None)` for client-side encrypted collections, which
///   are never indexed on the server;
/// * `None` for the disk Vamana kind, which the caller builds and writes
///   itself;
/// * `Some(index)` otherwise: Vamana, IVF, ColBERT, or HNSW for every other kind.
///
/// # Errors
/// Build or insert failures from the index crate.
///
/// # Panics
/// The HNSW path slices `flat` per row and panics if `flat` holds fewer than
/// `ids.len() * dim` floats.
fn build_in_memory_index(
    descriptor: &Descriptor,
    ids: &[u64],
    flat: &[f32],
) -> Result<Option<CollectionIndex>> {
    if descriptor.vector_encryption == VectorEncryption::ClientSide {
        return Ok(Some(CollectionIndex::None));
    }
    let dim = descriptor.dim as usize;
    let metric = to_index_metric(descriptor.metric);
    let built = match descriptor.index.kind {
        IndexKind::DiskVamana => return Ok(None),
        IndexKind::Vamana => {
            let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
            CollectionIndex::Vamana(Some(FreshVamana::new(graph)?))
        }
        IndexKind::Ivf => {
            let cfg = IvfConfig {
                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
                ..IvfConfig::default()
            };
            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
        }
        IndexKind::Colbert => {
            // ceil(sqrt(n)) coarse centroids (clamped to 1..=4096), probing about a quarter.
            let centroids = ((ids.len() as f64).sqrt().ceil() as usize).clamp(1, 4096);
            let subspaces = match descriptor.index.pq_subspaces {
                Some(m) => m as usize,
                None => default_pq_m(dim),
            };
            let cfg = ColbertConfig {
                n_centroids: centroids,
                n_probe: centroids.div_ceil(4).clamp(1, centroids),
                pq_subspaces: subspaces,
                seed: PQ_SEED,
            };
            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
        }
        _ => {
            let mut hnsw = Hnsw::new(dim, metric, HnswConfig::default());
            for (row, &id) in ids.iter().enumerate() {
                let start = row * dim;
                hnsw.insert(id, &flat[start..start + dim])?;
            }
            CollectionIndex::Hnsw(hnsw)
        }
    };
    Ok(Some(built))
}
/// Builds the in-memory pieces of a disk Vamana index.
///
/// The pieces are a Vamana graph over `ids` / `flat` and a product quantizer
/// trained on the same rows. The quantizer uses the descriptor's
/// `pq_subspaces`, or `default_pq_m(dim)` when that is unset, and `PQ_SEED`.
///
/// # Errors
/// Graph build or PQ training failures.
fn build_disk_graph_pq(
    descriptor: &Descriptor,
    ids: &[u64],
    flat: &[f32],
) -> Result<(Vamana, ProductQuantizer)> {
    let dim = descriptor.dim as usize;
    let metric = to_index_metric(descriptor.metric);
    let subspaces = match descriptor.index.pq_subspaces {
        Some(m) => m as usize,
        None => default_pq_m(dim),
    };
    let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
    let quantizer = ProductQuantizer::train(flat, ids.len(), dim, subspaces, metric, PQ_SEED)?;
    Ok((graph, quantizer))
}
/// Serializes `graph` + `pq` as the collection's disk index and reopens it.
///
/// The index is written to `<index_dir>/vamana.qvx.tmp` with the collection's
/// page codec, then renamed over `<index_dir>/vamana.qvx`. Any previous index
/// file is therefore untouched until the rename. Afterwards the directory is
/// fsynced on a best-effort basis, with errors ignored.
///
/// NOTE(review): this code does not show whether `quiver_index::disk::write`
/// fsyncs the temp file before the rename. Confirm it does; otherwise the
/// renamed file's contents may not be durable after a crash.
///
/// # Errors
/// * I/O failures creating the directory or renaming.
/// * Codec lookup failures.
/// * Errors from `disk::write`.
/// * Errors reopening the written index.
fn write_disk_index(
store: &Store,
cid: CollectionId,
graph: &Vamana,
pq: &ProductQuantizer,
) -> Result<DiskVamana> {
let dir = store.index_dir(cid);
std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
let path = dir.join(DISK_INDEX_FILE);
let codec = store.collection_codec_clone(cid)?;
// Write to a sibling temp file first, then rename it into place.
let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
// Best effort: persist the directory entry changed by the rename.
let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
open_disk_index(store, cid, codec)
}
fn open_disk_index(
store: &Store,
cid: CollectionId,
codec: Box<dyn PageCodec>,
) -> Result<DiskVamana> {
let path = store.index_dir(cid).join(DISK_INDEX_FILE);
Ok(DiskVamana::open(&path, codec)?)
}
fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
if !handle.descriptor.multivector
&& handle.descriptor.index.kind == IndexKind::Ivf
&& let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
&& restore_ivf_snapshot(store, handle, &blob).is_ok()
{
return Ok(());
}
if !handle.descriptor.multivector
&& handle.descriptor.index.kind == IndexKind::DiskVamana
&& std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
&& let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
&& restore_disk_snapshot(store, handle, &blob).is_ok()
{
return Ok(());
}
rebuild_index(store, handle)
}
/// Restores a disk Vamana collection from its checkpointed `DiskEnvelope`
/// instead of rebuilding the graph.
///
/// The steps are:
/// 1. Decode the envelope and check its version.
/// 2. Reopen the on-disk base index and require its row count to equal
///    `base_row_count`.
/// 3. Rebuild `ext_to_int` from the saved `int_to_ext`. For an external id
///    listed more than once, the last (highest) internal id wins.
/// 4. Re-insert every post-base internal id whose external id still has a
///    record in the store, using the store's current vector.
/// 5. Re-apply the saved tombstones.
/// 6. Replay the store's recovery tail through `replay_recovery_tail`.
///
/// An error after the mappings are replaced leaves the handle partially
/// restored. `load_index` treats any error here as a signal to call
/// `rebuild_index`.
///
/// NOTE(review): unlike `rebuild_index`, this path never sets `handle.docs` or
/// `handle.sparse`. Confirm the sparse index is populated elsewhere for
/// collections that use one.
fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
if envelope.version != INDEX_ENVELOPE_VERSION {
return Err(Error::Unsupported(
"unsupported disk index snapshot version",
));
}
let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
if base.len() as u64 != envelope.base_row_count {
return Err(Error::Unsupported(
"disk base count disagrees with snapshot",
));
}
handle.ext_to_int = envelope
.int_to_ext
.iter()
.enumerate()
.map(|(i, ext)| (ext.clone(), i as u64))
.collect();
handle.int_to_ext = envelope.int_to_ext;
let mut fresh = FreshDiskVamana::new(base)?;
// Internal ids at or past the base row count are not in the on-disk base.
// Re-insert them from the store's current records, skipping ids whose record is gone.
for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
let ext = &handle.int_to_ext[internal as usize];
if let Some(record) = store.get(handle.id, ext)? {
fresh.insert(internal, &record.vector)?;
}
}
// Re-apply the tombstones captured at checkpoint time.
for id in envelope.deleted_ids {
fresh.mark_deleted(id);
}
handle.index = CollectionIndex::Disk(Some(fresh));
handle.stale = false;
replay_recovery_tail(store, handle)
}
/// Applies the store's recovery tail for this collection to the in-memory index.
///
/// All tail deletions are applied first, through `index_delete_point`, and then
/// all tail upserts, through `index_upsert_point`.
///
/// # Errors
/// Failure to read the tail, or the first failing upsert.
fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
    let tail = store.recovery_tail(handle.id)?;
    tail.deleted
        .iter()
        .for_each(|ext| index_delete_point(handle, ext));
    tail.upserts
        .into_iter()
        .try_for_each(|(ext, record)| index_upsert_point(handle, &ext, &record.vector))
}
/// Restores an IVF collection from its checkpointed `IndexEnvelope` instead of
/// rebuilding it.
///
/// The steps are:
/// 1. Decode the envelope, check its version, and restore the IVF structure.
/// 2. Rebuild `ext_to_int` from the saved `int_to_ext`; later duplicates win.
/// 3. Install the index and clear `stale`.
/// 4. Replay the store's recovery tail directly against the IVF. Deletions call
///    `ivf.remove` for known ids. Upserts reuse the existing internal id, or
///    append a new one, and call `ivf.insert`.
///
/// Unlike `replay_recovery_tail`, the tail here bypasses `index_delete_point`
/// and `index_upsert_point`. It therefore does not bump the write generation,
/// consult `mvcc_served`, or apply their empty-IVF stale rule.
///
/// An error after the handle fields are replaced leaves the handle partially
/// restored. `load_index` then falls back to `rebuild_index`.
///
/// NOTE(review): unlike `rebuild_index`, this path never sets `handle.docs` or
/// `handle.sparse`. Confirm the sparse index is populated elsewhere.
fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
if envelope.version != INDEX_ENVELOPE_VERSION {
return Err(Error::Unsupported(
"unsupported index snapshot envelope version",
));
}
let ivf = Ivf::restore(&envelope.ivf)?;
handle.ext_to_int = envelope
.int_to_ext
.iter()
.enumerate()
.map(|(i, ext)| (ext.clone(), i as u64))
.collect();
handle.int_to_ext = envelope.int_to_ext;
handle.index = CollectionIndex::Ivf(Some(ivf));
handle.stale = false;
// Replay writes made after the checkpoint. Deletions are applied before upserts.
// NOTE(review): confirm `recovery_tail` already reports the net effect per id.
let tail = store.recovery_tail(handle.id)?;
for ext in &tail.deleted {
let Some(&internal) = handle.ext_to_int.get(ext) else {
continue;
};
if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
ivf.remove(internal);
}
}
for (ext, record) in tail.upserts {
// Reuse the id of an already-mapped point; otherwise append a new internal id.
let internal = match handle.ext_to_int.get(&ext) {
Some(&i) => i,
None => {
let i = handle.int_to_ext.len() as u64;
handle.ext_to_int.insert(ext.clone(), i);
handle.int_to_ext.push(ext);
i
}
};
if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
ivf.insert(internal, &record.vector)?;
}
}
Ok(())
}
/// Applies a point upsert (new or replaced vector) to the collection's
/// in-memory vector index and id mappings.
///
/// If `mvcc_served(handle)`, the write goes to `overlay_upsert` and nothing
/// else happens. Otherwise the write generation is bumped. If the collection
/// is already stale, nothing further is done. Otherwise:
/// * HNSW with a new external id: the vector is inserted under the next
///   internal id (`int_to_ext.len()`), and both mappings are extended.
/// * IVF with a non-empty index: a known id keeps its internal id and a new one
///   gets the next id; then `ivf.insert` runs.
/// * Vamana or disk Vamana (built): any previous internal id is tombstoned and
///   the vector is inserted under a fresh internal id. As a result `int_to_ext`
///   may list the same external id several times, while `ext_to_int` points at
///   the newest. The collection is marked stale once the pending fraction
///   reaches `GRAPH_REBUILD_PENDING_FRACTION`.
/// * ColBERT (built): the same tombstone-and-append scheme. The collection is
///   marked stale once the deleted fraction reaches
///   `HNSW_REBUILD_DELETED_FRACTION`.
///
/// Every other case marks the collection stale. This includes replacing an
/// existing HNSW point, an empty or unbuilt IVF, an unbuilt graph or ColBERT
/// index, and `CollectionIndex::None`. Stale collections are rebuilt from the
/// store; see `rebuild_index`.
///
/// # Errors
/// Index insert failures are propagated.
///
/// NOTE(review): in the graph and ColBERT branches the old id is tombstoned
/// before the insert, so a failed insert leaves the point tombstoned while
/// `ext_to_int` still maps to it. In the IVF branch, a new id's mapping is
/// recorded before `ivf.insert` runs.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
if mvcc_served(handle) {
overlay_upsert(handle, ext_id, vector);
return Ok(());
}
bump_write_gen(handle);
// A stale index will be rebuilt from the store, so there is nothing to patch.
if handle.stale {
return Ok(());
}
let known = handle.ext_to_int.contains_key(ext_id);
let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
let is_live_graph = matches!(
handle.index,
CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
);
let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
if is_hnsw && !known {
// New HNSW point: append under the next internal id.
let internal = handle.int_to_ext.len() as u64;
if let CollectionIndex::Hnsw(h) = &mut handle.index {
h.insert(internal, vector)?;
}
handle.ext_to_int.insert(ext_id.to_owned(), internal);
handle.int_to_ext.push(ext_id.to_owned());
} else if is_live_ivf {
// IVF keeps a stable internal id per external id.
let internal = if known {
handle.ext_to_int[ext_id]
} else {
let i = handle.int_to_ext.len() as u64;
handle.ext_to_int.insert(ext_id.to_owned(), i);
handle.int_to_ext.push(ext_id.to_owned());
i
};
if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
ivf.insert(internal, vector)?;
}
} else if is_live_graph {
// Graph indexes: tombstone the old id and append the new vector under a fresh id.
let old = handle.ext_to_int.get(ext_id).copied();
let internal = handle.int_to_ext.len() as u64;
let mut pending = 0.0;
match &mut handle.index {
CollectionIndex::Vamana(Some(fresh)) => {
if let Some(o) = old {
fresh.mark_deleted(o);
}
fresh.insert(internal, vector)?;
pending = fresh.pending_fraction();
}
CollectionIndex::Disk(Some(fresh)) => {
if let Some(o) = old {
fresh.mark_deleted(o);
}
fresh.insert(internal, vector)?;
pending = fresh.pending_fraction();
}
_ => {}
}
handle.ext_to_int.insert(ext_id.to_owned(), internal);
handle.int_to_ext.push(ext_id.to_owned());
if pending >= GRAPH_REBUILD_PENDING_FRACTION {
mark_stale(handle);
}
} else if is_live_colbert {
// ColBERT: the same tombstone-and-append scheme, with the HNSW deleted-fraction threshold.
let old = handle.ext_to_int.get(ext_id).copied();
let internal = handle.int_to_ext.len() as u64;
let mut crowded = false;
if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
if let Some(o) = old {
c.mark_deleted(o);
}
c.insert(internal, vector)?;
crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
}
handle.ext_to_int.insert(ext_id.to_owned(), internal);
handle.int_to_ext.push(ext_id.to_owned());
if crowded {
mark_stale(handle);
}
} else {
// No incremental path applies; fall back to a full rebuild.
mark_stale(handle);
}
Ok(())
}
/// Applies a point deletion to the collection's in-memory vector index.
///
/// If `mvcc_served(handle)`, the deletion goes to `overlay_delete` and nothing
/// else happens. Otherwise the write generation is bumped. If the collection is
/// already stale, nothing further is done. Otherwise the point's internal id is
/// looked up and handled by index kind:
/// * IVF (built): the id is removed with `ivf.remove`.
/// * HNSW: the id is tombstoned. The collection is marked stale once the
///   deleted fraction reaches `HNSW_REBUILD_DELETED_FRACTION`.
/// * Vamana or disk Vamana (built): the id is tombstoned. The collection is
///   marked stale once the pending fraction reaches
///   `GRAPH_REBUILD_PENDING_FRACTION`.
/// * ColBERT (built): the id is tombstoned. The collection is marked stale once
///   the deleted fraction reaches `HNSW_REBUILD_DELETED_FRACTION`.
///
/// An unknown external id, or an index that is not built, marks the collection
/// stale instead. `ext_to_int` and `int_to_ext` are never modified here, so a
/// deleted id keeps its mapping.
///
/// NOTE(review): an id absent from `ext_to_int`, for example one that was never
/// indexed, also forces a stale rebuild. Confirm that is intended.
fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
if mvcc_served(handle) {
overlay_delete(handle, ext_id);
return;
}
bump_write_gen(handle);
// A stale index will be rebuilt from the store, so there is nothing to patch.
if handle.stale {
return;
}
let internal = handle.ext_to_int.get(ext_id).copied();
let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
let live_graph = matches!(
handle.index,
CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
);
let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
match internal {
Some(internal) if live_ivf => {
if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
ivf.remove(internal);
}
}
Some(internal) if live_hnsw => {
let mut crowded = false;
if let CollectionIndex::Hnsw(h) = &mut handle.index {
// NOTE(review): `as u32` truncates internal ids above u32::MAX, while
// HNSW inserts in index_upsert_point pass the u64 id.
h.mark_deleted(internal as u32);
crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
}
if crowded {
mark_stale(handle);
}
}
Some(internal) if live_graph => {
let mut crowded = false;
match &mut handle.index {
CollectionIndex::Vamana(Some(fresh)) => {
fresh.mark_deleted(internal);
crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
}
CollectionIndex::Disk(Some(fresh)) => {
fresh.mark_deleted(internal);
crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
}
_ => {}
}
if crowded {
mark_stale(handle);
}
}
Some(internal) if live_colbert => {
let mut crowded = false;
if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
c.mark_deleted(internal);
crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
}
if crowded {
mark_stale(handle);
}
}
// Unknown id or unbuilt index: fall back to a full rebuild.
_ => mark_stale(handle),
}
}
/// Everything `scan_collection` derives from one full pass over a collection's
/// stored records, in store scan order.
struct RebuildScan {
/// External id of each scanned record; the position is the internal id.
int_to_ext: Vec<String>,
/// Maps each external id to its position in `int_to_ext`.
ext_to_int: HashMap<String, u64>,
/// All record vectors concatenated, one row per internal id.
flat: Vec<f32>,
/// Multi-vector collections only: the number of token records per document id.
docs: Option<BTreeMap<String, u32>>,
/// A fresh sparse inverted index, present when `uses_sparse_index` says the
/// collection keeps one.
sparse: Option<SparseInvertedIndex>,
}
/// Scans every stored record of the collection once and derives the inputs
/// for an index rebuild.
///
/// Internal ids are assigned densely in store scan order. For multi-vector
/// collections, records whose id parses as a token id are counted per
/// document. When `uses_sparse_index` reports that the collection keeps a
/// sparse index, every payload that yields a sparse vector is added to a new one.
///
/// # Errors
/// Store scan failures.
fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
    let multivector = handle.descriptor.multivector;
    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
    let mut flat: Vec<f32> = Vec::new();
    let mut int_to_ext: Vec<String> = Vec::new();
    let mut ext_to_int: HashMap<String, u64> = HashMap::new();
    for (row, (ext_id, record)) in store.scan(handle.id)?.into_iter().enumerate() {
        flat.extend_from_slice(&record.vector);
        if multivector
            && let Some((doc, _)) = parse_token_id(&ext_id)
        {
            *docs.entry(doc.to_owned()).or_default() += 1;
        }
        if let Some(index) = sparse.as_mut()
            && let Some(sv) = sparse_vector_from_payload(&record.payload)
        {
            index.upsert(&ext_id, &sv);
        }
        ext_to_int.insert(ext_id.clone(), row as u64);
        int_to_ext.push(ext_id);
    }
    Ok(RebuildScan {
        int_to_ext,
        ext_to_int,
        flat,
        docs: multivector.then_some(docs),
        sparse,
    })
}
/// Rebuilds the collection's index, id mappings, document counts, and sparse
/// index from a full store scan, then clears `stale`.
///
/// The current index is replaced by `empty_index` before `build_index` runs.
/// Presumably this releases the old index (for the disk kind, anything still
/// holding `vamana.qvx`) before the new one is written. TODO confirm.
///
/// If the build fails, the handle keeps that empty index while its mappings,
/// `docs`, `sparse`, and `stale` flag are left unchanged.
fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
let scan = scan_collection(store, handle)?;
let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
// Drop the old index before building its replacement (see the doc comment).
handle.index = empty_index(&handle.descriptor);
handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
handle.int_to_ext = scan.int_to_ext;
handle.ext_to_int = scan.ext_to_int;
handle.docs = scan.docs;
handle.sparse = scan.sparse;
handle.stale = false;
Ok(())
}
/// Everything needed to build a replacement index for one collection. It is
/// consumed by `RebuildInputs::build`, which needs no `Store` or collection
/// handle.
///
/// NOTE(review): `write_gen` is carried through to `RebuiltIndex` unchanged.
/// Presumably it is compared with the collection's current write generation
/// when the result is installed, to detect writes made during the build.
/// Confirm at the installation site.
pub struct RebuildInputs {
/// Collection name, copied to the result.
collection: String,
/// Descriptor that selects the index kind and its parameters.
descriptor: Descriptor,
/// Store scan to build from.
scan: RebuildScan,
/// Write generation captured with the inputs.
write_gen: u64,
}
/// The built index, split by whether it still has to be written to disk.
enum RebuiltKind {
/// A complete in-memory index, as returned by `build_in_memory_index`.
Ready(Box<CollectionIndex>),
/// Disk Vamana parts built in memory but not yet written. `RebuildInputs::build`
/// has no `Store`, so writing them (compare `write_disk_index`) is left to
/// whoever installs the result.
Disk {
graph: Box<Vamana>,
pq: Box<ProductQuantizer>,
},
}
/// Output of `RebuildInputs::build`: the rebuilt index plus the id mappings,
/// document counts, sparse index, and write generation moved over from the
/// inputs.
pub struct RebuiltIndex {
/// Collection name, copied from the inputs.
collection: String,
/// The rebuilt index, or the disk parts still to be written.
kind: RebuiltKind,
/// Internal id to external id, in scan order.
int_to_ext: Vec<String>,
/// External id to internal id.
ext_to_int: HashMap<String, u64>,
/// Per-document token counts (multi-vector collections only).
docs: Option<BTreeMap<String, u32>>,
/// Sparse inverted index, when the collection keeps one.
sparse: Option<SparseInvertedIndex>,
/// Write generation captured with the inputs, passed through unchanged.
write_gen: u64,
}
impl RebuildInputs {
pub fn build(self) -> Result<RebuiltIndex> {
let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
Some(index) => RebuiltKind::Ready(Box::new(index)),
None => {
let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
RebuiltKind::Disk {
graph: Box::new(graph),
pq: Box::new(pq),
}
}
};
Ok(RebuiltIndex {
collection: self.collection,
kind,
int_to_ext: self.scan.int_to_ext,
ext_to_int: self.scan.ext_to_int,
docs: self.scan.docs,
sparse: self.scan.sparse,
write_gen: self.write_gen,
})
}
}
fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
if payload.is_empty() {
return None;
}
let value = serde_json::from_slice::<Value>(payload).ok()?;
sparse_vector_from_value(&value)
}
fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
if let Some(raw) = payload.get(SPARSE_KEY) {
return serde_json::from_value::<SparseVector>(raw.clone()).ok();
}
let text = payload.get(TEXT_KEY)?.as_str()?;
Some(text_to_sparse(text))
}
/// Keeps the collection's sparse inverted index, if it has one, in step with a
/// point's new payload.
///
/// Nothing happens while the collection is stale. If the payload yields a
/// sparse vector, that vector is upserted under `ext_id`; otherwise `ext_id` is
/// removed from the index.
fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
    if handle.stale {
        return;
    }
    if let Some(index) = handle.sparse.as_mut() {
        if let Some(sv) = sparse_vector_from_value(payload) {
            index.upsert(ext_id, &sv);
        } else {
            index.remove(ext_id);
        }
    }
}
/// Removes `ext_id` from the collection's sparse inverted index, if it keeps one.
/// Unlike the upsert path, this runs even while the collection is stale.
fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
    let Some(index) = handle.sparse.as_mut() else {
        return;
    };
    index.remove(ext_id);
}
/// Narrows a filter to candidate point ids using the collection's secondary
/// index over `filterable` fields.
///
/// `Ok(None)` means the filter cannot be narrowed, so every point must be
/// considered. Otherwise every point matching `filter` is in the returned set,
/// assuming `matching_ids` is exact for leaf predicates. The set may be larger
/// than the true match set because `And` ignores sub-filters that cannot use
/// the index. Per filter variant:
/// * `And`: the intersection of the narrowable children, or `None` if no child
///   is narrowable.
/// * `Or`: the union of the children, or `None` as soon as any child is not
///   narrowable.
/// * `Not`: never narrowable.
/// * Leaves: resolved through `leaf_predicate`; `None` if the field is not
///   filterable or the value has the wrong type.
///
/// # Errors
/// Store lookup failures.
fn candidate_ids(
    store: &Store,
    cid: CollectionId,
    filter: &Filter,
    filterable: &[FilterableField],
) -> Result<Option<BTreeSet<String>>> {
    match filter {
        Filter::And(subs) => {
            let mut narrowed: Option<BTreeSet<String>> = None;
            for sub in subs {
                let Some(ids) = candidate_ids(store, cid, sub, filterable)? else {
                    continue;
                };
                match &mut narrowed {
                    Some(current) => current.retain(|id| ids.contains(id)),
                    None => narrowed = Some(ids),
                }
            }
            Ok(narrowed)
        }
        Filter::Or(subs) => {
            let mut union = BTreeSet::new();
            for sub in subs {
                let Some(ids) = candidate_ids(store, cid, sub, filterable)? else {
                    return Ok(None);
                };
                union.extend(ids);
            }
            Ok(Some(union))
        }
        Filter::Not(_) => Ok(None),
        leaf => {
            let Some(predicate) = leaf_predicate(leaf, filterable) else {
                return Ok(None);
            };
            let ids = store.matching_ids(cid, &predicate)?;
            Ok(Some(ids.into_iter().collect()))
        }
    }
}
fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
let field_type = |field: &str| {
filterable
.iter()
.find(|f| f.path == field)
.map(|f| f.field_type)
};
match filter {
Filter::Eq { field, value } => Some(SecPredicate::Eq {
field: field.clone(),
value: sec_value(field_type(field)?, value)?,
}),
Filter::In { field, values } => {
let ft = field_type(field)?;
let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
Some(SecPredicate::In {
field: field.clone(),
values: values?,
})
}
Filter::Lt { field, value } => {
one_sided_range(field, field_type(field)?, value, false, false)
}
Filter::Lte { field, value } => {
one_sided_range(field, field_type(field)?, value, false, true)
}
Filter::Gt { field, value } => {
one_sided_range(field, field_type(field)?, value, true, false)
}
Filter::Gte { field, value } => {
one_sided_range(field, field_type(field)?, value, true, true)
}
_ => None,
}
}
/// Builds a half-open range predicate with a single bound at `value`.
///
/// When `is_lower` is true, `value` is the lower bound (`>` or `>=`); otherwise
/// it is the upper bound (`<` or `<=`). `inclusive` applies only to that bound.
/// Returns `None` if `value` does not match `field_type`.
fn one_sided_range(
    field: &str,
    field_type: FieldType,
    value: &Value,
    is_lower: bool,
    inclusive: bool,
) -> Option<SecPredicate> {
    let bound = Some(sec_value(field_type, value)?);
    let (lo, hi) = if is_lower { (bound, None) } else { (None, bound) };
    Some(SecPredicate::Range {
        field: field.to_owned(),
        lo,
        hi,
        lo_inclusive: is_lower && inclusive,
        hi_inclusive: !is_lower && inclusive,
    })
}
/// Converts a JSON value into the secondary-index value for a field type.
///
/// Keyword fields accept JSON strings, and numeric fields accept JSON numbers
/// (as `f64`). Every other combination yields `None`.
fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
    if matches!(field_type, FieldType::Keyword) {
        return value.as_str().map(|s| SecValue::Keyword(s.to_owned()));
    }
    if matches!(field_type, FieldType::Numeric) {
        return value.as_f64().map(SecValue::Numeric);
    }
    None
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn desc() -> Descriptor {
Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
}
fn open(dir: &Path) -> Database {
Database::open(dir).unwrap()
}
#[test]
fn hybrid_search_fuses_dense_and_sparse() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
db.upsert(
"kb",
"a",
&[1.0, 0.0, 0.0, 0.0],
&json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
)
.unwrap();
db.upsert(
"kb",
"b",
&[0.0, 1.0, 0.0, 0.0],
&json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
)
.unwrap();
db.upsert(
"kb",
"c",
&[0.0, 0.0, 0.0, 1.0],
&json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
)
.unwrap();
let dense_q = [1.0, 0.0, 0.0, 0.0];
let sparse_q = SparseVector {
indices: vec![1, 2],
values: vec![1.0, 1.0],
};
let params = SearchParams {
k: 3,
..SearchParams::default()
};
let hits = db
.hybrid_search(
"kb",
Some(&dense_q),
Some(&sparse_q),
None,
¶ms,
DEFAULT_RRF_K0,
)
.unwrap();
let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
let sparse_only = db
.hybrid_search("kb", None, Some(&sparse_q), None, ¶ms, DEFAULT_RRF_K0)
.unwrap();
assert_eq!(sparse_only[0].id, "b");
let dense_only = db
.hybrid_search("kb", Some(&dense_q), None, None, ¶ms, DEFAULT_RRF_K0)
.unwrap();
assert_eq!(dense_only[0].id, "a");
assert!(
db.hybrid_search("kb", None, None, None, ¶ms, DEFAULT_RRF_K0)
.is_err()
);
}
fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
let params = SearchParams {
k: 10,
..SearchParams::default()
};
db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect()
}
#[test]
fn sparse_index_equals_the_store_scan_fallback() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
let z = [0.0f32, 0.0, 0.0, 0.0];
for (id, dims, vals) in [
("a", vec![1u32, 2], vec![5.0f32, 1.0]),
("b", vec![2u32, 3], vec![3.0f32, 4.0]),
("c", vec![1u32, 3], vec![2.0f32, 2.0]),
("d", vec![9u32], vec![1.0f32]), ] {
db.upsert(
"kb",
id,
&z,
&json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
)
.unwrap();
}
let q = SparseVector {
indices: vec![1, 2, 3],
values: vec![1.0, 1.0, 1.0],
};
assert!(db.collections.get("kb").unwrap().sparse.is_some());
let via_index = sparse_ids(&mut db, &q);
assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
db.collections.get_mut("kb").unwrap().sparse = None;
let via_scan = sparse_ids(&mut db, &q);
assert_eq!(via_index, via_scan);
}
#[test]
fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
let z = [0.0f32, 0.0, 0.0, 0.0];
db.upsert(
"kb",
"a",
&z,
&json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
)
.unwrap();
db.upsert(
"kb",
"b",
&z,
&json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
)
.unwrap();
db.upsert(
"kb",
"c",
&z,
&json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
)
.unwrap();
db.upsert(
"kb",
"a",
&z,
&json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
)
.unwrap();
assert!(db.delete("kb", "b").unwrap());
let q = SparseVector {
indices: vec![1, 2],
values: vec![1.0, 1.0],
};
let incremental = sparse_ids(&mut db, &q);
assert_eq!(incremental, vec!["c".to_owned()]);
db.collections.get_mut("kb").unwrap().stale = true;
let rebuilt = sparse_ids(&mut db, &q);
assert_eq!(incremental, rebuilt);
}
#[test]
fn sparse_index_is_rebuilt_on_reopen() {
let tmp = tempfile::tempdir().unwrap();
{
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
db.upsert(
"kb",
"a",
&[0.0, 0.0, 0.0, 0.0],
&json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
)
.unwrap();
}
let mut db = open(tmp.path());
assert!(db.collections.get("kb").unwrap().sparse.is_some());
let q = SparseVector {
indices: vec![1],
values: vec![1.0],
};
assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
}
#[test]
fn hybrid_sparse_honours_the_payload_filter() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
let z = [0.0f32, 0.0, 0.0, 0.0];
db.upsert(
"kb",
"a",
&z,
&json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
)
.unwrap();
db.upsert(
"kb",
"b",
&z,
&json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
)
.unwrap();
let q = SparseVector {
indices: vec![1],
values: vec![1.0],
};
let params = SearchParams {
k: 10,
filter: Some(Filter::Eq {
field: "lang".to_owned(),
value: json!("en"),
}),
..SearchParams::default()
};
let hits: Vec<String> = db
.hybrid_search("kb", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect();
assert_eq!(hits, vec!["a".to_owned()]);
}
#[test]
fn hybrid_text_search_indexes_and_ranks_by_bm25() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("kb", desc()).unwrap();
let z = [0.0f32, 0.0, 0.0, 0.0];
db.upsert(
"kb",
"cats",
&z,
&json!({ "__quiver_text__": "the quick brown cat jumps" }),
)
.unwrap();
db.upsert(
"kb",
"dogs",
&z,
&json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
)
.unwrap();
let params = SearchParams {
k: 10,
..SearchParams::default()
};
let hits: Vec<String> = db
.hybrid_search("kb", None, None, Some("cats"), ¶ms, DEFAULT_RRF_K0)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect();
assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
assert!(
db.hybrid_search("kb", None, None, Some("elephant"), ¶ms, DEFAULT_RRF_K0)
.unwrap()
.is_empty()
);
let dense_q = [1.0, 0.0, 0.0, 0.0];
db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
let fused: Vec<String> = db
.hybrid_search(
"kb",
Some(&dense_q),
None,
Some("dog"),
¶ms,
DEFAULT_RRF_K0,
)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect();
assert!(
fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
"dense match + lexical match both surface; got {fused:?}"
);
}
#[test]
fn create_upsert_search_get_end_to_end() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("items", desc()).unwrap();
db.upsert(
"items",
"a",
&[0.0, 0.0, 0.0, 0.0],
&json!({"color": "red"}),
)
.unwrap();
db.upsert(
"items",
"b",
&[1.0, 0.0, 0.0, 0.0],
&json!({"color": "blue"}),
)
.unwrap();
db.upsert(
"items",
"c",
&[5.0, 5.0, 5.0, 5.0],
&json!({"color": "red"}),
)
.unwrap();
let near = db
.search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(near[0].id, "a");
assert_eq!(near[1].id, "b");
let got = db.get("items", "c").unwrap().unwrap();
assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
assert_eq!(got.payload, Some(json!({"color": "red"})));
}
#[test]
fn upsert_batch_produces_same_search_results_as_sequential() {
let tmp_seq = tempfile::tempdir().unwrap();
let tmp_bat = tempfile::tempdir().unwrap();
let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
let payload = json!({});
{
let mut db = open(tmp_seq.path());
db.create_collection("c", desc()).unwrap();
for (id, vec) in ids.iter().zip(vectors.iter()) {
db.upsert("c", id, vec, &payload).unwrap();
}
}
{
let mut db = open(tmp_bat.path());
db.create_collection("c", desc()).unwrap();
let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
.iter()
.zip(vectors.iter())
.map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
.collect();
let n = db.upsert_batch("c", &pts).unwrap();
assert_eq!(n, 20);
}
let query = [10.0f32, 0.0, 0.0, 0.0];
let params = SearchParams {
k: 5,
..Default::default()
};
let mut seq_db = open(tmp_seq.path());
let mut bat_db = open(tmp_bat.path());
let seq: Vec<String> = seq_db
.search("c", &query, ¶ms)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect();
let bat: Vec<String> = bat_db
.search("c", &query, ¶ms)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect();
assert_eq!(
seq, bat,
"batch and sequential produce different search results"
);
}
/// `upsert_bulk` leaves the collection `stale` (index deferred). The first
/// search rebuilds it. Sparse vectors carried in the `__quiver_sparse__`
/// payload key must be searchable through `hybrid_search` afterwards.
#[test]
fn upsert_bulk_defers_the_index_then_searches_correctly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("c", desc()).unwrap();
    let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
    let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
    let plain = json!({});
    // Only p3 carries a sparse vector (term 7), so it is the sole sparse hit.
    let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
    let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
        .iter()
        .zip(vectors.iter())
        .map(|(id, v)| {
            let payload = if id == "p3" { &sparse_payload } else { &plain };
            (id.as_str(), v.as_slice(), payload)
        })
        .collect();
    let n = db.upsert_bulk("c", &pts).unwrap();
    assert_eq!(n, 20);
    assert!(db.collections.get("c").unwrap().stale);
    let query = [10.0f32, 0.0, 0.0, 0.0];
    let params = SearchParams {
        k: 5,
        ..Default::default()
    };
    let hits: Vec<String> = db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
    assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
    let q = SparseVector {
        indices: vec![7],
        values: vec![1.0],
    };
    let sparse_hits: Vec<String> = db
        .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(sparse_hits, vec!["p3".to_owned()]);
}
/// A payload `Eq` filter on a field that is not declared filterable must still
/// restrict results to matching payloads (even `i` → "red", odd → "blue").
#[test]
fn filtered_search_only_returns_matching_payloads() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("items", desc()).unwrap();
    for i in 0..20u32 {
        let color = if i % 2 == 0 { "red" } else { "blue" };
        db.upsert(
            "items",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"color": color, "n": i}),
        )
        .unwrap();
    }
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "color".into(),
            value: json!("red"),
        }),
        ef_search: 64,
        with_payload: true,
        with_vector: false,
    };
    let results = db.search("items", &[0.0; 4], &params).unwrap();
    assert!(!results.is_empty());
    for m in &results {
        assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
    }
}
/// Data written and checkpointed in one session is visible, and searchable,
/// after the database is reopened.
#[test]
fn persists_and_rebuilds_index_on_reopen() {
    let dir = tempfile::tempdir().unwrap();
    // First session: populate 50 points, checkpoint, then drop the handle.
    {
        let mut writer = open(dir.path());
        writer.create_collection("items", desc()).unwrap();
        for n in 0..50u32 {
            let id = format!("p{n}");
            writer
                .upsert("items", &id, &[n as f32, 1.0, 2.0, 3.0], &json!({}))
                .unwrap();
        }
        writer.checkpoint().unwrap();
    }
    // Second session: count and nearest-neighbour must both survive.
    let mut reader = open(dir.path());
    assert_eq!(reader.len("items").unwrap(), 50);
    let probe = [7.0f32, 1.0, 2.0, 3.0];
    let hits = reader
        .search("items", &probe, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
/// Re-upserting an id with a new vector moves it: search reflects the new
/// position and `get` returns the new vector.
#[test]
fn update_reflects_new_vector_after_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    database.create_collection("items", desc()).unwrap();
    let empty = json!({});
    // "a" starts at the origin and "b" at x=10; then "a" is moved to x=100.
    let writes: [(&str, [f32; 4]); 3] = [
        ("a", [0.0; 4]),
        ("b", [10.0, 0.0, 0.0, 0.0]),
        ("a", [100.0, 0.0, 0.0, 0.0]),
    ];
    for (id, vector) in &writes {
        database.upsert("items", id, vector, &empty).unwrap();
    }
    let hits = database
        .search("items", &[0.0; 4], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "b");
    let stored = database.get("items", "a").unwrap().unwrap();
    assert_eq!(stored.vector, Some(vec![100.0, 0.0, 0.0, 0.0]));
}
/// A deleted point disappears from both search results and `get`.
#[test]
fn delete_removes_from_search() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    database.create_collection("items", desc()).unwrap();
    database.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
    database
        .upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    let removed = database.delete("items", "a").unwrap();
    assert!(removed);
    let hits = database
        .search("items", &[0.0; 4], &SearchParams::default())
        .unwrap();
    assert!(!hits.iter().any(|m| m.id == "a"));
    assert!(database.get("items", "a").unwrap().is_none());
}
/// Searching a missing collection yields `CollectionNotFound`. Creating a
/// collection twice yields the core `AlreadyExists` error.
#[test]
fn unknown_collection_errors() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    let missing = database.search("nope", &[0.0; 4], &SearchParams::default());
    assert!(matches!(missing, Err(Error::CollectionNotFound(_))));
    database.create_collection("c", desc()).unwrap();
    let duplicate = database.create_collection("c", desc());
    assert!(matches!(
        duplicate,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
/// Builds a 4-dimensional f32, L2 descriptor using the given index kind and
/// no product quantization.
fn desc_with(kind: IndexKind) -> Descriptor {
    let spec = IndexSpec {
        kind,
        pq_subspaces: None,
    };
    Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(spec)
}
/// Vamana- and IVF-backed collections each return the exact nearest point on
/// a simple 1-D line of 40 points.
#[test]
fn vamana_and_ivf_collections_find_the_nearest_point() {
    let kinds = [IndexKind::Vamana, IndexKind::Ivf];
    for kind in kinds {
        let dir = tempfile::tempdir().unwrap();
        let mut database = open(dir.path());
        database.create_collection("c", desc_with(kind)).unwrap();
        (0..40u32).for_each(|i| {
            database
                .upsert("c", &format!("p{i}"), &[i as f32, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
        });
        let top = database
            .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(top[0].id, "p7", "{kind:?} nearest");
    }
}
/// The chosen index kind (Vamana) is persisted in the descriptor and the
/// collection is searchable again after a reopen.
#[test]
fn index_kind_persists_and_rebuilds_on_reopen() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        writer
            .create_collection("v", desc_with(IndexKind::Vamana))
            .unwrap();
        for i in 0..20u32 {
            writer
                .upsert("v", &format!("p{i}"), &[i as f32, 1.0, 2.0, 3.0], &json!({}))
                .unwrap();
        }
        writer.checkpoint().unwrap();
    }
    let mut reader = open(dir.path());
    let persisted = reader.descriptor("v").unwrap().index.kind;
    assert_eq!(persisted, IndexKind::Vamana);
    let hits = reader
        .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
// A DiskVamana base sealed at checkpoint time (100 points) must be loaded, not
// rebuilt, on reopen. Points inserted after the checkpoint (p100..p114) must
// still be found after reopen.
#[test]
fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
let tmp = tempfile::tempdir().unwrap();
// Collection id captured in the first session and reused to inspect the base file after reopen.
let cid;
{
let mut db = open(tmp.path());
db.create_collection("d", desc_with(IndexKind::DiskVamana))
.unwrap();
for i in 0..100u32 {
db.upsert(
"d",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
// NOTE(review): presumably this search forces the index to be built so
// the following checkpoint has a base to seal — confirm.
db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
db.checkpoint().unwrap();
// These 15 inserts land after the checkpoint and must not enter the sealed base.
for i in 100..115u32 {
db.upsert(
"d",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
cid = db.collections["d"].id;
// Open the on-disk base directly: it must still hold only the 100 checkpointed points.
let base = open_disk_index(
&db.store,
cid,
db.store.collection_codec_clone(cid).unwrap(),
)
.unwrap();
assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
}
let mut db = open(tmp.path());
// A checkpointed point is found after reopen.
assert_eq!(
db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap()[0]
.id,
"p50",
);
// A post-checkpoint point is also found after reopen.
assert_eq!(
db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap()[0]
.id,
"p110",
"post-checkpoint insert survived reopen via WAL-tail replay",
);
// The base file still has 100 entries. A rebuild over all 115 points would
// presumably have re-sealed a larger base.
let base = open_disk_index(
&db.store,
cid,
db.store.collection_codec_clone(cid).unwrap(),
)
.unwrap();
assert_eq!(
base.len(),
100,
"reopen loaded the base; it was not rebuilt"
);
}
// If the sealed DiskVamana base file is deleted, or truncated ("torn"), between
// sessions, reopening must still answer searches correctly via a rebuild.
#[test]
fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
let tmp = tempfile::tempdir().unwrap();
// Path of the sealed base file (`DISK_INDEX_FILE` under the collection's index dir).
let base_path;
{
let mut db = open(tmp.path());
db.create_collection("d", desc_with(IndexKind::DiskVamana))
.unwrap();
for i in 0..60u32 {
db.upsert(
"d",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
db.checkpoint().unwrap();
let cid = db.collections["d"].id;
base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
}
// Case 1: the base file is lost entirely.
std::fs::remove_file(&base_path).unwrap();
{
let mut db = open(tmp.path());
assert_eq!(
db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap()[0]
.id,
"p25",
"rebuild fallback still answers correctly after a lost base",
);
// The rebuild writes a fresh base file.
assert!(
base_path.exists(),
"the fallback rebuild re-sealed the base file"
);
db.checkpoint().unwrap();
}
// Case 2: the base file exists but is cut to half its length.
let len = std::fs::metadata(&base_path).unwrap().len();
std::fs::OpenOptions::new()
.write(true)
.open(&base_path)
.unwrap()
.set_len(len / 2)
.unwrap();
let mut db = open(tmp.path());
assert_eq!(
db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap()[0]
.id,
"p25",
"rebuild fallback still answers correctly after a torn base",
);
}
// Once an IVF index is built, single inserts and deletes are applied
// incrementally. The collection's `stale` flag stays false throughout, and
// search reflects each change.
#[test]
fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("c", desc_with(IndexKind::Ivf))
.unwrap();
for i in 0..50u32 {
db.upsert(
"c",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
// The first search builds the index and clears `stale`.
let _ = db
.search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert!(!db.collections["c"].stale, "the search built the index");
// Insert a far-away outlier: the index must stay fresh, with no rebuild scheduled.
db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
let res = db
.search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(res[0].id, "far");
// Delete it again: still incremental, and it must vanish from results.
assert!(db.delete("c", "far").unwrap());
assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
let res = db
.search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
}
/// Updating an existing id in a built IVF index stays incremental. The id is
/// found at its new location and no longer at its old one.
#[test]
fn ivf_incremental_update_replaces_the_vector() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    database
        .create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();
    for i in 0..30u32 {
        let id = format!("p{i}");
        database
            .upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    // Build the index before the update so the update takes the incremental path.
    let _ = database
        .search("c", &[0.0; 4], &SearchParams::default())
        .unwrap();
    database
        .upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    assert!(!database.collections["c"].stale);
    let far_hits = database
        .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(far_hits[0].id, "p5", "p5 found at its new location");
    let old_hits = database
        .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert!(!old_hits.iter().any(|m| m.id == "p5"), "stale vector is gone");
}
/// Deleting an id from a built IVF index and re-inserting it stays
/// incremental, and the re-inserted point is found.
#[test]
fn ivf_reinsert_after_incremental_delete_is_found() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    database
        .create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();
    for i in 0..20u32 {
        let id = format!("p{i}");
        database
            .upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let _ = database
        .search("c", &[0.0; 4], &SearchParams::default())
        .unwrap();
    let existed = database.delete("c", "p3").unwrap();
    assert!(existed);
    assert!(!database.collections["c"].stale);
    database
        .upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    assert!(!database.collections["c"].stale);
    let hits = database
        .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p3");
}
/// In the default `desc()` collection (HNSW, per this test's name), moving an
/// existing point marks the collection `stale` instead of patching the index.
/// The next search then returns the point at its new location.
#[test]
fn hnsw_in_place_update_falls_back_to_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    database.create_collection("c", desc()).unwrap();
    for i in 0..10u32 {
        let id = format!("p{i}");
        database
            .upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    // A search builds the index, clearing the stale flag.
    let _ = database
        .search("c", &[0.0; 4], &SearchParams::default())
        .unwrap();
    assert!(!database.collections["c"].stale);
    database
        .upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    assert!(database.collections["c"].stale, "hnsw update schedules a rebuild");
    let hits = database
        .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p2");
}
/// The Dot metric paired with a Vamana or DiskVamana index must be refused
/// with `Error::Unsupported`.
#[test]
fn unsupported_index_configurations_are_rejected() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    for (name, kind) in [("a", IndexKind::Vamana), ("b", IndexKind::DiskVamana)] {
        let spec = IndexSpec {
            kind,
            pq_subspaces: None,
        };
        let dot = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(spec);
        assert!(matches!(
            database.create_collection(name, dot),
            Err(Error::Unsupported(_))
        ));
    }
}
/// DCPE vector encryption is only accepted with the L2 metric. Cosine and Dot
/// are rejected, and an accepted collection records `Dcpe` in its descriptor.
#[test]
fn dcpe_collections_require_the_l2_metric() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    let dcpe = |metric: DistanceMetric| {
        Descriptor::new(4, Dtype::F32, metric).with_vector_encryption(VectorEncryption::Dcpe)
    };
    for rejected in [DistanceMetric::Cosine, DistanceMetric::Dot] {
        let outcome = database.create_collection("bad", dcpe(rejected));
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }
    database
        .create_collection("enc", dcpe(DistanceMetric::L2))
        .expect("l2 dcpe collection");
    let stored = database.descriptor("enc").expect("descriptor");
    assert_eq!(stored.vector_encryption, VectorEncryption::Dcpe);
}
/// A client-side-encrypted collection never gets an index (it stays
/// `CollectionIndex::None`). Vector search is rejected, but fetch, filtered
/// fetch, limits, get, and delete all work.
#[test]
fn client_side_collections_are_fetch_only_and_reject_search() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    let vault_desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_vector_encryption(VectorEncryption::ClientSide);
    database
        .create_collection("vault", vault_desc)
        .expect("create client-side collection");
    assert!(matches!(
        database.collections["vault"].index,
        CollectionIndex::None
    ));
    // p0 and p1 are "vip"; p2..p4 are "std".
    for i in 0..5 {
        let tier = if i >= 2 { "std" } else { "vip" };
        let payload = serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier });
        database
            .upsert("vault", &format!("p{i}"), &[0.0; 4], &payload)
            .expect("upsert");
    }
    assert_eq!(database.len("vault").unwrap(), 5);
    assert!(matches!(
        database.collections["vault"].index,
        CollectionIndex::None
    ));
    let searched = database.search("vault", &[0.0; 4], &SearchParams::default());
    assert!(matches!(searched, Err(Error::Unsupported(_))));
    let everything = database.fetch("vault", None, 100, true, false).unwrap();
    assert_eq!(everything.len(), 5);
    assert!(
        everything
            .iter()
            .all(|m| m.payload.is_some() && m.vector.is_none())
    );
    let vip_filter = Filter::Eq {
        field: "tier".to_owned(),
        value: serde_json::json!("vip"),
    };
    let vip = database
        .fetch("vault", Some(&vip_filter), 100, false, false)
        .unwrap();
    assert_eq!(vip.len(), 2);
    let capped = database.fetch("vault", None, 2, false, false).unwrap();
    assert_eq!(capped.len(), 2);
    let first = database.get("vault", "p0").unwrap().unwrap();
    assert_eq!(first.id, "p0");
    assert!(database.delete("vault", "p0").unwrap());
    assert_eq!(database.len("vault").unwrap(), 4);
}
/// Combining client-side vector encryption with multivector documents is
/// rejected as unsupported.
#[test]
fn client_side_encryption_rejects_multivector() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    let multi_vault = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_multivector(true)
        .with_vector_encryption(VectorEncryption::ClientSide);
    let outcome = database.create_collection("bad", multi_vault);
    assert!(matches!(outcome, Err(Error::Unsupported(_))));
}
/// Recursively searches `dir` for a non-directory entry whose file name equals
/// `name`.
///
/// Returns `false` when `dir` cannot be read. Unreadable entries are skipped.
/// Directory names themselves never match; only non-directory entries do.
fn contains_file(dir: &Path, name: &str) -> bool {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return false;
    };
    for entry in entries.flatten() {
        let path = entry.path();
        let found = if path.is_dir() {
            contains_file(&path, name)
        } else {
            path.file_name().is_some_and(|f| f == name)
        };
        if found {
            return true;
        }
    }
    false
}
/// A DiskVamana collection answers searches, writes a `vamana.qvx` file
/// somewhere under the data directory, and keeps both its index kind and its
/// search results across a reopen.
#[test]
fn disk_index_collection_searches_persists_and_writes_an_artifact() {
    let dir = tempfile::tempdir().unwrap();
    let probe = [7.0f32, 0.0, 0.0, 0.0];
    {
        let mut writer = open(dir.path());
        writer
            .create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        for i in 0..40u32 {
            writer
                .upsert("d", &format!("p{i}"), &[i as f32, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
        }
        let hits = writer
            .search("d", &probe, &SearchParams::default())
            .unwrap();
        assert_eq!(hits[0].id, "p7");
        writer.checkpoint().unwrap();
    }
    assert!(
        contains_file(dir.path(), "vamana.qvx"),
        "disk index file missing"
    );
    let mut reader = open(dir.path());
    let kind = reader.descriptor("d").unwrap().index.kind;
    assert_eq!(kind, IndexKind::DiskVamana);
    let hits = reader
        .search("d", &probe, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
// For Vamana and DiskVamana collections, writes after the first search
// (insert, delete, move) must be reflected by subsequent searches.
#[test]
fn graph_collections_maintain_writes_incrementally() {
for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("c", desc_with(kind)).unwrap();
for i in 0..40u32 {
db.upsert(
"c",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
// Baseline: the built index finds p7 at x=7.
let res = db
.search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
// Insert a new point between p7 and p8; a query right next to it must find it.
db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
let res = db
.search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
// Delete p7: it must not appear anywhere in the results.
assert!(db.delete("c", "p7").unwrap());
let res = db
.search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert!(
res.iter().all(|m| m.id != "p7"),
"{kind:?} deleted id returned"
);
// Move p20 far away: it must be nearest at the new spot and not at the old one.
db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
let res = db
.search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
let res = db
.search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_ne!(
res[0].id, "p20",
"{kind:?} stale copy still nearest old spot"
);
}
}
// Replaces 15 of 50 Vamana points (p0..p14) with far-away points (q0..q14 at
// x=1000+i). Churned-out ids must never be returned, and new points must be
// found, both before and after a checkpoint and reopen. The test only checks
// search results; whether an internal consolidation actually ran is not
// observable here.
#[test]
fn graph_consolidates_under_heavy_churn() {
let tmp = tempfile::tempdir().unwrap();
let mut db = open(tmp.path());
db.create_collection("c", desc_with(IndexKind::Vamana))
.unwrap();
for i in 0..50u32 {
db.upsert(
"c",
&format!("p{i}"),
&[i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
// Build the index before the churn.
let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
// Interleave each delete with an insert of a replacement point.
for i in 0..15u32 {
assert!(db.delete("c", &format!("p{i}")).unwrap());
db.upsert(
"c",
&format!("q{i}"),
&[1000.0 + i as f32, 0.0, 0.0, 0.0],
&json!({}),
)
.unwrap();
}
// x=5 is inside the deleted range, so deleted ids would be the nearest if they leaked.
let near_origin = db
.search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert!(
near_origin.iter().all(|m| !deleted.contains(&m.id)),
"a churned-out id was returned"
);
let near_q = db
.search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(near_q[0].id, "q7", "new point not found after churn");
// Persist and reopen; the same guarantees must hold.
db.checkpoint().unwrap();
drop(db);
let mut db = open(tmp.path());
let near_q = db
.search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
let near_origin = db
.search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
.unwrap();
assert!(
near_origin.iter().all(|m| !deleted.contains(&m.id)),
"a churned-out id resurfaced after reopen"
);
}
// For every multivector-capable index kind, a sequence of deletes, updates and
// inserts must produce the expected top-3 rankings. After reopening, the
// ranking must equal the one observed before closing (the assert message calls
// this "incremental != rebuild").
#[test]
fn multivector_writes_are_incremental_and_match_a_rebuild() {
// Unit direction in the x/y plane at angle `theta`; smaller theta means more similar to the query dir(0.0).
let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
// A two-token document whose tokens both point at `theta`.
let doc = |theta: f32| vec![dir(theta), dir(theta)];
for kind in [
IndexKind::Ivf,
IndexKind::Hnsw,
IndexKind::Vamana,
IndexKind::Colbert,
] {
let tmp = tempfile::tempdir().unwrap();
let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
.with_multivector(true)
.with_index(IndexSpec {
kind,
pq_subspaces: None,
});
let mut db = open(tmp.path());
db.create_collection("m", desc).unwrap();
// d1..d10 at angles 0.1..1.0, so d1 is closest to the query.
for i in 1..=10u32 {
db.upsert_document(
"m",
&format!("d{i}"),
&doc(0.1 * i as f32),
&json!({ "i": i }),
)
.unwrap();
}
let q = vec![dir(0.0)];
// Top-3 document ids for the fixed query. Takes `db` as a parameter so it survives the reopen below.
let top = |db: &mut Database| {
db.search_multi_vector(
"m",
&q,
&SearchParams {
k: 3,
..Default::default()
},
)
.unwrap()
.into_iter()
.map(|m| m.id)
.collect::<Vec<_>>()
};
assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
assert!(db.delete_document("m", "d1").unwrap());
assert_eq!(
top(&mut db),
vec!["d2", "d3", "d4"],
"{kind:?} after delete"
);
// Move d10 to angle 0: it becomes the best match.
db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
.unwrap();
assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
// New doc d11 at angle 0.05 must rank second.
db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
.unwrap();
let r = top(&mut db);
assert_eq!(r[0], "d10", "{kind:?}");
assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
// Re-upsert d8 with one token instead of two; the old second token must not linger.
db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
.unwrap();
let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
let before = top(&mut db);
drop(db);
// Reopen and compare against the ranking observed before closing.
let mut db = open(tmp.path());
assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
assert!(
db.get_document("m", "d1", false).unwrap().is_none(),
"{kind:?} deleted doc resurfaced"
);
}
}
/// A ColBERT index is only allowed on multivector collections. A
/// single-vector descriptor with it is rejected, and a multivector one is
/// accepted.
#[test]
fn colbert_index_requires_multivector() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    let colbert = || IndexSpec {
        kind: IndexKind::Colbert,
        pq_subspaces: None,
    };
    let cosine = || Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine);
    let single = cosine().with_index(colbert());
    assert!(matches!(
        database.create_collection("c", single),
        Err(Error::Unsupported(_))
    ));
    let multi = cosine().with_multivector(true).with_index(colbert());
    assert!(database.create_collection("m", multi).is_ok());
}
/// 4-dimensional f32, L2 descriptor that declares `city` (keyword) and `n`
/// (numeric) as filterable fields.
fn desc_filterable() -> Descriptor {
    let fields = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];
    Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(fields)
}
/// Creates collection "c" from `desc_filterable()` and inserts p0..p29 on the
/// x-axis. The city cycles paris/lyon/rome by `i % 3` and `n = i`. Ends with a
/// checkpoint.
fn seed_cities(db: &mut Database) {
    const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
    db.create_collection("c", desc_filterable()).unwrap();
    for (i, city) in (0..30u32).zip(CITIES.iter().cycle()) {
        let id = format!("p{i}");
        let payload = json!({"city": city, "n": i});
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &payload)
            .unwrap();
    }
    db.checkpoint().unwrap();
}
/// Keyword equality on a declared filterable field returns only "lyon" points.
/// `seed_cities` puts lyon at `i % 3 == 1`, so the nearest to the origin is p1.
#[test]
fn hybrid_equality_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("lyon"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    assert_eq!(res[0].id, "p1");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
    }
}
/// A numeric `Gte` prefilter on declared field `n` returns only points with
/// `n >= 10`. The nearest of those to the origin is p10.
#[test]
fn hybrid_numeric_range_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 4,
        filter: Some(Filter::Gte {
            field: "n".into(),
            value: json!(10),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert_eq!(res[0].id, "p10");
    for m in &res {
        assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
    }
}
/// An equality filter matching no stored value yields an empty result rather
/// than an error.
#[test]
fn hybrid_unsatisfiable_filter_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("atlantis"),
        }),
        ..SearchParams::default()
    };
    assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
}
/// `And(In(city ∈ {paris, rome}), Lt(n < 12))` returns only points satisfying
/// both clauses. p0 (paris, n=0) is nearest to the origin.
#[test]
fn hybrid_and_or_composition_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::In {
                field: "city".into(),
                values: vec![json!("paris"), json!("rome")],
            },
            Filter::Lt {
                field: "n".into(),
                value: json!(12),
            },
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert_eq!(res[0].id, "p0");
    for m in &res {
        let payload = m.payload.as_ref().unwrap();
        let city = payload["city"].as_str().unwrap();
        assert!(city == "paris" || city == "rome");
        assert!(payload["n"].as_u64().unwrap() < 12);
    }
}
/// An `And` mixing an indexable `Eq` with a `Not(...)` clause must still apply
/// the `Not`. p0 is excluded, leaving p3 as the nearest paris point.
#[test]
fn hybrid_rechecks_non_indexable_clause() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::Eq {
                field: "city".into(),
                value: json!("paris"),
            },
            Filter::Not(Box::new(Filter::Eq {
                field: "n".into(),
                value: json!(0),
            })),
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.iter().all(|m| m.id != "p0"));
    assert_eq!(res[0].id, "p3");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
    }
}
/// Filtering on a field not declared filterable ("tier") must still return
/// only matching payloads.
#[test]
fn post_filter_fallback_on_undeclared_field_is_correct() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection(
        "c",
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
            .with_filterable(vec![FilterableField::keyword("city")]),
    )
    .unwrap();
    for i in 0..20u32 {
        let tier = if i % 2 == 0 { "gold" } else { "silver" };
        db.upsert(
            "c",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"city": "paris", "tier": tier}),
        )
        .unwrap();
    }
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
    }
}
/// `leaf_predicate` maps keyword `Eq` and numeric `Gte` leaves on declared
/// fields to secondary-index predicates. It returns `None` for each other
/// leaf checked here: an undeclared field, a value whose type differs from the
/// declared one, `Ne`, and `Exists`.
#[test]
fn leaf_predicate_maps_only_indexable_filterable_leaves() {
    let fields = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];
    let eq_city = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    assert_eq!(
        leaf_predicate(&eq_city, &fields),
        Some(SecPredicate::Eq {
            field: "city".into(),
            value: SecValue::Keyword("paris".into())
        })
    );
    let gte_n = Filter::Gte {
        field: "n".into(),
        value: json!(3),
    };
    assert_eq!(
        leaf_predicate(&gte_n, &fields),
        Some(SecPredicate::Range {
            field: "n".into(),
            lo: Some(SecValue::Numeric(3.0)),
            hi: None,
            lo_inclusive: true,
            hi_inclusive: false,
        })
    );
    let unmapped = [
        // Field not declared filterable.
        Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        },
        // Numeric value against the keyword field "city".
        Filter::Eq {
            field: "city".into(),
            value: json!(5),
        },
        Filter::Ne {
            field: "city".into(),
            value: json!("x"),
        },
        Filter::Exists {
            field: "city".into(),
        },
    ];
    for leaf in &unmapped {
        assert!(leaf_predicate(leaf, &fields).is_none());
    }
}
/// Index directory of the first collection created under `root`. The
/// collection directory name "0000000000" is hard-coded, so this assumes the
/// IVF collection is the first one created (TODO confirm if ids change).
fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
    let mut dir = root.to_path_buf();
    dir.extend(["collections", "0000000000", "index"]);
    dir
}
/// Sorted names of the `idx-*` snapshot files in the IVF index directory.
///
/// Returns an empty list if the directory cannot be read. Unreadable entries
/// and non-UTF-8 names are skipped.
fn idx_snapshot_files(root: &Path) -> Vec<String> {
    let Ok(entries) = std::fs::read_dir(ivf_index_dir(root)) else {
        return Vec::new();
    };
    let mut names = Vec::new();
    for entry in entries.flatten() {
        let file_name = entry.file_name();
        if let Some(name) = file_name.to_str().filter(|n| n.starts_with("idx-")) {
            names.push(name.to_owned());
        }
    }
    names.sort();
    names
}
/// Ids of the default-params search hits for `q` in collection "c", best first.
fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
    let params = SearchParams::default();
    let hits = db.search("c", q, &params).unwrap();
    hits.into_iter().map(|hit| hit.id).collect()
}
/// Creates IVF collection "c" with points p0..p(n-1) on the x-axis, then runs
/// one search so the index is built.
fn seed_ivf(db: &mut Database, n: u32) {
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();
    (0..n).for_each(|i| {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    });
    let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
}
/// A checkpoint of a built IVF collection leaves exactly one `idx-*` snapshot.
#[test]
fn ivf_snapshot_is_written_at_checkpoint() {
    let dir = tempfile::tempdir().unwrap();
    let mut database = open(dir.path());
    seed_ivf(&mut database, 40);
    database.checkpoint().unwrap();
    let snapshots = idx_snapshot_files(dir.path());
    assert_eq!(snapshots.len(), 1);
}
// Detects whether reopen loaded the IVF snapshot or rebuilt the index, using
// the internal id order. Ids are inserted in the non-sorted order a, m, z, b,
// and `int_to_ext` must keep that order after reopen. Per the assert message,
// a different order means the index was rebuilt.
// NOTE(review): presumably a rebuild re-derives `int_to_ext` from the store in
// another order (e.g. sorted) — confirm.
#[test]
fn ivf_loads_from_snapshot_rather_than_rebuilding() {
let tmp = tempfile::tempdir().unwrap();
{
let mut db = open(tmp.path());
db.create_collection("c", desc_with(IndexKind::Ivf))
.unwrap();
db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
// Build the index; the next two inserts are then appended incrementally.
let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
.unwrap();
db.checkpoint().unwrap();
assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
}
// No search is run here: the order is inspected directly after open.
let db = open(tmp.path());
assert_eq!(
db.collections["c"].int_to_ext,
["a", "m", "z", "b"],
"index was rebuilt, not loaded from the snapshot"
);
}
/// An insert made after the IVF snapshot was checkpointed is searchable after
/// reopen, and pre-checkpoint points remain searchable too.
#[test]
fn ivf_recovery_replays_post_checkpoint_upserts() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        writer
            .upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let mut reader = open(dir.path());
    let far = nearest(&mut reader, &[500.0, 0.0, 0.0, 0.0]);
    assert_eq!(far[0], "far");
    let near = nearest(&mut reader, &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(near[0], "p1");
}
/// A delete made after the IVF checkpoint is honoured after reopen. The id is
/// absent from search and `get`, while its neighbour is untouched.
#[test]
fn ivf_recovery_replays_post_checkpoint_deletes() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        let removed = writer.delete("c", "p7").unwrap();
        assert!(removed);
    }
    let mut reader = open(dir.path());
    let hits = nearest(&mut reader, &[7.0, 0.0, 0.0, 0.0]);
    assert!(!hits.iter().any(|id| id == "p7"));
    assert!(reader.get("c", "p7").unwrap().is_none());
    assert!(reader.get("c", "p6").unwrap().is_some());
}
/// Moving p0 after the IVF checkpoint survives reopen. p0 is found at its new
/// location and is no longer the nearest point at its old one.
#[test]
fn ivf_recovery_replays_post_checkpoint_updates() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        writer
            .upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let mut reader = open(dir.path());
    let moved = nearest(&mut reader, &[999.0, 0.0, 0.0, 0.0]);
    assert_eq!(moved[0], "p0");
    let at_origin = nearest(&mut reader, &[0.0, 0.0, 0.0, 0.0]);
    assert_ne!(
        at_origin[0], "p0",
        "the stale p0 vector survived the update"
    );
}
/// Overwriting the only IVF snapshot with garbage must not break reopen.
/// Search still returns the correct nearest point.
#[test]
fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
    }
    let snapshots = idx_snapshot_files(dir.path());
    assert_eq!(snapshots.len(), 1);
    let snapshot_path = ivf_index_dir(dir.path()).join(&snapshots[0]);
    std::fs::write(&snapshot_path, b"corrupt").unwrap();
    let mut reader = open(dir.path());
    let hits = nearest(&mut reader, &[7.0, 0.0, 0.0, 0.0]);
    assert_eq!(hits[0], "p7");
}
/// 3-dimensional f32, cosine descriptor with multivector documents enabled.
fn mv_desc() -> Descriptor {
    let base = Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine);
    base.with_multivector(true)
}
/// Brute-force reference ranking: scores each corpus document against `query`
/// with cosine `max_sim`. Sorts by descending score, breaking ties by
/// ascending id.
fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
    let mut ranked = Vec::with_capacity(corpus.len());
    for (id, tokens) in corpus {
        ranked.push((String::from(*id), max_sim(Metric::Cosine, query, tokens)));
    }
    ranked.sort_by(|lhs, rhs| rhs.1.total_cmp(&lhs.1).then(lhs.0.cmp(&rhs.0)));
    ranked
}
/// Multivector search must rank documents exactly as the brute-force MaxSim
/// reference (`bf_rank`) does, with scores matching to within 1e-5.
#[test]
fn multivector_search_ranks_documents_by_maxsim() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
        (
            "d_mix",
            vec![
                vec![1.0, 1.0, 0.0],
                vec![0.0, 0.0, 1.0],
                vec![1.0, 0.0, 1.0],
            ],
        ),
    ];
    for (id, toks) in &corpus {
        db.upsert_document("docs", id, toks, &json!({ "id": id }))
            .unwrap();
    }
    assert_eq!(db.document_count("docs").unwrap(), 3);
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let params = SearchParams {
        k: 3,
        with_payload: false,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    assert_eq!(got.len(), 3);
    for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
        assert_eq!(&g.id, eid, "ranking matches brute force");
        assert!(
            (g.score - escore).abs() < 1e-5,
            "{} score {} vs {escore}",
            g.id,
            g.score
        );
    }
}
/// With 5 single-token documents and `k = 2`, multivector search returns
/// exactly 2 hits.
#[test]
fn multivector_search_truncates_to_k() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    for i in 0..5 {
        let v = vec![vec![1.0, i as f32, 0.0]];
        db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
            .unwrap();
    }
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 2);
}
/// A payload filter on multivector search selects whole documents. Of two
/// identical-vector documents, only the "fr" one is returned, with its payload.
#[test]
fn multivector_filter_selects_documents_exactly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
        .unwrap();
    db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
        .unwrap();
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".into(),
            value: json!("fr"),
        }),
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 1);
    assert_eq!(got[0].id, "b");
    assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
}
/// After a checkpoint and reopen, document count and multivector ranking must
/// match the brute-force MaxSim reference (`bf_rank`).
#[test]
fn multivector_reopen_rebuilds_grouping_and_ranking() {
    let tmp = tempfile::tempdir().unwrap();
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
    ];
    {
        let mut db = open(tmp.path());
        db.create_collection("docs", mv_desc()).unwrap();
        for (id, toks) in &corpus {
            db.upsert_document("docs", id, toks, &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
    }
    let mut db = open(tmp.path());
    assert_eq!(db.document_count("docs").unwrap(), 2);
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    assert_eq!(
        got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
        expected
            .iter()
            .map(|(id, _)| id.clone())
            .collect::<Vec<_>>()
    );
}
/// Deleting a multivector document removes all of its token rows (`len` drops
/// from 3 to 1) and it stops appearing in `get_document` and search. A second
/// delete reports `false`.
#[test]
fn multivector_delete_document_removes_all_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
        &json!({}),
    )
    .unwrap();
    db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 2);
    assert_eq!(db.len("docs").unwrap(), 3);
    assert!(db.delete_document("docs", "a").unwrap());
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);
    assert!(db.get_document("docs", "a", false).unwrap().is_none());
    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert!(got.iter().all(|m| m.id != "a"));
    assert!(!db.delete_document("docs", "a").unwrap());
}
/// Re-upserting an existing document id replaces its whole token set and its
/// payload instead of appending to them: three tokens become one, and the
/// payload `{"v":1}` becomes `{"v":2}`.
#[test]
fn multivector_reupsert_replaces_tokens() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("docs", mv_desc()).unwrap();

    // First version of `a`: three tokens.
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]],
        &json!({"v": 1}),
    )
    .unwrap();
    assert_eq!(db.len("docs").unwrap(), 3);

    // Second version of the same id: a single token and a new payload.
    db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v": 2}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);

    let fetched = db.get_document("docs", "a", true).unwrap().unwrap();
    assert_eq!(fetched.payload, Some(json!({"v": 2})));
    assert_eq!(fetched.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
}
/// Single-vector and multi-vector entry points are not interchangeable. Each
/// family fails with `Error::Unsupported` on the other kind of collection,
/// and `document_count` is likewise rejected on a single-vector collection.
#[test]
fn single_and_multi_vector_apis_are_mutually_exclusive() {
    // Generic over the success type so one helper covers every API checked below.
    fn is_unsupported<T>(outcome: std::result::Result<T, Error>) -> bool {
        matches!(outcome, Err(Error::Unsupported(_)))
    }

    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("mv", mv_desc()).unwrap();
    db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
        .unwrap();
    let default_params = SearchParams::default();

    // Single-vector APIs against the multi-vector collection.
    assert!(is_unsupported(
        db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({}))
    ));
    assert!(is_unsupported(
        db.search("mv", &[1.0, 0.0, 0.0], &default_params)
    ));

    // Multi-vector APIs against the single-vector collection.
    assert!(is_unsupported(
        db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({}))
    ));
    assert!(is_unsupported(
        db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &default_params)
    ));
    assert!(is_unsupported(db.document_count("sv")));
}
/// Creating a multi-vector collection with the L2 metric fails with
/// `Error::Unsupported`. On a valid multi-vector collection, `upsert_document`
/// rejects three malformed documents with the same error: an id containing
/// U+001F, an empty token list, and a token whose length does not match the
/// collection's dimension.
#[test]
fn multivector_rejects_l2_metric_and_bad_documents() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    let l2_multivector =
        Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
    assert!(matches!(
        db.create_collection("bad", l2_multivector),
        Err(Error::Unsupported(_))
    ));

    db.create_collection("docs", mv_desc()).unwrap();
    // (id, tokens) pairs that must each be refused, tried in this order.
    let malformed: [(&str, &[Vec<f32>]); 3] = [
        ("a\u{1f}b", &[vec![1.0, 0.0, 0.0]]),
        ("a", &[]),
        ("a", &[vec![1.0, 0.0]]),
    ];
    for (id, tokens) in malformed {
        assert!(matches!(
            db.upsert_document("docs", id, tokens, &json!({})),
            Err(Error::Unsupported(_))
        ));
    }
}
/// A snapshot captures every collection as of the moment it is taken. The
/// snapshot directory opens as a database with the same collections and
/// records, and a write made to the live database afterwards is absent from it.
#[test]
fn snapshot_then_open_reproduces_the_database() {
    let source_dir = tempfile::tempdir().unwrap();
    let mut db = open(source_dir.path());
    for collection in ["kb", "kb2"] {
        db.create_collection(collection, desc()).unwrap();
    }
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
        .unwrap();
    db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
        .unwrap();

    // Snapshot into a path that does not exist yet, inside a fresh tempdir.
    let work_dir = tempfile::tempdir().unwrap();
    let snapshot_dir = work_dir.path().join("snap");
    let info = db.snapshot(&snapshot_dir).unwrap();
    assert!(info.files > 0);
    assert!(info.bytes > 0);
    assert_eq!(info.manifest_version, db.manifest_version());

    // This write goes only to the live database, after the snapshot was taken.
    db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
        .unwrap();

    let restored = open(&snapshot_dir);
    let mut collection_names = restored.collection_names();
    collection_names.sort();
    assert_eq!(
        collection_names,
        vec![String::from("kb"), String::from("kb2")]
    );
    assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
    let record_a = restored.get("kb", "a").unwrap().unwrap();
    assert_eq!(record_a.payload, Some(json!({ "n": 1 })));
    assert_eq!(restored.len("kb2").unwrap(), 1);
    assert!(restored.get("kb", "late").unwrap().is_none());
}
/// `snapshot` must not write into a destination that already exists.
/// `tempfile::tempdir()` creates the directory up front, so snapshotting into
/// it is expected to fail with `CoreError::AlreadyExists`.
#[test]
fn snapshot_refuses_an_existing_destination() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    let dest = tempfile::tempdir().unwrap();
    assert!(matches!(
        db.snapshot(dest.path()),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
/// `restore_snapshot` copies a snapshot into a fresh directory that then opens
/// with the snapshotted data. It refuses a destination that already exists
/// (`AlreadyExists`) and a source directory that is not a snapshot
/// (`InvalidArgument`).
#[test]
fn restore_snapshot_roundtrips_and_guards() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    let work = tempfile::tempdir().unwrap();
    let snap_dir = work.path().join("snap");
    db.snapshot(&snap_dir).unwrap();

    // Happy path: restore into a not-yet-existing directory and reopen it.
    let restored_dir = work.path().join("restored");
    let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
    assert!(info.files > 0);
    let restored = open(&restored_dir);
    assert_eq!(restored.len("kb").unwrap(), 1);

    // Guard: the destination now exists, so a second restore must refuse it.
    assert!(matches!(
        restore_snapshot(&snap_dir, &restored_dir),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));

    // Guard: an empty directory is not a valid snapshot source.
    let not_snap = work.path().join("not-a-snapshot");
    std::fs::create_dir_all(&not_snap).unwrap();
    assert!(matches!(
        restore_snapshot(&not_snap, &work.path().join("out")),
        Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
    ));
}
}