1use std::collections::{BTreeMap, BTreeSet, HashMap};
46use std::path::Path;
47
48use quiver_core::{SecPredicate, SecValue, Store};
49use quiver_index::{
50 ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
51 Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
52 ordering_distance, report_metric,
53};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use thiserror::Error;
57
58pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
59pub use quiver_core::page::PageCodec;
60pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
61pub use quiver_core::{
62 Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
63 VectorEncryption,
64};
65pub use quiver_query::Filter;
66pub use quiver_query::{
67 BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
68 query_term_ids, rrf_fuse, text_to_sparse,
69};
70
/// Errors surfaced by the database layer.
///
/// Most variants wrap a lower-layer error through `#[from]`, so `?` converts
/// them automatically. The enum is `#[non_exhaustive]`, so matches outside
/// this crate need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// A `quiver_core::CoreError`; its message is forwarded unchanged.
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// A `quiver_index::IndexError`; its message is forwarded unchanged.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// A `quiver_index::DiskError`; its message is forwarded unchanged.
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be serialized to, or parsed from, JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// No collection with the carried name is registered.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The request is not supported; carries a static explanation.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A `quiver_index::SnapshotError`; its message is forwarded unchanged.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// `postcard` failed while encoding or decoding an index snapshot envelope.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
102
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
105
/// Summary returned after a snapshot has been copied or restored.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Manifest version reported for the snapshot.
    pub manifest_version: u64,
    /// Number of files copied.
    pub files: u64,
    /// Total number of bytes copied.
    pub bytes: u64,
}
117
/// A single-vector point returned by a lookup or search.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External id of the point.
    pub id: String,
    /// Score attached by the producing call (plain lookups set it to `0.0`).
    pub score: f32,
    /// Decoded JSON payload, when the caller asked for it.
    pub payload: Option<Value>,
    /// Stored vector, when the caller asked for it.
    pub vector: Option<Vec<f32>>,
}
130
/// A multi-vector document returned by a lookup or search.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// Document id.
    pub id: String,
    /// Score attached by the producing call (plain lookups set it to `0.0`).
    pub score: f32,
    /// Decoded JSON payload of the document, when present and requested.
    pub payload: Option<Value>,
    /// The document's token vectors, when the caller asked for them.
    pub vectors: Option<Vec<Vec<f32>>>,
}
145
/// Intermediate row used while ranking documents:
/// `(score, document id, payload, token vectors)`.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
151
/// Options shared by the search entry points.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Maximum number of results to return.
    pub k: usize,
    /// Optional payload filter a result must match.
    pub filter: Option<Filter>,
    /// Search-effort value forwarded to the index's `search` call.
    pub ef_search: usize,
    /// Whether results carry the decoded payload.
    pub with_payload: bool,
    /// Whether results carry the stored vector(s).
    pub with_vector: bool,
}
169
170impl Default for SearchParams {
171 fn default() -> Self {
172 Self {
173 k: 10,
174 filter: None,
175 ef_search: 64,
176 with_payload: true,
177 with_vector: false,
178 }
179 }
180}
181
/// Multiplier applied to `k` when a filtered search goes through the index,
/// so enough candidates survive post-filtering.
const FILTER_OVERFETCH: usize = 8;

/// Multiplier applied to `k` to size each ranked list fed into RRF fusion.
const RRF_CANDIDATE_FACTOR: usize = 10;
/// Lower bound on the per-list candidate depth used for RRF fusion.
const MIN_RRF_CANDIDATES: usize = 100;

/// Largest filter candidate set for which a filtered search is answered by
/// exact scoring of the candidates instead of going through the index.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// NOTE(review): not referenced in the visible part of the file; presumably the
// deleted-row fraction above which an HNSW index is rebuilt — confirm at use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

// NOTE(review): not referenced in the visible part of the file; presumably the
// pending-row fraction above which a graph index is rebuilt — confirm at use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
213
/// The in-memory index attached to a collection.
///
/// Variants wrapping an `Option` may be `None`; searching such a variant
/// yields no neighbors.
enum CollectionIndex {
    /// No index; searches return nothing.
    None,
    /// HNSW index.
    Hnsw(Hnsw),
    /// Vamana graph index (`FreshVamana`), if one is present.
    Vamana(Option<FreshVamana>),
    /// IVF index, if one is present.
    Ivf(Option<Ivf>),
    /// Disk-backed Vamana index (`FreshDiskVamana`), if one is present.
    Disk(Option<FreshDiskVamana>),
    /// ColBERT index, if one is present.
    Colbert(Option<ColbertIndex>),
}
237
238impl CollectionIndex {
239 fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
241 Ok(match self {
242 CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
243 CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
244 CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
245 CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
246 CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
247 CollectionIndex::None
248 | CollectionIndex::Vamana(None)
249 | CollectionIndex::Ivf(None)
250 | CollectionIndex::Disk(None)
251 | CollectionIndex::Colbert(None) => Vec::new(),
252 })
253 }
254}
255
/// Serialized wrapper for an IVF index snapshot written at checkpoint time.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    /// Envelope format version (written as `INDEX_ENVELOPE_VERSION`).
    version: u16,
    /// Copy of the collection's internal-id to external-id table.
    int_to_ext: Vec<String>,
    /// Bytes produced by the IVF index's own `snapshot()`.
    ivf: Vec<u8>,
}
267
/// Version number stamped into index snapshot envelopes.
const INDEX_ENVELOPE_VERSION: u16 = 1;
271
/// Serialized wrapper for the state of a disk-backed index written at
/// checkpoint time.
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    /// Envelope format version (written as `INDEX_ENVELOPE_VERSION`).
    version: u16,
    /// Copy of the collection's internal-id to external-id table.
    int_to_ext: Vec<String>,
    /// Value of the disk index's `base_len()` at checkpoint time.
    base_row_count: u64,
    /// Value of the disk index's `deleted_ids()` at checkpoint time.
    deleted_ids: Vec<u64>,
}
287
/// In-memory state the database keeps for one collection.
struct CollectionHandle {
    /// Store-level id of the collection.
    id: CollectionId,
    /// The collection's descriptor.
    descriptor: Descriptor,
    /// The collection's search index.
    index: CollectionIndex,
    /// Maps an index-internal id (used as a position) to the external id.
    int_to_ext: Vec<String>,
    /// Reverse mapping from external id to index-internal id.
    ext_to_int: HashMap<String, u64>,
    /// When set, the index must be rebuilt before it can be trusted.
    stale: bool,
    /// Write generation counter; compared by `commit_rebuild` to detect
    /// writes that landed while a rebuild was in flight.
    write_gen: u64,
    /// For multi-vector collections: document id to number of token vectors.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, when the collection keeps one.
    sparse: Option<SparseInvertedIndex>,
}
317
318fn uses_sparse_index(descriptor: &Descriptor) -> bool {
321 !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
322}
323
324fn mark_stale(handle: &mut CollectionHandle) {
328 handle.stale = true;
329 bump_write_gen(handle);
330}
331
332fn bump_write_gen(handle: &mut CollectionHandle) {
341 handle.write_gen = handle.write_gen.wrapping_add(1);
342}
343
/// A database: the durable store plus per-collection in-memory state.
pub struct Database {
    /// Underlying record store.
    store: Store,
    /// In-memory handle for each collection, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
}
349
350impl Database {
351 pub fn open(dir: &Path) -> Result<Self> {
354 Self::from_store(Store::open(dir)?)
355 }
356
357 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
362 Self::from_store(Store::open_with_codec(dir, codec)?)
363 }
364
365 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
370 Self::from_store(Store::open_with_keyring(dir, keyring)?)
371 }
372
373 fn from_store(store: Store) -> Result<Self> {
375 let mut collections = HashMap::new();
376 for name in store.collection_names() {
377 let Some(id) = store.collection_id(&name) else {
378 continue;
379 };
380 let Some(descriptor) = store.descriptor(id).cloned() else {
381 continue;
382 };
383 let mut handle = CollectionHandle {
384 id,
385 index: empty_index(&descriptor),
386 descriptor,
387 int_to_ext: Vec::new(),
388 ext_to_int: HashMap::new(),
389 stale: true,
390 write_gen: 0,
391 docs: None,
392 sparse: None,
394 };
395 load_index(&store, &mut handle)?;
396 collections.insert(name, handle);
397 }
398 Ok(Self { store, collections })
399 }
400
401 pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
404 validate_index(&descriptor)?;
405 let id = self.store.create_collection(name, descriptor.clone())?;
406 let index = empty_index(&descriptor);
407 let docs = descriptor.multivector.then(BTreeMap::new);
408 let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
412 self.collections.insert(
413 name.to_owned(),
414 CollectionHandle {
415 id,
416 descriptor,
417 index,
418 int_to_ext: Vec::new(),
419 ext_to_int: HashMap::new(),
420 stale: false,
421 write_gen: 0,
422 docs,
423 sparse,
424 },
425 );
426 Ok(())
427 }
428
429 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
431 let existed = self.store.drop_collection(name)?;
432 self.collections.remove(name);
433 Ok(existed)
434 }
435
436 pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
442 let existed = self.store.shred_collection(name)?;
443 self.collections.remove(name);
444 Ok(existed)
445 }
446
447 pub fn set_commit_observer(&mut self, observer: CommitObserver) {
451 self.store.set_commit_observer(observer);
452 }
453
454 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
460 Ok(self.store.replication_snapshot()?)
461 }
462
463 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
472 let target = match &op {
473 WalOp::CreateCollection { collection_id, .. }
474 | WalOp::DropCollection { collection_id }
475 | WalOp::Upsert { collection_id, .. }
476 | WalOp::Delete { collection_id, .. } => Some(*collection_id),
477 WalOp::Checkpoint { .. } => None,
478 };
479 let create_name = match &op {
480 WalOp::CreateCollection { name, .. } => Some(name.clone()),
481 _ => None,
482 };
483 let is_drop = matches!(op, WalOp::DropCollection { .. });
484 self.store.apply_replicated(op)?;
485
486 if let Some(name) = create_name {
487 if let Some(id) = target
489 && let Some(descriptor) = self.store.descriptor(id).cloned()
490 {
491 let docs = descriptor.multivector.then(BTreeMap::new);
492 let index = empty_index(&descriptor);
493 self.collections.insert(
496 name,
497 CollectionHandle {
498 id,
499 descriptor,
500 index,
501 int_to_ext: Vec::new(),
502 ext_to_int: HashMap::new(),
503 stale: false,
504 write_gen: 0,
505 docs,
506 sparse: None,
507 },
508 );
509 }
510 } else if is_drop {
511 if let Some(id) = target {
512 self.collections.retain(|_, h| h.id != id);
513 }
514 } else if let Some(id) = target
515 && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
516 {
517 mark_stale(handle);
518 }
519 Ok(())
520 }
521
522 #[must_use]
524 pub fn collection_names(&self) -> Vec<String> {
525 self.store.collection_names()
526 }
527
528 #[must_use]
530 pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
531 self.collections.get(name).map(|h| &h.descriptor)
532 }
533
534 pub fn len(&self, name: &str) -> Result<usize> {
536 let handle = self.handle(name)?;
537 Ok(self.store.len(handle.id)?)
538 }
539
540 pub fn is_empty(&self, name: &str) -> Result<bool> {
542 Ok(self.len(name)? == 0)
543 }
544
545 pub fn upsert(
547 &mut self,
548 collection: &str,
549 id: &str,
550 vector: &[f32],
551 payload: &Value,
552 ) -> Result<()> {
553 let handle = self
554 .collections
555 .get_mut(collection)
556 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
557 require_single_vector(handle)?;
558 let payload_bytes = serde_json::to_vec(payload)?;
559 self.store.upsert(handle.id, id, vector, &payload_bytes)?;
560 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
564 return Ok(());
565 }
566 index_upsert_point(handle, id, vector)?;
569 sparse_index_upsert_point(handle, id, payload);
571 Ok(())
572 }
573
574 pub fn upsert_batch(
581 &mut self,
582 collection: &str,
583 points: &[(&str, &[f32], &serde_json::Value)],
584 ) -> Result<u64> {
585 let handle = self
586 .collections
587 .get(collection)
588 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
589 require_single_vector(handle)?;
590 let coll_id = handle.id;
591 let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
592
593 let payload_bytes: Vec<Vec<u8>> = points
594 .iter()
595 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
596 .collect::<Result<_>>()?;
597
598 let records: Vec<(&str, &[f32], &[u8])> = points
599 .iter()
600 .zip(payload_bytes.iter())
601 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
602 .collect();
603
604 self.store.upsert_batch(coll_id, &records)?;
605
606 if is_client_side {
607 return Ok(records.len() as u64);
608 }
609
610 for (id, vector, payload) in points {
611 let handle = self
612 .collections
613 .get_mut(collection)
614 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
615 index_upsert_point(handle, id, vector)?;
616 sparse_index_upsert_point(handle, id, payload);
617 }
618 Ok(records.len() as u64)
619 }
620
621 pub fn upsert_bulk(
632 &mut self,
633 collection: &str,
634 points: &[(&str, &[f32], &serde_json::Value)],
635 ) -> Result<u64> {
636 let handle = self
637 .collections
638 .get_mut(collection)
639 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
640 require_single_vector(handle)?;
641 let coll_id = handle.id;
642
643 let payload_bytes: Vec<Vec<u8>> = points
644 .iter()
645 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
646 .collect::<Result<_>>()?;
647 let records: Vec<(&str, &[f32], &[u8])> = points
648 .iter()
649 .zip(payload_bytes.iter())
650 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
651 .collect();
652
653 self.store.upsert_batch(coll_id, &records)?;
654
655 let handle = self
659 .collections
660 .get_mut(collection)
661 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
662 mark_stale(handle);
663 Ok(records.len() as u64)
664 }
665
666 pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
668 let handle = self
669 .collections
670 .get_mut(collection)
671 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
672 require_single_vector(handle)?;
673 let existed = self.store.delete(handle.id, id)?;
674 if !existed {
675 return Ok(false);
676 }
677 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
680 return Ok(true);
681 }
682 index_delete_point(handle, id);
688 sparse_index_delete_point(handle, id);
690 Ok(true)
691 }
692
693 pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
695 let handle = self.handle(collection)?;
696 require_single_vector(handle)?;
697 match self.store.get(handle.id, id)? {
698 Some(record) => Ok(Some(Match {
699 id: id.to_owned(),
700 score: 0.0,
701 payload: Some(serde_json::from_slice(&record.payload)?),
702 vector: Some(record.vector),
703 })),
704 None => Ok(None),
705 }
706 }
707
708 pub fn fetch(
722 &self,
723 collection: &str,
724 filter: Option<&Filter>,
725 limit: usize,
726 with_payload: bool,
727 with_vector: bool,
728 ) -> Result<Vec<Match>> {
729 let handle = self.handle(collection)?;
730 require_single_vector(handle)?;
731 let mut out = Vec::new();
732 for (id, record) in self.store.scan(handle.id)? {
733 if out.len() >= limit {
734 break;
735 }
736 let payload: Value = serde_json::from_slice(&record.payload)?;
737 if let Some(filter) = filter
738 && !filter.matches(&payload)
739 {
740 continue;
741 }
742 out.push(Match {
743 id,
744 score: 0.0,
745 payload: with_payload.then_some(payload),
746 vector: with_vector.then_some(record.vector),
747 });
748 }
749 Ok(out)
750 }
751
752 pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
758 if self.handle(collection)?.stale {
759 let store = &self.store;
760 let handle = self
761 .collections
762 .get_mut(collection)
763 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
764 rebuild_index(store, handle)?;
765 }
766 Ok(())
767 }
768
769 pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
774 Ok(self.handle(collection)?.stale)
775 }
776
777 pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
783 let handle = self.handle(collection)?;
784 if !handle.stale {
785 return Ok(None);
786 }
787 let scan = scan_collection(&self.store, handle)?;
788 Ok(Some(RebuildInputs {
789 collection: collection.to_owned(),
790 descriptor: handle.descriptor.clone(),
791 scan,
792 write_gen: handle.write_gen,
793 }))
794 }
795
    /// Installs a finished rebuild into its collection's handle.
    ///
    /// Returns `Ok(false)` without doing anything when the collection no
    /// longer exists. Otherwise returns whether the collection is still
    /// stale: `true` when its write generation no longer equals the one the
    /// rebuild was taken at.
    ///
    /// # Errors
    /// Propagates failures from writing or opening the disk index.
    pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
        let store = &self.store;
        let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
            return Ok(false);
        };
        match rebuilt.kind {
            RebuiltKind::Ready(index) => handle.index = *index,
            RebuiltKind::Disk { graph, pq } => {
                // The old index is replaced with an empty one BEFORE the new
                // disk index is written.
                // NOTE(review): presumably so the previous disk index is
                // released before its files are rewritten — confirm.
                handle.index = empty_index(&handle.descriptor);
                let disk = write_disk_index(store, handle.id, &graph, &pq)?;
                handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
            }
        }
        handle.int_to_ext = rebuilt.int_to_ext;
        handle.ext_to_int = rebuilt.ext_to_int;
        handle.docs = rebuilt.docs;
        handle.sparse = rebuilt.sparse;
        // A differing generation means writes arrived after the rebuild's
        // inputs were captured, so the installed index is already out of date.
        let still_stale = handle.write_gen != rebuilt.write_gen;
        handle.stale = still_stale;
        Ok(still_stale)
    }
828
829 pub fn search(
832 &mut self,
833 collection: &str,
834 query: &[f32],
835 params: &SearchParams,
836 ) -> Result<Vec<Match>> {
837 self.ensure_indexed(collection)?;
842 self.search_snapshot(collection, query, params)
843 }
844
845 pub fn search_snapshot(
852 &self,
853 collection: &str,
854 query: &[f32],
855 params: &SearchParams,
856 ) -> Result<Vec<Match>> {
857 require_single_vector(self.handle(collection)?)?;
858 require_server_searchable(self.handle(collection)?)?;
859
860 let handle = self.handle(collection)?;
861
862 if let Some(filter) = ¶ms.filter
866 && let Some(candidates) = candidate_ids(
867 &self.store,
868 handle.id,
869 filter,
870 &handle.descriptor.filterable,
871 )?
872 && candidates.len() <= FULL_SCAN_THRESHOLD
873 {
874 return self.exact_filtered_search(
875 handle.id,
876 &handle.descriptor,
877 query,
878 params,
879 filter,
880 &candidates,
881 );
882 }
883
884 let fetch = if params.filter.is_some() {
885 params
886 .k
887 .saturating_mul(FILTER_OVERFETCH)
888 .max(params.ef_search)
889 } else {
890 params.k
891 };
892 let raw = handle.index.search(query, fetch, params.ef_search)?;
893
894 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
895 let mut out = Vec::with_capacity(params.k);
896 for neighbor in raw {
897 if out.len() >= params.k {
898 break;
899 }
900 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
901 continue;
902 };
903 let record = if need_record {
904 self.store.get(handle.id, ext_id)?
905 } else {
906 None
907 };
908 let payload_value: Option<Value> = match &record {
909 Some(r) if params.filter.is_some() || params.with_payload => {
910 Some(serde_json::from_slice(&r.payload)?)
911 }
912 _ => None,
913 };
914 if let Some(filter) = ¶ms.filter {
915 let value = payload_value.as_ref().unwrap_or(&Value::Null);
916 if !filter.matches(value) {
917 continue;
918 }
919 }
920 out.push(Match {
921 id: ext_id.clone(),
922 score: neighbor.distance,
923 payload: if params.with_payload {
924 payload_value
925 } else {
926 None
927 },
928 vector: if params.with_vector {
929 record.map(|r| r.vector)
930 } else {
931 None
932 },
933 });
934 }
935 Ok(out)
936 }
937
938 pub fn hybrid_search(
946 &mut self,
947 collection: &str,
948 dense_query: Option<&[f32]>,
949 sparse_query: Option<&SparseVector>,
950 text_query: Option<&str>,
951 params: &SearchParams,
952 rrf_k0: f32,
953 ) -> Result<Vec<Match>> {
954 self.ensure_indexed(collection)?;
958 self.hybrid_search_snapshot(
959 collection,
960 dense_query,
961 sparse_query,
962 text_query,
963 params,
964 rrf_k0,
965 )
966 }
967
968 pub fn hybrid_search_snapshot(
973 &self,
974 collection: &str,
975 dense_query: Option<&[f32]>,
976 sparse_query: Option<&SparseVector>,
977 text_query: Option<&str>,
978 params: &SearchParams,
979 rrf_k0: f32,
980 ) -> Result<Vec<Match>> {
981 require_single_vector(self.handle(collection)?)?;
982 require_server_searchable(self.handle(collection)?)?;
983 if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
984 return Err(Error::Unsupported(
985 "hybrid_search requires a dense query, a sparse query, or a text query",
986 ));
987 }
988 let handle = self.handle(collection)?;
989
990 let depth = params
993 .k
994 .saturating_mul(RRF_CANDIDATE_FACTOR)
995 .max(MIN_RRF_CANDIDATES);
996 let filter = params.filter.as_ref();
997 let mut lists: Vec<Vec<String>> = Vec::new();
998 if let Some(q) = dense_query {
999 lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1000 }
1001 if let Some(sp) = sparse_query {
1002 lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1003 }
1004 if let Some(text) = text_query {
1005 lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1006 }
1007 let fused = rrf_fuse(&lists, rrf_k0, params.k);
1008
1009 let mut out = Vec::with_capacity(fused.len());
1010 for (ext_id, score) in fused {
1011 let record = if params.with_payload || params.with_vector {
1012 self.store.get(handle.id, &ext_id)?
1013 } else {
1014 None
1015 };
1016 let payload = match (&record, params.with_payload) {
1017 (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1018 _ => None,
1019 };
1020 out.push(Match {
1021 id: ext_id,
1022 score,
1023 payload,
1024 vector: if params.with_vector {
1025 record.map(|r| r.vector)
1026 } else {
1027 None
1028 },
1029 });
1030 }
1031 Ok(out)
1032 }
1033
1034 fn dense_ranked_ids(
1037 &self,
1038 handle: &CollectionHandle,
1039 query: &[f32],
1040 depth: usize,
1041 ef_search: usize,
1042 filter: Option<&Filter>,
1043 ) -> Result<Vec<String>> {
1044 let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1045 let mut ids = Vec::new();
1046 for neighbor in raw {
1047 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1048 continue;
1049 };
1050 if !self.passes_filter(handle.id, ext_id, filter)? {
1051 continue;
1052 }
1053 ids.push(ext_id.clone());
1054 if ids.len() >= depth {
1055 break;
1056 }
1057 }
1058 Ok(ids)
1059 }
1060
1061 fn sparse_ranked_ids(
1068 &self,
1069 handle: &CollectionHandle,
1070 query: &SparseVector,
1071 depth: usize,
1072 filter: Option<&Filter>,
1073 ) -> Result<Vec<String>> {
1074 if let Some(idx) = handle.sparse.as_ref() {
1075 let mut ids = Vec::new();
1076 for (ext_id, _score) in idx.search(query) {
1077 if !self.passes_filter(handle.id, &ext_id, filter)? {
1078 continue;
1079 }
1080 ids.push(ext_id);
1081 if ids.len() >= depth {
1082 break;
1083 }
1084 }
1085 return Ok(ids);
1086 }
1087 self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1088 }
1089
1090 fn sparse_ranked_ids_by_scan(
1094 &self,
1095 cid: CollectionId,
1096 query: &SparseVector,
1097 depth: usize,
1098 filter: Option<&Filter>,
1099 ) -> Result<Vec<String>> {
1100 let qmap: HashMap<u32, f32> = query
1101 .indices
1102 .iter()
1103 .copied()
1104 .zip(query.values.iter().copied())
1105 .collect();
1106 let mut scored: Vec<(f32, String)> = Vec::new();
1107 for (ext_id, record) in self.store.scan(cid)? {
1108 if record.payload.is_empty() {
1109 continue;
1110 }
1111 let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1112 continue;
1113 };
1114 if let Some(filter) = filter
1115 && !filter.matches(&value)
1116 {
1117 continue;
1118 }
1119 let Some(raw) = value.get(SPARSE_KEY) else {
1120 continue;
1121 };
1122 let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1123 continue;
1124 };
1125 let mut score = 0.0f32;
1126 for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1127 if let Some(qw) = qmap.get(dim) {
1128 score += qw * weight;
1129 }
1130 }
1131 if score > 0.0 {
1132 scored.push((score, ext_id));
1133 }
1134 }
1135 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1136 Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1137 }
1138
1139 fn bm25_ranked_ids(
1147 &self,
1148 handle: &CollectionHandle,
1149 query_text: &str,
1150 depth: usize,
1151 filter: Option<&Filter>,
1152 ) -> Result<Vec<String>> {
1153 let Some(idx) = handle.sparse.as_ref() else {
1154 return Ok(Vec::new());
1155 };
1156 let terms = query_term_ids(query_text);
1157 let mut ids = Vec::new();
1158 for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1159 if !self.passes_filter(handle.id, &ext_id, filter)? {
1160 continue;
1161 }
1162 ids.push(ext_id);
1163 if ids.len() >= depth {
1164 break;
1165 }
1166 }
1167 Ok(ids)
1168 }
1169
1170 fn passes_filter(
1173 &self,
1174 cid: CollectionId,
1175 ext_id: &str,
1176 filter: Option<&Filter>,
1177 ) -> Result<bool> {
1178 let Some(filter) = filter else {
1179 return Ok(true);
1180 };
1181 let value: Value = match self.store.get(cid, ext_id)? {
1182 Some(r) => serde_json::from_slice(&r.payload)?,
1183 None => Value::Null,
1184 };
1185 Ok(filter.matches(&value))
1186 }
1187
1188 fn exact_filtered_search(
1193 &self,
1194 cid: CollectionId,
1195 descriptor: &Descriptor,
1196 query: &[f32],
1197 params: &SearchParams,
1198 filter: &Filter,
1199 candidates: &BTreeSet<String>,
1200 ) -> Result<Vec<Match>> {
1201 let metric = to_index_metric(descriptor.metric);
1202 let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1203 for ext_id in candidates {
1204 let Some(record) = self.store.get(cid, ext_id)? else {
1205 continue;
1206 };
1207 let payload: Value = serde_json::from_slice(&record.payload)?;
1208 if !filter.matches(&payload) {
1209 continue;
1210 }
1211 let ordering = ordering_distance(metric, query, &record.vector);
1212 scored.push((ordering, ext_id.clone(), payload, record.vector));
1213 }
1214 scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1215 scored.truncate(params.k);
1216 Ok(scored
1217 .into_iter()
1218 .map(|(ordering, id, payload, vector)| Match {
1219 id,
1220 score: report_metric(metric, ordering),
1221 payload: params.with_payload.then_some(payload),
1222 vector: params.with_vector.then_some(vector),
1223 })
1224 .collect())
1225 }
1226
1227 pub fn upsert_document(
1236 &mut self,
1237 collection: &str,
1238 doc_id: &str,
1239 vectors: &[Vec<f32>],
1240 payload: &Value,
1241 ) -> Result<()> {
1242 let handle = self
1243 .collections
1244 .get_mut(collection)
1245 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1246 require_multivector(handle)?;
1247 if doc_id.contains(DOC_TOKEN_SEP) {
1248 return Err(Error::Unsupported(
1249 "document id must not contain the reserved 0x1f separator",
1250 ));
1251 }
1252 if vectors.is_empty() {
1253 return Err(Error::Unsupported("a document needs at least one vector"));
1254 }
1255 let dim = handle.descriptor.dim as usize;
1256 if vectors.iter().any(|v| v.len() != dim) {
1257 return Err(Error::Unsupported(
1258 "every document vector must match the collection dimensionality",
1259 ));
1260 }
1261 let previous = handle
1264 .docs
1265 .as_ref()
1266 .and_then(|d| d.get(doc_id))
1267 .copied()
1268 .unwrap_or(0) as usize;
1269 for j in vectors.len()..previous {
1270 self.store.delete(handle.id, &token_id(doc_id, j))?;
1271 index_delete_point(handle, &token_id(doc_id, j));
1273 }
1274 let payload_bytes = serde_json::to_vec(payload)?;
1275 for (j, vector) in vectors.iter().enumerate() {
1276 let bytes: &[u8] = if j == 0 {
1278 payload_bytes.as_slice()
1279 } else {
1280 &[]
1281 };
1282 self.store
1283 .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1284 index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1288 }
1289 if let Some(docs) = handle.docs.as_mut() {
1290 docs.insert(doc_id.to_owned(), vectors.len() as u32);
1291 }
1292 Ok(())
1293 }
1294
1295 pub fn search_multi_vector(
1303 &mut self,
1304 collection: &str,
1305 query_tokens: &[Vec<f32>],
1306 params: &SearchParams,
1307 ) -> Result<Vec<DocumentMatch>> {
1308 self.ensure_indexed(collection)?;
1312 self.search_multi_vector_snapshot(collection, query_tokens, params)
1313 }
1314
1315 pub fn search_multi_vector_snapshot(
1322 &self,
1323 collection: &str,
1324 query_tokens: &[Vec<f32>],
1325 params: &SearchParams,
1326 ) -> Result<Vec<DocumentMatch>> {
1327 require_multivector(self.handle(collection)?)?;
1328 let dim = self.handle(collection)?.descriptor.dim as usize;
1329 if query_tokens.is_empty() {
1330 return Ok(Vec::new());
1331 }
1332 if query_tokens.iter().any(|v| v.len() != dim) {
1333 return Err(Error::Unsupported(
1334 "every query token must match the collection dimensionality",
1335 ));
1336 }
1337
1338 let doc_count = self
1339 .handle(collection)?
1340 .docs
1341 .as_ref()
1342 .map_or(0, BTreeMap::len);
1343 let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1344 self.handle(collection)?
1346 .docs
1347 .as_ref()
1348 .map(|d| d.keys().cloned().collect())
1349 .unwrap_or_default()
1350 } else {
1351 let handle = self.handle(collection)?;
1355 let per_token_k = params
1356 .k
1357 .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1358 .max(params.ef_search);
1359 let mut set = BTreeSet::new();
1360 for token in query_tokens {
1361 for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1362 if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1363 && let Some((doc, _)) = parse_token_id(ext)
1364 {
1365 set.insert(doc.to_owned());
1366 }
1367 }
1368 }
1369 set.into_iter().collect()
1370 };
1371
1372 let handle = self.handle(collection)?;
1374 let cid = handle.id;
1375 let metric = to_index_metric(handle.descriptor.metric);
1376 let mut scored: Vec<ScoredDocument> = Vec::new();
1377 for doc in &candidates {
1378 let count = handle
1379 .docs
1380 .as_ref()
1381 .and_then(|d| d.get(doc))
1382 .copied()
1383 .unwrap_or(0) as usize;
1384 let (tokens, payload) = self.gather_document(cid, doc, count)?;
1385 if tokens.is_empty() {
1386 continue;
1387 }
1388 if let Some(filter) = ¶ms.filter {
1389 let value = payload.clone().unwrap_or(Value::Null);
1390 if !filter.matches(&value) {
1391 continue;
1392 }
1393 }
1394 let score = max_sim(metric, query_tokens, &tokens);
1395 let vectors = params.with_vector.then_some(tokens);
1396 scored.push((score, doc.clone(), payload, vectors));
1397 }
1398 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1400 scored.truncate(params.k);
1401 Ok(scored
1402 .into_iter()
1403 .map(|(score, id, payload, vectors)| DocumentMatch {
1404 id,
1405 score,
1406 payload: params.with_payload.then_some(payload).flatten(),
1407 vectors,
1408 })
1409 .collect())
1410 }
1411
1412 pub fn get_document(
1415 &self,
1416 collection: &str,
1417 doc_id: &str,
1418 with_vectors: bool,
1419 ) -> Result<Option<DocumentMatch>> {
1420 let handle = self.handle(collection)?;
1421 require_multivector(handle)?;
1422 let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1423 return Ok(None);
1424 };
1425 let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1426 if tokens.is_empty() {
1427 return Ok(None);
1428 }
1429 Ok(Some(DocumentMatch {
1430 id: doc_id.to_owned(),
1431 score: 0.0,
1432 payload,
1433 vectors: with_vectors.then_some(tokens),
1434 }))
1435 }
1436
1437 pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1440 let handle = self
1441 .collections
1442 .get_mut(collection)
1443 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1444 require_multivector(handle)?;
1445 let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1446 return Ok(false);
1447 };
1448 for j in 0..count as usize {
1449 self.store.delete(handle.id, &token_id(doc_id, j))?;
1450 index_delete_point(handle, &token_id(doc_id, j));
1452 }
1453 if let Some(docs) = handle.docs.as_mut() {
1454 docs.remove(doc_id);
1455 }
1456 Ok(true)
1457 }
1458
1459 pub fn document_count(&self, collection: &str) -> Result<usize> {
1462 let handle = self.handle(collection)?;
1463 require_multivector(handle)?;
1464 Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1465 }
1466
1467 fn gather_document(
1471 &self,
1472 cid: CollectionId,
1473 doc_id: &str,
1474 count: usize,
1475 ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1476 let mut tokens = Vec::with_capacity(count);
1477 let mut payload: Option<Value> = None;
1478 for j in 0..count {
1479 let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1480 continue;
1481 };
1482 if j == 0 && !record.payload.is_empty() {
1483 payload = Some(serde_json::from_slice(&record.payload)?);
1484 }
1485 tokens.push(record.vector);
1486 }
1487 Ok((tokens, payload))
1488 }
1489
1490 pub fn checkpoint(&mut self) -> Result<()> {
1495 let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1496 for handle in self.collections.values() {
1497 if handle.stale {
1498 continue;
1499 }
1500 if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1501 if ivf.is_empty() {
1502 continue;
1503 }
1504 let envelope = IndexEnvelope {
1505 version: INDEX_ENVELOPE_VERSION,
1506 int_to_ext: handle.int_to_ext.clone(),
1507 ivf: ivf.snapshot()?,
1508 };
1509 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1510 } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1511 let envelope = DiskEnvelope {
1517 version: INDEX_ENVELOPE_VERSION,
1518 int_to_ext: handle.int_to_ext.clone(),
1519 base_row_count: fresh.base_len() as u64,
1520 deleted_ids: fresh.deleted_ids(),
1521 };
1522 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1523 }
1524 }
1525 self.store.checkpoint_with_index_snapshots(&snapshots)?;
1526 Ok(())
1527 }
1528
1529 pub fn compact(&mut self) -> Result<()> {
1533 Ok(self.store.compact()?)
1534 }
1535
1536 #[must_use]
1539 pub fn manifest_version(&self) -> u64 {
1540 self.store.manifest_version()
1541 }
1542
/// Returns the total size in bytes of the files under the store directory,
/// as computed by `dir_size` (unreadable entries count as zero).
#[must_use]
pub fn disk_usage_bytes(&self) -> u64 {
    dir_size(self.store.dir())
}
1549
1550 pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
1562 if dest.exists() {
1563 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1564 dest.display().to_string(),
1565 )));
1566 }
1567 self.checkpoint()?;
1570 let (files, bytes) = copy_tree(self.store.dir(), dest)?;
1571 let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
1575 Ok(SnapshotInfo {
1576 manifest_version: self.store.manifest_version(),
1577 files,
1578 bytes,
1579 })
1580 }
1581
1582 fn handle(&self, name: &str) -> Result<&CollectionHandle> {
1583 self.collections
1584 .get(name)
1585 .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
1586 }
1587}
1588
1589pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
1602 if dest.exists() {
1603 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1604 dest.display().to_string(),
1605 )));
1606 }
1607 if !src.join("CURRENT").exists() {
1608 return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
1609 format!("{} is not a snapshot (no CURRENT)", src.display()),
1610 )));
1611 }
1612 let (files, bytes) = copy_tree(src, dest)?;
1613 Ok(SnapshotInfo {
1614 manifest_version: 0,
1618 files,
1619 bytes,
1620 })
1621}
1622
1623fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
1627 std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
1628 let mut files = 0u64;
1629 let mut bytes = 0u64;
1630 for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
1631 let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
1632 let from = entry.path();
1633 let to = dst.join(entry.file_name());
1634 let ft = entry
1635 .file_type()
1636 .map_err(|e| quiver_core::CoreError::io(&from, e))?;
1637 if ft.is_dir() {
1638 let (f, b) = copy_tree(&from, &to)?;
1639 files += f;
1640 bytes += b;
1641 } else {
1642 let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
1643 files += 1;
1644 bytes += n;
1645 }
1646 }
1647 Ok((files, bytes))
1648}
1649
/// Sums the sizes of all files below `dir`, descending into subdirectories.
///
/// This is a best-effort measurement: a directory that cannot be listed and
/// entries whose type or metadata cannot be read contribute zero.
fn dir_size(dir: &Path) -> u64 {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return 0;
    };
    listing
        .flatten()
        .map(|item| match item.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&item.path()),
            Ok(_) => item.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
1667
/// Separator (U+001F) that `token_id` places between a document id and a
/// token ordinal, and that `parse_token_id` splits on from the right.
const DOC_TOKEN_SEP: char = '\u{1f}';

/// NOTE(review): not referenced in this part of the file; presumably the
/// document count up to which multi-vector search scores every document
/// exactly — confirm against the search path.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

/// NOTE(review): not referenced in this part of the file; presumably a
/// multiplier applied to `k` when gathering multi-vector candidates — confirm
/// against the search path.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
1681
1682fn token_id(doc_id: &str, ordinal: usize) -> String {
1684 format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
1685}
1686
1687fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
1691 let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
1692 Some((doc, ordinal.parse().ok()?))
1693}
1694
1695fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
1697 if handle.descriptor.multivector {
1698 Err(Error::Unsupported(
1699 "collection is multi-vector; use upsert_document / search_multi_vector",
1700 ))
1701 } else {
1702 Ok(())
1703 }
1704}
1705
1706fn require_multivector(handle: &CollectionHandle) -> Result<()> {
1708 if handle.descriptor.multivector {
1709 Ok(())
1710 } else {
1711 Err(Error::Unsupported(
1712 "collection is single-vector; use upsert / search",
1713 ))
1714 }
1715}
1716
1717fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
1721 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
1722 Err(Error::Unsupported(
1723 "collection is client-side encrypted; the server cannot rank opaque vectors — \
1724 fetch points and rank client-side",
1725 ))
1726 } else {
1727 Ok(())
1728 }
1729}
1730
/// Maps the descriptor-level `DistanceMetric` onto the index crate's
/// `Metric`, variant for variant.
fn to_index_metric(metric: DistanceMetric) -> Metric {
    match metric {
        DistanceMetric::Dot => Metric::Dot,
        DistanceMetric::Cosine => Metric::Cosine,
        DistanceMetric::L2 => Metric::L2,
    }
}
1738
1739fn validate_index(descriptor: &Descriptor) -> Result<()> {
1741 if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
1744 return Err(Error::Unsupported(
1745 "multi-vector collections require a similarity metric (cosine or dot)",
1746 ));
1747 }
1748 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1752 if descriptor.multivector {
1753 return Err(Error::Unsupported(
1754 "client-side vector encryption is not supported for multi-vector collections",
1755 ));
1756 }
1757 return Ok(());
1758 }
1759 if descriptor.vector_encryption == VectorEncryption::Dcpe
1762 && descriptor.metric != DistanceMetric::L2
1763 {
1764 return Err(Error::Unsupported(
1765 "dcpe-encrypted collections require the l2 metric",
1766 ));
1767 }
1768 if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
1771 return Err(Error::Unsupported(
1772 "the colbert index is only for multi-vector collections",
1773 ));
1774 }
1775 match descriptor.index.kind {
1776 IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
1777 if descriptor.metric == DistanceMetric::Dot =>
1778 {
1779 Err(Error::Unsupported(
1780 "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
1781 ))
1782 }
1783 _ => Ok(()),
1784 }
1785}
1786
1787fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
1789 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1790 return CollectionIndex::None;
1791 }
1792 match descriptor.index.kind {
1793 IndexKind::Vamana => CollectionIndex::Vamana(None),
1794 IndexKind::DiskVamana => CollectionIndex::Disk(None),
1795 IndexKind::Ivf => CollectionIndex::Ivf(None),
1796 IndexKind::Colbert => CollectionIndex::Colbert(None),
1797 _ => CollectionIndex::Hnsw(Hnsw::new(
1798 descriptor.dim as usize,
1799 to_index_metric(descriptor.metric),
1800 HnswConfig::default(),
1801 )),
1802 }
1803}
1804
/// Picks the default number of product-quantization subspaces for `dim`.
///
/// The result is the largest divisor of `dim` that does not exceed
/// `max(dim / 8, 1)`, and `1` when no larger divisor fits.
fn default_pq_m(dim: usize) -> usize {
    let mut candidate = (dim / 8).max(1);
    // Walk downwards until the candidate divides `dim`; 1 always does.
    while candidate > 1 && !dim.is_multiple_of(candidate) {
        candidate -= 1;
    }
    candidate
}
1814
/// Fixed seed handed to `ProductQuantizer::train` and to `ColbertConfig::seed`.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
/// File name of the on-disk Vamana index inside a collection's index directory.
const DISK_INDEX_FILE: &str = "vamana.qvx";
1821
1822fn build_index(
1823 store: &Store,
1824 cid: CollectionId,
1825 descriptor: &Descriptor,
1826 ids: &[u64],
1827 flat: &[f32],
1828) -> Result<CollectionIndex> {
1829 Ok(match build_in_memory_index(descriptor, ids, flat)? {
1830 Some(index) => index,
1831 None => {
1832 let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
1833 CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
1834 store, cid, &graph, &pq,
1835 )?)?))
1836 }
1837 })
1838}
1839
1840fn build_in_memory_index(
1845 descriptor: &Descriptor,
1846 ids: &[u64],
1847 flat: &[f32],
1848) -> Result<Option<CollectionIndex>> {
1849 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1852 return Ok(Some(CollectionIndex::None));
1853 }
1854 let dim = descriptor.dim as usize;
1855 let metric = to_index_metric(descriptor.metric);
1856 Ok(Some(match descriptor.index.kind {
1857 IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
1858 ids,
1859 flat,
1860 dim,
1861 metric,
1862 VamanaConfig::default(),
1863 )?)?)),
1864 IndexKind::DiskVamana => return Ok(None),
1866 IndexKind::Ivf => {
1867 let cfg = IvfConfig {
1868 quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
1869 ..IvfConfig::default()
1870 };
1871 CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
1872 }
1873 IndexKind::Colbert => {
1874 let n = ids.len();
1877 let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
1878 let cfg = ColbertConfig {
1879 n_centroids,
1880 n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
1881 pq_subspaces: descriptor
1882 .index
1883 .pq_subspaces
1884 .map_or_else(|| default_pq_m(dim), |m| m as usize),
1885 seed: PQ_SEED,
1886 };
1887 CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
1888 }
1889 _ => {
1890 let mut h = Hnsw::new(dim, metric, HnswConfig::default());
1891 for (i, &id) in ids.iter().enumerate() {
1892 h.insert(id, &flat[i * dim..(i + 1) * dim])?;
1893 }
1894 CollectionIndex::Hnsw(h)
1895 }
1896 }))
1897}
1898
1899fn build_disk_graph_pq(
1903 descriptor: &Descriptor,
1904 ids: &[u64],
1905 flat: &[f32],
1906) -> Result<(Vamana, ProductQuantizer)> {
1907 let dim = descriptor.dim as usize;
1908 let metric = to_index_metric(descriptor.metric);
1909 let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
1910 let m = descriptor
1911 .index
1912 .pq_subspaces
1913 .map_or_else(|| default_pq_m(dim), |x| x as usize);
1914 let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
1915 Ok((graph, pq))
1916}
1917
1918fn write_disk_index(
1923 store: &Store,
1924 cid: CollectionId,
1925 graph: &Vamana,
1926 pq: &ProductQuantizer,
1927) -> Result<DiskVamana> {
1928 let dir = store.index_dir(cid);
1929 std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
1930 let path = dir.join(DISK_INDEX_FILE);
1931 let codec = store.collection_codec_clone(cid)?;
1935 let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
1941 quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
1942 std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
1943 let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
1944 open_disk_index(store, cid, codec)
1945}
1946
1947fn open_disk_index(
1951 store: &Store,
1952 cid: CollectionId,
1953 codec: Box<dyn PageCodec>,
1954) -> Result<DiskVamana> {
1955 let path = store.index_dir(cid).join(DISK_INDEX_FILE);
1956 Ok(DiskVamana::open(&path, codec)?)
1957}
1958
1959fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
1964 if !handle.descriptor.multivector
1967 && handle.descriptor.index.kind == IndexKind::Ivf
1968 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
1969 && restore_ivf_snapshot(store, handle, &blob).is_ok()
1970 {
1971 return Ok(());
1972 }
1973 if !handle.descriptor.multivector
1980 && handle.descriptor.index.kind == IndexKind::DiskVamana
1981 && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
1982 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
1983 && restore_disk_snapshot(store, handle, &blob).is_ok()
1984 {
1985 return Ok(());
1986 }
1987 rebuild_index(store, handle)
1988}
1989
/// Restores a disk-index collection from a persisted `DiskEnvelope`.
///
/// Steps: decode the envelope, reopen the on-disk base index, rebuild the id
/// maps from the envelope, re-insert every id past the base row count from
/// the store, re-apply the recorded deletions, then replay the store's
/// recovery tail.
///
/// # Errors
/// Fails on an undecodable or wrong-version envelope, when the reopened base
/// holds a different number of rows than the envelope recorded, or when any
/// store / index call fails. The handle's id maps may already have been
/// overwritten when a later step fails.
fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported disk index snapshot version",
        ));
    }
    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
    // The snapshot is only usable with the exact base file it was taken against.
    if base.len() as u64 != envelope.base_row_count {
        return Err(Error::Unsupported(
            "disk base count disagrees with snapshot",
        ));
    }
    // Internal ids are positions in `int_to_ext`, so the reverse map is derived from it.
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    let mut fresh = FreshDiskVamana::new(base)?;
    // Ids at or beyond the base row count are not in the base file: re-insert
    // them from the store. Ids with no stored record are skipped.
    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
        let ext = &handle.int_to_ext[internal as usize];
        if let Some(record) = store.get(handle.id, ext)? {
            fresh.insert(internal, &record.vector)?;
        }
    }
    for id in envelope.deleted_ids {
        fresh.mark_deleted(id);
    }
    handle.index = CollectionIndex::Disk(Some(fresh));
    handle.stale = false;
    // Apply the writes the store reports in its recovery tail.
    replay_recovery_tail(store, handle)
}
2034
2035fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2040 let tail = store.recovery_tail(handle.id)?;
2041 for ext in &tail.deleted {
2042 index_delete_point(handle, ext);
2043 }
2044 for (ext, record) in tail.upserts {
2045 index_upsert_point(handle, &ext, &record.vector)?;
2046 }
2047 Ok(())
2048}
2049
/// Restores an IVF collection from a persisted `IndexEnvelope`, then applies
/// the store's recovery tail directly to the restored index.
///
/// # Errors
/// Fails on an undecodable or wrong-version envelope, when the IVF snapshot
/// cannot be restored, or when a store / index call fails. The handle may
/// already have been updated when a tail operation fails.
fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported index snapshot envelope version",
        ));
    }
    let ivf = Ivf::restore(&envelope.ivf)?;
    // Internal ids are positions in `int_to_ext`, so the reverse map is derived from it.
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    handle.index = CollectionIndex::Ivf(Some(ivf));
    handle.stale = false;

    let tail = store.recovery_tail(handle.id)?;
    // Deletions first; ids the snapshot never knew about need no removal.
    for ext in &tail.deleted {
        let Some(&internal) = handle.ext_to_int.get(ext) else {
            continue;
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.remove(internal);
        }
    }
    // Upserts reuse the existing internal id when there is one and otherwise
    // append a new id at the end of `int_to_ext`.
    for (ext, record) in tail.upserts {
        let internal = match handle.ext_to_int.get(&ext) {
            Some(&i) => i,
            None => {
                let i = handle.int_to_ext.len() as u64;
                handle.ext_to_int.insert(ext.clone(), i);
                handle.int_to_ext.push(ext);
                i
            }
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, &record.vector)?;
        }
    }
    Ok(())
}
2097
/// Applies one upserted point to the collection's index bookkeeping.
///
/// The write generation is bumped unconditionally. If the handle is already
/// stale nothing else happens. Otherwise the behavior depends on the index:
/// - HNSW with an unseen id: insert under a new internal id.
/// - Non-empty IVF: insert under the existing internal id, or a new one.
/// - Live Vamana / disk graph: mark the old internal id deleted (if any),
///   insert under a new internal id, and mark the handle stale once the
///   pending fraction reaches `GRAPH_REBUILD_PENDING_FRACTION`.
/// - Live ColBERT: same replace-by-new-id scheme, marking the handle stale
///   once the deleted fraction reaches `HNSW_REBUILD_DELETED_FRACTION`.
/// - Anything else (including HNSW with an already-known id, an unbuilt or
///   empty index): `mark_stale` is called.
///
/// # Errors
/// Propagates insert failures from the underlying index.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    bump_write_gen(handle);
    if handle.stale {
        return Ok(());
    }
    // The index shape is captured in booleans up front so the branches below
    // can borrow `handle.index` mutably while also updating the id maps.
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        // New ids are appended; the internal id is the position in `int_to_ext`.
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // An update becomes "delete the old internal id, insert under a new one".
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            _ => {}
        }
        // The external id now points at the new internal id.
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        // Same replace-by-new-id scheme as the graph indexes.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // No incremental path applies (e.g. HNSW update of a known id, or an
        // index that has not been built yet).
        mark_stale(handle);
    }
    Ok(())
}
2194
/// Applies one deleted point to the collection's index bookkeeping.
///
/// The write generation is bumped unconditionally. If the handle is already
/// stale nothing else happens. For a known id the deletion is forwarded to
/// the live index: IVF removes the entry, while HNSW, the Vamana / disk
/// graphs and ColBERT mark it deleted and call `mark_stale` once their
/// respective threshold is reached. An unknown id, or an index with no
/// incremental delete path, calls `mark_stale` directly.
///
/// The id maps are left untouched here.
fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
    bump_write_gen(handle);
    if handle.stale {
        return;
    }
    // The index shape is captured in booleans up front so the arms below can
    // borrow `handle.index` mutably.
    let internal = handle.ext_to_int.get(ext_id).copied();
    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    match internal {
        Some(internal) if live_ivf => {
            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
                ivf.remove(internal);
            }
        }
        Some(internal) if live_hnsw => {
            let mut crowded = false;
            if let CollectionIndex::Hnsw(h) = &mut handle.index {
                // NOTE(review): this cast narrows the u64 internal id to u32;
                // presumably HNSW ids never exceed u32::MAX — confirm.
                h.mark_deleted(internal as u32);
                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        Some(internal) if live_graph => {
            let mut crowded = false;
            match &mut handle.index {
                CollectionIndex::Vamana(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                CollectionIndex::Disk(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                _ => {}
            }
            if crowded {
                mark_stale(handle);
            }
        }
        Some(internal) if live_colbert => {
            let mut crowded = false;
            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
                c.mark_deleted(internal);
                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // Unknown id, or no live index that supports incremental deletes.
        _ => mark_stale(handle),
    }
}
2261
/// Everything gathered by one pass over a collection's stored records
/// (see `scan_collection`), ready to be turned into a fresh index.
struct RebuildScan {
    /// External id of each record, in scan order; the position is the internal id.
    int_to_ext: Vec<String>,
    /// Reverse of `int_to_ext`: external id to internal id.
    ext_to_int: HashMap<String, u64>,
    /// All record vectors appended back to back in scan order.
    flat: Vec<f32>,
    /// Token count per document; `Some` only for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index filled from record payloads; `Some` only when
    /// `uses_sparse_index` holds for the collection's descriptor.
    sparse: Option<SparseInvertedIndex>,
}
2273
2274fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2279 let multivector = handle.descriptor.multivector;
2280 let mut int_to_ext = Vec::new();
2281 let mut ext_to_int = HashMap::new();
2282 let mut flat: Vec<f32> = Vec::new();
2283 let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2284 let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2287 for (ext_id, record) in store.scan(handle.id)? {
2288 let internal = int_to_ext.len() as u64;
2289 flat.extend_from_slice(&record.vector);
2290 if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2291 *docs.entry(doc.to_owned()).or_insert(0) += 1;
2292 }
2293 if let Some(idx) = sparse.as_mut()
2294 && let Some(sv) = sparse_vector_from_payload(&record.payload)
2295 {
2296 idx.upsert(&ext_id, &sv);
2297 }
2298 ext_to_int.insert(ext_id.clone(), internal);
2299 int_to_ext.push(ext_id);
2300 }
2301 Ok(RebuildScan {
2302 int_to_ext,
2303 ext_to_int,
2304 flat,
2305 docs: multivector.then_some(docs),
2306 sparse,
2307 })
2308}
2309
2310fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2314 let scan = scan_collection(store, handle)?;
2315 let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2316 handle.index = empty_index(&handle.descriptor);
2319 handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2320 handle.int_to_ext = scan.int_to_ext;
2321 handle.ext_to_int = scan.ext_to_int;
2322 handle.docs = scan.docs;
2323 handle.sparse = scan.sparse;
2324 handle.stale = false;
2325 Ok(())
2326}
2327
/// Owned inputs for building a collection index via [`RebuildInputs::build`].
pub struct RebuildInputs {
    /// Name of the collection the rebuild is for.
    collection: String,
    /// Descriptor that selects the index kind and its parameters.
    descriptor: Descriptor,
    /// Records gathered by a scan of the collection.
    scan: RebuildScan,
    /// Write generation carried through unchanged to the `RebuiltIndex`.
    /// NOTE(review): presumably the generation observed when the scan was
    /// taken — confirm at the construction site.
    write_gen: u64,
}
2339
/// Outcome of the build step of a rebuild.
enum RebuiltKind {
    /// A complete in-memory index.
    Ready(Box<CollectionIndex>),
    /// The graph and quantizer of a disk index, as returned by
    /// `build_disk_graph_pq`; nothing has been written to disk for it here.
    Disk {
        graph: Box<Vamana>,
        pq: Box<ProductQuantizer>,
    },
}
2351
/// Result of [`RebuildInputs::build`]: the built index plus the bookkeeping
/// taken from the scan it was built from.
pub struct RebuiltIndex {
    /// Name of the collection the index belongs to.
    collection: String,
    /// The built index, or the parts of a disk index.
    kind: RebuiltKind,
    /// External id per internal id, from the scan.
    int_to_ext: Vec<String>,
    /// Internal id per external id, from the scan.
    ext_to_int: HashMap<String, u64>,
    /// Per-document token counts, from the scan.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, from the scan.
    sparse: Option<SparseInvertedIndex>,
    /// Write generation copied from the `RebuildInputs`.
    write_gen: u64,
}
2363
2364impl RebuildInputs {
2365 pub fn build(self) -> Result<RebuiltIndex> {
2370 let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2371 let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2372 Some(index) => RebuiltKind::Ready(Box::new(index)),
2373 None => {
2374 let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2375 RebuiltKind::Disk {
2376 graph: Box::new(graph),
2377 pq: Box::new(pq),
2378 }
2379 }
2380 };
2381 Ok(RebuiltIndex {
2382 collection: self.collection,
2383 kind,
2384 int_to_ext: self.scan.int_to_ext,
2385 ext_to_int: self.scan.ext_to_int,
2386 docs: self.scan.docs,
2387 sparse: self.scan.sparse,
2388 write_gen: self.write_gen,
2389 })
2390 }
2391}
2392
2393fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2397 if payload.is_empty() {
2398 return None;
2399 }
2400 let value = serde_json::from_slice::<Value>(payload).ok()?;
2401 sparse_vector_from_value(&value)
2402}
2403
2404fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2410 if let Some(raw) = payload.get(SPARSE_KEY) {
2411 return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2412 }
2413 let text = payload.get(TEXT_KEY)?.as_str()?;
2414 Some(text_to_sparse(text))
2415}
2416
2417fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2423 if handle.stale {
2424 return;
2425 }
2426 let Some(idx) = handle.sparse.as_mut() else {
2427 return;
2428 };
2429 match sparse_vector_from_value(payload) {
2430 Some(sv) => idx.upsert(ext_id, &sv),
2431 None => {
2432 idx.remove(ext_id);
2433 }
2434 }
2435}
2436
2437fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2440 if let Some(idx) = handle.sparse.as_mut() {
2441 idx.remove(ext_id);
2442 }
2443}
2444
2445fn candidate_ids(
2457 store: &Store,
2458 cid: CollectionId,
2459 filter: &Filter,
2460 filterable: &[FilterableField],
2461) -> Result<Option<BTreeSet<String>>> {
2462 match filter {
2463 Filter::And(subs) => {
2464 let mut acc: Option<BTreeSet<String>> = None;
2467 for sub in subs {
2468 if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2469 acc = Some(match acc {
2470 Some(existing) => existing.intersection(&set).cloned().collect(),
2471 None => set,
2472 });
2473 }
2474 }
2475 Ok(acc)
2476 }
2477 Filter::Or(subs) => {
2478 let mut acc = BTreeSet::new();
2481 for sub in subs {
2482 match candidate_ids(store, cid, sub, filterable)? {
2483 Some(set) => acc.extend(set),
2484 None => return Ok(None),
2485 }
2486 }
2487 Ok(Some(acc))
2488 }
2489 Filter::Not(_) => Ok(None),
2491 leaf => match leaf_predicate(leaf, filterable) {
2493 Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2494 None => Ok(None),
2495 },
2496 }
2497}
2498
2499fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2503 let field_type = |field: &str| {
2504 filterable
2505 .iter()
2506 .find(|f| f.path == field)
2507 .map(|f| f.field_type)
2508 };
2509 match filter {
2510 Filter::Eq { field, value } => Some(SecPredicate::Eq {
2511 field: field.clone(),
2512 value: sec_value(field_type(field)?, value)?,
2513 }),
2514 Filter::In { field, values } => {
2515 let ft = field_type(field)?;
2516 let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2519 Some(SecPredicate::In {
2520 field: field.clone(),
2521 values: values?,
2522 })
2523 }
2524 Filter::Lt { field, value } => {
2525 one_sided_range(field, field_type(field)?, value, false, false)
2526 }
2527 Filter::Lte { field, value } => {
2528 one_sided_range(field, field_type(field)?, value, false, true)
2529 }
2530 Filter::Gt { field, value } => {
2531 one_sided_range(field, field_type(field)?, value, true, false)
2532 }
2533 Filter::Gte { field, value } => {
2534 one_sided_range(field, field_type(field)?, value, true, true)
2535 }
2536 _ => None,
2537 }
2538}
2539
2540fn one_sided_range(
2544 field: &str,
2545 field_type: FieldType,
2546 value: &Value,
2547 is_lower: bool,
2548 inclusive: bool,
2549) -> Option<SecPredicate> {
2550 let v = sec_value(field_type, value)?;
2551 let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
2552 (Some(v), None, inclusive, false)
2553 } else {
2554 (None, Some(v), false, inclusive)
2555 };
2556 Some(SecPredicate::Range {
2557 field: field.to_owned(),
2558 lo,
2559 hi,
2560 lo_inclusive,
2561 hi_inclusive,
2562 })
2563}
2564
2565fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
2569 match (field_type, value) {
2570 (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
2571 (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
2572 _ => None,
2573 }
2574}
2575
2576#[cfg(test)]
2577mod tests {
2578 use super::*;
2579 use serde_json::json;
2580
/// Descriptor shared by the tests: 4-dimensional `f32` vectors, L2 metric.
fn desc() -> Descriptor {
    Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
}
2584
/// Opens a database in `dir`, panicking on failure.
fn open(dir: &Path) -> Database {
    Database::open(dir).unwrap()
}
2588
/// Hybrid search returns the dense winner and the sparse winner ahead of the
/// point that loses on both sides, degrades to a single-sided search when
/// only one query is given, and rejects a call with no query at all.
#[test]
fn hybrid_search_fuses_dense_and_sparse() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    // "a" matches the dense query exactly but shares no sparse term with it.
    db.upsert(
        "kb",
        "a",
        &[1.0, 0.0, 0.0, 0.0],
        &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
    )
    .unwrap();
    // "b" carries both sparse query terms.
    db.upsert(
        "kb",
        "b",
        &[0.0, 1.0, 0.0, 0.0],
        &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
    )
    .unwrap();
    // "c" matches neither the dense query nor any sparse term.
    db.upsert(
        "kb",
        "c",
        &[0.0, 0.0, 0.0, 1.0],
        &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
    )
    .unwrap();

    let dense_q = [1.0, 0.0, 0.0, 0.0];
    let sparse_q = SparseVector {
        indices: vec![1, 2],
        values: vec![1.0, 1.0],
    };
    let params = SearchParams {
        k: 3,
        ..SearchParams::default()
    };

    let hits = db
        .hybrid_search(
            "kb",
            Some(&dense_q),
            Some(&sparse_q),
            None,
            &params,
            DEFAULT_RRF_K0,
        )
        .unwrap();
    let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
    assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
    assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");

    let sparse_only = db
        .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
        .unwrap();
    assert_eq!(sparse_only[0].id, "b");

    let dense_only = db
        .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
        .unwrap();
    assert_eq!(dense_only[0].id, "a");

    // With no query of any kind the call must fail.
    assert!(
        db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
            .is_err()
    );
}
2661
2662 fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
2664 let params = SearchParams {
2665 k: 10,
2666 ..SearchParams::default()
2667 };
2668 db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
2669 .unwrap()
2670 .into_iter()
2671 .map(|m| m.id)
2672 .collect()
2673 }
2674
/// A sparse search answered from the sparse index must return the same ids
/// as the same search run after the index has been dropped from the handle.
#[test]
fn sparse_index_equals_the_store_scan_fallback() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    for (id, dims, vals) in [
        ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
        ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
        ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
        // "d" shares no term with the query below.
        ("d", vec![9u32], vec![1.0f32]),
    ] {
        db.upsert(
            "kb",
            id,
            &z,
            &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
        )
        .unwrap();
    }
    let q = SparseVector {
        indices: vec![1, 2, 3],
        values: vec![1.0, 1.0, 1.0],
    };

    // First pass: the handle has a sparse index.
    assert!(db.collections.get("kb").unwrap().sparse.is_some());
    let via_index = sparse_ids(&mut db, &q);
    assert!(!via_index.contains(&"d".to_owned()), "d shares no term");

    // Second pass: the index is removed from the handle before searching again.
    db.collections.get_mut("kb").unwrap().sparse = None;
    let via_scan = sparse_ids(&mut db, &q);
    assert_eq!(via_index, via_scan);
}
2711
/// After an overwrite and a delete, the sparse search result must match the
/// result obtained once the handle has been flagged stale.
#[test]
fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    db.upsert(
        "kb",
        "a",
        &z,
        &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "b",
        &z,
        &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "c",
        &z,
        &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
    )
    .unwrap();
    // Overwrite "a" so it no longer carries either query term.
    db.upsert(
        "kb",
        "a",
        &z,
        &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
    )
    .unwrap();
    assert!(db.delete("kb", "b").unwrap());

    let q = SparseVector {
        indices: vec![1, 2],
        values: vec![1.0, 1.0],
    };
    // Only "c" still matches: "a" changed its terms and "b" was deleted.
    let incremental = sparse_ids(&mut db, &q);
    assert_eq!(incremental, vec!["c".to_owned()]);

    // Flag the handle stale, then search again; the result must not change.
    db.collections.get_mut("kb").unwrap().stale = true;
    let rebuilt = sparse_ids(&mut db, &q);
    assert_eq!(incremental, rebuilt);
}
2762
/// Reopening a database restores the sparse index of a collection whose
/// records carry sparse vectors.
#[test]
fn sparse_index_is_rebuilt_on_reopen() {
    let tmp = tempfile::tempdir().unwrap();
    {
        // First session: write one point, then drop the database.
        let mut db = open(tmp.path());
        db.create_collection("kb", desc()).unwrap();
        db.upsert(
            "kb",
            "a",
            &[0.0, 0.0, 0.0, 0.0],
            &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
        )
        .unwrap();
    }
    // Second session: the sparse index must be present and searchable.
    let mut db = open(tmp.path());
    assert!(db.collections.get("kb").unwrap().sparse.is_some());
    let q = SparseVector {
        indices: vec![1],
        values: vec![1.0],
    };
    assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
}
2785
/// A payload filter must restrict sparse-only hybrid search results: the
/// higher-weighted "fr" point is excluded by `lang == "en"`.
#[test]
fn hybrid_sparse_honours_the_payload_filter() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    db.upsert(
        "kb",
        "a",
        &z,
        &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "b",
        &z,
        &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
    )
    .unwrap();
    let q = SparseVector {
        indices: vec![1],
        values: vec![1.0],
    };
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".to_owned(),
            value: json!("en"),
        }),
        ..SearchParams::default()
    };
    let hits: Vec<String> = db
        .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits, vec!["a".to_owned()]);
}
2827
/// Text stored under `__quiver_text__` is searchable through the `text`
/// argument of hybrid search, alone or fused with a dense query.
#[test]
fn hybrid_text_search_indexes_and_ranks_by_bm25() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    db.upsert(
        "kb",
        "cats",
        &z,
        &json!({ "__quiver_text__": "the quick brown cat jumps" }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "dogs",
        &z,
        &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
    )
    .unwrap();

    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    // The query "cats" must hit the document that contains "cat".
    let hits: Vec<String> = db
        .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");

    // A term that occurs in no document yields no hits.
    assert!(
        db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
            .unwrap()
            .is_empty()
    );

    // Dense and text queries together: both kinds of match must surface.
    let dense_q = [1.0, 0.0, 0.0, 0.0];
    db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    let fused: Vec<String> = db
        .hybrid_search(
            "kb",
            Some(&dense_q),
            None,
            Some("dog"),
            &params,
            DEFAULT_RRF_K0,
        )
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert!(
        fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
        "dense match + lexical match both surface; got {fused:?}"
    );
}
2893
#[test]
/// Smoke test of the basic lifecycle: create a collection, upsert three
/// points, run a nearest-neighbour search, then fetch one point by id.
fn create_upsert_search_get_end_to_end() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("items", desc()).unwrap();

    // (id, vector, colour) for every seeded point.
    let seeds: [(&str, [f32; 4], &str); 3] = [
        ("a", [0.0, 0.0, 0.0, 0.0], "red"),
        ("b", [1.0, 0.0, 0.0, 0.0], "blue"),
        ("c", [5.0, 5.0, 5.0, 5.0], "red"),
    ];
    for (id, vector, color) in seeds {
        db.upsert("items", id, &vector, &json!({"color": color}))
            .unwrap();
    }

    // The query sits between `a` and `b`, closer to `a`.
    let ranked = db
        .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(ranked[0].id, "a");
    assert_eq!(ranked[1].id, "b");

    let fetched = db.get("items", "c").unwrap().unwrap();
    assert_eq!(fetched.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
    assert_eq!(fetched.payload, Some(json!({"color": "red"})));
}
2931
#[test]
/// `upsert_batch` must be observably equivalent to a loop of `upsert` calls.
///
/// The same 20 points are written into two separate databases, one point at
/// a time and in a single batch. After reopening both, the same query must
/// return the same ids in the same order.
fn upsert_batch_produces_same_search_results_as_sequential() {
    let tmp_seq = tempfile::tempdir().unwrap();
    let tmp_bat = tempfile::tempdir().unwrap();

    let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
    let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
    let payload = json!({});

    {
        // Sequential writer; dropped at the end of the scope before reopening.
        let mut db = open(tmp_seq.path());
        db.create_collection("c", desc()).unwrap();
        for (id, vec) in ids.iter().zip(vectors.iter()) {
            db.upsert("c", id, vec, &payload).unwrap();
        }
    }
    {
        // Batch writer; `upsert_batch` reports how many points it wrote.
        let mut db = open(tmp_bat.path());
        db.create_collection("c", desc()).unwrap();
        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
            .iter()
            .zip(vectors.iter())
            .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
            .collect();
        let n = db.upsert_batch("c", &pts).unwrap();
        assert_eq!(n, 20);
    }

    let query = [10.0f32, 0.0, 0.0, 0.0];
    let params = SearchParams {
        k: 5,
        ..Default::default()
    };

    let mut seq_db = open(tmp_seq.path());
    let mut bat_db = open(tmp_bat.path());
    let seq: Vec<String> = seq_db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    let bat: Vec<String> = bat_db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(
        seq, bat,
        "batch and sequential produce different search results"
    );
}
2987
#[test]
/// `upsert_bulk` leaves the collection marked `stale`, and the next search
/// clears that flag and still returns correct dense and sparse results.
fn upsert_bulk_defers_the_index_then_searches_correctly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("c", desc()).unwrap();
    let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
    let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
    let plain = json!({});
    // Exactly one point ("p3") carries a sparse vector, so the sparse arm has
    // a single possible hit.
    let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
    let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
        .iter()
        .zip(vectors.iter())
        .map(|(id, v)| {
            let payload = if id == "p3" { &sparse_payload } else { &plain };
            (id.as_str(), v.as_slice(), payload)
        })
        .collect();
    let n = db.upsert_bulk("c", &pts).unwrap();
    assert_eq!(n, 20);

    // The bulk write only flags the collection; no search has run yet.
    assert!(db.collections.get("c").unwrap().stale);

    let query = [10.0f32, 0.0, 0.0, 0.0];
    let params = SearchParams {
        k: 5,
        ..Default::default()
    };
    let hits: Vec<String> = db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
    assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");

    // The sparse payload written through the bulk path must be searchable too.
    let q = SparseVector {
        indices: vec![7],
        values: vec![1.0],
    };
    let sparse_hits: Vec<String> = db
        .hybrid_search("c", None, Some(&q), None, &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(sparse_hits, vec!["p3".to_owned()]);
}
3041
#[test]
/// A dense search with an equality filter returns only points whose payload
/// satisfies it.
///
/// Even-numbered points are "red" and odd ones "blue"; every hit of a
/// `color == "red"` search must carry a red payload.
fn filtered_search_only_returns_matching_payloads() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("items", desc()).unwrap();
    for i in 0..20u32 {
        let color = if i % 2 == 0 { "red" } else { "blue" };
        db.upsert(
            "items",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"color": color, "n": i}),
        )
        .unwrap();
    }
    // Every field is spelled out; `with_payload` must be on because the
    // assertions below read the payload.
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "color".into(),
            value: json!("red"),
        }),
        ef_search: 64,
        with_payload: true,
        with_vector: false,
    };
    let results = db.search("items", &[0.0; 4], &params).unwrap();
    assert!(!results.is_empty());
    for m in &results {
        assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
    }
}
3073
#[test]
/// Data written and checkpointed by one `Database` handle is fully visible,
/// and searchable, from a fresh handle opened on the same directory.
fn persists_and_rebuilds_index_on_reopen() {
    let scratch = tempfile::tempdir().unwrap();
    {
        // First handle: seed 50 points, checkpoint, then drop at scope end.
        let mut writer = open(scratch.path());
        writer.create_collection("items", desc()).unwrap();
        let no_payload = json!({});
        for i in 0..50u32 {
            let id = format!("p{i}");
            let vector = [i as f32, 1.0, 2.0, 3.0];
            writer.upsert("items", &id, &vector, &no_payload).unwrap();
        }
        writer.checkpoint().unwrap();
    }

    let mut reader = open(scratch.path());
    assert_eq!(reader.len("items").unwrap(), 50);
    let hits = reader
        .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
3098
#[test]
/// Re-upserting an existing id replaces its vector for both search and `get`.
///
/// `a` starts at the origin and is then moved far away, so `b` becomes the
/// nearest point to the origin and `get("a")` returns the new vector.
fn update_reflects_new_vector_after_rebuild() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("items", desc()).unwrap();

    let no_payload = json!({});
    let origin = [0.0f32; 4];
    let relocated = [100.0f32, 0.0, 0.0, 0.0];
    db.upsert("items", "a", &origin, &no_payload).unwrap();
    db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &no_payload)
        .unwrap();
    // Overwrite `a` in place with a vector farther from the origin than `b`.
    db.upsert("items", "a", &relocated, &no_payload).unwrap();

    let hits = db
        .search("items", &origin, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "b");

    let stored = db.get("items", "a").unwrap().unwrap().vector;
    assert_eq!(stored, Some(vec![100.0, 0.0, 0.0, 0.0]));
}
3119
#[test]
/// After `delete` returns `true`, the point is gone from both search results
/// and `get`.
fn delete_removes_from_search() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("items", desc()).unwrap();

    let no_payload = json!({});
    let origin = [0.0f32; 4];
    db.upsert("items", "a", &origin, &no_payload).unwrap();
    db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &no_payload)
        .unwrap();

    let removed = db.delete("items", "a").unwrap();
    assert!(removed);

    // Query at `a`'s old position: it would be the top hit if still indexed.
    let hits = db
        .search("items", &origin, &SearchParams::default())
        .unwrap();
    assert!(hits.iter().all(|m| m.id != "a"));
    assert!(db.get("items", "a").unwrap().is_none());
}
3135
#[test]
/// Collection-name errors are reported with the expected variants: searching
/// a missing collection gives `Error::CollectionNotFound`, and creating a
/// duplicate gives `CoreError::AlreadyExists`.
fn unknown_collection_errors() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    let missing = db.search("nope", &[0.0; 4], &SearchParams::default());
    assert!(matches!(missing, Err(Error::CollectionNotFound(_))));

    db.create_collection("c", desc()).unwrap();
    let duplicate = db.create_collection("c", desc());
    assert!(matches!(
        duplicate,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
3150
3151 fn desc_with(kind: IndexKind) -> Descriptor {
3152 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3153 kind,
3154 pq_subspaces: None,
3155 })
3156 }
3157
#[test]
/// Vamana and IVF collections both return the exact nearest point on a small
/// one-dimensional data set (`p{i}` sits at `x = i`).
fn vamana_and_ivf_collections_find_the_nearest_point() {
    for kind in [IndexKind::Vamana, IndexKind::Ivf] {
        // Fresh database per index kind so the runs cannot influence each other.
        let scratch = tempfile::tempdir().unwrap();
        let mut db = open(scratch.path());
        db.create_collection("c", desc_with(kind)).unwrap();

        let no_payload = json!({});
        for i in 0..40u32 {
            let id = format!("p{i}");
            let vector = [i as f32, 0.0, 0.0, 0.0];
            db.upsert("c", &id, &vector, &no_payload).unwrap();
        }

        let hits = db
            .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(hits[0].id, "p7", "{kind:?} nearest");
    }
}
3180
#[test]
/// The index kind chosen at creation survives a checkpoint + reopen, and the
/// reopened collection still answers nearest-neighbour queries correctly.
fn index_kind_persists_and_rebuilds_on_reopen() {
    let scratch = tempfile::tempdir().unwrap();
    {
        let mut writer = open(scratch.path());
        let descriptor = desc_with(IndexKind::Vamana);
        writer.create_collection("v", descriptor).unwrap();
        let no_payload = json!({});
        for i in 0..20u32 {
            let id = format!("p{i}");
            let vector = [i as f32, 1.0, 2.0, 3.0];
            writer.upsert("v", &id, &vector, &no_payload).unwrap();
        }
        writer.checkpoint().unwrap();
    }

    let mut reader = open(scratch.path());
    let persisted_kind = reader.descriptor("v").unwrap().index.kind;
    assert_eq!(persisted_kind, IndexKind::Vamana);

    let hits = reader
        .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
3206
#[test]
/// A `DiskVamana` collection reopens from its sealed on-disk base rather than
/// rebuilding it.
///
/// 100 points are written, searched and checkpointed, then 15 more are
/// written without a checkpoint. The on-disk base must hold 100 entries both
/// before and after the reopen, while searches after the reopen still find
/// points from both groups.
fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
    let scratch = tempfile::tempdir().unwrap();
    let no_payload = json!({});

    // The first session yields the collection id so the second one can
    // inspect the same on-disk index.
    let cid = {
        let mut db = open(scratch.path());
        db.create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        for i in 0..100u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
                .unwrap();
        }
        // Search once before the checkpoint, as the original test does.
        db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        db.checkpoint().unwrap();

        // Written after the checkpoint and never checkpointed.
        for i in 100..115u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
                .unwrap();
        }

        let collection_id = db.collections["d"].id;
        let sealed = open_disk_index(
            &db.store,
            collection_id,
            db.store.collection_codec_clone(collection_id).unwrap(),
        )
        .unwrap();
        assert_eq!(sealed.len(), 100, "base sealed at the checkpoint count");
        collection_id
    };

    let mut db = open(scratch.path());
    let from_base = db
        .search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(from_base[0].id, "p50",);
    let from_tail = db
        .search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(
        from_tail[0].id, "p110",
        "post-checkpoint insert survived reopen via WAL-tail replay",
    );

    // Were the base rebuilt on reopen it would now contain all 115 points.
    let reopened_base = open_disk_index(
        &db.store,
        cid,
        db.store.collection_codec_clone(cid).unwrap(),
    )
    .unwrap();
    assert_eq!(
        reopened_base.len(),
        100,
        "reopen loaded the base; it was not rebuilt"
    );
}
3282
#[test]
/// A `DiskVamana` collection keeps answering correctly when its base file is
/// deleted, and again when the file is truncated to half its length.
fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
    let scratch = tempfile::tempdir().unwrap();
    let probe = [25.0f32, 0.0, 0.0, 0.0];

    // Session 1: build, seal and checkpoint; remember where the base lives.
    let base_path = {
        let mut db = open(scratch.path());
        db.create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        let no_payload = json!({});
        for i in 0..60u32 {
            let id = format!("p{i}");
            db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
                .unwrap();
        }
        db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        db.checkpoint().unwrap();
        let cid = db.collections["d"].id;
        db.store.index_dir(cid).join(DISK_INDEX_FILE)
    };

    // Failure mode 1: the base file disappears entirely.
    std::fs::remove_file(&base_path).unwrap();
    {
        let mut db = open(scratch.path());
        let hits = db.search("d", &probe, &SearchParams::default()).unwrap();
        assert_eq!(
            hits[0].id, "p25",
            "rebuild fallback still answers correctly after a lost base",
        );
        assert!(
            base_path.exists(),
            "the fallback rebuild re-sealed the base file"
        );
        db.checkpoint().unwrap();
    }

    // Failure mode 2: the base file is cut to half its size. The file handle
    // is a temporary, so it is closed before the database is reopened.
    let full_len = std::fs::metadata(&base_path).unwrap().len();
    std::fs::OpenOptions::new()
        .write(true)
        .open(&base_path)
        .unwrap()
        .set_len(full_len / 2)
        .unwrap();

    let mut db = open(scratch.path());
    let hits = db.search("d", &probe, &SearchParams::default()).unwrap();
    assert_eq!(
        hits[0].id, "p25",
        "rebuild fallback still answers correctly after a torn base",
    );
}
3346
#[test]
/// Once an IVF index has been built by a search, a later insert and a later
/// delete are both applied without setting the collection's `stale` flag, and
/// searches reflect them immediately.
fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let no_payload = json!({});
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
    }
    // The first search is what builds the index; its results are not inspected.
    let _ = db
        .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert!(!db.collections["c"].stale, "the search built the index");

    // Insert a point far from everything else.
    let far_away = [500.0f32, 0.0, 0.0, 0.0];
    db.upsert("c", "far", &far_away, &no_payload).unwrap();
    assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
    let after_insert = db
        .search("c", &far_away, &SearchParams::default())
        .unwrap();
    assert_eq!(after_insert[0].id, "far");

    // Delete it again.
    assert!(db.delete("c", "far").unwrap());
    assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
    let after_delete = db
        .search("c", &far_away, &SearchParams::default())
        .unwrap();
    assert!(
        after_delete.iter().all(|m| m.id != "far"),
        "deleted point is gone"
    );
}
3385
#[test]
/// Overwriting an existing id in a built IVF index moves the point: it is
/// found at its new location, no longer returned near its old one, and the
/// collection is not marked `stale`.
fn ivf_incremental_update_replaces_the_vector() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let no_payload = json!({});
    for i in 0..30u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
    }
    // Run one search before the overwrite; its results are not inspected.
    let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();

    // Relocate p5 from x = 5 to x = 900.
    let new_spot = [900.0f32, 0.0, 0.0, 0.0];
    db.upsert("c", "p5", &new_spot, &no_payload).unwrap();
    assert!(!db.collections["c"].stale);

    let near_new = db
        .search("c", &new_spot, &SearchParams::default())
        .unwrap();
    assert_eq!(near_new[0].id, "p5", "p5 found at its new location");

    let near_old = db
        .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert!(
        near_old.iter().all(|m| m.id != "p5"),
        "stale vector is gone"
    );
}
3415
#[test]
/// Deleting an id from a built IVF index and then inserting the same id again
/// leaves the point searchable, with neither step marking the collection
/// `stale`.
fn ivf_reinsert_after_incremental_delete_is_found() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();

    let no_payload = json!({});
    for i in 0..20u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
    }
    // Run one search before mutating; its results are not inspected.
    let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();

    let p3_vector = [3.0f32, 0.0, 0.0, 0.0];
    assert!(db.delete("c", "p3").unwrap());
    assert!(!db.collections["c"].stale);

    db.upsert("c", "p3", &p3_vector, &no_payload).unwrap();
    assert!(!db.collections["c"].stale);

    let hits = db
        .search("c", &p3_vector, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p3");
}
3443
#[test]
/// Overwriting an existing id in a built index of the default kind marks the
/// collection `stale`, and the next search still finds the point at its new
/// position.
///
/// NOTE(review): the test name implies `desc()` yields an HNSW collection —
/// `desc()` is defined outside this view, confirm.
fn hnsw_in_place_update_falls_back_to_rebuild() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("c", desc()).unwrap();

    let no_payload = json!({});
    for i in 0..10u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
    }
    let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
    assert!(!db.collections["c"].stale);

    // Overwrite an id that is already in the built index.
    let new_spot = [42.0f32, 0.0, 0.0, 0.0];
    db.upsert("c", "p2", &new_spot, &no_payload).unwrap();
    assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");

    let hits = db
        .search("c", &new_spot, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p2");
}
3470
#[test]
/// Creating a `Vamana` or `DiskVamana` collection with the `Dot` metric is
/// refused with `Error::Unsupported`.
fn unsupported_index_configurations_are_rejected() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    // (collection name, index kind), checked in the same order as before.
    let rejected = [("a", IndexKind::Vamana), ("b", IndexKind::DiskVamana)];
    for (name, kind) in rejected {
        let descriptor =
            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
                kind,
                pq_subspaces: None,
            });
        let outcome = db.create_collection(name, descriptor);
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }
}
3495
#[test]
/// `VectorEncryption::Dcpe` is accepted only together with the L2 metric;
/// `Cosine` and `Dot` are refused with `Error::Unsupported`.
fn dcpe_collections_require_the_l2_metric() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    // Builds a 4-dim F32 descriptor with DCPE enabled for the given metric.
    let dcpe = |metric: DistanceMetric| {
        Descriptor::new(4, Dtype::F32, metric).with_vector_encryption(VectorEncryption::Dcpe)
    };

    for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
        let outcome = db.create_collection("bad", dcpe(metric));
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }

    db.create_collection("enc", dcpe(DistanceMetric::L2))
        .expect("l2 dcpe collection");
    let stored = db.descriptor("enc").expect("descriptor").vector_encryption;
    assert_eq!(stored, VectorEncryption::Dcpe);
}
3519
#[test]
/// A `VectorEncryption::ClientSide` collection never gets a vector index and
/// refuses `search`, but supports `upsert`, `len`, `fetch` (with and without
/// a filter or limit), `get` and `delete`.
fn client_side_collections_are_fetch_only_and_reject_search() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    // True while the "vault" collection has no index structure attached.
    let indexless =
        |db: &Database| matches!(db.collections["vault"].index, CollectionIndex::None);

    let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_vector_encryption(VectorEncryption::ClientSide);
    db.create_collection("vault", descriptor)
        .expect("create client-side collection");
    assert!(indexless(&db));

    // Five points: the first two are "vip", the rest "std". The payload
    // carries an opaque `__quiver_vec__` string alongside an all-zero vector.
    for i in 0..5 {
        let tier = match i {
            0 | 1 => "vip",
            _ => "std",
        };
        let payload = serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier });
        db.upsert("vault", &format!("p{i}"), &[0.0; 4], &payload)
            .expect("upsert");
    }
    assert_eq!(db.len("vault").unwrap(), 5);
    // Writing points must not have created an index either.
    assert!(indexless(&db));

    let search_attempt = db.search("vault", &[0.0; 4], &SearchParams::default());
    assert!(matches!(search_attempt, Err(Error::Unsupported(_))));

    // Unfiltered fetch with payloads requested and vectors not requested.
    let everything = db.fetch("vault", None, 100, true, false).unwrap();
    assert_eq!(everything.len(), 5);
    assert!(
        everything
            .iter()
            .all(|m| m.payload.is_some() && m.vector.is_none())
    );

    // Filtered fetch: only the two "vip" points.
    let vip_filter = Filter::Eq {
        field: "tier".to_owned(),
        value: serde_json::json!("vip"),
    };
    let vip = db
        .fetch("vault", Some(&vip_filter), 100, false, false)
        .unwrap();
    assert_eq!(vip.len(), 2);

    // The limit argument caps the number of rows returned.
    let capped = db.fetch("vault", None, 2, false, false).unwrap();
    assert_eq!(capped.len(), 2);

    assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
    assert!(db.delete("vault", "p0").unwrap());
    assert_eq!(db.len("vault").unwrap(), 4);
}
3593
#[test]
/// Combining multivector mode with `VectorEncryption::ClientSide` is refused
/// at creation time with `Error::Unsupported`.
fn client_side_encryption_rejects_multivector() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_multivector(true)
        .with_vector_encryption(VectorEncryption::ClientSide);
    let outcome = db.create_collection("bad", descriptor);

    assert!(matches!(outcome, Err(Error::Unsupported(_))));
}
3606
/// Returns `true` when a non-directory entry named exactly `name` exists
/// anywhere under `dir`, searching sub-directories recursively.
///
/// An unreadable or missing `dir` yields `false`, and entries that fail to
/// read are skipped. Directory names themselves never count as a match.
fn contains_file(dir: &Path, name: &str) -> bool {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return false;
    };
    for entry in entries.flatten() {
        let path = entry.path();
        let found = if path.is_dir() {
            contains_file(&path, name)
        } else {
            path.file_name().is_some_and(|f| f == name)
        };
        if found {
            return true;
        }
    }
    false
}
3620
#[test]
/// A `DiskVamana` collection answers searches, leaves a `vamana.qvx` file
/// somewhere under the database directory after a checkpoint, and keeps both
/// its index kind and its search results across a reopen.
fn disk_index_collection_searches_persists_and_writes_an_artifact() {
    let scratch = tempfile::tempdir().unwrap();
    let probe = [7.0f32, 0.0, 0.0, 0.0];
    {
        let mut writer = open(scratch.path());
        writer
            .create_collection("d", desc_with(IndexKind::DiskVamana))
            .unwrap();
        let no_payload = json!({});
        for i in 0..40u32 {
            let id = format!("p{i}");
            writer
                .upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
                .unwrap();
        }
        let hits = writer
            .search("d", &probe, &SearchParams::default())
            .unwrap();
        assert_eq!(hits[0].id, "p7");
        writer.checkpoint().unwrap();
    }

    // The artifact is looked up by file name only, anywhere under the root.
    let artifact_present = contains_file(scratch.path(), "vamana.qvx");
    assert!(artifact_present, "disk index file missing");

    let mut reader = open(scratch.path());
    let persisted_kind = reader.descriptor("d").unwrap().index.kind;
    assert_eq!(persisted_kind, IndexKind::DiskVamana);
    let hits = reader
        .search("d", &probe, &SearchParams::default())
        .unwrap();
    assert_eq!(hits[0].id, "p7");
}
3659
#[test]
/// For `Vamana` and `DiskVamana`, writes made after the first search are
/// visible to later searches: a new point is found, a deleted point is not
/// returned, and an overwritten point moves to its new position.
fn graph_collections_maintain_writes_incrementally() {
    // Searches collection "c" at `[x, 0, 0, 0]` with default parameters.
    let search_at = |db: &mut Database, x: f32| {
        db.search("c", &[x, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap()
    };

    for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
        let scratch = tempfile::tempdir().unwrap();
        let mut db = open(scratch.path());
        db.create_collection("c", desc_with(kind)).unwrap();
        let no_payload = json!({});
        for i in 0..40u32 {
            let id = format!("p{i}");
            db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
                .unwrap();
        }
        let baseline = search_at(&mut db, 7.0);
        assert_eq!(baseline[0].id, "p7", "{kind:?} base nearest");

        // Insert: a new point between p7 and p8, queried right next to it.
        db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
        let after_insert = search_at(&mut db, 7.45);
        assert_eq!(after_insert[0].id, "p7b", "{kind:?} delta insert not found");

        // Delete: p7 must no longer be returned for a query at its position.
        assert!(db.delete("c", "p7").unwrap());
        let after_delete = search_at(&mut db, 7.0);
        assert!(
            after_delete.iter().all(|m| m.id != "p7"),
            "{kind:?} deleted id returned"
        );

        // Update: move p20 from x = 20 to x = 500.
        db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
        let at_new_spot = search_at(&mut db, 500.0);
        assert_eq!(
            at_new_spot[0].id, "p20",
            "{kind:?} updated vector not at new spot"
        );
        let at_old_spot = search_at(&mut db, 20.0);
        assert_ne!(
            at_old_spot[0].id, "p20",
            "{kind:?} stale copy still nearest old spot"
        );
    }
}
3720
#[test]
/// A `Vamana` collection stays correct after many interleaved deletes and
/// inserts, both in the live handle and after a checkpoint + reopen.
///
/// 15 of the 50 original points are deleted and 15 new points are inserted
/// far away. No deleted id may be returned, and the new points must be found.
fn graph_consolidates_under_heavy_churn() {
    // Searches collection "c" at `[x, 0, 0, 0]` with default parameters.
    let search_at = |db: &mut Database, x: f32| {
        db.search("c", &[x, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap()
    };

    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Vamana))
        .unwrap();
    let no_payload = json!({});
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &no_payload)
            .unwrap();
    }
    // Run one search before the churn starts; its results are not inspected.
    let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();

    // Alternate one delete with one insert, 15 times over.
    let churned_out: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
    for (i, old_id) in churned_out.iter().enumerate() {
        assert!(db.delete("c", old_id).unwrap());
        let new_id = format!("q{i}");
        db.upsert(
            "c",
            &new_id,
            &[1000.0 + i as f32, 0.0, 0.0, 0.0],
            &no_payload,
        )
        .unwrap();
    }

    let around_origin = search_at(&mut db, 5.0);
    assert!(
        around_origin.iter().all(|m| !churned_out.contains(&m.id)),
        "a churned-out id was returned"
    );
    let around_new = search_at(&mut db, 1007.0);
    assert_eq!(around_new[0].id, "q7", "new point not found after churn");

    // Same expectations from a fresh handle.
    db.checkpoint().unwrap();
    drop(db);
    let mut db = open(scratch.path());
    let around_new = search_at(&mut db, 1007.0);
    assert_eq!(around_new[0].id, "q7", "new point lost across reopen");
    let around_origin = search_at(&mut db, 5.0);
    assert!(
        around_origin.iter().all(|m| !churned_out.contains(&m.id)),
        "a churned-out id resurfaced after reopen"
    );
}
3780
#[test]
/// Multivector collections of every listed index kind reflect document
/// deletes, updates and inserts immediately, and a reopened database produces
/// the same ranking as the live one.
///
/// Documents are two copies of a unit vector at angle `theta` in the x/y
/// plane; the query is the unit vector at angle 0, so smaller angles rank
/// first under the cosine metric.
fn multivector_writes_are_incremental_and_match_a_rebuild() {
    // Unit vector at angle `theta`, padded to four dimensions.
    let unit = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
    // A document made of two identical token vectors.
    let two_tokens = |theta: f32| vec![unit(theta), unit(theta)];

    let kinds = [
        IndexKind::Ivf,
        IndexKind::Hnsw,
        IndexKind::Vamana,
        IndexKind::Colbert,
    ];
    for kind in kinds {
        let scratch = tempfile::tempdir().unwrap();
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_index(IndexSpec {
                kind,
                pq_subspaces: None,
            });
        let mut db = open(scratch.path());
        db.create_collection("m", descriptor).unwrap();

        // d1..d10 at angles 0.1..1.0, so d1 is the closest to angle 0.
        for i in 1..=10u32 {
            let id = format!("d{i}");
            let tokens = two_tokens(0.1 * i as f32);
            db.upsert_document("m", &id, &tokens, &json!({ "i": i }))
                .unwrap();
        }

        let query = vec![unit(0.0)];
        // Ids of the three best documents for `query`, best first.
        let top3 = |db: &mut Database| {
            let params = SearchParams {
                k: 3,
                ..Default::default()
            };
            let matches = db.search_multi_vector("m", &query, &params).unwrap();
            matches.into_iter().map(|m| m.id).collect::<Vec<_>>()
        };
        assert_eq!(top3(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");

        // Delete the current best document.
        assert!(db.delete_document("m", "d1").unwrap());
        assert_eq!(
            top3(&mut db),
            vec!["d2", "d3", "d4"],
            "{kind:?} after delete"
        );

        // Update an existing document so that it matches the query exactly.
        db.upsert_document("m", "d10", &two_tokens(0.0), &json!({ "i": 10 }))
            .unwrap();
        assert_eq!(top3(&mut db)[0], "d10", "{kind:?} after update");

        // Insert a brand-new document that should rank second.
        db.upsert_document("m", "d11", &two_tokens(0.05), &json!({ "i": 11 }))
            .unwrap();
        let ranking = top3(&mut db);
        assert_eq!(ranking[0], "d10", "{kind:?}");
        assert_eq!(ranking[1], "d11", "{kind:?} new doc not ranked");

        // Shrink d8 from two token vectors to one.
        db.upsert_document("m", "d8", &[unit(0.8)], &json!({ "i": 8 }))
            .unwrap();
        let shrunk = db.get_document("m", "d8", true).unwrap().unwrap();
        assert_eq!(
            shrunk.vectors.unwrap().len(),
            1,
            "{kind:?} trailing token kept"
        );

        // A fresh handle must agree with the live ranking.
        let live_ranking = top3(&mut db);
        drop(db);
        let mut db = open(scratch.path());
        assert_eq!(
            top3(&mut db),
            live_ranking,
            "{kind:?} incremental != rebuild"
        );
        assert!(
            db.get_document("m", "d1", false).unwrap().is_none(),
            "{kind:?} deleted doc resurfaced"
        );
    }
}
3872
#[test]
/// `IndexKind::Colbert` is refused for a single-vector collection and accepted
/// once the descriptor enables multivector mode.
fn colbert_index_requires_multivector() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    // Fresh Colbert index spec for each descriptor.
    let colbert = || IndexSpec {
        kind: IndexKind::Colbert,
        pq_subspaces: None,
    };

    let single_vector =
        Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(colbert());
    let refused = db.create_collection("c", single_vector);
    assert!(matches!(refused, Err(Error::Unsupported(_))));

    let multi_vector = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_multivector(true)
        .with_index(colbert());
    assert!(db.create_collection("m", multi_vector).is_ok());
}
3896
3897 fn desc_filterable() -> Descriptor {
3902 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
3903 FilterableField::keyword("city"),
3904 FilterableField::numeric("n"),
3905 ])
3906 }
3907
3908 fn seed_cities(db: &mut Database) {
3913 const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
3914 db.create_collection("c", desc_filterable()).unwrap();
3915 for i in 0..30u32 {
3916 db.upsert(
3917 "c",
3918 &format!("p{i}"),
3919 &[i as f32, 0.0, 0.0, 0.0],
3920 &json!({"city": CITIES[i as usize % 3], "n": i}),
3921 )
3922 .unwrap();
3923 }
3924 db.checkpoint().unwrap();
3925 }
3926
#[test]
/// An equality filter on the declared keyword field `city` returns only
/// matching points, nearest first.
fn hybrid_equality_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("lyon"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    // `seed_cities` makes p1 the lyon point closest to the origin.
    assert_eq!(res[0].id, "p1");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
    }
}
3948
#[test]
/// A `>=` filter on the declared numeric field `n` returns only points inside
/// the range, nearest first.
fn hybrid_numeric_range_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 4,
        filter: Some(Filter::Gte {
            field: "n".into(),
            value: json!(10),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // With n >= 10, p10 at x = 10 is the closest eligible point to the origin.
    assert_eq!(res[0].id, "p10");
    for m in &res {
        assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
    }
}
3969
#[test]
/// A filter that no stored point satisfies yields an empty result set, not
/// an error.
fn hybrid_unsatisfiable_filter_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    // "atlantis" is not one of the cities written by `seed_cities`.
    let params = SearchParams {
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("atlantis"),
        }),
        ..SearchParams::default()
    };
    assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
}
3986
#[test]
/// An `And` of an `In` clause (city is paris or rome) and an `Lt` clause
/// (`n < 12`) returns only points that satisfy both, nearest first.
fn hybrid_and_or_composition_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::In {
                field: "city".into(),
                values: vec![json!("paris"), json!("rome")],
            },
            Filter::Lt {
                field: "n".into(),
                value: json!(12),
            },
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // p0 is a paris point at the origin, so it must rank first.
    assert_eq!(res[0].id, "p0");
    for m in &res {
        let payload = m.payload.as_ref().unwrap();
        let city = payload["city"].as_str().unwrap();
        assert!(city == "paris" || city == "rome");
        assert!(payload["n"].as_u64().unwrap() < 12);
    }
}
4018
#[test]
/// A filter mixing an equality clause with a `Not` clause is applied in full:
/// `city == "paris" AND NOT (n == 0)` must exclude `p0` even though it is a
/// paris point.
fn hybrid_rechecks_non_indexable_clause() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::Eq {
                field: "city".into(),
                value: json!("paris"),
            },
            Filter::Not(Box::new(Filter::Eq {
                field: "n".into(),
                value: json!(0),
            })),
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.iter().all(|m| m.id != "p0"));
    // With p0 excluded, the next paris point (p3) is the closest to the origin.
    assert_eq!(res[0].id, "p3");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
    }
}
4048
#[test]
/// Filtering on a payload field that was not declared filterable still
/// returns only matching points.
///
/// The collection declares only `city`; the filter targets `tier`.
fn post_filter_fallback_on_undeclared_field_is_correct() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection(
        "c",
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
            .with_filterable(vec![FilterableField::keyword("city")]),
    )
    .unwrap();
    // Even-numbered points are "gold", odd ones "silver"; all share one city.
    for i in 0..20u32 {
        let tier = if i % 2 == 0 { "gold" } else { "silver" };
        db.upsert(
            "c",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"city": "paris", "tier": tier}),
        )
        .unwrap();
    }
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
    }
}
4085
/// `leaf_predicate` yields a `SecPredicate` for `Eq` on a declared keyword
/// field and for `Gte` on a declared numeric field, and `None` for an
/// undeclared field, a value whose JSON type does not fit the declared field,
/// `Ne`, and `Exists`.
#[test]
fn leaf_predicate_maps_only_indexable_filterable_leaves() {
    let declared = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];

    // Keyword equality maps to an `Eq` predicate carrying a keyword value.
    let city_eq = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    let want_eq = SecPredicate::Eq {
        field: "city".into(),
        value: SecValue::Keyword("paris".into()),
    };
    assert_eq!(leaf_predicate(&city_eq, &declared), Some(want_eq));

    // `>=` maps to a range with an inclusive lower bound and no upper bound.
    let n_gte = Filter::Gte {
        field: "n".into(),
        value: json!(3),
    };
    let want_range = SecPredicate::Range {
        field: "n".into(),
        lo: Some(SecValue::Numeric(3.0)),
        hi: None,
        lo_inclusive: true,
        hi_inclusive: false,
    };
    assert_eq!(leaf_predicate(&n_gte, &declared), Some(want_range));

    // Leaves that must not map to any predicate.
    let unmapped = [
        // Field that was never declared filterable.
        Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        },
        // Numeric value against a keyword field.
        Filter::Eq {
            field: "city".into(),
            value: json!(5),
        },
        Filter::Ne {
            field: "city".into(),
            value: json!("x"),
        },
        Filter::Exists {
            field: "city".into(),
        },
    ];
    for filter in &unmapped {
        assert!(leaf_predicate(filter, &declared).is_none());
    }
}
4144
/// Returns `<root>/collections/0000000000/index`.
///
/// NOTE(review): presumably the on-disk index directory of the first
/// collection created in a database rooted at `root` — confirm against the
/// store layout.
fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
    let mut dir = root.to_path_buf();
    dir.extend(["collections", "0000000000", "index"]);
    dir
}
4151
4152 fn idx_snapshot_files(root: &Path) -> Vec<String> {
4153 let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4154 .map(|rd| {
4155 rd.filter_map(std::result::Result::ok)
4156 .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4157 .filter(|n| n.starts_with("idx-"))
4158 .collect()
4159 })
4160 .unwrap_or_default();
4161 v.sort();
4162 v
4163 }
4164
4165 fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4166 db.search("c", q, &SearchParams::default())
4167 .unwrap()
4168 .into_iter()
4169 .map(|m| m.id)
4170 .collect()
4171 }
4172
4173 fn seed_ivf(db: &mut Database, n: u32) {
4174 db.create_collection("c", desc_with(IndexKind::Ivf))
4175 .unwrap();
4176 for i in 0..n {
4177 db.upsert(
4178 "c",
4179 &format!("p{i}"),
4180 &[i as f32, 0.0, 0.0, 0.0],
4181 &json!({}),
4182 )
4183 .unwrap();
4184 }
4185 let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4187 }
4188
/// After seeding an IVF collection and checkpointing, exactly one `idx-*`
/// file exists in the index directory.
#[test]
fn ivf_snapshot_is_written_at_checkpoint() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    seed_ivf(&mut db, 40);
    db.checkpoint().unwrap();

    let snapshots = idx_snapshot_files(dir.path());
    assert_eq!(snapshots.len(), 1);
}
4197
/// Reopening after a checkpoint must preserve the internal-to-external id
/// order (`int_to_ext`) that was in effect when the checkpoint was taken.
///
/// NOTE(review): the ids are deliberately not inserted in sorted order;
/// presumably a rebuild from storage would produce a different `int_to_ext`
/// order, which is how the test tells "loaded" from "rebuilt" — confirm.
#[test]
fn ivf_loads_from_snapshot_rather_than_rebuilding() {
    let dir = tempfile::tempdir().unwrap();
    let insertion_order = ["a", "m", "z", "b"];
    {
        let mut writer = open(dir.path());
        writer
            .create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        for (x, id) in [(0.0_f32, "a"), (1.0, "m")] {
            writer
                .upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
        }
        // A search between the two batches of upserts; its result is unused.
        let _ = nearest(&mut writer, &[0.0, 0.0, 0.0, 0.0]);
        for (x, id) in [(2.0_f32, "z"), (3.0, "b")] {
            writer
                .upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
        }
        writer.checkpoint().unwrap();
        assert_eq!(writer.collections["c"].int_to_ext, insertion_order);
    }

    let reopened = open(dir.path());
    assert_eq!(
        reopened.collections["c"].int_to_ext,
        insertion_order,
        "index was rebuilt, not loaded from the snapshot"
    );
}
4228
/// A point upserted after the checkpoint is still found after reopening,
/// and the checkpointed points remain searchable.
#[test]
fn ivf_recovery_replays_post_checkpoint_upserts() {
    const FAR: [f32; 4] = [500.0, 0.0, 0.0, 0.0];

    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        // Written after the checkpoint and never checkpointed itself.
        writer.upsert("c", "far", &FAR, &json!({})).unwrap();
    }

    let mut reopened = open(dir.path());
    let around_far = nearest(&mut reopened, &FAR);
    assert_eq!(around_far[0], "far");
    let around_p1 = nearest(&mut reopened, &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(around_p1[0], "p1");
}
4244
/// A point deleted after the checkpoint stays deleted after reopening: it is
/// absent from search results and from `get`, while its neighbour survives.
#[test]
fn ivf_recovery_replays_post_checkpoint_deletes() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        // Deleted after the checkpoint; `delete` reports that the id existed.
        let removed = writer.delete("c", "p7").unwrap();
        assert!(removed);
    }

    let mut reopened = open(dir.path());
    let ids = nearest(&mut reopened, &[7.0, 0.0, 0.0, 0.0]);
    assert!(!ids.iter().any(|id| id == "p7"));
    assert!(reopened.get("c", "p7").unwrap().is_none());
    assert!(reopened.get("c", "p6").unwrap().is_some());
}
4263
/// A vector overwritten after the checkpoint is found at its new position
/// after reopening and is no longer the nearest hit at its old position.
#[test]
fn ivf_recovery_replays_post_checkpoint_updates() {
    const MOVED_TO: [f32; 4] = [999.0, 0.0, 0.0, 0.0];

    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
        // `seed_ivf` placed `p0` at the origin; move it after the checkpoint.
        writer.upsert("c", "p0", &MOVED_TO, &json!({})).unwrap();
    }

    let mut reopened = open(dir.path());
    let at_new_position = nearest(&mut reopened, &MOVED_TO);
    assert_eq!(at_new_position[0], "p0");
    let at_old_position = nearest(&mut reopened, &[0.0, 0.0, 0.0, 0.0]);
    assert_ne!(
        at_old_position[0], "p0",
        "the stale p0 vector survived the update"
    );
}
4283
/// Overwrites the single `idx-*` snapshot file with garbage and checks the
/// database still opens and answers a nearest-neighbour query correctly.
#[test]
fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
    let dir = tempfile::tempdir().unwrap();
    {
        let mut writer = open(dir.path());
        seed_ivf(&mut writer, 30);
        writer.checkpoint().unwrap();
    }

    let snapshots = idx_snapshot_files(dir.path());
    assert_eq!(snapshots.len(), 1);
    // Replace the snapshot's contents with bytes that cannot be a valid index.
    let victim = ivf_index_dir(dir.path()).join(&snapshots[0]);
    std::fs::write(victim, b"corrupt").unwrap();

    let mut reopened = open(dir.path());
    let ids = nearest(&mut reopened, &[7.0, 0.0, 0.0, 0.0]);
    assert_eq!(ids[0], "p7");
}
4300
4301 fn mv_desc() -> Descriptor {
4304 Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4305 }
4306
4307 fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4310 let mut v: Vec<(String, f32)> = corpus
4311 .iter()
4312 .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4313 .collect();
4314 v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4315 v
4316 }
4317
/// Upserts three multi-vector documents and checks that
/// `search_multi_vector` returns them in the same order, and with the same
/// scores (within `1e-5`), as the brute-force `bf_rank` reference.
#[test]
fn multivector_search_ranks_documents_by_maxsim() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
        (
            "d_mix",
            vec![
                vec![1.0, 1.0, 0.0],
                vec![0.0, 0.0, 1.0],
                vec![1.0, 0.0, 1.0],
            ],
        ),
    ];
    for (id, toks) in &corpus {
        db.upsert_document("docs", id, toks, &json!({ "id": id }))
            .unwrap();
    }
    assert_eq!(db.document_count("docs").unwrap(), 3);

    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let params = SearchParams {
        k: 3,
        with_payload: false,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);

    // Length is pinned first so the `zip` below cannot silently compare
    // fewer than all three documents.
    assert_eq!(got.len(), 3);
    for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
        assert_eq!(&g.id, eid, "ranking matches brute force");
        assert!(
            (g.score - escore).abs() < 1e-5,
            "{} score {} vs {escore}",
            g.id,
            g.score
        );
    }
}
4361
/// With five single-token documents stored and `k = 2`,
/// `search_multi_vector` returns exactly two matches.
#[test]
fn multivector_search_truncates_to_k() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    for i in 0..5u8 {
        // `f32::from(u8)` is lossless, unlike an `as` cast.
        let tokens = vec![vec![1.0, f32::from(i), 0.0]];
        db.upsert_document("docs", &format!("d{i}"), &tokens, &json!({}))
            .unwrap();
    }
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 2);
}
4381
/// Two documents with identical tokens differ only in their `lang` payload;
/// filtering on `lang == "fr"` must return only the French one, with its
/// payload attached.
#[test]
fn multivector_filter_selects_documents_exactly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
        .unwrap();
    db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
        .unwrap();
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".into(),
            value: json!("fr"),
        }),
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    // Both documents score the same, so only the filter can separate them.
    assert_eq!(got.len(), 1);
    assert_eq!(got[0].id, "b");
    assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
}
4407
/// After a checkpoint and reopen, a multi-vector collection still reports
/// the right document count and ranks documents in the same order as the
/// brute-force `bf_rank` reference.
#[test]
fn multivector_reopen_rebuilds_grouping_and_ranking() {
    let tmp = tempfile::tempdir().unwrap();
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
    ];
    {
        let mut db = open(tmp.path());
        db.create_collection("docs", mv_desc()).unwrap();
        for (id, toks) in &corpus {
            db.upsert_document("docs", id, toks, &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
    }
    // The first handle is dropped above; this one starts from disk.
    let mut db = open(tmp.path());
    assert_eq!(db.document_count("docs").unwrap(), 2);
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    // Only the id order is compared here, not the scores.
    let got_ids: Vec<String> = got.iter().map(|m| m.id.clone()).collect();
    let expected_ids: Vec<String> = expected.iter().map(|(id, _)| id.clone()).collect();
    assert_eq!(got_ids, expected_ids);
}
4441
/// Deleting a two-token document removes both of its tokens: document and
/// token counts drop, the document can no longer be fetched or found by
/// search, and a second delete reports that nothing was removed.
#[test]
fn multivector_delete_document_removes_all_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
        &json!({}),
    )
    .unwrap();
    db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
        .unwrap();
    // Two documents, three tokens in total (2 for "a" + 1 for "b").
    assert_eq!(db.document_count("docs").unwrap(), 2);
    assert_eq!(db.len("docs").unwrap(), 3);

    assert!(db.delete_document("docs", "a").unwrap());
    // Only "b" and its single token remain.
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);
    assert!(db.get_document("docs", "a", false).unwrap().is_none());
    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert!(got.iter().all(|m| m.id != "a"));
    // Deleting an already-deleted document returns `false`.
    assert!(!db.delete_document("docs", "a").unwrap());
}
4473
/// Upserting an existing document id replaces its tokens and payload rather
/// than appending: a three-token document becomes a one-token document.
#[test]
fn multivector_reupsert_replaces_tokens() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("docs", mv_desc()).unwrap();

    let first_version: [Vec<f32>; 3] = [
        vec![1.0, 0.0, 0.0],
        vec![0.0, 1.0, 0.0],
        vec![0.0, 0.0, 1.0],
    ];
    db.upsert_document("docs", "a", &first_version, &json!({"v":1}))
        .unwrap();
    assert_eq!(db.len("docs").unwrap(), 3);

    // Same id, a single token and a new payload.
    let second_version: [Vec<f32>; 1] = [vec![0.0, 0.0, 1.0]];
    db.upsert_document("docs", "a", &second_version, &json!({"v":2}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);

    let stored = db.get_document("docs", "a", true).unwrap().unwrap();
    assert_eq!(stored.payload, Some(json!({"v":2})));
    assert_eq!(stored.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
}
4500
/// Single-vector calls on a multi-vector collection, and multi-vector calls
/// on a single-vector collection, all fail with `Error::Unsupported`.
#[test]
fn single_and_multi_vector_apis_are_mutually_exclusive() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());
    db.create_collection("mv", mv_desc()).unwrap();
    db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
        .unwrap();

    // Single-vector API against the multi-vector collection.
    let upsert_on_mv = db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({}));
    assert!(matches!(upsert_on_mv, Err(Error::Unsupported(_))));
    let search_on_mv = db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default());
    assert!(matches!(search_on_mv, Err(Error::Unsupported(_))));

    // Multi-vector API against the single-vector collection.
    let upsert_doc_on_sv = db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({}));
    assert!(matches!(upsert_doc_on_sv, Err(Error::Unsupported(_))));
    let search_multi_on_sv =
        db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default());
    assert!(matches!(search_multi_on_sv, Err(Error::Unsupported(_))));
    let count_on_sv = db.document_count("sv");
    assert!(matches!(count_on_sv, Err(Error::Unsupported(_))));
}
4531
/// Creating a multi-vector collection with the L2 metric is rejected, and so
/// are documents whose id contains `U+001F`, that have no tokens, or whose
/// token has 2 components in a 3-dimensional collection. Every rejection is
/// `Error::Unsupported`.
#[test]
fn multivector_rejects_l2_metric_and_bad_documents() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = open(dir.path());

    let l2_multivector =
        Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
    let created = db.create_collection("bad", l2_multivector);
    assert!(matches!(created, Err(Error::Unsupported(_))));

    db.create_collection("docs", mv_desc()).unwrap();
    let rejected: [(&str, Vec<Vec<f32>>); 3] = [
        // Id containing the U+001F control character.
        ("a\u{1f}b", vec![vec![1.0, 0.0, 0.0]]),
        // Document with no tokens at all.
        ("a", Vec::new()),
        // Token with the wrong number of components.
        ("a", vec![vec![1.0, 0.0]]),
    ];
    for (id, tokens) in &rejected {
        let outcome = db.upsert_document("docs", id, tokens, &json!({}));
        assert!(matches!(outcome, Err(Error::Unsupported(_))));
    }
}
4558
/// A snapshot opened as a database contains the collections and points that
/// existed when it was taken, and none of the writes made afterwards.
#[test]
fn snapshot_then_open_reproduces_the_database() {
    let source_dir = tempfile::tempdir().unwrap();
    let mut db = open(source_dir.path());
    db.create_collection("kb", desc()).unwrap();
    db.create_collection("kb2", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
        .unwrap();
    db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
        .unwrap();

    // The destination is a not-yet-existing child of a fresh temp dir.
    let dest_dir = tempfile::tempdir().unwrap();
    let snapshot_path = dest_dir.path().join("snap");
    let info = db.snapshot(&snapshot_path).unwrap();
    assert!(info.files > 0);
    assert!(info.bytes > 0);
    assert_eq!(info.manifest_version, db.manifest_version());

    // Written after the snapshot; must not appear in the restored copy.
    db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
        .unwrap();

    let restored = open(&snapshot_path);
    let mut collection_names = restored.collection_names();
    collection_names.sort();
    assert_eq!(
        collection_names,
        vec![String::from("kb"), String::from("kb2")]
    );
    assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
    let point_a = restored.get("kb", "a").unwrap().unwrap();
    assert_eq!(point_a.payload, Some(json!({ "n": 1 })));
    assert_eq!(restored.len("kb2").unwrap(), 1);
    assert!(restored.get("kb", "late").unwrap().is_none());
}
4594
/// Snapshotting into a directory that already exists fails with
/// `CoreError::AlreadyExists`.
#[test]
fn snapshot_refuses_an_existing_destination() {
    let source_dir = tempfile::tempdir().unwrap();
    let mut db = open(source_dir.path());
    // `tempdir()` creates the directory, so this destination already exists.
    let existing = tempfile::tempdir().unwrap();
    let outcome = db.snapshot(existing.path());
    assert!(matches!(
        outcome,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
4605
/// `restore_snapshot` copies a snapshot into a new directory that opens as a
/// working database. It refuses a destination that already exists
/// (`AlreadyExists`) and a source directory that is not a snapshot
/// (`InvalidArgument`).
#[test]
fn restore_snapshot_roundtrips_and_guards() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    let work = tempfile::tempdir().unwrap();
    let snap_dir = work.path().join("snap");
    db.snapshot(&snap_dir).unwrap();

    // Round trip: restore into a fresh path and read the data back.
    let restored_dir = work.path().join("restored");
    let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
    assert!(info.files > 0);
    let restored = open(&restored_dir);
    assert_eq!(restored.len("kb").unwrap(), 1);

    // Guard 1: the destination now exists, so a second restore is refused.
    assert!(matches!(
        restore_snapshot(&snap_dir, &restored_dir),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
    // Guard 2: an empty directory is not a valid snapshot source.
    let not_snap = work.path().join("not-a-snapshot");
    std::fs::create_dir_all(&not_snap).unwrap();
    assert!(matches!(
        restore_snapshot(&not_snap, &work.path().join("out")),
        Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
    ));
}
4637}