1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52 ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53 Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54 ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64 Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65 VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69 BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70 query_term_ids, rrf_fuse, text_to_sparse,
71};
72
/// Errors returned by this module's database API.
///
/// Most variants wrap an error from an underlying crate through `#[from]`, so
/// the `?` operator converts them automatically.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// Failure reported by `quiver_core`, displayed transparently.
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// Failure reported by a `quiver_index` index, displayed transparently.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// Failure reported by the `quiver_index` disk layer, displayed transparently.
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be serialized to, or parsed from, JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// No collection is registered under the given name.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The request or descriptor asks for something that is not supported; the
    /// static message names the specific limitation.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// Failure reported by `quiver_index` snapshot handling, displayed transparently.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// A `postcard` (de)serialization failure; per the message this concerns the
    /// index snapshot envelope.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
104
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
107
/// Summary counters describing a snapshot.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type, so the field descriptions below are inferred from the names only —
/// confirm against the producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Presumably the manifest version the snapshot was taken at.
    pub manifest_version: u64,
    /// Presumably the number of files in the snapshot.
    pub files: u64,
    /// Presumably the total size of the snapshot in bytes.
    pub bytes: u64,
}
119
/// One result row returned by point lookups, listings and searches.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External (caller-supplied) id of the point.
    pub id: String,
    /// Score of the hit. Lookups and listings (`get`, `fetch`) always report
    /// `0.0`; searches fill in a distance-derived or fused score.
    pub score: f32,
    /// Decoded JSON payload, when the caller asked for it.
    pub payload: Option<Value>,
    /// Stored vector, when the caller asked for it.
    pub vector: Option<Vec<f32>>,
}
132
/// One result row for a multi-vector document.
///
/// Mirrors [`Match`], except that a document carries a list of vectors
/// instead of a single one.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// External id of the document.
    pub id: String,
    /// Score of the hit.
    pub score: f32,
    /// Decoded JSON payload, when present.
    pub payload: Option<Value>,
    /// The document's vectors, when present.
    pub vectors: Option<Vec<Vec<f32>>>,
}
147
/// Tuple of `(score, id, payload, vectors)`.
///
/// NOTE(review): the element meanings are read off the types and the shape of
/// [`DocumentMatch`]; the code that builds these tuples is outside this part of
/// the file — confirm there.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
154#[derive(Debug, Clone)]
156pub struct SearchParams {
157 pub k: usize,
159 pub filter: Option<Filter>,
164 pub ef_search: usize,
166 pub with_payload: bool,
168 pub with_vector: bool,
170}
171
172impl Default for SearchParams {
173 fn default() -> Self {
174 Self {
175 k: 10,
176 filter: None,
177 ef_search: 64,
178 with_payload: true,
179 with_vector: false,
180 }
181 }
182}
183
/// When a filter is present, the index is asked for `k * FILTER_OVERFETCH`
/// candidates (or `ef_search`, if larger) so that enough survive filtering.
const FILTER_OVERFETCH: usize = 8;

/// Each ranked list fed into reciprocal-rank fusion is `k * RRF_CANDIDATE_FACTOR`
/// entries deep, subject to the `MIN_RRF_CANDIDATES` floor.
const RRF_CANDIDATE_FACTOR: usize = 10;
/// Lower bound on the depth of each ranked list fed into reciprocal-rank fusion.
const MIN_RRF_CANDIDATES: usize = 100;

/// A filtered search whose candidate set has at most this many ids is answered
/// by exact scoring of those candidates instead of going through the index.
const FULL_SCAN_THRESHOLD: usize = 10_000;

// NOTE(review): not referenced in this part of the file; by its name this is the
// fraction of deleted HNSW entries that triggers a rebuild — confirm at the use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

// NOTE(review): not referenced in this part of the file; by its name this is the
// fraction of pending graph changes that triggers a rebuild — confirm at the use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
216enum CollectionIndex {
223 None,
227 Hnsw(Hnsw),
228 Vamana(Option<FreshVamana>),
229 Ivf(Option<Ivf>),
230 Disk(Option<FreshDiskVamana>),
234 Colbert(Option<ColbertIndex>),
238}
239
240impl CollectionIndex {
241 fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243 Ok(match self {
244 CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245 CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246 CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247 CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248 CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249 CollectionIndex::None
250 | CollectionIndex::Vamana(None)
251 | CollectionIndex::Ivf(None)
252 | CollectionIndex::Disk(None)
253 | CollectionIndex::Colbert(None) => Vec::new(),
254 })
255 }
256}
257
/// Shared, atomically swappable handle to a collection's current
/// [`CollectionSnapshot`]. Writers publish a new snapshot with `store`; readers
/// obtain the current one with `load`/`load_full`.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// Writes applied on top of a snapshot's base index.
#[derive(Default, Clone)]
struct Overlay {
    /// Vectors upserted since the base was built, each paired with its external
    /// id. Entry `j` has internal id `base_len + j`.
    upserts: Vec<(Arc<[f32]>, String)>,
    /// Internal ids (base or overlay) that must no longer be returned.
    tombstones: HashSet<u64>,
}
289
/// A point-in-time view of one collection: a shared base index plus an overlay
/// of later upserts and deletions.
pub struct CollectionSnapshot {
    /// Index the snapshot was built from.
    base: Arc<CollectionIndex>,
    /// External id for each internal id below `base_len`.
    base_int_to_ext: Arc<Vec<String>>,
    /// Number of internal ids owned by the base; overlay ids start here.
    base_len: u64,
    /// Upserts and tombstones recorded since the base was published.
    overlay: Arc<Overlay>,
    /// Distance metric used to score overlay vectors and report scores.
    metric: Metric,
}
303
304impl CollectionSnapshot {
305 fn empty(metric: Metric) -> Self {
308 Self {
309 base: Arc::new(CollectionIndex::None),
310 base_int_to_ext: Arc::new(Vec::new()),
311 base_len: 0,
312 overlay: Arc::new(Overlay::default()),
313 metric,
314 }
315 }
316
317 fn ext_id(&self, internal: u64) -> Option<&str> {
320 if internal < self.base_len {
321 self.base_int_to_ext
322 .get(internal as usize)
323 .map(String::as_str)
324 } else {
325 self.overlay
326 .upserts
327 .get((internal - self.base_len) as usize)
328 .map(|(_, e)| e.as_str())
329 }
330 }
331
332 pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342 if k == 0 {
343 return Ok(Vec::new());
344 }
345 let mut cands: Vec<(f32, u64)> = Vec::new();
348 for n in self.base.search(query, k, ef_search)? {
349 if !self.overlay.tombstones.contains(&n.id) {
350 cands.push((report_metric(self.metric, n.distance), n.id));
354 }
355 }
356 for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357 let internal = self.base_len + j as u64;
358 if !self.overlay.tombstones.contains(&internal) {
359 cands.push((ordering_distance(self.metric, query, vector), internal));
360 }
361 }
362 cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363 cands.truncate(k);
364 let mut out = Vec::with_capacity(cands.len());
365 for (ordering, internal) in cands {
366 if let Some(ext) = self.ext_id(internal) {
367 out.push(Match {
368 id: ext.to_owned(),
369 score: report_metric(self.metric, ordering),
370 payload: None,
371 vector: None,
372 });
373 }
374 }
375 Ok(out)
376 }
377}
378
379fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383 let metric = to_index_metric(descriptor.metric);
384 Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394 !descriptor.multivector
395 && descriptor.vector_encryption != VectorEncryption::ClientSide
396 && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399fn mvcc_served(handle: &CollectionHandle) -> bool {
402 handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408 handle.snapshot.store(Arc::new(CollectionSnapshot {
409 base: prior.base.clone(),
410 base_int_to_ext: prior.base_int_to_ext.clone(),
411 base_len: prior.base_len,
412 overlay,
413 metric: prior.metric,
414 }));
415}
416
417fn publish_base(handle: &mut CollectionHandle) {
421 let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422 let metric = to_index_metric(handle.descriptor.metric);
423 handle.snapshot.store(Arc::new(CollectionSnapshot {
424 base: Arc::new(base),
425 base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426 base_len: handle.int_to_ext.len() as u64,
427 overlay: Arc::new(Overlay::default()),
428 metric,
429 }));
430}
431
/// Number of overlay upserts at which a rebuild is requested: one fifth of the
/// base size, but never fewer than 1024.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    const FLOOR: u64 = 1024;
    let fifth = base_len / 5;
    if fifth > FLOOR { fifth } else { FLOOR }
}
438
439fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444 bump_write_gen(handle);
445 let cur = handle.snapshot.load_full();
446 let mut overlay = cur.overlay.as_ref().clone();
447 if let Some(&old) = handle.ext_to_int.get(ext_id) {
449 overlay.tombstones.insert(old);
450 }
451 let internal = cur.base_len + overlay.upserts.len() as u64;
452 overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453 handle.ext_to_int.insert(ext_id.to_owned(), internal);
454 handle.int_to_ext.push(ext_id.to_owned());
455 let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456 publish_overlay(handle, &cur, Arc::new(overlay));
457 if crowded {
460 handle.stale = true;
461 }
462}
463
464fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467 bump_write_gen(handle);
468 let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469 return;
470 };
471 let cur = handle.snapshot.load_full();
472 let mut overlay = cur.overlay.as_ref().clone();
473 overlay.tombstones.insert(internal);
474 publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
/// Serializable wrapper around a persisted index.
///
/// NOTE(review): the code that reads and writes this envelope is outside this
/// part of the file; field notes below are inferred from names and types.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    /// Envelope format version.
    version: u16,
    /// Internal-to-external id table stored alongside the index.
    int_to_ext: Vec<String>,
    /// Presumably the serialized IVF index bytes — confirm.
    ivf: Vec<u8>,
}
488
// Envelope format version. Presumably written into (and checked against) the
// `version` field of the envelopes — confirm at the use site.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
/// Serializable side-car for a disk-backed index.
///
/// NOTE(review): the code that reads and writes this envelope is outside this
/// part of the file; field notes below are inferred from names and types.
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    /// Envelope format version.
    version: u16,
    /// Internal-to-external id table stored alongside the index.
    int_to_ext: Vec<String>,
    /// Presumably the number of rows in the on-disk base graph — confirm.
    base_row_count: u64,
    /// Presumably internal ids deleted since the base was written — confirm.
    deleted_ids: Vec<u64>,
}
508
/// In-memory state kept for one open collection.
struct CollectionHandle {
    /// Store-assigned id of the collection.
    id: CollectionId,
    /// The descriptor the collection was created with.
    descriptor: Descriptor,
    /// The working index. After a base snapshot is published this is replaced
    /// by an empty index, because the built one moves into the snapshot.
    index: CollectionIndex,
    /// External id for each internal id.
    int_to_ext: Vec<String>,
    /// Current internal id for each external id.
    ext_to_int: HashMap<String, u64>,
    /// Set when the index needs rebuilding before it reflects the store.
    stale: bool,
    /// Counter bumped on writes; a finished rebuild compares it with the value
    /// captured at the start to detect writes that raced with it.
    write_gen: u64,
    /// Present for multi-vector collections.
    /// NOTE(review): the meaning of the `u32` is not visible here — confirm.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, used for sparse and BM25 ranking when present.
    sparse: Option<SparseInvertedIndex>,
    /// Whether snapshot-served reads are switched on for this collection.
    mvcc: bool,
    /// Cell holding the currently published snapshot.
    snapshot: SnapshotCell,
}
547
548fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551 !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554fn mark_stale(handle: &mut CollectionHandle) {
558 handle.stale = true;
559 bump_write_gen(handle);
560}
561
562fn bump_write_gen(handle: &mut CollectionHandle) {
571 handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
/// A store plus the in-memory index state for each of its collections.
pub struct Database {
    /// The underlying storage engine.
    store: Store,
    /// Per-collection state, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
    /// MVCC-reads setting applied to newly created collections and updated by
    /// `set_mvcc_reads`.
    mvcc: bool,
}
583
/// Reads the `QUIVER_MVCC_READS` environment variable.
///
/// Returns `true` when the variable is set to `1`, `true`, `on` or `yes`.
/// Matching ignores ASCII case and surrounding whitespace, so `TRUE`, `On`
/// and `Yes` all work. Anything else, including an unset or non-Unicode
/// value, yields `false`.
fn mvcc_from_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var("QUIVER_MVCC_READS").is_ok_and(|raw| {
        let value = raw.trim();
        TRUTHY.iter().any(|accepted| value.eq_ignore_ascii_case(accepted))
    })
}
590
591impl Database {
592 pub fn open(dir: &Path) -> Result<Self> {
595 Self::from_store(Store::open(dir)?)
596 }
597
598 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603 Self::from_store(Store::open_with_codec(dir, codec)?)
604 }
605
606 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611 Self::from_store(Store::open_with_keyring(dir, keyring)?)
612 }
613
614 fn from_store(store: Store) -> Result<Self> {
616 let mvcc = mvcc_from_env();
617 let mut collections = HashMap::new();
618 for name in store.collection_names() {
619 let Some(id) = store.collection_id(&name) else {
620 continue;
621 };
622 let Some(descriptor) = store.descriptor(id).cloned() else {
623 continue;
624 };
625 let snapshot = empty_snapshot(&descriptor);
626 let mut handle = CollectionHandle {
627 id,
628 index: empty_index(&descriptor),
629 descriptor,
630 int_to_ext: Vec::new(),
631 ext_to_int: HashMap::new(),
632 stale: true,
633 write_gen: 0,
634 docs: None,
635 sparse: None,
637 mvcc,
638 snapshot,
639 };
640 load_index(&store, &mut handle)?;
641 if mvcc_served(&handle) {
646 handle.stale = true;
647 }
648 collections.insert(name, handle);
649 }
650 Ok(Self {
651 store,
652 collections,
653 mvcc,
654 })
655 }
656
657 pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660 validate_index(&descriptor)?;
661 let id = self.store.create_collection(name, descriptor.clone())?;
662 let index = empty_index(&descriptor);
663 let docs = descriptor.multivector.then(BTreeMap::new);
664 let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668 let snapshot = empty_snapshot(&descriptor);
669 let mvcc = self.mvcc;
670 self.collections.insert(
671 name.to_owned(),
672 CollectionHandle {
673 id,
674 descriptor,
675 index,
676 int_to_ext: Vec::new(),
677 ext_to_int: HashMap::new(),
678 stale: false,
679 write_gen: 0,
680 docs,
681 sparse,
682 mvcc,
683 snapshot,
684 },
685 );
686 Ok(())
687 }
688
689 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
691 let existed = self.store.drop_collection(name)?;
692 self.collections.remove(name);
693 Ok(existed)
694 }
695
696 pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
702 let existed = self.store.shred_collection(name)?;
703 self.collections.remove(name);
704 Ok(existed)
705 }
706
    /// Installs a commit observer on the underlying store.
    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
        self.store.set_commit_observer(observer);
    }
713
714 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
720 Ok(self.store.replication_snapshot()?)
721 }
722
723 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732 let target = match &op {
733 WalOp::CreateCollection { collection_id, .. }
734 | WalOp::DropCollection { collection_id }
735 | WalOp::Upsert { collection_id, .. }
736 | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737 WalOp::Checkpoint { .. } => None,
738 };
739 let create_name = match &op {
740 WalOp::CreateCollection { name, .. } => Some(name.clone()),
741 _ => None,
742 };
743 let is_drop = matches!(op, WalOp::DropCollection { .. });
744 self.store.apply_replicated(op)?;
745
746 if let Some(name) = create_name {
747 if let Some(id) = target
749 && let Some(descriptor) = self.store.descriptor(id).cloned()
750 {
751 let docs = descriptor.multivector.then(BTreeMap::new);
752 let index = empty_index(&descriptor);
753 let snapshot = empty_snapshot(&descriptor);
754 let mvcc = self.mvcc;
755 self.collections.insert(
758 name,
759 CollectionHandle {
760 id,
761 descriptor,
762 index,
763 int_to_ext: Vec::new(),
764 ext_to_int: HashMap::new(),
765 stale: false,
766 write_gen: 0,
767 docs,
768 sparse: None,
769 mvcc,
770 snapshot,
771 },
772 );
773 }
774 } else if is_drop {
775 if let Some(id) = target {
776 self.collections.retain(|_, h| h.id != id);
777 }
778 } else if let Some(id) = target
779 && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780 {
781 mark_stale(handle);
782 }
783 Ok(())
784 }
785
    /// Names of all collections, as reported by the store.
    #[must_use]
    pub fn collection_names(&self) -> Vec<String> {
        self.store.collection_names()
    }
791
792 #[must_use]
794 pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795 self.collections.get(name).map(|h| &h.descriptor)
796 }
797
798 pub fn len(&self, name: &str) -> Result<usize> {
800 let handle = self.handle(name)?;
801 Ok(self.store.len(handle.id)?)
802 }
803
804 pub fn is_empty(&self, name: &str) -> Result<bool> {
806 Ok(self.len(name)? == 0)
807 }
808
809 pub fn upsert(
811 &mut self,
812 collection: &str,
813 id: &str,
814 vector: &[f32],
815 payload: &Value,
816 ) -> Result<()> {
817 let handle = self
818 .collections
819 .get_mut(collection)
820 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821 require_single_vector(handle)?;
822 let payload_bytes = serde_json::to_vec(payload)?;
823 self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828 return Ok(());
829 }
830 index_upsert_point(handle, id, vector)?;
833 sparse_index_upsert_point(handle, id, payload);
835 Ok(())
836 }
837
838 pub fn prepare_create_collection(&self, name: &str, descriptor: &Descriptor) -> Result<WalOp> {
848 validate_index(descriptor)?;
849 Ok(self.store.prepare_create_collection(name, descriptor)?)
850 }
851
852 pub fn prepare_upsert(
859 &self,
860 collection: &str,
861 id: &str,
862 vector: &[f32],
863 payload: &Value,
864 ) -> Result<WalOp> {
865 let handle = self
866 .collections
867 .get(collection)
868 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
869 require_single_vector(handle)?;
870 let payload_bytes = serde_json::to_vec(payload)?;
871 Ok(self
872 .store
873 .prepare_upsert(handle.id, id, vector, &payload_bytes)?)
874 }
875
876 pub fn prepare_delete(&self, collection: &str, id: &str) -> Result<Option<WalOp>> {
882 let handle = self
883 .collections
884 .get(collection)
885 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
886 Ok(self.store.prepare_delete(handle.id, id)?)
887 }
888
889 pub fn upsert_batch(
896 &mut self,
897 collection: &str,
898 points: &[(&str, &[f32], &serde_json::Value)],
899 ) -> Result<u64> {
900 let handle = self
901 .collections
902 .get(collection)
903 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904 require_single_vector(handle)?;
905 let coll_id = handle.id;
906 let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
907
908 let payload_bytes: Vec<Vec<u8>> = points
909 .iter()
910 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
911 .collect::<Result<_>>()?;
912
913 let records: Vec<(&str, &[f32], &[u8])> = points
914 .iter()
915 .zip(payload_bytes.iter())
916 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
917 .collect();
918
919 self.store.upsert_batch(coll_id, &records)?;
920
921 if is_client_side {
922 return Ok(records.len() as u64);
923 }
924
925 for (id, vector, payload) in points {
926 let handle = self
927 .collections
928 .get_mut(collection)
929 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
930 index_upsert_point(handle, id, vector)?;
931 sparse_index_upsert_point(handle, id, payload);
932 }
933 Ok(records.len() as u64)
934 }
935
936 pub fn upsert_bulk(
947 &mut self,
948 collection: &str,
949 points: &[(&str, &[f32], &serde_json::Value)],
950 ) -> Result<u64> {
951 let handle = self
952 .collections
953 .get_mut(collection)
954 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
955 require_single_vector(handle)?;
956 let coll_id = handle.id;
957
958 let payload_bytes: Vec<Vec<u8>> = points
959 .iter()
960 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
961 .collect::<Result<_>>()?;
962 let records: Vec<(&str, &[f32], &[u8])> = points
963 .iter()
964 .zip(payload_bytes.iter())
965 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
966 .collect();
967
968 self.store.upsert_batch(coll_id, &records)?;
969
970 let handle = self
974 .collections
975 .get_mut(collection)
976 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
977 mark_stale(handle);
978 Ok(records.len() as u64)
979 }
980
981 pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
983 let handle = self
984 .collections
985 .get_mut(collection)
986 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
987 require_single_vector(handle)?;
988 let existed = self.store.delete(handle.id, id)?;
989 if !existed {
990 return Ok(false);
991 }
992 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
995 return Ok(true);
996 }
997 index_delete_point(handle, id);
1003 sparse_index_delete_point(handle, id);
1005 Ok(true)
1006 }
1007
1008 pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
1010 let handle = self.handle(collection)?;
1011 require_single_vector(handle)?;
1012 match self.store.get(handle.id, id)? {
1013 Some(record) => Ok(Some(Match {
1014 id: id.to_owned(),
1015 score: 0.0,
1016 payload: Some(serde_json::from_slice(&record.payload)?),
1017 vector: Some(record.vector),
1018 })),
1019 None => Ok(None),
1020 }
1021 }
1022
1023 pub fn fetch(
1037 &self,
1038 collection: &str,
1039 filter: Option<&Filter>,
1040 offset: usize,
1041 limit: usize,
1042 with_payload: bool,
1043 with_vector: bool,
1044 ) -> Result<Vec<Match>> {
1045 let handle = self.handle(collection)?;
1046 require_single_vector(handle)?;
1047 let mut out = Vec::new();
1048 let mut skipped = 0usize;
1053 for (id, record) in self.store.scan(handle.id)? {
1054 if out.len() >= limit {
1055 break;
1056 }
1057 let payload: Value = serde_json::from_slice(&record.payload)?;
1058 if let Some(filter) = filter
1059 && !filter.matches(&payload)
1060 {
1061 continue;
1062 }
1063 if skipped < offset {
1064 skipped += 1;
1065 continue;
1066 }
1067 out.push(Match {
1068 id,
1069 score: 0.0,
1070 payload: with_payload.then_some(payload),
1071 vector: with_vector.then_some(record.vector),
1072 });
1073 }
1074 Ok(out)
1075 }
1076
1077 pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1083 if self.handle(collection)?.stale {
1084 let store = &self.store;
1085 let handle = self
1086 .collections
1087 .get_mut(collection)
1088 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1089 rebuild_index(store, handle)?;
1090 if mvcc_served(handle) {
1094 publish_base(handle);
1095 }
1096 }
1097 Ok(())
1098 }
1099
1100 pub fn set_mvcc_reads(&mut self, on: bool) {
1106 self.mvcc = on;
1107 for handle in self.collections.values_mut() {
1108 handle.mvcc = on;
1109 if mvcc_eligible(&handle.descriptor) {
1113 handle.stale = true;
1114 }
1115 }
1116 }
1117
    /// Current database-wide MVCC-reads setting.
    #[must_use]
    pub fn mvcc_reads(&self) -> bool {
        self.mvcc
    }
1123
1124 pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1134 Ok(self.handle(collection)?.snapshot.clone())
1135 }
1136
1137 pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1147 let handle = self.handle(collection)?;
1148 Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1149 }
1150
1151 pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1156 Ok(self.handle(collection)?.stale)
1157 }
1158
1159 pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1165 let handle = self.handle(collection)?;
1166 if !handle.stale {
1167 return Ok(None);
1168 }
1169 let scan = scan_collection(&self.store, handle)?;
1170 Ok(Some(RebuildInputs {
1171 collection: collection.to_owned(),
1172 descriptor: handle.descriptor.clone(),
1173 scan,
1174 write_gen: handle.write_gen,
1175 }))
1176 }
1177
1178 pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1185 let store = &self.store;
1186 let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1187 return Ok(false);
1188 };
1189 match rebuilt.kind {
1190 RebuiltKind::Ready(index) => handle.index = *index,
1191 RebuiltKind::Disk { graph, pq } => {
1192 handle.index = empty_index(&handle.descriptor);
1196 let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1197 handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1198 }
1199 }
1200 handle.int_to_ext = rebuilt.int_to_ext;
1201 handle.ext_to_int = rebuilt.ext_to_int;
1202 handle.docs = rebuilt.docs;
1203 handle.sparse = rebuilt.sparse;
1204 let still_stale = handle.write_gen != rebuilt.write_gen;
1207 handle.stale = still_stale;
1208 if mvcc_served(handle) {
1215 publish_base(handle);
1216 }
1217 Ok(still_stale)
1218 }
1219
1220 pub fn search(
1223 &mut self,
1224 collection: &str,
1225 query: &[f32],
1226 params: &SearchParams,
1227 ) -> Result<Vec<Match>> {
1228 self.ensure_indexed(collection)?;
1233 self.search_snapshot(collection, query, params)
1234 }
1235
1236 pub fn search_snapshot(
1243 &self,
1244 collection: &str,
1245 query: &[f32],
1246 params: &SearchParams,
1247 ) -> Result<Vec<Match>> {
1248 require_single_vector(self.handle(collection)?)?;
1249 require_server_searchable(self.handle(collection)?)?;
1250
1251 let handle = self.handle(collection)?;
1252
1253 if mvcc_served(handle) {
1259 return self.search_snapshot_mvcc(handle, query, params);
1260 }
1261
1262 if let Some(filter) = ¶ms.filter
1266 && let Some(candidates) = candidate_ids(
1267 &self.store,
1268 handle.id,
1269 filter,
1270 &handle.descriptor.filterable,
1271 )?
1272 && candidates.len() <= FULL_SCAN_THRESHOLD
1273 {
1274 return self.exact_filtered_search(
1275 handle.id,
1276 &handle.descriptor,
1277 query,
1278 params,
1279 filter,
1280 &candidates,
1281 );
1282 }
1283
1284 let fetch = if params.filter.is_some() {
1285 params
1286 .k
1287 .saturating_mul(FILTER_OVERFETCH)
1288 .max(params.ef_search)
1289 } else {
1290 params.k
1291 };
1292 let raw = handle.index.search(query, fetch, params.ef_search)?;
1293
1294 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1295 let mut out = Vec::with_capacity(params.k);
1296 for neighbor in raw {
1297 if out.len() >= params.k {
1298 break;
1299 }
1300 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1301 continue;
1302 };
1303 let record = if need_record {
1304 self.store.get(handle.id, ext_id)?
1305 } else {
1306 None
1307 };
1308 let payload_value: Option<Value> = match &record {
1309 Some(r) if params.filter.is_some() || params.with_payload => {
1310 Some(serde_json::from_slice(&r.payload)?)
1311 }
1312 _ => None,
1313 };
1314 if let Some(filter) = ¶ms.filter {
1315 let value = payload_value.as_ref().unwrap_or(&Value::Null);
1316 if !filter.matches(value) {
1317 continue;
1318 }
1319 }
1320 out.push(Match {
1321 id: ext_id.clone(),
1322 score: neighbor.distance,
1323 payload: if params.with_payload {
1324 payload_value
1325 } else {
1326 None
1327 },
1328 vector: if params.with_vector {
1329 record.map(|r| r.vector)
1330 } else {
1331 None
1332 },
1333 });
1334 }
1335 Ok(out)
1336 }
1337
1338 fn search_snapshot_mvcc(
1344 &self,
1345 handle: &CollectionHandle,
1346 query: &[f32],
1347 params: &SearchParams,
1348 ) -> Result<Vec<Match>> {
1349 if let Some(filter) = ¶ms.filter
1352 && let Some(candidates) = candidate_ids(
1353 &self.store,
1354 handle.id,
1355 filter,
1356 &handle.descriptor.filterable,
1357 )?
1358 && candidates.len() <= FULL_SCAN_THRESHOLD
1359 {
1360 return self.exact_filtered_search(
1361 handle.id,
1362 &handle.descriptor,
1363 query,
1364 params,
1365 filter,
1366 &candidates,
1367 );
1368 }
1369
1370 let fetch = if params.filter.is_some() {
1374 params
1375 .k
1376 .saturating_mul(FILTER_OVERFETCH)
1377 .max(params.ef_search)
1378 } else {
1379 params.k
1380 };
1381 let dense = handle
1382 .snapshot
1383 .load()
1384 .search(query, fetch, params.ef_search)?;
1385 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1386 let mut out = Vec::with_capacity(params.k);
1387 for m in dense {
1388 if out.len() >= params.k {
1389 break;
1390 }
1391 let record = if need_record {
1392 self.store.get(handle.id, &m.id)?
1393 } else {
1394 None
1395 };
1396 let payload_value: Option<Value> = match &record {
1397 Some(r) if params.filter.is_some() || params.with_payload => {
1398 Some(serde_json::from_slice(&r.payload)?)
1399 }
1400 _ => None,
1401 };
1402 if let Some(filter) = ¶ms.filter
1403 && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1404 {
1405 continue;
1406 }
1407 out.push(Match {
1408 id: m.id,
1409 score: m.score,
1410 payload: if params.with_payload {
1411 payload_value
1412 } else {
1413 None
1414 },
1415 vector: if params.with_vector {
1416 record.map(|r| r.vector)
1417 } else {
1418 None
1419 },
1420 });
1421 }
1422 Ok(out)
1423 }
1424
1425 pub fn hybrid_search(
1433 &mut self,
1434 collection: &str,
1435 dense_query: Option<&[f32]>,
1436 sparse_query: Option<&SparseVector>,
1437 text_query: Option<&str>,
1438 params: &SearchParams,
1439 rrf_k0: f32,
1440 ) -> Result<Vec<Match>> {
1441 self.ensure_indexed(collection)?;
1445 self.hybrid_search_snapshot(
1446 collection,
1447 dense_query,
1448 sparse_query,
1449 text_query,
1450 params,
1451 rrf_k0,
1452 )
1453 }
1454
1455 pub fn hybrid_search_snapshot(
1460 &self,
1461 collection: &str,
1462 dense_query: Option<&[f32]>,
1463 sparse_query: Option<&SparseVector>,
1464 text_query: Option<&str>,
1465 params: &SearchParams,
1466 rrf_k0: f32,
1467 ) -> Result<Vec<Match>> {
1468 require_single_vector(self.handle(collection)?)?;
1469 require_server_searchable(self.handle(collection)?)?;
1470 if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1471 return Err(Error::Unsupported(
1472 "hybrid_search requires a dense query, a sparse query, or a text query",
1473 ));
1474 }
1475 let handle = self.handle(collection)?;
1476
1477 let depth = params
1480 .k
1481 .saturating_mul(RRF_CANDIDATE_FACTOR)
1482 .max(MIN_RRF_CANDIDATES);
1483 let filter = params.filter.as_ref();
1484 let mut lists: Vec<Vec<String>> = Vec::new();
1485 if let Some(q) = dense_query {
1486 lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1487 }
1488 if let Some(sp) = sparse_query {
1489 lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1490 }
1491 if let Some(text) = text_query {
1492 lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1493 }
1494 let fused = rrf_fuse(&lists, rrf_k0, params.k);
1495
1496 let mut out = Vec::with_capacity(fused.len());
1497 for (ext_id, score) in fused {
1498 let record = if params.with_payload || params.with_vector {
1499 self.store.get(handle.id, &ext_id)?
1500 } else {
1501 None
1502 };
1503 let payload = match (&record, params.with_payload) {
1504 (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1505 _ => None,
1506 };
1507 out.push(Match {
1508 id: ext_id,
1509 score,
1510 payload,
1511 vector: if params.with_vector {
1512 record.map(|r| r.vector)
1513 } else {
1514 None
1515 },
1516 });
1517 }
1518 Ok(out)
1519 }
1520
1521 fn dense_ranked_ids(
1524 &self,
1525 handle: &CollectionHandle,
1526 query: &[f32],
1527 depth: usize,
1528 ef_search: usize,
1529 filter: Option<&Filter>,
1530 ) -> Result<Vec<String>> {
1531 let mut ids = Vec::new();
1532 if mvcc_served(handle) {
1536 for m in handle
1537 .snapshot
1538 .load()
1539 .search(query, depth, ef_search.max(depth))?
1540 {
1541 if !self.passes_filter(handle.id, &m.id, filter)? {
1542 continue;
1543 }
1544 ids.push(m.id);
1545 if ids.len() >= depth {
1546 break;
1547 }
1548 }
1549 return Ok(ids);
1550 }
1551 let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1552 for neighbor in raw {
1553 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1554 continue;
1555 };
1556 if !self.passes_filter(handle.id, ext_id, filter)? {
1557 continue;
1558 }
1559 ids.push(ext_id.clone());
1560 if ids.len() >= depth {
1561 break;
1562 }
1563 }
1564 Ok(ids)
1565 }
1566
1567 fn sparse_ranked_ids(
1574 &self,
1575 handle: &CollectionHandle,
1576 query: &SparseVector,
1577 depth: usize,
1578 filter: Option<&Filter>,
1579 ) -> Result<Vec<String>> {
1580 if let Some(idx) = handle.sparse.as_ref() {
1581 let mut ids = Vec::new();
1582 for (ext_id, _score) in idx.search(query) {
1583 if !self.passes_filter(handle.id, &ext_id, filter)? {
1584 continue;
1585 }
1586 ids.push(ext_id);
1587 if ids.len() >= depth {
1588 break;
1589 }
1590 }
1591 return Ok(ids);
1592 }
1593 self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1594 }
1595
1596 fn sparse_ranked_ids_by_scan(
1600 &self,
1601 cid: CollectionId,
1602 query: &SparseVector,
1603 depth: usize,
1604 filter: Option<&Filter>,
1605 ) -> Result<Vec<String>> {
1606 let qmap: HashMap<u32, f32> = query
1607 .indices
1608 .iter()
1609 .copied()
1610 .zip(query.values.iter().copied())
1611 .collect();
1612 let mut scored: Vec<(f32, String)> = Vec::new();
1613 for (ext_id, record) in self.store.scan(cid)? {
1614 if record.payload.is_empty() {
1615 continue;
1616 }
1617 let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1618 continue;
1619 };
1620 if let Some(filter) = filter
1621 && !filter.matches(&value)
1622 {
1623 continue;
1624 }
1625 let Some(raw) = value.get(SPARSE_KEY) else {
1626 continue;
1627 };
1628 let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1629 continue;
1630 };
1631 let mut score = 0.0f32;
1632 for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1633 if let Some(qw) = qmap.get(dim) {
1634 score += qw * weight;
1635 }
1636 }
1637 if score > 0.0 {
1638 scored.push((score, ext_id));
1639 }
1640 }
1641 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1642 Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1643 }
1644
1645 fn bm25_ranked_ids(
1653 &self,
1654 handle: &CollectionHandle,
1655 query_text: &str,
1656 depth: usize,
1657 filter: Option<&Filter>,
1658 ) -> Result<Vec<String>> {
1659 let Some(idx) = handle.sparse.as_ref() else {
1660 return Ok(Vec::new());
1661 };
1662 let terms = query_term_ids(query_text);
1663 let mut ids = Vec::new();
1664 for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1665 if !self.passes_filter(handle.id, &ext_id, filter)? {
1666 continue;
1667 }
1668 ids.push(ext_id);
1669 if ids.len() >= depth {
1670 break;
1671 }
1672 }
1673 Ok(ids)
1674 }
1675
1676 fn passes_filter(
1679 &self,
1680 cid: CollectionId,
1681 ext_id: &str,
1682 filter: Option<&Filter>,
1683 ) -> Result<bool> {
1684 let Some(filter) = filter else {
1685 return Ok(true);
1686 };
1687 let value: Value = match self.store.get(cid, ext_id)? {
1688 Some(r) => serde_json::from_slice(&r.payload)?,
1689 None => Value::Null,
1690 };
1691 Ok(filter.matches(&value))
1692 }
1693
1694 fn exact_filtered_search(
1699 &self,
1700 cid: CollectionId,
1701 descriptor: &Descriptor,
1702 query: &[f32],
1703 params: &SearchParams,
1704 filter: &Filter,
1705 candidates: &BTreeSet<String>,
1706 ) -> Result<Vec<Match>> {
1707 let metric = to_index_metric(descriptor.metric);
1708 let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1709 for ext_id in candidates {
1710 let Some(record) = self.store.get(cid, ext_id)? else {
1711 continue;
1712 };
1713 let payload: Value = serde_json::from_slice(&record.payload)?;
1714 if !filter.matches(&payload) {
1715 continue;
1716 }
1717 let ordering = ordering_distance(metric, query, &record.vector);
1718 scored.push((ordering, ext_id.clone(), payload, record.vector));
1719 }
1720 scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1721 scored.truncate(params.k);
1722 Ok(scored
1723 .into_iter()
1724 .map(|(ordering, id, payload, vector)| Match {
1725 id,
1726 score: report_metric(metric, ordering),
1727 payload: params.with_payload.then_some(payload),
1728 vector: params.with_vector.then_some(vector),
1729 })
1730 .collect())
1731 }
1732
    /// Inserts or replaces a multi-vector document.
    ///
    /// Each vector is stored as its own record under `token_id(doc_id, j)`; the
    /// JSON-encoded `payload` is attached to token 0 only, the other tokens get
    /// an empty payload. When the document previously had more tokens than
    /// `vectors` provides, the surplus tokens are deleted first.
    ///
    /// # Errors
    /// - [`Error::CollectionNotFound`] if `collection` does not exist.
    /// - [`Error::Unsupported`] if the collection is not multi-vector, `doc_id`
    ///   contains `DOC_TOKEN_SEP`, `vectors` is empty, or any vector's length
    ///   differs from the collection dimensionality.
    /// - Store, index, or JSON serialization errors are propagated; tokens
    ///   written before the failure are not rolled back here.
    pub fn upsert_document(
        &mut self,
        collection: &str,
        doc_id: &str,
        vectors: &[Vec<f32>],
        payload: &Value,
    ) -> Result<()> {
        let handle = self
            .collections
            .get_mut(collection)
            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
        require_multivector(handle)?;
        // The separator is reserved for token ids; allowing it in a document id
        // would make `parse_token_id` ambiguous.
        if doc_id.contains(DOC_TOKEN_SEP) {
            return Err(Error::Unsupported(
                "document id must not contain the reserved 0x1f separator",
            ));
        }
        if vectors.is_empty() {
            return Err(Error::Unsupported("a document needs at least one vector"));
        }
        let dim = handle.descriptor.dim as usize;
        if vectors.iter().any(|v| v.len() != dim) {
            return Err(Error::Unsupported(
                "every document vector must match the collection dimensionality",
            ));
        }
        // Token count recorded for the previous version of this document (0 if new).
        let previous = handle
            .docs
            .as_ref()
            .and_then(|d| d.get(doc_id))
            .copied()
            .unwrap_or(0) as usize;
        // Drop trailing tokens the new version no longer has.
        for j in vectors.len()..previous {
            self.store.delete(handle.id, &token_id(doc_id, j))?;
            index_delete_point(handle, &token_id(doc_id, j));
        }
        let payload_bytes = serde_json::to_vec(payload)?;
        for (j, vector) in vectors.iter().enumerate() {
            // Only the first token carries the document payload.
            let bytes: &[u8] = if j == 0 {
                payload_bytes.as_slice()
            } else {
                &[]
            };
            self.store
                .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
            index_upsert_point(handle, &token_id(doc_id, j), vector)?;
        }
        if let Some(docs) = handle.docs.as_mut() {
            docs.insert(doc_id.to_owned(), vectors.len() as u32);
        }
        Ok(())
    }
1800
1801 pub fn search_multi_vector(
1809 &mut self,
1810 collection: &str,
1811 query_tokens: &[Vec<f32>],
1812 params: &SearchParams,
1813 ) -> Result<Vec<DocumentMatch>> {
1814 self.ensure_indexed(collection)?;
1818 self.search_multi_vector_snapshot(collection, query_tokens, params)
1819 }
1820
1821 pub fn search_multi_vector_snapshot(
1828 &self,
1829 collection: &str,
1830 query_tokens: &[Vec<f32>],
1831 params: &SearchParams,
1832 ) -> Result<Vec<DocumentMatch>> {
1833 require_multivector(self.handle(collection)?)?;
1834 let dim = self.handle(collection)?.descriptor.dim as usize;
1835 if query_tokens.is_empty() {
1836 return Ok(Vec::new());
1837 }
1838 if query_tokens.iter().any(|v| v.len() != dim) {
1839 return Err(Error::Unsupported(
1840 "every query token must match the collection dimensionality",
1841 ));
1842 }
1843
1844 let doc_count = self
1845 .handle(collection)?
1846 .docs
1847 .as_ref()
1848 .map_or(0, BTreeMap::len);
1849 let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1850 self.handle(collection)?
1852 .docs
1853 .as_ref()
1854 .map(|d| d.keys().cloned().collect())
1855 .unwrap_or_default()
1856 } else {
1857 let handle = self.handle(collection)?;
1861 let per_token_k = params
1862 .k
1863 .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1864 .max(params.ef_search);
1865 let mut set = BTreeSet::new();
1866 for token in query_tokens {
1867 for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1868 if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1869 && let Some((doc, _)) = parse_token_id(ext)
1870 {
1871 set.insert(doc.to_owned());
1872 }
1873 }
1874 }
1875 set.into_iter().collect()
1876 };
1877
1878 let handle = self.handle(collection)?;
1880 let cid = handle.id;
1881 let metric = to_index_metric(handle.descriptor.metric);
1882 let mut scored: Vec<ScoredDocument> = Vec::new();
1883 for doc in &candidates {
1884 let count = handle
1885 .docs
1886 .as_ref()
1887 .and_then(|d| d.get(doc))
1888 .copied()
1889 .unwrap_or(0) as usize;
1890 let (tokens, payload) = self.gather_document(cid, doc, count)?;
1891 if tokens.is_empty() {
1892 continue;
1893 }
1894 if let Some(filter) = ¶ms.filter {
1895 let value = payload.clone().unwrap_or(Value::Null);
1896 if !filter.matches(&value) {
1897 continue;
1898 }
1899 }
1900 let score = max_sim(metric, query_tokens, &tokens);
1901 let vectors = params.with_vector.then_some(tokens);
1902 scored.push((score, doc.clone(), payload, vectors));
1903 }
1904 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1906 scored.truncate(params.k);
1907 Ok(scored
1908 .into_iter()
1909 .map(|(score, id, payload, vectors)| DocumentMatch {
1910 id,
1911 score,
1912 payload: params.with_payload.then_some(payload).flatten(),
1913 vectors,
1914 })
1915 .collect())
1916 }
1917
1918 pub fn get_document(
1921 &self,
1922 collection: &str,
1923 doc_id: &str,
1924 with_vectors: bool,
1925 ) -> Result<Option<DocumentMatch>> {
1926 let handle = self.handle(collection)?;
1927 require_multivector(handle)?;
1928 let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1929 return Ok(None);
1930 };
1931 let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1932 if tokens.is_empty() {
1933 return Ok(None);
1934 }
1935 Ok(Some(DocumentMatch {
1936 id: doc_id.to_owned(),
1937 score: 0.0,
1938 payload,
1939 vectors: with_vectors.then_some(tokens),
1940 }))
1941 }
1942
    /// Deletes a multi-vector document and all of its token records.
    ///
    /// Returns `Ok(false)` when the document is not registered in the
    /// collection's document map, `Ok(true)` once its tokens have been removed
    /// from the store and the index and its map entry dropped.
    ///
    /// # Errors
    /// [`Error::CollectionNotFound`], [`Error::Unsupported`] for single-vector
    /// collections, and store delete failures (tokens removed before the
    /// failure stay removed).
    pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
        let handle = self
            .collections
            .get_mut(collection)
            .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
        require_multivector(handle)?;
        let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
            return Ok(false);
        };
        // Remove every token record, keeping store and index in step.
        for j in 0..count as usize {
            self.store.delete(handle.id, &token_id(doc_id, j))?;
            index_delete_point(handle, &token_id(doc_id, j));
        }
        if let Some(docs) = handle.docs.as_mut() {
            docs.remove(doc_id);
        }
        Ok(true)
    }
1964
1965 pub fn document_count(&self, collection: &str) -> Result<usize> {
1968 let handle = self.handle(collection)?;
1969 require_multivector(handle)?;
1970 Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1971 }
1972
1973 fn gather_document(
1977 &self,
1978 cid: CollectionId,
1979 doc_id: &str,
1980 count: usize,
1981 ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1982 let mut tokens = Vec::with_capacity(count);
1983 let mut payload: Option<Value> = None;
1984 for j in 0..count {
1985 let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1986 continue;
1987 };
1988 if j == 0 && !record.payload.is_empty() {
1989 payload = Some(serde_json::from_slice(&record.payload)?);
1990 }
1991 tokens.push(record.vector);
1992 }
1993 Ok((tokens, payload))
1994 }
1995
1996 pub fn checkpoint(&mut self) -> Result<()> {
2001 let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
2002 for handle in self.collections.values() {
2003 if handle.stale {
2004 continue;
2005 }
2006 if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
2007 if ivf.is_empty() {
2008 continue;
2009 }
2010 let envelope = IndexEnvelope {
2011 version: INDEX_ENVELOPE_VERSION,
2012 int_to_ext: handle.int_to_ext.clone(),
2013 ivf: ivf.snapshot()?,
2014 };
2015 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
2016 } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
2017 let envelope = DiskEnvelope {
2023 version: INDEX_ENVELOPE_VERSION,
2024 int_to_ext: handle.int_to_ext.clone(),
2025 base_row_count: fresh.base_len() as u64,
2026 deleted_ids: fresh.deleted_ids(),
2027 };
2028 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
2029 }
2030 }
2031 self.store.checkpoint_with_index_snapshots(&snapshots)?;
2032 Ok(())
2033 }
2034
2035 pub fn compact(&mut self) -> Result<()> {
2039 Ok(self.store.compact()?)
2040 }
2041
    /// Returns the manifest version reported by the underlying store.
    #[must_use]
    pub fn manifest_version(&self) -> u64 {
        self.store.manifest_version()
    }
2048
    /// Returns the total size in bytes of all files under the store directory,
    /// as computed by `dir_size` (unreadable entries count as zero).
    #[must_use]
    pub fn disk_usage_bytes(&self) -> u64 {
        dir_size(self.store.dir())
    }
2055
    /// Writes a copy of the database directory to `dest`.
    ///
    /// The database is checkpointed first, then the store directory is copied
    /// recursively into `dest`. The returned [`SnapshotInfo`] carries the
    /// store's manifest version and the number of files and bytes copied.
    ///
    /// # Errors
    /// - `CoreError::AlreadyExists` if `dest` already exists.
    /// - Any checkpoint or copy error.
    pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
        if dest.exists() {
            return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
                dest.display().to_string(),
            )));
        }
        self.checkpoint()?;
        let (files, bytes) = copy_tree(self.store.dir(), dest)?;
        // Best-effort sync of the destination directory; a failure here is
        // deliberately ignored.
        let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
        Ok(SnapshotInfo {
            manifest_version: self.store.manifest_version(),
            files,
            bytes,
        })
    }
2087
2088 fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2089 self.collections
2090 .get(name)
2091 .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2092 }
2093}
2094
/// Restores a snapshot by copying the directory tree at `src` into a new
/// directory `dest`.
///
/// `src` is accepted as a snapshot only if it contains an entry named
/// `CURRENT`. The returned [`SnapshotInfo`] reports the files and bytes copied.
///
/// # Errors
/// - `CoreError::AlreadyExists` if `dest` already exists.
/// - `CoreError::InvalidArgument` if `src` has no `CURRENT` entry.
/// - Any I/O error from the copy.
pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
    if dest.exists() {
        return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
            dest.display().to_string(),
        )));
    }
    if !src.join("CURRENT").exists() {
        return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
            format!("{} is not a snapshot (no CURRENT)", src.display()),
        )));
    }
    let (files, bytes) = copy_tree(src, dest)?;
    Ok(SnapshotInfo {
        // NOTE(review): always reported as 0 here; the store is not opened by
        // this function, so presumably the real version is unknown at this
        // point — confirm callers do not rely on it.
        manifest_version: 0,
        files,
        bytes,
    })
}
2128
2129fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2133 std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2134 let mut files = 0u64;
2135 let mut bytes = 0u64;
2136 for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2137 let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2138 let from = entry.path();
2139 let to = dst.join(entry.file_name());
2140 let ft = entry
2141 .file_type()
2142 .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2143 if ft.is_dir() {
2144 let (f, b) = copy_tree(&from, &to)?;
2145 files += f;
2146 bytes += b;
2147 } else {
2148 let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2149 files += 1;
2150 bytes += n;
2151 }
2152 }
2153 Ok((files, bytes))
2154}
2155
/// Sums the sizes of all files below `dir`, recursing into subdirectories.
///
/// Best effort: a directory that cannot be listed counts as 0, and entries
/// whose type or metadata cannot be read are ignored.
fn dir_size(dir: &Path) -> u64 {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return 0;
    };
    listing
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2173
/// Separator (U+001F) placed between a document id and a token ordinal when
/// forming a token record id; document ids containing it are rejected.
const DOC_TOKEN_SEP: char = '\u{1f}';

/// Multi-vector collections with at most this many documents are searched by
/// scoring every document; larger ones go through index-based candidate
/// generation.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

/// Multiplier applied to `k` to get the per-query-token neighbour count used
/// for multi-vector candidate generation (before taking the max with
/// `ef_search`).
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2187
2188fn token_id(doc_id: &str, ordinal: usize) -> String {
2190 format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2191}
2192
2193fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2197 let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2198 Some((doc, ordinal.parse().ok()?))
2199}
2200
2201fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2203 if handle.descriptor.multivector {
2204 Err(Error::Unsupported(
2205 "collection is multi-vector; use upsert_document / search_multi_vector",
2206 ))
2207 } else {
2208 Ok(())
2209 }
2210}
2211
2212fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2214 if handle.descriptor.multivector {
2215 Ok(())
2216 } else {
2217 Err(Error::Unsupported(
2218 "collection is single-vector; use upsert / search",
2219 ))
2220 }
2221}
2222
2223fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2227 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2228 Err(Error::Unsupported(
2229 "collection is client-side encrypted; the server cannot rank opaque vectors — \
2230 fetch points and rank client-side",
2231 ))
2232 } else {
2233 Ok(())
2234 }
2235}
2236
/// Maps the descriptor-level [`DistanceMetric`] onto the index crate's
/// [`Metric`], variant for variant.
fn to_index_metric(metric: DistanceMetric) -> Metric {
    match metric {
        DistanceMetric::Dot => Metric::Dot,
        DistanceMetric::Cosine => Metric::Cosine,
        DistanceMetric::L2 => Metric::L2,
    }
}
2244
/// Checks that a descriptor combines metric, encryption mode, multi-vector
/// flag, and index kind in a supported way.
///
/// # Errors
/// Returns [`Error::Unsupported`] when, checked in this order:
/// - a multi-vector collection uses the L2 metric;
/// - client-side vector encryption is combined with multi-vector;
/// - DCPE encryption is used with a metric other than L2;
/// - the ColBERT index is requested for a single-vector collection;
/// - Vamana, IVF, or the disk index is requested with the dot metric.
fn validate_index(descriptor: &Descriptor) -> Result<()> {
    if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
        return Err(Error::Unsupported(
            "multi-vector collections require a similarity metric (cosine or dot)",
        ));
    }
    if descriptor.vector_encryption == VectorEncryption::ClientSide {
        if descriptor.multivector {
            return Err(Error::Unsupported(
                "client-side vector encryption is not supported for multi-vector collections",
            ));
        }
        // Client-side encrypted single-vector collections skip the remaining
        // index/metric checks (no index is built for them, see `empty_index`).
        return Ok(());
    }
    if descriptor.vector_encryption == VectorEncryption::Dcpe
        && descriptor.metric != DistanceMetric::L2
    {
        return Err(Error::Unsupported(
            "dcpe-encrypted collections require the l2 metric",
        ));
    }
    if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
        return Err(Error::Unsupported(
            "the colbert index is only for multi-vector collections",
        ));
    }
    match descriptor.index.kind {
        IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
            if descriptor.metric == DistanceMetric::Dot =>
        {
            Err(Error::Unsupported(
                "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
            ))
        }
        _ => Ok(()),
    }
}
2292
/// Creates the initial, unpopulated index value for a descriptor.
///
/// Client-side encrypted collections get `CollectionIndex::None`. Vamana,
/// disk Vamana, IVF, and ColBERT start as their `None` (not yet built)
/// variant; every other kind starts as an empty HNSW graph with the default
/// configuration.
fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
    if descriptor.vector_encryption == VectorEncryption::ClientSide {
        return CollectionIndex::None;
    }
    match descriptor.index.kind {
        IndexKind::Vamana => CollectionIndex::Vamana(None),
        IndexKind::DiskVamana => CollectionIndex::Disk(None),
        IndexKind::Ivf => CollectionIndex::Ivf(None),
        IndexKind::Colbert => CollectionIndex::Colbert(None),
        // HNSW is the fallback for any remaining kind.
        _ => CollectionIndex::Hnsw(Hnsw::new(
            descriptor.dim as usize,
            to_index_metric(descriptor.metric),
            HnswConfig::default(),
        )),
    }
}
2310
/// Default number of product-quantization subspaces for vectors of length
/// `dim`: the largest divisor of `dim` that does not exceed `max(dim / 8, 1)`.
/// Falls back to 1 when no larger divisor exists.
fn default_pq_m(dim: usize) -> usize {
    let ceiling = std::cmp::max(dim / 8, 1);
    let mut m = ceiling;
    // Walk downwards; 1 divides everything, so stop there unconditionally.
    while m > 1 {
        if dim % m == 0 {
            return m;
        }
        m -= 1;
    }
    1
}
2320
/// Fixed seed passed to product-quantizer training and to the ColBERT index
/// configuration.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
/// File name of the on-disk Vamana index inside a collection's index directory.
const DISK_INDEX_FILE: &str = "vamana.qvx";
2327
2328fn build_index(
2329 store: &Store,
2330 cid: CollectionId,
2331 descriptor: &Descriptor,
2332 ids: &[u64],
2333 flat: &[f32],
2334) -> Result<CollectionIndex> {
2335 Ok(match build_in_memory_index(descriptor, ids, flat)? {
2336 Some(index) => index,
2337 None => {
2338 let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2339 CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2340 store, cid, &graph, &pq,
2341 )?)?))
2342 }
2343 })
2344}
2345
/// Builds an index that needs no store access, or reports that the disk path
/// is required.
///
/// Returns `Ok(Some(CollectionIndex::None))` for client-side encrypted
/// collections, `Ok(None)` for the disk Vamana kind (the caller must build and
/// write it), and `Ok(Some(index))` for Vamana, IVF, ColBERT, and — as the
/// fallback for any other kind — HNSW.
///
/// `flat` holds the vectors back to back, `descriptor.dim` values per entry
/// of `ids`.
///
/// # Errors
/// Propagates errors from the individual index builders.
fn build_in_memory_index(
    descriptor: &Descriptor,
    ids: &[u64],
    flat: &[f32],
) -> Result<Option<CollectionIndex>> {
    if descriptor.vector_encryption == VectorEncryption::ClientSide {
        return Ok(Some(CollectionIndex::None));
    }
    let dim = descriptor.dim as usize;
    let metric = to_index_metric(descriptor.metric);
    Ok(Some(match descriptor.index.kind {
        IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
            ids,
            flat,
            dim,
            metric,
            VamanaConfig::default(),
        )?)?)),
        IndexKind::DiskVamana => return Ok(None),
        IndexKind::Ivf => {
            // Quantization is enabled only when the descriptor names a
            // subspace count; everything else keeps the IVF defaults.
            let cfg = IvfConfig {
                quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
                ..IvfConfig::default()
            };
            CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
        }
        IndexKind::Colbert => {
            let n = ids.len();
            // Centroid count: ceil(sqrt(n)) clamped to 1..=4096; a quarter of
            // them (rounded up, at least one) are probed.
            let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
            let cfg = ColbertConfig {
                n_centroids,
                n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
                pq_subspaces: descriptor
                    .index
                    .pq_subspaces
                    .map_or_else(|| default_pq_m(dim), |m| m as usize),
                seed: PQ_SEED,
            };
            CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
        }
        _ => {
            // HNSW has no bulk builder here: insert the vectors one by one.
            let mut h = Hnsw::new(dim, metric, HnswConfig::default());
            for (i, &id) in ids.iter().enumerate() {
                h.insert(id, &flat[i * dim..(i + 1) * dim])?;
            }
            CollectionIndex::Hnsw(h)
        }
    }))
}
2404
2405fn build_disk_graph_pq(
2409 descriptor: &Descriptor,
2410 ids: &[u64],
2411 flat: &[f32],
2412) -> Result<(Vamana, ProductQuantizer)> {
2413 let dim = descriptor.dim as usize;
2414 let metric = to_index_metric(descriptor.metric);
2415 let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2416 let m = descriptor
2417 .index
2418 .pq_subspaces
2419 .map_or_else(|| default_pq_m(dim), |x| x as usize);
2420 let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2421 Ok((graph, pq))
2422}
2423
/// Writes `graph` and `pq` as the collection's on-disk index file and opens
/// the result.
///
/// The file is first written to `<DISK_INDEX_FILE>.tmp` in the collection's
/// index directory and then renamed over the final name, so the final path is
/// only ever replaced by a completely written file.
///
/// # Errors
/// Directory creation and rename failures are reported as `DiskError::Io`;
/// codec lookup, write, and open errors are propagated.
fn write_disk_index(
    store: &Store,
    cid: CollectionId,
    graph: &Vamana,
    pq: &ProductQuantizer,
) -> Result<DiskVamana> {
    let dir = store.index_dir(cid);
    std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
    let path = dir.join(DISK_INDEX_FILE);
    let codec = store.collection_codec_clone(cid)?;
    let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
    quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
    std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
    // Best-effort sync of the directory; a failure here is deliberately ignored.
    let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
    open_disk_index(store, cid, codec)
}
2452
2453fn open_disk_index(
2457 store: &Store,
2458 cid: CollectionId,
2459 codec: Box<dyn PageCodec>,
2460) -> Result<DiskVamana> {
2461 let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2462 Ok(DiskVamana::open(&path, codec)?)
2463}
2464
/// Brings a collection's index into memory, preferring a persisted snapshot
/// over a full rebuild.
///
/// For single-vector IVF collections an index snapshot is restored when one
/// can be read and restored successfully. The same is tried for single-vector
/// disk Vamana collections unless the `QUIVER_DISABLE_DURABLE_DISK_INDEX`
/// environment variable is set. Any failure on those paths is swallowed and
/// the function falls back to `rebuild_index`.
///
/// # Errors
/// Only errors from the fallback rebuild are returned.
fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
    if !handle.descriptor.multivector
        && handle.descriptor.index.kind == IndexKind::Ivf
        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
        && restore_ivf_snapshot(store, handle, &blob).is_ok()
    {
        return Ok(());
    }
    if !handle.descriptor.multivector
        && handle.descriptor.index.kind == IndexKind::DiskVamana
        && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
        && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
        && restore_disk_snapshot(store, handle, &blob).is_ok()
    {
        return Ok(());
    }
    // No usable snapshot (or a failed restore, which may have partially
    // updated `handle`): rebuild everything from the store.
    rebuild_index(store, handle)
}
2495
/// Restores a disk Vamana index from a checkpointed [`DiskEnvelope`] blob.
///
/// Steps: decode and version-check the envelope, open the on-disk base index
/// and verify its length matches the envelope's `base_row_count`, restore the
/// id maps, re-insert every id past the base from the store, re-apply the
/// recorded deletions, then replay the store's recovery tail.
///
/// # Errors
/// Postcard decode errors, [`Error::Unsupported`] on a version or base-count
/// mismatch, and any store or index error. `handle` may already be partially
/// updated when an error is returned.
fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported disk index snapshot version",
        ));
    }
    let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
    // The snapshot is only valid against the base file it was taken with.
    if base.len() as u64 != envelope.base_row_count {
        return Err(Error::Unsupported(
            "disk base count disagrees with snapshot",
        ));
    }
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    let mut fresh = FreshDiskVamana::new(base)?;
    // Ids at or beyond the base row count were inserted after the base was
    // written; re-insert those whose record still exists in the store.
    for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
        let ext = &handle.int_to_ext[internal as usize];
        if let Some(record) = store.get(handle.id, ext)? {
            fresh.insert(internal, &record.vector)?;
        }
    }
    for id in envelope.deleted_ids {
        fresh.mark_deleted(id);
    }
    handle.index = CollectionIndex::Disk(Some(fresh));
    handle.stale = false;
    replay_recovery_tail(store, handle)
}
2540
/// Applies the store's recovery tail for this collection to the in-memory
/// index: first every deleted id, then every upserted record.
///
/// # Errors
/// Propagates the error from reading the tail or from an index upsert.
fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
    let tail = store.recovery_tail(handle.id)?;
    for ext in &tail.deleted {
        index_delete_point(handle, ext);
    }
    for (ext, record) in tail.upserts {
        index_upsert_point(handle, &ext, &record.vector)?;
    }
    Ok(())
}
2555
/// Restores an IVF index from a checkpointed [`IndexEnvelope`] blob and then
/// replays the store's recovery tail directly against the IVF index.
///
/// Deleted ids unknown to the restored id map are ignored. Upserted ids that
/// are not yet mapped are assigned the next internal id before insertion.
///
/// # Errors
/// Postcard decode errors, [`Error::Unsupported`] on an envelope version
/// mismatch, and IVF restore/insert or store errors. `handle` may already be
/// partially updated when an error is returned.
fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
    let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
    if envelope.version != INDEX_ENVELOPE_VERSION {
        return Err(Error::Unsupported(
            "unsupported index snapshot envelope version",
        ));
    }
    let ivf = Ivf::restore(&envelope.ivf)?;
    // Rebuild the reverse map from the persisted internal-id order.
    handle.ext_to_int = envelope
        .int_to_ext
        .iter()
        .enumerate()
        .map(|(i, ext)| (ext.clone(), i as u64))
        .collect();
    handle.int_to_ext = envelope.int_to_ext;
    handle.index = CollectionIndex::Ivf(Some(ivf));
    handle.stale = false;

    let tail = store.recovery_tail(handle.id)?;
    for ext in &tail.deleted {
        let Some(&internal) = handle.ext_to_int.get(ext) else {
            continue;
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.remove(internal);
        }
    }
    for (ext, record) in tail.upserts {
        // Reuse the existing internal id, or append a new one.
        let internal = match handle.ext_to_int.get(&ext) {
            Some(&i) => i,
            None => {
                let i = handle.int_to_ext.len() as u64;
                handle.ext_to_int.insert(ext.clone(), i);
                handle.int_to_ext.push(ext);
                i
            }
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, &record.vector)?;
        }
    }
    Ok(())
}
2603
/// Reflects an upsert of `ext_id` in the collection's in-memory index state.
///
/// Behaviour by state:
/// - `mvcc_served(handle)`: the write is routed to `overlay_upsert` and
///   nothing else happens.
/// - otherwise the write generation is bumped; a stale index is left alone.
/// - HNSW, unknown id: the vector is inserted under a fresh internal id.
/// - non-empty IVF: the vector is inserted under the existing internal id, or
///   a fresh one for an unknown id.
/// - built Vamana / disk graph: any previous internal id is marked deleted and
///   the vector is inserted under a fresh internal id; the index is marked
///   stale once its pending fraction reaches `GRAPH_REBUILD_PENDING_FRACTION`.
/// - built ColBERT: same delete-then-insert scheme; marked stale once the
///   deleted fraction reaches `HNSW_REBUILD_DELETED_FRACTION`.
/// - everything else (including an HNSW update of a known id, an empty IVF,
///   or an unbuilt index): the index is marked stale.
///
/// # Errors
/// Propagates the insert error of the underlying index.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    if mvcc_served(handle) {
        overlay_upsert(handle, ext_id, vector);
        return Ok(());
    }
    bump_write_gen(handle);
    if handle.stale {
        return Ok(());
    }
    // Classify the index up front so the branches below can borrow
    // `handle.index` mutably without overlapping borrows.
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // Updates never reuse an internal id: tombstone the old one and
        // append a new one.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            _ => {}
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // No incremental path for this state.
        mark_stale(handle);
    }
    Ok(())
}
2707
/// Reflects a deletion of `ext_id` in the collection's in-memory index state.
///
/// Behaviour by state:
/// - `mvcc_served(handle)`: the delete is routed to `overlay_delete` and
///   nothing else happens.
/// - otherwise the write generation is bumped; a stale index is left alone.
/// - known id, built IVF: the entry is removed.
/// - known id, HNSW / built ColBERT: the entry is marked deleted; the index is
///   marked stale once its deleted fraction reaches
///   `HNSW_REBUILD_DELETED_FRACTION`.
/// - known id, built Vamana / disk graph: the entry is marked deleted; the
///   index is marked stale once its pending fraction reaches
///   `GRAPH_REBUILD_PENDING_FRACTION`.
/// - anything else (unknown id, unbuilt index): the index is marked stale.
///
/// The id maps are not modified here.
fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
    if mvcc_served(handle) {
        overlay_delete(handle, ext_id);
        return;
    }
    bump_write_gen(handle);
    if handle.stale {
        return;
    }
    // Classify the index up front so the match arms can borrow
    // `handle.index` mutably.
    let internal = handle.ext_to_int.get(ext_id).copied();
    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    match internal {
        Some(internal) if live_ivf => {
            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
                ivf.remove(internal);
            }
        }
        Some(internal) if live_hnsw => {
            let mut crowded = false;
            if let CollectionIndex::Hnsw(h) = &mut handle.index {
                // NOTE(review): the internal id is narrowed to u32 here —
                // confirm HNSW ids cannot exceed u32::MAX.
                h.mark_deleted(internal as u32);
                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        Some(internal) if live_graph => {
            let mut crowded = false;
            match &mut handle.index {
                CollectionIndex::Vamana(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                CollectionIndex::Disk(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                _ => {}
            }
            if crowded {
                mark_stale(handle);
            }
        }
        Some(internal) if live_colbert => {
            let mut crowded = false;
            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
                c.mark_deleted(internal);
                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // Unknown id or no incremental path: force a rebuild.
        _ => mark_stale(handle),
    }
}
2780
/// Everything collected from one full scan of a collection, as needed to
/// rebuild its index state.
struct RebuildScan {
    /// External id for each internal id, in scan order.
    int_to_ext: Vec<String>,
    /// Reverse of `int_to_ext`.
    ext_to_int: HashMap<String, u64>,
    /// All record vectors concatenated in scan order.
    flat: Vec<f32>,
    /// Token count per document; `Some` only for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index; `Some` only when the descriptor uses one.
    sparse: Option<SparseInvertedIndex>,
}
2792
/// Scans every record of the collection once and gathers the inputs for an
/// index rebuild.
///
/// Internal ids are assigned in scan order. For multi-vector collections the
/// number of token records per document is counted from the parsed token ids.
/// When the descriptor uses a sparse index, each record whose payload yields a
/// sparse vector is added to a fresh [`SparseInvertedIndex`].
///
/// # Errors
/// Propagates a failure of the store scan.
fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
    let multivector = handle.descriptor.multivector;
    let mut int_to_ext = Vec::new();
    let mut ext_to_int = HashMap::new();
    let mut flat: Vec<f32> = Vec::new();
    let mut docs: BTreeMap<String, u32> = BTreeMap::new();
    let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
    for (ext_id, record) in store.scan(handle.id)? {
        let internal = int_to_ext.len() as u64;
        flat.extend_from_slice(&record.vector);
        // Record ids that do not parse as token ids are not counted.
        if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
            *docs.entry(doc.to_owned()).or_insert(0) += 1;
        }
        if let Some(idx) = sparse.as_mut()
            && let Some(sv) = sparse_vector_from_payload(&record.payload)
        {
            idx.upsert(&ext_id, &sv);
        }
        ext_to_int.insert(ext_id.clone(), internal);
        int_to_ext.push(ext_id);
    }
    Ok(RebuildScan {
        int_to_ext,
        ext_to_int,
        flat,
        docs: multivector.then_some(docs),
        sparse,
    })
}
2828
/// Rebuilds the collection's index state from a full scan of the store and
/// clears the `stale` flag.
///
/// # Errors
/// Propagates scan and build errors. On a build error the handle is left with
/// the empty index assigned below and its previous id maps.
fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
    let scan = scan_collection(store, handle)?;
    let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
    // The old index is replaced (and thus dropped) before the new one is
    // built. NOTE(review): presumably to release its memory/file handles
    // first — confirm.
    handle.index = empty_index(&handle.descriptor);
    handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
    handle.int_to_ext = scan.int_to_ext;
    handle.ext_to_int = scan.ext_to_int;
    handle.docs = scan.docs;
    handle.sparse = scan.sparse;
    handle.stale = false;
    Ok(())
}
2846
/// Self-contained inputs for building a collection index via
/// [`RebuildInputs::build`], which needs no store access.
pub struct RebuildInputs {
    /// Name of the collection the inputs were captured from.
    collection: String,
    /// Descriptor used to pick and configure the index kind.
    descriptor: Descriptor,
    /// Scan results: id maps, flattened vectors, document and sparse state.
    scan: RebuildScan,
    /// Write generation carried through unchanged to the built result.
    /// NOTE(review): presumably used to detect writes that raced the build —
    /// confirm at the install site.
    write_gen: u64,
}
2858
/// Outcome of the store-independent part of an index build.
enum RebuiltKind {
    /// A fully built in-memory index.
    Ready(Box<CollectionIndex>),
    /// The in-memory parts of a disk index; they have not been written to
    /// disk by the build step.
    Disk {
        graph: Box<Vamana>,
        pq: Box<ProductQuantizer>,
    },
}
2870
/// Result of [`RebuildInputs::build`]: the built index plus the id maps and
/// auxiliary state from the scan it was built from.
pub struct RebuiltIndex {
    /// Name of the collection this index belongs to.
    collection: String,
    /// The built index, or the parts of a disk index.
    kind: RebuiltKind,
    /// External id for each internal id.
    int_to_ext: Vec<String>,
    /// Reverse of `int_to_ext`.
    ext_to_int: HashMap<String, u64>,
    /// Token count per document for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, when the collection uses one.
    sparse: Option<SparseInvertedIndex>,
    /// Write generation copied from the inputs.
    write_gen: u64,
}
2882
2883impl RebuildInputs {
2884 pub fn build(self) -> Result<RebuiltIndex> {
2889 let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2890 let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2891 Some(index) => RebuiltKind::Ready(Box::new(index)),
2892 None => {
2893 let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2894 RebuiltKind::Disk {
2895 graph: Box::new(graph),
2896 pq: Box::new(pq),
2897 }
2898 }
2899 };
2900 Ok(RebuiltIndex {
2901 collection: self.collection,
2902 kind,
2903 int_to_ext: self.scan.int_to_ext,
2904 ext_to_int: self.scan.ext_to_int,
2905 docs: self.scan.docs,
2906 sparse: self.scan.sparse,
2907 write_gen: self.write_gen,
2908 })
2909 }
2910}
2911
2912fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2916 if payload.is_empty() {
2917 return None;
2918 }
2919 let value = serde_json::from_slice::<Value>(payload).ok()?;
2920 sparse_vector_from_value(&value)
2921}
2922
2923fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2929 if let Some(raw) = payload.get(SPARSE_KEY) {
2930 return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2931 }
2932 let text = payload.get(TEXT_KEY)?.as_str()?;
2933 Some(text_to_sparse(text))
2934}
2935
2936fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2942 if handle.stale {
2943 return;
2944 }
2945 let Some(idx) = handle.sparse.as_mut() else {
2946 return;
2947 };
2948 match sparse_vector_from_value(payload) {
2949 Some(sv) => idx.upsert(ext_id, &sv),
2950 None => {
2951 idx.remove(ext_id);
2952 }
2953 }
2954}
2955
2956fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2959 if let Some(idx) = handle.sparse.as_mut() {
2960 idx.remove(ext_id);
2961 }
2962}
2963
/// Uses the store's secondary indexes to compute a superset of the ids that
/// can match `filter`, or `None` when the filter cannot be narrowed this way.
///
/// - `And`: intersection of the sub-filters that can be narrowed; sub-filters
///   that cannot are ignored (the result is still a superset). `None` if no
///   sub-filter can be narrowed.
/// - `Or`: union of all sub-filters; `None` as soon as one cannot be narrowed.
/// - `Not`: never narrowed.
/// - leaf: ids returned by `Store::matching_ids` when `leaf_predicate` can
///   translate the leaf against the `filterable` fields, otherwise `None`.
///
/// # Errors
/// Propagates errors from `Store::matching_ids`.
fn candidate_ids(
    store: &Store,
    cid: CollectionId,
    filter: &Filter,
    filterable: &[FilterableField],
) -> Result<Option<BTreeSet<String>>> {
    match filter {
        Filter::And(subs) => {
            let mut acc: Option<BTreeSet<String>> = None;
            for sub in subs {
                if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
                    acc = Some(match acc {
                        Some(existing) => existing.intersection(&set).cloned().collect(),
                        None => set,
                    });
                }
            }
            Ok(acc)
        }
        Filter::Or(subs) => {
            let mut acc = BTreeSet::new();
            for sub in subs {
                match candidate_ids(store, cid, sub, filterable)? {
                    Some(set) => acc.extend(set),
                    // One unbounded branch makes the whole disjunction unbounded.
                    None => return Ok(None),
                }
            }
            Ok(Some(acc))
        }
        Filter::Not(_) => Ok(None),
        leaf => match leaf_predicate(leaf, filterable) {
            Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
            None => Ok(None),
        },
    }
}
3017
3018fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
3022 let field_type = |field: &str| {
3023 filterable
3024 .iter()
3025 .find(|f| f.path == field)
3026 .map(|f| f.field_type)
3027 };
3028 match filter {
3029 Filter::Eq { field, value } => Some(SecPredicate::Eq {
3030 field: field.clone(),
3031 value: sec_value(field_type(field)?, value)?,
3032 }),
3033 Filter::In { field, values } => {
3034 let ft = field_type(field)?;
3035 let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
3038 Some(SecPredicate::In {
3039 field: field.clone(),
3040 values: values?,
3041 })
3042 }
3043 Filter::Lt { field, value } => {
3044 one_sided_range(field, field_type(field)?, value, false, false)
3045 }
3046 Filter::Lte { field, value } => {
3047 one_sided_range(field, field_type(field)?, value, false, true)
3048 }
3049 Filter::Gt { field, value } => {
3050 one_sided_range(field, field_type(field)?, value, true, false)
3051 }
3052 Filter::Gte { field, value } => {
3053 one_sided_range(field, field_type(field)?, value, true, true)
3054 }
3055 _ => None,
3056 }
3057}
3058
3059fn one_sided_range(
3063 field: &str,
3064 field_type: FieldType,
3065 value: &Value,
3066 is_lower: bool,
3067 inclusive: bool,
3068) -> Option<SecPredicate> {
3069 let v = sec_value(field_type, value)?;
3070 let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3071 (Some(v), None, inclusive, false)
3072 } else {
3073 (None, Some(v), false, inclusive)
3074 };
3075 Some(SecPredicate::Range {
3076 field: field.to_owned(),
3077 lo,
3078 hi,
3079 lo_inclusive,
3080 hi_inclusive,
3081 })
3082}
3083
3084fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3088 match (field_type, value) {
3089 (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3090 (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3091 _ => None,
3092 }
3093}
3094
3095#[cfg(test)]
3096mod tests {
3097 use super::*;
3098 use serde_json::json;
3099
3100 fn desc() -> Descriptor {
3101 Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3102 }
3103
3104 fn open(dir: &Path) -> Database {
3105 Database::open(dir).unwrap()
3106 }
3107
3108 #[test]
3109 fn prepare_then_apply_matches_a_direct_write() {
3110 let dir_a = tempfile::tempdir().unwrap();
3115 let mut direct = open(dir_a.path());
3116 let dir_b = tempfile::tempdir().unwrap();
3117 let mut raft = open(dir_b.path());
3118
3119 let vector = [1.0, 2.0, 3.0, 4.0];
3120 let payload = json!({ "k": "v" });
3121
3122 direct.create_collection("docs", desc()).unwrap();
3124 direct.upsert("docs", "p1", &vector, &payload).unwrap();
3125 direct
3126 .upsert("docs", "p2", &[0.0, 1.0, 0.0, 0.0], &json!({}))
3127 .unwrap();
3128 assert!(direct.delete("docs", "p2").unwrap());
3129
3130 let create = raft.prepare_create_collection("docs", &desc()).unwrap();
3132 raft.apply_replicated(create).unwrap();
3133 let up1 = raft
3134 .prepare_upsert("docs", "p1", &vector, &payload)
3135 .unwrap();
3136 raft.apply_replicated(up1).unwrap();
3137 let up2 = raft
3138 .prepare_upsert("docs", "p2", &[0.0, 1.0, 0.0, 0.0], &json!({}))
3139 .unwrap();
3140 raft.apply_replicated(up2).unwrap();
3141 let del = raft
3142 .prepare_delete("docs", "p2")
3143 .unwrap()
3144 .expect("p2 present");
3145 raft.apply_replicated(del).unwrap();
3146
3147 assert_eq!(
3149 direct.replication_snapshot().unwrap(),
3150 raft.replication_snapshot().unwrap(),
3151 );
3152 }
3153
3154 #[test]
3155 fn prepare_validates_like_the_direct_path() {
3156 let dir = tempfile::tempdir().unwrap();
3157 let mut db = open(dir.path());
3158 db.create_collection("docs", desc()).unwrap();
3159 assert!(
3161 db.prepare_upsert("docs", "p", &[1.0, 2.0], &json!({}))
3162 .is_err()
3163 );
3164 assert!(db.prepare_create_collection("docs", &desc()).is_err());
3166 assert!(
3168 db.prepare_upsert("nope", "p", &[1.0, 2.0, 3.0, 4.0], &json!({}))
3169 .is_err()
3170 );
3171 assert!(db.prepare_delete("docs", "ghost").unwrap().is_none());
3174 assert!(db.prepare_delete("nope", "x").is_err());
3175 }
3176
3177 #[test]
3178 fn hybrid_search_fuses_dense_and_sparse() {
3179 let tmp = tempfile::tempdir().unwrap();
3180 let mut db = open(tmp.path());
3181 db.create_collection("kb", desc()).unwrap();
3182 db.upsert(
3185 "kb",
3186 "a",
3187 &[1.0, 0.0, 0.0, 0.0],
3188 &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3189 )
3190 .unwrap();
3191 db.upsert(
3192 "kb",
3193 "b",
3194 &[0.0, 1.0, 0.0, 0.0],
3195 &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3196 )
3197 .unwrap();
3198 db.upsert(
3199 "kb",
3200 "c",
3201 &[0.0, 0.0, 0.0, 1.0],
3202 &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3203 )
3204 .unwrap();
3205
3206 let dense_q = [1.0, 0.0, 0.0, 0.0];
3207 let sparse_q = SparseVector {
3208 indices: vec![1, 2],
3209 values: vec![1.0, 1.0],
3210 };
3211 let params = SearchParams {
3212 k: 3,
3213 ..SearchParams::default()
3214 };
3215
3216 let hits = db
3218 .hybrid_search(
3219 "kb",
3220 Some(&dense_q),
3221 Some(&sparse_q),
3222 None,
3223 ¶ms,
3224 DEFAULT_RRF_K0,
3225 )
3226 .unwrap();
3227 let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3228 assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3229 assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3230
3231 let sparse_only = db
3233 .hybrid_search("kb", None, Some(&sparse_q), None, ¶ms, DEFAULT_RRF_K0)
3234 .unwrap();
3235 assert_eq!(sparse_only[0].id, "b");
3236
3237 let dense_only = db
3239 .hybrid_search("kb", Some(&dense_q), None, None, ¶ms, DEFAULT_RRF_K0)
3240 .unwrap();
3241 assert_eq!(dense_only[0].id, "a");
3242
3243 assert!(
3245 db.hybrid_search("kb", None, None, None, ¶ms, DEFAULT_RRF_K0)
3246 .is_err()
3247 );
3248 }
3249
3250 fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3252 let params = SearchParams {
3253 k: 10,
3254 ..SearchParams::default()
3255 };
3256 db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
3257 .unwrap()
3258 .into_iter()
3259 .map(|m| m.id)
3260 .collect()
3261 }
3262
3263 #[test]
3264 fn sparse_index_equals_the_store_scan_fallback() {
3265 let tmp = tempfile::tempdir().unwrap();
3266 let mut db = open(tmp.path());
3267 db.create_collection("kb", desc()).unwrap();
3268 let z = [0.0f32, 0.0, 0.0, 0.0];
3269 for (id, dims, vals) in [
3270 ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
3271 ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
3272 ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
3273 ("d", vec![9u32], vec![1.0f32]), ] {
3275 db.upsert(
3276 "kb",
3277 id,
3278 &z,
3279 &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
3280 )
3281 .unwrap();
3282 }
3283 let q = SparseVector {
3284 indices: vec![1, 2, 3],
3285 values: vec![1.0, 1.0, 1.0],
3286 };
3287
3288 assert!(db.collections.get("kb").unwrap().sparse.is_some());
3290 let via_index = sparse_ids(&mut db, &q);
3291 assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
3292
3293 db.collections.get_mut("kb").unwrap().sparse = None;
3296 let via_scan = sparse_ids(&mut db, &q);
3297 assert_eq!(via_index, via_scan);
3298 }
3299
3300 #[test]
3301 fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
3302 let tmp = tempfile::tempdir().unwrap();
3303 let mut db = open(tmp.path());
3304 db.create_collection("kb", desc()).unwrap();
3305 let z = [0.0f32, 0.0, 0.0, 0.0];
3306 db.upsert(
3307 "kb",
3308 "a",
3309 &z,
3310 &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3311 )
3312 .unwrap();
3313 db.upsert(
3314 "kb",
3315 "b",
3316 &z,
3317 &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
3318 )
3319 .unwrap();
3320 db.upsert(
3321 "kb",
3322 "c",
3323 &z,
3324 &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3325 )
3326 .unwrap();
3327 db.upsert(
3329 "kb",
3330 "a",
3331 &z,
3332 &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
3333 )
3334 .unwrap();
3335 assert!(db.delete("kb", "b").unwrap());
3336
3337 let q = SparseVector {
3338 indices: vec![1, 2],
3339 values: vec![1.0, 1.0],
3340 };
3341 let incremental = sparse_ids(&mut db, &q);
3343 assert_eq!(incremental, vec!["c".to_owned()]);
3344
3345 db.collections.get_mut("kb").unwrap().stale = true;
3347 let rebuilt = sparse_ids(&mut db, &q);
3348 assert_eq!(incremental, rebuilt);
3349 }
3350
3351 #[test]
3352 fn sparse_index_is_rebuilt_on_reopen() {
3353 let tmp = tempfile::tempdir().unwrap();
3354 {
3355 let mut db = open(tmp.path());
3356 db.create_collection("kb", desc()).unwrap();
3357 db.upsert(
3358 "kb",
3359 "a",
3360 &[0.0, 0.0, 0.0, 0.0],
3361 &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
3362 )
3363 .unwrap();
3364 }
3365 let mut db = open(tmp.path());
3366 assert!(db.collections.get("kb").unwrap().sparse.is_some());
3367 let q = SparseVector {
3368 indices: vec![1],
3369 values: vec![1.0],
3370 };
3371 assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
3372 }
3373
3374 #[test]
3375 fn hybrid_sparse_honours_the_payload_filter() {
3376 let tmp = tempfile::tempdir().unwrap();
3377 let mut db = open(tmp.path());
3378 db.create_collection("kb", desc()).unwrap();
3379 let z = [0.0f32, 0.0, 0.0, 0.0];
3380 db.upsert(
3381 "kb",
3382 "a",
3383 &z,
3384 &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3385 )
3386 .unwrap();
3387 db.upsert(
3388 "kb",
3389 "b",
3390 &z,
3391 &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3392 )
3393 .unwrap();
3394 let q = SparseVector {
3395 indices: vec![1],
3396 values: vec![1.0],
3397 };
3398 let params = SearchParams {
3399 k: 10,
3400 filter: Some(Filter::Eq {
3401 field: "lang".to_owned(),
3402 value: json!("en"),
3403 }),
3404 ..SearchParams::default()
3405 };
3406 let hits: Vec<String> = db
3407 .hybrid_search("kb", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3408 .unwrap()
3409 .into_iter()
3410 .map(|m| m.id)
3411 .collect();
3412 assert_eq!(hits, vec!["a".to_owned()]);
3414 }
3415
3416 #[test]
3417 fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3418 let tmp = tempfile::tempdir().unwrap();
3419 let mut db = open(tmp.path());
3420 db.create_collection("kb", desc()).unwrap();
3421 let z = [0.0f32, 0.0, 0.0, 0.0];
3422 db.upsert(
3424 "kb",
3425 "cats",
3426 &z,
3427 &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3428 )
3429 .unwrap();
3430 db.upsert(
3431 "kb",
3432 "dogs",
3433 &z,
3434 &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3435 )
3436 .unwrap();
3437
3438 let params = SearchParams {
3439 k: 10,
3440 ..SearchParams::default()
3441 };
3442 let hits: Vec<String> = db
3445 .hybrid_search("kb", None, None, Some("cats"), ¶ms, DEFAULT_RRF_K0)
3446 .unwrap()
3447 .into_iter()
3448 .map(|m| m.id)
3449 .collect();
3450 assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3451
3452 assert!(
3454 db.hybrid_search("kb", None, None, Some("elephant"), ¶ms, DEFAULT_RRF_K0)
3455 .unwrap()
3456 .is_empty()
3457 );
3458
3459 let dense_q = [1.0, 0.0, 0.0, 0.0];
3461 db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3462 .unwrap();
3463 let fused: Vec<String> = db
3464 .hybrid_search(
3465 "kb",
3466 Some(&dense_q),
3467 None,
3468 Some("dog"),
3469 ¶ms,
3470 DEFAULT_RRF_K0,
3471 )
3472 .unwrap()
3473 .into_iter()
3474 .map(|m| m.id)
3475 .collect();
3476 assert!(
3477 fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3478 "dense match + lexical match both surface; got {fused:?}"
3479 );
3480 }
3481
3482 #[test]
3483 fn create_upsert_search_get_end_to_end() {
3484 let tmp = tempfile::tempdir().unwrap();
3485 let mut db = open(tmp.path());
3486 db.create_collection("items", desc()).unwrap();
3487 db.upsert(
3488 "items",
3489 "a",
3490 &[0.0, 0.0, 0.0, 0.0],
3491 &json!({"color": "red"}),
3492 )
3493 .unwrap();
3494 db.upsert(
3495 "items",
3496 "b",
3497 &[1.0, 0.0, 0.0, 0.0],
3498 &json!({"color": "blue"}),
3499 )
3500 .unwrap();
3501 db.upsert(
3502 "items",
3503 "c",
3504 &[5.0, 5.0, 5.0, 5.0],
3505 &json!({"color": "red"}),
3506 )
3507 .unwrap();
3508
3509 let near = db
3510 .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
3511 .unwrap();
3512 assert_eq!(near[0].id, "a");
3513 assert_eq!(near[1].id, "b");
3514
3515 let got = db.get("items", "c").unwrap().unwrap();
3516 assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
3517 assert_eq!(got.payload, Some(json!({"color": "red"})));
3518 }
3519
3520 #[test]
3521 fn upsert_batch_produces_same_search_results_as_sequential() {
3522 let tmp_seq = tempfile::tempdir().unwrap();
3523 let tmp_bat = tempfile::tempdir().unwrap();
3524
3525 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3526 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3527 let payload = json!({});
3528
3529 {
3531 let mut db = open(tmp_seq.path());
3532 db.create_collection("c", desc()).unwrap();
3533 for (id, vec) in ids.iter().zip(vectors.iter()) {
3534 db.upsert("c", id, vec, &payload).unwrap();
3535 }
3536 }
3537 {
3539 let mut db = open(tmp_bat.path());
3540 db.create_collection("c", desc()).unwrap();
3541 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3542 .iter()
3543 .zip(vectors.iter())
3544 .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3545 .collect();
3546 let n = db.upsert_batch("c", &pts).unwrap();
3547 assert_eq!(n, 20);
3548 }
3549
3550 let query = [10.0f32, 0.0, 0.0, 0.0];
3551 let params = SearchParams {
3552 k: 5,
3553 ..Default::default()
3554 };
3555
3556 let mut seq_db = open(tmp_seq.path());
3557 let mut bat_db = open(tmp_bat.path());
3558 let seq: Vec<String> = seq_db
3559 .search("c", &query, ¶ms)
3560 .unwrap()
3561 .into_iter()
3562 .map(|m| m.id)
3563 .collect();
3564 let bat: Vec<String> = bat_db
3565 .search("c", &query, ¶ms)
3566 .unwrap()
3567 .into_iter()
3568 .map(|m| m.id)
3569 .collect();
3570 assert_eq!(
3571 seq, bat,
3572 "batch and sequential produce different search results"
3573 );
3574 }
3575
3576 #[test]
3577 fn upsert_bulk_defers_the_index_then_searches_correctly() {
3578 let tmp = tempfile::tempdir().unwrap();
3579 let mut db = open(tmp.path());
3580 db.create_collection("c", desc()).unwrap();
3581 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3582 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3583 let plain = json!({});
3586 let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3587 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3588 .iter()
3589 .zip(vectors.iter())
3590 .map(|(id, v)| {
3591 let payload = if id == "p3" { &sparse_payload } else { &plain };
3592 (id.as_str(), v.as_slice(), payload)
3593 })
3594 .collect();
3595 let n = db.upsert_bulk("c", &pts).unwrap();
3596 assert_eq!(n, 20);
3597
3598 assert!(db.collections.get("c").unwrap().stale);
3600
3601 let query = [10.0f32, 0.0, 0.0, 0.0];
3603 let params = SearchParams {
3604 k: 5,
3605 ..Default::default()
3606 };
3607 let hits: Vec<String> = db
3608 .search("c", &query, ¶ms)
3609 .unwrap()
3610 .into_iter()
3611 .map(|m| m.id)
3612 .collect();
3613 assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3614 assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3615
3616 let q = SparseVector {
3618 indices: vec![7],
3619 values: vec![1.0],
3620 };
3621 let sparse_hits: Vec<String> = db
3622 .hybrid_search("c", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3623 .unwrap()
3624 .into_iter()
3625 .map(|m| m.id)
3626 .collect();
3627 assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3628 }
3629
3630 #[test]
3631 fn filtered_search_only_returns_matching_payloads() {
3632 let tmp = tempfile::tempdir().unwrap();
3633 let mut db = open(tmp.path());
3634 db.create_collection("items", desc()).unwrap();
3635 for i in 0..20u32 {
3636 let color = if i % 2 == 0 { "red" } else { "blue" };
3637 db.upsert(
3638 "items",
3639 &format!("p{i}"),
3640 &[i as f32, 0.0, 0.0, 0.0],
3641 &json!({"color": color, "n": i}),
3642 )
3643 .unwrap();
3644 }
3645 let params = SearchParams {
3646 k: 5,
3647 filter: Some(Filter::Eq {
3648 field: "color".into(),
3649 value: json!("red"),
3650 }),
3651 ef_search: 64,
3652 with_payload: true,
3653 with_vector: false,
3654 };
3655 let results = db.search("items", &[0.0; 4], ¶ms).unwrap();
3656 assert!(!results.is_empty());
3657 for m in &results {
3658 assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3659 }
3660 }
3661
3662 #[test]
3663 fn persists_and_rebuilds_index_on_reopen() {
3664 let tmp = tempfile::tempdir().unwrap();
3665 {
3666 let mut db = open(tmp.path());
3667 db.create_collection("items", desc()).unwrap();
3668 for i in 0..50u32 {
3669 db.upsert(
3670 "items",
3671 &format!("p{i}"),
3672 &[i as f32, 1.0, 2.0, 3.0],
3673 &json!({}),
3674 )
3675 .unwrap();
3676 }
3677 db.checkpoint().unwrap();
3678 }
3679 let mut db = open(tmp.path());
3680 assert_eq!(db.len("items").unwrap(), 50);
3681 let res = db
3682 .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3683 .unwrap();
3684 assert_eq!(res[0].id, "p7");
3685 }
3686
3687 #[test]
3688 fn update_reflects_new_vector_after_rebuild() {
3689 let tmp = tempfile::tempdir().unwrap();
3690 let mut db = open(tmp.path());
3691 db.create_collection("items", desc()).unwrap();
3692 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3693 db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
3694 .unwrap();
3695 db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
3697 .unwrap();
3698 let res = db
3699 .search("items", &[0.0; 4], &SearchParams::default())
3700 .unwrap();
3701 assert_eq!(res[0].id, "b");
3702 assert_eq!(
3703 db.get("items", "a").unwrap().unwrap().vector,
3704 Some(vec![100.0, 0.0, 0.0, 0.0])
3705 );
3706 }
3707
3708 #[test]
3709 fn delete_removes_from_search() {
3710 let tmp = tempfile::tempdir().unwrap();
3711 let mut db = open(tmp.path());
3712 db.create_collection("items", desc()).unwrap();
3713 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3714 db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3715 .unwrap();
3716 assert!(db.delete("items", "a").unwrap());
3717 let res = db
3718 .search("items", &[0.0; 4], &SearchParams::default())
3719 .unwrap();
3720 assert!(res.iter().all(|m| m.id != "a"));
3721 assert!(db.get("items", "a").unwrap().is_none());
3722 }
3723
3724 #[test]
3725 fn unknown_collection_errors() {
3726 let tmp = tempfile::tempdir().unwrap();
3727 let mut db = open(tmp.path());
3728 assert!(matches!(
3729 db.search("nope", &[0.0; 4], &SearchParams::default()),
3730 Err(Error::CollectionNotFound(_))
3731 ));
3732 db.create_collection("c", desc()).unwrap();
3733 assert!(matches!(
3734 db.create_collection("c", desc()),
3735 Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3736 ));
3737 }
3738
3739 fn desc_with(kind: IndexKind) -> Descriptor {
3740 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3741 kind,
3742 pq_subspaces: None,
3743 })
3744 }
3745
3746 #[test]
3747 fn vamana_and_ivf_collections_find_the_nearest_point() {
3748 for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3749 let tmp = tempfile::tempdir().unwrap();
3750 let mut db = open(tmp.path());
3751 db.create_collection("c", desc_with(kind)).unwrap();
3752 for i in 0..40u32 {
3753 db.upsert(
3754 "c",
3755 &format!("p{i}"),
3756 &[i as f32, 0.0, 0.0, 0.0],
3757 &json!({}),
3758 )
3759 .unwrap();
3760 }
3761 let res = db
3763 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3764 .unwrap();
3765 assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3766 }
3767 }
3768
3769 #[test]
3770 fn index_kind_persists_and_rebuilds_on_reopen() {
3771 let tmp = tempfile::tempdir().unwrap();
3772 {
3773 let mut db = open(tmp.path());
3774 db.create_collection("v", desc_with(IndexKind::Vamana))
3775 .unwrap();
3776 for i in 0..20u32 {
3777 db.upsert(
3778 "v",
3779 &format!("p{i}"),
3780 &[i as f32, 1.0, 2.0, 3.0],
3781 &json!({}),
3782 )
3783 .unwrap();
3784 }
3785 db.checkpoint().unwrap();
3786 }
3787 let mut db = open(tmp.path());
3788 assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3789 let res = db
3790 .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3791 .unwrap();
3792 assert_eq!(res[0].id, "p7");
3793 }
3794
3795 #[test]
3800 fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
3801 let tmp = tempfile::tempdir().unwrap();
3802 let cid;
3803 {
3804 let mut db = open(tmp.path());
3805 db.create_collection("d", desc_with(IndexKind::DiskVamana))
3806 .unwrap();
3807 for i in 0..100u32 {
3808 db.upsert(
3809 "d",
3810 &format!("p{i}"),
3811 &[i as f32, 0.0, 0.0, 0.0],
3812 &json!({}),
3813 )
3814 .unwrap();
3815 }
3816 db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3818 .unwrap();
3819 db.checkpoint().unwrap();
3820 for i in 100..115u32 {
3823 db.upsert(
3824 "d",
3825 &format!("p{i}"),
3826 &[i as f32, 0.0, 0.0, 0.0],
3827 &json!({}),
3828 )
3829 .unwrap();
3830 }
3831 cid = db.collections["d"].id;
3832 let base = open_disk_index(
3833 &db.store,
3834 cid,
3835 db.store.collection_codec_clone(cid).unwrap(),
3836 )
3837 .unwrap();
3838 assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
3839 }
3840
3841 let mut db = open(tmp.path());
3842 assert_eq!(
3844 db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
3845 .unwrap()[0]
3846 .id,
3847 "p50",
3848 );
3849 assert_eq!(
3850 db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
3851 .unwrap()[0]
3852 .id,
3853 "p110",
3854 "post-checkpoint insert survived reopen via WAL-tail replay",
3855 );
3856 let base = open_disk_index(
3859 &db.store,
3860 cid,
3861 db.store.collection_codec_clone(cid).unwrap(),
3862 )
3863 .unwrap();
3864 assert_eq!(
3865 base.len(),
3866 100,
3867 "reopen loaded the base; it was not rebuilt"
3868 );
3869 }
3870
3871 #[test]
3874 fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
3875 let tmp = tempfile::tempdir().unwrap();
3876 let base_path;
3877 {
3878 let mut db = open(tmp.path());
3879 db.create_collection("d", desc_with(IndexKind::DiskVamana))
3880 .unwrap();
3881 for i in 0..60u32 {
3882 db.upsert(
3883 "d",
3884 &format!("p{i}"),
3885 &[i as f32, 0.0, 0.0, 0.0],
3886 &json!({}),
3887 )
3888 .unwrap();
3889 }
3890 db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3891 .unwrap();
3892 db.checkpoint().unwrap();
3893 let cid = db.collections["d"].id;
3894 base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
3895 }
3896 std::fs::remove_file(&base_path).unwrap();
3898 {
3899 let mut db = open(tmp.path());
3900 assert_eq!(
3901 db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3902 .unwrap()[0]
3903 .id,
3904 "p25",
3905 "rebuild fallback still answers correctly after a lost base",
3906 );
3907 assert!(
3910 base_path.exists(),
3911 "the fallback rebuild re-sealed the base file"
3912 );
3913 db.checkpoint().unwrap();
3914 }
3915 let len = std::fs::metadata(&base_path).unwrap().len();
3918 std::fs::OpenOptions::new()
3919 .write(true)
3920 .open(&base_path)
3921 .unwrap()
3922 .set_len(len / 2)
3923 .unwrap();
3924
3925 let mut db = open(tmp.path());
3926 assert_eq!(
3927 db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3928 .unwrap()[0]
3929 .id,
3930 "p25",
3931 "rebuild fallback still answers correctly after a torn base",
3932 );
3933 }
3934
3935 #[test]
3936 fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3937 let tmp = tempfile::tempdir().unwrap();
3938 let mut db = open(tmp.path());
3939 db.create_collection("c", desc_with(IndexKind::Ivf))
3940 .unwrap();
3941 for i in 0..50u32 {
3942 db.upsert(
3943 "c",
3944 &format!("p{i}"),
3945 &[i as f32, 0.0, 0.0, 0.0],
3946 &json!({}),
3947 )
3948 .unwrap();
3949 }
3950 let _ = db
3952 .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3953 .unwrap();
3954 assert!(!db.collections["c"].stale, "the search built the index");
3955
3956 db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3958 .unwrap();
3959 assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3960 let res = db
3961 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3962 .unwrap();
3963 assert_eq!(res[0].id, "far");
3964
3965 assert!(db.delete("c", "far").unwrap());
3967 assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3968 let res = db
3969 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3970 .unwrap();
3971 assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3972 }
3973
3974 #[test]
3975 fn ivf_incremental_update_replaces_the_vector() {
3976 let tmp = tempfile::tempdir().unwrap();
3977 let mut db = open(tmp.path());
3978 db.create_collection("c", desc_with(IndexKind::Ivf))
3979 .unwrap();
3980 for i in 0..30u32 {
3981 db.upsert(
3982 "c",
3983 &format!("p{i}"),
3984 &[i as f32, 0.0, 0.0, 0.0],
3985 &json!({}),
3986 )
3987 .unwrap();
3988 }
3989 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3990 db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3992 .unwrap();
3993 assert!(!db.collections["c"].stale);
3994 let at_new = db
3995 .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3996 .unwrap();
3997 assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3998 let at_old = db
3999 .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
4000 .unwrap();
4001 assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
4002 }
4003
4004 #[test]
4005 fn ivf_reinsert_after_incremental_delete_is_found() {
4006 let tmp = tempfile::tempdir().unwrap();
4007 let mut db = open(tmp.path());
4008 db.create_collection("c", desc_with(IndexKind::Ivf))
4009 .unwrap();
4010 for i in 0..20u32 {
4011 db.upsert(
4012 "c",
4013 &format!("p{i}"),
4014 &[i as f32, 0.0, 0.0, 0.0],
4015 &json!({}),
4016 )
4017 .unwrap();
4018 }
4019 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4020 assert!(db.delete("c", "p3").unwrap());
4021 assert!(!db.collections["c"].stale);
4022 db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
4024 .unwrap();
4025 assert!(!db.collections["c"].stale);
4026 let res = db
4027 .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
4028 .unwrap();
4029 assert_eq!(res[0].id, "p3");
4030 }
4031
4032 #[test]
4033 fn hnsw_in_place_update_falls_back_to_rebuild() {
4034 let tmp = tempfile::tempdir().unwrap();
4036 let mut db = open(tmp.path());
4037 db.create_collection("c", desc()).unwrap();
4038 for i in 0..10u32 {
4039 db.upsert(
4040 "c",
4041 &format!("p{i}"),
4042 &[i as f32, 0.0, 0.0, 0.0],
4043 &json!({}),
4044 )
4045 .unwrap();
4046 }
4047 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
4048 assert!(!db.collections["c"].stale);
4049 db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
4050 .unwrap();
4051 assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
4052 let res = db
4054 .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
4055 .unwrap();
4056 assert_eq!(res[0].id, "p2");
4057 }
4058
4059 #[test]
4060 fn unsupported_index_configurations_are_rejected() {
4061 let tmp = tempfile::tempdir().unwrap();
4062 let mut db = open(tmp.path());
4063 let dot_vamana =
4065 Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
4066 kind: IndexKind::Vamana,
4067 pq_subspaces: None,
4068 });
4069 assert!(matches!(
4070 db.create_collection("a", dot_vamana),
4071 Err(Error::Unsupported(_))
4072 ));
4073 let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
4075 kind: IndexKind::DiskVamana,
4076 pq_subspaces: None,
4077 });
4078 assert!(matches!(
4079 db.create_collection("b", dot_disk),
4080 Err(Error::Unsupported(_))
4081 ));
4082 }
4083
4084 #[test]
4085 fn dcpe_collections_require_the_l2_metric() {
4086 let tmp = tempfile::tempdir().unwrap();
4087 let mut db = open(tmp.path());
4088 for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
4090 let bad = Descriptor::new(4, Dtype::F32, metric)
4091 .with_vector_encryption(VectorEncryption::Dcpe);
4092 assert!(matches!(
4093 db.create_collection("bad", bad),
4094 Err(Error::Unsupported(_))
4095 ));
4096 }
4097 let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
4099 .with_vector_encryption(VectorEncryption::Dcpe);
4100 db.create_collection("enc", good)
4101 .expect("l2 dcpe collection");
4102 assert_eq!(
4103 db.descriptor("enc").expect("descriptor").vector_encryption,
4104 VectorEncryption::Dcpe
4105 );
4106 }
4107
4108 #[test]
4109 fn client_side_collections_are_fetch_only_and_reject_search() {
4110 let tmp = tempfile::tempdir().unwrap();
4111 let mut db = open(tmp.path());
4112 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4115 .with_vector_encryption(VectorEncryption::ClientSide);
4116 db.create_collection("vault", desc)
4117 .expect("create client-side collection");
4118 assert!(matches!(
4120 db.collections["vault"].index,
4121 CollectionIndex::None
4122 ));
4123
4124 for i in 0..5 {
4127 let tier = if i < 2 { "vip" } else { "std" };
4128 db.upsert(
4129 "vault",
4130 &format!("p{i}"),
4131 &[0.0; 4],
4132 &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
4133 )
4134 .expect("upsert");
4135 }
4136 assert_eq!(db.len("vault").unwrap(), 5);
4137 assert!(matches!(
4139 db.collections["vault"].index,
4140 CollectionIndex::None
4141 ));
4142
4143 assert!(matches!(
4145 db.search("vault", &[0.0; 4], &SearchParams::default()),
4146 Err(Error::Unsupported(_))
4147 ));
4148
4149 let all = db.fetch("vault", None, 0, 100, true, false).unwrap();
4152 assert_eq!(all.len(), 5);
4153 assert!(
4154 all.iter()
4155 .all(|m| m.payload.is_some() && m.vector.is_none())
4156 );
4157
4158 let vip = db
4160 .fetch(
4161 "vault",
4162 Some(&Filter::Eq {
4163 field: "tier".to_owned(),
4164 value: serde_json::json!("vip"),
4165 }),
4166 0,
4167 100,
4168 false,
4169 false,
4170 )
4171 .unwrap();
4172 assert_eq!(vip.len(), 2);
4173 assert_eq!(
4175 db.fetch("vault", None, 0, 2, false, false).unwrap().len(),
4176 2
4177 );
4178 assert_eq!(
4180 db.fetch("vault", None, 3, 100, false, false).unwrap().len(),
4181 2
4182 );
4183
4184 assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
4187 assert!(db.delete("vault", "p0").unwrap());
4188 assert_eq!(db.len("vault").unwrap(), 4);
4189 }
4190
4191 #[test]
4192 fn client_side_encryption_rejects_multivector() {
4193 let tmp = tempfile::tempdir().unwrap();
4194 let mut db = open(tmp.path());
4195 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4196 .with_multivector(true)
4197 .with_vector_encryption(VectorEncryption::ClientSide);
4198 assert!(matches!(
4199 db.create_collection("bad", desc),
4200 Err(Error::Unsupported(_))
4201 ));
4202 }
4203
4204 fn contains_file(dir: &Path, name: &str) -> bool {
4206 std::fs::read_dir(dir).is_ok_and(|rd| {
4207 rd.flatten().any(|e| {
4208 let p = e.path();
4209 if p.is_dir() {
4210 contains_file(&p, name)
4211 } else {
4212 p.file_name().is_some_and(|f| f == name)
4213 }
4214 })
4215 })
4216 }
4217
/// A `DiskVamana` collection answers a nearest-neighbour query, leaves a
/// `vamana.qvx` file somewhere under the database directory after a
/// checkpoint, and answers the same query identically after a reopen.
#[test]
fn disk_index_collection_searches_persists_and_writes_an_artifact() {
    let root = tempfile::tempdir().unwrap();
    let probe: [f32; 4] = [7.0, 0.0, 0.0, 0.0];

    // First session: populate, query, checkpoint, then close the database.
    let mut db = open(root.path());
    db.create_collection("d", desc_with(IndexKind::DiskVamana))
        .unwrap();
    for i in 0..40u32 {
        let id = format!("p{i}");
        db.upsert("d", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let hits = db.search("d", &probe, &SearchParams::default()).unwrap();
    assert_eq!(hits[0].id, "p7");
    db.checkpoint().unwrap();
    drop(db);

    assert!(
        contains_file(root.path(), "vamana.qvx"),
        "disk index file missing"
    );

    // Second session: the descriptor and the search result must survive.
    let mut db = open(root.path());
    assert_eq!(
        db.descriptor("d").unwrap().index.kind,
        IndexKind::DiskVamana
    );
    let hits = db.search("d", &probe, &SearchParams::default()).unwrap();
    assert_eq!(hits[0].id, "p7");
}
4256
/// For both `Vamana` and `DiskVamana` collections, writes issued after a
/// first search must be reflected by later searches: a new point is found,
/// a deleted point is never returned, and an updated point is found at its
/// new position and no longer ranks first at its old one.
#[test]
fn graph_collections_maintain_writes_incrementally() {
    // Ids returned by a default-parameter search around (x, 0, 0, 0).
    let ids_near = |db: &mut Database, x: f32| -> Vec<String> {
        db.search("c", &[x, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap()
            .into_iter()
            .map(|hit| hit.id)
            .collect()
    };

    for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
        let scratch = tempfile::tempdir().unwrap();
        let mut db = open(scratch.path());
        db.create_collection("c", desc_with(kind)).unwrap();
        for i in 0..40u32 {
            let id = format!("p{i}");
            db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
        }
        assert_eq!(ids_near(&mut db, 7.0)[0], "p7", "{kind:?} base nearest");

        // Insert a brand-new point after the first search.
        db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert_eq!(
            ids_near(&mut db, 7.45)[0],
            "p7b",
            "{kind:?} delta insert not found"
        );

        // Delete an existing point; it must vanish from the results.
        assert!(db.delete("c", "p7").unwrap());
        let after_delete = ids_near(&mut db, 7.0);
        assert!(
            after_delete.iter().all(|id| id != "p7"),
            "{kind:?} deleted id returned"
        );

        // Move an existing point far away from its original location.
        db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert_eq!(
            ids_near(&mut db, 500.0)[0],
            "p20",
            "{kind:?} updated vector not at new spot"
        );
        assert_ne!(
            ids_near(&mut db, 20.0)[0],
            "p20",
            "{kind:?} stale copy still nearest old spot"
        );
    }
}
4317
/// Deletes 15 of 50 points from a `Vamana` collection while inserting 15
/// replacements far away, then checks that no deleted id is returned and the
/// new points are found, both before and after a checkpoint and reopen.
#[test]
fn graph_consolidates_under_heavy_churn() {
    let scratch = tempfile::tempdir().unwrap();
    let defaults = SearchParams::default();
    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Vamana))
        .unwrap();
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    // One search before the churn starts; its result is not inspected.
    let _ = db.search("c", &[0.0; 4], &defaults).unwrap();

    // Replace p0..p14 with q0..q14 placed around x = 1000.
    let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
    for (i, victim) in (0..15u32).zip(&deleted) {
        assert!(db.delete("c", victim).unwrap());
        let replacement = format!("q{i}");
        db.upsert(
            "c",
            &replacement,
            &[1000.0 + i as f32, 0.0, 0.0, 0.0],
            &json!({}),
        )
        .unwrap();
    }

    let around_origin = db.search("c", &[5.0, 0.0, 0.0, 0.0], &defaults).unwrap();
    assert!(
        around_origin.iter().all(|m| !deleted.contains(&m.id)),
        "a churned-out id was returned"
    );
    let around_q = db
        .search("c", &[1007.0, 0.0, 0.0, 0.0], &defaults)
        .unwrap();
    assert_eq!(around_q[0].id, "q7", "new point not found after churn");

    // Persist, close, and reopen; the same expectations must still hold.
    db.checkpoint().unwrap();
    drop(db);
    let mut db = open(scratch.path());
    let around_q = db
        .search("c", &[1007.0, 0.0, 0.0, 0.0], &defaults)
        .unwrap();
    assert_eq!(around_q[0].id, "q7", "new point lost across reopen");
    let around_origin = db.search("c", &[5.0, 0.0, 0.0, 0.0], &defaults).unwrap();
    assert!(
        around_origin.iter().all(|m| !deleted.contains(&m.id)),
        "a churned-out id resurfaced after reopen"
    );
}
4377
/// For each of the `Ivf`, `Hnsw`, `Vamana` and `Colbert` index kinds on a
/// multivector collection: document deletes, updates and inserts must change
/// the top-3 ranking immediately, and the ranking observed just before closing
/// must equal the ranking after reopening the database.
#[test]
fn multivector_writes_are_incremental_and_match_a_rebuild() {
    // Unit vector at angle `theta` in the plane of the first two dimensions.
    fn dir(theta: f32) -> Vec<f32> {
        vec![theta.cos(), theta.sin(), 0.0, 0.0]
    }
    // Two-token document whose tokens both point along `theta`.
    fn doc(theta: f32) -> Vec<Vec<f32>> {
        vec![dir(theta), dir(theta)]
    }

    let query = vec![dir(0.0)];
    let top3 = SearchParams {
        k: 3,
        ..Default::default()
    };
    // Ids of the three best documents for `query`, best first.
    let top = |db: &mut Database| {
        db.search_multi_vector("m", &query, &top3)
            .unwrap()
            .into_iter()
            .map(|hit| hit.id)
            .collect::<Vec<_>>()
    };

    for kind in [
        IndexKind::Ivf,
        IndexKind::Hnsw,
        IndexKind::Vamana,
        IndexKind::Colbert,
    ] {
        let scratch = tempfile::tempdir().unwrap();
        let descriptor = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_index(IndexSpec {
                kind,
                pq_subspaces: None,
            });
        let mut db = open(scratch.path());
        db.create_collection("m", descriptor).unwrap();
        // d1..d10 sit at increasing angles, so d1 is the closest to the query.
        for i in 1..=10u32 {
            let id = format!("d{i}");
            let tokens = doc(0.1 * i as f32);
            db.upsert_document("m", &id, &tokens, &json!({ "i": i }))
                .unwrap();
        }
        assert_eq!(top(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");

        // Deleting the best document promotes the next three.
        assert!(db.delete_document("m", "d1").unwrap());
        assert_eq!(
            top(&mut db),
            vec!["d2", "d3", "d4"],
            "{kind:?} after delete"
        );

        // Moving d10 onto the query direction makes it the best match.
        db.upsert_document("m", "d10", &doc(0.0), &json!({ "i": 10 }))
            .unwrap();
        assert_eq!(top(&mut db)[0], "d10", "{kind:?} after update");

        // A new document at a small angle ranks right behind d10.
        db.upsert_document("m", "d11", &doc(0.05), &json!({ "i": 11 }))
            .unwrap();
        let ranked = top(&mut db);
        assert_eq!(ranked[0], "d10", "{kind:?}");
        assert_eq!(ranked[1], "d11", "{kind:?} new doc not ranked");

        // Re-upsert d8 with one token instead of two; exactly one must remain.
        db.upsert_document("m", "d8", &[dir(0.8)], &json!({ "i": 8 }))
            .unwrap();
        let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
        assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");

        // The ranking must be the same after closing and reopening.
        let before = top(&mut db);
        drop(db);
        let mut db = open(scratch.path());
        assert_eq!(top(&mut db), before, "{kind:?} incremental != rebuild");
        assert!(
            db.get_document("m", "d1", false).unwrap().is_none(),
            "{kind:?} deleted doc resurfaced"
        );
    }
}
4469
/// A `Colbert` index is rejected with `Error::Unsupported` on a single-vector
/// descriptor and accepted once the descriptor is marked multivector.
#[test]
fn colbert_index_requires_multivector() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    let base = || Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine);
    let colbert = || IndexSpec {
        kind: IndexKind::Colbert,
        pq_subspaces: None,
    };

    let single = base().with_index(colbert());
    let rejected = db.create_collection("c", single);
    assert!(matches!(rejected, Err(Error::Unsupported(_))));

    let multi = base().with_multivector(true).with_index(colbert());
    assert!(db.create_collection("m", multi).is_ok());
}
4493
4494 fn desc_filterable() -> Descriptor {
4499 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4500 FilterableField::keyword("city"),
4501 FilterableField::numeric("n"),
4502 ])
4503 }
4504
4505 fn seed_cities(db: &mut Database) {
4510 const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4511 db.create_collection("c", desc_filterable()).unwrap();
4512 for i in 0..30u32 {
4513 db.upsert(
4514 "c",
4515 &format!("p{i}"),
4516 &[i as f32, 0.0, 0.0, 0.0],
4517 &json!({"city": CITIES[i as usize % 3], "n": i}),
4518 )
4519 .unwrap();
4520 }
4521 db.checkpoint().unwrap();
4522 }
4523
/// Equality filter on the filterable keyword field `city`.
///
/// With the data from `seed_cities`, the "lyon" point closest to the origin
/// is `p1`; every returned match must carry `city == "lyon"`.
#[test]
fn hybrid_equality_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("lyon"),
        }),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    assert_eq!(res[0].id, "p1");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
    }
}
4545
/// Range filter (`n >= 10`) on the filterable numeric field `n`.
///
/// With the data from `seed_cities`, the closest qualifying point to the
/// origin is `p10`; every returned match must satisfy the bound.
#[test]
fn hybrid_numeric_range_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 4,
        filter: Some(Filter::Gte {
            field: "n".into(),
            value: json!(10),
        }),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert_eq!(res[0].id, "p10");
    for m in &res {
        assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
    }
}
4566
/// A filter no seeded point satisfies (`city == "atlantis"`) must produce an
/// empty result rather than an error.
#[test]
fn hybrid_unsatisfiable_filter_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("atlantis"),
        }),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
}
4583
/// Conjunction of a set-membership filter (`city` in {"paris", "rome"}) and a
/// range filter (`n < 12`).
///
/// With the data from `seed_cities`, `p0` (paris, n = 0) is the closest
/// qualifying point to the origin; every match must satisfy both clauses.
#[test]
fn hybrid_and_or_composition_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::In {
                field: "city".into(),
                values: vec![json!("paris"), json!("rome")],
            },
            Filter::Lt {
                field: "n".into(),
                value: json!(12),
            },
        ])),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert_eq!(res[0].id, "p0");
    for m in &res {
        let payload = m.payload.as_ref().unwrap();
        let city = payload["city"].as_str().unwrap();
        assert!(city == "paris" || city == "rome");
        assert!(payload["n"].as_u64().unwrap() < 12);
    }
}
4615
/// Conjunction of `city == "paris"` with a negated clause, `NOT (n == 0)`.
///
/// With the data from `seed_cities`, `p0` is the only paris point with
/// `n == 0`, so it must be excluded and `p3` becomes the closest match.
/// NOTE(review): the test name implies the negated clause cannot be served by
/// the secondary index and is re-checked per candidate — confirm against the
/// search implementation.
#[test]
fn hybrid_rechecks_non_indexable_clause() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::Eq {
                field: "city".into(),
                value: json!("paris"),
            },
            Filter::Not(Box::new(Filter::Eq {
                field: "n".into(),
                value: json!(0),
            })),
        ])),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.iter().all(|m| m.id != "p0"));
    assert_eq!(res[0].id, "p3");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
    }
}
4645
/// Filtering on `tier`, a payload field the collection does not declare as
/// filterable (only `city` is declared), must still return matches, and every
/// match must satisfy the filter.
#[test]
fn post_filter_fallback_on_undeclared_field_is_correct() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection(
        "c",
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
            .with_filterable(vec![FilterableField::keyword("city")]),
    )
    .unwrap();
    // Even-numbered points are "gold", odd-numbered ones "silver".
    for i in 0..20u32 {
        let tier = if i % 2 == 0 { "gold" } else { "silver" };
        db.upsert(
            "c",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"city": "paris", "tier": tier}),
        )
        .unwrap();
    }
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        }),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
    }
}
4682
/// `leaf_predicate` translates an `Eq` on a declared keyword field and a
/// `Gte` on a declared numeric field into secondary-index predicates, and
/// yields `None` for an undeclared field, a value of the wrong type, `Ne`,
/// and `Exists`.
#[test]
fn leaf_predicate_maps_only_indexable_filterable_leaves() {
    let fields = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];

    // Keyword equality maps to an `Eq` predicate.
    let city_eq = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    let want_eq = SecPredicate::Eq {
        field: "city".into(),
        value: SecValue::Keyword("paris".into()),
    };
    assert_eq!(leaf_predicate(&city_eq, &fields), Some(want_eq));

    // `n >= 3` maps to a range with an inclusive lower bound and no upper bound.
    let n_gte = Filter::Gte {
        field: "n".into(),
        value: json!(3),
    };
    let want_range = SecPredicate::Range {
        field: "n".into(),
        lo: Some(SecValue::Numeric(3.0)),
        hi: None,
        lo_inclusive: true,
        hi_inclusive: false,
    };
    assert_eq!(leaf_predicate(&n_gte, &fields), Some(want_range));

    // None of these leaves may be turned into a predicate.
    let rejected = [
        // Field not declared filterable.
        Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        },
        // Numeric value against a keyword field.
        Filter::Eq {
            field: "city".into(),
            value: json!(5),
        },
        Filter::Ne {
            field: "city".into(),
            value: json!("x"),
        },
        Filter::Exists {
            field: "city".into(),
        },
    ];
    for filter in &rejected {
        assert!(leaf_predicate(filter, &fields).is_none());
    }
}
4741
/// Returns `<root>/collections/0000000000/index`.
///
/// NOTE(review): `0000000000` is presumably the directory of the first
/// collection created in a fresh database — confirm against the storage layout.
fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
    let mut dir = root.to_path_buf();
    for segment in ["collections", "0000000000", "index"] {
        dir.push(segment);
    }
    dir
}
4748
4749 fn idx_snapshot_files(root: &Path) -> Vec<String> {
4750 let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4751 .map(|rd| {
4752 rd.filter_map(std::result::Result::ok)
4753 .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4754 .filter(|n| n.starts_with("idx-"))
4755 .collect()
4756 })
4757 .unwrap_or_default();
4758 v.sort();
4759 v
4760 }
4761
4762 fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4763 db.search("c", q, &SearchParams::default())
4764 .unwrap()
4765 .into_iter()
4766 .map(|m| m.id)
4767 .collect()
4768 }
4769
4770 fn seed_ivf(db: &mut Database, n: u32) {
4771 db.create_collection("c", desc_with(IndexKind::Ivf))
4772 .unwrap();
4773 for i in 0..n {
4774 db.upsert(
4775 "c",
4776 &format!("p{i}"),
4777 &[i as f32, 0.0, 0.0, 0.0],
4778 &json!({}),
4779 )
4780 .unwrap();
4781 }
4782 let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4784 }
4785
/// After seeding an IVF collection and checkpointing, exactly one `idx-*`
/// entry must exist in the collection's index directory.
#[test]
fn ivf_snapshot_is_written_at_checkpoint() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    seed_ivf(&mut db, 40);
    db.checkpoint().unwrap();
    let snapshots = idx_snapshot_files(scratch.path());
    assert_eq!(snapshots.len(), 1);
}
4794
/// The collection's `int_to_ext` mapping must list ids in insertion order
/// (`a, m, z, b`) both before closing and after reopening.
///
/// Per the assertion message, a different order after reopening would mean the
/// index was rebuilt instead of being loaded from its snapshot.
#[test]
fn ivf_loads_from_snapshot_rather_than_rebuilding() {
    let scratch = tempfile::tempdir().unwrap();
    let insertion_order = ["a", "m", "z", "b"];

    let mut db = open(scratch.path());
    db.create_collection("c", desc_with(IndexKind::Ivf))
        .unwrap();
    for (id, x) in [("a", 0.0_f32), ("m", 1.0)] {
        db.upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({})).unwrap();
    }
    // One search between the two write batches; its result is not inspected.
    drop(nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]));
    for (id, x) in [("z", 2.0_f32), ("b", 3.0)] {
        db.upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({})).unwrap();
    }
    db.checkpoint().unwrap();
    assert_eq!(db.collections["c"].int_to_ext, insertion_order);
    drop(db);

    let db = open(scratch.path());
    assert_eq!(
        db.collections["c"].int_to_ext,
        insertion_order,
        "index was rebuilt, not loaded from the snapshot"
    );
}
4825
/// A point inserted after the last checkpoint must be searchable after the
/// database is closed and reopened, alongside the checkpointed points.
#[test]
fn ivf_recovery_replays_post_checkpoint_upserts() {
    let scratch = tempfile::tempdir().unwrap();

    let mut db = open(scratch.path());
    seed_ivf(&mut db, 30);
    db.checkpoint().unwrap();
    // Written after the checkpoint, then the database is closed.
    db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    drop(db);

    let mut db = open(scratch.path());
    let near_far = nearest(&mut db, &[500.0, 0.0, 0.0, 0.0]);
    assert_eq!(near_far[0], "far");
    let near_one = nearest(&mut db, &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(near_one[0], "p1");
}
4841
/// A point deleted after the last checkpoint must stay deleted after the
/// database is closed and reopened: absent from search results and from
/// `get`, while its neighbour remains retrievable.
#[test]
fn ivf_recovery_replays_post_checkpoint_deletes() {
    let scratch = tempfile::tempdir().unwrap();

    let mut db = open(scratch.path());
    seed_ivf(&mut db, 30);
    db.checkpoint().unwrap();
    assert!(db.delete("c", "p7").unwrap());
    drop(db);

    let mut db = open(scratch.path());
    let around_p7 = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert!(around_p7.iter().all(|id| id != "p7"));
    assert!(db.get("c", "p7").unwrap().is_none());
    assert!(db.get("c", "p6").unwrap().is_some());
}
4860
/// A vector overwritten after the last checkpoint must be found at its new
/// position after a reopen, and must no longer rank first at its old position.
#[test]
fn ivf_recovery_replays_post_checkpoint_updates() {
    let scratch = tempfile::tempdir().unwrap();

    let mut db = open(scratch.path());
    seed_ivf(&mut db, 30);
    db.checkpoint().unwrap();
    // Move p0 from the origin to x = 999 after the checkpoint.
    db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
        .unwrap();
    drop(db);

    let mut db = open(scratch.path());
    let at_new_spot = nearest(&mut db, &[999.0, 0.0, 0.0, 0.0]);
    assert_eq!(at_new_spot[0], "p0");
    let at_old_spot = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
    assert_ne!(
        at_old_spot[0], "p0",
        "the stale p0 vector survived the update"
    );
}
4880
/// Overwriting the single index snapshot file with garbage must not prevent
/// the database from reopening and answering a search correctly.
#[test]
fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
    let scratch = tempfile::tempdir().unwrap();

    let mut db = open(scratch.path());
    seed_ivf(&mut db, 30);
    db.checkpoint().unwrap();
    drop(db);

    // Clobber the one snapshot that the checkpoint produced.
    let snapshots = idx_snapshot_files(scratch.path());
    assert_eq!(snapshots.len(), 1);
    let victim = ivf_index_dir(scratch.path()).join(&snapshots[0]);
    std::fs::write(victim, b"corrupt").unwrap();

    let mut db = open(scratch.path());
    let hits = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert_eq!(hits[0], "p7");
}
4897
4898 fn mv_desc() -> Descriptor {
4901 Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4902 }
4903
4904 fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4907 let mut v: Vec<(String, f32)> = corpus
4908 .iter()
4909 .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4910 .collect();
4911 v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4912 v
4913 }
4914
/// `search_multi_vector` must return the three documents in the same order,
/// and with the same scores (within 1e-5), as the brute-force `bf_rank`
/// reference.
#[test]
fn multivector_search_ranks_documents_by_maxsim() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
        (
            "d_mix",
            vec![
                vec![1.0, 1.0, 0.0],
                vec![0.0, 0.0, 1.0],
                vec![1.0, 0.0, 1.0],
            ],
        ),
    ];
    for (id, toks) in &corpus {
        db.upsert_document("docs", id, toks, &json!({ "id": id }))
            .unwrap();
    }
    assert_eq!(db.document_count("docs").unwrap(), 3);

    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let params = SearchParams {
        k: 3,
        with_payload: false,
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);

    assert_eq!(got.len(), 3);
    for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
        assert_eq!(&g.id, eid, "ranking matches brute force");
        assert!(
            (g.score - escore).abs() < 1e-5,
            "{} score {} vs {escore}",
            g.id,
            g.score
        );
    }
}
4958
/// With five single-token documents stored and `k = 2`, a multivector search
/// must return exactly two matches.
#[test]
fn multivector_search_truncates_to_k() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    for i in 0..5 {
        let v = vec![vec![1.0, i as f32, 0.0]];
        db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
            .unwrap();
    }
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 2);
}
4978
/// Two documents with identical tokens differ only in their `lang` payload;
/// filtering on `lang == "fr"` must return only document `b`, with its
/// payload attached.
#[test]
fn multivector_filter_selects_documents_exactly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
        .unwrap();
    db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
        .unwrap();
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".into(),
            value: json!("fr"),
        }),
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 1);
    assert_eq!(got[0].id, "b");
    assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
}
5004
/// After a checkpoint and reopen, a multivector collection must still report
/// two documents and rank them in the same order as the brute-force
/// `bf_rank` reference.
#[test]
fn multivector_reopen_rebuilds_grouping_and_ranking() {
    let tmp = tempfile::tempdir().unwrap();
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
    ];
    {
        let mut db = open(tmp.path());
        db.create_collection("docs", mv_desc()).unwrap();
        for (id, toks) in &corpus {
            db.upsert_document("docs", id, toks, &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
    }
    let mut db = open(tmp.path());
    assert_eq!(db.document_count("docs").unwrap(), 2);
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    assert_eq!(
        got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
        expected
            .iter()
            .map(|(id, _)| id.clone())
            .collect::<Vec<_>>()
    );
}
5038
/// Deleting a two-token document must drop both the document count and the
/// token count (`len`), hide it from `get_document` and search, and a second
/// delete of the same id must report `false`.
#[test]
fn multivector_delete_document_removes_all_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
        &json!({}),
    )
    .unwrap();
    db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
        .unwrap();
    // Two documents, three tokens in total.
    assert_eq!(db.document_count("docs").unwrap(), 2);
    assert_eq!(db.len("docs").unwrap(), 3);

    assert!(db.delete_document("docs", "a").unwrap());
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);
    assert!(db.get_document("docs", "a", false).unwrap().is_none());
    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    // Fixed: the argument was the mis-encoded token `¶ms` instead of `&params`.
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert!(got.iter().all(|m| m.id != "a"));
    assert!(!db.delete_document("docs", "a").unwrap());
}
5070
/// Upserting an existing document id with fewer tokens must replace, not
/// append: one document and one token remain, carrying the new payload and
/// the new vector.
#[test]
fn multivector_reupsert_replaces_tokens() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("docs", mv_desc()).unwrap();

    let first_version: Vec<Vec<f32>> = vec![
        vec![1.0, 0.0, 0.0],
        vec![0.0, 1.0, 0.0],
        vec![0.0, 0.0, 1.0],
    ];
    db.upsert_document("docs", "a", &first_version, &json!({"v":1}))
        .unwrap();
    assert_eq!(db.len("docs").unwrap(), 3);

    // Same id, a single token, and a new payload.
    db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);

    let stored = db.get_document("docs", "a", true).unwrap().unwrap();
    assert_eq!(stored.payload, Some(json!({"v":2})));
    assert_eq!(stored.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
}
5097
/// Single-vector calls on a multivector collection, and multivector calls on
/// a single-vector collection, must all fail with `Error::Unsupported`.
#[test]
fn single_and_multi_vector_apis_are_mutually_exclusive() {
    // True when `outcome` is `Err(Error::Unsupported(_))`.
    fn unsupported<T>(outcome: std::result::Result<T, Error>) -> bool {
        matches!(outcome, Err(Error::Unsupported(_)))
    }

    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("mv", mv_desc()).unwrap();
    db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
        .unwrap();

    // Single-vector API against the multivector collection.
    assert!(unsupported(db.upsert(
        "mv",
        "a",
        &[1.0, 0.0, 0.0],
        &json!({})
    )));
    assert!(unsupported(db.search(
        "mv",
        &[1.0, 0.0, 0.0],
        &SearchParams::default()
    )));

    // Multivector API against the single-vector collection.
    assert!(unsupported(db.upsert_document(
        "sv",
        "a",
        &[vec![1.0, 0.0, 0.0]],
        &json!({})
    )));
    assert!(unsupported(db.search_multi_vector(
        "sv",
        &[vec![1.0, 0.0, 0.0]],
        &SearchParams::default()
    )));
    assert!(unsupported(db.document_count("sv")));
}
5128
/// `Error::Unsupported` must be returned for a multivector descriptor using
/// the `L2` metric, and for documents whose id contains U+001F, whose token
/// list is empty, or whose token has the wrong dimension.
#[test]
fn multivector_rejects_l2_metric_and_bad_documents() {
    // True when `outcome` is `Err(Error::Unsupported(_))`.
    fn unsupported<T>(outcome: std::result::Result<T, Error>) -> bool {
        matches!(outcome, Err(Error::Unsupported(_)))
    }

    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    let l2_multi = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
    assert!(unsupported(db.create_collection("bad", l2_multi)));

    db.create_collection("docs", mv_desc()).unwrap();
    // Document id containing the U+001F control character.
    assert!(unsupported(db.upsert_document(
        "docs",
        "a\u{1f}b",
        &[vec![1.0, 0.0, 0.0]],
        &json!({})
    )));
    // Document with no tokens at all.
    assert!(unsupported(db.upsert_document(
        "docs",
        "a",
        &[],
        &json!({})
    )));
    // Token with two components in a three-dimensional collection.
    assert!(unsupported(db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0]],
        &json!({})
    )));
}
5155
/// A snapshot opened as a database must contain exactly the collections and
/// points present when it was taken; a write made to the source afterwards
/// must not appear in it.
#[test]
fn snapshot_then_open_reproduces_the_database() {
    let source = tempfile::tempdir().unwrap();
    let mut db = open(source.path());
    db.create_collection("kb", desc()).unwrap();
    db.create_collection("kb2", desc()).unwrap();
    let seed: [(&str, &str, [f32; 4], u32); 3] = [
        ("kb", "a", [1.0, 0.0, 0.0, 0.0], 1),
        ("kb", "b", [0.0, 1.0, 0.0, 0.0], 2),
        ("kb2", "z", [0.0, 0.0, 1.0, 0.0], 3),
    ];
    for (collection, id, vector, n) in &seed {
        db.upsert(collection, id, vector, &json!({ "n": n }))
            .unwrap();
    }

    let target = tempfile::tempdir().unwrap();
    let snap_dir = target.path().join("snap");
    let info = db.snapshot(&snap_dir).unwrap();
    assert!(info.files > 0 && info.bytes > 0);
    assert_eq!(info.manifest_version, db.manifest_version());

    // Written after the snapshot; the snapshot must not contain it.
    db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
        .unwrap();

    let restored = open(&snap_dir);
    let mut names = restored.collection_names();
    names.sort();
    assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
    assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
    let a = restored.get("kb", "a").unwrap().unwrap();
    assert_eq!(a.payload, Some(json!({ "n": 1 })));
    assert_eq!(restored.len("kb2").unwrap(), 1);
    assert!(restored.get("kb", "late").unwrap().is_none());
}
5191
/// Snapshotting into a directory that already exists must fail with
/// `CoreError::AlreadyExists`.
#[test]
fn snapshot_refuses_an_existing_destination() {
    let source = tempfile::tempdir().unwrap();
    let mut db = open(source.path());
    // `tempdir()` has already created this directory.
    let existing = tempfile::tempdir().unwrap();
    let outcome = db.snapshot(existing.path());
    assert!(matches!(
        outcome,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
5202
/// `restore_snapshot` must reproduce a snapshot into a fresh directory that
/// opens as a database, refuse an existing destination with
/// `CoreError::AlreadyExists`, and refuse an empty source directory with
/// `CoreError::InvalidArgument`.
#[test]
fn restore_snapshot_roundtrips_and_guards() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    let work = tempfile::tempdir().unwrap();
    let snap_dir = work.path().join("snap");
    db.snapshot(&snap_dir).unwrap();

    // Round trip: the restored directory opens and holds the one point.
    let restored_dir = work.path().join("restored");
    let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
    assert!(info.files > 0);
    let restored = open(&restored_dir);
    assert_eq!(restored.len("kb").unwrap(), 1);

    // Restoring a second time into the same destination is refused.
    assert!(matches!(
        restore_snapshot(&snap_dir, &restored_dir),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
    // An empty directory is refused as a snapshot source.
    // Fixed: both uses below were the mis-encoded token `¬_snap` instead of `&not_snap`.
    let not_snap = work.path().join("not-a-snapshot");
    std::fs::create_dir_all(&not_snap).unwrap();
    assert!(matches!(
        restore_snapshot(&not_snap, &work.path().join("out")),
        Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
    ));
}
5234}