1use std::collections::{BTreeMap, BTreeSet, HashMap};
46use std::path::Path;
47
48use quiver_core::{SecPredicate, SecValue, Store};
49use quiver_index::{
50 ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
51 Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
52 ordering_distance, report_metric,
53};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56use thiserror::Error;
57
58pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
59pub use quiver_core::page::PageCodec;
60pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
61pub use quiver_core::{
62 Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
63 VectorEncryption,
64};
65pub use quiver_query::Filter;
66pub use quiver_query::{
67 BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
68 query_term_ids, rrf_fuse, text_to_sparse,
69};
70
/// Errors surfaced by this module.
///
/// Variants carrying `#[from]` are produced automatically when `?` is applied
/// to the corresponding underlying error type.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// A `quiver_core::CoreError`, displayed with the inner error's own message.
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// A `quiver_index::IndexError`, displayed with the inner error's own message.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// A `quiver_index::DiskError`, displayed with the inner error's own message.
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be serialised to, or parsed from, JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// No collection with the given name is registered in the database.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The request is not supported; the message says why.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A `quiver_index::SnapshotError`, displayed with the inner error's own message.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// `postcard` failed while encoding or decoding an index snapshot envelope.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
102
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
105
/// Summary returned after a directory snapshot or restore.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Manifest version of the store at snapshot time (`restore_snapshot`
    /// reports `0` here).
    pub manifest_version: u64,
    /// Number of files copied.
    pub files: u64,
    /// Total number of bytes copied.
    pub bytes: u64,
}
117
/// One point returned by a single-vector lookup or search.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External id of the point.
    pub id: String,
    /// Score assigned by the producing call; `get` and `fetch` always report `0.0`.
    pub score: f32,
    /// Decoded JSON payload, when the caller asked for it.
    pub payload: Option<Value>,
    /// Stored vector, when the caller asked for it.
    pub vector: Option<Vec<f32>>,
}
130
/// One document returned by a multi-vector lookup or search.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// Document id.
    pub id: String,
    /// Score assigned by the producing call; `get_document` always reports `0.0`.
    pub score: f32,
    /// Decoded JSON payload of the document, if present and requested.
    pub payload: Option<Value>,
    /// The document's token vectors, when the caller asked for them.
    pub vectors: Option<Vec<Vec<f32>>>,
}
145
/// Intermediate tuple used while ranking documents:
/// `(score, document id, payload, token vectors)`.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
151
/// Options shared by the search entry points.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Maximum number of results to return.
    pub k: usize,
    /// Optional payload filter; only matching points are returned.
    pub filter: Option<Filter>,
    /// Search-effort value forwarded to the underlying index's `search` call.
    pub ef_search: usize,
    /// Whether results carry the decoded payload.
    pub with_payload: bool,
    /// Whether results carry the stored vector(s).
    pub with_vector: bool,
}
169
170impl Default for SearchParams {
171 fn default() -> Self {
172 Self {
173 k: 10,
174 filter: None,
175 ef_search: 64,
176 with_payload: true,
177 with_vector: false,
178 }
179 }
180}
181
/// Multiplier applied to `k` when a filtered search goes through the index,
/// so enough candidates survive the post-filter.
const FILTER_OVERFETCH: usize = 8;

/// Multiplier applied to `k` to pick the per-ranker candidate depth for
/// hybrid (RRF) search.
const RRF_CANDIDATE_FACTOR: usize = 10;
/// Lower bound on the per-ranker candidate depth for hybrid (RRF) search.
const MIN_RRF_CANDIDATES: usize = 100;

/// A filtered search whose candidate-id set is at most this large is answered
/// by exact scoring of the candidates instead of through the index.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// NOTE(review): not referenced in the visible part of this file; presumably the
// fraction of deleted points that triggers an HNSW rebuild — confirm at the use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

// NOTE(review): not referenced in the visible part of this file; presumably the
// fraction of pending points that triggers a graph rebuild — confirm at the use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
213
/// The in-memory index attached to a collection.
///
/// Variants wrapping an `Option` hold `None` until an index has been built;
/// searching such a variant returns no neighbours.
enum CollectionIndex {
    /// No index; searches return no neighbours.
    None,
    /// HNSW graph.
    Hnsw(Hnsw),
    /// Vamana graph (`FreshVamana`), once built.
    Vamana(Option<FreshVamana>),
    /// IVF index, once built.
    Ivf(Option<Ivf>),
    /// Disk-backed Vamana graph (`FreshDiskVamana`), once built.
    Disk(Option<FreshDiskVamana>),
    /// ColBERT index, once built.
    Colbert(Option<ColbertIndex>),
}
237
238impl CollectionIndex {
239 fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
241 Ok(match self {
242 CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
243 CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
244 CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
245 CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
246 CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
247 CollectionIndex::None
248 | CollectionIndex::Vamana(None)
249 | CollectionIndex::Ivf(None)
250 | CollectionIndex::Disk(None)
251 | CollectionIndex::Colbert(None) => Vec::new(),
252 })
253 }
254}
255
/// Serialised wrapper written at checkpoint time for an IVF index.
///
/// Encoded with `postcard` in `Database::checkpoint`.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    /// Envelope format version; written as `INDEX_ENVELOPE_VERSION`.
    version: u16,
    /// The collection's internal-id → external-id table at snapshot time.
    int_to_ext: Vec<String>,
    /// Bytes produced by the IVF index's own `snapshot()`.
    ivf: Vec<u8>,
}
267
/// Version stamped into every [`IndexEnvelope`] written by `checkpoint`.
const INDEX_ENVELOPE_VERSION: u16 = 1;
271
/// In-memory state kept for each open collection.
struct CollectionHandle {
    /// Store-level id of the collection.
    id: CollectionId,
    /// Descriptor the collection was created with.
    descriptor: Descriptor,
    /// The attached vector index.
    index: CollectionIndex,
    /// Internal index id → external id; indexed by `Neighbor::id`.
    int_to_ext: Vec<String>,
    /// External id → internal index id.
    ext_to_int: HashMap<String, u64>,
    /// When `true`, `ensure_indexed` rebuilds the index before searching.
    stale: bool,
    /// Counter bumped by `bump_write_gen`; `commit_rebuild` compares it with
    /// the value captured when the rebuild started.
    write_gen: u64,
    /// For multi-vector collections: document id → number of token vectors.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index used by the sparse and BM25 rankers, when present.
    sparse: Option<SparseInvertedIndex>,
}
301
302fn uses_sparse_index(descriptor: &Descriptor) -> bool {
305 !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
306}
307
308fn mark_stale(handle: &mut CollectionHandle) {
312 handle.stale = true;
313 bump_write_gen(handle);
314}
315
316fn bump_write_gen(handle: &mut CollectionHandle) {
325 handle.write_gen = handle.write_gen.wrapping_add(1);
326}
327
/// A store plus the in-memory per-collection state (indexes and id maps)
/// layered on top of it.
pub struct Database {
    /// Underlying persistent store.
    store: Store,
    /// Open collections, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
}
333
334impl Database {
335 pub fn open(dir: &Path) -> Result<Self> {
338 Self::from_store(Store::open(dir)?)
339 }
340
341 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
346 Self::from_store(Store::open_with_codec(dir, codec)?)
347 }
348
349 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
354 Self::from_store(Store::open_with_keyring(dir, keyring)?)
355 }
356
357 fn from_store(store: Store) -> Result<Self> {
359 let mut collections = HashMap::new();
360 for name in store.collection_names() {
361 let Some(id) = store.collection_id(&name) else {
362 continue;
363 };
364 let Some(descriptor) = store.descriptor(id).cloned() else {
365 continue;
366 };
367 let mut handle = CollectionHandle {
368 id,
369 index: empty_index(&descriptor),
370 descriptor,
371 int_to_ext: Vec::new(),
372 ext_to_int: HashMap::new(),
373 stale: true,
374 write_gen: 0,
375 docs: None,
376 sparse: None,
378 };
379 load_index(&store, &mut handle)?;
380 collections.insert(name, handle);
381 }
382 Ok(Self { store, collections })
383 }
384
385 pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
388 validate_index(&descriptor)?;
389 let id = self.store.create_collection(name, descriptor.clone())?;
390 let index = empty_index(&descriptor);
391 let docs = descriptor.multivector.then(BTreeMap::new);
392 let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
396 self.collections.insert(
397 name.to_owned(),
398 CollectionHandle {
399 id,
400 descriptor,
401 index,
402 int_to_ext: Vec::new(),
403 ext_to_int: HashMap::new(),
404 stale: false,
405 write_gen: 0,
406 docs,
407 sparse,
408 },
409 );
410 Ok(())
411 }
412
413 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
415 let existed = self.store.drop_collection(name)?;
416 self.collections.remove(name);
417 Ok(existed)
418 }
419
420 pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
426 let existed = self.store.shred_collection(name)?;
427 self.collections.remove(name);
428 Ok(existed)
429 }
430
431 pub fn set_commit_observer(&mut self, observer: CommitObserver) {
435 self.store.set_commit_observer(observer);
436 }
437
438 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
444 Ok(self.store.replication_snapshot()?)
445 }
446
447 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
456 let target = match &op {
457 WalOp::CreateCollection { collection_id, .. }
458 | WalOp::DropCollection { collection_id }
459 | WalOp::Upsert { collection_id, .. }
460 | WalOp::Delete { collection_id, .. } => Some(*collection_id),
461 WalOp::Checkpoint { .. } => None,
462 };
463 let create_name = match &op {
464 WalOp::CreateCollection { name, .. } => Some(name.clone()),
465 _ => None,
466 };
467 let is_drop = matches!(op, WalOp::DropCollection { .. });
468 self.store.apply_replicated(op)?;
469
470 if let Some(name) = create_name {
471 if let Some(id) = target
473 && let Some(descriptor) = self.store.descriptor(id).cloned()
474 {
475 let docs = descriptor.multivector.then(BTreeMap::new);
476 let index = empty_index(&descriptor);
477 self.collections.insert(
480 name,
481 CollectionHandle {
482 id,
483 descriptor,
484 index,
485 int_to_ext: Vec::new(),
486 ext_to_int: HashMap::new(),
487 stale: false,
488 write_gen: 0,
489 docs,
490 sparse: None,
491 },
492 );
493 }
494 } else if is_drop {
495 if let Some(id) = target {
496 self.collections.retain(|_, h| h.id != id);
497 }
498 } else if let Some(id) = target
499 && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
500 {
501 mark_stale(handle);
502 }
503 Ok(())
504 }
505
506 #[must_use]
508 pub fn collection_names(&self) -> Vec<String> {
509 self.store.collection_names()
510 }
511
512 #[must_use]
514 pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
515 self.collections.get(name).map(|h| &h.descriptor)
516 }
517
518 pub fn len(&self, name: &str) -> Result<usize> {
520 let handle = self.handle(name)?;
521 Ok(self.store.len(handle.id)?)
522 }
523
524 pub fn is_empty(&self, name: &str) -> Result<bool> {
526 Ok(self.len(name)? == 0)
527 }
528
529 pub fn upsert(
531 &mut self,
532 collection: &str,
533 id: &str,
534 vector: &[f32],
535 payload: &Value,
536 ) -> Result<()> {
537 let handle = self
538 .collections
539 .get_mut(collection)
540 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
541 require_single_vector(handle)?;
542 let payload_bytes = serde_json::to_vec(payload)?;
543 self.store.upsert(handle.id, id, vector, &payload_bytes)?;
544 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
548 return Ok(());
549 }
550 index_upsert_point(handle, id, vector)?;
553 sparse_index_upsert_point(handle, id, payload);
555 Ok(())
556 }
557
558 pub fn upsert_batch(
565 &mut self,
566 collection: &str,
567 points: &[(&str, &[f32], &serde_json::Value)],
568 ) -> Result<u64> {
569 let handle = self
570 .collections
571 .get(collection)
572 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
573 require_single_vector(handle)?;
574 let coll_id = handle.id;
575 let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
576
577 let payload_bytes: Vec<Vec<u8>> = points
578 .iter()
579 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
580 .collect::<Result<_>>()?;
581
582 let records: Vec<(&str, &[f32], &[u8])> = points
583 .iter()
584 .zip(payload_bytes.iter())
585 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
586 .collect();
587
588 self.store.upsert_batch(coll_id, &records)?;
589
590 if is_client_side {
591 return Ok(records.len() as u64);
592 }
593
594 for (id, vector, payload) in points {
595 let handle = self
596 .collections
597 .get_mut(collection)
598 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
599 index_upsert_point(handle, id, vector)?;
600 sparse_index_upsert_point(handle, id, payload);
601 }
602 Ok(records.len() as u64)
603 }
604
605 pub fn upsert_bulk(
616 &mut self,
617 collection: &str,
618 points: &[(&str, &[f32], &serde_json::Value)],
619 ) -> Result<u64> {
620 let handle = self
621 .collections
622 .get_mut(collection)
623 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
624 require_single_vector(handle)?;
625 let coll_id = handle.id;
626
627 let payload_bytes: Vec<Vec<u8>> = points
628 .iter()
629 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
630 .collect::<Result<_>>()?;
631 let records: Vec<(&str, &[f32], &[u8])> = points
632 .iter()
633 .zip(payload_bytes.iter())
634 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
635 .collect();
636
637 self.store.upsert_batch(coll_id, &records)?;
638
639 let handle = self
643 .collections
644 .get_mut(collection)
645 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
646 mark_stale(handle);
647 Ok(records.len() as u64)
648 }
649
650 pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
652 let handle = self
653 .collections
654 .get_mut(collection)
655 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
656 require_single_vector(handle)?;
657 let existed = self.store.delete(handle.id, id)?;
658 if !existed {
659 return Ok(false);
660 }
661 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
664 return Ok(true);
665 }
666 index_delete_point(handle, id);
672 sparse_index_delete_point(handle, id);
674 Ok(true)
675 }
676
677 pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
679 let handle = self.handle(collection)?;
680 require_single_vector(handle)?;
681 match self.store.get(handle.id, id)? {
682 Some(record) => Ok(Some(Match {
683 id: id.to_owned(),
684 score: 0.0,
685 payload: Some(serde_json::from_slice(&record.payload)?),
686 vector: Some(record.vector),
687 })),
688 None => Ok(None),
689 }
690 }
691
692 pub fn fetch(
706 &self,
707 collection: &str,
708 filter: Option<&Filter>,
709 limit: usize,
710 with_payload: bool,
711 with_vector: bool,
712 ) -> Result<Vec<Match>> {
713 let handle = self.handle(collection)?;
714 require_single_vector(handle)?;
715 let mut out = Vec::new();
716 for (id, record) in self.store.scan(handle.id)? {
717 if out.len() >= limit {
718 break;
719 }
720 let payload: Value = serde_json::from_slice(&record.payload)?;
721 if let Some(filter) = filter
722 && !filter.matches(&payload)
723 {
724 continue;
725 }
726 out.push(Match {
727 id,
728 score: 0.0,
729 payload: with_payload.then_some(payload),
730 vector: with_vector.then_some(record.vector),
731 });
732 }
733 Ok(out)
734 }
735
736 pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
742 if self.handle(collection)?.stale {
743 let store = &self.store;
744 let handle = self
745 .collections
746 .get_mut(collection)
747 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
748 rebuild_index(store, handle)?;
749 }
750 Ok(())
751 }
752
753 pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
758 Ok(self.handle(collection)?.stale)
759 }
760
761 pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
767 let handle = self.handle(collection)?;
768 if !handle.stale {
769 return Ok(None);
770 }
771 let scan = scan_collection(&self.store, handle)?;
772 Ok(Some(RebuildInputs {
773 collection: collection.to_owned(),
774 descriptor: handle.descriptor.clone(),
775 scan,
776 write_gen: handle.write_gen,
777 }))
778 }
779
780 pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
787 let store = &self.store;
788 let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
789 return Ok(false);
790 };
791 match rebuilt.kind {
792 RebuiltKind::Ready(index) => handle.index = *index,
793 RebuiltKind::Disk { graph, pq } => {
794 handle.index = empty_index(&handle.descriptor);
798 let disk = write_disk_index(store, handle.id, &graph, &pq)?;
799 handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
800 }
801 }
802 handle.int_to_ext = rebuilt.int_to_ext;
803 handle.ext_to_int = rebuilt.ext_to_int;
804 handle.docs = rebuilt.docs;
805 handle.sparse = rebuilt.sparse;
806 let still_stale = handle.write_gen != rebuilt.write_gen;
809 handle.stale = still_stale;
810 Ok(still_stale)
811 }
812
813 pub fn search(
816 &mut self,
817 collection: &str,
818 query: &[f32],
819 params: &SearchParams,
820 ) -> Result<Vec<Match>> {
821 self.ensure_indexed(collection)?;
826 self.search_snapshot(collection, query, params)
827 }
828
829 pub fn search_snapshot(
836 &self,
837 collection: &str,
838 query: &[f32],
839 params: &SearchParams,
840 ) -> Result<Vec<Match>> {
841 require_single_vector(self.handle(collection)?)?;
842 require_server_searchable(self.handle(collection)?)?;
843
844 let handle = self.handle(collection)?;
845
846 if let Some(filter) = ¶ms.filter
850 && let Some(candidates) = candidate_ids(
851 &self.store,
852 handle.id,
853 filter,
854 &handle.descriptor.filterable,
855 )?
856 && candidates.len() <= FULL_SCAN_THRESHOLD
857 {
858 return self.exact_filtered_search(
859 handle.id,
860 &handle.descriptor,
861 query,
862 params,
863 filter,
864 &candidates,
865 );
866 }
867
868 let fetch = if params.filter.is_some() {
869 params
870 .k
871 .saturating_mul(FILTER_OVERFETCH)
872 .max(params.ef_search)
873 } else {
874 params.k
875 };
876 let raw = handle.index.search(query, fetch, params.ef_search)?;
877
878 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
879 let mut out = Vec::with_capacity(params.k);
880 for neighbor in raw {
881 if out.len() >= params.k {
882 break;
883 }
884 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
885 continue;
886 };
887 let record = if need_record {
888 self.store.get(handle.id, ext_id)?
889 } else {
890 None
891 };
892 let payload_value: Option<Value> = match &record {
893 Some(r) if params.filter.is_some() || params.with_payload => {
894 Some(serde_json::from_slice(&r.payload)?)
895 }
896 _ => None,
897 };
898 if let Some(filter) = ¶ms.filter {
899 let value = payload_value.as_ref().unwrap_or(&Value::Null);
900 if !filter.matches(value) {
901 continue;
902 }
903 }
904 out.push(Match {
905 id: ext_id.clone(),
906 score: neighbor.distance,
907 payload: if params.with_payload {
908 payload_value
909 } else {
910 None
911 },
912 vector: if params.with_vector {
913 record.map(|r| r.vector)
914 } else {
915 None
916 },
917 });
918 }
919 Ok(out)
920 }
921
922 pub fn hybrid_search(
930 &mut self,
931 collection: &str,
932 dense_query: Option<&[f32]>,
933 sparse_query: Option<&SparseVector>,
934 text_query: Option<&str>,
935 params: &SearchParams,
936 rrf_k0: f32,
937 ) -> Result<Vec<Match>> {
938 self.ensure_indexed(collection)?;
942 self.hybrid_search_snapshot(
943 collection,
944 dense_query,
945 sparse_query,
946 text_query,
947 params,
948 rrf_k0,
949 )
950 }
951
952 pub fn hybrid_search_snapshot(
957 &self,
958 collection: &str,
959 dense_query: Option<&[f32]>,
960 sparse_query: Option<&SparseVector>,
961 text_query: Option<&str>,
962 params: &SearchParams,
963 rrf_k0: f32,
964 ) -> Result<Vec<Match>> {
965 require_single_vector(self.handle(collection)?)?;
966 require_server_searchable(self.handle(collection)?)?;
967 if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
968 return Err(Error::Unsupported(
969 "hybrid_search requires a dense query, a sparse query, or a text query",
970 ));
971 }
972 let handle = self.handle(collection)?;
973
974 let depth = params
977 .k
978 .saturating_mul(RRF_CANDIDATE_FACTOR)
979 .max(MIN_RRF_CANDIDATES);
980 let filter = params.filter.as_ref();
981 let mut lists: Vec<Vec<String>> = Vec::new();
982 if let Some(q) = dense_query {
983 lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
984 }
985 if let Some(sp) = sparse_query {
986 lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
987 }
988 if let Some(text) = text_query {
989 lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
990 }
991 let fused = rrf_fuse(&lists, rrf_k0, params.k);
992
993 let mut out = Vec::with_capacity(fused.len());
994 for (ext_id, score) in fused {
995 let record = if params.with_payload || params.with_vector {
996 self.store.get(handle.id, &ext_id)?
997 } else {
998 None
999 };
1000 let payload = match (&record, params.with_payload) {
1001 (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1002 _ => None,
1003 };
1004 out.push(Match {
1005 id: ext_id,
1006 score,
1007 payload,
1008 vector: if params.with_vector {
1009 record.map(|r| r.vector)
1010 } else {
1011 None
1012 },
1013 });
1014 }
1015 Ok(out)
1016 }
1017
1018 fn dense_ranked_ids(
1021 &self,
1022 handle: &CollectionHandle,
1023 query: &[f32],
1024 depth: usize,
1025 ef_search: usize,
1026 filter: Option<&Filter>,
1027 ) -> Result<Vec<String>> {
1028 let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1029 let mut ids = Vec::new();
1030 for neighbor in raw {
1031 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1032 continue;
1033 };
1034 if !self.passes_filter(handle.id, ext_id, filter)? {
1035 continue;
1036 }
1037 ids.push(ext_id.clone());
1038 if ids.len() >= depth {
1039 break;
1040 }
1041 }
1042 Ok(ids)
1043 }
1044
1045 fn sparse_ranked_ids(
1052 &self,
1053 handle: &CollectionHandle,
1054 query: &SparseVector,
1055 depth: usize,
1056 filter: Option<&Filter>,
1057 ) -> Result<Vec<String>> {
1058 if let Some(idx) = handle.sparse.as_ref() {
1059 let mut ids = Vec::new();
1060 for (ext_id, _score) in idx.search(query) {
1061 if !self.passes_filter(handle.id, &ext_id, filter)? {
1062 continue;
1063 }
1064 ids.push(ext_id);
1065 if ids.len() >= depth {
1066 break;
1067 }
1068 }
1069 return Ok(ids);
1070 }
1071 self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1072 }
1073
1074 fn sparse_ranked_ids_by_scan(
1078 &self,
1079 cid: CollectionId,
1080 query: &SparseVector,
1081 depth: usize,
1082 filter: Option<&Filter>,
1083 ) -> Result<Vec<String>> {
1084 let qmap: HashMap<u32, f32> = query
1085 .indices
1086 .iter()
1087 .copied()
1088 .zip(query.values.iter().copied())
1089 .collect();
1090 let mut scored: Vec<(f32, String)> = Vec::new();
1091 for (ext_id, record) in self.store.scan(cid)? {
1092 if record.payload.is_empty() {
1093 continue;
1094 }
1095 let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1096 continue;
1097 };
1098 if let Some(filter) = filter
1099 && !filter.matches(&value)
1100 {
1101 continue;
1102 }
1103 let Some(raw) = value.get(SPARSE_KEY) else {
1104 continue;
1105 };
1106 let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1107 continue;
1108 };
1109 let mut score = 0.0f32;
1110 for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1111 if let Some(qw) = qmap.get(dim) {
1112 score += qw * weight;
1113 }
1114 }
1115 if score > 0.0 {
1116 scored.push((score, ext_id));
1117 }
1118 }
1119 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1120 Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1121 }
1122
1123 fn bm25_ranked_ids(
1131 &self,
1132 handle: &CollectionHandle,
1133 query_text: &str,
1134 depth: usize,
1135 filter: Option<&Filter>,
1136 ) -> Result<Vec<String>> {
1137 let Some(idx) = handle.sparse.as_ref() else {
1138 return Ok(Vec::new());
1139 };
1140 let terms = query_term_ids(query_text);
1141 let mut ids = Vec::new();
1142 for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1143 if !self.passes_filter(handle.id, &ext_id, filter)? {
1144 continue;
1145 }
1146 ids.push(ext_id);
1147 if ids.len() >= depth {
1148 break;
1149 }
1150 }
1151 Ok(ids)
1152 }
1153
1154 fn passes_filter(
1157 &self,
1158 cid: CollectionId,
1159 ext_id: &str,
1160 filter: Option<&Filter>,
1161 ) -> Result<bool> {
1162 let Some(filter) = filter else {
1163 return Ok(true);
1164 };
1165 let value: Value = match self.store.get(cid, ext_id)? {
1166 Some(r) => serde_json::from_slice(&r.payload)?,
1167 None => Value::Null,
1168 };
1169 Ok(filter.matches(&value))
1170 }
1171
1172 fn exact_filtered_search(
1177 &self,
1178 cid: CollectionId,
1179 descriptor: &Descriptor,
1180 query: &[f32],
1181 params: &SearchParams,
1182 filter: &Filter,
1183 candidates: &BTreeSet<String>,
1184 ) -> Result<Vec<Match>> {
1185 let metric = to_index_metric(descriptor.metric);
1186 let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1187 for ext_id in candidates {
1188 let Some(record) = self.store.get(cid, ext_id)? else {
1189 continue;
1190 };
1191 let payload: Value = serde_json::from_slice(&record.payload)?;
1192 if !filter.matches(&payload) {
1193 continue;
1194 }
1195 let ordering = ordering_distance(metric, query, &record.vector);
1196 scored.push((ordering, ext_id.clone(), payload, record.vector));
1197 }
1198 scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1199 scored.truncate(params.k);
1200 Ok(scored
1201 .into_iter()
1202 .map(|(ordering, id, payload, vector)| Match {
1203 id,
1204 score: report_metric(metric, ordering),
1205 payload: params.with_payload.then_some(payload),
1206 vector: params.with_vector.then_some(vector),
1207 })
1208 .collect())
1209 }
1210
1211 pub fn upsert_document(
1220 &mut self,
1221 collection: &str,
1222 doc_id: &str,
1223 vectors: &[Vec<f32>],
1224 payload: &Value,
1225 ) -> Result<()> {
1226 let handle = self
1227 .collections
1228 .get_mut(collection)
1229 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1230 require_multivector(handle)?;
1231 if doc_id.contains(DOC_TOKEN_SEP) {
1232 return Err(Error::Unsupported(
1233 "document id must not contain the reserved 0x1f separator",
1234 ));
1235 }
1236 if vectors.is_empty() {
1237 return Err(Error::Unsupported("a document needs at least one vector"));
1238 }
1239 let dim = handle.descriptor.dim as usize;
1240 if vectors.iter().any(|v| v.len() != dim) {
1241 return Err(Error::Unsupported(
1242 "every document vector must match the collection dimensionality",
1243 ));
1244 }
1245 let previous = handle
1248 .docs
1249 .as_ref()
1250 .and_then(|d| d.get(doc_id))
1251 .copied()
1252 .unwrap_or(0) as usize;
1253 for j in vectors.len()..previous {
1254 self.store.delete(handle.id, &token_id(doc_id, j))?;
1255 index_delete_point(handle, &token_id(doc_id, j));
1257 }
1258 let payload_bytes = serde_json::to_vec(payload)?;
1259 for (j, vector) in vectors.iter().enumerate() {
1260 let bytes: &[u8] = if j == 0 {
1262 payload_bytes.as_slice()
1263 } else {
1264 &[]
1265 };
1266 self.store
1267 .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1268 index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1272 }
1273 if let Some(docs) = handle.docs.as_mut() {
1274 docs.insert(doc_id.to_owned(), vectors.len() as u32);
1275 }
1276 Ok(())
1277 }
1278
1279 pub fn search_multi_vector(
1287 &mut self,
1288 collection: &str,
1289 query_tokens: &[Vec<f32>],
1290 params: &SearchParams,
1291 ) -> Result<Vec<DocumentMatch>> {
1292 self.ensure_indexed(collection)?;
1296 self.search_multi_vector_snapshot(collection, query_tokens, params)
1297 }
1298
1299 pub fn search_multi_vector_snapshot(
1306 &self,
1307 collection: &str,
1308 query_tokens: &[Vec<f32>],
1309 params: &SearchParams,
1310 ) -> Result<Vec<DocumentMatch>> {
1311 require_multivector(self.handle(collection)?)?;
1312 let dim = self.handle(collection)?.descriptor.dim as usize;
1313 if query_tokens.is_empty() {
1314 return Ok(Vec::new());
1315 }
1316 if query_tokens.iter().any(|v| v.len() != dim) {
1317 return Err(Error::Unsupported(
1318 "every query token must match the collection dimensionality",
1319 ));
1320 }
1321
1322 let doc_count = self
1323 .handle(collection)?
1324 .docs
1325 .as_ref()
1326 .map_or(0, BTreeMap::len);
1327 let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1328 self.handle(collection)?
1330 .docs
1331 .as_ref()
1332 .map(|d| d.keys().cloned().collect())
1333 .unwrap_or_default()
1334 } else {
1335 let handle = self.handle(collection)?;
1339 let per_token_k = params
1340 .k
1341 .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1342 .max(params.ef_search);
1343 let mut set = BTreeSet::new();
1344 for token in query_tokens {
1345 for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1346 if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1347 && let Some((doc, _)) = parse_token_id(ext)
1348 {
1349 set.insert(doc.to_owned());
1350 }
1351 }
1352 }
1353 set.into_iter().collect()
1354 };
1355
1356 let handle = self.handle(collection)?;
1358 let cid = handle.id;
1359 let metric = to_index_metric(handle.descriptor.metric);
1360 let mut scored: Vec<ScoredDocument> = Vec::new();
1361 for doc in &candidates {
1362 let count = handle
1363 .docs
1364 .as_ref()
1365 .and_then(|d| d.get(doc))
1366 .copied()
1367 .unwrap_or(0) as usize;
1368 let (tokens, payload) = self.gather_document(cid, doc, count)?;
1369 if tokens.is_empty() {
1370 continue;
1371 }
1372 if let Some(filter) = ¶ms.filter {
1373 let value = payload.clone().unwrap_or(Value::Null);
1374 if !filter.matches(&value) {
1375 continue;
1376 }
1377 }
1378 let score = max_sim(metric, query_tokens, &tokens);
1379 let vectors = params.with_vector.then_some(tokens);
1380 scored.push((score, doc.clone(), payload, vectors));
1381 }
1382 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1384 scored.truncate(params.k);
1385 Ok(scored
1386 .into_iter()
1387 .map(|(score, id, payload, vectors)| DocumentMatch {
1388 id,
1389 score,
1390 payload: params.with_payload.then_some(payload).flatten(),
1391 vectors,
1392 })
1393 .collect())
1394 }
1395
1396 pub fn get_document(
1399 &self,
1400 collection: &str,
1401 doc_id: &str,
1402 with_vectors: bool,
1403 ) -> Result<Option<DocumentMatch>> {
1404 let handle = self.handle(collection)?;
1405 require_multivector(handle)?;
1406 let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1407 return Ok(None);
1408 };
1409 let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1410 if tokens.is_empty() {
1411 return Ok(None);
1412 }
1413 Ok(Some(DocumentMatch {
1414 id: doc_id.to_owned(),
1415 score: 0.0,
1416 payload,
1417 vectors: with_vectors.then_some(tokens),
1418 }))
1419 }
1420
1421 pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1424 let handle = self
1425 .collections
1426 .get_mut(collection)
1427 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1428 require_multivector(handle)?;
1429 let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1430 return Ok(false);
1431 };
1432 for j in 0..count as usize {
1433 self.store.delete(handle.id, &token_id(doc_id, j))?;
1434 index_delete_point(handle, &token_id(doc_id, j));
1436 }
1437 if let Some(docs) = handle.docs.as_mut() {
1438 docs.remove(doc_id);
1439 }
1440 Ok(true)
1441 }
1442
1443 pub fn document_count(&self, collection: &str) -> Result<usize> {
1446 let handle = self.handle(collection)?;
1447 require_multivector(handle)?;
1448 Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1449 }
1450
1451 fn gather_document(
1455 &self,
1456 cid: CollectionId,
1457 doc_id: &str,
1458 count: usize,
1459 ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1460 let mut tokens = Vec::with_capacity(count);
1461 let mut payload: Option<Value> = None;
1462 for j in 0..count {
1463 let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1464 continue;
1465 };
1466 if j == 0 && !record.payload.is_empty() {
1467 payload = Some(serde_json::from_slice(&record.payload)?);
1468 }
1469 tokens.push(record.vector);
1470 }
1471 Ok((tokens, payload))
1472 }
1473
1474 pub fn checkpoint(&mut self) -> Result<()> {
1479 let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1480 for handle in self.collections.values() {
1481 if handle.stale {
1482 continue;
1483 }
1484 if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1485 if ivf.is_empty() {
1486 continue;
1487 }
1488 let envelope = IndexEnvelope {
1489 version: INDEX_ENVELOPE_VERSION,
1490 int_to_ext: handle.int_to_ext.clone(),
1491 ivf: ivf.snapshot()?,
1492 };
1493 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1494 }
1495 }
1496 self.store.checkpoint_with_index_snapshots(&snapshots)?;
1497 Ok(())
1498 }
1499
1500 pub fn compact(&mut self) -> Result<()> {
1504 Ok(self.store.compact()?)
1505 }
1506
1507 #[must_use]
1510 pub fn manifest_version(&self) -> u64 {
1511 self.store.manifest_version()
1512 }
1513
1514 #[must_use]
1517 pub fn disk_usage_bytes(&self) -> u64 {
1518 dir_size(self.store.dir())
1519 }
1520
1521 pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
1533 if dest.exists() {
1534 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1535 dest.display().to_string(),
1536 )));
1537 }
1538 self.checkpoint()?;
1541 let (files, bytes) = copy_tree(self.store.dir(), dest)?;
1542 let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
1546 Ok(SnapshotInfo {
1547 manifest_version: self.store.manifest_version(),
1548 files,
1549 bytes,
1550 })
1551 }
1552
1553 fn handle(&self, name: &str) -> Result<&CollectionHandle> {
1554 self.collections
1555 .get(name)
1556 .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
1557 }
1558}
1559
1560pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
1573 if dest.exists() {
1574 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
1575 dest.display().to_string(),
1576 )));
1577 }
1578 if !src.join("CURRENT").exists() {
1579 return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
1580 format!("{} is not a snapshot (no CURRENT)", src.display()),
1581 )));
1582 }
1583 let (files, bytes) = copy_tree(src, dest)?;
1584 Ok(SnapshotInfo {
1585 manifest_version: 0,
1589 files,
1590 bytes,
1591 })
1592}
1593
1594fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
1598 std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
1599 let mut files = 0u64;
1600 let mut bytes = 0u64;
1601 for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
1602 let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
1603 let from = entry.path();
1604 let to = dst.join(entry.file_name());
1605 let ft = entry
1606 .file_type()
1607 .map_err(|e| quiver_core::CoreError::io(&from, e))?;
1608 if ft.is_dir() {
1609 let (f, b) = copy_tree(&from, &to)?;
1610 files += f;
1611 bytes += b;
1612 } else {
1613 let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
1614 files += 1;
1615 bytes += n;
1616 }
1617 }
1618 Ok((files, bytes))
1619}
1620
/// Sums the sizes of all non-directory entries under `dir`, recursing into
/// subdirectories.
///
/// The walk is best-effort: a directory that cannot be listed counts as `0`,
/// and entries whose type or metadata cannot be read are skipped.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
1638
/// Separator placed between a document id and a token ordinal in a token's
/// external id (U+001F).
const DOC_TOKEN_SEP: char = '\u{1f}';

/// Document-count threshold for multi-vector search.
// NOTE(review): presumably the size up to which documents are scored
// exactly; the use site is outside this block, confirm there.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

/// Candidate multiplier for multi-vector search.
// NOTE(review): presumably applied to `k` when gathering candidates; the
// use site is outside this block, confirm there.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;

/// Builds the external id of token number `ordinal` of document `doc_id`:
/// the document id, [`DOC_TOKEN_SEP`], then the ordinal in decimal.
fn token_id(doc_id: &str, ordinal: usize) -> String {
    let digits = ordinal.to_string();
    let mut id = String::with_capacity(doc_id.len() + DOC_TOKEN_SEP.len_utf8() + digits.len());
    id.push_str(doc_id);
    id.push(DOC_TOKEN_SEP);
    id.push_str(&digits);
    id
}

/// Splits a token id back into `(document id, ordinal)`.
///
/// The split happens at the last [`DOC_TOKEN_SEP`], so a document id may
/// itself contain the separator. Returns `None` when there is no separator
/// or the trailing part is not a valid `u32`.
fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
    ext.rsplit_once(DOC_TOKEN_SEP)
        .and_then(|(doc, digits)| digits.parse().ok().map(|ordinal| (doc, ordinal)))
}
1665
1666fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
1668 if handle.descriptor.multivector {
1669 Err(Error::Unsupported(
1670 "collection is multi-vector; use upsert_document / search_multi_vector",
1671 ))
1672 } else {
1673 Ok(())
1674 }
1675}
1676
1677fn require_multivector(handle: &CollectionHandle) -> Result<()> {
1679 if handle.descriptor.multivector {
1680 Ok(())
1681 } else {
1682 Err(Error::Unsupported(
1683 "collection is single-vector; use upsert / search",
1684 ))
1685 }
1686}
1687
1688fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
1692 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
1693 Err(Error::Unsupported(
1694 "collection is client-side encrypted; the server cannot rank opaque vectors — \
1695 fetch points and rank client-side",
1696 ))
1697 } else {
1698 Ok(())
1699 }
1700}
1701
1702fn to_index_metric(metric: DistanceMetric) -> Metric {
1703 match metric {
1704 DistanceMetric::Dot => Metric::Dot,
1705 DistanceMetric::Cosine => Metric::Cosine,
1706 DistanceMetric::L2 => Metric::L2,
1707 }
1708}
1709
1710fn validate_index(descriptor: &Descriptor) -> Result<()> {
1712 if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
1715 return Err(Error::Unsupported(
1716 "multi-vector collections require a similarity metric (cosine or dot)",
1717 ));
1718 }
1719 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1723 if descriptor.multivector {
1724 return Err(Error::Unsupported(
1725 "client-side vector encryption is not supported for multi-vector collections",
1726 ));
1727 }
1728 return Ok(());
1729 }
1730 if descriptor.vector_encryption == VectorEncryption::Dcpe
1733 && descriptor.metric != DistanceMetric::L2
1734 {
1735 return Err(Error::Unsupported(
1736 "dcpe-encrypted collections require the l2 metric",
1737 ));
1738 }
1739 if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
1742 return Err(Error::Unsupported(
1743 "the colbert index is only for multi-vector collections",
1744 ));
1745 }
1746 match descriptor.index.kind {
1747 IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
1748 if descriptor.metric == DistanceMetric::Dot =>
1749 {
1750 Err(Error::Unsupported(
1751 "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
1752 ))
1753 }
1754 _ => Ok(()),
1755 }
1756}
1757
1758fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
1760 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1761 return CollectionIndex::None;
1762 }
1763 match descriptor.index.kind {
1764 IndexKind::Vamana => CollectionIndex::Vamana(None),
1765 IndexKind::DiskVamana => CollectionIndex::Disk(None),
1766 IndexKind::Ivf => CollectionIndex::Ivf(None),
1767 IndexKind::Colbert => CollectionIndex::Colbert(None),
1768 _ => CollectionIndex::Hnsw(Hnsw::new(
1769 descriptor.dim as usize,
1770 to_index_metric(descriptor.metric),
1771 HnswConfig::default(),
1772 )),
1773 }
1774}
1775
/// Picks a default number of product-quantization subspaces for `dim`.
///
/// The result is the largest divisor of `dim` that does not exceed
/// `max(dim / 8, 1)`, and is always at least `1`.
fn default_pq_m(dim: usize) -> usize {
    let ceiling = std::cmp::max(dim / 8, 1);
    // Walk candidates from the ceiling downwards; the first divisor wins.
    (1..=ceiling)
        .rfind(|&m| dim.is_multiple_of(m))
        .unwrap_or(1)
}
1785
/// Fixed seed handed to product-quantizer training and to the ColBERT
/// index configuration, so both use the same constant on every build.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;

/// File name of the on-disk Vamana index inside a collection's index
/// directory.
const DISK_INDEX_FILE: &str = "vamana.qvx";
1792
1793fn build_index(
1794 store: &Store,
1795 cid: CollectionId,
1796 descriptor: &Descriptor,
1797 ids: &[u64],
1798 flat: &[f32],
1799) -> Result<CollectionIndex> {
1800 Ok(match build_in_memory_index(descriptor, ids, flat)? {
1801 Some(index) => index,
1802 None => {
1803 let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
1804 CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
1805 store, cid, &graph, &pq,
1806 )?)?))
1807 }
1808 })
1809}
1810
1811fn build_in_memory_index(
1816 descriptor: &Descriptor,
1817 ids: &[u64],
1818 flat: &[f32],
1819) -> Result<Option<CollectionIndex>> {
1820 if descriptor.vector_encryption == VectorEncryption::ClientSide {
1823 return Ok(Some(CollectionIndex::None));
1824 }
1825 let dim = descriptor.dim as usize;
1826 let metric = to_index_metric(descriptor.metric);
1827 Ok(Some(match descriptor.index.kind {
1828 IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
1829 ids,
1830 flat,
1831 dim,
1832 metric,
1833 VamanaConfig::default(),
1834 )?)?)),
1835 IndexKind::DiskVamana => return Ok(None),
1837 IndexKind::Ivf => {
1838 let cfg = IvfConfig {
1839 quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
1840 ..IvfConfig::default()
1841 };
1842 CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
1843 }
1844 IndexKind::Colbert => {
1845 let n = ids.len();
1848 let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
1849 let cfg = ColbertConfig {
1850 n_centroids,
1851 n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
1852 pq_subspaces: descriptor
1853 .index
1854 .pq_subspaces
1855 .map_or_else(|| default_pq_m(dim), |m| m as usize),
1856 seed: PQ_SEED,
1857 };
1858 CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
1859 }
1860 _ => {
1861 let mut h = Hnsw::new(dim, metric, HnswConfig::default());
1862 for (i, &id) in ids.iter().enumerate() {
1863 h.insert(id, &flat[i * dim..(i + 1) * dim])?;
1864 }
1865 CollectionIndex::Hnsw(h)
1866 }
1867 }))
1868}
1869
1870fn build_disk_graph_pq(
1874 descriptor: &Descriptor,
1875 ids: &[u64],
1876 flat: &[f32],
1877) -> Result<(Vamana, ProductQuantizer)> {
1878 let dim = descriptor.dim as usize;
1879 let metric = to_index_metric(descriptor.metric);
1880 let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
1881 let m = descriptor
1882 .index
1883 .pq_subspaces
1884 .map_or_else(|| default_pq_m(dim), |x| x as usize);
1885 let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
1886 Ok((graph, pq))
1887}
1888
1889fn write_disk_index(
1894 store: &Store,
1895 cid: CollectionId,
1896 graph: &Vamana,
1897 pq: &ProductQuantizer,
1898) -> Result<DiskVamana> {
1899 let dir = store.index_dir(cid);
1900 std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
1901 let path = dir.join(DISK_INDEX_FILE);
1902 let codec = store.collection_codec_clone(cid)?;
1906 quiver_index::disk::write(&path, graph, pq, codec.as_ref())?;
1907 Ok(DiskVamana::open(&path, codec)?)
1908}
1909
1910fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
1915 if !handle.descriptor.multivector
1918 && handle.descriptor.index.kind == IndexKind::Ivf
1919 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
1920 && restore_ivf_snapshot(store, handle, &blob).is_ok()
1921 {
1922 return Ok(());
1923 }
1924 rebuild_index(store, handle)
1925}
1926
1927fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
1932 let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
1933 if envelope.version != INDEX_ENVELOPE_VERSION {
1934 return Err(Error::Unsupported(
1935 "unsupported index snapshot envelope version",
1936 ));
1937 }
1938 let ivf = Ivf::restore(&envelope.ivf)?;
1939 handle.ext_to_int = envelope
1940 .int_to_ext
1941 .iter()
1942 .enumerate()
1943 .map(|(i, ext)| (ext.clone(), i as u64))
1944 .collect();
1945 handle.int_to_ext = envelope.int_to_ext;
1946 handle.index = CollectionIndex::Ivf(Some(ivf));
1947 handle.stale = false;
1948
1949 let tail = store.recovery_tail(handle.id)?;
1950 for ext in &tail.deleted {
1951 let Some(&internal) = handle.ext_to_int.get(ext) else {
1952 continue;
1953 };
1954 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
1955 ivf.remove(internal);
1956 }
1957 }
1958 for (ext, record) in tail.upserts {
1959 let internal = match handle.ext_to_int.get(&ext) {
1960 Some(&i) => i,
1961 None => {
1962 let i = handle.int_to_ext.len() as u64;
1963 handle.ext_to_int.insert(ext.clone(), i);
1964 handle.int_to_ext.push(ext);
1965 i
1966 }
1967 };
1968 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
1969 ivf.insert(internal, &record.vector)?;
1970 }
1971 }
1972 Ok(())
1973}
1974
1975fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
1984 bump_write_gen(handle);
1987 if handle.stale {
1988 return Ok(());
1989 }
1990 let known = handle.ext_to_int.contains_key(ext_id);
1991 let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
1992 let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
1993 let is_live_graph = matches!(
1994 handle.index,
1995 CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
1996 );
1997 let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
1998 if is_hnsw && !known {
1999 let internal = handle.int_to_ext.len() as u64;
2000 if let CollectionIndex::Hnsw(h) = &mut handle.index {
2001 h.insert(internal, vector)?;
2002 }
2003 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2004 handle.int_to_ext.push(ext_id.to_owned());
2005 } else if is_live_ivf {
2006 let internal = if known {
2009 handle.ext_to_int[ext_id]
2010 } else {
2011 let i = handle.int_to_ext.len() as u64;
2012 handle.ext_to_int.insert(ext_id.to_owned(), i);
2013 handle.int_to_ext.push(ext_id.to_owned());
2014 i
2015 };
2016 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2017 ivf.insert(internal, vector)?;
2018 }
2019 } else if is_live_graph {
2020 let old = handle.ext_to_int.get(ext_id).copied();
2023 let internal = handle.int_to_ext.len() as u64;
2024 let mut pending = 0.0;
2025 match &mut handle.index {
2026 CollectionIndex::Vamana(Some(fresh)) => {
2027 if let Some(o) = old {
2028 fresh.mark_deleted(o);
2029 }
2030 fresh.insert(internal, vector)?;
2031 pending = fresh.pending_fraction();
2032 }
2033 CollectionIndex::Disk(Some(fresh)) => {
2034 if let Some(o) = old {
2035 fresh.mark_deleted(o);
2036 }
2037 fresh.insert(internal, vector)?;
2038 pending = fresh.pending_fraction();
2039 }
2040 _ => {}
2041 }
2042 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2043 handle.int_to_ext.push(ext_id.to_owned());
2044 if pending >= GRAPH_REBUILD_PENDING_FRACTION {
2045 mark_stale(handle);
2046 }
2047 } else if is_live_colbert {
2048 let old = handle.ext_to_int.get(ext_id).copied();
2052 let internal = handle.int_to_ext.len() as u64;
2053 let mut crowded = false;
2054 if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2055 if let Some(o) = old {
2056 c.mark_deleted(o);
2057 }
2058 c.insert(internal, vector)?;
2059 crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2060 }
2061 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2062 handle.int_to_ext.push(ext_id.to_owned());
2063 if crowded {
2064 mark_stale(handle);
2065 }
2066 } else {
2067 mark_stale(handle);
2068 }
2069 Ok(())
2070}
2071
2072fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2079 bump_write_gen(handle);
2081 if handle.stale {
2082 return;
2083 }
2084 let internal = handle.ext_to_int.get(ext_id).copied();
2085 let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2086 let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2087 let live_graph = matches!(
2088 handle.index,
2089 CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2090 );
2091 let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2092 match internal {
2093 Some(internal) if live_ivf => {
2094 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2095 ivf.remove(internal);
2096 }
2097 }
2098 Some(internal) if live_hnsw => {
2099 let mut crowded = false;
2100 if let CollectionIndex::Hnsw(h) = &mut handle.index {
2101 h.mark_deleted(internal as u32);
2102 crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2103 }
2104 if crowded {
2105 mark_stale(handle);
2106 }
2107 }
2108 Some(internal) if live_graph => {
2109 let mut crowded = false;
2110 match &mut handle.index {
2111 CollectionIndex::Vamana(Some(fresh)) => {
2112 fresh.mark_deleted(internal);
2113 crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2114 }
2115 CollectionIndex::Disk(Some(fresh)) => {
2116 fresh.mark_deleted(internal);
2117 crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2118 }
2119 _ => {}
2120 }
2121 if crowded {
2122 mark_stale(handle);
2123 }
2124 }
2125 Some(internal) if live_colbert => {
2126 let mut crowded = false;
2127 if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2128 c.mark_deleted(internal);
2129 crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2130 }
2131 if crowded {
2132 mark_stale(handle);
2133 }
2134 }
2135 _ => mark_stale(handle),
2136 }
2137}
2138
2139struct RebuildScan {
2144 int_to_ext: Vec<String>,
2145 ext_to_int: HashMap<String, u64>,
2146 flat: Vec<f32>,
2147 docs: Option<BTreeMap<String, u32>>,
2148 sparse: Option<SparseInvertedIndex>,
2149}
2150
2151fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2156 let multivector = handle.descriptor.multivector;
2157 let mut int_to_ext = Vec::new();
2158 let mut ext_to_int = HashMap::new();
2159 let mut flat: Vec<f32> = Vec::new();
2160 let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2161 let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2164 for (ext_id, record) in store.scan(handle.id)? {
2165 let internal = int_to_ext.len() as u64;
2166 flat.extend_from_slice(&record.vector);
2167 if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2168 *docs.entry(doc.to_owned()).or_insert(0) += 1;
2169 }
2170 if let Some(idx) = sparse.as_mut()
2171 && let Some(sv) = sparse_vector_from_payload(&record.payload)
2172 {
2173 idx.upsert(&ext_id, &sv);
2174 }
2175 ext_to_int.insert(ext_id.clone(), internal);
2176 int_to_ext.push(ext_id);
2177 }
2178 Ok(RebuildScan {
2179 int_to_ext,
2180 ext_to_int,
2181 flat,
2182 docs: multivector.then_some(docs),
2183 sparse,
2184 })
2185}
2186
2187fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2191 let scan = scan_collection(store, handle)?;
2192 let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2193 handle.index = empty_index(&handle.descriptor);
2196 handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2197 handle.int_to_ext = scan.int_to_ext;
2198 handle.ext_to_int = scan.ext_to_int;
2199 handle.docs = scan.docs;
2200 handle.sparse = scan.sparse;
2201 handle.stale = false;
2202 Ok(())
2203}
2204
2205pub struct RebuildInputs {
2211 collection: String,
2212 descriptor: Descriptor,
2213 scan: RebuildScan,
2214 write_gen: u64,
2215}
2216
2217enum RebuiltKind {
2222 Ready(Box<CollectionIndex>),
2223 Disk {
2224 graph: Box<Vamana>,
2225 pq: Box<ProductQuantizer>,
2226 },
2227}
2228
2229pub struct RebuiltIndex {
2232 collection: String,
2233 kind: RebuiltKind,
2234 int_to_ext: Vec<String>,
2235 ext_to_int: HashMap<String, u64>,
2236 docs: Option<BTreeMap<String, u32>>,
2237 sparse: Option<SparseInvertedIndex>,
2238 write_gen: u64,
2239}
2240
2241impl RebuildInputs {
2242 pub fn build(self) -> Result<RebuiltIndex> {
2247 let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2248 let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2249 Some(index) => RebuiltKind::Ready(Box::new(index)),
2250 None => {
2251 let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2252 RebuiltKind::Disk {
2253 graph: Box::new(graph),
2254 pq: Box::new(pq),
2255 }
2256 }
2257 };
2258 Ok(RebuiltIndex {
2259 collection: self.collection,
2260 kind,
2261 int_to_ext: self.scan.int_to_ext,
2262 ext_to_int: self.scan.ext_to_int,
2263 docs: self.scan.docs,
2264 sparse: self.scan.sparse,
2265 write_gen: self.write_gen,
2266 })
2267 }
2268}
2269
2270fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2274 if payload.is_empty() {
2275 return None;
2276 }
2277 let value = serde_json::from_slice::<Value>(payload).ok()?;
2278 sparse_vector_from_value(&value)
2279}
2280
2281fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2287 if let Some(raw) = payload.get(SPARSE_KEY) {
2288 return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2289 }
2290 let text = payload.get(TEXT_KEY)?.as_str()?;
2291 Some(text_to_sparse(text))
2292}
2293
2294fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2300 if handle.stale {
2301 return;
2302 }
2303 let Some(idx) = handle.sparse.as_mut() else {
2304 return;
2305 };
2306 match sparse_vector_from_value(payload) {
2307 Some(sv) => idx.upsert(ext_id, &sv),
2308 None => {
2309 idx.remove(ext_id);
2310 }
2311 }
2312}
2313
2314fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2317 if let Some(idx) = handle.sparse.as_mut() {
2318 idx.remove(ext_id);
2319 }
2320}
2321
2322fn candidate_ids(
2334 store: &Store,
2335 cid: CollectionId,
2336 filter: &Filter,
2337 filterable: &[FilterableField],
2338) -> Result<Option<BTreeSet<String>>> {
2339 match filter {
2340 Filter::And(subs) => {
2341 let mut acc: Option<BTreeSet<String>> = None;
2344 for sub in subs {
2345 if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2346 acc = Some(match acc {
2347 Some(existing) => existing.intersection(&set).cloned().collect(),
2348 None => set,
2349 });
2350 }
2351 }
2352 Ok(acc)
2353 }
2354 Filter::Or(subs) => {
2355 let mut acc = BTreeSet::new();
2358 for sub in subs {
2359 match candidate_ids(store, cid, sub, filterable)? {
2360 Some(set) => acc.extend(set),
2361 None => return Ok(None),
2362 }
2363 }
2364 Ok(Some(acc))
2365 }
2366 Filter::Not(_) => Ok(None),
2368 leaf => match leaf_predicate(leaf, filterable) {
2370 Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2371 None => Ok(None),
2372 },
2373 }
2374}
2375
2376fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2380 let field_type = |field: &str| {
2381 filterable
2382 .iter()
2383 .find(|f| f.path == field)
2384 .map(|f| f.field_type)
2385 };
2386 match filter {
2387 Filter::Eq { field, value } => Some(SecPredicate::Eq {
2388 field: field.clone(),
2389 value: sec_value(field_type(field)?, value)?,
2390 }),
2391 Filter::In { field, values } => {
2392 let ft = field_type(field)?;
2393 let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2396 Some(SecPredicate::In {
2397 field: field.clone(),
2398 values: values?,
2399 })
2400 }
2401 Filter::Lt { field, value } => {
2402 one_sided_range(field, field_type(field)?, value, false, false)
2403 }
2404 Filter::Lte { field, value } => {
2405 one_sided_range(field, field_type(field)?, value, false, true)
2406 }
2407 Filter::Gt { field, value } => {
2408 one_sided_range(field, field_type(field)?, value, true, false)
2409 }
2410 Filter::Gte { field, value } => {
2411 one_sided_range(field, field_type(field)?, value, true, true)
2412 }
2413 _ => None,
2414 }
2415}
2416
2417fn one_sided_range(
2421 field: &str,
2422 field_type: FieldType,
2423 value: &Value,
2424 is_lower: bool,
2425 inclusive: bool,
2426) -> Option<SecPredicate> {
2427 let v = sec_value(field_type, value)?;
2428 let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
2429 (Some(v), None, inclusive, false)
2430 } else {
2431 (None, Some(v), false, inclusive)
2432 };
2433 Some(SecPredicate::Range {
2434 field: field.to_owned(),
2435 lo,
2436 hi,
2437 lo_inclusive,
2438 hi_inclusive,
2439 })
2440}
2441
2442fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
2446 match (field_type, value) {
2447 (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
2448 (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
2449 _ => None,
2450 }
2451}
2452
2453#[cfg(test)]
2454mod tests {
2455 use super::*;
2456 use serde_json::json;
2457
2458 fn desc() -> Descriptor {
2459 Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
2460 }
2461
2462 fn open(dir: &Path) -> Database {
2463 Database::open(dir).unwrap()
2464 }
2465
/// Hybrid search combines the dense and sparse sides, and each side also
/// works alone; supplying no query at all is an error.
#[test]
fn hybrid_search_fuses_dense_and_sparse() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    // "a" equals the dense query; its sparse vector shares no term with the
    // sparse query.
    db.upsert(
        "kb",
        "a",
        &[1.0, 0.0, 0.0, 0.0],
        &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
    )
    .unwrap();
    // "b" carries both sparse query terms.
    db.upsert(
        "kb",
        "b",
        &[0.0, 1.0, 0.0, 0.0],
        &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "c",
        &[0.0, 0.0, 0.0, 1.0],
        &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
    )
    .unwrap();

    let dense_q = [1.0, 0.0, 0.0, 0.0];
    let sparse_q = SparseVector {
        indices: vec![1, 2],
        values: vec![1.0, 1.0],
    };
    let params = SearchParams {
        k: 3,
        ..SearchParams::default()
    };

    // Both sides supplied.
    let hits = db
        .hybrid_search(
            "kb",
            Some(&dense_q),
            Some(&sparse_q),
            None,
            &params,
            DEFAULT_RRF_K0,
        )
        .unwrap();
    let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
    assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
    assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");

    // Sparse side only.
    let sparse_only = db
        .hybrid_search("kb", None, Some(&sparse_q), None, &params, DEFAULT_RRF_K0)
        .unwrap();
    assert_eq!(sparse_only[0].id, "b");

    // Dense side only.
    let dense_only = db
        .hybrid_search("kb", Some(&dense_q), None, None, &params, DEFAULT_RRF_K0)
        .unwrap();
    assert_eq!(dense_only[0].id, "a");

    // No query at all is rejected.
    assert!(
        db.hybrid_search("kb", None, None, None, &params, DEFAULT_RRF_K0)
            .is_err()
    );
}
2538
2539 fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
2541 let params = SearchParams {
2542 k: 10,
2543 ..SearchParams::default()
2544 };
2545 db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
2546 .unwrap()
2547 .into_iter()
2548 .map(|m| m.id)
2549 .collect()
2550 }
2551
/// Sparse search returns the same ids with the inverted index in place as
/// it does after the index is removed from the handle.
#[test]
fn sparse_index_equals_the_store_scan_fallback() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();

    let zero = [0.0f32; 4];
    let points: [(&str, Vec<u32>, Vec<f32>); 4] = [
        ("a", vec![1, 2], vec![5.0, 1.0]),
        ("b", vec![2, 3], vec![3.0, 4.0]),
        ("c", vec![1, 3], vec![2.0, 2.0]),
        // "d" shares no term with the query below.
        ("d", vec![9], vec![1.0]),
    ];
    for (id, dims, vals) in points {
        let payload = json!({ "__quiver_sparse__": { "indices": dims, "values": vals } });
        db.upsert("kb", id, &zero, &payload).unwrap();
    }
    let query = SparseVector {
        indices: vec![1, 2, 3],
        values: vec![1.0, 1.0, 1.0],
    };

    // With the sparse index present.
    assert!(db.collections.get("kb").unwrap().sparse.is_some());
    let via_index = sparse_ids(&mut db, &query);
    assert!(via_index.iter().all(|id| id != "d"), "d shares no term");

    // With the sparse index dropped from the handle.
    db.collections.get_mut("kb").unwrap().sparse = None;
    let via_scan = sparse_ids(&mut db, &query);
    assert_eq!(via_index, via_scan);
}
2588
/// After an overwrite and a delete, sparse search gives the same answer
/// before and after the collection is forced stale.
#[test]
fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();

    let zero = [0.0f32; 4];
    // Applied in order; the last write replaces "a"'s sparse terms with one
    // the query does not contain.
    let writes = [
        (
            "a",
            json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
        ),
        (
            "b",
            json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
        ),
        (
            "c",
            json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
        ),
        (
            "a",
            json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
        ),
    ];
    for (id, payload) in writes {
        db.upsert("kb", id, &zero, &payload).unwrap();
    }
    assert!(db.delete("kb", "b").unwrap());

    let query = SparseVector {
        indices: vec![1, 2],
        values: vec![1.0, 1.0],
    };
    // Only "c" still carries a query term.
    let incremental = sparse_ids(&mut db, &query);
    assert_eq!(incremental, vec!["c".to_owned()]);

    // Flag the collection stale and query again.
    db.collections.get_mut("kb").unwrap().stale = true;
    let rebuilt = sparse_ids(&mut db, &query);
    assert_eq!(incremental, rebuilt);
}
2639
/// A database reopened from disk has a sparse index again and answers
/// sparse queries from the persisted payloads.
#[test]
fn sparse_index_is_rebuilt_on_reopen() {
    let tmp = tempfile::tempdir().unwrap();
    {
        // First session: write one point, then drop the database.
        let mut first = open(tmp.path());
        first.create_collection("kb", desc()).unwrap();
        let payload = json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } });
        first
            .upsert("kb", "a", &[0.0, 0.0, 0.0, 0.0], &payload)
            .unwrap();
    }

    let mut reopened = open(tmp.path());
    assert!(reopened.collections.get("kb").unwrap().sparse.is_some());
    let query = SparseVector {
        indices: vec![1],
        values: vec![1.0],
    };
    let found = sparse_ids(&mut reopened, &query);
    assert_eq!(found, vec!["a".to_owned()]);
}
2662
/// A payload filter is applied to sparse-only hybrid search: of two points
/// sharing the query term, only the one matching the filter is returned.
#[test]
fn hybrid_sparse_honours_the_payload_filter() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    db.upsert(
        "kb",
        "a",
        &z,
        &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
    )
    .unwrap();
    // "b" has the larger sparse weight but the wrong language.
    db.upsert(
        "kb",
        "b",
        &z,
        &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
    )
    .unwrap();
    let q = SparseVector {
        indices: vec![1],
        values: vec![1.0],
    };
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".to_owned(),
            value: json!("en"),
        }),
        ..SearchParams::default()
    };
    let hits: Vec<String> = db
        .hybrid_search("kb", None, Some(&q), None, &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits, vec!["a".to_owned()]);
}
2704
/// Text stored under `__quiver_text__` is searchable through the text
/// argument of hybrid search, alone or together with a dense query.
#[test]
fn hybrid_text_search_indexes_and_ranks_by_bm25() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("kb", desc()).unwrap();
    let z = [0.0f32, 0.0, 0.0, 0.0];
    db.upsert(
        "kb",
        "cats",
        &z,
        &json!({ "__quiver_text__": "the quick brown cat jumps" }),
    )
    .unwrap();
    db.upsert(
        "kb",
        "dogs",
        &z,
        &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
    )
    .unwrap();

    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    // The query is "cats" while the document says "cat"; the assertion
    // expects them to match.
    let hits: Vec<String> = db
        .hybrid_search("kb", None, None, Some("cats"), &params, DEFAULT_RRF_K0)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");

    // A term found in no document returns nothing.
    assert!(
        db.hybrid_search("kb", None, None, Some("elephant"), &params, DEFAULT_RRF_K0)
            .unwrap()
            .is_empty()
    );

    // Dense and text together: the dense match and the text match both
    // appear.
    let dense_q = [1.0, 0.0, 0.0, 0.0];
    db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    let fused: Vec<String> = db
        .hybrid_search(
            "kb",
            Some(&dense_q),
            None,
            Some("dog"),
            &params,
            DEFAULT_RRF_K0,
        )
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert!(
        fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
        "dense match + lexical match both surface; got {fused:?}"
    );
}
2770
/// Creates a collection, writes three points, and checks nearest-neighbour
/// order plus a point lookup.
#[test]
fn create_upsert_search_get_end_to_end() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("items", desc()).unwrap();

    let points: [(&str, [f32; 4], Value); 3] = [
        ("a", [0.0, 0.0, 0.0, 0.0], json!({"color": "red"})),
        ("b", [1.0, 0.0, 0.0, 0.0], json!({"color": "blue"})),
        ("c", [5.0, 5.0, 5.0, 5.0], json!({"color": "red"})),
    ];
    for (id, vector, payload) in &points {
        db.upsert("items", id, vector, payload).unwrap();
    }

    // The query sits closest to "a", then "b".
    let near = db
        .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    let order: Vec<&str> = near.iter().map(|m| m.id.as_str()).collect();
    assert_eq!(&order[..2], &["a", "b"]);

    let fetched = db.get("items", "c").unwrap().unwrap();
    assert_eq!(fetched.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
    assert_eq!(fetched.payload, Some(json!({"color": "red"})));
}
2808
/// Writing the same 20 points one at a time or through `upsert_batch`
/// gives identical search results once both databases are reopened.
#[test]
fn upsert_batch_produces_same_search_results_as_sequential() {
    let tmp_seq = tempfile::tempdir().unwrap();
    let tmp_bat = tempfile::tempdir().unwrap();

    let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
    let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
    let payload = json!({});

    {
        // Sequential writes.
        let mut db = open(tmp_seq.path());
        db.create_collection("c", desc()).unwrap();
        for (id, vec) in ids.iter().zip(vectors.iter()) {
            db.upsert("c", id, vec, &payload).unwrap();
        }
    }
    {
        // One batched write.
        let mut db = open(tmp_bat.path());
        db.create_collection("c", desc()).unwrap();
        let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
            .iter()
            .zip(vectors.iter())
            .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
            .collect();
        let n = db.upsert_batch("c", &pts).unwrap();
        assert_eq!(n, 20);
    }

    let query = [10.0f32, 0.0, 0.0, 0.0];
    let params = SearchParams {
        k: 5,
        ..Default::default()
    };

    // Both databases are reopened before searching.
    let mut seq_db = open(tmp_seq.path());
    let mut bat_db = open(tmp_bat.path());
    let seq: Vec<String> = seq_db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    let bat: Vec<String> = bat_db
        .search("c", &query, &params)
        .unwrap()
        .into_iter()
        .map(|m| m.id)
        .collect();
    assert_eq!(
        seq, bat,
        "batch and sequential produce different search results"
    );
}
2864
2865 #[test]
2866 fn upsert_bulk_defers_the_index_then_searches_correctly() {
2867 let tmp = tempfile::tempdir().unwrap();
2868 let mut db = open(tmp.path());
2869 db.create_collection("c", desc()).unwrap();
2870 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
2871 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
2872 let plain = json!({});
2875 let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
2876 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
2877 .iter()
2878 .zip(vectors.iter())
2879 .map(|(id, v)| {
2880 let payload = if id == "p3" { &sparse_payload } else { &plain };
2881 (id.as_str(), v.as_slice(), payload)
2882 })
2883 .collect();
2884 let n = db.upsert_bulk("c", &pts).unwrap();
2885 assert_eq!(n, 20);
2886
2887 assert!(db.collections.get("c").unwrap().stale);
2889
2890 let query = [10.0f32, 0.0, 0.0, 0.0];
2892 let params = SearchParams {
2893 k: 5,
2894 ..Default::default()
2895 };
2896 let hits: Vec<String> = db
2897 .search("c", &query, ¶ms)
2898 .unwrap()
2899 .into_iter()
2900 .map(|m| m.id)
2901 .collect();
2902 assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
2903 assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
2904
2905 let q = SparseVector {
2907 indices: vec![7],
2908 values: vec![1.0],
2909 };
2910 let sparse_hits: Vec<String> = db
2911 .hybrid_search("c", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
2912 .unwrap()
2913 .into_iter()
2914 .map(|m| m.id)
2915 .collect();
2916 assert_eq!(sparse_hits, vec!["p3".to_owned()]);
2917 }
2918
2919 #[test]
2920 fn filtered_search_only_returns_matching_payloads() {
2921 let tmp = tempfile::tempdir().unwrap();
2922 let mut db = open(tmp.path());
2923 db.create_collection("items", desc()).unwrap();
2924 for i in 0..20u32 {
2925 let color = if i % 2 == 0 { "red" } else { "blue" };
2926 db.upsert(
2927 "items",
2928 &format!("p{i}"),
2929 &[i as f32, 0.0, 0.0, 0.0],
2930 &json!({"color": color, "n": i}),
2931 )
2932 .unwrap();
2933 }
2934 let params = SearchParams {
2935 k: 5,
2936 filter: Some(Filter::Eq {
2937 field: "color".into(),
2938 value: json!("red"),
2939 }),
2940 ef_search: 64,
2941 with_payload: true,
2942 with_vector: false,
2943 };
2944 let results = db.search("items", &[0.0; 4], ¶ms).unwrap();
2945 assert!(!results.is_empty());
2946 for m in &results {
2947 assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
2948 }
2949 }
2950
2951 #[test]
2952 fn persists_and_rebuilds_index_on_reopen() {
2953 let tmp = tempfile::tempdir().unwrap();
2954 {
2955 let mut db = open(tmp.path());
2956 db.create_collection("items", desc()).unwrap();
2957 for i in 0..50u32 {
2958 db.upsert(
2959 "items",
2960 &format!("p{i}"),
2961 &[i as f32, 1.0, 2.0, 3.0],
2962 &json!({}),
2963 )
2964 .unwrap();
2965 }
2966 db.checkpoint().unwrap();
2967 }
2968 let mut db = open(tmp.path());
2969 assert_eq!(db.len("items").unwrap(), 50);
2970 let res = db
2971 .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
2972 .unwrap();
2973 assert_eq!(res[0].id, "p7");
2974 }
2975
2976 #[test]
2977 fn update_reflects_new_vector_after_rebuild() {
2978 let tmp = tempfile::tempdir().unwrap();
2979 let mut db = open(tmp.path());
2980 db.create_collection("items", desc()).unwrap();
2981 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
2982 db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
2983 .unwrap();
2984 db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
2986 .unwrap();
2987 let res = db
2988 .search("items", &[0.0; 4], &SearchParams::default())
2989 .unwrap();
2990 assert_eq!(res[0].id, "b");
2991 assert_eq!(
2992 db.get("items", "a").unwrap().unwrap().vector,
2993 Some(vec![100.0, 0.0, 0.0, 0.0])
2994 );
2995 }
2996
2997 #[test]
2998 fn delete_removes_from_search() {
2999 let tmp = tempfile::tempdir().unwrap();
3000 let mut db = open(tmp.path());
3001 db.create_collection("items", desc()).unwrap();
3002 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3003 db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3004 .unwrap();
3005 assert!(db.delete("items", "a").unwrap());
3006 let res = db
3007 .search("items", &[0.0; 4], &SearchParams::default())
3008 .unwrap();
3009 assert!(res.iter().all(|m| m.id != "a"));
3010 assert!(db.get("items", "a").unwrap().is_none());
3011 }
3012
3013 #[test]
3014 fn unknown_collection_errors() {
3015 let tmp = tempfile::tempdir().unwrap();
3016 let mut db = open(tmp.path());
3017 assert!(matches!(
3018 db.search("nope", &[0.0; 4], &SearchParams::default()),
3019 Err(Error::CollectionNotFound(_))
3020 ));
3021 db.create_collection("c", desc()).unwrap();
3022 assert!(matches!(
3023 db.create_collection("c", desc()),
3024 Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3025 ));
3026 }
3027
3028 fn desc_with(kind: IndexKind) -> Descriptor {
3029 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3030 kind,
3031 pq_subspaces: None,
3032 })
3033 }
3034
3035 #[test]
3036 fn vamana_and_ivf_collections_find_the_nearest_point() {
3037 for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3038 let tmp = tempfile::tempdir().unwrap();
3039 let mut db = open(tmp.path());
3040 db.create_collection("c", desc_with(kind)).unwrap();
3041 for i in 0..40u32 {
3042 db.upsert(
3043 "c",
3044 &format!("p{i}"),
3045 &[i as f32, 0.0, 0.0, 0.0],
3046 &json!({}),
3047 )
3048 .unwrap();
3049 }
3050 let res = db
3052 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3053 .unwrap();
3054 assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3055 }
3056 }
3057
3058 #[test]
3059 fn index_kind_persists_and_rebuilds_on_reopen() {
3060 let tmp = tempfile::tempdir().unwrap();
3061 {
3062 let mut db = open(tmp.path());
3063 db.create_collection("v", desc_with(IndexKind::Vamana))
3064 .unwrap();
3065 for i in 0..20u32 {
3066 db.upsert(
3067 "v",
3068 &format!("p{i}"),
3069 &[i as f32, 1.0, 2.0, 3.0],
3070 &json!({}),
3071 )
3072 .unwrap();
3073 }
3074 db.checkpoint().unwrap();
3075 }
3076 let mut db = open(tmp.path());
3077 assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3078 let res = db
3079 .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3080 .unwrap();
3081 assert_eq!(res[0].id, "p7");
3082 }
3083
3084 #[test]
3085 fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3086 let tmp = tempfile::tempdir().unwrap();
3087 let mut db = open(tmp.path());
3088 db.create_collection("c", desc_with(IndexKind::Ivf))
3089 .unwrap();
3090 for i in 0..50u32 {
3091 db.upsert(
3092 "c",
3093 &format!("p{i}"),
3094 &[i as f32, 0.0, 0.0, 0.0],
3095 &json!({}),
3096 )
3097 .unwrap();
3098 }
3099 let _ = db
3101 .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3102 .unwrap();
3103 assert!(!db.collections["c"].stale, "the search built the index");
3104
3105 db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3107 .unwrap();
3108 assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3109 let res = db
3110 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3111 .unwrap();
3112 assert_eq!(res[0].id, "far");
3113
3114 assert!(db.delete("c", "far").unwrap());
3116 assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3117 let res = db
3118 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3119 .unwrap();
3120 assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3121 }
3122
3123 #[test]
3124 fn ivf_incremental_update_replaces_the_vector() {
3125 let tmp = tempfile::tempdir().unwrap();
3126 let mut db = open(tmp.path());
3127 db.create_collection("c", desc_with(IndexKind::Ivf))
3128 .unwrap();
3129 for i in 0..30u32 {
3130 db.upsert(
3131 "c",
3132 &format!("p{i}"),
3133 &[i as f32, 0.0, 0.0, 0.0],
3134 &json!({}),
3135 )
3136 .unwrap();
3137 }
3138 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3139 db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3141 .unwrap();
3142 assert!(!db.collections["c"].stale);
3143 let at_new = db
3144 .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3145 .unwrap();
3146 assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3147 let at_old = db
3148 .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3149 .unwrap();
3150 assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
3151 }
3152
3153 #[test]
3154 fn ivf_reinsert_after_incremental_delete_is_found() {
3155 let tmp = tempfile::tempdir().unwrap();
3156 let mut db = open(tmp.path());
3157 db.create_collection("c", desc_with(IndexKind::Ivf))
3158 .unwrap();
3159 for i in 0..20u32 {
3160 db.upsert(
3161 "c",
3162 &format!("p{i}"),
3163 &[i as f32, 0.0, 0.0, 0.0],
3164 &json!({}),
3165 )
3166 .unwrap();
3167 }
3168 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3169 assert!(db.delete("c", "p3").unwrap());
3170 assert!(!db.collections["c"].stale);
3171 db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3173 .unwrap();
3174 assert!(!db.collections["c"].stale);
3175 let res = db
3176 .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
3177 .unwrap();
3178 assert_eq!(res[0].id, "p3");
3179 }
3180
3181 #[test]
3182 fn hnsw_in_place_update_falls_back_to_rebuild() {
3183 let tmp = tempfile::tempdir().unwrap();
3185 let mut db = open(tmp.path());
3186 db.create_collection("c", desc()).unwrap();
3187 for i in 0..10u32 {
3188 db.upsert(
3189 "c",
3190 &format!("p{i}"),
3191 &[i as f32, 0.0, 0.0, 0.0],
3192 &json!({}),
3193 )
3194 .unwrap();
3195 }
3196 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3197 assert!(!db.collections["c"].stale);
3198 db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
3199 .unwrap();
3200 assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
3201 let res = db
3203 .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
3204 .unwrap();
3205 assert_eq!(res[0].id, "p2");
3206 }
3207
3208 #[test]
3209 fn unsupported_index_configurations_are_rejected() {
3210 let tmp = tempfile::tempdir().unwrap();
3211 let mut db = open(tmp.path());
3212 let dot_vamana =
3214 Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3215 kind: IndexKind::Vamana,
3216 pq_subspaces: None,
3217 });
3218 assert!(matches!(
3219 db.create_collection("a", dot_vamana),
3220 Err(Error::Unsupported(_))
3221 ));
3222 let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3224 kind: IndexKind::DiskVamana,
3225 pq_subspaces: None,
3226 });
3227 assert!(matches!(
3228 db.create_collection("b", dot_disk),
3229 Err(Error::Unsupported(_))
3230 ));
3231 }
3232
3233 #[test]
3234 fn dcpe_collections_require_the_l2_metric() {
3235 let tmp = tempfile::tempdir().unwrap();
3236 let mut db = open(tmp.path());
3237 for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
3239 let bad = Descriptor::new(4, Dtype::F32, metric)
3240 .with_vector_encryption(VectorEncryption::Dcpe);
3241 assert!(matches!(
3242 db.create_collection("bad", bad),
3243 Err(Error::Unsupported(_))
3244 ));
3245 }
3246 let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3248 .with_vector_encryption(VectorEncryption::Dcpe);
3249 db.create_collection("enc", good)
3250 .expect("l2 dcpe collection");
3251 assert_eq!(
3252 db.descriptor("enc").expect("descriptor").vector_encryption,
3253 VectorEncryption::Dcpe
3254 );
3255 }
3256
3257 #[test]
3258 fn client_side_collections_are_fetch_only_and_reject_search() {
3259 let tmp = tempfile::tempdir().unwrap();
3260 let mut db = open(tmp.path());
3261 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3264 .with_vector_encryption(VectorEncryption::ClientSide);
3265 db.create_collection("vault", desc)
3266 .expect("create client-side collection");
3267 assert!(matches!(
3269 db.collections["vault"].index,
3270 CollectionIndex::None
3271 ));
3272
3273 for i in 0..5 {
3276 let tier = if i < 2 { "vip" } else { "std" };
3277 db.upsert(
3278 "vault",
3279 &format!("p{i}"),
3280 &[0.0; 4],
3281 &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
3282 )
3283 .expect("upsert");
3284 }
3285 assert_eq!(db.len("vault").unwrap(), 5);
3286 assert!(matches!(
3288 db.collections["vault"].index,
3289 CollectionIndex::None
3290 ));
3291
3292 assert!(matches!(
3294 db.search("vault", &[0.0; 4], &SearchParams::default()),
3295 Err(Error::Unsupported(_))
3296 ));
3297
3298 let all = db.fetch("vault", None, 100, true, false).unwrap();
3301 assert_eq!(all.len(), 5);
3302 assert!(
3303 all.iter()
3304 .all(|m| m.payload.is_some() && m.vector.is_none())
3305 );
3306
3307 let vip = db
3309 .fetch(
3310 "vault",
3311 Some(&Filter::Eq {
3312 field: "tier".to_owned(),
3313 value: serde_json::json!("vip"),
3314 }),
3315 100,
3316 false,
3317 false,
3318 )
3319 .unwrap();
3320 assert_eq!(vip.len(), 2);
3321 assert_eq!(db.fetch("vault", None, 2, false, false).unwrap().len(), 2);
3323
3324 assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
3327 assert!(db.delete("vault", "p0").unwrap());
3328 assert_eq!(db.len("vault").unwrap(), 4);
3329 }
3330
3331 #[test]
3332 fn client_side_encryption_rejects_multivector() {
3333 let tmp = tempfile::tempdir().unwrap();
3334 let mut db = open(tmp.path());
3335 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3336 .with_multivector(true)
3337 .with_vector_encryption(VectorEncryption::ClientSide);
3338 assert!(matches!(
3339 db.create_collection("bad", desc),
3340 Err(Error::Unsupported(_))
3341 ));
3342 }
3343
3344 fn contains_file(dir: &Path, name: &str) -> bool {
3346 std::fs::read_dir(dir).is_ok_and(|rd| {
3347 rd.flatten().any(|e| {
3348 let p = e.path();
3349 if p.is_dir() {
3350 contains_file(&p, name)
3351 } else {
3352 p.file_name().is_some_and(|f| f == name)
3353 }
3354 })
3355 })
3356 }
3357
3358 #[test]
3359 fn disk_index_collection_searches_persists_and_writes_an_artifact() {
3360 let tmp = tempfile::tempdir().unwrap();
3361 {
3362 let mut db = open(tmp.path());
3363 db.create_collection("d", desc_with(IndexKind::DiskVamana))
3364 .unwrap();
3365 for i in 0..40u32 {
3366 db.upsert(
3367 "d",
3368 &format!("p{i}"),
3369 &[i as f32, 0.0, 0.0, 0.0],
3370 &json!({}),
3371 )
3372 .unwrap();
3373 }
3374 let res = db
3375 .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3376 .unwrap();
3377 assert_eq!(res[0].id, "p7");
3378 db.checkpoint().unwrap();
3379 }
3380 assert!(
3382 contains_file(tmp.path(), "vamana.qvx"),
3383 "disk index file missing"
3384 );
3385 let mut db = open(tmp.path());
3387 assert_eq!(
3388 db.descriptor("d").unwrap().index.kind,
3389 IndexKind::DiskVamana
3390 );
3391 let res = db
3392 .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3393 .unwrap();
3394 assert_eq!(res[0].id, "p7");
3395 }
3396
3397 #[test]
3398 fn graph_collections_maintain_writes_incrementally() {
3399 for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
3403 let tmp = tempfile::tempdir().unwrap();
3404 let mut db = open(tmp.path());
3405 db.create_collection("c", desc_with(kind)).unwrap();
3406 for i in 0..40u32 {
3407 db.upsert(
3408 "c",
3409 &format!("p{i}"),
3410 &[i as f32, 0.0, 0.0, 0.0],
3411 &json!({}),
3412 )
3413 .unwrap();
3414 }
3415 let res = db
3417 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3418 .unwrap();
3419 assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
3420
3421 db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
3424 .unwrap();
3425 let res = db
3426 .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
3427 .unwrap();
3428 assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
3429
3430 assert!(db.delete("c", "p7").unwrap());
3432 let res = db
3433 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3434 .unwrap();
3435 assert!(
3436 res.iter().all(|m| m.id != "p7"),
3437 "{kind:?} deleted id returned"
3438 );
3439
3440 db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3443 .unwrap();
3444 let res = db
3445 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3446 .unwrap();
3447 assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
3448 let res = db
3449 .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
3450 .unwrap();
3451 assert_ne!(
3452 res[0].id, "p20",
3453 "{kind:?} stale copy still nearest old spot"
3454 );
3455 }
3456 }
3457
3458 #[test]
3459 fn graph_consolidates_under_heavy_churn() {
3460 let tmp = tempfile::tempdir().unwrap();
3464 let mut db = open(tmp.path());
3465 db.create_collection("c", desc_with(IndexKind::Vamana))
3466 .unwrap();
3467 for i in 0..50u32 {
3468 db.upsert(
3469 "c",
3470 &format!("p{i}"),
3471 &[i as f32, 0.0, 0.0, 0.0],
3472 &json!({}),
3473 )
3474 .unwrap();
3475 }
3476 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3477
3478 let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
3479 for i in 0..15u32 {
3480 assert!(db.delete("c", &format!("p{i}")).unwrap());
3481 db.upsert(
3482 "c",
3483 &format!("q{i}"),
3484 &[1000.0 + i as f32, 0.0, 0.0, 0.0],
3485 &json!({}),
3486 )
3487 .unwrap();
3488 }
3489
3490 let near_origin = db
3491 .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3492 .unwrap();
3493 assert!(
3494 near_origin.iter().all(|m| !deleted.contains(&m.id)),
3495 "a churned-out id was returned"
3496 );
3497 let near_q = db
3498 .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3499 .unwrap();
3500 assert_eq!(near_q[0].id, "q7", "new point not found after churn");
3501
3502 db.checkpoint().unwrap();
3503 drop(db);
3504 let mut db = open(tmp.path());
3505 let near_q = db
3506 .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
3507 .unwrap();
3508 assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
3509 let near_origin = db
3510 .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3511 .unwrap();
3512 assert!(
3513 near_origin.iter().all(|m| !deleted.contains(&m.id)),
3514 "a churned-out id resurfaced after reopen"
3515 );
3516 }
3517
3518 #[test]
3519 fn multivector_writes_are_incremental_and_match_a_rebuild() {
3520 let dir = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
3529 let doc = |theta: f32| vec![dir(theta), dir(theta)];
3530 for kind in [
3531 IndexKind::Ivf,
3532 IndexKind::Hnsw,
3533 IndexKind::Vamana,
3534 IndexKind::Colbert,
3535 ] {
3536 let tmp = tempfile::tempdir().unwrap();
3537 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3538 .with_multivector(true)
3539 .with_index(IndexSpec {
3540 kind,
3541 pq_subspaces: None,
3542 });
3543 let mut db = open(tmp.path());
3544 db.create_collection("m", desc).unwrap();
3545 for i in 1..=10u32 {
3547 db.upsert_document(
3548 "m",
3549 &format!("d{i}"),
3550 &doc(0.1 * i as f32),
3551 &json!({ "i": i }),
3552 )
3553 .unwrap();
3554 }
3555 let q = vec![dir(0.0)];
3556 let top = |db: &mut Database| {
3557 db.search_multi_vector(
3558 "m",
3559 &q,
3560 &SearchParams {
3561 k: 3,
3562 ..Default::default()
3563 },
3564 )
3565 .unwrap()
3566 .into_iter()
3567 .map(|m| m.id)
3568 .collect::<Vec<_>>()
3569 };
3570 assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");
3571
3572 assert!(db.delete_document("m", "d1").unwrap());
3574 assert_eq!(
3575 top(&mut db),
3576 vec!["d2", "d3", "d4"],
3577 "{kind:?} after delete"
3578 );
3579
3580 db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
3582 .unwrap();
3583 assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");
3584
3585 db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
3587 .unwrap();
3588 let r = top(&mut db);
3589 assert_eq!(r[0], "d10", "{kind:?}");
3590 assert_eq!(r[1], "d11", "{kind:?} new doc not ranked");
3591
3592 db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
3594 .unwrap();
3595 let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
3596 assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");
3597
3598 let before = top(&mut db);
3600 drop(db);
3601 let mut db = open(tmp.path());
3602 assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
3603 assert!(
3604 db.get_document("m", "d1", false).unwrap().is_none(),
3605 "{kind:?} deleted doc resurfaced"
3606 );
3607 }
3608 }
3609
3610 #[test]
3611 fn colbert_index_requires_multivector() {
3612 let tmp = tempfile::tempdir().unwrap();
3613 let mut db = open(tmp.path());
3614 let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(IndexSpec {
3617 kind: IndexKind::Colbert,
3618 pq_subspaces: None,
3619 });
3620 assert!(matches!(
3621 db.create_collection("c", single),
3622 Err(Error::Unsupported(_))
3623 ));
3624 let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3626 .with_multivector(true)
3627 .with_index(IndexSpec {
3628 kind: IndexKind::Colbert,
3629 pq_subspaces: None,
3630 });
3631 assert!(db.create_collection("m", multi).is_ok());
3632 }
3633
3634 fn desc_filterable() -> Descriptor {
3639 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
3640 FilterableField::keyword("city"),
3641 FilterableField::numeric("n"),
3642 ])
3643 }
3644
3645 fn seed_cities(db: &mut Database) {
3650 const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
3651 db.create_collection("c", desc_filterable()).unwrap();
3652 for i in 0..30u32 {
3653 db.upsert(
3654 "c",
3655 &format!("p{i}"),
3656 &[i as f32, 0.0, 0.0, 0.0],
3657 &json!({"city": CITIES[i as usize % 3], "n": i}),
3658 )
3659 .unwrap();
3660 }
3661 db.checkpoint().unwrap();
3662 }
3663
3664 #[test]
3665 fn hybrid_equality_prefilter_is_exact() {
3666 let tmp = tempfile::tempdir().unwrap();
3667 let mut db = open(tmp.path());
3668 seed_cities(&mut db);
3669 let params = SearchParams {
3670 k: 5,
3671 filter: Some(Filter::Eq {
3672 field: "city".into(),
3673 value: json!("lyon"),
3674 }),
3675 ..SearchParams::default()
3676 };
3677 let res = db.search("c", &[0.0; 4], ¶ms).unwrap();
3678 assert!(!res.is_empty());
3679 assert_eq!(res[0].id, "p1");
3681 for m in &res {
3682 assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
3683 }
3684 }
3685
3686 #[test]
3687 fn hybrid_numeric_range_prefilter_is_exact() {
3688 let tmp = tempfile::tempdir().unwrap();
3689 let mut db = open(tmp.path());
3690 seed_cities(&mut db);
3691 let params = SearchParams {
3692 k: 4,
3693 filter: Some(Filter::Gte {
3694 field: "n".into(),
3695 value: json!(10),
3696 }),
3697 ..SearchParams::default()
3698 };
3699 let res = db.search("c", &[0.0; 4], ¶ms).unwrap();
3700 assert_eq!(res[0].id, "p10");
3702 for m in &res {
3703 assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
3704 }
3705 }
3706
3707 #[test]
3708 fn hybrid_unsatisfiable_filter_returns_empty() {
3709 let tmp = tempfile::tempdir().unwrap();
3710 let mut db = open(tmp.path());
3711 seed_cities(&mut db);
3712 let params = SearchParams {
3715 filter: Some(Filter::Eq {
3716 field: "city".into(),
3717 value: json!("atlantis"),
3718 }),
3719 ..SearchParams::default()
3720 };
3721 assert!(db.search("c", &[0.0; 4], ¶ms).unwrap().is_empty());
3722 }
3723
3724 #[test]
3725 fn hybrid_and_or_composition_is_exact() {
3726 let tmp = tempfile::tempdir().unwrap();
3727 let mut db = open(tmp.path());
3728 seed_cities(&mut db);
3729 let params = SearchParams {
3732 k: 10,
3733 filter: Some(Filter::And(vec![
3734 Filter::In {
3735 field: "city".into(),
3736 values: vec![json!("paris"), json!("rome")],
3737 },
3738 Filter::Lt {
3739 field: "n".into(),
3740 value: json!(12),
3741 },
3742 ])),
3743 ..SearchParams::default()
3744 };
3745 let res = db.search("c", &[0.0; 4], ¶ms).unwrap();
3746 assert_eq!(res[0].id, "p0");
3748 for m in &res {
3749 let payload = m.payload.as_ref().unwrap();
3750 let city = payload["city"].as_str().unwrap();
3751 assert!(city == "paris" || city == "rome");
3752 assert!(payload["n"].as_u64().unwrap() < 12);
3753 }
3754 }
3755
3756 #[test]
3757 fn hybrid_rechecks_non_indexable_clause() {
3758 let tmp = tempfile::tempdir().unwrap();
3759 let mut db = open(tmp.path());
3760 seed_cities(&mut db);
3761 let params = SearchParams {
3764 k: 10,
3765 filter: Some(Filter::And(vec![
3766 Filter::Eq {
3767 field: "city".into(),
3768 value: json!("paris"),
3769 },
3770 Filter::Not(Box::new(Filter::Eq {
3771 field: "n".into(),
3772 value: json!(0),
3773 })),
3774 ])),
3775 ..SearchParams::default()
3776 };
3777 let res = db.search("c", &[0.0; 4], ¶ms).unwrap();
3778 assert!(res.iter().all(|m| m.id != "p0"));
3779 assert_eq!(res[0].id, "p3");
3781 for m in &res {
3782 assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
3783 }
3784 }
3785
3786 #[test]
3787 fn post_filter_fallback_on_undeclared_field_is_correct() {
3788 let tmp = tempfile::tempdir().unwrap();
3789 let mut db = open(tmp.path());
3790 db.create_collection(
3793 "c",
3794 Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3795 .with_filterable(vec![FilterableField::keyword("city")]),
3796 )
3797 .unwrap();
3798 for i in 0..20u32 {
3799 let tier = if i % 2 == 0 { "gold" } else { "silver" };
3800 db.upsert(
3801 "c",
3802 &format!("p{i}"),
3803 &[i as f32, 0.0, 0.0, 0.0],
3804 &json!({"city": "paris", "tier": tier}),
3805 )
3806 .unwrap();
3807 }
3808 let params = SearchParams {
3809 k: 5,
3810 filter: Some(Filter::Eq {
3811 field: "tier".into(),
3812 value: json!("gold"),
3813 }),
3814 ..SearchParams::default()
3815 };
3816 let res = db.search("c", &[0.0; 4], ¶ms).unwrap();
3817 assert!(!res.is_empty());
3818 for m in &res {
3819 assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
3820 }
3821 }
3822
3823 #[test]
3824 fn leaf_predicate_maps_only_indexable_filterable_leaves() {
3825 let fields = vec![
3826 FilterableField::keyword("city"),
3827 FilterableField::numeric("n"),
3828 ];
3829 assert_eq!(
3831 leaf_predicate(
3832 &Filter::Eq {
3833 field: "city".into(),
3834 value: json!("paris")
3835 },
3836 &fields
3837 ),
3838 Some(SecPredicate::Eq {
3839 field: "city".into(),
3840 value: SecValue::Keyword("paris".into())
3841 })
3842 );
3843 assert_eq!(
3845 leaf_predicate(
3846 &Filter::Gte {
3847 field: "n".into(),
3848 value: json!(3)
3849 },
3850 &fields
3851 ),
3852 Some(SecPredicate::Range {
3853 field: "n".into(),
3854 lo: Some(SecValue::Numeric(3.0)),
3855 hi: None,
3856 lo_inclusive: true,
3857 hi_inclusive: false,
3858 })
3859 );
3860 let undeclared = Filter::Eq {
3862 field: "tier".into(),
3863 value: json!("gold"),
3864 };
3865 let mismatch = Filter::Eq {
3866 field: "city".into(),
3867 value: json!(5),
3868 };
3869 let ne = Filter::Ne {
3870 field: "city".into(),
3871 value: json!("x"),
3872 };
3873 let exists = Filter::Exists {
3874 field: "city".into(),
3875 };
3876 assert!(leaf_predicate(&undeclared, &fields).is_none());
3877 assert!(leaf_predicate(&mismatch, &fields).is_none());
3878 assert!(leaf_predicate(&ne, &fields).is_none());
3879 assert!(leaf_predicate(&exists, &fields).is_none());
3880 }
3881
3882 fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
3886 root.join("collections").join("0000000000").join("index")
3887 }
3888
3889 fn idx_snapshot_files(root: &Path) -> Vec<String> {
3890 let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
3891 .map(|rd| {
3892 rd.filter_map(std::result::Result::ok)
3893 .filter_map(|e| e.file_name().to_str().map(str::to_owned))
3894 .filter(|n| n.starts_with("idx-"))
3895 .collect()
3896 })
3897 .unwrap_or_default();
3898 v.sort();
3899 v
3900 }
3901
3902 fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
3903 db.search("c", q, &SearchParams::default())
3904 .unwrap()
3905 .into_iter()
3906 .map(|m| m.id)
3907 .collect()
3908 }
3909
3910 fn seed_ivf(db: &mut Database, n: u32) {
3911 db.create_collection("c", desc_with(IndexKind::Ivf))
3912 .unwrap();
3913 for i in 0..n {
3914 db.upsert(
3915 "c",
3916 &format!("p{i}"),
3917 &[i as f32, 0.0, 0.0, 0.0],
3918 &json!({}),
3919 )
3920 .unwrap();
3921 }
3922 let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
3924 }
3925
3926 #[test]
3927 fn ivf_snapshot_is_written_at_checkpoint() {
3928 let tmp = tempfile::tempdir().unwrap();
3929 let mut db = open(tmp.path());
3930 seed_ivf(&mut db, 40);
3931 db.checkpoint().unwrap();
3932 assert_eq!(idx_snapshot_files(tmp.path()).len(), 1);
3933 }
3934
3935 #[test]
3936 fn ivf_loads_from_snapshot_rather_than_rebuilding() {
3937 let tmp = tempfile::tempdir().unwrap();
3938 {
3939 let mut db = open(tmp.path());
3940 db.create_collection("c", desc_with(IndexKind::Ivf))
3941 .unwrap();
3942 db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
3943 .unwrap();
3944 db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3945 .unwrap();
3946 let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
3948 db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
3950 .unwrap();
3951 db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3952 .unwrap();
3953 db.checkpoint().unwrap();
3954 assert_eq!(db.collections["c"].int_to_ext, ["a", "m", "z", "b"]);
3955 }
3956 let db = open(tmp.path());
3957 assert_eq!(
3960 db.collections["c"].int_to_ext,
3961 ["a", "m", "z", "b"],
3962 "index was rebuilt, not loaded from the snapshot"
3963 );
3964 }
3965
3966 #[test]
3967 fn ivf_recovery_replays_post_checkpoint_upserts() {
3968 let tmp = tempfile::tempdir().unwrap();
3969 {
3970 let mut db = open(tmp.path());
3971 seed_ivf(&mut db, 30);
3972 db.checkpoint().unwrap();
3973 db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3975 .unwrap();
3976 }
3977 let mut db = open(tmp.path());
3978 assert_eq!(nearest(&mut db, &[500.0, 0.0, 0.0, 0.0])[0], "far");
3979 assert_eq!(nearest(&mut db, &[1.0, 0.0, 0.0, 0.0])[0], "p1");
3980 }
3981
3982 #[test]
3983 fn ivf_recovery_replays_post_checkpoint_deletes() {
3984 let tmp = tempfile::tempdir().unwrap();
3985 {
3986 let mut db = open(tmp.path());
3987 seed_ivf(&mut db, 30);
3988 db.checkpoint().unwrap();
3989 assert!(db.delete("c", "p7").unwrap());
3990 }
3991 let mut db = open(tmp.path());
3992 assert!(
3993 nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])
3994 .iter()
3995 .all(|id| id != "p7")
3996 );
3997 assert!(db.get("c", "p7").unwrap().is_none());
3998 assert!(db.get("c", "p6").unwrap().is_some());
3999 }
4000
4001 #[test]
4002 fn ivf_recovery_replays_post_checkpoint_updates() {
4003 let tmp = tempfile::tempdir().unwrap();
4004 {
4005 let mut db = open(tmp.path());
4006 seed_ivf(&mut db, 30);
4007 db.checkpoint().unwrap();
4008 db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
4010 .unwrap();
4011 }
4012 let mut db = open(tmp.path());
4013 assert_eq!(nearest(&mut db, &[999.0, 0.0, 0.0, 0.0])[0], "p0");
4014 assert_ne!(
4015 nearest(&mut db, &[0.0, 0.0, 0.0, 0.0])[0],
4016 "p0",
4017 "the stale p0 vector survived the update"
4018 );
4019 }
4020
4021 #[test]
4022 fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
4023 let tmp = tempfile::tempdir().unwrap();
4024 {
4025 let mut db = open(tmp.path());
4026 seed_ivf(&mut db, 30);
4027 db.checkpoint().unwrap();
4028 }
4029 let files = idx_snapshot_files(tmp.path());
4031 assert_eq!(files.len(), 1);
4032 std::fs::write(ivf_index_dir(tmp.path()).join(&files[0]), b"corrupt").unwrap();
4033
4034 let mut db = open(tmp.path());
4035 assert_eq!(nearest(&mut db, &[7.0, 0.0, 0.0, 0.0])[0], "p7");
4036 }
4037
4038 fn mv_desc() -> Descriptor {
4041 Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4042 }
4043
4044 fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4047 let mut v: Vec<(String, f32)> = corpus
4048 .iter()
4049 .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4050 .collect();
4051 v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4052 v
4053 }
4054
4055 #[test]
4056 fn multivector_search_ranks_documents_by_maxsim() {
4057 let tmp = tempfile::tempdir().unwrap();
4058 let mut db = open(tmp.path());
4059 db.create_collection("docs", mv_desc()).unwrap();
4060 let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4061 ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4062 ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
4063 (
4064 "d_mix",
4065 vec![
4066 vec![1.0, 1.0, 0.0],
4067 vec![0.0, 0.0, 1.0],
4068 vec![1.0, 0.0, 1.0],
4069 ],
4070 ),
4071 ];
4072 for (id, toks) in &corpus {
4073 db.upsert_document("docs", id, toks, &json!({ "id": id }))
4074 .unwrap();
4075 }
4076 assert_eq!(db.document_count("docs").unwrap(), 3);
4077
4078 let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4079 let params = SearchParams {
4080 k: 3,
4081 with_payload: false,
4082 ..SearchParams::default()
4083 };
4084 let got = db.search_multi_vector("docs", &query, ¶ms).unwrap();
4085 let expected = bf_rank(&query, &corpus);
4086
4087 assert_eq!(got.len(), 3);
4088 for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
4089 assert_eq!(&g.id, eid, "ranking matches brute force");
4090 assert!(
4091 (g.score - escore).abs() < 1e-5,
4092 "{} score {} vs {escore}",
4093 g.id,
4094 g.score
4095 );
4096 }
4097 }
4098
4099 #[test]
4100 fn multivector_search_truncates_to_k() {
4101 let tmp = tempfile::tempdir().unwrap();
4102 let mut db = open(tmp.path());
4103 db.create_collection("docs", mv_desc()).unwrap();
4104 for i in 0..5 {
4105 let v = vec![vec![1.0, i as f32, 0.0]];
4106 db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
4107 .unwrap();
4108 }
4109 let params = SearchParams {
4110 k: 2,
4111 ..SearchParams::default()
4112 };
4113 let got = db
4114 .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], ¶ms)
4115 .unwrap();
4116 assert_eq!(got.len(), 2);
4117 }
4118
4119 #[test]
4120 fn multivector_filter_selects_documents_exactly() {
4121 let tmp = tempfile::tempdir().unwrap();
4122 let mut db = open(tmp.path());
4123 db.create_collection("docs", mv_desc()).unwrap();
4124 db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
4126 .unwrap();
4127 db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
4128 .unwrap();
4129 let params = SearchParams {
4130 k: 10,
4131 filter: Some(Filter::Eq {
4132 field: "lang".into(),
4133 value: json!("fr"),
4134 }),
4135 ..SearchParams::default()
4136 };
4137 let got = db
4138 .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], ¶ms)
4139 .unwrap();
4140 assert_eq!(got.len(), 1);
4141 assert_eq!(got[0].id, "b");
4142 assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
4143 }
4144
4145 #[test]
4146 fn multivector_reopen_rebuilds_grouping_and_ranking() {
4147 let tmp = tempfile::tempdir().unwrap();
4148 let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
4149 let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
4150 ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
4151 ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
4152 ];
4153 {
4154 let mut db = open(tmp.path());
4155 db.create_collection("docs", mv_desc()).unwrap();
4156 for (id, toks) in &corpus {
4157 db.upsert_document("docs", id, toks, &json!({})).unwrap();
4158 }
4159 db.checkpoint().unwrap();
4160 }
4161 let mut db = open(tmp.path());
4163 assert_eq!(db.document_count("docs").unwrap(), 2);
4164 let params = SearchParams {
4165 k: 2,
4166 ..SearchParams::default()
4167 };
4168 let got = db.search_multi_vector("docs", &query, ¶ms).unwrap();
4169 let expected = bf_rank(&query, &corpus);
4170 assert_eq!(
4171 got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
4172 expected
4173 .iter()
4174 .map(|(id, _)| id.clone())
4175 .collect::<Vec<_>>()
4176 );
4177 }
4178
4179 #[test]
4180 fn multivector_delete_document_removes_all_tokens() {
4181 let tmp = tempfile::tempdir().unwrap();
4182 let mut db = open(tmp.path());
4183 db.create_collection("docs", mv_desc()).unwrap();
4184 db.upsert_document(
4185 "docs",
4186 "a",
4187 &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
4188 &json!({}),
4189 )
4190 .unwrap();
4191 db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
4192 .unwrap();
4193 assert_eq!(db.document_count("docs").unwrap(), 2);
4194 assert_eq!(db.len("docs").unwrap(), 3);
4195
4196 assert!(db.delete_document("docs", "a").unwrap());
4197 assert_eq!(db.document_count("docs").unwrap(), 1);
4198 assert_eq!(db.len("docs").unwrap(), 1);
4199 assert!(db.get_document("docs", "a", false).unwrap().is_none());
4200 let params = SearchParams {
4201 k: 10,
4202 ..SearchParams::default()
4203 };
4204 let got = db
4205 .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], ¶ms)
4206 .unwrap();
4207 assert!(got.iter().all(|m| m.id != "a"));
4208 assert!(!db.delete_document("docs", "a").unwrap());
4209 }
4210
4211 #[test]
4212 fn multivector_reupsert_replaces_tokens() {
4213 let tmp = tempfile::tempdir().unwrap();
4214 let mut db = open(tmp.path());
4215 db.create_collection("docs", mv_desc()).unwrap();
4216 db.upsert_document(
4217 "docs",
4218 "a",
4219 &[
4220 vec![1.0, 0.0, 0.0],
4221 vec![0.0, 1.0, 0.0],
4222 vec![0.0, 0.0, 1.0],
4223 ],
4224 &json!({"v":1}),
4225 )
4226 .unwrap();
4227 assert_eq!(db.len("docs").unwrap(), 3);
4228 db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
4230 .unwrap();
4231 assert_eq!(db.document_count("docs").unwrap(), 1);
4232 assert_eq!(db.len("docs").unwrap(), 1);
4233 let doc = db.get_document("docs", "a", true).unwrap().unwrap();
4234 assert_eq!(doc.payload, Some(json!({"v":2})));
4235 assert_eq!(doc.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
4236 }
4237
4238 #[test]
4239 fn single_and_multi_vector_apis_are_mutually_exclusive() {
4240 let tmp = tempfile::tempdir().unwrap();
4241 let mut db = open(tmp.path());
4242 db.create_collection("mv", mv_desc()).unwrap();
4243 db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
4244 .unwrap();
4245 assert!(matches!(
4247 db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({})),
4248 Err(Error::Unsupported(_))
4249 ));
4250 assert!(matches!(
4251 db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default()),
4252 Err(Error::Unsupported(_))
4253 ));
4254 assert!(matches!(
4256 db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({})),
4257 Err(Error::Unsupported(_))
4258 ));
4259 assert!(matches!(
4260 db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default()),
4261 Err(Error::Unsupported(_))
4262 ));
4263 assert!(matches!(
4264 db.document_count("sv"),
4265 Err(Error::Unsupported(_))
4266 ));
4267 }
4268
4269 #[test]
4270 fn multivector_rejects_l2_metric_and_bad_documents() {
4271 let tmp = tempfile::tempdir().unwrap();
4272 let mut db = open(tmp.path());
4273 let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
4274 assert!(matches!(
4275 db.create_collection("bad", l2),
4276 Err(Error::Unsupported(_))
4277 ));
4278
4279 db.create_collection("docs", mv_desc()).unwrap();
4280 assert!(matches!(
4282 db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({})),
4283 Err(Error::Unsupported(_))
4284 ));
4285 assert!(matches!(
4287 db.upsert_document("docs", "a", &[], &json!({})),
4288 Err(Error::Unsupported(_))
4289 ));
4290 assert!(matches!(
4291 db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({})),
4292 Err(Error::Unsupported(_))
4293 ));
4294 }
4295
4296 #[test]
4297 fn snapshot_then_open_reproduces_the_database() {
4298 let src = tempfile::tempdir().unwrap();
4299 let mut db = open(src.path());
4300 db.create_collection("kb", desc()).unwrap();
4301 db.create_collection("kb2", desc()).unwrap();
4302 db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4303 .unwrap();
4304 db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
4305 .unwrap();
4306 db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
4307 .unwrap();
4308
4309 let dest = tempfile::tempdir().unwrap();
4310 let snap_dir = dest.path().join("snap");
4311 let info = db.snapshot(&snap_dir).unwrap();
4312 assert!(info.files > 0 && info.bytes > 0);
4313 assert_eq!(info.manifest_version, db.manifest_version());
4314
4315 db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
4317 .unwrap();
4318
4319 let restored = open(&snap_dir);
4320 let mut names = restored.collection_names();
4321 names.sort();
4322 assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
4323 assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
4324 assert_eq!(
4325 restored.get("kb", "a").unwrap().unwrap().payload,
4326 Some(json!({ "n": 1 }))
4327 );
4328 assert_eq!(restored.len("kb2").unwrap(), 1);
4329 assert!(restored.get("kb", "late").unwrap().is_none());
4330 }
4331
4332 #[test]
4333 fn snapshot_refuses_an_existing_destination() {
4334 let src = tempfile::tempdir().unwrap();
4335 let mut db = open(src.path());
4336 let dest = tempfile::tempdir().unwrap(); assert!(matches!(
4338 db.snapshot(dest.path()),
4339 Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4340 ));
4341 }
4342
4343 #[test]
4344 fn restore_snapshot_roundtrips_and_guards() {
4345 let src = tempfile::tempdir().unwrap();
4346 let mut db = open(src.path());
4347 db.create_collection("kb", desc()).unwrap();
4348 db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
4349 .unwrap();
4350 let work = tempfile::tempdir().unwrap();
4351 let snap_dir = work.path().join("snap");
4352 db.snapshot(&snap_dir).unwrap();
4353
4354 let restored_dir = work.path().join("restored");
4356 let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
4357 assert!(info.files > 0);
4358 let restored = open(&restored_dir);
4359 assert_eq!(restored.len("kb").unwrap(), 1);
4360
4361 assert!(matches!(
4363 restore_snapshot(&snap_dir, &restored_dir),
4364 Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
4365 ));
4366 let not_snap = work.path().join("not-a-snapshot");
4368 std::fs::create_dir_all(¬_snap).unwrap();
4369 assert!(matches!(
4370 restore_snapshot(¬_snap, &work.path().join("out")),
4371 Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
4372 ));
4373 }
4374}