1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52 ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53 Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54 ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64 Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65 VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69 BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70 query_term_ids, rrf_fuse, text_to_sparse,
71};
72
/// Errors returned by the database facade in this file.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// An error from `quiver_core`, forwarded transparently.
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// An index error from `quiver_index`, forwarded transparently.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// A `quiver_index::DiskError`, forwarded transparently.
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be serialized to, or parsed from, JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// No collection with the given name is registered; carries that name.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The request or configuration is not supported; carries a static
    /// explanation.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// A `quiver_index::SnapshotError`, forwarded transparently.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// A `postcard` (de)serialization failure on an index snapshot envelope.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
104
/// Shorthand for `std::result::Result` with this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
107
/// Summary numbers describing a snapshot.
///
/// NOTE(review): the producer of this struct is not in view; the field
/// descriptions below are inferred from the names only — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Presumably the manifest version the snapshot corresponds to.
    pub manifest_version: u64,
    /// Presumably the number of files in the snapshot.
    pub files: u64,
    /// Presumably the total size of the snapshot in bytes.
    pub bytes: u64,
}
119
/// One point returned by a lookup, scan, or search on a single-vector
/// collection.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External (caller-supplied) id of the point.
    pub id: String,
    /// Score of the point for the query; `get` and `fetch` always set `0.0`.
    pub score: f32,
    /// Decoded JSON payload, when the caller asked for it.
    pub payload: Option<Value>,
    /// Stored vector, when the caller asked for it.
    pub vector: Option<Vec<f32>>,
}
132
/// One document returned from a multivector collection.
///
/// NOTE(review): the code that builds this type is outside the visible
/// source; field meanings mirror [`Match`] and should be confirmed there.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// External id of the document.
    pub id: String,
    /// Score of the document for the query.
    pub score: f32,
    /// Decoded JSON payload, when present.
    pub payload: Option<Value>,
    /// The document's vectors (one `Vec<f32>` each), when present.
    pub vectors: Option<Vec<Vec<f32>>>,
}
147
/// Tuple form of a scored document — presumably `(score, id, payload,
/// vectors)` in the same order as [`DocumentMatch`]'s fields; confirm at the
/// use site.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
/// Per-query options shared by the dense and hybrid search entry points.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Maximum number of matches to return.
    pub k: usize,
    /// Optional payload filter; only points whose payload matches are
    /// returned.
    pub filter: Option<Filter>,
    /// Search-effort value forwarded to the index's `search` call.
    pub ef_search: usize,
    /// Whether to decode and return each match's payload.
    pub with_payload: bool,
    /// Whether to return each match's stored vector.
    pub with_vector: bool,
}
171
172impl Default for SearchParams {
173 fn default() -> Self {
174 Self {
175 k: 10,
176 filter: None,
177 ef_search: 64,
178 with_payload: true,
179 with_vector: false,
180 }
181 }
182}
183
/// With a filter present, the ANN path asks the index for
/// `k * FILTER_OVERFETCH` hits (or `ef_search`, if larger) so that enough
/// survive post-filtering.
const FILTER_OVERFETCH: usize = 8;

/// Hybrid search ranks `k * RRF_CANDIDATE_FACTOR` candidates per list before
/// fusing them.
const RRF_CANDIDATE_FACTOR: usize = 10;
/// Lower bound on the per-list candidate depth used by hybrid search.
const MIN_RRF_CANDIDATES: usize = 100;

/// When a filter resolves to at most this many candidate ids, search scores
/// those candidates exactly instead of querying the index.
const FULL_SCAN_THRESHOLD: usize = 10_000;

/// NOTE(review): not used in the visible source — presumably the fraction of
/// deleted HNSW nodes that triggers a rebuild; confirm at the use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// NOTE(review): not used in the visible source — presumably the fraction of
/// pending graph changes that triggers a rebuild; confirm at the use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
/// The in-memory index backing one collection, one variant per index family.
///
/// Variants that wrap an `Option` may hold `None`; `search` treats those the
/// same as [`CollectionIndex::None`] and returns no hits.
enum CollectionIndex {
    /// No index at all; searches return nothing.
    None,
    /// An HNSW index.
    Hnsw(Hnsw),
    /// A `FreshVamana` index, or `None` when none is available.
    Vamana(Option<FreshVamana>),
    /// An IVF index, or `None` when none is available.
    Ivf(Option<Ivf>),
    /// A `FreshDiskVamana` index, or `None` when none is available.
    Disk(Option<FreshDiskVamana>),
    /// A ColBERT index, or `None` when none is available.
    Colbert(Option<ColbertIndex>),
}
239
240impl CollectionIndex {
241 fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243 Ok(match self {
244 CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245 CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246 CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247 CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248 CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249 CollectionIndex::None
250 | CollectionIndex::Vamana(None)
251 | CollectionIndex::Ivf(None)
252 | CollectionIndex::Disk(None)
253 | CollectionIndex::Colbert(None) => Vec::new(),
254 })
255 }
256}
257
/// Shared, atomically swappable pointer to a collection's current
/// [`CollectionSnapshot`]; writers `store` a new snapshot, readers `load` it.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// Writes accumulated on top of a snapshot's base index.
#[derive(Default, Clone)]
struct Overlay {
    /// Vectors upserted since the base was published, with their external
    /// ids. Entry `j` has internal id `base_len + j`.
    upserts: Vec<(Arc<[f32]>, String)>,
    /// Internal ids (base or overlay) that must be hidden from results.
    tombstones: HashSet<u64>,
}
289
/// A point-in-time view of a collection: a shared base index plus an overlay
/// of later upserts and tombstones.
pub struct CollectionSnapshot {
    /// The base index, shared between successive snapshots.
    base: Arc<CollectionIndex>,
    /// External id for each base internal id (by position).
    base_int_to_ext: Arc<Vec<String>>,
    /// Number of internal ids owned by the base; overlay ids start here.
    base_len: u64,
    /// Upserts and tombstones applied after the base was published.
    overlay: Arc<Overlay>,
    /// Metric used to score overlay vectors and to report scores.
    metric: Metric,
}
303
304impl CollectionSnapshot {
305 fn empty(metric: Metric) -> Self {
308 Self {
309 base: Arc::new(CollectionIndex::None),
310 base_int_to_ext: Arc::new(Vec::new()),
311 base_len: 0,
312 overlay: Arc::new(Overlay::default()),
313 metric,
314 }
315 }
316
317 fn ext_id(&self, internal: u64) -> Option<&str> {
320 if internal < self.base_len {
321 self.base_int_to_ext
322 .get(internal as usize)
323 .map(String::as_str)
324 } else {
325 self.overlay
326 .upserts
327 .get((internal - self.base_len) as usize)
328 .map(|(_, e)| e.as_str())
329 }
330 }
331
332 pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342 if k == 0 {
343 return Ok(Vec::new());
344 }
345 let mut cands: Vec<(f32, u64)> = Vec::new();
348 for n in self.base.search(query, k, ef_search)? {
349 if !self.overlay.tombstones.contains(&n.id) {
350 cands.push((report_metric(self.metric, n.distance), n.id));
354 }
355 }
356 for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357 let internal = self.base_len + j as u64;
358 if !self.overlay.tombstones.contains(&internal) {
359 cands.push((ordering_distance(self.metric, query, vector), internal));
360 }
361 }
362 cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363 cands.truncate(k);
364 let mut out = Vec::with_capacity(cands.len());
365 for (ordering, internal) in cands {
366 if let Some(ext) = self.ext_id(internal) {
367 out.push(Match {
368 id: ext.to_owned(),
369 score: report_metric(self.metric, ordering),
370 payload: None,
371 vector: None,
372 });
373 }
374 }
375 Ok(out)
376 }
377}
378
379fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383 let metric = to_index_metric(descriptor.metric);
384 Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394 !descriptor.multivector
395 && descriptor.vector_encryption != VectorEncryption::ClientSide
396 && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399fn mvcc_served(handle: &CollectionHandle) -> bool {
402 handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408 handle.snapshot.store(Arc::new(CollectionSnapshot {
409 base: prior.base.clone(),
410 base_int_to_ext: prior.base_int_to_ext.clone(),
411 base_len: prior.base_len,
412 overlay,
413 metric: prior.metric,
414 }));
415}
416
417fn publish_base(handle: &mut CollectionHandle) {
421 let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422 let metric = to_index_metric(handle.descriptor.metric);
423 handle.snapshot.store(Arc::new(CollectionSnapshot {
424 base: Arc::new(base),
425 base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426 base_len: handle.int_to_ext.len() as u64,
427 overlay: Arc::new(Overlay::default()),
428 metric,
429 }));
430}
431
/// Number of overlay upserts at which the collection is flagged for a
/// rebuild: one fifth of the base size, but never less than 1024.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    let fifth = base_len / 5;
    if fifth < 1024 { 1024 } else { fifth }
}
438
439fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444 bump_write_gen(handle);
445 let cur = handle.snapshot.load_full();
446 let mut overlay = cur.overlay.as_ref().clone();
447 if let Some(&old) = handle.ext_to_int.get(ext_id) {
449 overlay.tombstones.insert(old);
450 }
451 let internal = cur.base_len + overlay.upserts.len() as u64;
452 overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453 handle.ext_to_int.insert(ext_id.to_owned(), internal);
454 handle.int_to_ext.push(ext_id.to_owned());
455 let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456 publish_overlay(handle, &cur, Arc::new(overlay));
457 if crowded {
460 handle.stale = true;
461 }
462}
463
464fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467 bump_write_gen(handle);
468 let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469 return;
470 };
471 let cur = handle.snapshot.load_full();
472 let mut overlay = cur.overlay.as_ref().clone();
473 overlay.tombstones.insert(internal);
474 publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
/// Serialized wrapper around a persisted index.
///
/// NOTE(review): the read/write code is outside the visible source; field
/// notes are inferred from names and types — confirm.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    /// Envelope format version (see `INDEX_ENVELOPE_VERSION`).
    version: u16,
    /// External id for each internal id, by position.
    int_to_ext: Vec<String>,
    /// Opaque serialized index bytes (presumably the IVF index).
    ivf: Vec<u8>,
}
488
/// Current envelope format version — presumably written into, and checked
/// against, the `version` field of the envelopes; confirm at the use site.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
/// Serialized side-car state for a disk-backed index.
///
/// NOTE(review): the read/write code is outside the visible source; field
/// notes are inferred from names and types — confirm.
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    /// Envelope format version.
    version: u16,
    /// External id for each internal id, by position.
    int_to_ext: Vec<String>,
    /// Presumably the number of rows in the on-disk base graph.
    base_row_count: u64,
    /// Presumably the internal ids deleted since the base was written.
    deleted_ids: Vec<u64>,
}
508
/// In-memory state the database keeps for one collection.
struct CollectionHandle {
    /// Id of the collection in the store.
    id: CollectionId,
    /// The collection's descriptor.
    descriptor: Descriptor,
    /// The live index. `publish_base` moves it into a snapshot and leaves an
    /// empty index here.
    index: CollectionIndex,
    /// External id for each internal id, by position.
    int_to_ext: Vec<String>,
    /// Internal id for each external id.
    ext_to_int: HashMap<String, u64>,
    /// Set when the index must be rebuilt; `ensure_indexed` rebuilds if so.
    stale: bool,
    /// Wrapping write counter; `commit_rebuild` compares it with the value
    /// captured when the rebuild inputs were taken.
    write_gen: u64,
    /// Per-document `u32` value keyed by document id; `Some` for multivector
    /// collections at creation. `upsert_document` appears to read it as the
    /// document's previous vector count.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, when one is maintained for this collection.
    sparse: Option<SparseInvertedIndex>,
    /// Whether snapshot (MVCC) reads are switched on for this handle.
    mvcc: bool,
    /// Cell holding the collection's currently published snapshot.
    snapshot: SnapshotCell,
}
547
548fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551 !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554fn mark_stale(handle: &mut CollectionHandle) {
558 handle.stale = true;
559 bump_write_gen(handle);
560}
561
562fn bump_write_gen(handle: &mut CollectionHandle) {
571 handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
/// The database facade: a store plus per-collection in-memory index state.
pub struct Database {
    /// The underlying store holding records and collection metadata.
    store: Store,
    /// In-memory handle for each collection, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
    /// Whether snapshot (MVCC) reads are on; copied into each new handle.
    mvcc: bool,
}
583
/// Reads the `QUIVER_MVCC_READS` environment variable as a boolean switch.
///
/// Returns `true` when the value, ignoring surrounding whitespace and ASCII
/// case, is one of `1`, `true`, `on`, or `yes`. Returns `false` for anything
/// else, including when the variable is unset or not valid Unicode.
fn mvcc_from_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var("QUIVER_MVCC_READS")
        .map(|raw| {
            let flag = raw.trim();
            TRUTHY.iter().any(|accepted| flag.eq_ignore_ascii_case(accepted))
        })
        .unwrap_or(false)
}
590
591impl Database {
592 pub fn open(dir: &Path) -> Result<Self> {
595 Self::from_store(Store::open(dir)?)
596 }
597
598 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603 Self::from_store(Store::open_with_codec(dir, codec)?)
604 }
605
606 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611 Self::from_store(Store::open_with_keyring(dir, keyring)?)
612 }
613
614 fn from_store(store: Store) -> Result<Self> {
616 let mvcc = mvcc_from_env();
617 let mut collections = HashMap::new();
618 for name in store.collection_names() {
619 let Some(id) = store.collection_id(&name) else {
620 continue;
621 };
622 let Some(descriptor) = store.descriptor(id).cloned() else {
623 continue;
624 };
625 let snapshot = empty_snapshot(&descriptor);
626 let mut handle = CollectionHandle {
627 id,
628 index: empty_index(&descriptor),
629 descriptor,
630 int_to_ext: Vec::new(),
631 ext_to_int: HashMap::new(),
632 stale: true,
633 write_gen: 0,
634 docs: None,
635 sparse: None,
637 mvcc,
638 snapshot,
639 };
640 load_index(&store, &mut handle)?;
641 if mvcc_served(&handle) {
646 handle.stale = true;
647 }
648 collections.insert(name, handle);
649 }
650 Ok(Self {
651 store,
652 collections,
653 mvcc,
654 })
655 }
656
657 pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660 validate_index(&descriptor)?;
661 let id = self.store.create_collection(name, descriptor.clone())?;
662 let index = empty_index(&descriptor);
663 let docs = descriptor.multivector.then(BTreeMap::new);
664 let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668 let snapshot = empty_snapshot(&descriptor);
669 let mvcc = self.mvcc;
670 self.collections.insert(
671 name.to_owned(),
672 CollectionHandle {
673 id,
674 descriptor,
675 index,
676 int_to_ext: Vec::new(),
677 ext_to_int: HashMap::new(),
678 stale: false,
679 write_gen: 0,
680 docs,
681 sparse,
682 mvcc,
683 snapshot,
684 },
685 );
686 Ok(())
687 }
688
    /// Drops the collection named `name` from the store and forgets its
    /// in-memory handle.
    ///
    /// Returns the store's answer to whether the collection existed.
    ///
    /// # Errors
    /// Propagates the store's error; the handle is kept in that case.
    pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
        let existed = self.store.drop_collection(name)?;
        self.collections.remove(name);
        Ok(existed)
    }
695
    /// Asks the store to shred the collection named `name` and forgets its
    /// in-memory handle.
    ///
    /// Returns the store's answer to whether the collection existed.
    /// NOTE(review): what "shred" does beyond a drop is defined by
    /// `Store::shred_collection` and is not visible here.
    ///
    /// # Errors
    /// Propagates the store's error; the handle is kept in that case.
    pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
        let existed = self.store.shred_collection(name)?;
        self.collections.remove(name);
        Ok(existed)
    }
706
    /// Installs `observer` on the underlying store.
    pub fn set_commit_observer(&mut self, observer: CommitObserver) {
        self.store.set_commit_observer(observer);
    }
713
    /// Returns the store's replication snapshot as a list of WAL operations.
    ///
    /// # Errors
    /// Propagates the store's error.
    pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
        Ok(self.store.replication_snapshot()?)
    }
722
723 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732 let target = match &op {
733 WalOp::CreateCollection { collection_id, .. }
734 | WalOp::DropCollection { collection_id }
735 | WalOp::Upsert { collection_id, .. }
736 | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737 WalOp::Checkpoint { .. } => None,
738 };
739 let create_name = match &op {
740 WalOp::CreateCollection { name, .. } => Some(name.clone()),
741 _ => None,
742 };
743 let is_drop = matches!(op, WalOp::DropCollection { .. });
744 self.store.apply_replicated(op)?;
745
746 if let Some(name) = create_name {
747 if let Some(id) = target
749 && let Some(descriptor) = self.store.descriptor(id).cloned()
750 {
751 let docs = descriptor.multivector.then(BTreeMap::new);
752 let index = empty_index(&descriptor);
753 let snapshot = empty_snapshot(&descriptor);
754 let mvcc = self.mvcc;
755 self.collections.insert(
758 name,
759 CollectionHandle {
760 id,
761 descriptor,
762 index,
763 int_to_ext: Vec::new(),
764 ext_to_int: HashMap::new(),
765 stale: false,
766 write_gen: 0,
767 docs,
768 sparse: None,
769 mvcc,
770 snapshot,
771 },
772 );
773 }
774 } else if is_drop {
775 if let Some(id) = target {
776 self.collections.retain(|_, h| h.id != id);
777 }
778 } else if let Some(id) = target
779 && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780 {
781 mark_stale(handle);
782 }
783 Ok(())
784 }
785
    /// Returns the collection names reported by the store.
    #[must_use]
    pub fn collection_names(&self) -> Vec<String> {
        self.store.collection_names()
    }
791
792 #[must_use]
794 pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795 self.collections.get(name).map(|h| &h.descriptor)
796 }
797
798 pub fn len(&self, name: &str) -> Result<usize> {
800 let handle = self.handle(name)?;
801 Ok(self.store.len(handle.id)?)
802 }
803
804 pub fn is_empty(&self, name: &str) -> Result<bool> {
806 Ok(self.len(name)? == 0)
807 }
808
809 pub fn upsert(
811 &mut self,
812 collection: &str,
813 id: &str,
814 vector: &[f32],
815 payload: &Value,
816 ) -> Result<()> {
817 let handle = self
818 .collections
819 .get_mut(collection)
820 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821 require_single_vector(handle)?;
822 let payload_bytes = serde_json::to_vec(payload)?;
823 self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828 return Ok(());
829 }
830 index_upsert_point(handle, id, vector)?;
833 sparse_index_upsert_point(handle, id, payload);
835 Ok(())
836 }
837
838 pub fn upsert_batch(
845 &mut self,
846 collection: &str,
847 points: &[(&str, &[f32], &serde_json::Value)],
848 ) -> Result<u64> {
849 let handle = self
850 .collections
851 .get(collection)
852 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
853 require_single_vector(handle)?;
854 let coll_id = handle.id;
855 let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
856
857 let payload_bytes: Vec<Vec<u8>> = points
858 .iter()
859 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
860 .collect::<Result<_>>()?;
861
862 let records: Vec<(&str, &[f32], &[u8])> = points
863 .iter()
864 .zip(payload_bytes.iter())
865 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
866 .collect();
867
868 self.store.upsert_batch(coll_id, &records)?;
869
870 if is_client_side {
871 return Ok(records.len() as u64);
872 }
873
874 for (id, vector, payload) in points {
875 let handle = self
876 .collections
877 .get_mut(collection)
878 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
879 index_upsert_point(handle, id, vector)?;
880 sparse_index_upsert_point(handle, id, payload);
881 }
882 Ok(records.len() as u64)
883 }
884
885 pub fn upsert_bulk(
896 &mut self,
897 collection: &str,
898 points: &[(&str, &[f32], &serde_json::Value)],
899 ) -> Result<u64> {
900 let handle = self
901 .collections
902 .get_mut(collection)
903 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904 require_single_vector(handle)?;
905 let coll_id = handle.id;
906
907 let payload_bytes: Vec<Vec<u8>> = points
908 .iter()
909 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
910 .collect::<Result<_>>()?;
911 let records: Vec<(&str, &[f32], &[u8])> = points
912 .iter()
913 .zip(payload_bytes.iter())
914 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
915 .collect();
916
917 self.store.upsert_batch(coll_id, &records)?;
918
919 let handle = self
923 .collections
924 .get_mut(collection)
925 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
926 mark_stale(handle);
927 Ok(records.len() as u64)
928 }
929
930 pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
932 let handle = self
933 .collections
934 .get_mut(collection)
935 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
936 require_single_vector(handle)?;
937 let existed = self.store.delete(handle.id, id)?;
938 if !existed {
939 return Ok(false);
940 }
941 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
944 return Ok(true);
945 }
946 index_delete_point(handle, id);
952 sparse_index_delete_point(handle, id);
954 Ok(true)
955 }
956
957 pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
959 let handle = self.handle(collection)?;
960 require_single_vector(handle)?;
961 match self.store.get(handle.id, id)? {
962 Some(record) => Ok(Some(Match {
963 id: id.to_owned(),
964 score: 0.0,
965 payload: Some(serde_json::from_slice(&record.payload)?),
966 vector: Some(record.vector),
967 })),
968 None => Ok(None),
969 }
970 }
971
972 pub fn fetch(
986 &self,
987 collection: &str,
988 filter: Option<&Filter>,
989 offset: usize,
990 limit: usize,
991 with_payload: bool,
992 with_vector: bool,
993 ) -> Result<Vec<Match>> {
994 let handle = self.handle(collection)?;
995 require_single_vector(handle)?;
996 let mut out = Vec::new();
997 let mut skipped = 0usize;
1002 for (id, record) in self.store.scan(handle.id)? {
1003 if out.len() >= limit {
1004 break;
1005 }
1006 let payload: Value = serde_json::from_slice(&record.payload)?;
1007 if let Some(filter) = filter
1008 && !filter.matches(&payload)
1009 {
1010 continue;
1011 }
1012 if skipped < offset {
1013 skipped += 1;
1014 continue;
1015 }
1016 out.push(Match {
1017 id,
1018 score: 0.0,
1019 payload: with_payload.then_some(payload),
1020 vector: with_vector.then_some(record.vector),
1021 });
1022 }
1023 Ok(out)
1024 }
1025
1026 pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1032 if self.handle(collection)?.stale {
1033 let store = &self.store;
1034 let handle = self
1035 .collections
1036 .get_mut(collection)
1037 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1038 rebuild_index(store, handle)?;
1039 if mvcc_served(handle) {
1043 publish_base(handle);
1044 }
1045 }
1046 Ok(())
1047 }
1048
1049 pub fn set_mvcc_reads(&mut self, on: bool) {
1055 self.mvcc = on;
1056 for handle in self.collections.values_mut() {
1057 handle.mvcc = on;
1058 if mvcc_eligible(&handle.descriptor) {
1062 handle.stale = true;
1063 }
1064 }
1065 }
1066
    /// Returns whether snapshot (MVCC) reads are switched on for this
    /// database.
    #[must_use]
    pub fn mvcc_reads(&self) -> bool {
        self.mvcc
    }
1072
1073 pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1083 Ok(self.handle(collection)?.snapshot.clone())
1084 }
1085
1086 pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1096 let handle = self.handle(collection)?;
1097 Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1098 }
1099
1100 pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1105 Ok(self.handle(collection)?.stale)
1106 }
1107
1108 pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1114 let handle = self.handle(collection)?;
1115 if !handle.stale {
1116 return Ok(None);
1117 }
1118 let scan = scan_collection(&self.store, handle)?;
1119 Ok(Some(RebuildInputs {
1120 collection: collection.to_owned(),
1121 descriptor: handle.descriptor.clone(),
1122 scan,
1123 write_gen: handle.write_gen,
1124 }))
1125 }
1126
1127 pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1134 let store = &self.store;
1135 let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1136 return Ok(false);
1137 };
1138 match rebuilt.kind {
1139 RebuiltKind::Ready(index) => handle.index = *index,
1140 RebuiltKind::Disk { graph, pq } => {
1141 handle.index = empty_index(&handle.descriptor);
1145 let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1146 handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1147 }
1148 }
1149 handle.int_to_ext = rebuilt.int_to_ext;
1150 handle.ext_to_int = rebuilt.ext_to_int;
1151 handle.docs = rebuilt.docs;
1152 handle.sparse = rebuilt.sparse;
1153 let still_stale = handle.write_gen != rebuilt.write_gen;
1156 handle.stale = still_stale;
1157 if mvcc_served(handle) {
1164 publish_base(handle);
1165 }
1166 Ok(still_stale)
1167 }
1168
    /// Searches a single-vector collection, rebuilding its index first if it
    /// is stale.
    ///
    /// # Errors
    /// Propagates errors from `ensure_indexed` and `search_snapshot`.
    pub fn search(
        &mut self,
        collection: &str,
        query: &[f32],
        params: &SearchParams,
    ) -> Result<Vec<Match>> {
        // Bring the index up to date, then run the read-only search.
        self.ensure_indexed(collection)?;
        self.search_snapshot(collection, query, params)
    }
1184
1185 pub fn search_snapshot(
1192 &self,
1193 collection: &str,
1194 query: &[f32],
1195 params: &SearchParams,
1196 ) -> Result<Vec<Match>> {
1197 require_single_vector(self.handle(collection)?)?;
1198 require_server_searchable(self.handle(collection)?)?;
1199
1200 let handle = self.handle(collection)?;
1201
1202 if mvcc_served(handle) {
1208 return self.search_snapshot_mvcc(handle, query, params);
1209 }
1210
1211 if let Some(filter) = ¶ms.filter
1215 && let Some(candidates) = candidate_ids(
1216 &self.store,
1217 handle.id,
1218 filter,
1219 &handle.descriptor.filterable,
1220 )?
1221 && candidates.len() <= FULL_SCAN_THRESHOLD
1222 {
1223 return self.exact_filtered_search(
1224 handle.id,
1225 &handle.descriptor,
1226 query,
1227 params,
1228 filter,
1229 &candidates,
1230 );
1231 }
1232
1233 let fetch = if params.filter.is_some() {
1234 params
1235 .k
1236 .saturating_mul(FILTER_OVERFETCH)
1237 .max(params.ef_search)
1238 } else {
1239 params.k
1240 };
1241 let raw = handle.index.search(query, fetch, params.ef_search)?;
1242
1243 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1244 let mut out = Vec::with_capacity(params.k);
1245 for neighbor in raw {
1246 if out.len() >= params.k {
1247 break;
1248 }
1249 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1250 continue;
1251 };
1252 let record = if need_record {
1253 self.store.get(handle.id, ext_id)?
1254 } else {
1255 None
1256 };
1257 let payload_value: Option<Value> = match &record {
1258 Some(r) if params.filter.is_some() || params.with_payload => {
1259 Some(serde_json::from_slice(&r.payload)?)
1260 }
1261 _ => None,
1262 };
1263 if let Some(filter) = ¶ms.filter {
1264 let value = payload_value.as_ref().unwrap_or(&Value::Null);
1265 if !filter.matches(value) {
1266 continue;
1267 }
1268 }
1269 out.push(Match {
1270 id: ext_id.clone(),
1271 score: neighbor.distance,
1272 payload: if params.with_payload {
1273 payload_value
1274 } else {
1275 None
1276 },
1277 vector: if params.with_vector {
1278 record.map(|r| r.vector)
1279 } else {
1280 None
1281 },
1282 });
1283 }
1284 Ok(out)
1285 }
1286
1287 fn search_snapshot_mvcc(
1293 &self,
1294 handle: &CollectionHandle,
1295 query: &[f32],
1296 params: &SearchParams,
1297 ) -> Result<Vec<Match>> {
1298 if let Some(filter) = ¶ms.filter
1301 && let Some(candidates) = candidate_ids(
1302 &self.store,
1303 handle.id,
1304 filter,
1305 &handle.descriptor.filterable,
1306 )?
1307 && candidates.len() <= FULL_SCAN_THRESHOLD
1308 {
1309 return self.exact_filtered_search(
1310 handle.id,
1311 &handle.descriptor,
1312 query,
1313 params,
1314 filter,
1315 &candidates,
1316 );
1317 }
1318
1319 let fetch = if params.filter.is_some() {
1323 params
1324 .k
1325 .saturating_mul(FILTER_OVERFETCH)
1326 .max(params.ef_search)
1327 } else {
1328 params.k
1329 };
1330 let dense = handle
1331 .snapshot
1332 .load()
1333 .search(query, fetch, params.ef_search)?;
1334 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1335 let mut out = Vec::with_capacity(params.k);
1336 for m in dense {
1337 if out.len() >= params.k {
1338 break;
1339 }
1340 let record = if need_record {
1341 self.store.get(handle.id, &m.id)?
1342 } else {
1343 None
1344 };
1345 let payload_value: Option<Value> = match &record {
1346 Some(r) if params.filter.is_some() || params.with_payload => {
1347 Some(serde_json::from_slice(&r.payload)?)
1348 }
1349 _ => None,
1350 };
1351 if let Some(filter) = ¶ms.filter
1352 && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1353 {
1354 continue;
1355 }
1356 out.push(Match {
1357 id: m.id,
1358 score: m.score,
1359 payload: if params.with_payload {
1360 payload_value
1361 } else {
1362 None
1363 },
1364 vector: if params.with_vector {
1365 record.map(|r| r.vector)
1366 } else {
1367 None
1368 },
1369 });
1370 }
1371 Ok(out)
1372 }
1373
    /// Runs a hybrid (dense / sparse / text) search, rebuilding the
    /// collection's index first if it is stale.
    ///
    /// # Errors
    /// Propagates errors from `ensure_indexed` and `hybrid_search_snapshot`.
    pub fn hybrid_search(
        &mut self,
        collection: &str,
        dense_query: Option<&[f32]>,
        sparse_query: Option<&SparseVector>,
        text_query: Option<&str>,
        params: &SearchParams,
        rrf_k0: f32,
    ) -> Result<Vec<Match>> {
        // Bring the index up to date, then run the read-only search.
        self.ensure_indexed(collection)?;
        self.hybrid_search_snapshot(
            collection,
            dense_query,
            sparse_query,
            text_query,
            params,
            rrf_k0,
        )
    }
1403
1404 pub fn hybrid_search_snapshot(
1409 &self,
1410 collection: &str,
1411 dense_query: Option<&[f32]>,
1412 sparse_query: Option<&SparseVector>,
1413 text_query: Option<&str>,
1414 params: &SearchParams,
1415 rrf_k0: f32,
1416 ) -> Result<Vec<Match>> {
1417 require_single_vector(self.handle(collection)?)?;
1418 require_server_searchable(self.handle(collection)?)?;
1419 if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1420 return Err(Error::Unsupported(
1421 "hybrid_search requires a dense query, a sparse query, or a text query",
1422 ));
1423 }
1424 let handle = self.handle(collection)?;
1425
1426 let depth = params
1429 .k
1430 .saturating_mul(RRF_CANDIDATE_FACTOR)
1431 .max(MIN_RRF_CANDIDATES);
1432 let filter = params.filter.as_ref();
1433 let mut lists: Vec<Vec<String>> = Vec::new();
1434 if let Some(q) = dense_query {
1435 lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1436 }
1437 if let Some(sp) = sparse_query {
1438 lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1439 }
1440 if let Some(text) = text_query {
1441 lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1442 }
1443 let fused = rrf_fuse(&lists, rrf_k0, params.k);
1444
1445 let mut out = Vec::with_capacity(fused.len());
1446 for (ext_id, score) in fused {
1447 let record = if params.with_payload || params.with_vector {
1448 self.store.get(handle.id, &ext_id)?
1449 } else {
1450 None
1451 };
1452 let payload = match (&record, params.with_payload) {
1453 (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1454 _ => None,
1455 };
1456 out.push(Match {
1457 id: ext_id,
1458 score,
1459 payload,
1460 vector: if params.with_vector {
1461 record.map(|r| r.vector)
1462 } else {
1463 None
1464 },
1465 });
1466 }
1467 Ok(out)
1468 }
1469
1470 fn dense_ranked_ids(
1473 &self,
1474 handle: &CollectionHandle,
1475 query: &[f32],
1476 depth: usize,
1477 ef_search: usize,
1478 filter: Option<&Filter>,
1479 ) -> Result<Vec<String>> {
1480 let mut ids = Vec::new();
1481 if mvcc_served(handle) {
1485 for m in handle
1486 .snapshot
1487 .load()
1488 .search(query, depth, ef_search.max(depth))?
1489 {
1490 if !self.passes_filter(handle.id, &m.id, filter)? {
1491 continue;
1492 }
1493 ids.push(m.id);
1494 if ids.len() >= depth {
1495 break;
1496 }
1497 }
1498 return Ok(ids);
1499 }
1500 let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1501 for neighbor in raw {
1502 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1503 continue;
1504 };
1505 if !self.passes_filter(handle.id, ext_id, filter)? {
1506 continue;
1507 }
1508 ids.push(ext_id.clone());
1509 if ids.len() >= depth {
1510 break;
1511 }
1512 }
1513 Ok(ids)
1514 }
1515
1516 fn sparse_ranked_ids(
1523 &self,
1524 handle: &CollectionHandle,
1525 query: &SparseVector,
1526 depth: usize,
1527 filter: Option<&Filter>,
1528 ) -> Result<Vec<String>> {
1529 if let Some(idx) = handle.sparse.as_ref() {
1530 let mut ids = Vec::new();
1531 for (ext_id, _score) in idx.search(query) {
1532 if !self.passes_filter(handle.id, &ext_id, filter)? {
1533 continue;
1534 }
1535 ids.push(ext_id);
1536 if ids.len() >= depth {
1537 break;
1538 }
1539 }
1540 return Ok(ids);
1541 }
1542 self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1543 }
1544
1545 fn sparse_ranked_ids_by_scan(
1549 &self,
1550 cid: CollectionId,
1551 query: &SparseVector,
1552 depth: usize,
1553 filter: Option<&Filter>,
1554 ) -> Result<Vec<String>> {
1555 let qmap: HashMap<u32, f32> = query
1556 .indices
1557 .iter()
1558 .copied()
1559 .zip(query.values.iter().copied())
1560 .collect();
1561 let mut scored: Vec<(f32, String)> = Vec::new();
1562 for (ext_id, record) in self.store.scan(cid)? {
1563 if record.payload.is_empty() {
1564 continue;
1565 }
1566 let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1567 continue;
1568 };
1569 if let Some(filter) = filter
1570 && !filter.matches(&value)
1571 {
1572 continue;
1573 }
1574 let Some(raw) = value.get(SPARSE_KEY) else {
1575 continue;
1576 };
1577 let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1578 continue;
1579 };
1580 let mut score = 0.0f32;
1581 for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1582 if let Some(qw) = qmap.get(dim) {
1583 score += qw * weight;
1584 }
1585 }
1586 if score > 0.0 {
1587 scored.push((score, ext_id));
1588 }
1589 }
1590 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1591 Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1592 }
1593
1594 fn bm25_ranked_ids(
1602 &self,
1603 handle: &CollectionHandle,
1604 query_text: &str,
1605 depth: usize,
1606 filter: Option<&Filter>,
1607 ) -> Result<Vec<String>> {
1608 let Some(idx) = handle.sparse.as_ref() else {
1609 return Ok(Vec::new());
1610 };
1611 let terms = query_term_ids(query_text);
1612 let mut ids = Vec::new();
1613 for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1614 if !self.passes_filter(handle.id, &ext_id, filter)? {
1615 continue;
1616 }
1617 ids.push(ext_id);
1618 if ids.len() >= depth {
1619 break;
1620 }
1621 }
1622 Ok(ids)
1623 }
1624
1625 fn passes_filter(
1628 &self,
1629 cid: CollectionId,
1630 ext_id: &str,
1631 filter: Option<&Filter>,
1632 ) -> Result<bool> {
1633 let Some(filter) = filter else {
1634 return Ok(true);
1635 };
1636 let value: Value = match self.store.get(cid, ext_id)? {
1637 Some(r) => serde_json::from_slice(&r.payload)?,
1638 None => Value::Null,
1639 };
1640 Ok(filter.matches(&value))
1641 }
1642
1643 fn exact_filtered_search(
1648 &self,
1649 cid: CollectionId,
1650 descriptor: &Descriptor,
1651 query: &[f32],
1652 params: &SearchParams,
1653 filter: &Filter,
1654 candidates: &BTreeSet<String>,
1655 ) -> Result<Vec<Match>> {
1656 let metric = to_index_metric(descriptor.metric);
1657 let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1658 for ext_id in candidates {
1659 let Some(record) = self.store.get(cid, ext_id)? else {
1660 continue;
1661 };
1662 let payload: Value = serde_json::from_slice(&record.payload)?;
1663 if !filter.matches(&payload) {
1664 continue;
1665 }
1666 let ordering = ordering_distance(metric, query, &record.vector);
1667 scored.push((ordering, ext_id.clone(), payload, record.vector));
1668 }
1669 scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1670 scored.truncate(params.k);
1671 Ok(scored
1672 .into_iter()
1673 .map(|(ordering, id, payload, vector)| Match {
1674 id,
1675 score: report_metric(metric, ordering),
1676 payload: params.with_payload.then_some(payload),
1677 vector: params.with_vector.then_some(vector),
1678 })
1679 .collect())
1680 }
1681
1682 pub fn upsert_document(
1691 &mut self,
1692 collection: &str,
1693 doc_id: &str,
1694 vectors: &[Vec<f32>],
1695 payload: &Value,
1696 ) -> Result<()> {
1697 let handle = self
1698 .collections
1699 .get_mut(collection)
1700 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1701 require_multivector(handle)?;
1702 if doc_id.contains(DOC_TOKEN_SEP) {
1703 return Err(Error::Unsupported(
1704 "document id must not contain the reserved 0x1f separator",
1705 ));
1706 }
1707 if vectors.is_empty() {
1708 return Err(Error::Unsupported("a document needs at least one vector"));
1709 }
1710 let dim = handle.descriptor.dim as usize;
1711 if vectors.iter().any(|v| v.len() != dim) {
1712 return Err(Error::Unsupported(
1713 "every document vector must match the collection dimensionality",
1714 ));
1715 }
1716 let previous = handle
1719 .docs
1720 .as_ref()
1721 .and_then(|d| d.get(doc_id))
1722 .copied()
1723 .unwrap_or(0) as usize;
1724 for j in vectors.len()..previous {
1725 self.store.delete(handle.id, &token_id(doc_id, j))?;
1726 index_delete_point(handle, &token_id(doc_id, j));
1728 }
1729 let payload_bytes = serde_json::to_vec(payload)?;
1730 for (j, vector) in vectors.iter().enumerate() {
1731 let bytes: &[u8] = if j == 0 {
1733 payload_bytes.as_slice()
1734 } else {
1735 &[]
1736 };
1737 self.store
1738 .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1739 index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1743 }
1744 if let Some(docs) = handle.docs.as_mut() {
1745 docs.insert(doc_id.to_owned(), vectors.len() as u32);
1746 }
1747 Ok(())
1748 }
1749
1750 pub fn search_multi_vector(
1758 &mut self,
1759 collection: &str,
1760 query_tokens: &[Vec<f32>],
1761 params: &SearchParams,
1762 ) -> Result<Vec<DocumentMatch>> {
1763 self.ensure_indexed(collection)?;
1767 self.search_multi_vector_snapshot(collection, query_tokens, params)
1768 }
1769
1770 pub fn search_multi_vector_snapshot(
1777 &self,
1778 collection: &str,
1779 query_tokens: &[Vec<f32>],
1780 params: &SearchParams,
1781 ) -> Result<Vec<DocumentMatch>> {
1782 require_multivector(self.handle(collection)?)?;
1783 let dim = self.handle(collection)?.descriptor.dim as usize;
1784 if query_tokens.is_empty() {
1785 return Ok(Vec::new());
1786 }
1787 if query_tokens.iter().any(|v| v.len() != dim) {
1788 return Err(Error::Unsupported(
1789 "every query token must match the collection dimensionality",
1790 ));
1791 }
1792
1793 let doc_count = self
1794 .handle(collection)?
1795 .docs
1796 .as_ref()
1797 .map_or(0, BTreeMap::len);
1798 let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1799 self.handle(collection)?
1801 .docs
1802 .as_ref()
1803 .map(|d| d.keys().cloned().collect())
1804 .unwrap_or_default()
1805 } else {
1806 let handle = self.handle(collection)?;
1810 let per_token_k = params
1811 .k
1812 .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1813 .max(params.ef_search);
1814 let mut set = BTreeSet::new();
1815 for token in query_tokens {
1816 for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1817 if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1818 && let Some((doc, _)) = parse_token_id(ext)
1819 {
1820 set.insert(doc.to_owned());
1821 }
1822 }
1823 }
1824 set.into_iter().collect()
1825 };
1826
1827 let handle = self.handle(collection)?;
1829 let cid = handle.id;
1830 let metric = to_index_metric(handle.descriptor.metric);
1831 let mut scored: Vec<ScoredDocument> = Vec::new();
1832 for doc in &candidates {
1833 let count = handle
1834 .docs
1835 .as_ref()
1836 .and_then(|d| d.get(doc))
1837 .copied()
1838 .unwrap_or(0) as usize;
1839 let (tokens, payload) = self.gather_document(cid, doc, count)?;
1840 if tokens.is_empty() {
1841 continue;
1842 }
1843 if let Some(filter) = ¶ms.filter {
1844 let value = payload.clone().unwrap_or(Value::Null);
1845 if !filter.matches(&value) {
1846 continue;
1847 }
1848 }
1849 let score = max_sim(metric, query_tokens, &tokens);
1850 let vectors = params.with_vector.then_some(tokens);
1851 scored.push((score, doc.clone(), payload, vectors));
1852 }
1853 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1855 scored.truncate(params.k);
1856 Ok(scored
1857 .into_iter()
1858 .map(|(score, id, payload, vectors)| DocumentMatch {
1859 id,
1860 score,
1861 payload: params.with_payload.then_some(payload).flatten(),
1862 vectors,
1863 })
1864 .collect())
1865 }
1866
1867 pub fn get_document(
1870 &self,
1871 collection: &str,
1872 doc_id: &str,
1873 with_vectors: bool,
1874 ) -> Result<Option<DocumentMatch>> {
1875 let handle = self.handle(collection)?;
1876 require_multivector(handle)?;
1877 let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1878 return Ok(None);
1879 };
1880 let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1881 if tokens.is_empty() {
1882 return Ok(None);
1883 }
1884 Ok(Some(DocumentMatch {
1885 id: doc_id.to_owned(),
1886 score: 0.0,
1887 payload,
1888 vectors: with_vectors.then_some(tokens),
1889 }))
1890 }
1891
1892 pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1895 let handle = self
1896 .collections
1897 .get_mut(collection)
1898 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1899 require_multivector(handle)?;
1900 let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1901 return Ok(false);
1902 };
1903 for j in 0..count as usize {
1904 self.store.delete(handle.id, &token_id(doc_id, j))?;
1905 index_delete_point(handle, &token_id(doc_id, j));
1907 }
1908 if let Some(docs) = handle.docs.as_mut() {
1909 docs.remove(doc_id);
1910 }
1911 Ok(true)
1912 }
1913
1914 pub fn document_count(&self, collection: &str) -> Result<usize> {
1917 let handle = self.handle(collection)?;
1918 require_multivector(handle)?;
1919 Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1920 }
1921
1922 fn gather_document(
1926 &self,
1927 cid: CollectionId,
1928 doc_id: &str,
1929 count: usize,
1930 ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1931 let mut tokens = Vec::with_capacity(count);
1932 let mut payload: Option<Value> = None;
1933 for j in 0..count {
1934 let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1935 continue;
1936 };
1937 if j == 0 && !record.payload.is_empty() {
1938 payload = Some(serde_json::from_slice(&record.payload)?);
1939 }
1940 tokens.push(record.vector);
1941 }
1942 Ok((tokens, payload))
1943 }
1944
1945 pub fn checkpoint(&mut self) -> Result<()> {
1950 let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1951 for handle in self.collections.values() {
1952 if handle.stale {
1953 continue;
1954 }
1955 if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1956 if ivf.is_empty() {
1957 continue;
1958 }
1959 let envelope = IndexEnvelope {
1960 version: INDEX_ENVELOPE_VERSION,
1961 int_to_ext: handle.int_to_ext.clone(),
1962 ivf: ivf.snapshot()?,
1963 };
1964 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1965 } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1966 let envelope = DiskEnvelope {
1972 version: INDEX_ENVELOPE_VERSION,
1973 int_to_ext: handle.int_to_ext.clone(),
1974 base_row_count: fresh.base_len() as u64,
1975 deleted_ids: fresh.deleted_ids(),
1976 };
1977 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1978 }
1979 }
1980 self.store.checkpoint_with_index_snapshots(&snapshots)?;
1981 Ok(())
1982 }
1983
1984 pub fn compact(&mut self) -> Result<()> {
1988 Ok(self.store.compact()?)
1989 }
1990
    /// Returns the manifest version currently reported by the underlying store.
    #[must_use]
    pub fn manifest_version(&self) -> u64 {
        self.store.manifest_version()
    }
1997
    /// Returns the total size in bytes of the files under the store directory.
    ///
    /// The figure comes from a recursive directory walk (`dir_size`), which counts
    /// unreadable entries as zero rather than failing.
    #[must_use]
    pub fn disk_usage_bytes(&self) -> u64 {
        dir_size(self.store.dir())
    }
2004
2005 pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
2017 if dest.exists() {
2018 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2019 dest.display().to_string(),
2020 )));
2021 }
2022 self.checkpoint()?;
2025 let (files, bytes) = copy_tree(self.store.dir(), dest)?;
2026 let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
2030 Ok(SnapshotInfo {
2031 manifest_version: self.store.manifest_version(),
2032 files,
2033 bytes,
2034 })
2035 }
2036
2037 fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2038 self.collections
2039 .get(name)
2040 .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2041 }
2042}
2043
2044pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
2057 if dest.exists() {
2058 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2059 dest.display().to_string(),
2060 )));
2061 }
2062 if !src.join("CURRENT").exists() {
2063 return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
2064 format!("{} is not a snapshot (no CURRENT)", src.display()),
2065 )));
2066 }
2067 let (files, bytes) = copy_tree(src, dest)?;
2068 Ok(SnapshotInfo {
2069 manifest_version: 0,
2073 files,
2074 bytes,
2075 })
2076}
2077
2078fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2082 std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2083 let mut files = 0u64;
2084 let mut bytes = 0u64;
2085 for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2086 let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2087 let from = entry.path();
2088 let to = dst.join(entry.file_name());
2089 let ft = entry
2090 .file_type()
2091 .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2092 if ft.is_dir() {
2093 let (f, b) = copy_tree(&from, &to)?;
2094 files += f;
2095 bytes += b;
2096 } else {
2097 let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2098 files += 1;
2099 bytes += n;
2100 }
2101 }
2102 Ok((files, bytes))
2103}
2104
/// Sums the sizes of all files under `dir`, descending into subdirectories.
///
/// This never fails: an unreadable directory, entry, file type or metadata simply contributes
/// zero to the total.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2122
/// Separator placed between a document id and a token ordinal when forming a token's point id
/// (see `token_id` / `parse_token_id`). Document ids containing it are rejected on upsert.
const DOC_TOKEN_SEP: char = '\u{1f}';

/// Multi-vector search scores every document when the collection holds at most this many
/// documents; above it, candidates come from per-token index searches.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

/// Multiplier applied to `k` to obtain the per-query-token neighbour count in multi-vector
/// candidate generation (before taking the maximum with `ef_search`).
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2136
2137fn token_id(doc_id: &str, ordinal: usize) -> String {
2139 format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2140}
2141
2142fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2146 let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2147 Some((doc, ordinal.parse().ok()?))
2148}
2149
2150fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2152 if handle.descriptor.multivector {
2153 Err(Error::Unsupported(
2154 "collection is multi-vector; use upsert_document / search_multi_vector",
2155 ))
2156 } else {
2157 Ok(())
2158 }
2159}
2160
2161fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2163 if handle.descriptor.multivector {
2164 Ok(())
2165 } else {
2166 Err(Error::Unsupported(
2167 "collection is single-vector; use upsert / search",
2168 ))
2169 }
2170}
2171
2172fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2176 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2177 Err(Error::Unsupported(
2178 "collection is client-side encrypted; the server cannot rank opaque vectors — \
2179 fetch points and rank client-side",
2180 ))
2181 } else {
2182 Ok(())
2183 }
2184}
2185
2186fn to_index_metric(metric: DistanceMetric) -> Metric {
2187 match metric {
2188 DistanceMetric::Dot => Metric::Dot,
2189 DistanceMetric::Cosine => Metric::Cosine,
2190 DistanceMetric::L2 => Metric::L2,
2191 }
2192}
2193
2194fn validate_index(descriptor: &Descriptor) -> Result<()> {
2196 if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
2199 return Err(Error::Unsupported(
2200 "multi-vector collections require a similarity metric (cosine or dot)",
2201 ));
2202 }
2203 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2207 if descriptor.multivector {
2208 return Err(Error::Unsupported(
2209 "client-side vector encryption is not supported for multi-vector collections",
2210 ));
2211 }
2212 return Ok(());
2213 }
2214 if descriptor.vector_encryption == VectorEncryption::Dcpe
2217 && descriptor.metric != DistanceMetric::L2
2218 {
2219 return Err(Error::Unsupported(
2220 "dcpe-encrypted collections require the l2 metric",
2221 ));
2222 }
2223 if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
2226 return Err(Error::Unsupported(
2227 "the colbert index is only for multi-vector collections",
2228 ));
2229 }
2230 match descriptor.index.kind {
2231 IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
2232 if descriptor.metric == DistanceMetric::Dot =>
2233 {
2234 Err(Error::Unsupported(
2235 "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
2236 ))
2237 }
2238 _ => Ok(()),
2239 }
2240}
2241
2242fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
2244 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2245 return CollectionIndex::None;
2246 }
2247 match descriptor.index.kind {
2248 IndexKind::Vamana => CollectionIndex::Vamana(None),
2249 IndexKind::DiskVamana => CollectionIndex::Disk(None),
2250 IndexKind::Ivf => CollectionIndex::Ivf(None),
2251 IndexKind::Colbert => CollectionIndex::Colbert(None),
2252 _ => CollectionIndex::Hnsw(Hnsw::new(
2253 descriptor.dim as usize,
2254 to_index_metric(descriptor.metric),
2255 HnswConfig::default(),
2256 )),
2257 }
2258}
2259
/// Picks the default number of product-quantiser subspaces for `dim`-dimensional vectors.
///
/// The result is the largest divisor of `dim` that does not exceed `max(dim / 8, 1)`; it is
/// always at least 1.
fn default_pq_m(dim: usize) -> usize {
    let mut m = std::cmp::max(dim / 8, 1);
    // Walk down from the target until a divisor is found; 1 always qualifies.
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
2269
/// Fixed seed handed to product-quantiser training and to the ColBERT index configuration.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
/// File name of the on-disk Vamana index inside a collection's index directory.
const DISK_INDEX_FILE: &str = "vamana.qvx";
2276
2277fn build_index(
2278 store: &Store,
2279 cid: CollectionId,
2280 descriptor: &Descriptor,
2281 ids: &[u64],
2282 flat: &[f32],
2283) -> Result<CollectionIndex> {
2284 Ok(match build_in_memory_index(descriptor, ids, flat)? {
2285 Some(index) => index,
2286 None => {
2287 let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2288 CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2289 store, cid, &graph, &pq,
2290 )?)?))
2291 }
2292 })
2293}
2294
2295fn build_in_memory_index(
2300 descriptor: &Descriptor,
2301 ids: &[u64],
2302 flat: &[f32],
2303) -> Result<Option<CollectionIndex>> {
2304 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2307 return Ok(Some(CollectionIndex::None));
2308 }
2309 let dim = descriptor.dim as usize;
2310 let metric = to_index_metric(descriptor.metric);
2311 Ok(Some(match descriptor.index.kind {
2312 IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
2313 ids,
2314 flat,
2315 dim,
2316 metric,
2317 VamanaConfig::default(),
2318 )?)?)),
2319 IndexKind::DiskVamana => return Ok(None),
2321 IndexKind::Ivf => {
2322 let cfg = IvfConfig {
2323 quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
2324 ..IvfConfig::default()
2325 };
2326 CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
2327 }
2328 IndexKind::Colbert => {
2329 let n = ids.len();
2332 let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
2333 let cfg = ColbertConfig {
2334 n_centroids,
2335 n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
2336 pq_subspaces: descriptor
2337 .index
2338 .pq_subspaces
2339 .map_or_else(|| default_pq_m(dim), |m| m as usize),
2340 seed: PQ_SEED,
2341 };
2342 CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
2343 }
2344 _ => {
2345 let mut h = Hnsw::new(dim, metric, HnswConfig::default());
2346 for (i, &id) in ids.iter().enumerate() {
2347 h.insert(id, &flat[i * dim..(i + 1) * dim])?;
2348 }
2349 CollectionIndex::Hnsw(h)
2350 }
2351 }))
2352}
2353
2354fn build_disk_graph_pq(
2358 descriptor: &Descriptor,
2359 ids: &[u64],
2360 flat: &[f32],
2361) -> Result<(Vamana, ProductQuantizer)> {
2362 let dim = descriptor.dim as usize;
2363 let metric = to_index_metric(descriptor.metric);
2364 let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2365 let m = descriptor
2366 .index
2367 .pq_subspaces
2368 .map_or_else(|| default_pq_m(dim), |x| x as usize);
2369 let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2370 Ok((graph, pq))
2371}
2372
2373fn write_disk_index(
2378 store: &Store,
2379 cid: CollectionId,
2380 graph: &Vamana,
2381 pq: &ProductQuantizer,
2382) -> Result<DiskVamana> {
2383 let dir = store.index_dir(cid);
2384 std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
2385 let path = dir.join(DISK_INDEX_FILE);
2386 let codec = store.collection_codec_clone(cid)?;
2390 let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
2396 quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
2397 std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
2398 let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
2399 open_disk_index(store, cid, codec)
2400}
2401
2402fn open_disk_index(
2406 store: &Store,
2407 cid: CollectionId,
2408 codec: Box<dyn PageCodec>,
2409) -> Result<DiskVamana> {
2410 let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2411 Ok(DiskVamana::open(&path, codec)?)
2412}
2413
2414fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2419 if !handle.descriptor.multivector
2422 && handle.descriptor.index.kind == IndexKind::Ivf
2423 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2424 && restore_ivf_snapshot(store, handle, &blob).is_ok()
2425 {
2426 return Ok(());
2427 }
2428 if !handle.descriptor.multivector
2435 && handle.descriptor.index.kind == IndexKind::DiskVamana
2436 && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
2437 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2438 && restore_disk_snapshot(store, handle, &blob).is_ok()
2439 {
2440 return Ok(());
2441 }
2442 rebuild_index(store, handle)
2443}
2444
2445fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2451 let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
2452 if envelope.version != INDEX_ENVELOPE_VERSION {
2453 return Err(Error::Unsupported(
2454 "unsupported disk index snapshot version",
2455 ));
2456 }
2457 let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
2458 if base.len() as u64 != envelope.base_row_count {
2461 return Err(Error::Unsupported(
2462 "disk base count disagrees with snapshot",
2463 ));
2464 }
2465 handle.ext_to_int = envelope
2466 .int_to_ext
2467 .iter()
2468 .enumerate()
2469 .map(|(i, ext)| (ext.clone(), i as u64))
2470 .collect();
2471 handle.int_to_ext = envelope.int_to_ext;
2472 let mut fresh = FreshDiskVamana::new(base)?;
2473 for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
2477 let ext = &handle.int_to_ext[internal as usize];
2478 if let Some(record) = store.get(handle.id, ext)? {
2479 fresh.insert(internal, &record.vector)?;
2480 }
2481 }
2482 for id in envelope.deleted_ids {
2483 fresh.mark_deleted(id);
2484 }
2485 handle.index = CollectionIndex::Disk(Some(fresh));
2486 handle.stale = false;
2487 replay_recovery_tail(store, handle)
2488}
2489
2490fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2495 let tail = store.recovery_tail(handle.id)?;
2496 for ext in &tail.deleted {
2497 index_delete_point(handle, ext);
2498 }
2499 for (ext, record) in tail.upserts {
2500 index_upsert_point(handle, &ext, &record.vector)?;
2501 }
2502 Ok(())
2503}
2504
2505fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2510 let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
2511 if envelope.version != INDEX_ENVELOPE_VERSION {
2512 return Err(Error::Unsupported(
2513 "unsupported index snapshot envelope version",
2514 ));
2515 }
2516 let ivf = Ivf::restore(&envelope.ivf)?;
2517 handle.ext_to_int = envelope
2518 .int_to_ext
2519 .iter()
2520 .enumerate()
2521 .map(|(i, ext)| (ext.clone(), i as u64))
2522 .collect();
2523 handle.int_to_ext = envelope.int_to_ext;
2524 handle.index = CollectionIndex::Ivf(Some(ivf));
2525 handle.stale = false;
2526
2527 let tail = store.recovery_tail(handle.id)?;
2528 for ext in &tail.deleted {
2529 let Some(&internal) = handle.ext_to_int.get(ext) else {
2530 continue;
2531 };
2532 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2533 ivf.remove(internal);
2534 }
2535 }
2536 for (ext, record) in tail.upserts {
2537 let internal = match handle.ext_to_int.get(&ext) {
2538 Some(&i) => i,
2539 None => {
2540 let i = handle.int_to_ext.len() as u64;
2541 handle.ext_to_int.insert(ext.clone(), i);
2542 handle.int_to_ext.push(ext);
2543 i
2544 }
2545 };
2546 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2547 ivf.insert(internal, &record.vector)?;
2548 }
2549 }
2550 Ok(())
2551}
2552
/// Reflects an upsert of `ext_id` with `vector` in the collection's in-memory index.
///
/// Depending on the handle's state the write is either routed to the overlay, ignored because
/// the index is already stale, applied incrementally, or answered by marking the index stale.
///
/// # Errors
/// Propagates insert failures from the underlying index.
fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
    // When `mvcc_served` reports true the write goes to the overlay and the index itself is
    // left untouched. NOTE(review): the exact semantics of `mvcc_served` are defined elsewhere.
    if mvcc_served(handle) {
        overlay_upsert(handle, ext_id, vector);
        return Ok(());
    }
    bump_write_gen(handle);
    // A stale index is not maintained incrementally.
    if handle.stale {
        return Ok(());
    }
    let known = handle.ext_to_int.contains_key(ext_id);
    let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
    let is_live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    if is_hnsw && !known {
        // HNSW: a brand-new id is appended under the next internal id. An already-known id
        // matches none of the branches below and ends in `mark_stale`.
        let internal = handle.int_to_ext.len() as u64;
        if let CollectionIndex::Hnsw(h) = &mut handle.index {
            h.insert(internal, vector)?;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
    } else if is_live_ivf {
        // IVF: a known id is re-inserted under its existing internal id; a new id gets the
        // next free one.
        let internal = if known {
            handle.ext_to_int[ext_id]
        } else {
            let i = handle.int_to_ext.len() as u64;
            handle.ext_to_int.insert(ext_id.to_owned(), i);
            handle.int_to_ext.push(ext_id.to_owned());
            i
        };
        if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
            ivf.insert(internal, vector)?;
        }
    } else if is_live_graph {
        // Vamana / disk: an update marks the old internal id deleted and inserts the vector
        // under a fresh internal id, so `int_to_ext` may list the same external id twice.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut pending = 0.0;
        match &mut handle.index {
            CollectionIndex::Vamana(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            CollectionIndex::Disk(Some(fresh)) => {
                if let Some(o) = old {
                    fresh.mark_deleted(o);
                }
                fresh.insert(internal, vector)?;
                pending = fresh.pending_fraction();
            }
            _ => {}
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        // Once the pending fraction reaches the threshold the index is marked stale.
        if pending >= GRAPH_REBUILD_PENDING_FRACTION {
            mark_stale(handle);
        }
    } else if is_live_colbert {
        // ColBERT: same delete-then-insert scheme; the deleted fraction is compared against
        // the HNSW rebuild threshold constant.
        let old = handle.ext_to_int.get(ext_id).copied();
        let internal = handle.int_to_ext.len() as u64;
        let mut crowded = false;
        if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
            if let Some(o) = old {
                c.mark_deleted(o);
            }
            c.insert(internal, vector)?;
            crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
        }
        handle.ext_to_int.insert(ext_id.to_owned(), internal);
        handle.int_to_ext.push(ext_id.to_owned());
        if crowded {
            mark_stale(handle);
        }
    } else {
        // No incremental path applies (e.g. an unbuilt index, an empty IVF, or an HNSW update
        // of a known id): mark the index stale.
        mark_stale(handle);
    }
    Ok(())
}
2656
/// Reflects a delete of `ext_id` in the collection's in-memory index.
///
/// Depending on the handle's state the delete is either routed to the overlay, ignored because
/// the index is already stale, applied incrementally, or answered by marking the index stale.
fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
    // When `mvcc_served` reports true the delete goes to the overlay and the index itself is
    // left untouched. NOTE(review): the exact semantics of `mvcc_served` are defined elsewhere.
    if mvcc_served(handle) {
        overlay_delete(handle, ext_id);
        return;
    }
    bump_write_gen(handle);
    // A stale index is not maintained incrementally.
    if handle.stale {
        return;
    }
    let internal = handle.ext_to_int.get(ext_id).copied();
    let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
    let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
    let live_graph = matches!(
        handle.index,
        CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
    );
    let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
    match internal {
        // IVF removes the entry outright.
        Some(internal) if live_ivf => {
            if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
                ivf.remove(internal);
            }
        }
        // HNSW marks the node deleted; the index goes stale once the deleted fraction reaches
        // the threshold.
        Some(internal) if live_hnsw => {
            let mut crowded = false;
            if let CollectionIndex::Hnsw(h) = &mut handle.index {
                h.mark_deleted(internal as u32);
                crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // Vamana / disk mark the id deleted; the index goes stale once the pending fraction
        // reaches the threshold.
        Some(internal) if live_graph => {
            let mut crowded = false;
            match &mut handle.index {
                CollectionIndex::Vamana(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                CollectionIndex::Disk(Some(fresh)) => {
                    fresh.mark_deleted(internal);
                    crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
                }
                _ => {}
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // ColBERT marks the id deleted and reuses the HNSW rebuild threshold constant.
        Some(internal) if live_colbert => {
            let mut crowded = false;
            if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
                c.mark_deleted(internal);
                crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
            }
            if crowded {
                mark_stale(handle);
            }
        }
        // Unknown external id, or no live index to update: mark the index stale.
        _ => mark_stale(handle),
    }
}
2729
/// Everything `scan_collection` gathers from one pass over a collection's records.
struct RebuildScan {
    /// External id of each record, indexed by the internal id assigned during the scan.
    int_to_ext: Vec<String>,
    /// Reverse lookup from external id to internal id.
    ext_to_int: HashMap<String, u64>,
    /// All record vectors concatenated in scan order.
    flat: Vec<f32>,
    /// Token count per document id; `Some` only for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index built from record payloads; `Some` only when the descriptor uses one.
    sparse: Option<SparseInvertedIndex>,
}
2741
2742fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2747 let multivector = handle.descriptor.multivector;
2748 let mut int_to_ext = Vec::new();
2749 let mut ext_to_int = HashMap::new();
2750 let mut flat: Vec<f32> = Vec::new();
2751 let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2752 let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2755 for (ext_id, record) in store.scan(handle.id)? {
2756 let internal = int_to_ext.len() as u64;
2757 flat.extend_from_slice(&record.vector);
2758 if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2759 *docs.entry(doc.to_owned()).or_insert(0) += 1;
2760 }
2761 if let Some(idx) = sparse.as_mut()
2762 && let Some(sv) = sparse_vector_from_payload(&record.payload)
2763 {
2764 idx.upsert(&ext_id, &sv);
2765 }
2766 ext_to_int.insert(ext_id.clone(), internal);
2767 int_to_ext.push(ext_id);
2768 }
2769 Ok(RebuildScan {
2770 int_to_ext,
2771 ext_to_int,
2772 flat,
2773 docs: multivector.then_some(docs),
2774 sparse,
2775 })
2776}
2777
2778fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2782 let scan = scan_collection(store, handle)?;
2783 let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2784 handle.index = empty_index(&handle.descriptor);
2787 handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2788 handle.int_to_ext = scan.int_to_ext;
2789 handle.ext_to_int = scan.ext_to_int;
2790 handle.docs = scan.docs;
2791 handle.sparse = scan.sparse;
2792 handle.stale = false;
2793 Ok(())
2794}
2795
/// Inputs captured for rebuilding one collection's index; consumed by [`RebuildInputs::build`].
pub struct RebuildInputs {
    /// Name of the collection being rebuilt.
    collection: String,
    /// Descriptor that selects the index kind, metric and dimensionality.
    descriptor: Descriptor,
    /// Records gathered from the collection scan.
    scan: RebuildScan,
    /// Write generation carried through unchanged into the resulting `RebuiltIndex`.
    write_gen: u64,
}
2807
/// Outcome of an index build performed by [`RebuildInputs::build`].
enum RebuiltKind {
    /// A fully built in-memory index.
    Ready(Box<CollectionIndex>),
    /// The graph and product quantiser of a disk index, built in memory but not yet written
    /// to disk by `build`.
    Disk {
        graph: Box<Vamana>,
        pq: Box<ProductQuantizer>,
    },
}
2819
/// Result of [`RebuildInputs::build`]: the built index together with the tables from the scan.
pub struct RebuiltIndex {
    /// Name of the collection this index belongs to.
    collection: String,
    /// The built index, or the in-memory parts of a disk index.
    kind: RebuiltKind,
    /// External id for each internal id used by the index.
    int_to_ext: Vec<String>,
    /// Reverse lookup from external id to internal id.
    ext_to_int: HashMap<String, u64>,
    /// Token count per document id, when the scan produced one.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index, when the scan produced one.
    sparse: Option<SparseInvertedIndex>,
    /// Write generation copied from the `RebuildInputs` this was built from.
    write_gen: u64,
}
2831
2832impl RebuildInputs {
2833 pub fn build(self) -> Result<RebuiltIndex> {
2838 let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2839 let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2840 Some(index) => RebuiltKind::Ready(Box::new(index)),
2841 None => {
2842 let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2843 RebuiltKind::Disk {
2844 graph: Box::new(graph),
2845 pq: Box::new(pq),
2846 }
2847 }
2848 };
2849 Ok(RebuiltIndex {
2850 collection: self.collection,
2851 kind,
2852 int_to_ext: self.scan.int_to_ext,
2853 ext_to_int: self.scan.ext_to_int,
2854 docs: self.scan.docs,
2855 sparse: self.scan.sparse,
2856 write_gen: self.write_gen,
2857 })
2858 }
2859}
2860
2861fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2865 if payload.is_empty() {
2866 return None;
2867 }
2868 let value = serde_json::from_slice::<Value>(payload).ok()?;
2869 sparse_vector_from_value(&value)
2870}
2871
2872fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2878 if let Some(raw) = payload.get(SPARSE_KEY) {
2879 return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2880 }
2881 let text = payload.get(TEXT_KEY)?.as_str()?;
2882 Some(text_to_sparse(text))
2883}
2884
2885fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2891 if handle.stale {
2892 return;
2893 }
2894 let Some(idx) = handle.sparse.as_mut() else {
2895 return;
2896 };
2897 match sparse_vector_from_value(payload) {
2898 Some(sv) => idx.upsert(ext_id, &sv),
2899 None => {
2900 idx.remove(ext_id);
2901 }
2902 }
2903}
2904
2905fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2908 if let Some(idx) = handle.sparse.as_mut() {
2909 idx.remove(ext_id);
2910 }
2911}
2912
2913fn candidate_ids(
2925 store: &Store,
2926 cid: CollectionId,
2927 filter: &Filter,
2928 filterable: &[FilterableField],
2929) -> Result<Option<BTreeSet<String>>> {
2930 match filter {
2931 Filter::And(subs) => {
2932 let mut acc: Option<BTreeSet<String>> = None;
2935 for sub in subs {
2936 if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2937 acc = Some(match acc {
2938 Some(existing) => existing.intersection(&set).cloned().collect(),
2939 None => set,
2940 });
2941 }
2942 }
2943 Ok(acc)
2944 }
2945 Filter::Or(subs) => {
2946 let mut acc = BTreeSet::new();
2949 for sub in subs {
2950 match candidate_ids(store, cid, sub, filterable)? {
2951 Some(set) => acc.extend(set),
2952 None => return Ok(None),
2953 }
2954 }
2955 Ok(Some(acc))
2956 }
2957 Filter::Not(_) => Ok(None),
2959 leaf => match leaf_predicate(leaf, filterable) {
2961 Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2962 None => Ok(None),
2963 },
2964 }
2965}
2966
2967fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2971 let field_type = |field: &str| {
2972 filterable
2973 .iter()
2974 .find(|f| f.path == field)
2975 .map(|f| f.field_type)
2976 };
2977 match filter {
2978 Filter::Eq { field, value } => Some(SecPredicate::Eq {
2979 field: field.clone(),
2980 value: sec_value(field_type(field)?, value)?,
2981 }),
2982 Filter::In { field, values } => {
2983 let ft = field_type(field)?;
2984 let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2987 Some(SecPredicate::In {
2988 field: field.clone(),
2989 values: values?,
2990 })
2991 }
2992 Filter::Lt { field, value } => {
2993 one_sided_range(field, field_type(field)?, value, false, false)
2994 }
2995 Filter::Lte { field, value } => {
2996 one_sided_range(field, field_type(field)?, value, false, true)
2997 }
2998 Filter::Gt { field, value } => {
2999 one_sided_range(field, field_type(field)?, value, true, false)
3000 }
3001 Filter::Gte { field, value } => {
3002 one_sided_range(field, field_type(field)?, value, true, true)
3003 }
3004 _ => None,
3005 }
3006}
3007
3008fn one_sided_range(
3012 field: &str,
3013 field_type: FieldType,
3014 value: &Value,
3015 is_lower: bool,
3016 inclusive: bool,
3017) -> Option<SecPredicate> {
3018 let v = sec_value(field_type, value)?;
3019 let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3020 (Some(v), None, inclusive, false)
3021 } else {
3022 (None, Some(v), false, inclusive)
3023 };
3024 Some(SecPredicate::Range {
3025 field: field.to_owned(),
3026 lo,
3027 hi,
3028 lo_inclusive,
3029 hi_inclusive,
3030 })
3031}
3032
3033fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3037 match (field_type, value) {
3038 (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3039 (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3040 _ => None,
3041 }
3042}
3043
3044#[cfg(test)]
3045mod tests {
3046 use super::*;
3047 use serde_json::json;
3048
3049 fn desc() -> Descriptor {
3050 Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3051 }
3052
3053 fn open(dir: &Path) -> Database {
3054 Database::open(dir).unwrap()
3055 }
3056
3057 #[test]
3058 fn hybrid_search_fuses_dense_and_sparse() {
3059 let tmp = tempfile::tempdir().unwrap();
3060 let mut db = open(tmp.path());
3061 db.create_collection("kb", desc()).unwrap();
3062 db.upsert(
3065 "kb",
3066 "a",
3067 &[1.0, 0.0, 0.0, 0.0],
3068 &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3069 )
3070 .unwrap();
3071 db.upsert(
3072 "kb",
3073 "b",
3074 &[0.0, 1.0, 0.0, 0.0],
3075 &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3076 )
3077 .unwrap();
3078 db.upsert(
3079 "kb",
3080 "c",
3081 &[0.0, 0.0, 0.0, 1.0],
3082 &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3083 )
3084 .unwrap();
3085
3086 let dense_q = [1.0, 0.0, 0.0, 0.0];
3087 let sparse_q = SparseVector {
3088 indices: vec![1, 2],
3089 values: vec![1.0, 1.0],
3090 };
3091 let params = SearchParams {
3092 k: 3,
3093 ..SearchParams::default()
3094 };
3095
3096 let hits = db
3098 .hybrid_search(
3099 "kb",
3100 Some(&dense_q),
3101 Some(&sparse_q),
3102 None,
3103 ¶ms,
3104 DEFAULT_RRF_K0,
3105 )
3106 .unwrap();
3107 let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3108 assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3109 assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3110
3111 let sparse_only = db
3113 .hybrid_search("kb", None, Some(&sparse_q), None, ¶ms, DEFAULT_RRF_K0)
3114 .unwrap();
3115 assert_eq!(sparse_only[0].id, "b");
3116
3117 let dense_only = db
3119 .hybrid_search("kb", Some(&dense_q), None, None, ¶ms, DEFAULT_RRF_K0)
3120 .unwrap();
3121 assert_eq!(dense_only[0].id, "a");
3122
3123 assert!(
3125 db.hybrid_search("kb", None, None, None, ¶ms, DEFAULT_RRF_K0)
3126 .is_err()
3127 );
3128 }
3129
3130 fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3132 let params = SearchParams {
3133 k: 10,
3134 ..SearchParams::default()
3135 };
3136 db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
3137 .unwrap()
3138 .into_iter()
3139 .map(|m| m.id)
3140 .collect()
3141 }
3142
    // Checks that a sparse query answered with the collection's sparse index in place returns
    // exactly the same ids, in the same order, as the same query answered after the index has
    // been removed from the handle.
    #[test]
    fn sparse_index_equals_the_store_scan_fallback() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("kb", desc()).unwrap();
        // Every point shares the same all-zero dense vector; only the sparse payloads differ.
        let z = [0.0f32, 0.0, 0.0, 0.0];
        for (id, dims, vals) in [
            ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
            ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
            ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
            ("d", vec![9u32], vec![1.0f32]), // dimension 9 is not part of the query below
        ] {
            db.upsert(
                "kb",
                id,
                &z,
                &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
            )
            .unwrap();
        }
        let q = SparseVector {
            indices: vec![1, 2, 3],
            values: vec![1.0, 1.0, 1.0],
        };

        // First pass: the handle has a sparse index.
        assert!(db.collections.get("kb").unwrap().sparse.is_some());
        let via_index = sparse_ids(&mut db, &q);
        assert!(!via_index.contains(&"d".to_owned()), "d shares no term");

        // Second pass: drop the index from the handle (presumably forcing the store-scan path
        // the test name refers to) and require identical results.
        db.collections.get_mut("kb").unwrap().sparse = None;
        let via_scan = sparse_ids(&mut db, &q);
        assert_eq!(via_index, via_scan);
    }
3179
    // Checks that the sparse results after an in-place update and a delete match the results
    // obtained once the handle is flagged stale and queried again.
    #[test]
    fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("kb", desc()).unwrap();
        let z = [0.0f32, 0.0, 0.0, 0.0];
        db.upsert(
            "kb",
            "a",
            &z,
            &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
        )
        .unwrap();
        db.upsert(
            "kb",
            "b",
            &z,
            &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
        )
        .unwrap();
        db.upsert(
            "kb",
            "c",
            &z,
            &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
        )
        .unwrap();
        // Re-upsert "a" with a sparse vector that no longer touches dimensions 1 or 2.
        db.upsert(
            "kb",
            "a",
            &z,
            &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
        )
        .unwrap();
        // Delete "b"; `delete` reports `true` for an existing point.
        assert!(db.delete("kb", "b").unwrap());

        let q = SparseVector {
            indices: vec![1, 2],
            values: vec![1.0, 1.0],
        };
        // Only "c" still has a queried dimension.
        let incremental = sparse_ids(&mut db, &q);
        assert_eq!(incremental, vec!["c".to_owned()]);

        // Flag the handle stale (presumably triggering a rebuild on the next read) and require
        // the same answer.
        db.collections.get_mut("kb").unwrap().stale = true;
        let rebuilt = sparse_ids(&mut db, &q);
        assert_eq!(incremental, rebuilt);
    }
3230
    // Checks that a database reopened from the same directory has a sparse index on the
    // collection handle again and answers a sparse query from the data written earlier.
    #[test]
    fn sparse_index_is_rebuilt_on_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            // First session: write one sparse point, then drop the database at scope end.
            let mut db = open(tmp.path());
            db.create_collection("kb", desc()).unwrap();
            db.upsert(
                "kb",
                "a",
                &[0.0, 0.0, 0.0, 0.0],
                &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
            )
            .unwrap();
        }
        // Second session over the same directory.
        let mut db = open(tmp.path());
        assert!(db.collections.get("kb").unwrap().sparse.is_some());
        let q = SparseVector {
            indices: vec![1],
            values: vec![1.0],
        };
        assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
    }
3253
3254 #[test]
3255 fn hybrid_sparse_honours_the_payload_filter() {
3256 let tmp = tempfile::tempdir().unwrap();
3257 let mut db = open(tmp.path());
3258 db.create_collection("kb", desc()).unwrap();
3259 let z = [0.0f32, 0.0, 0.0, 0.0];
3260 db.upsert(
3261 "kb",
3262 "a",
3263 &z,
3264 &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3265 )
3266 .unwrap();
3267 db.upsert(
3268 "kb",
3269 "b",
3270 &z,
3271 &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3272 )
3273 .unwrap();
3274 let q = SparseVector {
3275 indices: vec![1],
3276 values: vec![1.0],
3277 };
3278 let params = SearchParams {
3279 k: 10,
3280 filter: Some(Filter::Eq {
3281 field: "lang".to_owned(),
3282 value: json!("en"),
3283 }),
3284 ..SearchParams::default()
3285 };
3286 let hits: Vec<String> = db
3287 .hybrid_search("kb", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3288 .unwrap()
3289 .into_iter()
3290 .map(|m| m.id)
3291 .collect();
3292 assert_eq!(hits, vec!["a".to_owned()]);
3294 }
3295
3296 #[test]
3297 fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3298 let tmp = tempfile::tempdir().unwrap();
3299 let mut db = open(tmp.path());
3300 db.create_collection("kb", desc()).unwrap();
3301 let z = [0.0f32, 0.0, 0.0, 0.0];
3302 db.upsert(
3304 "kb",
3305 "cats",
3306 &z,
3307 &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3308 )
3309 .unwrap();
3310 db.upsert(
3311 "kb",
3312 "dogs",
3313 &z,
3314 &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3315 )
3316 .unwrap();
3317
3318 let params = SearchParams {
3319 k: 10,
3320 ..SearchParams::default()
3321 };
3322 let hits: Vec<String> = db
3325 .hybrid_search("kb", None, None, Some("cats"), ¶ms, DEFAULT_RRF_K0)
3326 .unwrap()
3327 .into_iter()
3328 .map(|m| m.id)
3329 .collect();
3330 assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3331
3332 assert!(
3334 db.hybrid_search("kb", None, None, Some("elephant"), ¶ms, DEFAULT_RRF_K0)
3335 .unwrap()
3336 .is_empty()
3337 );
3338
3339 let dense_q = [1.0, 0.0, 0.0, 0.0];
3341 db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3342 .unwrap();
3343 let fused: Vec<String> = db
3344 .hybrid_search(
3345 "kb",
3346 Some(&dense_q),
3347 None,
3348 Some("dog"),
3349 ¶ms,
3350 DEFAULT_RRF_K0,
3351 )
3352 .unwrap()
3353 .into_iter()
3354 .map(|m| m.id)
3355 .collect();
3356 assert!(
3357 fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3358 "dense match + lexical match both surface; got {fused:?}"
3359 );
3360 }
3361
    // Smoke test of the basic lifecycle: create a collection, upsert three points, run a
    // nearest-neighbour search and read one point back by id.
    #[test]
    fn create_upsert_search_get_end_to_end() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("items", desc()).unwrap();
        db.upsert(
            "items",
            "a",
            &[0.0, 0.0, 0.0, 0.0],
            &json!({"color": "red"}),
        )
        .unwrap();
        db.upsert(
            "items",
            "b",
            &[1.0, 0.0, 0.0, 0.0],
            &json!({"color": "blue"}),
        )
        .unwrap();
        db.upsert(
            "items",
            "c",
            &[5.0, 5.0, 5.0, 5.0],
            &json!({"color": "red"}),
        )
        .unwrap();

        // The query is closest to "a", then "b".
        let near = db
            .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(near[0].id, "a");
        assert_eq!(near[1].id, "b");

        // `get` returns both the stored vector and the stored payload.
        let got = db.get("items", "c").unwrap().unwrap();
        assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
        assert_eq!(got.payload, Some(json!({"color": "red"})));
    }
3399
3400 #[test]
3401 fn upsert_batch_produces_same_search_results_as_sequential() {
3402 let tmp_seq = tempfile::tempdir().unwrap();
3403 let tmp_bat = tempfile::tempdir().unwrap();
3404
3405 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3406 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3407 let payload = json!({});
3408
3409 {
3411 let mut db = open(tmp_seq.path());
3412 db.create_collection("c", desc()).unwrap();
3413 for (id, vec) in ids.iter().zip(vectors.iter()) {
3414 db.upsert("c", id, vec, &payload).unwrap();
3415 }
3416 }
3417 {
3419 let mut db = open(tmp_bat.path());
3420 db.create_collection("c", desc()).unwrap();
3421 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3422 .iter()
3423 .zip(vectors.iter())
3424 .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3425 .collect();
3426 let n = db.upsert_batch("c", &pts).unwrap();
3427 assert_eq!(n, 20);
3428 }
3429
3430 let query = [10.0f32, 0.0, 0.0, 0.0];
3431 let params = SearchParams {
3432 k: 5,
3433 ..Default::default()
3434 };
3435
3436 let mut seq_db = open(tmp_seq.path());
3437 let mut bat_db = open(tmp_bat.path());
3438 let seq: Vec<String> = seq_db
3439 .search("c", &query, ¶ms)
3440 .unwrap()
3441 .into_iter()
3442 .map(|m| m.id)
3443 .collect();
3444 let bat: Vec<String> = bat_db
3445 .search("c", &query, ¶ms)
3446 .unwrap()
3447 .into_iter()
3448 .map(|m| m.id)
3449 .collect();
3450 assert_eq!(
3451 seq, bat,
3452 "batch and sequential produce different search results"
3453 );
3454 }
3455
3456 #[test]
3457 fn upsert_bulk_defers_the_index_then_searches_correctly() {
3458 let tmp = tempfile::tempdir().unwrap();
3459 let mut db = open(tmp.path());
3460 db.create_collection("c", desc()).unwrap();
3461 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3462 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3463 let plain = json!({});
3466 let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3467 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3468 .iter()
3469 .zip(vectors.iter())
3470 .map(|(id, v)| {
3471 let payload = if id == "p3" { &sparse_payload } else { &plain };
3472 (id.as_str(), v.as_slice(), payload)
3473 })
3474 .collect();
3475 let n = db.upsert_bulk("c", &pts).unwrap();
3476 assert_eq!(n, 20);
3477
3478 assert!(db.collections.get("c").unwrap().stale);
3480
3481 let query = [10.0f32, 0.0, 0.0, 0.0];
3483 let params = SearchParams {
3484 k: 5,
3485 ..Default::default()
3486 };
3487 let hits: Vec<String> = db
3488 .search("c", &query, ¶ms)
3489 .unwrap()
3490 .into_iter()
3491 .map(|m| m.id)
3492 .collect();
3493 assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3494 assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3495
3496 let q = SparseVector {
3498 indices: vec![7],
3499 values: vec![1.0],
3500 };
3501 let sparse_hits: Vec<String> = db
3502 .hybrid_search("c", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3503 .unwrap()
3504 .into_iter()
3505 .map(|m| m.id)
3506 .collect();
3507 assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3508 }
3509
3510 #[test]
3511 fn filtered_search_only_returns_matching_payloads() {
3512 let tmp = tempfile::tempdir().unwrap();
3513 let mut db = open(tmp.path());
3514 db.create_collection("items", desc()).unwrap();
3515 for i in 0..20u32 {
3516 let color = if i % 2 == 0 { "red" } else { "blue" };
3517 db.upsert(
3518 "items",
3519 &format!("p{i}"),
3520 &[i as f32, 0.0, 0.0, 0.0],
3521 &json!({"color": color, "n": i}),
3522 )
3523 .unwrap();
3524 }
3525 let params = SearchParams {
3526 k: 5,
3527 filter: Some(Filter::Eq {
3528 field: "color".into(),
3529 value: json!("red"),
3530 }),
3531 ef_search: 64,
3532 with_payload: true,
3533 with_vector: false,
3534 };
3535 let results = db.search("items", &[0.0; 4], ¶ms).unwrap();
3536 assert!(!results.is_empty());
3537 for m in &results {
3538 assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3539 }
3540 }
3541
    // Checks that points written and checkpointed in one session are counted and searchable
    // after the database is reopened from the same directory.
    #[test]
    fn persists_and_rebuilds_index_on_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            // First session: write 50 points, checkpoint, then drop the database.
            let mut db = open(tmp.path());
            db.create_collection("items", desc()).unwrap();
            for i in 0..50u32 {
                db.upsert(
                    "items",
                    &format!("p{i}"),
                    &[i as f32, 1.0, 2.0, 3.0],
                    &json!({}),
                )
                .unwrap();
            }
            db.checkpoint().unwrap();
        }
        // Second session: the count and the nearest neighbour must both survive.
        let mut db = open(tmp.path());
        assert_eq!(db.len("items").unwrap(), 50);
        let res = db
            .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "p7");
    }
3566
    // Checks that re-upserting an existing id replaces its vector: search ranks the point by its
    // new position and `get` returns the new vector.
    #[test]
    fn update_reflects_new_vector_after_rebuild() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("items", desc()).unwrap();
        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
        db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        // Move "a" from the origin to far beyond "b".
        db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        // With "a" moved away, "b" is now the nearest point to the origin.
        let res = db
            .search("items", &[0.0; 4], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "b");
        assert_eq!(
            db.get("items", "a").unwrap().unwrap().vector,
            Some(vec![100.0, 0.0, 0.0, 0.0])
        );
    }
3587
    // Checks that a deleted point no longer appears in search results and that `get` returns
    // `None` for it.
    #[test]
    fn delete_removes_from_search() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("items", desc()).unwrap();
        db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
        db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        // `delete` reports `true` for an id that existed.
        assert!(db.delete("items", "a").unwrap());
        // The query sits exactly where "a" used to be.
        let res = db
            .search("items", &[0.0; 4], &SearchParams::default())
            .unwrap();
        assert!(res.iter().all(|m| m.id != "a"));
        assert!(db.get("items", "a").unwrap().is_none());
    }
3603
    // Checks two error paths: searching a collection that does not exist, and creating a
    // collection whose name is already taken.
    #[test]
    fn unknown_collection_errors() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        // Searching before any collection exists yields `CollectionNotFound`.
        assert!(matches!(
            db.search("nope", &[0.0; 4], &SearchParams::default()),
            Err(Error::CollectionNotFound(_))
        ));
        db.create_collection("c", desc()).unwrap();
        // A duplicate name surfaces the core crate's `AlreadyExists` error.
        assert!(matches!(
            db.create_collection("c", desc()),
            Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
        ));
    }
3618
3619 fn desc_with(kind: IndexKind) -> Descriptor {
3620 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3621 kind,
3622 pq_subspaces: None,
3623 })
3624 }
3625
    // Checks that collections created with the Vamana and IVF index kinds return the exact
    // nearest point for a query that coincides with a stored vector.
    #[test]
    fn vamana_and_ivf_collections_find_the_nearest_point() {
        for kind in [IndexKind::Vamana, IndexKind::Ivf] {
            // Each index kind gets its own fresh database directory.
            let tmp = tempfile::tempdir().unwrap();
            let mut db = open(tmp.path());
            db.create_collection("c", desc_with(kind)).unwrap();
            for i in 0..40u32 {
                db.upsert(
                    "c",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            let res = db
                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_eq!(res[0].id, "p7", "{kind:?} nearest");
        }
    }
3648
    // Checks that a collection's index kind is stored with its descriptor: after a checkpoint
    // and a reopen the descriptor still says Vamana and search still works.
    #[test]
    fn index_kind_persists_and_rebuilds_on_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            // First session: create a Vamana collection, fill it, checkpoint, drop.
            let mut db = open(tmp.path());
            db.create_collection("v", desc_with(IndexKind::Vamana))
                .unwrap();
            for i in 0..20u32 {
                db.upsert(
                    "v",
                    &format!("p{i}"),
                    &[i as f32, 1.0, 2.0, 3.0],
                    &json!({}),
                )
                .unwrap();
            }
            db.checkpoint().unwrap();
        }
        // Second session over the same directory.
        let mut db = open(tmp.path());
        assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
        let res = db
            .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "p7");
    }
3674
    // Checks the reopen path of a DiskVamana collection: the on-disk base written at checkpoint
    // time keeps its checkpoint-time size across a reopen, while points written after the
    // checkpoint are still searchable.
    #[test]
    fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        // Assigned inside the first session, reused after the reopen.
        let cid;
        {
            let mut db = open(tmp.path());
            db.create_collection("d", desc_with(IndexKind::DiskVamana))
                .unwrap();
            for i in 0..100u32 {
                db.upsert(
                    "d",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            // Search once before checkpointing (presumably so the index is built and can be
            // sealed by the checkpoint).
            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            db.checkpoint().unwrap();
            // Fifteen more points, written after the checkpoint.
            for i in 100..115u32 {
                db.upsert(
                    "d",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            cid = db.collections["d"].id;
            // The base on disk holds only the 100 points that existed at the checkpoint.
            let base = open_disk_index(
                &db.store,
                cid,
                db.store.collection_codec_clone(cid).unwrap(),
            )
            .unwrap();
            assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
        }

        let mut db = open(tmp.path());
        // A pre-checkpoint point is found...
        assert_eq!(
            db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap()[0]
                .id,
            "p50",
        );
        // ...and so is a post-checkpoint one.
        assert_eq!(
            db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap()[0]
                .id,
            "p110",
            "post-checkpoint insert survived reopen via WAL-tail replay",
        );
        // The base still has its checkpoint-time size, i.e. the reopen did not re-seal it with
        // all 115 points.
        let base = open_disk_index(
            &db.store,
            cid,
            db.store.collection_codec_clone(cid).unwrap(),
        )
        .unwrap();
        assert_eq!(
            base.len(),
            100,
            "reopen loaded the base; it was not rebuilt"
        );
    }
3750
    // Checks that a DiskVamana collection still answers correctly after its on-disk base file
    // is first deleted and later truncated to half its length.
    #[test]
    fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
        let tmp = tempfile::tempdir().unwrap();
        // Path of the base file, captured in the first session.
        let base_path;
        {
            let mut db = open(tmp.path());
            db.create_collection("d", desc_with(IndexKind::DiskVamana))
                .unwrap();
            for i in 0..60u32 {
                db.upsert(
                    "d",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            db.checkpoint().unwrap();
            let cid = db.collections["d"].id;
            base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
        }
        // Case 1: the base file is gone entirely.
        std::fs::remove_file(&base_path).unwrap();
        {
            let mut db = open(tmp.path());
            assert_eq!(
                db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
                    .unwrap()[0]
                    .id,
                "p25",
                "rebuild fallback still answers correctly after a lost base",
            );
            // Answering the search recreated the file.
            assert!(
                base_path.exists(),
                "the fallback rebuild re-sealed the base file"
            );
            db.checkpoint().unwrap();
        }
        // Case 2: the base file exists but is cut to half its length.
        let len = std::fs::metadata(&base_path).unwrap().len();
        std::fs::OpenOptions::new()
            .write(true)
            .open(&base_path)
            .unwrap()
            .set_len(len / 2)
            .unwrap();

        let mut db = open(tmp.path());
        assert_eq!(
            db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap()[0]
                .id,
            "p25",
            "rebuild fallback still answers correctly after a torn base",
        );
    }
3814
    // Checks that, once an IVF collection's index has been built by a search, a later insert
    // and a later delete both leave the handle's `stale` flag clear while still being reflected
    // in search results.
    #[test]
    fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        for i in 0..50u32 {
            db.upsert(
                "c",
                &format!("p{i}"),
                &[i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }
        // The result is discarded; the search is only there to clear the stale flag.
        let _ = db
            .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert!(!db.collections["c"].stale, "the search built the index");

        // Insert a new id far from every existing point.
        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
        let res = db
            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "far");

        // Delete it again.
        assert!(db.delete("c", "far").unwrap());
        assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
        let res = db
            .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
    }
3853
    // Checks that re-upserting an existing id in a built IVF collection keeps the handle
    // non-stale, makes the point findable at its new position, and removes it from results
    // around its old position.
    #[test]
    fn ivf_incremental_update_replaces_the_vector() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        for i in 0..30u32 {
            db.upsert(
                "c",
                &format!("p{i}"),
                &[i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }
        // Search once before the update so the update hits an already-built index.
        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
        // Move "p5" from x = 5 to x = 900.
        db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert!(!db.collections["c"].stale);
        let at_new = db
            .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
        let at_old = db
            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
    }
3883
    // Checks that an id deleted from a built IVF collection can be inserted again and is then
    // returned as the nearest point, with the handle never becoming stale along the way.
    #[test]
    fn ivf_reinsert_after_incremental_delete_is_found() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        for i in 0..20u32 {
            db.upsert(
                "c",
                &format!("p{i}"),
                &[i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }
        // Search once so the delete and re-insert below hit an already-built index.
        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
        assert!(db.delete("c", "p3").unwrap());
        assert!(!db.collections["c"].stale);
        // Re-insert the same id with the same vector it had before the delete.
        db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert!(!db.collections["c"].stale);
        let res = db
            .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "p3");
    }
3911
    // Checks that re-upserting an existing id in a collection created with the default
    // descriptor (presumably HNSW, per the test name) flags the handle stale, and that the next
    // search still finds the point at its new position.
    #[test]
    fn hnsw_in_place_update_falls_back_to_rebuild() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("c", desc()).unwrap();
        for i in 0..10u32 {
            db.upsert(
                "c",
                &format!("p{i}"),
                &[i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }
        // A first search leaves the handle non-stale.
        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
        assert!(!db.collections["c"].stale);
        // Updating an existing id flips the flag back on.
        db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
        // The next search must see the updated vector.
        let res = db
            .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "p2");
    }
3938
    // Checks that `create_collection` rejects the Dot metric combined with the Vamana and
    // DiskVamana index kinds, returning `Error::Unsupported` in both cases.
    #[test]
    fn unsupported_index_configurations_are_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        // Dot + Vamana.
        let dot_vamana =
            Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
                kind: IndexKind::Vamana,
                pq_subspaces: None,
            });
        assert!(matches!(
            db.create_collection("a", dot_vamana),
            Err(Error::Unsupported(_))
        ));
        // Dot + DiskVamana.
        let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
            kind: IndexKind::DiskVamana,
            pq_subspaces: None,
        });
        assert!(matches!(
            db.create_collection("b", dot_disk),
            Err(Error::Unsupported(_))
        ));
    }
3963
    // Checks that `VectorEncryption::Dcpe` is rejected with the Cosine and Dot metrics, accepted
    // with L2, and recorded in the stored descriptor.
    #[test]
    fn dcpe_collections_require_the_l2_metric() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
            // The name "bad" can be reused because each creation attempt fails.
            let bad = Descriptor::new(4, Dtype::F32, metric)
                .with_vector_encryption(VectorEncryption::Dcpe);
            assert!(matches!(
                db.create_collection("bad", bad),
                Err(Error::Unsupported(_))
            ));
        }
        // L2 is the accepted combination.
        let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
            .with_vector_encryption(VectorEncryption::Dcpe);
        db.create_collection("enc", good)
            .expect("l2 dcpe collection");
        assert_eq!(
            db.descriptor("enc").expect("descriptor").vector_encryption,
            VectorEncryption::Dcpe
        );
    }
3987
    // Checks the behaviour of a `VectorEncryption::ClientSide` collection: it never holds an
    // index, rejects `search`, and is still usable through `fetch`, `get`, `len` and `delete`.
    #[test]
    fn client_side_collections_are_fetch_only_and_reject_search() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_vector_encryption(VectorEncryption::ClientSide);
        db.create_collection("vault", desc)
            .expect("create client-side collection");
        // No index right after creation.
        assert!(matches!(
            db.collections["vault"].index,
            CollectionIndex::None
        ));

        // Five points: two tagged "vip", three tagged "std". The stored vector is all zeros;
        // the payload carries a `__quiver_vec__` string (presumably the ciphertext).
        for i in 0..5 {
            let tier = if i < 2 { "vip" } else { "std" };
            db.upsert(
                "vault",
                &format!("p{i}"),
                &[0.0; 4],
                &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
            )
            .expect("upsert");
        }
        assert_eq!(db.len("vault").unwrap(), 5);
        // Still no index after the writes.
        assert!(matches!(
            db.collections["vault"].index,
            CollectionIndex::None
        ));

        // Searching is refused outright.
        assert!(matches!(
            db.search("vault", &[0.0; 4], &SearchParams::default()),
            Err(Error::Unsupported(_))
        ));

        // Unfiltered fetch returns every point, with payloads but without vectors as requested.
        let all = db.fetch("vault", None, 0, 100, true, false).unwrap();
        assert_eq!(all.len(), 5);
        assert!(
            all.iter()
                .all(|m| m.payload.is_some() && m.vector.is_none())
        );

        // A payload filter narrows the fetch to the two "vip" points.
        let vip = db
            .fetch(
                "vault",
                Some(&Filter::Eq {
                    field: "tier".to_owned(),
                    value: serde_json::json!("vip"),
                }),
                0,
                100,
                false,
                false,
            )
            .unwrap();
        assert_eq!(vip.len(), 2);
        // Paging: the fourth argument caps the page at 2 results...
        assert_eq!(
            db.fetch("vault", None, 0, 2, false, false).unwrap().len(),
            2
        );
        // ...and a third argument of 3 leaves the remaining 2 of 5.
        assert_eq!(
            db.fetch("vault", None, 3, 100, false, false).unwrap().len(),
            2
        );

        // Point reads and deletes work as usual.
        assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
        assert!(db.delete("vault", "p0").unwrap());
        assert_eq!(db.len("vault").unwrap(), 4);
    }
4070
    // Checks that combining `with_multivector(true)` with `VectorEncryption::ClientSide` is
    // rejected by `create_collection` as `Error::Unsupported`.
    #[test]
    fn client_side_encryption_rejects_multivector() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_vector_encryption(VectorEncryption::ClientSide);
        assert!(matches!(
            db.create_collection("bad", desc),
            Err(Error::Unsupported(_))
        ));
    }
4083
4084 fn contains_file(dir: &Path, name: &str) -> bool {
4086 std::fs::read_dir(dir).is_ok_and(|rd| {
4087 rd.flatten().any(|e| {
4088 let p = e.path();
4089 if p.is_dir() {
4090 contains_file(&p, name)
4091 } else {
4092 p.file_name().is_some_and(|f| f == name)
4093 }
4094 })
4095 })
4096 }
4097
    // Checks that a DiskVamana collection answers searches, leaves a `vamana.qvx` file
    // somewhere under the database directory after a checkpoint, and keeps working after a
    // reopen.
    #[test]
    fn disk_index_collection_searches_persists_and_writes_an_artifact() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let mut db = open(tmp.path());
            db.create_collection("d", desc_with(IndexKind::DiskVamana))
                .unwrap();
            for i in 0..40u32 {
                db.upsert(
                    "d",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            let res = db
                .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_eq!(res[0].id, "p7");
            db.checkpoint().unwrap();
        }
        // The index artifact is looked up by file name anywhere under the database directory.
        assert!(
            contains_file(tmp.path(), "vamana.qvx"),
            "disk index file missing"
        );
        // Reopen: the descriptor still records DiskVamana and search still works.
        let mut db = open(tmp.path());
        assert_eq!(
            db.descriptor("d").unwrap().index.kind,
            IndexKind::DiskVamana
        );
        let res = db
            .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(res[0].id, "p7");
    }
4136
    // For both graph index kinds (Vamana and DiskVamana), checks that an insert, a delete and an
    // in-place update made after the first search are all reflected by later searches.
    #[test]
    fn graph_collections_maintain_writes_incrementally() {
        for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
            let tmp = tempfile::tempdir().unwrap();
            let mut db = open(tmp.path());
            db.create_collection("c", desc_with(kind)).unwrap();
            for i in 0..40u32 {
                db.upsert(
                    "c",
                    &format!("p{i}"),
                    &[i as f32, 0.0, 0.0, 0.0],
                    &json!({}),
                )
                .unwrap();
            }
            // Baseline search over the initial 40 points.
            let res = db
                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_eq!(res[0].id, "p7", "{kind:?} base nearest");

            // Insert: a new point between p7 and p8 must be the nearest to a query next to it.
            db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
            let res = db
                .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");

            // Delete: the removed id must not be returned any more.
            assert!(db.delete("c", "p7").unwrap());
            let res = db
                .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert!(
                res.iter().all(|m| m.id != "p7"),
                "{kind:?} deleted id returned"
            );

            // Update: p20 moves from x = 20 to x = 500; it must be found at the new spot and
            // must no longer be nearest to the old one.
            db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
                .unwrap();
            let res = db
                .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
            let res = db
                .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
                .unwrap();
            assert_ne!(
                res[0].id, "p20",
                "{kind:?} stale copy still nearest old spot"
            );
        }
    }
4197
    // Checks a Vamana collection under churn: after 15 of 50 points are deleted and 15 new ones
    // inserted, searches never return a deleted id and do find the new points — both in the
    // live session and after a checkpoint and reopen.
    #[test]
    fn graph_consolidates_under_heavy_churn() {
        let tmp = tempfile::tempdir().unwrap();
        let mut db = open(tmp.path());
        db.create_collection("c", desc_with(IndexKind::Vamana))
            .unwrap();
        for i in 0..50u32 {
            db.upsert(
                "c",
                &format!("p{i}"),
                &[i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }
        // Search once before the churn so the writes below hit an already-searched collection.
        let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();

        // Replace p0..p14 with q0..q14, which live far away at x = 1000 + i.
        let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
        for i in 0..15u32 {
            assert!(db.delete("c", &format!("p{i}")).unwrap());
            db.upsert(
                "c",
                &format!("q{i}"),
                &[1000.0 + i as f32, 0.0, 0.0, 0.0],
                &json!({}),
            )
            .unwrap();
        }

        // Querying inside the deleted range must not surface any deleted id.
        let near_origin = db
            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert!(
            near_origin.iter().all(|m| !deleted.contains(&m.id)),
            "a churned-out id was returned"
        );
        let near_q = db
            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(near_q[0].id, "q7", "new point not found after churn");

        // Same two checks after a checkpoint and a reopen.
        db.checkpoint().unwrap();
        drop(db);
        let mut db = open(tmp.path());
        let near_q = db
            .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert_eq!(near_q[0].id, "q7", "new point lost across reopen");
        let near_origin = db
            .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
            .unwrap();
        assert!(
            near_origin.iter().all(|m| !deleted.contains(&m.id)),
            "a churned-out id resurfaced after reopen"
        );
    }
4257
/// For each of the Ivf, Hnsw, Vamana and Colbert index kinds: seeds ten
/// two-token documents, applies a delete, an in-place update, an insert and a
/// shrinking re-upsert, then reopens the database and expects the same top-3
/// ranking as before the reopen.
#[test]
fn multivector_writes_are_incremental_and_match_a_rebuild() {
    // Unit vector in the xy-plane at angle `theta`.
    let unit = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
    // A document made of two identical tokens pointing at `theta`.
    let two_tokens = |theta: f32| vec![unit(theta), unit(theta)];
    let kinds = [
        IndexKind::Ivf,
        IndexKind::Hnsw,
        IndexKind::Vamana,
        IndexKind::Colbert,
    ];

    for kind in kinds {
        let scratch = tempfile::tempdir().unwrap();
        let spec = IndexSpec {
            kind,
            pq_subspaces: None,
        };
        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_index(spec);
        let mut db = open(scratch.path());
        db.create_collection("m", desc).unwrap();
        // d1..d10 fan out from the x axis in 0.1-radian steps.
        for i in 1..=10u32 {
            let id = format!("d{i}");
            db.upsert_document("m", &id, &two_tokens(0.1 * i as f32), &json!({ "i": i }))
                .unwrap();
        }

        // Query along the x axis; smaller angles rank first.
        let query = vec![unit(0.0)];
        let top3 = |db: &mut Database| -> Vec<String> {
            let params = SearchParams {
                k: 3,
                ..Default::default()
            };
            let hits = db.search_multi_vector("m", &query, &params).unwrap();
            hits.into_iter().map(|m| m.id).collect()
        };
        assert_eq!(top3(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");

        // Delete the best document; the window slides by one.
        assert!(db.delete_document("m", "d1").unwrap());
        assert_eq!(
            top3(&mut db),
            vec!["d2", "d3", "d4"],
            "{kind:?} after delete"
        );

        // Update: move d10 onto the query direction.
        db.upsert_document("m", "d10", &two_tokens(0.0), &json!({ "i": 10 }))
            .unwrap();
        assert_eq!(top3(&mut db)[0], "d10", "{kind:?} after update");

        // Insert a new document just behind d10.
        db.upsert_document("m", "d11", &two_tokens(0.05), &json!({ "i": 11 }))
            .unwrap();
        let ranked = top3(&mut db);
        assert_eq!(ranked[0], "d10", "{kind:?}");
        assert_eq!(ranked[1], "d11", "{kind:?} new doc not ranked");

        // Re-upsert d8 with a single token; only that one token may remain.
        db.upsert_document("m", "d8", &[unit(0.8)], &json!({ "i": 8 }))
            .unwrap();
        let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
        assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");

        // Reopen from disk and compare against the pre-reopen ranking.
        let before = top3(&mut db);
        drop(db);
        let mut db = open(scratch.path());
        assert_eq!(top3(&mut db), before, "{kind:?} incremental != rebuild");
        assert!(
            db.get_document("m", "d1", false).unwrap().is_none(),
            "{kind:?} deleted doc resurfaced"
        );
    }
}
4349
/// A Colbert index is rejected with `Error::Unsupported` on a single-vector
/// descriptor and accepted once the descriptor is multivector.
#[test]
fn colbert_index_requires_multivector() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    let colbert = || IndexSpec {
        kind: IndexKind::Colbert,
        pq_subspaces: None,
    };
    let base = || Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine);

    let single = base().with_index(colbert());
    let rejected = db.create_collection("c", single);
    assert!(matches!(rejected, Err(Error::Unsupported(_))));

    let multi = base().with_multivector(true).with_index(colbert());
    let accepted = db.create_collection("m", multi);
    assert!(accepted.is_ok());
}
4373
4374 fn desc_filterable() -> Descriptor {
4379 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4380 FilterableField::keyword("city"),
4381 FilterableField::numeric("n"),
4382 ])
4383 }
4384
4385 fn seed_cities(db: &mut Database) {
4390 const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4391 db.create_collection("c", desc_filterable()).unwrap();
4392 for i in 0..30u32 {
4393 db.upsert(
4394 "c",
4395 &format!("p{i}"),
4396 &[i as f32, 0.0, 0.0, 0.0],
4397 &json!({"city": CITIES[i as usize % 3], "n": i}),
4398 )
4399 .unwrap();
4400 }
4401 db.checkpoint().unwrap();
4402 }
4403
/// An `Eq` filter on the declared keyword field `city` must return only
/// matching rows, nearest first.
#[test]
fn hybrid_equality_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let only_lyon = Filter::Eq {
        field: "city".into(),
        value: json!("lyon"),
    };
    let params = SearchParams {
        k: 5,
        filter: Some(only_lyon),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    // `seed_cities` gives "lyon" to every i with i % 3 == 1, so p1 is the
    // lyon row closest to the origin.
    assert_eq!(res[0].id, "p1");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
    }
}
4425
/// A `Gte` filter on the declared numeric field `n` must return only rows
/// with `n >= 10`, nearest first.
#[test]
fn hybrid_numeric_range_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let at_least_ten = Filter::Gte {
        field: "n".into(),
        value: json!(10),
    };
    let params = SearchParams {
        k: 4,
        filter: Some(at_least_ten),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // p10 is the closest row to the origin that satisfies n >= 10.
    assert_eq!(res[0].id, "p10");
    for m in &res {
        assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
    }
}
4446
/// A filter that matches no seeded row (`city == "atlantis"`) must yield an
/// empty result set rather than an error.
#[test]
fn hybrid_unsatisfiable_filter_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let nowhere = Filter::Eq {
        field: "city".into(),
        value: json!("atlantis"),
    };
    let params = SearchParams {
        filter: Some(nowhere),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.is_empty());
}
4463
/// Combines `In` on `city` with `Lt` on `n` under `And`; every hit must
/// satisfy both clauses and the nearest qualifying row comes first.
#[test]
fn hybrid_and_or_composition_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let paris_or_rome = Filter::In {
        field: "city".into(),
        values: vec![json!("paris"), json!("rome")],
    };
    let below_twelve = Filter::Lt {
        field: "n".into(),
        value: json!(12),
    };
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![paris_or_rome, below_twelve])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // p0 is a paris row (0 % 3 == 0) sitting exactly on the query point.
    assert_eq!(res[0].id, "p0");
    for m in &res {
        let payload = m.payload.as_ref().unwrap();
        let city = payload["city"].as_str().unwrap();
        assert!(city == "paris" || city == "rome");
        assert!(payload["n"].as_u64().unwrap() < 12);
    }
}
4495
/// Pairs an `Eq` on `city` with a `Not(Eq)` on `n`; the negated clause must
/// still be enforced on the results, so `p0` is excluded.
#[test]
fn hybrid_rechecks_non_indexable_clause() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let in_paris = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    let n_is_not_zero = Filter::Not(Box::new(Filter::Eq {
        field: "n".into(),
        value: json!(0),
    }));
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![in_paris, n_is_not_zero])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.iter().all(|m| m.id != "p0"));
    // With p0 excluded, the next paris row (i % 3 == 0) is p3.
    assert_eq!(res[0].id, "p3");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
    }
}
4525
/// Filters on `tier`, a payload field that is NOT declared filterable (only
/// `city` is); every returned row must still satisfy the filter.
#[test]
fn post_filter_fallback_on_undeclared_field_is_correct() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
        .with_filterable(vec![FilterableField::keyword("city")]);
    db.create_collection("c", desc).unwrap();
    // Even ids are "gold", odd ids are "silver".
    for i in 0..20u32 {
        let tier = if i % 2 == 0 { "gold" } else { "silver" };
        db.upsert(
            "c",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"city": "paris", "tier": tier}),
        )
        .unwrap();
    }
    let only_gold = Filter::Eq {
        field: "tier".into(),
        value: json!("gold"),
    };
    let params = SearchParams {
        k: 5,
        filter: Some(only_gold),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
    }
}
4562
/// `leaf_predicate` yields a `SecPredicate` for `Eq` / `Gte` leaves on
/// declared fields with a matching value type, and `None` for an undeclared
/// field, a type mismatch, `Ne`, and `Exists`.
#[test]
fn leaf_predicate_maps_only_indexable_filterable_leaves() {
    let fields = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];

    // Keyword equality on a declared keyword field maps to `Eq`.
    let city_eq = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    let want_eq = SecPredicate::Eq {
        field: "city".into(),
        value: SecValue::Keyword("paris".into()),
    };
    assert_eq!(leaf_predicate(&city_eq, &fields), Some(want_eq));

    // `Gte` on a declared numeric field maps to a range that is closed below
    // and unbounded above.
    let n_gte = Filter::Gte {
        field: "n".into(),
        value: json!(3),
    };
    let want_range = SecPredicate::Range {
        field: "n".into(),
        lo: Some(SecValue::Numeric(3.0)),
        hi: None,
        lo_inclusive: true,
        hi_inclusive: false,
    };
    assert_eq!(leaf_predicate(&n_gte, &fields), Some(want_range));

    // Leaves that must not map to any predicate.
    let undeclared = Filter::Eq {
        field: "tier".into(),
        value: json!("gold"),
    };
    assert!(leaf_predicate(&undeclared, &fields).is_none());

    // Numeric value against a keyword field.
    let mismatch = Filter::Eq {
        field: "city".into(),
        value: json!(5),
    };
    assert!(leaf_predicate(&mismatch, &fields).is_none());

    let ne = Filter::Ne {
        field: "city".into(),
        value: json!("x"),
    };
    assert!(leaf_predicate(&ne, &fields).is_none());

    let exists = Filter::Exists {
        field: "city".into(),
    };
    assert!(leaf_predicate(&exists, &fields).is_none());
}
4621
/// Returns `<root>/collections/0000000000/index`.
///
/// NOTE(review): `0000000000` is presumably the on-disk id of the first
/// collection created in a fresh database — confirm against the store layout.
fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
    let mut dir = root.to_path_buf();
    for component in ["collections", "0000000000", "index"] {
        dir.push(component);
    }
    dir
}
4628
4629 fn idx_snapshot_files(root: &Path) -> Vec<String> {
4630 let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4631 .map(|rd| {
4632 rd.filter_map(std::result::Result::ok)
4633 .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4634 .filter(|n| n.starts_with("idx-"))
4635 .collect()
4636 })
4637 .unwrap_or_default();
4638 v.sort();
4639 v
4640 }
4641
4642 fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4643 db.search("c", q, &SearchParams::default())
4644 .unwrap()
4645 .into_iter()
4646 .map(|m| m.id)
4647 .collect()
4648 }
4649
4650 fn seed_ivf(db: &mut Database, n: u32) {
4651 db.create_collection("c", desc_with(IndexKind::Ivf))
4652 .unwrap();
4653 for i in 0..n {
4654 db.upsert(
4655 "c",
4656 &format!("p{i}"),
4657 &[i as f32, 0.0, 0.0, 0.0],
4658 &json!({}),
4659 )
4660 .unwrap();
4661 }
4662 let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4664 }
4665
/// After seeding an IVF collection and checkpointing, exactly one `idx-*`
/// file must exist in the index directory.
#[test]
fn ivf_snapshot_is_written_at_checkpoint() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    seed_ivf(&mut db, 40);
    db.checkpoint().unwrap();
    let snapshots = idx_snapshot_files(scratch.path());
    assert_eq!(snapshots.len(), 1);
}
4674
/// Inserts ids in the non-sorted order a, m, z, b, checkpoints, reopens, and
/// expects `int_to_ext` to keep exactly that order.
///
/// NOTE(review): the assertion message implies a rebuild would produce a
/// different id order than the snapshot — confirm against the rebuild path.
#[test]
fn ivf_loads_from_snapshot_rather_than_rebuilding() {
    let scratch = tempfile::tempdir().unwrap();
    let insertion_order = ["a", "m", "z", "b"];
    {
        let mut db = open(scratch.path());
        db.create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        db.upsert("c", "a", &[0.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        db.upsert("c", "m", &[1.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        // One search between the two pairs of writes; its result is unused.
        let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
        db.upsert("c", "z", &[2.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        db.upsert("c", "b", &[3.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
        db.checkpoint().unwrap();
        assert_eq!(db.collections["c"].int_to_ext, insertion_order);
    }
    let reopened = open(scratch.path());
    assert_eq!(
        reopened.collections["c"].int_to_ext,
        insertion_order,
        "index was rebuilt, not loaded from the snapshot"
    );
}
4705
/// An upsert made after the last checkpoint must be searchable after reopen,
/// alongside the data that was checkpointed.
#[test]
fn ivf_recovery_replays_post_checkpoint_upserts() {
    let scratch = tempfile::tempdir().unwrap();
    {
        let mut db = open(scratch.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        // Written after the checkpoint, then the handle is dropped.
        db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let mut db = open(scratch.path());
    let around_far = nearest(&mut db, &[500.0, 0.0, 0.0, 0.0]);
    assert_eq!(around_far[0], "far");
    let around_p1 = nearest(&mut db, &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(around_p1[0], "p1");
}
4721
/// A delete made after the last checkpoint must still hold after reopen: the
/// id is absent from search results and from `get`, while a neighbour stays.
#[test]
fn ivf_recovery_replays_post_checkpoint_deletes() {
    let scratch = tempfile::tempdir().unwrap();
    {
        let mut db = open(scratch.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        assert!(db.delete("c", "p7").unwrap());
    }
    let mut db = open(scratch.path());
    let around_p7 = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert!(!around_p7.iter().any(|id| id == "p7"));
    assert!(db.get("c", "p7").unwrap().is_none());
    assert!(db.get("c", "p6").unwrap().is_some());
}
4740
/// An in-place update made after the last checkpoint must survive reopen:
/// `p0` is found at its new position and no longer wins at its old one.
#[test]
fn ivf_recovery_replays_post_checkpoint_updates() {
    let scratch = tempfile::tempdir().unwrap();
    {
        let mut db = open(scratch.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        // Move p0 from x = 0 to x = 999 after the checkpoint.
        db.upsert("c", "p0", &[999.0, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    let mut db = open(scratch.path());
    let at_new_spot = nearest(&mut db, &[999.0, 0.0, 0.0, 0.0]);
    assert_eq!(at_new_spot[0], "p0");
    let at_old_spot = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
    assert_ne!(
        at_old_spot[0],
        "p0",
        "the stale p0 vector survived the update"
    );
}
4760
/// Overwrites the single index snapshot file with junk bytes; reopening must
/// still succeed and searches must still return the right neighbour.
#[test]
fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
    let scratch = tempfile::tempdir().unwrap();
    {
        let mut db = open(scratch.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
    }
    let snapshots = idx_snapshot_files(scratch.path());
    assert_eq!(snapshots.len(), 1);
    let victim = ivf_index_dir(scratch.path()).join(&snapshots[0]);
    std::fs::write(victim, b"corrupt").unwrap();

    let mut db = open(scratch.path());
    let hits = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert_eq!(hits[0], "p7");
}
4777
4778 fn mv_desc() -> Descriptor {
4781 Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4782 }
4783
4784 fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4787 let mut v: Vec<(String, f32)> = corpus
4788 .iter()
4789 .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4790 .collect();
4791 v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4792 v
4793 }
4794
/// `search_multi_vector` must return documents in the same order, and with
/// the same scores (within 1e-5), as the brute-force `bf_rank` reference.
#[test]
fn multivector_search_ranks_documents_by_maxsim() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    // Three documents with two or three token vectors each.
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
        (
            "d_mix",
            vec![
                vec![1.0, 1.0, 0.0],
                vec![0.0, 0.0, 1.0],
                vec![1.0, 0.0, 1.0],
            ],
        ),
    ];
    for (id, toks) in &corpus {
        db.upsert_document("docs", id, toks, &json!({ "id": id }))
            .unwrap();
    }
    assert_eq!(db.document_count("docs").unwrap(), 3);

    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let params = SearchParams {
        k: 3,
        with_payload: false,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);

    assert_eq!(got.len(), 3);
    for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
        assert_eq!(&g.id, eid, "ranking matches brute force");
        assert!(
            (g.score - escore).abs() < 1e-5,
            "{} score {} vs {escore}",
            g.id,
            g.score
        );
    }
}
4838
/// With five single-token documents stored and `k = 2`, exactly two hits
/// must come back.
#[test]
fn multivector_search_truncates_to_k() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    for i in 0..5 {
        let v = vec![vec![1.0, i as f32, 0.0]];
        db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
            .unwrap();
    }
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 2);
}
4858
/// Two documents with identical vectors differ only in payload `lang`; a
/// filter on `lang == "fr"` must return just document `b` with its payload.
#[test]
fn multivector_filter_selects_documents_exactly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
        .unwrap();
    db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
        .unwrap();
    let french_only = Filter::Eq {
        field: "lang".into(),
        value: json!("fr"),
    };
    let params = SearchParams {
        k: 10,
        filter: Some(french_only),
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 1);
    assert_eq!(got[0].id, "b");
    assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
}
4884
/// After a checkpoint and reopen, the document count and the ranking for a
/// two-token query must match the brute-force `bf_rank` reference.
#[test]
fn multivector_reopen_rebuilds_grouping_and_ranking() {
    let tmp = tempfile::tempdir().unwrap();
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
    ];
    {
        let mut db = open(tmp.path());
        db.create_collection("docs", mv_desc()).unwrap();
        for (id, toks) in &corpus {
            db.upsert_document("docs", id, toks, &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
    }
    // Fresh handle over the same directory.
    let mut db = open(tmp.path());
    assert_eq!(db.document_count("docs").unwrap(), 2);
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    let got_ids: Vec<String> = got.iter().map(|m| m.id.clone()).collect();
    let expected_ids: Vec<String> = expected.iter().map(|(id, _)| id.clone()).collect();
    assert_eq!(got_ids, expected_ids);
}
4918
/// Deleting a two-token document must drop both of its vectors: the document
/// and vector counts shrink, lookups and searches no longer see it, and a
/// second delete reports `false`.
#[test]
fn multivector_delete_document_removes_all_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
        &json!({}),
    )
    .unwrap();
    db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
        .unwrap();
    // Two documents, three token vectors in total.
    assert_eq!(db.document_count("docs").unwrap(), 2);
    assert_eq!(db.len("docs").unwrap(), 3);

    assert!(db.delete_document("docs", "a").unwrap());
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);
    assert!(db.get_document("docs", "a", false).unwrap().is_none());
    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert!(got.iter().all(|m| m.id != "a"));
    // Deleting an id that is already gone returns false.
    assert!(!db.delete_document("docs", "a").unwrap());
}
4950
/// Re-upserting a three-token document with a single token must replace the
/// old tokens and payload rather than append to them.
#[test]
fn multivector_reupsert_replaces_tokens() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("docs", mv_desc()).unwrap();

    let first_version = [
        vec![1.0, 0.0, 0.0],
        vec![0.0, 1.0, 0.0],
        vec![0.0, 0.0, 1.0],
    ];
    db.upsert_document("docs", "a", &first_version, &json!({"v":1}))
        .unwrap();
    assert_eq!(db.len("docs").unwrap(), 3);

    db.upsert_document("docs", "a", &[vec![0.0, 0.0, 1.0]], &json!({"v":2}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);

    let stored = db.get_document("docs", "a", true).unwrap().unwrap();
    assert_eq!(stored.payload, Some(json!({"v":2})));
    assert_eq!(stored.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
}
4977
/// Single-vector calls on a multivector collection, and document calls on a
/// single-vector collection, must all fail with `Error::Unsupported`.
#[test]
fn single_and_multi_vector_apis_are_mutually_exclusive() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());
    db.create_collection("mv", mv_desc()).unwrap();
    db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
        .unwrap();

    // Single-vector API against the multivector collection.
    let upsert_on_mv = db.upsert("mv", "a", &[1.0, 0.0, 0.0], &json!({}));
    assert!(matches!(upsert_on_mv, Err(Error::Unsupported(_))));
    let search_on_mv = db.search("mv", &[1.0, 0.0, 0.0], &SearchParams::default());
    assert!(matches!(search_on_mv, Err(Error::Unsupported(_))));

    // Document API against the single-vector collection.
    let upsert_doc_on_sv = db.upsert_document("sv", "a", &[vec![1.0, 0.0, 0.0]], &json!({}));
    assert!(matches!(upsert_doc_on_sv, Err(Error::Unsupported(_))));
    let search_multi_on_sv =
        db.search_multi_vector("sv", &[vec![1.0, 0.0, 0.0]], &SearchParams::default());
    assert!(matches!(search_multi_on_sv, Err(Error::Unsupported(_))));
    let count_on_sv = db.document_count("sv");
    assert!(matches!(count_on_sv, Err(Error::Unsupported(_))));
}
5008
/// Multivector collections reject the L2 metric at creation, and reject
/// documents with a U+001F in the id, with no tokens, or with a token of the
/// wrong dimension — each with `Error::Unsupported`.
#[test]
fn multivector_rejects_l2_metric_and_bad_documents() {
    let scratch = tempfile::tempdir().unwrap();
    let mut db = open(scratch.path());

    let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
    let created = db.create_collection("bad", l2);
    assert!(matches!(created, Err(Error::Unsupported(_))));

    db.create_collection("docs", mv_desc()).unwrap();

    // Id containing the U+001F control character.
    let bad_id = db.upsert_document("docs", "a\u{1f}b", &[vec![1.0, 0.0, 0.0]], &json!({}));
    assert!(matches!(bad_id, Err(Error::Unsupported(_))));

    // Document with zero tokens.
    let no_tokens = db.upsert_document("docs", "a", &[], &json!({}));
    assert!(matches!(no_tokens, Err(Error::Unsupported(_))));

    // Two-dimensional token in a three-dimensional collection.
    let wrong_dim = db.upsert_document("docs", "a", &[vec![1.0, 0.0]], &json!({}));
    assert!(matches!(wrong_dim, Err(Error::Unsupported(_))));
}
5035
/// Takes a snapshot of a two-collection database, writes one more row to the
/// source, then opens the snapshot directory and checks it holds exactly the
/// pre-snapshot contents.
#[test]
fn snapshot_then_open_reproduces_the_database() {
    let source = tempfile::tempdir().unwrap();
    let mut db = open(source.path());
    for name in ["kb", "kb2"] {
        db.create_collection(name, desc()).unwrap();
    }
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    db.upsert("kb", "b", &[0.0, 1.0, 0.0, 0.0], &json!({ "n": 2 }))
        .unwrap();
    db.upsert("kb2", "z", &[0.0, 0.0, 1.0, 0.0], &json!({ "n": 3 }))
        .unwrap();

    // Snapshot into a path that does not exist yet inside a fresh temp dir.
    let holder = tempfile::tempdir().unwrap();
    let snap_dir = holder.path().join("snap");
    let info = db.snapshot(&snap_dir).unwrap();
    assert!(info.files > 0 && info.bytes > 0);
    assert_eq!(info.manifest_version, db.manifest_version());

    // This write happens after the snapshot and must not leak into it.
    db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
        .unwrap();

    let restored = open(&snap_dir);
    let mut names = restored.collection_names();
    names.sort();
    assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
    assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
    let row_a = restored.get("kb", "a").unwrap().unwrap();
    assert_eq!(row_a.payload, Some(json!({ "n": 1 })));
    assert_eq!(restored.len("kb2").unwrap(), 1);
    assert!(restored.get("kb", "late").unwrap().is_none());
}
5071
/// Snapshotting into a directory that already exists must fail with
/// `CoreError::AlreadyExists`.
#[test]
fn snapshot_refuses_an_existing_destination() {
    let source = tempfile::tempdir().unwrap();
    let mut db = open(source.path());
    // `tempdir()` has already created this directory on disk.
    let occupied = tempfile::tempdir().unwrap();
    let outcome = db.snapshot(occupied.path());
    assert!(matches!(
        outcome,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
5082
/// `restore_snapshot` must copy a snapshot into a new directory that opens
/// with the same data, refuse an existing destination with `AlreadyExists`,
/// and refuse a source that is not a snapshot with `InvalidArgument`.
#[test]
fn restore_snapshot_roundtrips_and_guards() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    let work = tempfile::tempdir().unwrap();
    let snap_dir = work.path().join("snap");
    db.snapshot(&snap_dir).unwrap();

    // Round trip: restore into a fresh path and open it.
    let restored_dir = work.path().join("restored");
    let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
    assert!(info.files > 0);
    let restored = open(&restored_dir);
    assert_eq!(restored.len("kb").unwrap(), 1);

    // Guard 1: the destination now exists, so a second restore is refused.
    assert!(matches!(
        restore_snapshot(&snap_dir, &restored_dir),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));

    // Guard 2: an empty directory is not a valid snapshot source.
    let not_snap = work.path().join("not-a-snapshot");
    std::fs::create_dir_all(&not_snap).unwrap();
    assert!(matches!(
        restore_snapshot(&not_snap, &work.path().join("out")),
        Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
    ));
}
5114}