1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
46use std::path::Path;
47use std::sync::Arc;
48
49use arc_swap::ArcSwap;
50use quiver_core::{SecPredicate, SecValue, Store};
51use quiver_index::{
52 ColbertConfig, ColbertIndex, DiskVamana, FreshDiskVamana, FreshVamana, Hnsw, HnswConfig, Index,
53 Ivf, IvfConfig, Metric, Neighbor, ProductQuantizer, Vamana, VamanaConfig, max_sim,
54 ordering_distance, report_metric,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58use thiserror::Error;
59
60pub use quiver_core::keyring::{KeyRing, SingleCodecKeyRing};
61pub use quiver_core::page::PageCodec;
62pub use quiver_core::{CollectionId, CommitObserver, WalEntry, WalOp};
63pub use quiver_core::{
64 Descriptor, DistanceMetric, Dtype, FieldType, FilterableField, IndexKind, IndexSpec,
65 VectorEncryption,
66};
67pub use quiver_query::Filter;
68pub use quiver_query::{
69 BM25_B, BM25_K1, DEFAULT_RRF_K0, SPARSE_KEY, SparseInvertedIndex, SparseVector, TEXT_KEY,
70 query_term_ids, rrf_fuse, text_to_sparse,
71};
72
/// Errors returned by this module's public API.
///
/// Variants marked `#[error(transparent)]` forward the wrapped error's own
/// message; the others prepend a short prefix of their own.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// An error propagated from `quiver_core`.
    #[error(transparent)]
    Core(#[from] quiver_core::CoreError),
    /// An error propagated from a `quiver_index` index operation.
    #[error(transparent)]
    Index(#[from] quiver_index::IndexError),
    /// An error propagated from `quiver_index`'s disk-backed index code.
    #[error(transparent)]
    Disk(#[from] quiver_index::DiskError),
    /// A payload could not be serialized to, or parsed from, JSON.
    #[error("payload json error: {0}")]
    Json(#[from] serde_json::Error),
    /// No collection with the given name is known to this database.
    #[error("collection not found: {0}")]
    CollectionNotFound(String),
    /// The request or configuration is not supported; the message says why.
    #[error("unsupported configuration: {0}")]
    Unsupported(&'static str),
    /// An error propagated from `quiver_index` snapshot handling.
    #[error(transparent)]
    IndexSnapshot(#[from] quiver_index::SnapshotError),
    /// A `postcard` (de)serialization failure for an index snapshot envelope.
    #[error("index snapshot envelope: {0}")]
    Envelope(#[from] postcard::Error),
}
104
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
107
/// Summary counters describing a snapshot.
///
/// NOTE(review): the producer of this struct is outside this view; the field
/// descriptions below are inferred from the field names only — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Presumably the manifest version the snapshot was taken at.
    pub manifest_version: u64,
    /// Presumably the number of files in the snapshot.
    pub files: u64,
    /// Presumably the total snapshot size in bytes.
    pub bytes: u64,
}
119
/// One point returned by a lookup or search.
#[derive(Debug, Clone, PartialEq)]
pub struct Match {
    /// External (caller-supplied) id of the point.
    pub id: String,
    /// Score of the hit. `get` and `fetch` always set this to `0.0`; dense
    /// search stores the metric value; hybrid search stores the fused score
    /// returned by `rrf_fuse`.
    pub score: f32,
    /// Parsed JSON payload, when the caller asked for it.
    pub payload: Option<Value>,
    /// Stored vector, when the caller asked for it.
    pub vector: Option<Vec<f32>>,
}
132
/// One multi-vector document returned by a document-level search.
///
/// NOTE(review): the code that produces this type is outside this view;
/// field meanings are inferred from names and from [`Match`] — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct DocumentMatch {
    /// External id of the document.
    pub id: String,
    /// Score assigned to the document.
    pub score: f32,
    /// Parsed JSON payload, when requested.
    pub payload: Option<Value>,
    /// The document's vectors, when requested.
    pub vectors: Option<Vec<Vec<f32>>>,
}
147
/// Tuple form of a scored document: `(score, id, payload, vectors)`.
/// NOTE(review): element meaning is inferred from the `DocumentMatch` field
/// types it mirrors; its users are outside this view — confirm.
type ScoredDocument = (f32, String, Option<Value>, Option<Vec<Vec<f32>>>);
153
/// Per-query knobs shared by the dense and hybrid search entry points.
#[derive(Debug, Clone)]
pub struct SearchParams {
    /// Maximum number of matches to return.
    pub k: usize,
    /// Optional payload filter; matches that fail it are dropped.
    pub filter: Option<Filter>,
    /// Search-effort value forwarded to the index's `search` call.
    pub ef_search: usize,
    /// Whether to attach the parsed payload to each match.
    pub with_payload: bool,
    /// Whether to attach the stored vector to each match.
    pub with_vector: bool,
}
171
172impl Default for SearchParams {
173 fn default() -> Self {
174 Self {
175 k: 10,
176 filter: None,
177 ef_search: 64,
178 with_payload: true,
179 with_vector: false,
180 }
181 }
182}
183
/// Multiplier applied to `k` when a filter is present and the index is
/// queried directly, so that post-filtering can still fill `k` results.
const FILTER_OVERFETCH: usize = 8;

/// Hybrid search asks each ranked list for `k * RRF_CANDIDATE_FACTOR` ids...
const RRF_CANDIDATE_FACTOR: usize = 10;
/// ...but never fewer than this many.
const MIN_RRF_CANDIDATES: usize = 100;

/// When a filter resolves to at most this many candidate ids, the search is
/// answered by exact scoring of those candidates instead of the index.
const FULL_SCAN_THRESHOLD: usize = 10_000;

/// NOTE(review): not referenced in the visible code; presumably the fraction
/// of deleted HNSW entries that triggers a rebuild — confirm at the use site.
const HNSW_REBUILD_DELETED_FRACTION: f64 = 0.2;

/// NOTE(review): not referenced in the visible code; presumably the fraction
/// of pending graph entries that triggers a rebuild — confirm at the use site.
const GRAPH_REBUILD_PENDING_FRACTION: f64 = 0.2;
215
/// The dense index backing one collection.
///
/// Variants wrapping an `Option` are `None` until an index of that kind has
/// been built or loaded; searching such a variant yields no neighbours.
enum CollectionIndex {
    /// No index at all; searches return nothing.
    None,
    /// An HNSW index.
    Hnsw(Hnsw),
    /// A `FreshVamana` index, once available.
    Vamana(Option<FreshVamana>),
    /// An IVF index, once available.
    Ivf(Option<Ivf>),
    /// A `FreshDiskVamana` index, once available.
    Disk(Option<FreshDiskVamana>),
    /// A ColBERT index, once available.
    Colbert(Option<ColbertIndex>),
}
239
240impl CollectionIndex {
241 fn search(&self, query: &[f32], k: usize, ef: usize) -> Result<Vec<Neighbor>> {
243 Ok(match self {
244 CollectionIndex::Hnsw(h) => h.search(query, k, ef)?,
245 CollectionIndex::Vamana(Some(g)) => g.search(query, k, ef)?,
246 CollectionIndex::Ivf(Some(i)) => i.search(query, k, ef)?,
247 CollectionIndex::Disk(Some(d)) => d.search(query, k, ef)?,
248 CollectionIndex::Colbert(Some(c)) => c.search(query, k, ef)?,
249 CollectionIndex::None
250 | CollectionIndex::Vamana(None)
251 | CollectionIndex::Ivf(None)
252 | CollectionIndex::Disk(None)
253 | CollectionIndex::Colbert(None) => Vec::new(),
254 })
255 }
256}
257
/// Shared, atomically swappable pointer to a collection's current
/// [`CollectionSnapshot`]. Writers replace the snapshot with `store`; readers
/// obtain the current one with `load`/`load_full`.
pub type SnapshotCell = Arc<ArcSwap<CollectionSnapshot>>;
275
/// Writes accumulated on top of a snapshot's base index.
#[derive(Default, Clone)]
struct Overlay {
    /// Vectors upserted since the base was published, each paired with its
    /// external id. Entry `j` has internal id `base_len + j`.
    upserts: Vec<(Arc<[f32]>, String)>,
    /// Internal ids (base or overlay) that must be hidden from search results.
    tombstones: HashSet<u64>,
}
289
/// A searchable view of a collection: a shared base index plus an overlay of
/// later upserts and tombstones.
pub struct CollectionSnapshot {
    /// The base index, shared between successive snapshots.
    base: Arc<CollectionIndex>,
    /// Internal-id to external-id table for the base index.
    base_int_to_ext: Arc<Vec<String>>,
    /// Number of internal ids owned by the base; overlay ids start here.
    base_len: u64,
    /// Upserts and tombstones recorded since the base was published.
    overlay: Arc<Overlay>,
    /// Metric used to score overlay vectors and to report scores.
    metric: Metric,
}
303
304impl CollectionSnapshot {
305 fn empty(metric: Metric) -> Self {
308 Self {
309 base: Arc::new(CollectionIndex::None),
310 base_int_to_ext: Arc::new(Vec::new()),
311 base_len: 0,
312 overlay: Arc::new(Overlay::default()),
313 metric,
314 }
315 }
316
317 fn ext_id(&self, internal: u64) -> Option<&str> {
320 if internal < self.base_len {
321 self.base_int_to_ext
322 .get(internal as usize)
323 .map(String::as_str)
324 } else {
325 self.overlay
326 .upserts
327 .get((internal - self.base_len) as usize)
328 .map(|(_, e)| e.as_str())
329 }
330 }
331
332 pub fn search(&self, query: &[f32], k: usize, ef_search: usize) -> Result<Vec<Match>> {
342 if k == 0 {
343 return Ok(Vec::new());
344 }
345 let mut cands: Vec<(f32, u64)> = Vec::new();
348 for n in self.base.search(query, k, ef_search)? {
349 if !self.overlay.tombstones.contains(&n.id) {
350 cands.push((report_metric(self.metric, n.distance), n.id));
354 }
355 }
356 for (j, (vector, _)) in self.overlay.upserts.iter().enumerate() {
357 let internal = self.base_len + j as u64;
358 if !self.overlay.tombstones.contains(&internal) {
359 cands.push((ordering_distance(self.metric, query, vector), internal));
360 }
361 }
362 cands.sort_by(|a, b| a.0.total_cmp(&b.0));
363 cands.truncate(k);
364 let mut out = Vec::with_capacity(cands.len());
365 for (ordering, internal) in cands {
366 if let Some(ext) = self.ext_id(internal) {
367 out.push(Match {
368 id: ext.to_owned(),
369 score: report_metric(self.metric, ordering),
370 payload: None,
371 vector: None,
372 });
373 }
374 }
375 Ok(out)
376 }
377}
378
379fn empty_snapshot(descriptor: &Descriptor) -> SnapshotCell {
383 let metric = to_index_metric(descriptor.metric);
384 Arc::new(ArcSwap::from_pointee(CollectionSnapshot::empty(metric)))
385}
386
387fn mvcc_eligible(descriptor: &Descriptor) -> bool {
394 !descriptor.multivector
395 && descriptor.vector_encryption != VectorEncryption::ClientSide
396 && descriptor.index.kind != IndexKind::DiskVamana
397}
398
399fn mvcc_served(handle: &CollectionHandle) -> bool {
402 handle.mvcc && mvcc_eligible(&handle.descriptor)
403}
404
405fn publish_overlay(handle: &CollectionHandle, prior: &CollectionSnapshot, overlay: Arc<Overlay>) {
408 handle.snapshot.store(Arc::new(CollectionSnapshot {
409 base: prior.base.clone(),
410 base_int_to_ext: prior.base_int_to_ext.clone(),
411 base_len: prior.base_len,
412 overlay,
413 metric: prior.metric,
414 }));
415}
416
417fn publish_base(handle: &mut CollectionHandle) {
421 let base = std::mem::replace(&mut handle.index, empty_index(&handle.descriptor));
422 let metric = to_index_metric(handle.descriptor.metric);
423 handle.snapshot.store(Arc::new(CollectionSnapshot {
424 base: Arc::new(base),
425 base_int_to_ext: Arc::new(handle.int_to_ext.clone()),
426 base_len: handle.int_to_ext.len() as u64,
427 overlay: Arc::new(Overlay::default()),
428 metric,
429 }));
430}
431
/// Number of overlay upserts at which a rebuild is requested: one fifth of
/// the base size, but never less than 1024.
fn overlay_rebuild_threshold(base_len: u64) -> u64 {
    let fifth = base_len / 5;
    std::cmp::max(fifth, 1024)
}
438
439fn overlay_upsert(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) {
444 bump_write_gen(handle);
445 let cur = handle.snapshot.load_full();
446 let mut overlay = cur.overlay.as_ref().clone();
447 if let Some(&old) = handle.ext_to_int.get(ext_id) {
449 overlay.tombstones.insert(old);
450 }
451 let internal = cur.base_len + overlay.upserts.len() as u64;
452 overlay.upserts.push((Arc::from(vector), ext_id.to_owned()));
453 handle.ext_to_int.insert(ext_id.to_owned(), internal);
454 handle.int_to_ext.push(ext_id.to_owned());
455 let crowded = overlay.upserts.len() as u64 >= overlay_rebuild_threshold(cur.base_len);
456 publish_overlay(handle, &cur, Arc::new(overlay));
457 if crowded {
460 handle.stale = true;
461 }
462}
463
464fn overlay_delete(handle: &mut CollectionHandle, ext_id: &str) {
467 bump_write_gen(handle);
468 let Some(&internal) = handle.ext_to_int.get(ext_id) else {
469 return;
470 };
471 let cur = handle.snapshot.load_full();
472 let mut overlay = cur.overlay.as_ref().clone();
473 overlay.tombstones.insert(internal);
474 publish_overlay(handle, &cur, Arc::new(overlay));
475}
476
/// Serialized wrapper around a persisted index.
///
/// NOTE(review): the code that reads and writes this type is outside this
/// view. If it is encoded with a positional format such as `postcard`, field
/// order is part of the on-disk layout — confirm before reordering.
#[derive(Serialize, Deserialize)]
struct IndexEnvelope {
    /// Format version; presumably compared against `INDEX_ENVELOPE_VERSION`.
    version: u16,
    /// Internal-id to external-id table saved alongside the index.
    int_to_ext: Vec<String>,
    /// Opaque serialized index bytes (an IVF index, judging by the name).
    ivf: Vec<u8>,
}
488
/// Current version number for persisted index envelopes.
/// NOTE(review): the use site is outside this view — confirm how mismatches are handled.
const INDEX_ENVELOPE_VERSION: u16 = 1;
492
/// Serialized sidecar for a disk-backed index.
///
/// NOTE(review): the code that reads and writes this type is outside this
/// view; field meanings are inferred from names — confirm. Field order may be
/// part of the serialized layout.
#[derive(Serialize, Deserialize)]
struct DiskEnvelope {
    /// Format version of the envelope.
    version: u16,
    /// Internal-id to external-id table saved alongside the index.
    int_to_ext: Vec<String>,
    /// Presumably the number of rows in the on-disk base graph.
    base_row_count: u64,
    /// Presumably the internal ids deleted since the base was written.
    deleted_ids: Vec<u64>,
}
508
/// In-memory state the database keeps for one collection.
struct CollectionHandle {
    /// Store-assigned id of the collection.
    id: CollectionId,
    /// The collection's descriptor (metric, index kind, encryption, ...).
    descriptor: Descriptor,
    /// The dense index searched by the non-snapshot path.
    index: CollectionIndex,
    /// Internal id -> external id.
    int_to_ext: Vec<String>,
    /// External id -> current internal id.
    ext_to_int: HashMap<String, u64>,
    /// Set when the index needs rebuilding before it can be trusted.
    stale: bool,
    /// Write counter; a rebuild compares the value it started from against
    /// the current one to detect writes that raced with it.
    write_gen: u64,
    /// Per-document bookkeeping for multi-vector collections, `None` otherwise.
    /// Presumably document id -> number of stored token vectors — confirm.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index used by sparse and BM25 ranking, when present.
    sparse: Option<SparseInvertedIndex>,
    /// Whether snapshot-based (MVCC) reads are enabled for this handle.
    mvcc: bool,
    /// The cell holding the collection's current published snapshot.
    snapshot: SnapshotCell,
}
547
548fn uses_sparse_index(descriptor: &Descriptor) -> bool {
551 !descriptor.multivector && descriptor.vector_encryption != VectorEncryption::ClientSide
552}
553
554fn mark_stale(handle: &mut CollectionHandle) {
558 handle.stale = true;
559 bump_write_gen(handle);
560}
561
562fn bump_write_gen(handle: &mut CollectionHandle) {
571 handle.write_gen = handle.write_gen.wrapping_add(1);
572}
573
/// A store plus the in-memory index state of each of its collections.
pub struct Database {
    /// The underlying record store.
    store: Store,
    /// Per-collection state, keyed by collection name.
    collections: HashMap<String, CollectionHandle>,
    /// Database-wide MVCC-reads flag, copied into each new collection handle.
    mvcc: bool,
}
583
/// Reads the `QUIVER_MVCC_READS` environment variable as a boolean flag.
///
/// Returns `true` when the value, ignoring ASCII case and surrounding
/// whitespace, is one of `1`, `true`, `on` or `yes`. Any other value, an
/// unset variable, or a non-Unicode value yields `false`.
fn mvcc_from_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    std::env::var("QUIVER_MVCC_READS")
        .map(|raw| {
            let flag = raw.trim();
            TRUTHY.iter().any(|accepted| flag.eq_ignore_ascii_case(accepted))
        })
        .unwrap_or(false)
}
590
591impl Database {
592 pub fn open(dir: &Path) -> Result<Self> {
595 Self::from_store(Store::open(dir)?)
596 }
597
598 pub fn open_with_codec(dir: &Path, codec: Box<dyn PageCodec>) -> Result<Self> {
603 Self::from_store(Store::open_with_codec(dir, codec)?)
604 }
605
606 pub fn open_with_keyring(dir: &Path, keyring: Box<dyn KeyRing>) -> Result<Self> {
611 Self::from_store(Store::open_with_keyring(dir, keyring)?)
612 }
613
614 fn from_store(store: Store) -> Result<Self> {
616 let mvcc = mvcc_from_env();
617 let mut collections = HashMap::new();
618 for name in store.collection_names() {
619 let Some(id) = store.collection_id(&name) else {
620 continue;
621 };
622 let Some(descriptor) = store.descriptor(id).cloned() else {
623 continue;
624 };
625 let snapshot = empty_snapshot(&descriptor);
626 let mut handle = CollectionHandle {
627 id,
628 index: empty_index(&descriptor),
629 descriptor,
630 int_to_ext: Vec::new(),
631 ext_to_int: HashMap::new(),
632 stale: true,
633 write_gen: 0,
634 docs: None,
635 sparse: None,
637 mvcc,
638 snapshot,
639 };
640 load_index(&store, &mut handle)?;
641 if mvcc_served(&handle) {
646 handle.stale = true;
647 }
648 collections.insert(name, handle);
649 }
650 Ok(Self {
651 store,
652 collections,
653 mvcc,
654 })
655 }
656
657 pub fn create_collection(&mut self, name: &str, descriptor: Descriptor) -> Result<()> {
660 validate_index(&descriptor)?;
661 let id = self.store.create_collection(name, descriptor.clone())?;
662 let index = empty_index(&descriptor);
663 let docs = descriptor.multivector.then(BTreeMap::new);
664 let sparse = uses_sparse_index(&descriptor).then(SparseInvertedIndex::new);
668 let snapshot = empty_snapshot(&descriptor);
669 let mvcc = self.mvcc;
670 self.collections.insert(
671 name.to_owned(),
672 CollectionHandle {
673 id,
674 descriptor,
675 index,
676 int_to_ext: Vec::new(),
677 ext_to_int: HashMap::new(),
678 stale: false,
679 write_gen: 0,
680 docs,
681 sparse,
682 mvcc,
683 snapshot,
684 },
685 );
686 Ok(())
687 }
688
689 pub fn drop_collection(&mut self, name: &str) -> Result<bool> {
691 let existed = self.store.drop_collection(name)?;
692 self.collections.remove(name);
693 Ok(existed)
694 }
695
696 pub fn shred_collection(&mut self, name: &str) -> Result<bool> {
702 let existed = self.store.shred_collection(name)?;
703 self.collections.remove(name);
704 Ok(existed)
705 }
706
707 pub fn set_commit_observer(&mut self, observer: CommitObserver) {
711 self.store.set_commit_observer(observer);
712 }
713
714 pub fn replication_snapshot(&self) -> Result<Vec<WalOp>> {
720 Ok(self.store.replication_snapshot()?)
721 }
722
723 pub fn apply_replicated(&mut self, op: WalOp) -> Result<()> {
732 let target = match &op {
733 WalOp::CreateCollection { collection_id, .. }
734 | WalOp::DropCollection { collection_id }
735 | WalOp::Upsert { collection_id, .. }
736 | WalOp::Delete { collection_id, .. } => Some(*collection_id),
737 WalOp::Checkpoint { .. } => None,
738 };
739 let create_name = match &op {
740 WalOp::CreateCollection { name, .. } => Some(name.clone()),
741 _ => None,
742 };
743 let is_drop = matches!(op, WalOp::DropCollection { .. });
744 self.store.apply_replicated(op)?;
745
746 if let Some(name) = create_name {
747 if let Some(id) = target
749 && let Some(descriptor) = self.store.descriptor(id).cloned()
750 {
751 let docs = descriptor.multivector.then(BTreeMap::new);
752 let index = empty_index(&descriptor);
753 let snapshot = empty_snapshot(&descriptor);
754 let mvcc = self.mvcc;
755 self.collections.insert(
758 name,
759 CollectionHandle {
760 id,
761 descriptor,
762 index,
763 int_to_ext: Vec::new(),
764 ext_to_int: HashMap::new(),
765 stale: false,
766 write_gen: 0,
767 docs,
768 sparse: None,
769 mvcc,
770 snapshot,
771 },
772 );
773 }
774 } else if is_drop {
775 if let Some(id) = target {
776 self.collections.retain(|_, h| h.id != id);
777 }
778 } else if let Some(id) = target
779 && let Some(handle) = self.collections.values_mut().find(|h| h.id == id)
780 {
781 mark_stale(handle);
782 }
783 Ok(())
784 }
785
786 #[must_use]
788 pub fn collection_names(&self) -> Vec<String> {
789 self.store.collection_names()
790 }
791
792 #[must_use]
794 pub fn descriptor(&self, name: &str) -> Option<&Descriptor> {
795 self.collections.get(name).map(|h| &h.descriptor)
796 }
797
798 pub fn len(&self, name: &str) -> Result<usize> {
800 let handle = self.handle(name)?;
801 Ok(self.store.len(handle.id)?)
802 }
803
804 pub fn is_empty(&self, name: &str) -> Result<bool> {
806 Ok(self.len(name)? == 0)
807 }
808
809 pub fn upsert(
811 &mut self,
812 collection: &str,
813 id: &str,
814 vector: &[f32],
815 payload: &Value,
816 ) -> Result<()> {
817 let handle = self
818 .collections
819 .get_mut(collection)
820 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
821 require_single_vector(handle)?;
822 let payload_bytes = serde_json::to_vec(payload)?;
823 self.store.upsert(handle.id, id, vector, &payload_bytes)?;
824 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
828 return Ok(());
829 }
830 index_upsert_point(handle, id, vector)?;
833 sparse_index_upsert_point(handle, id, payload);
835 Ok(())
836 }
837
838 pub fn upsert_batch(
845 &mut self,
846 collection: &str,
847 points: &[(&str, &[f32], &serde_json::Value)],
848 ) -> Result<u64> {
849 let handle = self
850 .collections
851 .get(collection)
852 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
853 require_single_vector(handle)?;
854 let coll_id = handle.id;
855 let is_client_side = handle.descriptor.vector_encryption == VectorEncryption::ClientSide;
856
857 let payload_bytes: Vec<Vec<u8>> = points
858 .iter()
859 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
860 .collect::<Result<_>>()?;
861
862 let records: Vec<(&str, &[f32], &[u8])> = points
863 .iter()
864 .zip(payload_bytes.iter())
865 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
866 .collect();
867
868 self.store.upsert_batch(coll_id, &records)?;
869
870 if is_client_side {
871 return Ok(records.len() as u64);
872 }
873
874 for (id, vector, payload) in points {
875 let handle = self
876 .collections
877 .get_mut(collection)
878 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
879 index_upsert_point(handle, id, vector)?;
880 sparse_index_upsert_point(handle, id, payload);
881 }
882 Ok(records.len() as u64)
883 }
884
885 pub fn upsert_bulk(
896 &mut self,
897 collection: &str,
898 points: &[(&str, &[f32], &serde_json::Value)],
899 ) -> Result<u64> {
900 let handle = self
901 .collections
902 .get_mut(collection)
903 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
904 require_single_vector(handle)?;
905 let coll_id = handle.id;
906
907 let payload_bytes: Vec<Vec<u8>> = points
908 .iter()
909 .map(|(_, _, p)| serde_json::to_vec(p).map_err(Error::Json))
910 .collect::<Result<_>>()?;
911 let records: Vec<(&str, &[f32], &[u8])> = points
912 .iter()
913 .zip(payload_bytes.iter())
914 .map(|((id, vec, _), p)| (*id, *vec, p.as_slice()))
915 .collect();
916
917 self.store.upsert_batch(coll_id, &records)?;
918
919 let handle = self
923 .collections
924 .get_mut(collection)
925 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
926 mark_stale(handle);
927 Ok(records.len() as u64)
928 }
929
930 pub fn delete(&mut self, collection: &str, id: &str) -> Result<bool> {
932 let handle = self
933 .collections
934 .get_mut(collection)
935 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
936 require_single_vector(handle)?;
937 let existed = self.store.delete(handle.id, id)?;
938 if !existed {
939 return Ok(false);
940 }
941 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
944 return Ok(true);
945 }
946 index_delete_point(handle, id);
952 sparse_index_delete_point(handle, id);
954 Ok(true)
955 }
956
957 pub fn get(&self, collection: &str, id: &str) -> Result<Option<Match>> {
959 let handle = self.handle(collection)?;
960 require_single_vector(handle)?;
961 match self.store.get(handle.id, id)? {
962 Some(record) => Ok(Some(Match {
963 id: id.to_owned(),
964 score: 0.0,
965 payload: Some(serde_json::from_slice(&record.payload)?),
966 vector: Some(record.vector),
967 })),
968 None => Ok(None),
969 }
970 }
971
972 pub fn fetch(
986 &self,
987 collection: &str,
988 filter: Option<&Filter>,
989 limit: usize,
990 with_payload: bool,
991 with_vector: bool,
992 ) -> Result<Vec<Match>> {
993 let handle = self.handle(collection)?;
994 require_single_vector(handle)?;
995 let mut out = Vec::new();
996 for (id, record) in self.store.scan(handle.id)? {
997 if out.len() >= limit {
998 break;
999 }
1000 let payload: Value = serde_json::from_slice(&record.payload)?;
1001 if let Some(filter) = filter
1002 && !filter.matches(&payload)
1003 {
1004 continue;
1005 }
1006 out.push(Match {
1007 id,
1008 score: 0.0,
1009 payload: with_payload.then_some(payload),
1010 vector: with_vector.then_some(record.vector),
1011 });
1012 }
1013 Ok(out)
1014 }
1015
1016 pub fn ensure_indexed(&mut self, collection: &str) -> Result<()> {
1022 if self.handle(collection)?.stale {
1023 let store = &self.store;
1024 let handle = self
1025 .collections
1026 .get_mut(collection)
1027 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1028 rebuild_index(store, handle)?;
1029 if mvcc_served(handle) {
1033 publish_base(handle);
1034 }
1035 }
1036 Ok(())
1037 }
1038
1039 pub fn set_mvcc_reads(&mut self, on: bool) {
1045 self.mvcc = on;
1046 for handle in self.collections.values_mut() {
1047 handle.mvcc = on;
1048 if mvcc_eligible(&handle.descriptor) {
1052 handle.stale = true;
1053 }
1054 }
1055 }
1056
1057 #[must_use]
1059 pub fn mvcc_reads(&self) -> bool {
1060 self.mvcc
1061 }
1062
1063 pub fn collection_snapshot(&self, collection: &str) -> Result<SnapshotCell> {
1073 Ok(self.handle(collection)?.snapshot.clone())
1074 }
1075
1076 pub fn mvcc_cell(&self, collection: &str) -> Result<Option<SnapshotCell>> {
1086 let handle = self.handle(collection)?;
1087 Ok(mvcc_served(handle).then(|| handle.snapshot.clone()))
1088 }
1089
1090 pub fn needs_rebuild(&self, collection: &str) -> Result<bool> {
1095 Ok(self.handle(collection)?.stale)
1096 }
1097
1098 pub fn snapshot_rebuild_inputs(&self, collection: &str) -> Result<Option<RebuildInputs>> {
1104 let handle = self.handle(collection)?;
1105 if !handle.stale {
1106 return Ok(None);
1107 }
1108 let scan = scan_collection(&self.store, handle)?;
1109 Ok(Some(RebuildInputs {
1110 collection: collection.to_owned(),
1111 descriptor: handle.descriptor.clone(),
1112 scan,
1113 write_gen: handle.write_gen,
1114 }))
1115 }
1116
1117 pub fn commit_rebuild(&mut self, rebuilt: RebuiltIndex) -> Result<bool> {
1124 let store = &self.store;
1125 let Some(handle) = self.collections.get_mut(&rebuilt.collection) else {
1126 return Ok(false);
1127 };
1128 match rebuilt.kind {
1129 RebuiltKind::Ready(index) => handle.index = *index,
1130 RebuiltKind::Disk { graph, pq } => {
1131 handle.index = empty_index(&handle.descriptor);
1135 let disk = write_disk_index(store, handle.id, &graph, &pq)?;
1136 handle.index = CollectionIndex::Disk(Some(FreshDiskVamana::new(disk)?));
1137 }
1138 }
1139 handle.int_to_ext = rebuilt.int_to_ext;
1140 handle.ext_to_int = rebuilt.ext_to_int;
1141 handle.docs = rebuilt.docs;
1142 handle.sparse = rebuilt.sparse;
1143 let still_stale = handle.write_gen != rebuilt.write_gen;
1146 handle.stale = still_stale;
1147 if mvcc_served(handle) {
1154 publish_base(handle);
1155 }
1156 Ok(still_stale)
1157 }
1158
1159 pub fn search(
1162 &mut self,
1163 collection: &str,
1164 query: &[f32],
1165 params: &SearchParams,
1166 ) -> Result<Vec<Match>> {
1167 self.ensure_indexed(collection)?;
1172 self.search_snapshot(collection, query, params)
1173 }
1174
1175 pub fn search_snapshot(
1182 &self,
1183 collection: &str,
1184 query: &[f32],
1185 params: &SearchParams,
1186 ) -> Result<Vec<Match>> {
1187 require_single_vector(self.handle(collection)?)?;
1188 require_server_searchable(self.handle(collection)?)?;
1189
1190 let handle = self.handle(collection)?;
1191
1192 if mvcc_served(handle) {
1198 return self.search_snapshot_mvcc(handle, query, params);
1199 }
1200
1201 if let Some(filter) = ¶ms.filter
1205 && let Some(candidates) = candidate_ids(
1206 &self.store,
1207 handle.id,
1208 filter,
1209 &handle.descriptor.filterable,
1210 )?
1211 && candidates.len() <= FULL_SCAN_THRESHOLD
1212 {
1213 return self.exact_filtered_search(
1214 handle.id,
1215 &handle.descriptor,
1216 query,
1217 params,
1218 filter,
1219 &candidates,
1220 );
1221 }
1222
1223 let fetch = if params.filter.is_some() {
1224 params
1225 .k
1226 .saturating_mul(FILTER_OVERFETCH)
1227 .max(params.ef_search)
1228 } else {
1229 params.k
1230 };
1231 let raw = handle.index.search(query, fetch, params.ef_search)?;
1232
1233 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1234 let mut out = Vec::with_capacity(params.k);
1235 for neighbor in raw {
1236 if out.len() >= params.k {
1237 break;
1238 }
1239 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1240 continue;
1241 };
1242 let record = if need_record {
1243 self.store.get(handle.id, ext_id)?
1244 } else {
1245 None
1246 };
1247 let payload_value: Option<Value> = match &record {
1248 Some(r) if params.filter.is_some() || params.with_payload => {
1249 Some(serde_json::from_slice(&r.payload)?)
1250 }
1251 _ => None,
1252 };
1253 if let Some(filter) = ¶ms.filter {
1254 let value = payload_value.as_ref().unwrap_or(&Value::Null);
1255 if !filter.matches(value) {
1256 continue;
1257 }
1258 }
1259 out.push(Match {
1260 id: ext_id.clone(),
1261 score: neighbor.distance,
1262 payload: if params.with_payload {
1263 payload_value
1264 } else {
1265 None
1266 },
1267 vector: if params.with_vector {
1268 record.map(|r| r.vector)
1269 } else {
1270 None
1271 },
1272 });
1273 }
1274 Ok(out)
1275 }
1276
1277 fn search_snapshot_mvcc(
1283 &self,
1284 handle: &CollectionHandle,
1285 query: &[f32],
1286 params: &SearchParams,
1287 ) -> Result<Vec<Match>> {
1288 if let Some(filter) = ¶ms.filter
1291 && let Some(candidates) = candidate_ids(
1292 &self.store,
1293 handle.id,
1294 filter,
1295 &handle.descriptor.filterable,
1296 )?
1297 && candidates.len() <= FULL_SCAN_THRESHOLD
1298 {
1299 return self.exact_filtered_search(
1300 handle.id,
1301 &handle.descriptor,
1302 query,
1303 params,
1304 filter,
1305 &candidates,
1306 );
1307 }
1308
1309 let fetch = if params.filter.is_some() {
1313 params
1314 .k
1315 .saturating_mul(FILTER_OVERFETCH)
1316 .max(params.ef_search)
1317 } else {
1318 params.k
1319 };
1320 let dense = handle
1321 .snapshot
1322 .load()
1323 .search(query, fetch, params.ef_search)?;
1324 let need_record = params.filter.is_some() || params.with_payload || params.with_vector;
1325 let mut out = Vec::with_capacity(params.k);
1326 for m in dense {
1327 if out.len() >= params.k {
1328 break;
1329 }
1330 let record = if need_record {
1331 self.store.get(handle.id, &m.id)?
1332 } else {
1333 None
1334 };
1335 let payload_value: Option<Value> = match &record {
1336 Some(r) if params.filter.is_some() || params.with_payload => {
1337 Some(serde_json::from_slice(&r.payload)?)
1338 }
1339 _ => None,
1340 };
1341 if let Some(filter) = ¶ms.filter
1342 && !filter.matches(payload_value.as_ref().unwrap_or(&Value::Null))
1343 {
1344 continue;
1345 }
1346 out.push(Match {
1347 id: m.id,
1348 score: m.score,
1349 payload: if params.with_payload {
1350 payload_value
1351 } else {
1352 None
1353 },
1354 vector: if params.with_vector {
1355 record.map(|r| r.vector)
1356 } else {
1357 None
1358 },
1359 });
1360 }
1361 Ok(out)
1362 }
1363
1364 pub fn hybrid_search(
1372 &mut self,
1373 collection: &str,
1374 dense_query: Option<&[f32]>,
1375 sparse_query: Option<&SparseVector>,
1376 text_query: Option<&str>,
1377 params: &SearchParams,
1378 rrf_k0: f32,
1379 ) -> Result<Vec<Match>> {
1380 self.ensure_indexed(collection)?;
1384 self.hybrid_search_snapshot(
1385 collection,
1386 dense_query,
1387 sparse_query,
1388 text_query,
1389 params,
1390 rrf_k0,
1391 )
1392 }
1393
1394 pub fn hybrid_search_snapshot(
1399 &self,
1400 collection: &str,
1401 dense_query: Option<&[f32]>,
1402 sparse_query: Option<&SparseVector>,
1403 text_query: Option<&str>,
1404 params: &SearchParams,
1405 rrf_k0: f32,
1406 ) -> Result<Vec<Match>> {
1407 require_single_vector(self.handle(collection)?)?;
1408 require_server_searchable(self.handle(collection)?)?;
1409 if dense_query.is_none() && sparse_query.is_none() && text_query.is_none() {
1410 return Err(Error::Unsupported(
1411 "hybrid_search requires a dense query, a sparse query, or a text query",
1412 ));
1413 }
1414 let handle = self.handle(collection)?;
1415
1416 let depth = params
1419 .k
1420 .saturating_mul(RRF_CANDIDATE_FACTOR)
1421 .max(MIN_RRF_CANDIDATES);
1422 let filter = params.filter.as_ref();
1423 let mut lists: Vec<Vec<String>> = Vec::new();
1424 if let Some(q) = dense_query {
1425 lists.push(self.dense_ranked_ids(handle, q, depth, params.ef_search, filter)?);
1426 }
1427 if let Some(sp) = sparse_query {
1428 lists.push(self.sparse_ranked_ids(handle, sp, depth, filter)?);
1429 }
1430 if let Some(text) = text_query {
1431 lists.push(self.bm25_ranked_ids(handle, text, depth, filter)?);
1432 }
1433 let fused = rrf_fuse(&lists, rrf_k0, params.k);
1434
1435 let mut out = Vec::with_capacity(fused.len());
1436 for (ext_id, score) in fused {
1437 let record = if params.with_payload || params.with_vector {
1438 self.store.get(handle.id, &ext_id)?
1439 } else {
1440 None
1441 };
1442 let payload = match (&record, params.with_payload) {
1443 (Some(r), true) => Some(serde_json::from_slice(&r.payload)?),
1444 _ => None,
1445 };
1446 out.push(Match {
1447 id: ext_id,
1448 score,
1449 payload,
1450 vector: if params.with_vector {
1451 record.map(|r| r.vector)
1452 } else {
1453 None
1454 },
1455 });
1456 }
1457 Ok(out)
1458 }
1459
1460 fn dense_ranked_ids(
1463 &self,
1464 handle: &CollectionHandle,
1465 query: &[f32],
1466 depth: usize,
1467 ef_search: usize,
1468 filter: Option<&Filter>,
1469 ) -> Result<Vec<String>> {
1470 let mut ids = Vec::new();
1471 if mvcc_served(handle) {
1475 for m in handle
1476 .snapshot
1477 .load()
1478 .search(query, depth, ef_search.max(depth))?
1479 {
1480 if !self.passes_filter(handle.id, &m.id, filter)? {
1481 continue;
1482 }
1483 ids.push(m.id);
1484 if ids.len() >= depth {
1485 break;
1486 }
1487 }
1488 return Ok(ids);
1489 }
1490 let raw = handle.index.search(query, depth, ef_search.max(depth))?;
1491 for neighbor in raw {
1492 let Some(ext_id) = handle.int_to_ext.get(neighbor.id as usize) else {
1493 continue;
1494 };
1495 if !self.passes_filter(handle.id, ext_id, filter)? {
1496 continue;
1497 }
1498 ids.push(ext_id.clone());
1499 if ids.len() >= depth {
1500 break;
1501 }
1502 }
1503 Ok(ids)
1504 }
1505
1506 fn sparse_ranked_ids(
1513 &self,
1514 handle: &CollectionHandle,
1515 query: &SparseVector,
1516 depth: usize,
1517 filter: Option<&Filter>,
1518 ) -> Result<Vec<String>> {
1519 if let Some(idx) = handle.sparse.as_ref() {
1520 let mut ids = Vec::new();
1521 for (ext_id, _score) in idx.search(query) {
1522 if !self.passes_filter(handle.id, &ext_id, filter)? {
1523 continue;
1524 }
1525 ids.push(ext_id);
1526 if ids.len() >= depth {
1527 break;
1528 }
1529 }
1530 return Ok(ids);
1531 }
1532 self.sparse_ranked_ids_by_scan(handle.id, query, depth, filter)
1533 }
1534
1535 fn sparse_ranked_ids_by_scan(
1539 &self,
1540 cid: CollectionId,
1541 query: &SparseVector,
1542 depth: usize,
1543 filter: Option<&Filter>,
1544 ) -> Result<Vec<String>> {
1545 let qmap: HashMap<u32, f32> = query
1546 .indices
1547 .iter()
1548 .copied()
1549 .zip(query.values.iter().copied())
1550 .collect();
1551 let mut scored: Vec<(f32, String)> = Vec::new();
1552 for (ext_id, record) in self.store.scan(cid)? {
1553 if record.payload.is_empty() {
1554 continue;
1555 }
1556 let Ok(value) = serde_json::from_slice::<Value>(&record.payload) else {
1557 continue;
1558 };
1559 if let Some(filter) = filter
1560 && !filter.matches(&value)
1561 {
1562 continue;
1563 }
1564 let Some(raw) = value.get(SPARSE_KEY) else {
1565 continue;
1566 };
1567 let Ok(sv) = serde_json::from_value::<SparseVector>(raw.clone()) else {
1568 continue;
1569 };
1570 let mut score = 0.0f32;
1571 for (dim, weight) in sv.indices.iter().zip(sv.values.iter()) {
1572 if let Some(qw) = qmap.get(dim) {
1573 score += qw * weight;
1574 }
1575 }
1576 if score > 0.0 {
1577 scored.push((score, ext_id));
1578 }
1579 }
1580 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then(a.1.cmp(&b.1)));
1581 Ok(scored.into_iter().take(depth).map(|(_, id)| id).collect())
1582 }
1583
1584 fn bm25_ranked_ids(
1592 &self,
1593 handle: &CollectionHandle,
1594 query_text: &str,
1595 depth: usize,
1596 filter: Option<&Filter>,
1597 ) -> Result<Vec<String>> {
1598 let Some(idx) = handle.sparse.as_ref() else {
1599 return Ok(Vec::new());
1600 };
1601 let terms = query_term_ids(query_text);
1602 let mut ids = Vec::new();
1603 for (ext_id, _score) in idx.bm25_search(&terms, BM25_K1, BM25_B) {
1604 if !self.passes_filter(handle.id, &ext_id, filter)? {
1605 continue;
1606 }
1607 ids.push(ext_id);
1608 if ids.len() >= depth {
1609 break;
1610 }
1611 }
1612 Ok(ids)
1613 }
1614
1615 fn passes_filter(
1618 &self,
1619 cid: CollectionId,
1620 ext_id: &str,
1621 filter: Option<&Filter>,
1622 ) -> Result<bool> {
1623 let Some(filter) = filter else {
1624 return Ok(true);
1625 };
1626 let value: Value = match self.store.get(cid, ext_id)? {
1627 Some(r) => serde_json::from_slice(&r.payload)?,
1628 None => Value::Null,
1629 };
1630 Ok(filter.matches(&value))
1631 }
1632
1633 fn exact_filtered_search(
1638 &self,
1639 cid: CollectionId,
1640 descriptor: &Descriptor,
1641 query: &[f32],
1642 params: &SearchParams,
1643 filter: &Filter,
1644 candidates: &BTreeSet<String>,
1645 ) -> Result<Vec<Match>> {
1646 let metric = to_index_metric(descriptor.metric);
1647 let mut scored: Vec<(f32, String, Value, Vec<f32>)> = Vec::new();
1648 for ext_id in candidates {
1649 let Some(record) = self.store.get(cid, ext_id)? else {
1650 continue;
1651 };
1652 let payload: Value = serde_json::from_slice(&record.payload)?;
1653 if !filter.matches(&payload) {
1654 continue;
1655 }
1656 let ordering = ordering_distance(metric, query, &record.vector);
1657 scored.push((ordering, ext_id.clone(), payload, record.vector));
1658 }
1659 scored.sort_by(|a, b| a.0.total_cmp(&b.0));
1660 scored.truncate(params.k);
1661 Ok(scored
1662 .into_iter()
1663 .map(|(ordering, id, payload, vector)| Match {
1664 id,
1665 score: report_metric(metric, ordering),
1666 payload: params.with_payload.then_some(payload),
1667 vector: params.with_vector.then_some(vector),
1668 })
1669 .collect())
1670 }
1671
1672 pub fn upsert_document(
1681 &mut self,
1682 collection: &str,
1683 doc_id: &str,
1684 vectors: &[Vec<f32>],
1685 payload: &Value,
1686 ) -> Result<()> {
1687 let handle = self
1688 .collections
1689 .get_mut(collection)
1690 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1691 require_multivector(handle)?;
1692 if doc_id.contains(DOC_TOKEN_SEP) {
1693 return Err(Error::Unsupported(
1694 "document id must not contain the reserved 0x1f separator",
1695 ));
1696 }
1697 if vectors.is_empty() {
1698 return Err(Error::Unsupported("a document needs at least one vector"));
1699 }
1700 let dim = handle.descriptor.dim as usize;
1701 if vectors.iter().any(|v| v.len() != dim) {
1702 return Err(Error::Unsupported(
1703 "every document vector must match the collection dimensionality",
1704 ));
1705 }
1706 let previous = handle
1709 .docs
1710 .as_ref()
1711 .and_then(|d| d.get(doc_id))
1712 .copied()
1713 .unwrap_or(0) as usize;
1714 for j in vectors.len()..previous {
1715 self.store.delete(handle.id, &token_id(doc_id, j))?;
1716 index_delete_point(handle, &token_id(doc_id, j));
1718 }
1719 let payload_bytes = serde_json::to_vec(payload)?;
1720 for (j, vector) in vectors.iter().enumerate() {
1721 let bytes: &[u8] = if j == 0 {
1723 payload_bytes.as_slice()
1724 } else {
1725 &[]
1726 };
1727 self.store
1728 .upsert(handle.id, &token_id(doc_id, j), vector, bytes)?;
1729 index_upsert_point(handle, &token_id(doc_id, j), vector)?;
1733 }
1734 if let Some(docs) = handle.docs.as_mut() {
1735 docs.insert(doc_id.to_owned(), vectors.len() as u32);
1736 }
1737 Ok(())
1738 }
1739
1740 pub fn search_multi_vector(
1748 &mut self,
1749 collection: &str,
1750 query_tokens: &[Vec<f32>],
1751 params: &SearchParams,
1752 ) -> Result<Vec<DocumentMatch>> {
1753 self.ensure_indexed(collection)?;
1757 self.search_multi_vector_snapshot(collection, query_tokens, params)
1758 }
1759
1760 pub fn search_multi_vector_snapshot(
1767 &self,
1768 collection: &str,
1769 query_tokens: &[Vec<f32>],
1770 params: &SearchParams,
1771 ) -> Result<Vec<DocumentMatch>> {
1772 require_multivector(self.handle(collection)?)?;
1773 let dim = self.handle(collection)?.descriptor.dim as usize;
1774 if query_tokens.is_empty() {
1775 return Ok(Vec::new());
1776 }
1777 if query_tokens.iter().any(|v| v.len() != dim) {
1778 return Err(Error::Unsupported(
1779 "every query token must match the collection dimensionality",
1780 ));
1781 }
1782
1783 let doc_count = self
1784 .handle(collection)?
1785 .docs
1786 .as_ref()
1787 .map_or(0, BTreeMap::len);
1788 let candidates: Vec<String> = if doc_count <= MULTIVECTOR_EXACT_DOC_THRESHOLD {
1789 self.handle(collection)?
1791 .docs
1792 .as_ref()
1793 .map(|d| d.keys().cloned().collect())
1794 .unwrap_or_default()
1795 } else {
1796 let handle = self.handle(collection)?;
1800 let per_token_k = params
1801 .k
1802 .saturating_mul(MULTIVECTOR_CANDIDATE_FACTOR)
1803 .max(params.ef_search);
1804 let mut set = BTreeSet::new();
1805 for token in query_tokens {
1806 for neighbor in handle.index.search(token, per_token_k, params.ef_search)? {
1807 if let Some(ext) = handle.int_to_ext.get(neighbor.id as usize)
1808 && let Some((doc, _)) = parse_token_id(ext)
1809 {
1810 set.insert(doc.to_owned());
1811 }
1812 }
1813 }
1814 set.into_iter().collect()
1815 };
1816
1817 let handle = self.handle(collection)?;
1819 let cid = handle.id;
1820 let metric = to_index_metric(handle.descriptor.metric);
1821 let mut scored: Vec<ScoredDocument> = Vec::new();
1822 for doc in &candidates {
1823 let count = handle
1824 .docs
1825 .as_ref()
1826 .and_then(|d| d.get(doc))
1827 .copied()
1828 .unwrap_or(0) as usize;
1829 let (tokens, payload) = self.gather_document(cid, doc, count)?;
1830 if tokens.is_empty() {
1831 continue;
1832 }
1833 if let Some(filter) = ¶ms.filter {
1834 let value = payload.clone().unwrap_or(Value::Null);
1835 if !filter.matches(&value) {
1836 continue;
1837 }
1838 }
1839 let score = max_sim(metric, query_tokens, &tokens);
1840 let vectors = params.with_vector.then_some(tokens);
1841 scored.push((score, doc.clone(), payload, vectors));
1842 }
1843 scored.sort_by(|a, b| b.0.total_cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
1845 scored.truncate(params.k);
1846 Ok(scored
1847 .into_iter()
1848 .map(|(score, id, payload, vectors)| DocumentMatch {
1849 id,
1850 score,
1851 payload: params.with_payload.then_some(payload).flatten(),
1852 vectors,
1853 })
1854 .collect())
1855 }
1856
1857 pub fn get_document(
1860 &self,
1861 collection: &str,
1862 doc_id: &str,
1863 with_vectors: bool,
1864 ) -> Result<Option<DocumentMatch>> {
1865 let handle = self.handle(collection)?;
1866 require_multivector(handle)?;
1867 let Some(&count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)) else {
1868 return Ok(None);
1869 };
1870 let (tokens, payload) = self.gather_document(handle.id, doc_id, count as usize)?;
1871 if tokens.is_empty() {
1872 return Ok(None);
1873 }
1874 Ok(Some(DocumentMatch {
1875 id: doc_id.to_owned(),
1876 score: 0.0,
1877 payload,
1878 vectors: with_vectors.then_some(tokens),
1879 }))
1880 }
1881
1882 pub fn delete_document(&mut self, collection: &str, doc_id: &str) -> Result<bool> {
1885 let handle = self
1886 .collections
1887 .get_mut(collection)
1888 .ok_or_else(|| Error::CollectionNotFound(collection.to_owned()))?;
1889 require_multivector(handle)?;
1890 let Some(count) = handle.docs.as_ref().and_then(|d| d.get(doc_id)).copied() else {
1891 return Ok(false);
1892 };
1893 for j in 0..count as usize {
1894 self.store.delete(handle.id, &token_id(doc_id, j))?;
1895 index_delete_point(handle, &token_id(doc_id, j));
1897 }
1898 if let Some(docs) = handle.docs.as_mut() {
1899 docs.remove(doc_id);
1900 }
1901 Ok(true)
1902 }
1903
1904 pub fn document_count(&self, collection: &str) -> Result<usize> {
1907 let handle = self.handle(collection)?;
1908 require_multivector(handle)?;
1909 Ok(handle.docs.as_ref().map_or(0, BTreeMap::len))
1910 }
1911
1912 fn gather_document(
1916 &self,
1917 cid: CollectionId,
1918 doc_id: &str,
1919 count: usize,
1920 ) -> Result<(Vec<Vec<f32>>, Option<Value>)> {
1921 let mut tokens = Vec::with_capacity(count);
1922 let mut payload: Option<Value> = None;
1923 for j in 0..count {
1924 let Some(record) = self.store.get(cid, &token_id(doc_id, j))? else {
1925 continue;
1926 };
1927 if j == 0 && !record.payload.is_empty() {
1928 payload = Some(serde_json::from_slice(&record.payload)?);
1929 }
1930 tokens.push(record.vector);
1931 }
1932 Ok((tokens, payload))
1933 }
1934
1935 pub fn checkpoint(&mut self) -> Result<()> {
1940 let mut snapshots: HashMap<CollectionId, Vec<u8>> = HashMap::new();
1941 for handle in self.collections.values() {
1942 if handle.stale {
1943 continue;
1944 }
1945 if let CollectionIndex::Ivf(Some(ivf)) = &handle.index {
1946 if ivf.is_empty() {
1947 continue;
1948 }
1949 let envelope = IndexEnvelope {
1950 version: INDEX_ENVELOPE_VERSION,
1951 int_to_ext: handle.int_to_ext.clone(),
1952 ivf: ivf.snapshot()?,
1953 };
1954 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1955 } else if let CollectionIndex::Disk(Some(fresh)) = &handle.index {
1956 let envelope = DiskEnvelope {
1962 version: INDEX_ENVELOPE_VERSION,
1963 int_to_ext: handle.int_to_ext.clone(),
1964 base_row_count: fresh.base_len() as u64,
1965 deleted_ids: fresh.deleted_ids(),
1966 };
1967 snapshots.insert(handle.id, postcard::to_allocvec(&envelope)?);
1968 }
1969 }
1970 self.store.checkpoint_with_index_snapshots(&snapshots)?;
1971 Ok(())
1972 }
1973
1974 pub fn compact(&mut self) -> Result<()> {
1978 Ok(self.store.compact()?)
1979 }
1980
1981 #[must_use]
1984 pub fn manifest_version(&self) -> u64 {
1985 self.store.manifest_version()
1986 }
1987
1988 #[must_use]
1991 pub fn disk_usage_bytes(&self) -> u64 {
1992 dir_size(self.store.dir())
1993 }
1994
1995 pub fn snapshot(&mut self, dest: &Path) -> Result<SnapshotInfo> {
2007 if dest.exists() {
2008 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2009 dest.display().to_string(),
2010 )));
2011 }
2012 self.checkpoint()?;
2015 let (files, bytes) = copy_tree(self.store.dir(), dest)?;
2016 let _ = std::fs::File::open(dest).and_then(|f| f.sync_all());
2020 Ok(SnapshotInfo {
2021 manifest_version: self.store.manifest_version(),
2022 files,
2023 bytes,
2024 })
2025 }
2026
2027 fn handle(&self, name: &str) -> Result<&CollectionHandle> {
2028 self.collections
2029 .get(name)
2030 .ok_or_else(|| Error::CollectionNotFound(name.to_owned()))
2031 }
2032}
2033
2034pub fn restore_snapshot(src: &Path, dest: &Path) -> Result<SnapshotInfo> {
2047 if dest.exists() {
2048 return Err(Error::Core(quiver_core::CoreError::AlreadyExists(
2049 dest.display().to_string(),
2050 )));
2051 }
2052 if !src.join("CURRENT").exists() {
2053 return Err(Error::Core(quiver_core::CoreError::InvalidArgument(
2054 format!("{} is not a snapshot (no CURRENT)", src.display()),
2055 )));
2056 }
2057 let (files, bytes) = copy_tree(src, dest)?;
2058 Ok(SnapshotInfo {
2059 manifest_version: 0,
2063 files,
2064 bytes,
2065 })
2066}
2067
2068fn copy_tree(src: &Path, dst: &Path) -> Result<(u64, u64)> {
2072 std::fs::create_dir_all(dst).map_err(|e| quiver_core::CoreError::io(dst, e))?;
2073 let mut files = 0u64;
2074 let mut bytes = 0u64;
2075 for entry in std::fs::read_dir(src).map_err(|e| quiver_core::CoreError::io(src, e))? {
2076 let entry = entry.map_err(|e| quiver_core::CoreError::io(src, e))?;
2077 let from = entry.path();
2078 let to = dst.join(entry.file_name());
2079 let ft = entry
2080 .file_type()
2081 .map_err(|e| quiver_core::CoreError::io(&from, e))?;
2082 if ft.is_dir() {
2083 let (f, b) = copy_tree(&from, &to)?;
2084 files += f;
2085 bytes += b;
2086 } else {
2087 let n = std::fs::copy(&from, &to).map_err(|e| quiver_core::CoreError::io(&from, e))?;
2088 files += 1;
2089 bytes += n;
2090 }
2091 }
2092 Ok((files, bytes))
2093}
2094
/// Sums the sizes of all non-directory entries under `dir`, recursing into
/// subdirectories.
///
/// The walk is best-effort: an unreadable directory counts as 0, and
/// entries whose type or metadata cannot be read are skipped.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .map(|entry| match entry.file_type() {
            Ok(kind) if kind.is_dir() => dir_size(&entry.path()),
            Ok(_) => entry.metadata().map_or(0, |meta| meta.len()),
            Err(_) => 0,
        })
        .sum()
}
2112
/// Separator placed between a document id and a token ordinal when forming
/// a stored token id (the U+001F character). Document ids containing it
/// are rejected by `upsert_document`.
const DOC_TOKEN_SEP: char = '\u{1f}';

/// Multi-vector search scores every document when the collection holds at
/// most this many documents; above it, candidates come from the index.
const MULTIVECTOR_EXACT_DOC_THRESHOLD: usize = 10_000;

/// Multiplier applied to `k` when choosing how many neighbours to fetch per
/// query token during index-based candidate generation.
const MULTIVECTOR_CANDIDATE_FACTOR: usize = 4;
2126
2127fn token_id(doc_id: &str, ordinal: usize) -> String {
2129 format!("{doc_id}{DOC_TOKEN_SEP}{ordinal}")
2130}
2131
2132fn parse_token_id(ext: &str) -> Option<(&str, u32)> {
2136 let (doc, ordinal) = ext.rsplit_once(DOC_TOKEN_SEP)?;
2137 Some((doc, ordinal.parse().ok()?))
2138}
2139
2140fn require_single_vector(handle: &CollectionHandle) -> Result<()> {
2142 if handle.descriptor.multivector {
2143 Err(Error::Unsupported(
2144 "collection is multi-vector; use upsert_document / search_multi_vector",
2145 ))
2146 } else {
2147 Ok(())
2148 }
2149}
2150
2151fn require_multivector(handle: &CollectionHandle) -> Result<()> {
2153 if handle.descriptor.multivector {
2154 Ok(())
2155 } else {
2156 Err(Error::Unsupported(
2157 "collection is single-vector; use upsert / search",
2158 ))
2159 }
2160}
2161
2162fn require_server_searchable(handle: &CollectionHandle) -> Result<()> {
2166 if handle.descriptor.vector_encryption == VectorEncryption::ClientSide {
2167 Err(Error::Unsupported(
2168 "collection is client-side encrypted; the server cannot rank opaque vectors — \
2169 fetch points and rank client-side",
2170 ))
2171 } else {
2172 Ok(())
2173 }
2174}
2175
2176fn to_index_metric(metric: DistanceMetric) -> Metric {
2177 match metric {
2178 DistanceMetric::Dot => Metric::Dot,
2179 DistanceMetric::Cosine => Metric::Cosine,
2180 DistanceMetric::L2 => Metric::L2,
2181 }
2182}
2183
2184fn validate_index(descriptor: &Descriptor) -> Result<()> {
2186 if descriptor.multivector && descriptor.metric == DistanceMetric::L2 {
2189 return Err(Error::Unsupported(
2190 "multi-vector collections require a similarity metric (cosine or dot)",
2191 ));
2192 }
2193 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2197 if descriptor.multivector {
2198 return Err(Error::Unsupported(
2199 "client-side vector encryption is not supported for multi-vector collections",
2200 ));
2201 }
2202 return Ok(());
2203 }
2204 if descriptor.vector_encryption == VectorEncryption::Dcpe
2207 && descriptor.metric != DistanceMetric::L2
2208 {
2209 return Err(Error::Unsupported(
2210 "dcpe-encrypted collections require the l2 metric",
2211 ));
2212 }
2213 if descriptor.index.kind == IndexKind::Colbert && !descriptor.multivector {
2216 return Err(Error::Unsupported(
2217 "the colbert index is only for multi-vector collections",
2218 ));
2219 }
2220 match descriptor.index.kind {
2221 IndexKind::Vamana | IndexKind::Ivf | IndexKind::DiskVamana
2222 if descriptor.metric == DistanceMetric::Dot =>
2223 {
2224 Err(Error::Unsupported(
2225 "vamana, ivf, and the disk index support l2 and cosine; use hnsw for dot",
2226 ))
2227 }
2228 _ => Ok(()),
2229 }
2230}
2231
2232fn empty_index(descriptor: &Descriptor) -> CollectionIndex {
2234 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2235 return CollectionIndex::None;
2236 }
2237 match descriptor.index.kind {
2238 IndexKind::Vamana => CollectionIndex::Vamana(None),
2239 IndexKind::DiskVamana => CollectionIndex::Disk(None),
2240 IndexKind::Ivf => CollectionIndex::Ivf(None),
2241 IndexKind::Colbert => CollectionIndex::Colbert(None),
2242 _ => CollectionIndex::Hnsw(Hnsw::new(
2243 descriptor.dim as usize,
2244 to_index_metric(descriptor.metric),
2245 HnswConfig::default(),
2246 )),
2247 }
2248}
2249
/// Default number of product-quantization subspaces for `dim` dimensions:
/// the largest divisor of `dim` that does not exceed `max(dim / 8, 1)`.
fn default_pq_m(dim: usize) -> usize {
    let mut m = (dim / 8).max(1);
    // 1 divides everything, so the walk always stops at or before 1.
    while m > 1 && !dim.is_multiple_of(m) {
        m -= 1;
    }
    m
}
2259
/// Fixed seed handed to product-quantizer training and to the ColBERT index
/// configuration.
const PQ_SEED: u64 = 0x5176_5044_5141_5453;
/// File name of the on-disk Vamana index inside a collection's index
/// directory.
const DISK_INDEX_FILE: &str = "vamana.qvx";
2266
2267fn build_index(
2268 store: &Store,
2269 cid: CollectionId,
2270 descriptor: &Descriptor,
2271 ids: &[u64],
2272 flat: &[f32],
2273) -> Result<CollectionIndex> {
2274 Ok(match build_in_memory_index(descriptor, ids, flat)? {
2275 Some(index) => index,
2276 None => {
2277 let (graph, pq) = build_disk_graph_pq(descriptor, ids, flat)?;
2278 CollectionIndex::Disk(Some(FreshDiskVamana::new(write_disk_index(
2279 store, cid, &graph, &pq,
2280 )?)?))
2281 }
2282 })
2283}
2284
2285fn build_in_memory_index(
2290 descriptor: &Descriptor,
2291 ids: &[u64],
2292 flat: &[f32],
2293) -> Result<Option<CollectionIndex>> {
2294 if descriptor.vector_encryption == VectorEncryption::ClientSide {
2297 return Ok(Some(CollectionIndex::None));
2298 }
2299 let dim = descriptor.dim as usize;
2300 let metric = to_index_metric(descriptor.metric);
2301 Ok(Some(match descriptor.index.kind {
2302 IndexKind::Vamana => CollectionIndex::Vamana(Some(FreshVamana::new(Vamana::build(
2303 ids,
2304 flat,
2305 dim,
2306 metric,
2307 VamanaConfig::default(),
2308 )?)?)),
2309 IndexKind::DiskVamana => return Ok(None),
2311 IndexKind::Ivf => {
2312 let cfg = IvfConfig {
2313 quantization: descriptor.index.pq_subspaces.map(|m| m as usize),
2314 ..IvfConfig::default()
2315 };
2316 CollectionIndex::Ivf(Some(Ivf::build(ids, flat, dim, metric, cfg)?))
2317 }
2318 IndexKind::Colbert => {
2319 let n = ids.len();
2322 let n_centroids = ((n as f64).sqrt().ceil() as usize).clamp(1, 4096);
2323 let cfg = ColbertConfig {
2324 n_centroids,
2325 n_probe: n_centroids.div_ceil(4).clamp(1, n_centroids),
2326 pq_subspaces: descriptor
2327 .index
2328 .pq_subspaces
2329 .map_or_else(|| default_pq_m(dim), |m| m as usize),
2330 seed: PQ_SEED,
2331 };
2332 CollectionIndex::Colbert(Some(ColbertIndex::build(ids, flat, dim, metric, cfg)?))
2333 }
2334 _ => {
2335 let mut h = Hnsw::new(dim, metric, HnswConfig::default());
2336 for (i, &id) in ids.iter().enumerate() {
2337 h.insert(id, &flat[i * dim..(i + 1) * dim])?;
2338 }
2339 CollectionIndex::Hnsw(h)
2340 }
2341 }))
2342}
2343
2344fn build_disk_graph_pq(
2348 descriptor: &Descriptor,
2349 ids: &[u64],
2350 flat: &[f32],
2351) -> Result<(Vamana, ProductQuantizer)> {
2352 let dim = descriptor.dim as usize;
2353 let metric = to_index_metric(descriptor.metric);
2354 let graph = Vamana::build(ids, flat, dim, metric, VamanaConfig::default())?;
2355 let m = descriptor
2356 .index
2357 .pq_subspaces
2358 .map_or_else(|| default_pq_m(dim), |x| x as usize);
2359 let pq = ProductQuantizer::train(flat, ids.len(), dim, m, metric, PQ_SEED)?;
2360 Ok((graph, pq))
2361}
2362
2363fn write_disk_index(
2368 store: &Store,
2369 cid: CollectionId,
2370 graph: &Vamana,
2371 pq: &ProductQuantizer,
2372) -> Result<DiskVamana> {
2373 let dir = store.index_dir(cid);
2374 std::fs::create_dir_all(&dir).map_err(quiver_index::DiskError::Io)?;
2375 let path = dir.join(DISK_INDEX_FILE);
2376 let codec = store.collection_codec_clone(cid)?;
2380 let tmp = dir.join(format!("{DISK_INDEX_FILE}.tmp"));
2386 quiver_index::disk::write(&tmp, graph, pq, codec.as_ref())?;
2387 std::fs::rename(&tmp, &path).map_err(quiver_index::DiskError::Io)?;
2388 let _ = std::fs::File::open(&dir).and_then(|f| f.sync_all());
2389 open_disk_index(store, cid, codec)
2390}
2391
2392fn open_disk_index(
2396 store: &Store,
2397 cid: CollectionId,
2398 codec: Box<dyn PageCodec>,
2399) -> Result<DiskVamana> {
2400 let path = store.index_dir(cid).join(DISK_INDEX_FILE);
2401 Ok(DiskVamana::open(&path, codec)?)
2402}
2403
2404fn load_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2409 if !handle.descriptor.multivector
2412 && handle.descriptor.index.kind == IndexKind::Ivf
2413 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2414 && restore_ivf_snapshot(store, handle, &blob).is_ok()
2415 {
2416 return Ok(());
2417 }
2418 if !handle.descriptor.multivector
2425 && handle.descriptor.index.kind == IndexKind::DiskVamana
2426 && std::env::var_os("QUIVER_DISABLE_DURABLE_DISK_INDEX").is_none()
2427 && let Ok(Some(blob)) = store.read_index_snapshot(handle.id)
2428 && restore_disk_snapshot(store, handle, &blob).is_ok()
2429 {
2430 return Ok(());
2431 }
2432 rebuild_index(store, handle)
2433}
2434
2435fn restore_disk_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2441 let envelope: DiskEnvelope = postcard::from_bytes(blob)?;
2442 if envelope.version != INDEX_ENVELOPE_VERSION {
2443 return Err(Error::Unsupported(
2444 "unsupported disk index snapshot version",
2445 ));
2446 }
2447 let base = open_disk_index(store, handle.id, store.collection_codec_clone(handle.id)?)?;
2448 if base.len() as u64 != envelope.base_row_count {
2451 return Err(Error::Unsupported(
2452 "disk base count disagrees with snapshot",
2453 ));
2454 }
2455 handle.ext_to_int = envelope
2456 .int_to_ext
2457 .iter()
2458 .enumerate()
2459 .map(|(i, ext)| (ext.clone(), i as u64))
2460 .collect();
2461 handle.int_to_ext = envelope.int_to_ext;
2462 let mut fresh = FreshDiskVamana::new(base)?;
2463 for internal in envelope.base_row_count..handle.int_to_ext.len() as u64 {
2467 let ext = &handle.int_to_ext[internal as usize];
2468 if let Some(record) = store.get(handle.id, ext)? {
2469 fresh.insert(internal, &record.vector)?;
2470 }
2471 }
2472 for id in envelope.deleted_ids {
2473 fresh.mark_deleted(id);
2474 }
2475 handle.index = CollectionIndex::Disk(Some(fresh));
2476 handle.stale = false;
2477 replay_recovery_tail(store, handle)
2478}
2479
2480fn replay_recovery_tail(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2485 let tail = store.recovery_tail(handle.id)?;
2486 for ext in &tail.deleted {
2487 index_delete_point(handle, ext);
2488 }
2489 for (ext, record) in tail.upserts {
2490 index_upsert_point(handle, &ext, &record.vector)?;
2491 }
2492 Ok(())
2493}
2494
2495fn restore_ivf_snapshot(store: &Store, handle: &mut CollectionHandle, blob: &[u8]) -> Result<()> {
2500 let envelope: IndexEnvelope = postcard::from_bytes(blob)?;
2501 if envelope.version != INDEX_ENVELOPE_VERSION {
2502 return Err(Error::Unsupported(
2503 "unsupported index snapshot envelope version",
2504 ));
2505 }
2506 let ivf = Ivf::restore(&envelope.ivf)?;
2507 handle.ext_to_int = envelope
2508 .int_to_ext
2509 .iter()
2510 .enumerate()
2511 .map(|(i, ext)| (ext.clone(), i as u64))
2512 .collect();
2513 handle.int_to_ext = envelope.int_to_ext;
2514 handle.index = CollectionIndex::Ivf(Some(ivf));
2515 handle.stale = false;
2516
2517 let tail = store.recovery_tail(handle.id)?;
2518 for ext in &tail.deleted {
2519 let Some(&internal) = handle.ext_to_int.get(ext) else {
2520 continue;
2521 };
2522 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2523 ivf.remove(internal);
2524 }
2525 }
2526 for (ext, record) in tail.upserts {
2527 let internal = match handle.ext_to_int.get(&ext) {
2528 Some(&i) => i,
2529 None => {
2530 let i = handle.int_to_ext.len() as u64;
2531 handle.ext_to_int.insert(ext.clone(), i);
2532 handle.int_to_ext.push(ext);
2533 i
2534 }
2535 };
2536 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2537 ivf.insert(internal, &record.vector)?;
2538 }
2539 }
2540 Ok(())
2541}
2542
2543fn index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, vector: &[f32]) -> Result<()> {
2552 if mvcc_served(handle) {
2556 overlay_upsert(handle, ext_id, vector);
2557 return Ok(());
2558 }
2559 bump_write_gen(handle);
2562 if handle.stale {
2563 return Ok(());
2564 }
2565 let known = handle.ext_to_int.contains_key(ext_id);
2566 let is_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2567 let is_live_ivf = matches!(&handle.index, CollectionIndex::Ivf(Some(ivf)) if !ivf.is_empty());
2568 let is_live_graph = matches!(
2569 handle.index,
2570 CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2571 );
2572 let is_live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2573 if is_hnsw && !known {
2574 let internal = handle.int_to_ext.len() as u64;
2575 if let CollectionIndex::Hnsw(h) = &mut handle.index {
2576 h.insert(internal, vector)?;
2577 }
2578 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2579 handle.int_to_ext.push(ext_id.to_owned());
2580 } else if is_live_ivf {
2581 let internal = if known {
2584 handle.ext_to_int[ext_id]
2585 } else {
2586 let i = handle.int_to_ext.len() as u64;
2587 handle.ext_to_int.insert(ext_id.to_owned(), i);
2588 handle.int_to_ext.push(ext_id.to_owned());
2589 i
2590 };
2591 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2592 ivf.insert(internal, vector)?;
2593 }
2594 } else if is_live_graph {
2595 let old = handle.ext_to_int.get(ext_id).copied();
2598 let internal = handle.int_to_ext.len() as u64;
2599 let mut pending = 0.0;
2600 match &mut handle.index {
2601 CollectionIndex::Vamana(Some(fresh)) => {
2602 if let Some(o) = old {
2603 fresh.mark_deleted(o);
2604 }
2605 fresh.insert(internal, vector)?;
2606 pending = fresh.pending_fraction();
2607 }
2608 CollectionIndex::Disk(Some(fresh)) => {
2609 if let Some(o) = old {
2610 fresh.mark_deleted(o);
2611 }
2612 fresh.insert(internal, vector)?;
2613 pending = fresh.pending_fraction();
2614 }
2615 _ => {}
2616 }
2617 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2618 handle.int_to_ext.push(ext_id.to_owned());
2619 if pending >= GRAPH_REBUILD_PENDING_FRACTION {
2620 mark_stale(handle);
2621 }
2622 } else if is_live_colbert {
2623 let old = handle.ext_to_int.get(ext_id).copied();
2627 let internal = handle.int_to_ext.len() as u64;
2628 let mut crowded = false;
2629 if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2630 if let Some(o) = old {
2631 c.mark_deleted(o);
2632 }
2633 c.insert(internal, vector)?;
2634 crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2635 }
2636 handle.ext_to_int.insert(ext_id.to_owned(), internal);
2637 handle.int_to_ext.push(ext_id.to_owned());
2638 if crowded {
2639 mark_stale(handle);
2640 }
2641 } else {
2642 mark_stale(handle);
2643 }
2644 Ok(())
2645}
2646
2647fn index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2654 if mvcc_served(handle) {
2657 overlay_delete(handle, ext_id);
2658 return;
2659 }
2660 bump_write_gen(handle);
2662 if handle.stale {
2663 return;
2664 }
2665 let internal = handle.ext_to_int.get(ext_id).copied();
2666 let live_ivf = matches!(handle.index, CollectionIndex::Ivf(Some(_)));
2667 let live_hnsw = matches!(handle.index, CollectionIndex::Hnsw(_));
2668 let live_graph = matches!(
2669 handle.index,
2670 CollectionIndex::Vamana(Some(_)) | CollectionIndex::Disk(Some(_))
2671 );
2672 let live_colbert = matches!(handle.index, CollectionIndex::Colbert(Some(_)));
2673 match internal {
2674 Some(internal) if live_ivf => {
2675 if let CollectionIndex::Ivf(Some(ivf)) = &mut handle.index {
2676 ivf.remove(internal);
2677 }
2678 }
2679 Some(internal) if live_hnsw => {
2680 let mut crowded = false;
2681 if let CollectionIndex::Hnsw(h) = &mut handle.index {
2682 h.mark_deleted(internal as u32);
2683 crowded = h.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2684 }
2685 if crowded {
2686 mark_stale(handle);
2687 }
2688 }
2689 Some(internal) if live_graph => {
2690 let mut crowded = false;
2691 match &mut handle.index {
2692 CollectionIndex::Vamana(Some(fresh)) => {
2693 fresh.mark_deleted(internal);
2694 crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2695 }
2696 CollectionIndex::Disk(Some(fresh)) => {
2697 fresh.mark_deleted(internal);
2698 crowded = fresh.pending_fraction() >= GRAPH_REBUILD_PENDING_FRACTION;
2699 }
2700 _ => {}
2701 }
2702 if crowded {
2703 mark_stale(handle);
2704 }
2705 }
2706 Some(internal) if live_colbert => {
2707 let mut crowded = false;
2708 if let CollectionIndex::Colbert(Some(c)) = &mut handle.index {
2709 c.mark_deleted(internal);
2710 crowded = c.deleted_fraction() >= HNSW_REBUILD_DELETED_FRACTION;
2711 }
2712 if crowded {
2713 mark_stale(handle);
2714 }
2715 }
2716 _ => mark_stale(handle),
2717 }
2718}
2719
/// Everything gathered from one pass over a collection's stored records, as
/// produced by `scan_collection`.
struct RebuildScan {
    /// External id of each record, indexed by internal id (scan order).
    int_to_ext: Vec<String>,
    /// Reverse of `int_to_ext`: external id to internal id.
    ext_to_int: HashMap<String, u64>,
    /// Every record's vector appended back to back, in internal-id order.
    flat: Vec<f32>,
    /// Token count per document; `Some` only for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index filled from record payloads; `Some` only when
    /// the descriptor uses a sparse index.
    sparse: Option<SparseInvertedIndex>,
}
2731
2732fn scan_collection(store: &Store, handle: &CollectionHandle) -> Result<RebuildScan> {
2737 let multivector = handle.descriptor.multivector;
2738 let mut int_to_ext = Vec::new();
2739 let mut ext_to_int = HashMap::new();
2740 let mut flat: Vec<f32> = Vec::new();
2741 let mut docs: BTreeMap<String, u32> = BTreeMap::new();
2742 let mut sparse = uses_sparse_index(&handle.descriptor).then(SparseInvertedIndex::new);
2745 for (ext_id, record) in store.scan(handle.id)? {
2746 let internal = int_to_ext.len() as u64;
2747 flat.extend_from_slice(&record.vector);
2748 if multivector && let Some((doc, _)) = parse_token_id(&ext_id) {
2749 *docs.entry(doc.to_owned()).or_insert(0) += 1;
2750 }
2751 if let Some(idx) = sparse.as_mut()
2752 && let Some(sv) = sparse_vector_from_payload(&record.payload)
2753 {
2754 idx.upsert(&ext_id, &sv);
2755 }
2756 ext_to_int.insert(ext_id.clone(), internal);
2757 int_to_ext.push(ext_id);
2758 }
2759 Ok(RebuildScan {
2760 int_to_ext,
2761 ext_to_int,
2762 flat,
2763 docs: multivector.then_some(docs),
2764 sparse,
2765 })
2766}
2767
2768fn rebuild_index(store: &Store, handle: &mut CollectionHandle) -> Result<()> {
2772 let scan = scan_collection(store, handle)?;
2773 let ids: Vec<u64> = (0..scan.int_to_ext.len() as u64).collect();
2774 handle.index = empty_index(&handle.descriptor);
2777 handle.index = build_index(store, handle.id, &handle.descriptor, &ids, &scan.flat)?;
2778 handle.int_to_ext = scan.int_to_ext;
2779 handle.ext_to_int = scan.ext_to_int;
2780 handle.docs = scan.docs;
2781 handle.sparse = scan.sparse;
2782 handle.stale = false;
2783 Ok(())
2784}
2785
/// Owned inputs for building an index without access to the store or the
/// collection handle; consumed by [`RebuildInputs::build`].
pub struct RebuildInputs {
    /// Name of the collection the inputs were taken from.
    collection: String,
    /// Descriptor that decides which index kind is built.
    descriptor: Descriptor,
    /// Records gathered from the store.
    scan: RebuildScan,
    /// Carried through unchanged into the resulting `RebuiltIndex`.
    // NOTE(review): presumably the collection's write generation at scan
    // time — confirm against where this struct is constructed.
    write_gen: u64,
}
2797
/// Outcome of `RebuildInputs::build`, by how much work is left.
enum RebuiltKind {
    /// A finished in-memory index.
    Ready(Box<CollectionIndex>),
    /// The graph and quantizer of a disk index, not yet written to disk.
    Disk {
        graph: Box<Vamana>,
        pq: Box<ProductQuantizer>,
    },
}
2809
/// Result of [`RebuildInputs::build`]: the built index plus the lookup
/// tables that belong with it.
pub struct RebuiltIndex {
    /// Name of the collection this index was built for.
    collection: String,
    /// The built index, or the parts of a disk index still to be written.
    kind: RebuiltKind,
    /// External id of each point, indexed by internal id.
    int_to_ext: Vec<String>,
    /// External id to internal id.
    ext_to_int: HashMap<String, u64>,
    /// Token count per document, for multi-vector collections.
    docs: Option<BTreeMap<String, u32>>,
    /// Sparse inverted index built during the scan, if any.
    sparse: Option<SparseInvertedIndex>,
    /// Copied from `RebuildInputs::write_gen`.
    write_gen: u64,
}
2821
2822impl RebuildInputs {
2823 pub fn build(self) -> Result<RebuiltIndex> {
2828 let ids: Vec<u64> = (0..self.scan.int_to_ext.len() as u64).collect();
2829 let kind = match build_in_memory_index(&self.descriptor, &ids, &self.scan.flat)? {
2830 Some(index) => RebuiltKind::Ready(Box::new(index)),
2831 None => {
2832 let (graph, pq) = build_disk_graph_pq(&self.descriptor, &ids, &self.scan.flat)?;
2833 RebuiltKind::Disk {
2834 graph: Box::new(graph),
2835 pq: Box::new(pq),
2836 }
2837 }
2838 };
2839 Ok(RebuiltIndex {
2840 collection: self.collection,
2841 kind,
2842 int_to_ext: self.scan.int_to_ext,
2843 ext_to_int: self.scan.ext_to_int,
2844 docs: self.scan.docs,
2845 sparse: self.scan.sparse,
2846 write_gen: self.write_gen,
2847 })
2848 }
2849}
2850
2851fn sparse_vector_from_payload(payload: &[u8]) -> Option<SparseVector> {
2855 if payload.is_empty() {
2856 return None;
2857 }
2858 let value = serde_json::from_slice::<Value>(payload).ok()?;
2859 sparse_vector_from_value(&value)
2860}
2861
2862fn sparse_vector_from_value(payload: &Value) -> Option<SparseVector> {
2868 if let Some(raw) = payload.get(SPARSE_KEY) {
2869 return serde_json::from_value::<SparseVector>(raw.clone()).ok();
2870 }
2871 let text = payload.get(TEXT_KEY)?.as_str()?;
2872 Some(text_to_sparse(text))
2873}
2874
2875fn sparse_index_upsert_point(handle: &mut CollectionHandle, ext_id: &str, payload: &Value) {
2881 if handle.stale {
2882 return;
2883 }
2884 let Some(idx) = handle.sparse.as_mut() else {
2885 return;
2886 };
2887 match sparse_vector_from_value(payload) {
2888 Some(sv) => idx.upsert(ext_id, &sv),
2889 None => {
2890 idx.remove(ext_id);
2891 }
2892 }
2893}
2894
2895fn sparse_index_delete_point(handle: &mut CollectionHandle, ext_id: &str) {
2898 if let Some(idx) = handle.sparse.as_mut() {
2899 idx.remove(ext_id);
2900 }
2901}
2902
2903fn candidate_ids(
2915 store: &Store,
2916 cid: CollectionId,
2917 filter: &Filter,
2918 filterable: &[FilterableField],
2919) -> Result<Option<BTreeSet<String>>> {
2920 match filter {
2921 Filter::And(subs) => {
2922 let mut acc: Option<BTreeSet<String>> = None;
2925 for sub in subs {
2926 if let Some(set) = candidate_ids(store, cid, sub, filterable)? {
2927 acc = Some(match acc {
2928 Some(existing) => existing.intersection(&set).cloned().collect(),
2929 None => set,
2930 });
2931 }
2932 }
2933 Ok(acc)
2934 }
2935 Filter::Or(subs) => {
2936 let mut acc = BTreeSet::new();
2939 for sub in subs {
2940 match candidate_ids(store, cid, sub, filterable)? {
2941 Some(set) => acc.extend(set),
2942 None => return Ok(None),
2943 }
2944 }
2945 Ok(Some(acc))
2946 }
2947 Filter::Not(_) => Ok(None),
2949 leaf => match leaf_predicate(leaf, filterable) {
2951 Some(pred) => Ok(Some(store.matching_ids(cid, &pred)?.into_iter().collect())),
2952 None => Ok(None),
2953 },
2954 }
2955}
2956
2957fn leaf_predicate(filter: &Filter, filterable: &[FilterableField]) -> Option<SecPredicate> {
2961 let field_type = |field: &str| {
2962 filterable
2963 .iter()
2964 .find(|f| f.path == field)
2965 .map(|f| f.field_type)
2966 };
2967 match filter {
2968 Filter::Eq { field, value } => Some(SecPredicate::Eq {
2969 field: field.clone(),
2970 value: sec_value(field_type(field)?, value)?,
2971 }),
2972 Filter::In { field, values } => {
2973 let ft = field_type(field)?;
2974 let values: Option<Vec<SecValue>> = values.iter().map(|v| sec_value(ft, v)).collect();
2977 Some(SecPredicate::In {
2978 field: field.clone(),
2979 values: values?,
2980 })
2981 }
2982 Filter::Lt { field, value } => {
2983 one_sided_range(field, field_type(field)?, value, false, false)
2984 }
2985 Filter::Lte { field, value } => {
2986 one_sided_range(field, field_type(field)?, value, false, true)
2987 }
2988 Filter::Gt { field, value } => {
2989 one_sided_range(field, field_type(field)?, value, true, false)
2990 }
2991 Filter::Gte { field, value } => {
2992 one_sided_range(field, field_type(field)?, value, true, true)
2993 }
2994 _ => None,
2995 }
2996}
2997
2998fn one_sided_range(
3002 field: &str,
3003 field_type: FieldType,
3004 value: &Value,
3005 is_lower: bool,
3006 inclusive: bool,
3007) -> Option<SecPredicate> {
3008 let v = sec_value(field_type, value)?;
3009 let (lo, hi, lo_inclusive, hi_inclusive) = if is_lower {
3010 (Some(v), None, inclusive, false)
3011 } else {
3012 (None, Some(v), false, inclusive)
3013 };
3014 Some(SecPredicate::Range {
3015 field: field.to_owned(),
3016 lo,
3017 hi,
3018 lo_inclusive,
3019 hi_inclusive,
3020 })
3021}
3022
3023fn sec_value(field_type: FieldType, value: &Value) -> Option<SecValue> {
3027 match (field_type, value) {
3028 (FieldType::Keyword, Value::String(s)) => Some(SecValue::Keyword(s.clone())),
3029 (FieldType::Numeric, Value::Number(n)) => n.as_f64().map(SecValue::Numeric),
3030 _ => None,
3031 }
3032}
3033
3034#[cfg(test)]
3035mod tests {
3036 use super::*;
3037 use serde_json::json;
3038
3039 fn desc() -> Descriptor {
3040 Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3041 }
3042
3043 fn open(dir: &Path) -> Database {
3044 Database::open(dir).unwrap()
3045 }
3046
3047 #[test]
3048 fn hybrid_search_fuses_dense_and_sparse() {
3049 let tmp = tempfile::tempdir().unwrap();
3050 let mut db = open(tmp.path());
3051 db.create_collection("kb", desc()).unwrap();
3052 db.upsert(
3055 "kb",
3056 "a",
3057 &[1.0, 0.0, 0.0, 0.0],
3058 &json!({ "__quiver_sparse__": { "indices": [100], "values": [0.1] } }),
3059 )
3060 .unwrap();
3061 db.upsert(
3062 "kb",
3063 "b",
3064 &[0.0, 1.0, 0.0, 0.0],
3065 &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3066 )
3067 .unwrap();
3068 db.upsert(
3069 "kb",
3070 "c",
3071 &[0.0, 0.0, 0.0, 1.0],
3072 &json!({ "__quiver_sparse__": { "indices": [9], "values": [1.0] } }),
3073 )
3074 .unwrap();
3075
3076 let dense_q = [1.0, 0.0, 0.0, 0.0];
3077 let sparse_q = SparseVector {
3078 indices: vec![1, 2],
3079 values: vec![1.0, 1.0],
3080 };
3081 let params = SearchParams {
3082 k: 3,
3083 ..SearchParams::default()
3084 };
3085
3086 let hits = db
3088 .hybrid_search(
3089 "kb",
3090 Some(&dense_q),
3091 Some(&sparse_q),
3092 None,
3093 ¶ms,
3094 DEFAULT_RRF_K0,
3095 )
3096 .unwrap();
3097 let ids: Vec<&str> = hits.iter().map(|m| m.id.as_str()).collect();
3098 assert!(ids.contains(&"a") && ids.contains(&"b"), "got {ids:?}");
3099 assert_eq!(ids[2], "c", "c is worst on both sides; got {ids:?}");
3100
3101 let sparse_only = db
3103 .hybrid_search("kb", None, Some(&sparse_q), None, ¶ms, DEFAULT_RRF_K0)
3104 .unwrap();
3105 assert_eq!(sparse_only[0].id, "b");
3106
3107 let dense_only = db
3109 .hybrid_search("kb", Some(&dense_q), None, None, ¶ms, DEFAULT_RRF_K0)
3110 .unwrap();
3111 assert_eq!(dense_only[0].id, "a");
3112
3113 assert!(
3115 db.hybrid_search("kb", None, None, None, ¶ms, DEFAULT_RRF_K0)
3116 .is_err()
3117 );
3118 }
3119
3120 fn sparse_ids(db: &mut Database, q: &SparseVector) -> Vec<String> {
3122 let params = SearchParams {
3123 k: 10,
3124 ..SearchParams::default()
3125 };
3126 db.hybrid_search("kb", None, Some(q), None, ¶ms, DEFAULT_RRF_K0)
3127 .unwrap()
3128 .into_iter()
3129 .map(|m| m.id)
3130 .collect()
3131 }
3132
3133 #[test]
3134 fn sparse_index_equals_the_store_scan_fallback() {
3135 let tmp = tempfile::tempdir().unwrap();
3136 let mut db = open(tmp.path());
3137 db.create_collection("kb", desc()).unwrap();
3138 let z = [0.0f32, 0.0, 0.0, 0.0];
3139 for (id, dims, vals) in [
3140 ("a", vec![1u32, 2], vec![5.0f32, 1.0]),
3141 ("b", vec![2u32, 3], vec![3.0f32, 4.0]),
3142 ("c", vec![1u32, 3], vec![2.0f32, 2.0]),
3143 ("d", vec![9u32], vec![1.0f32]), ] {
3145 db.upsert(
3146 "kb",
3147 id,
3148 &z,
3149 &json!({ "__quiver_sparse__": { "indices": dims, "values": vals } }),
3150 )
3151 .unwrap();
3152 }
3153 let q = SparseVector {
3154 indices: vec![1, 2, 3],
3155 values: vec![1.0, 1.0, 1.0],
3156 };
3157
3158 assert!(db.collections.get("kb").unwrap().sparse.is_some());
3160 let via_index = sparse_ids(&mut db, &q);
3161 assert!(!via_index.contains(&"d".to_owned()), "d shares no term");
3162
3163 db.collections.get_mut("kb").unwrap().sparse = None;
3166 let via_scan = sparse_ids(&mut db, &q);
3167 assert_eq!(via_index, via_scan);
3168 }
3169
3170 #[test]
3171 fn sparse_index_reflects_updates_and_deletes_like_a_rebuild() {
3172 let tmp = tempfile::tempdir().unwrap();
3173 let mut db = open(tmp.path());
3174 db.create_collection("kb", desc()).unwrap();
3175 let z = [0.0f32, 0.0, 0.0, 0.0];
3176 db.upsert(
3177 "kb",
3178 "a",
3179 &z,
3180 &json!({ "__quiver_sparse__": { "indices": [1, 2], "values": [5.0, 5.0] } }),
3181 )
3182 .unwrap();
3183 db.upsert(
3184 "kb",
3185 "b",
3186 &z,
3187 &json!({ "__quiver_sparse__": { "indices": [2], "values": [3.0] } }),
3188 )
3189 .unwrap();
3190 db.upsert(
3191 "kb",
3192 "c",
3193 &z,
3194 &json!({ "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3195 )
3196 .unwrap();
3197 db.upsert(
3199 "kb",
3200 "a",
3201 &z,
3202 &json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } }),
3203 )
3204 .unwrap();
3205 assert!(db.delete("kb", "b").unwrap());
3206
3207 let q = SparseVector {
3208 indices: vec![1, 2],
3209 values: vec![1.0, 1.0],
3210 };
3211 let incremental = sparse_ids(&mut db, &q);
3213 assert_eq!(incremental, vec!["c".to_owned()]);
3214
3215 db.collections.get_mut("kb").unwrap().stale = true;
3217 let rebuilt = sparse_ids(&mut db, &q);
3218 assert_eq!(incremental, rebuilt);
3219 }
3220
3221 #[test]
3222 fn sparse_index_is_rebuilt_on_reopen() {
3223 let tmp = tempfile::tempdir().unwrap();
3224 {
3225 let mut db = open(tmp.path());
3226 db.create_collection("kb", desc()).unwrap();
3227 db.upsert(
3228 "kb",
3229 "a",
3230 &[0.0, 0.0, 0.0, 0.0],
3231 &json!({ "__quiver_sparse__": { "indices": [1], "values": [1.0] } }),
3232 )
3233 .unwrap();
3234 }
3235 let mut db = open(tmp.path());
3236 assert!(db.collections.get("kb").unwrap().sparse.is_some());
3237 let q = SparseVector {
3238 indices: vec![1],
3239 values: vec![1.0],
3240 };
3241 assert_eq!(sparse_ids(&mut db, &q), vec!["a".to_owned()]);
3242 }
3243
3244 #[test]
3245 fn hybrid_sparse_honours_the_payload_filter() {
3246 let tmp = tempfile::tempdir().unwrap();
3247 let mut db = open(tmp.path());
3248 db.create_collection("kb", desc()).unwrap();
3249 let z = [0.0f32, 0.0, 0.0, 0.0];
3250 db.upsert(
3251 "kb",
3252 "a",
3253 &z,
3254 &json!({ "lang": "en", "__quiver_sparse__": { "indices": [1], "values": [5.0] } }),
3255 )
3256 .unwrap();
3257 db.upsert(
3258 "kb",
3259 "b",
3260 &z,
3261 &json!({ "lang": "fr", "__quiver_sparse__": { "indices": [1], "values": [9.0] } }),
3262 )
3263 .unwrap();
3264 let q = SparseVector {
3265 indices: vec![1],
3266 values: vec![1.0],
3267 };
3268 let params = SearchParams {
3269 k: 10,
3270 filter: Some(Filter::Eq {
3271 field: "lang".to_owned(),
3272 value: json!("en"),
3273 }),
3274 ..SearchParams::default()
3275 };
3276 let hits: Vec<String> = db
3277 .hybrid_search("kb", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3278 .unwrap()
3279 .into_iter()
3280 .map(|m| m.id)
3281 .collect();
3282 assert_eq!(hits, vec!["a".to_owned()]);
3284 }
3285
3286 #[test]
3287 fn hybrid_text_search_indexes_and_ranks_by_bm25() {
3288 let tmp = tempfile::tempdir().unwrap();
3289 let mut db = open(tmp.path());
3290 db.create_collection("kb", desc()).unwrap();
3291 let z = [0.0f32, 0.0, 0.0, 0.0];
3292 db.upsert(
3294 "kb",
3295 "cats",
3296 &z,
3297 &json!({ "__quiver_text__": "the quick brown cat jumps" }),
3298 )
3299 .unwrap();
3300 db.upsert(
3301 "kb",
3302 "dogs",
3303 &z,
3304 &json!({ "__quiver_text__": "a lazy dog sleeps all day" }),
3305 )
3306 .unwrap();
3307
3308 let params = SearchParams {
3309 k: 10,
3310 ..SearchParams::default()
3311 };
3312 let hits: Vec<String> = db
3315 .hybrid_search("kb", None, None, Some("cats"), ¶ms, DEFAULT_RRF_K0)
3316 .unwrap()
3317 .into_iter()
3318 .map(|m| m.id)
3319 .collect();
3320 assert_eq!(hits, vec!["cats".to_owned()], "only the cat doc matches");
3321
3322 assert!(
3324 db.hybrid_search("kb", None, None, Some("elephant"), ¶ms, DEFAULT_RRF_K0)
3325 .unwrap()
3326 .is_empty()
3327 );
3328
3329 let dense_q = [1.0, 0.0, 0.0, 0.0];
3331 db.upsert("kb", "near", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3332 .unwrap();
3333 let fused: Vec<String> = db
3334 .hybrid_search(
3335 "kb",
3336 Some(&dense_q),
3337 None,
3338 Some("dog"),
3339 ¶ms,
3340 DEFAULT_RRF_K0,
3341 )
3342 .unwrap()
3343 .into_iter()
3344 .map(|m| m.id)
3345 .collect();
3346 assert!(
3347 fused.contains(&"near".to_owned()) && fused.contains(&"dogs".to_owned()),
3348 "dense match + lexical match both surface; got {fused:?}"
3349 );
3350 }
3351
3352 #[test]
3353 fn create_upsert_search_get_end_to_end() {
3354 let tmp = tempfile::tempdir().unwrap();
3355 let mut db = open(tmp.path());
3356 db.create_collection("items", desc()).unwrap();
3357 db.upsert(
3358 "items",
3359 "a",
3360 &[0.0, 0.0, 0.0, 0.0],
3361 &json!({"color": "red"}),
3362 )
3363 .unwrap();
3364 db.upsert(
3365 "items",
3366 "b",
3367 &[1.0, 0.0, 0.0, 0.0],
3368 &json!({"color": "blue"}),
3369 )
3370 .unwrap();
3371 db.upsert(
3372 "items",
3373 "c",
3374 &[5.0, 5.0, 5.0, 5.0],
3375 &json!({"color": "red"}),
3376 )
3377 .unwrap();
3378
3379 let near = db
3380 .search("items", &[0.1, 0.0, 0.0, 0.0], &SearchParams::default())
3381 .unwrap();
3382 assert_eq!(near[0].id, "a");
3383 assert_eq!(near[1].id, "b");
3384
3385 let got = db.get("items", "c").unwrap().unwrap();
3386 assert_eq!(got.vector, Some(vec![5.0, 5.0, 5.0, 5.0]));
3387 assert_eq!(got.payload, Some(json!({"color": "red"})));
3388 }
3389
3390 #[test]
3391 fn upsert_batch_produces_same_search_results_as_sequential() {
3392 let tmp_seq = tempfile::tempdir().unwrap();
3393 let tmp_bat = tempfile::tempdir().unwrap();
3394
3395 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3396 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3397 let payload = json!({});
3398
3399 {
3401 let mut db = open(tmp_seq.path());
3402 db.create_collection("c", desc()).unwrap();
3403 for (id, vec) in ids.iter().zip(vectors.iter()) {
3404 db.upsert("c", id, vec, &payload).unwrap();
3405 }
3406 }
3407 {
3409 let mut db = open(tmp_bat.path());
3410 db.create_collection("c", desc()).unwrap();
3411 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3412 .iter()
3413 .zip(vectors.iter())
3414 .map(|(id, v)| (id.as_str(), v.as_slice(), &payload))
3415 .collect();
3416 let n = db.upsert_batch("c", &pts).unwrap();
3417 assert_eq!(n, 20);
3418 }
3419
3420 let query = [10.0f32, 0.0, 0.0, 0.0];
3421 let params = SearchParams {
3422 k: 5,
3423 ..Default::default()
3424 };
3425
3426 let mut seq_db = open(tmp_seq.path());
3427 let mut bat_db = open(tmp_bat.path());
3428 let seq: Vec<String> = seq_db
3429 .search("c", &query, ¶ms)
3430 .unwrap()
3431 .into_iter()
3432 .map(|m| m.id)
3433 .collect();
3434 let bat: Vec<String> = bat_db
3435 .search("c", &query, ¶ms)
3436 .unwrap()
3437 .into_iter()
3438 .map(|m| m.id)
3439 .collect();
3440 assert_eq!(
3441 seq, bat,
3442 "batch and sequential produce different search results"
3443 );
3444 }
3445
3446 #[test]
3447 fn upsert_bulk_defers_the_index_then_searches_correctly() {
3448 let tmp = tempfile::tempdir().unwrap();
3449 let mut db = open(tmp.path());
3450 db.create_collection("c", desc()).unwrap();
3451 let vectors: Vec<[f32; 4]> = (0..20u32).map(|i| [i as f32, 0.0, 0.0, 0.0]).collect();
3452 let ids: Vec<String> = (0..20u32).map(|i| format!("p{i}")).collect();
3453 let plain = json!({});
3456 let sparse_payload = json!({ "__quiver_sparse__": { "indices": [7], "values": [1.0] } });
3457 let pts: Vec<(&str, &[f32], &serde_json::Value)> = ids
3458 .iter()
3459 .zip(vectors.iter())
3460 .map(|(id, v)| {
3461 let payload = if id == "p3" { &sparse_payload } else { &plain };
3462 (id.as_str(), v.as_slice(), payload)
3463 })
3464 .collect();
3465 let n = db.upsert_bulk("c", &pts).unwrap();
3466 assert_eq!(n, 20);
3467
3468 assert!(db.collections.get("c").unwrap().stale);
3470
3471 let query = [10.0f32, 0.0, 0.0, 0.0];
3473 let params = SearchParams {
3474 k: 5,
3475 ..Default::default()
3476 };
3477 let hits: Vec<String> = db
3478 .search("c", &query, ¶ms)
3479 .unwrap()
3480 .into_iter()
3481 .map(|m| m.id)
3482 .collect();
3483 assert_eq!(hits[0], "p10", "nearest to 10 is p10; got {hits:?}");
3484 assert!(!db.collections.get("c").unwrap().stale, "rebuilt on read");
3485
3486 let q = SparseVector {
3488 indices: vec![7],
3489 values: vec![1.0],
3490 };
3491 let sparse_hits: Vec<String> = db
3492 .hybrid_search("c", None, Some(&q), None, ¶ms, DEFAULT_RRF_K0)
3493 .unwrap()
3494 .into_iter()
3495 .map(|m| m.id)
3496 .collect();
3497 assert_eq!(sparse_hits, vec!["p3".to_owned()]);
3498 }
3499
3500 #[test]
3501 fn filtered_search_only_returns_matching_payloads() {
3502 let tmp = tempfile::tempdir().unwrap();
3503 let mut db = open(tmp.path());
3504 db.create_collection("items", desc()).unwrap();
3505 for i in 0..20u32 {
3506 let color = if i % 2 == 0 { "red" } else { "blue" };
3507 db.upsert(
3508 "items",
3509 &format!("p{i}"),
3510 &[i as f32, 0.0, 0.0, 0.0],
3511 &json!({"color": color, "n": i}),
3512 )
3513 .unwrap();
3514 }
3515 let params = SearchParams {
3516 k: 5,
3517 filter: Some(Filter::Eq {
3518 field: "color".into(),
3519 value: json!("red"),
3520 }),
3521 ef_search: 64,
3522 with_payload: true,
3523 with_vector: false,
3524 };
3525 let results = db.search("items", &[0.0; 4], ¶ms).unwrap();
3526 assert!(!results.is_empty());
3527 for m in &results {
3528 assert_eq!(m.payload.as_ref().unwrap()["color"], json!("red"));
3529 }
3530 }
3531
3532 #[test]
3533 fn persists_and_rebuilds_index_on_reopen() {
3534 let tmp = tempfile::tempdir().unwrap();
3535 {
3536 let mut db = open(tmp.path());
3537 db.create_collection("items", desc()).unwrap();
3538 for i in 0..50u32 {
3539 db.upsert(
3540 "items",
3541 &format!("p{i}"),
3542 &[i as f32, 1.0, 2.0, 3.0],
3543 &json!({}),
3544 )
3545 .unwrap();
3546 }
3547 db.checkpoint().unwrap();
3548 }
3549 let mut db = open(tmp.path());
3550 assert_eq!(db.len("items").unwrap(), 50);
3551 let res = db
3552 .search("items", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3553 .unwrap();
3554 assert_eq!(res[0].id, "p7");
3555 }
3556
3557 #[test]
3558 fn update_reflects_new_vector_after_rebuild() {
3559 let tmp = tempfile::tempdir().unwrap();
3560 let mut db = open(tmp.path());
3561 db.create_collection("items", desc()).unwrap();
3562 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3563 db.upsert("items", "b", &[10.0, 0.0, 0.0, 0.0], &json!({}))
3564 .unwrap();
3565 db.upsert("items", "a", &[100.0, 0.0, 0.0, 0.0], &json!({}))
3567 .unwrap();
3568 let res = db
3569 .search("items", &[0.0; 4], &SearchParams::default())
3570 .unwrap();
3571 assert_eq!(res[0].id, "b");
3572 assert_eq!(
3573 db.get("items", "a").unwrap().unwrap().vector,
3574 Some(vec![100.0, 0.0, 0.0, 0.0])
3575 );
3576 }
3577
3578 #[test]
3579 fn delete_removes_from_search() {
3580 let tmp = tempfile::tempdir().unwrap();
3581 let mut db = open(tmp.path());
3582 db.create_collection("items", desc()).unwrap();
3583 db.upsert("items", "a", &[0.0; 4], &json!({})).unwrap();
3584 db.upsert("items", "b", &[1.0, 0.0, 0.0, 0.0], &json!({}))
3585 .unwrap();
3586 assert!(db.delete("items", "a").unwrap());
3587 let res = db
3588 .search("items", &[0.0; 4], &SearchParams::default())
3589 .unwrap();
3590 assert!(res.iter().all(|m| m.id != "a"));
3591 assert!(db.get("items", "a").unwrap().is_none());
3592 }
3593
3594 #[test]
3595 fn unknown_collection_errors() {
3596 let tmp = tempfile::tempdir().unwrap();
3597 let mut db = open(tmp.path());
3598 assert!(matches!(
3599 db.search("nope", &[0.0; 4], &SearchParams::default()),
3600 Err(Error::CollectionNotFound(_))
3601 ));
3602 db.create_collection("c", desc()).unwrap();
3603 assert!(matches!(
3604 db.create_collection("c", desc()),
3605 Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
3606 ));
3607 }
3608
3609 fn desc_with(kind: IndexKind) -> Descriptor {
3610 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_index(IndexSpec {
3611 kind,
3612 pq_subspaces: None,
3613 })
3614 }
3615
3616 #[test]
3617 fn vamana_and_ivf_collections_find_the_nearest_point() {
3618 for kind in [IndexKind::Vamana, IndexKind::Ivf] {
3619 let tmp = tempfile::tempdir().unwrap();
3620 let mut db = open(tmp.path());
3621 db.create_collection("c", desc_with(kind)).unwrap();
3622 for i in 0..40u32 {
3623 db.upsert(
3624 "c",
3625 &format!("p{i}"),
3626 &[i as f32, 0.0, 0.0, 0.0],
3627 &json!({}),
3628 )
3629 .unwrap();
3630 }
3631 let res = db
3633 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
3634 .unwrap();
3635 assert_eq!(res[0].id, "p7", "{kind:?} nearest");
3636 }
3637 }
3638
3639 #[test]
3640 fn index_kind_persists_and_rebuilds_on_reopen() {
3641 let tmp = tempfile::tempdir().unwrap();
3642 {
3643 let mut db = open(tmp.path());
3644 db.create_collection("v", desc_with(IndexKind::Vamana))
3645 .unwrap();
3646 for i in 0..20u32 {
3647 db.upsert(
3648 "v",
3649 &format!("p{i}"),
3650 &[i as f32, 1.0, 2.0, 3.0],
3651 &json!({}),
3652 )
3653 .unwrap();
3654 }
3655 db.checkpoint().unwrap();
3656 }
3657 let mut db = open(tmp.path());
3658 assert_eq!(db.descriptor("v").unwrap().index.kind, IndexKind::Vamana);
3659 let res = db
3660 .search("v", &[7.0, 1.0, 2.0, 3.0], &SearchParams::default())
3661 .unwrap();
3662 assert_eq!(res[0].id, "p7");
3663 }
3664
3665 #[test]
3670 fn disk_index_loads_from_snapshot_without_rebuild_on_reopen() {
3671 let tmp = tempfile::tempdir().unwrap();
3672 let cid;
3673 {
3674 let mut db = open(tmp.path());
3675 db.create_collection("d", desc_with(IndexKind::DiskVamana))
3676 .unwrap();
3677 for i in 0..100u32 {
3678 db.upsert(
3679 "d",
3680 &format!("p{i}"),
3681 &[i as f32, 0.0, 0.0, 0.0],
3682 &json!({}),
3683 )
3684 .unwrap();
3685 }
3686 db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3688 .unwrap();
3689 db.checkpoint().unwrap();
3690 for i in 100..115u32 {
3693 db.upsert(
3694 "d",
3695 &format!("p{i}"),
3696 &[i as f32, 0.0, 0.0, 0.0],
3697 &json!({}),
3698 )
3699 .unwrap();
3700 }
3701 cid = db.collections["d"].id;
3702 let base = open_disk_index(
3703 &db.store,
3704 cid,
3705 db.store.collection_codec_clone(cid).unwrap(),
3706 )
3707 .unwrap();
3708 assert_eq!(base.len(), 100, "base sealed at the checkpoint count");
3709 }
3710
3711 let mut db = open(tmp.path());
3712 assert_eq!(
3714 db.search("d", &[50.0, 0.0, 0.0, 0.0], &SearchParams::default())
3715 .unwrap()[0]
3716 .id,
3717 "p50",
3718 );
3719 assert_eq!(
3720 db.search("d", &[110.0, 0.0, 0.0, 0.0], &SearchParams::default())
3721 .unwrap()[0]
3722 .id,
3723 "p110",
3724 "post-checkpoint insert survived reopen via WAL-tail replay",
3725 );
3726 let base = open_disk_index(
3729 &db.store,
3730 cid,
3731 db.store.collection_codec_clone(cid).unwrap(),
3732 )
3733 .unwrap();
3734 assert_eq!(
3735 base.len(),
3736 100,
3737 "reopen loaded the base; it was not rebuilt"
3738 );
3739 }
3740
3741 #[test]
3744 fn disk_index_falls_back_to_rebuild_when_base_is_missing() {
3745 let tmp = tempfile::tempdir().unwrap();
3746 let base_path;
3747 {
3748 let mut db = open(tmp.path());
3749 db.create_collection("d", desc_with(IndexKind::DiskVamana))
3750 .unwrap();
3751 for i in 0..60u32 {
3752 db.upsert(
3753 "d",
3754 &format!("p{i}"),
3755 &[i as f32, 0.0, 0.0, 0.0],
3756 &json!({}),
3757 )
3758 .unwrap();
3759 }
3760 db.search("d", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3761 .unwrap();
3762 db.checkpoint().unwrap();
3763 let cid = db.collections["d"].id;
3764 base_path = db.store.index_dir(cid).join(DISK_INDEX_FILE);
3765 }
3766 std::fs::remove_file(&base_path).unwrap();
3768 {
3769 let mut db = open(tmp.path());
3770 assert_eq!(
3771 db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3772 .unwrap()[0]
3773 .id,
3774 "p25",
3775 "rebuild fallback still answers correctly after a lost base",
3776 );
3777 assert!(
3780 base_path.exists(),
3781 "the fallback rebuild re-sealed the base file"
3782 );
3783 db.checkpoint().unwrap();
3784 }
3785 let len = std::fs::metadata(&base_path).unwrap().len();
3788 std::fs::OpenOptions::new()
3789 .write(true)
3790 .open(&base_path)
3791 .unwrap()
3792 .set_len(len / 2)
3793 .unwrap();
3794
3795 let mut db = open(tmp.path());
3796 assert_eq!(
3797 db.search("d", &[25.0, 0.0, 0.0, 0.0], &SearchParams::default())
3798 .unwrap()[0]
3799 .id,
3800 "p25",
3801 "rebuild fallback still answers correctly after a torn base",
3802 );
3803 }
3804
3805 #[test]
3806 fn ivf_upserts_and_deletes_incrementally_without_rebuild() {
3807 let tmp = tempfile::tempdir().unwrap();
3808 let mut db = open(tmp.path());
3809 db.create_collection("c", desc_with(IndexKind::Ivf))
3810 .unwrap();
3811 for i in 0..50u32 {
3812 db.upsert(
3813 "c",
3814 &format!("p{i}"),
3815 &[i as f32, 0.0, 0.0, 0.0],
3816 &json!({}),
3817 )
3818 .unwrap();
3819 }
3820 let _ = db
3822 .search("c", &[1.0, 0.0, 0.0, 0.0], &SearchParams::default())
3823 .unwrap();
3824 assert!(!db.collections["c"].stale, "the search built the index");
3825
3826 db.upsert("c", "far", &[500.0, 0.0, 0.0, 0.0], &json!({}))
3828 .unwrap();
3829 assert!(!db.collections["c"].stale, "ivf insert stayed incremental");
3830 let res = db
3831 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3832 .unwrap();
3833 assert_eq!(res[0].id, "far");
3834
3835 assert!(db.delete("c", "far").unwrap());
3837 assert!(!db.collections["c"].stale, "ivf delete stayed incremental");
3838 let res = db
3839 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
3840 .unwrap();
3841 assert!(res.iter().all(|m| m.id != "far"), "deleted point is gone");
3842 }
3843
3844 #[test]
3845 fn ivf_incremental_update_replaces_the_vector() {
3846 let tmp = tempfile::tempdir().unwrap();
3847 let mut db = open(tmp.path());
3848 db.create_collection("c", desc_with(IndexKind::Ivf))
3849 .unwrap();
3850 for i in 0..30u32 {
3851 db.upsert(
3852 "c",
3853 &format!("p{i}"),
3854 &[i as f32, 0.0, 0.0, 0.0],
3855 &json!({}),
3856 )
3857 .unwrap();
3858 }
3859 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3860 db.upsert("c", "p5", &[900.0, 0.0, 0.0, 0.0], &json!({}))
3862 .unwrap();
3863 assert!(!db.collections["c"].stale);
3864 let at_new = db
3865 .search("c", &[900.0, 0.0, 0.0, 0.0], &SearchParams::default())
3866 .unwrap();
3867 assert_eq!(at_new[0].id, "p5", "p5 found at its new location");
3868 let at_old = db
3869 .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
3870 .unwrap();
3871 assert!(at_old.iter().all(|m| m.id != "p5"), "stale vector is gone");
3872 }
3873
3874 #[test]
3875 fn ivf_reinsert_after_incremental_delete_is_found() {
3876 let tmp = tempfile::tempdir().unwrap();
3877 let mut db = open(tmp.path());
3878 db.create_collection("c", desc_with(IndexKind::Ivf))
3879 .unwrap();
3880 for i in 0..20u32 {
3881 db.upsert(
3882 "c",
3883 &format!("p{i}"),
3884 &[i as f32, 0.0, 0.0, 0.0],
3885 &json!({}),
3886 )
3887 .unwrap();
3888 }
3889 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3890 assert!(db.delete("c", "p3").unwrap());
3891 assert!(!db.collections["c"].stale);
3892 db.upsert("c", "p3", &[3.0, 0.0, 0.0, 0.0], &json!({}))
3894 .unwrap();
3895 assert!(!db.collections["c"].stale);
3896 let res = db
3897 .search("c", &[3.0, 0.0, 0.0, 0.0], &SearchParams::default())
3898 .unwrap();
3899 assert_eq!(res[0].id, "p3");
3900 }
3901
3902 #[test]
3903 fn hnsw_in_place_update_falls_back_to_rebuild() {
3904 let tmp = tempfile::tempdir().unwrap();
3906 let mut db = open(tmp.path());
3907 db.create_collection("c", desc()).unwrap();
3908 for i in 0..10u32 {
3909 db.upsert(
3910 "c",
3911 &format!("p{i}"),
3912 &[i as f32, 0.0, 0.0, 0.0],
3913 &json!({}),
3914 )
3915 .unwrap();
3916 }
3917 let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();
3918 assert!(!db.collections["c"].stale);
3919 db.upsert("c", "p2", &[42.0, 0.0, 0.0, 0.0], &json!({}))
3920 .unwrap();
3921 assert!(db.collections["c"].stale, "hnsw update schedules a rebuild");
3922 let res = db
3924 .search("c", &[42.0, 0.0, 0.0, 0.0], &SearchParams::default())
3925 .unwrap();
3926 assert_eq!(res[0].id, "p2");
3927 }
3928
3929 #[test]
3930 fn unsupported_index_configurations_are_rejected() {
3931 let tmp = tempfile::tempdir().unwrap();
3932 let mut db = open(tmp.path());
3933 let dot_vamana =
3935 Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3936 kind: IndexKind::Vamana,
3937 pq_subspaces: None,
3938 });
3939 assert!(matches!(
3940 db.create_collection("a", dot_vamana),
3941 Err(Error::Unsupported(_))
3942 ));
3943 let dot_disk = Descriptor::new(4, Dtype::F32, DistanceMetric::Dot).with_index(IndexSpec {
3945 kind: IndexKind::DiskVamana,
3946 pq_subspaces: None,
3947 });
3948 assert!(matches!(
3949 db.create_collection("b", dot_disk),
3950 Err(Error::Unsupported(_))
3951 ));
3952 }
3953
3954 #[test]
3955 fn dcpe_collections_require_the_l2_metric() {
3956 let tmp = tempfile::tempdir().unwrap();
3957 let mut db = open(tmp.path());
3958 for metric in [DistanceMetric::Cosine, DistanceMetric::Dot] {
3960 let bad = Descriptor::new(4, Dtype::F32, metric)
3961 .with_vector_encryption(VectorEncryption::Dcpe);
3962 assert!(matches!(
3963 db.create_collection("bad", bad),
3964 Err(Error::Unsupported(_))
3965 ));
3966 }
3967 let good = Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
3969 .with_vector_encryption(VectorEncryption::Dcpe);
3970 db.create_collection("enc", good)
3971 .expect("l2 dcpe collection");
3972 assert_eq!(
3973 db.descriptor("enc").expect("descriptor").vector_encryption,
3974 VectorEncryption::Dcpe
3975 );
3976 }
3977
3978 #[test]
3979 fn client_side_collections_are_fetch_only_and_reject_search() {
3980 let tmp = tempfile::tempdir().unwrap();
3981 let mut db = open(tmp.path());
3982 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
3985 .with_vector_encryption(VectorEncryption::ClientSide);
3986 db.create_collection("vault", desc)
3987 .expect("create client-side collection");
3988 assert!(matches!(
3990 db.collections["vault"].index,
3991 CollectionIndex::None
3992 ));
3993
3994 for i in 0..5 {
3997 let tier = if i < 2 { "vip" } else { "std" };
3998 db.upsert(
3999 "vault",
4000 &format!("p{i}"),
4001 &[0.0; 4],
4002 &serde_json::json!({ "__quiver_vec__": format!("ct-{i}"), "tier": tier }),
4003 )
4004 .expect("upsert");
4005 }
4006 assert_eq!(db.len("vault").unwrap(), 5);
4007 assert!(matches!(
4009 db.collections["vault"].index,
4010 CollectionIndex::None
4011 ));
4012
4013 assert!(matches!(
4015 db.search("vault", &[0.0; 4], &SearchParams::default()),
4016 Err(Error::Unsupported(_))
4017 ));
4018
4019 let all = db.fetch("vault", None, 100, true, false).unwrap();
4022 assert_eq!(all.len(), 5);
4023 assert!(
4024 all.iter()
4025 .all(|m| m.payload.is_some() && m.vector.is_none())
4026 );
4027
4028 let vip = db
4030 .fetch(
4031 "vault",
4032 Some(&Filter::Eq {
4033 field: "tier".to_owned(),
4034 value: serde_json::json!("vip"),
4035 }),
4036 100,
4037 false,
4038 false,
4039 )
4040 .unwrap();
4041 assert_eq!(vip.len(), 2);
4042 assert_eq!(db.fetch("vault", None, 2, false, false).unwrap().len(), 2);
4044
4045 assert_eq!(db.get("vault", "p0").unwrap().unwrap().id, "p0");
4048 assert!(db.delete("vault", "p0").unwrap());
4049 assert_eq!(db.len("vault").unwrap(), 4);
4050 }
4051
4052 #[test]
4053 fn client_side_encryption_rejects_multivector() {
4054 let tmp = tempfile::tempdir().unwrap();
4055 let mut db = open(tmp.path());
4056 let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
4057 .with_multivector(true)
4058 .with_vector_encryption(VectorEncryption::ClientSide);
4059 assert!(matches!(
4060 db.create_collection("bad", desc),
4061 Err(Error::Unsupported(_))
4062 ));
4063 }
4064
4065 fn contains_file(dir: &Path, name: &str) -> bool {
4067 std::fs::read_dir(dir).is_ok_and(|rd| {
4068 rd.flatten().any(|e| {
4069 let p = e.path();
4070 if p.is_dir() {
4071 contains_file(&p, name)
4072 } else {
4073 p.file_name().is_some_and(|f| f == name)
4074 }
4075 })
4076 })
4077 }
4078
4079 #[test]
4080 fn disk_index_collection_searches_persists_and_writes_an_artifact() {
4081 let tmp = tempfile::tempdir().unwrap();
4082 {
4083 let mut db = open(tmp.path());
4084 db.create_collection("d", desc_with(IndexKind::DiskVamana))
4085 .unwrap();
4086 for i in 0..40u32 {
4087 db.upsert(
4088 "d",
4089 &format!("p{i}"),
4090 &[i as f32, 0.0, 0.0, 0.0],
4091 &json!({}),
4092 )
4093 .unwrap();
4094 }
4095 let res = db
4096 .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4097 .unwrap();
4098 assert_eq!(res[0].id, "p7");
4099 db.checkpoint().unwrap();
4100 }
4101 assert!(
4103 contains_file(tmp.path(), "vamana.qvx"),
4104 "disk index file missing"
4105 );
4106 let mut db = open(tmp.path());
4108 assert_eq!(
4109 db.descriptor("d").unwrap().index.kind,
4110 IndexKind::DiskVamana
4111 );
4112 let res = db
4113 .search("d", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4114 .unwrap();
4115 assert_eq!(res[0].id, "p7");
4116 }
4117
4118 #[test]
4119 fn graph_collections_maintain_writes_incrementally() {
4120 for kind in [IndexKind::Vamana, IndexKind::DiskVamana] {
4124 let tmp = tempfile::tempdir().unwrap();
4125 let mut db = open(tmp.path());
4126 db.create_collection("c", desc_with(kind)).unwrap();
4127 for i in 0..40u32 {
4128 db.upsert(
4129 "c",
4130 &format!("p{i}"),
4131 &[i as f32, 0.0, 0.0, 0.0],
4132 &json!({}),
4133 )
4134 .unwrap();
4135 }
4136 let res = db
4138 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4139 .unwrap();
4140 assert_eq!(res[0].id, "p7", "{kind:?} base nearest");
4141
4142 db.upsert("c", "p7b", &[7.4, 0.0, 0.0, 0.0], &json!({}))
4145 .unwrap();
4146 let res = db
4147 .search("c", &[7.45, 0.0, 0.0, 0.0], &SearchParams::default())
4148 .unwrap();
4149 assert_eq!(res[0].id, "p7b", "{kind:?} delta insert not found");
4150
4151 assert!(db.delete("c", "p7").unwrap());
4153 let res = db
4154 .search("c", &[7.0, 0.0, 0.0, 0.0], &SearchParams::default())
4155 .unwrap();
4156 assert!(
4157 res.iter().all(|m| m.id != "p7"),
4158 "{kind:?} deleted id returned"
4159 );
4160
4161 db.upsert("c", "p20", &[500.0, 0.0, 0.0, 0.0], &json!({}))
4164 .unwrap();
4165 let res = db
4166 .search("c", &[500.0, 0.0, 0.0, 0.0], &SearchParams::default())
4167 .unwrap();
4168 assert_eq!(res[0].id, "p20", "{kind:?} updated vector not at new spot");
4169 let res = db
4170 .search("c", &[20.0, 0.0, 0.0, 0.0], &SearchParams::default())
4171 .unwrap();
4172 assert_ne!(
4173 res[0].id, "p20",
4174 "{kind:?} stale copy still nearest old spot"
4175 );
4176 }
4177 }
4178
/// Churn test on a `Vamana`-indexed collection: seeds 50 points, deletes 15 of them while
/// inserting 15 far-away replacements, and checks search results both before and after a
/// checkpoint followed by a reopen.
#[test]
fn graph_consolidates_under_heavy_churn() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("c", desc_with(IndexKind::Vamana))
        .unwrap();

    // Seed p0..p49 spread along the first axis.
    for i in 0..50u32 {
        let id = format!("p{i}");
        db.upsert("c", &id, &[i as f32, 0.0, 0.0, 0.0], &json!({}))
            .unwrap();
    }
    // Result is discarded on purpose; presumably this forces the index to exist before the
    // churn starts — confirm against `Database::search`.
    let _ = db.search("c", &[0.0; 4], &SearchParams::default()).unwrap();

    // Delete p0..p14, interleaving each delete with an insert of q{i} near x = 1000.
    let deleted: Vec<String> = (0..15u32).map(|i| format!("p{i}")).collect();
    for (i, gone) in (0..15u32).zip(&deleted) {
        assert!(db.delete("c", gone).unwrap());
        let fresh = format!("q{i}");
        db.upsert(
            "c",
            &fresh,
            &[1000.0 + i as f32, 0.0, 0.0, 0.0],
            &json!({}),
        )
        .unwrap();
    }

    let origin_hits = db
        .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert!(
        !origin_hits.iter().any(|m| deleted.contains(&m.id)),
        "a churned-out id was returned"
    );
    let q_hits = db
        .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(q_hits[0].id, "q7", "new point not found after churn");

    // Persist, close, and reopen: the same two expectations must still hold.
    db.checkpoint().unwrap();
    drop(db);
    let mut db = open(tmp.path());
    let q_hits = db
        .search("c", &[1007.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert_eq!(q_hits[0].id, "q7", "new point lost across reopen");
    let origin_hits = db
        .search("c", &[5.0, 0.0, 0.0, 0.0], &SearchParams::default())
        .unwrap();
    assert!(
        !origin_hits.iter().any(|m| deleted.contains(&m.id)),
        "a churned-out id resurfaced after reopen"
    );
}
4238
/// For each index kind, applies a sequence of document writes (insert, delete, update,
/// re-upsert with fewer tokens) to a multivector cosine collection, checking the top-3
/// ranking after each step and that the ranking is unchanged after closing and reopening.
#[test]
fn multivector_writes_are_incremental_and_match_a_rebuild() {
    // Unit vector in the xy-plane at angle `theta`; a document is two copies of it.
    let unit = |theta: f32| vec![theta.cos(), theta.sin(), 0.0, 0.0];
    let two_tokens = |theta: f32| vec![unit(theta), unit(theta)];

    let kinds = [
        IndexKind::Ivf,
        IndexKind::Hnsw,
        IndexKind::Vamana,
        IndexKind::Colbert,
    ];
    for kind in kinds {
        let tmp = tempfile::tempdir().unwrap();
        let spec = IndexSpec {
            kind,
            pq_subspaces: None,
        };
        let desc = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
            .with_multivector(true)
            .with_index(spec);
        let mut db = open(tmp.path());
        db.create_collection("m", desc).unwrap();

        // d1..d10 at angles 0.1..1.0 radians.
        for i in 1..=10u32 {
            let id = format!("d{i}");
            let tokens = two_tokens(0.1 * i as f32);
            db.upsert_document("m", &id, &tokens, &json!({ "i": i }))
                .unwrap();
        }

        // Query at angle 0; `top3` returns the ids of the three best documents.
        let q = vec![unit(0.0)];
        let top3 = |db: &mut Database| {
            let params = SearchParams {
                k: 3,
                ..Default::default()
            };
            let hits = db.search_multi_vector("m", &q, &params).unwrap();
            hits.into_iter().map(|m| m.id).collect::<Vec<_>>()
        };
        assert_eq!(top3(&mut db), vec!["d1", "d2", "d3"], "{kind:?} initial");

        assert!(db.delete_document("m", "d1").unwrap());
        assert_eq!(
            top3(&mut db),
            vec!["d2", "d3", "d4"],
            "{kind:?} after delete"
        );

        // Moving d10 onto the query direction must put it first.
        db.upsert_document("m", "d10", &two_tokens(0.0), &json!({ "i": 10 }))
            .unwrap();
        assert_eq!(top3(&mut db)[0], "d10", "{kind:?} after update");

        // A brand-new document slightly off the query direction ranks second.
        db.upsert_document("m", "d11", &two_tokens(0.05), &json!({ "i": 11 }))
            .unwrap();
        let ranked = top3(&mut db);
        assert_eq!(ranked[0], "d10", "{kind:?}");
        assert_eq!(ranked[1], "d11", "{kind:?} new doc not ranked");

        // Re-upsert d8 with a single token and read back its stored vectors.
        db.upsert_document("m", "d8", &[unit(0.8)], &json!({ "i": 8 }))
            .unwrap();
        let d8 = db.get_document("m", "d8", true).unwrap().unwrap();
        assert_eq!(d8.vectors.unwrap().len(), 1, "{kind:?} trailing token kept");

        // Close without an explicit checkpoint, reopen, and compare rankings.
        let before = top3(&mut db);
        drop(db);
        let mut db = open(tmp.path());
        assert_eq!(top3(&mut db), before, "{kind:?} incremental != rebuild");
        assert!(
            db.get_document("m", "d1", false).unwrap().is_none(),
            "{kind:?} deleted doc resurfaced"
        );
    }
}
4330
/// Creating a `Colbert`-indexed collection fails with `Error::Unsupported` for a
/// single-vector descriptor and succeeds once the descriptor is marked multivector.
#[test]
fn colbert_index_requires_multivector() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());

    // Both descriptors share the same index spec; only the multivector flag differs.
    let colbert = || IndexSpec {
        kind: IndexKind::Colbert,
        pq_subspaces: None,
    };

    let single = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine).with_index(colbert());
    let rejected = db.create_collection("c", single);
    assert!(matches!(rejected, Err(Error::Unsupported(_))));

    let multi = Descriptor::new(4, Dtype::F32, DistanceMetric::Cosine)
        .with_multivector(true)
        .with_index(colbert());
    assert!(db.create_collection("m", multi).is_ok());
}
4354
4355 fn desc_filterable() -> Descriptor {
4360 Descriptor::new(4, Dtype::F32, DistanceMetric::L2).with_filterable(vec![
4361 FilterableField::keyword("city"),
4362 FilterableField::numeric("n"),
4363 ])
4364 }
4365
4366 fn seed_cities(db: &mut Database) {
4371 const CITIES: [&str; 3] = ["paris", "lyon", "rome"];
4372 db.create_collection("c", desc_filterable()).unwrap();
4373 for i in 0..30u32 {
4374 db.upsert(
4375 "c",
4376 &format!("p{i}"),
4377 &[i as f32, 0.0, 0.0, 0.0],
4378 &json!({"city": CITIES[i as usize % 3], "n": i}),
4379 )
4380 .unwrap();
4381 }
4382 db.checkpoint().unwrap();
4383 }
4384
/// Searching the seeded city collection with `city == "lyon"` must return a non-empty
/// result whose nearest hit to the origin is `p1` and whose payloads all carry
/// `city: "lyon"`.
#[test]
fn hybrid_equality_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("lyon"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    // p1 is the lowest-index point seeded with city "lyon" (index 1 % 3 == 1).
    assert_eq!(res[0].id, "p1");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("lyon"));
    }
}
4406
/// Searching the seeded city collection with `n >= 10` must rank `p10` first from the
/// origin and return only payloads whose `n` is at least 10.
#[test]
fn hybrid_numeric_range_prefilter_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 4,
        filter: Some(Filter::Gte {
            field: "n".into(),
            value: json!(10),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // Point i sits at x = i, so the closest admissible point to the origin is p10.
    assert_eq!(res[0].id, "p10");
    for m in &res {
        assert!(m.payload.as_ref().unwrap()["n"].as_u64().unwrap() >= 10);
    }
}
4427
/// A filter on a `city` value that no seeded point carries must yield an empty result
/// rather than an error.
#[test]
fn hybrid_unsatisfiable_filter_returns_empty() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    // `k` is left at its default; only the filter is set.
    let params = SearchParams {
        filter: Some(Filter::Eq {
            field: "city".into(),
            value: json!("atlantis"),
        }),
        ..SearchParams::default()
    };
    assert!(db.search("c", &[0.0; 4], &params).unwrap().is_empty());
}
4444
/// Combines `city IN ("paris", "rome")` with `n < 12` under `And` and checks that every
/// hit satisfies both clauses, with `p0` ranked first from the origin.
#[test]
fn hybrid_and_or_composition_is_exact() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::In {
                field: "city".into(),
                values: vec![json!("paris"), json!("rome")],
            },
            Filter::Lt {
                field: "n".into(),
                value: json!(12),
            },
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    // p0 is seeded with city "paris" and n = 0, so it passes both clauses.
    assert_eq!(res[0].id, "p0");
    for m in &res {
        let payload = m.payload.as_ref().unwrap();
        let city = payload["city"].as_str().unwrap();
        assert!(city == "paris" || city == "rome");
        assert!(payload["n"].as_u64().unwrap() < 12);
    }
}
4476
/// Combines `city == "paris"` with `NOT (n == 0)` under `And`: `p0` must be excluded,
/// `p3` must rank first from the origin, and every hit must carry `city: "paris"`.
#[test]
fn hybrid_rechecks_non_indexable_clause() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_cities(&mut db);
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::And(vec![
            Filter::Eq {
                field: "city".into(),
                value: json!("paris"),
            },
            Filter::Not(Box::new(Filter::Eq {
                field: "n".into(),
                value: json!(0),
            })),
        ])),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(res.iter().all(|m| m.id != "p0"));
    // With p0 ruled out by the NOT clause, the next "paris" point is p3.
    assert_eq!(res[0].id, "p3");
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["city"], json!("paris"));
    }
}
4506
/// Filters on `tier`, a payload field that the collection does NOT declare as filterable
/// (only `city` is declared), and checks that the result is non-empty and contains only
/// `tier: "gold"` payloads.
#[test]
fn post_filter_fallback_on_undeclared_field_is_correct() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection(
        "c",
        Descriptor::new(4, Dtype::F32, DistanceMetric::L2)
            .with_filterable(vec![FilterableField::keyword("city")]),
    )
    .unwrap();
    // Even indices are "gold", odd indices are "silver".
    for i in 0..20u32 {
        let tier = if i % 2 == 0 { "gold" } else { "silver" };
        db.upsert(
            "c",
            &format!("p{i}"),
            &[i as f32, 0.0, 0.0, 0.0],
            &json!({"city": "paris", "tier": tier}),
        )
        .unwrap();
    }
    let params = SearchParams {
        k: 5,
        filter: Some(Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        }),
        ..SearchParams::default()
    };
    let res = db.search("c", &[0.0; 4], &params).unwrap();
    assert!(!res.is_empty());
    for m in &res {
        assert_eq!(m.payload.as_ref().unwrap()["tier"], json!("gold"));
    }
}
4543
/// Unit test for `leaf_predicate`: an `Eq` on a declared keyword field and a `Gte` on a
/// declared numeric field map to secondary-index predicates, while an undeclared field, a
/// value/type mismatch, `Ne`, and `Exists` all map to `None`.
#[test]
fn leaf_predicate_maps_only_indexable_filterable_leaves() {
    let fields = vec![
        FilterableField::keyword("city"),
        FilterableField::numeric("n"),
    ];

    // Keyword equality on a declared field.
    let city_eq = Filter::Eq {
        field: "city".into(),
        value: json!("paris"),
    };
    assert_eq!(
        leaf_predicate(&city_eq, &fields),
        Some(SecPredicate::Eq {
            field: "city".into(),
            value: SecValue::Keyword("paris".into())
        })
    );

    // `>=` on a declared numeric field becomes a half-open range with an inclusive lower bound.
    let n_gte = Filter::Gte {
        field: "n".into(),
        value: json!(3),
    };
    assert_eq!(
        leaf_predicate(&n_gte, &fields),
        Some(SecPredicate::Range {
            field: "n".into(),
            lo: Some(SecValue::Numeric(3.0)),
            hi: None,
            lo_inclusive: true,
            hi_inclusive: false,
        })
    );

    // None of these leaves may produce a predicate. Order matches the original checks:
    // undeclared field, numeric value on a keyword field, `Ne`, `Exists`.
    let not_indexable = [
        Filter::Eq {
            field: "tier".into(),
            value: json!("gold"),
        },
        Filter::Eq {
            field: "city".into(),
            value: json!(5),
        },
        Filter::Ne {
            field: "city".into(),
            value: json!("x"),
        },
        Filter::Exists {
            field: "city".into(),
        },
    ];
    for leaf in &not_indexable {
        assert!(leaf_predicate(leaf, &fields).is_none());
    }
}
4602
/// Path `<root>/collections/0000000000/index`, the directory these tests inspect for
/// index snapshot files.
fn ivf_index_dir(root: &Path) -> std::path::PathBuf {
    let mut dir = root.to_path_buf();
    dir.extend(["collections", "0000000000", "index"]);
    dir
}
4609
4610 fn idx_snapshot_files(root: &Path) -> Vec<String> {
4611 let mut v: Vec<String> = std::fs::read_dir(ivf_index_dir(root))
4612 .map(|rd| {
4613 rd.filter_map(std::result::Result::ok)
4614 .filter_map(|e| e.file_name().to_str().map(str::to_owned))
4615 .filter(|n| n.starts_with("idx-"))
4616 .collect()
4617 })
4618 .unwrap_or_default();
4619 v.sort();
4620 v
4621 }
4622
4623 fn nearest(db: &mut Database, q: &[f32]) -> Vec<String> {
4624 db.search("c", q, &SearchParams::default())
4625 .unwrap()
4626 .into_iter()
4627 .map(|m| m.id)
4628 .collect()
4629 }
4630
4631 fn seed_ivf(db: &mut Database, n: u32) {
4632 db.create_collection("c", desc_with(IndexKind::Ivf))
4633 .unwrap();
4634 for i in 0..n {
4635 db.upsert(
4636 "c",
4637 &format!("p{i}"),
4638 &[i as f32, 0.0, 0.0, 0.0],
4639 &json!({}),
4640 )
4641 .unwrap();
4642 }
4643 let _ = nearest(db, &[1.0, 0.0, 0.0, 0.0]);
4645 }
4646
/// After seeding an IVF collection and checkpointing, exactly one `idx-*` file must exist
/// in the index directory.
#[test]
fn ivf_snapshot_is_written_at_checkpoint() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    seed_ivf(&mut db, 40);
    db.checkpoint().unwrap();

    let snapshots = idx_snapshot_files(tmp.path());
    assert_eq!(snapshots.len(), 1);
}
4655
/// Inserts ids in a non-sorted order (`a`, `m`, `z`, `b`), checkpoints, reopens, and
/// checks that the collection's `int_to_ext` mapping still has that insertion order.
#[test]
fn ivf_loads_from_snapshot_rather_than_rebuilding() {
    // Insertion order of the external ids; expected to survive the reopen unchanged.
    const ORDER: [&str; 4] = ["a", "m", "z", "b"];

    let tmp = tempfile::tempdir().unwrap();
    {
        let mut db = open(tmp.path());
        db.create_collection("c", desc_with(IndexKind::Ivf))
            .unwrap();
        for (id, x) in [("a", 0.0f32), ("m", 1.0)] {
            db.upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({})).unwrap();
        }
        // A search between the two batches; its result is not inspected.
        let _ = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
        for (id, x) in [("z", 2.0f32), ("b", 3.0)] {
            db.upsert("c", id, &[x, 0.0, 0.0, 0.0], &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
        assert_eq!(db.collections["c"].int_to_ext, ORDER);
    }

    let db = open(tmp.path());
    assert_eq!(
        db.collections["c"].int_to_ext,
        ORDER,
        "index was rebuilt, not loaded from the snapshot"
    );
}
4686
/// An upsert issued after the last checkpoint must still be searchable after the database
/// is dropped and reopened, alongside the points written before the checkpoint.
#[test]
fn ivf_recovery_replays_post_checkpoint_upserts() {
    let tmp = tempfile::tempdir().unwrap();
    {
        let mut db = open(tmp.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        // Written after the checkpoint and never checkpointed again.
        let far_away = [500.0, 0.0, 0.0, 0.0];
        db.upsert("c", "far", &far_away, &json!({})).unwrap();
    }

    let mut db = open(tmp.path());
    let near_far = nearest(&mut db, &[500.0, 0.0, 0.0, 0.0]);
    assert_eq!(near_far[0], "far");
    let near_one = nearest(&mut db, &[1.0, 0.0, 0.0, 0.0]);
    assert_eq!(near_one[0], "p1");
}
4702
/// A delete issued after the last checkpoint must still be in effect after reopening: the
/// deleted id is absent from search and `get`, while a neighbouring id is still present.
#[test]
fn ivf_recovery_replays_post_checkpoint_deletes() {
    let tmp = tempfile::tempdir().unwrap();
    {
        let mut db = open(tmp.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        let removed = db.delete("c", "p7").unwrap();
        assert!(removed);
    }

    let mut db = open(tmp.path());
    let hits = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert!(!hits.iter().any(|id| id == "p7"));
    assert!(db.get("c", "p7").unwrap().is_none());
    assert!(db.get("c", "p6").unwrap().is_some());
}
4721
/// An update (re-upsert of an existing id with a new vector) issued after the last
/// checkpoint must be visible after reopening, and the old vector must no longer match.
#[test]
fn ivf_recovery_replays_post_checkpoint_updates() {
    let tmp = tempfile::tempdir().unwrap();
    {
        let mut db = open(tmp.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
        // p0 was seeded at the origin; move it far away.
        let moved = [999.0, 0.0, 0.0, 0.0];
        db.upsert("c", "p0", &moved, &json!({})).unwrap();
    }

    let mut db = open(tmp.path());
    let at_new_spot = nearest(&mut db, &[999.0, 0.0, 0.0, 0.0]);
    assert_eq!(at_new_spot[0], "p0");
    let at_old_spot = nearest(&mut db, &[0.0, 0.0, 0.0, 0.0]);
    assert_ne!(
        at_old_spot[0],
        "p0",
        "the stale p0 vector survived the update"
    );
}
4741
/// Overwrites the single index snapshot file with garbage bytes and checks that the
/// database still opens and answers a nearest-neighbour query correctly.
#[test]
fn corrupt_ivf_snapshot_falls_back_to_rebuild() {
    let tmp = tempfile::tempdir().unwrap();
    {
        let mut db = open(tmp.path());
        seed_ivf(&mut db, 30);
        db.checkpoint().unwrap();
    }

    // Exactly one snapshot is expected; clobber it in place.
    let files = idx_snapshot_files(tmp.path());
    assert_eq!(files.len(), 1);
    let victim = ivf_index_dir(tmp.path()).join(&files[0]);
    std::fs::write(victim, b"corrupt").unwrap();

    let mut db = open(tmp.path());
    let hits = nearest(&mut db, &[7.0, 0.0, 0.0, 0.0]);
    assert_eq!(hits[0], "p7");
}
4758
4759 fn mv_desc() -> Descriptor {
4762 Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine).with_multivector(true)
4763 }
4764
4765 fn bf_rank(query: &[Vec<f32>], corpus: &[(&str, Vec<Vec<f32>>)]) -> Vec<(String, f32)> {
4768 let mut v: Vec<(String, f32)> = corpus
4769 .iter()
4770 .map(|(id, toks)| ((*id).to_owned(), max_sim(Metric::Cosine, query, toks)))
4771 .collect();
4772 v.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
4773 v
4774 }
4775
/// Multivector search over three small documents must return them in the same order, and
/// with the same scores (within `1e-5`), as the brute-force `bf_rank` reference.
#[test]
fn multivector_search_ranks_documents_by_maxsim() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("d_cat", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("d_dog", vec![vec![0.0, 1.0, 0.0], vec![0.0, 0.0, 1.0]]),
        (
            "d_mix",
            vec![
                vec![1.0, 1.0, 0.0],
                vec![0.0, 0.0, 1.0],
                vec![1.0, 0.0, 1.0],
            ],
        ),
    ];
    for (id, toks) in &corpus {
        db.upsert_document("docs", id, toks, &json!({ "id": id }))
            .unwrap();
    }
    assert_eq!(db.document_count("docs").unwrap(), 3);

    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let params = SearchParams {
        k: 3,
        with_payload: false,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);

    // Compare position by position against the reference ranking.
    assert_eq!(got.len(), 3);
    for (g, (eid, escore)) in got.iter().zip(expected.iter()) {
        assert_eq!(&g.id, eid, "ranking matches brute force");
        assert!(
            (g.score - escore).abs() < 1e-5,
            "{} score {} vs {escore}",
            g.id,
            g.score
        );
    }
}
4819
/// With five single-token documents stored and `k = 2`, multivector search must return
/// exactly two hits.
#[test]
fn multivector_search_truncates_to_k() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    for i in 0..5 {
        let v = vec![vec![1.0, i as f32, 0.0]];
        db.upsert_document("docs", &format!("d{i}"), &v, &json!({}))
            .unwrap();
    }
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 2);
}
4839
/// Two documents with identical tokens differ only in payload `lang`; filtering on
/// `lang == "fr"` must return only document `b`, with its payload attached.
#[test]
fn multivector_filter_selects_documents_exactly() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document("docs", "a", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"en"}))
        .unwrap();
    db.upsert_document("docs", "b", &[vec![1.0, 0.0, 0.0]], &json!({"lang":"fr"}))
        .unwrap();
    let params = SearchParams {
        k: 10,
        filter: Some(Filter::Eq {
            field: "lang".into(),
            value: json!("fr"),
        }),
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert_eq!(got.len(), 1);
    assert_eq!(got[0].id, "b");
    assert_eq!(got[0].payload, Some(json!({"lang":"fr"})));
}
4865
/// After a checkpoint and reopen, a multivector collection must report the same document
/// count and return ids in the same order as the brute-force `bf_rank` reference.
#[test]
fn multivector_reopen_rebuilds_grouping_and_ranking() {
    let tmp = tempfile::tempdir().unwrap();
    let query = vec![vec![1.0, 0.0, 0.0], vec![0.0, 0.0, 1.0]];
    let corpus: Vec<(&str, Vec<Vec<f32>>)> = vec![
        ("x", vec![vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]]),
        ("y", vec![vec![0.0, 0.0, 1.0], vec![1.0, 0.0, 1.0]]),
    ];
    {
        let mut db = open(tmp.path());
        db.create_collection("docs", mv_desc()).unwrap();
        for (id, toks) in &corpus {
            db.upsert_document("docs", id, toks, &json!({})).unwrap();
        }
        db.checkpoint().unwrap();
    }
    let mut db = open(tmp.path());
    assert_eq!(db.document_count("docs").unwrap(), 2);
    let params = SearchParams {
        k: 2,
        ..SearchParams::default()
    };
    let got = db.search_multi_vector("docs", &query, &params).unwrap();
    let expected = bf_rank(&query, &corpus);
    // Only the id order is compared here, not the scores.
    assert_eq!(
        got.iter().map(|m| m.id.clone()).collect::<Vec<_>>(),
        expected
            .iter()
            .map(|(id, _)| id.clone())
            .collect::<Vec<_>>()
    );
}
4899
/// Deleting a two-token document must drop both the document count and the vector count
/// (`len`), hide the document from `get_document` and search, and make a second delete of
/// the same id return `false`.
#[test]
fn multivector_delete_document_removes_all_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();
    db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0, 0.0], vec![0.0, 1.0, 0.0]],
        &json!({}),
    )
    .unwrap();
    db.upsert_document("docs", "b", &[vec![0.0, 0.0, 1.0]], &json!({}))
        .unwrap();
    // Two documents, three tokens in total.
    assert_eq!(db.document_count("docs").unwrap(), 2);
    assert_eq!(db.len("docs").unwrap(), 3);

    assert!(db.delete_document("docs", "a").unwrap());
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);
    assert!(db.get_document("docs", "a", false).unwrap().is_none());
    let params = SearchParams {
        k: 10,
        ..SearchParams::default()
    };
    let got = db
        .search_multi_vector("docs", &[vec![1.0, 0.0, 0.0]], &params)
        .unwrap();
    assert!(got.iter().all(|m| m.id != "a"));
    // The document is already gone, so a repeat delete reports `false`.
    assert!(!db.delete_document("docs", "a").unwrap());
}
4931
/// Re-upserting a three-token document with a single token must leave one document and
/// one stored vector, with the new payload and the new token.
#[test]
fn multivector_reupsert_replaces_tokens() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("docs", mv_desc()).unwrap();

    let three_tokens = [
        vec![1.0, 0.0, 0.0],
        vec![0.0, 1.0, 0.0],
        vec![0.0, 0.0, 1.0],
    ];
    db.upsert_document("docs", "a", &three_tokens, &json!({"v":1}))
        .unwrap();
    assert_eq!(db.len("docs").unwrap(), 3);

    // Same id, fewer tokens, new payload.
    let one_token = [vec![0.0, 0.0, 1.0]];
    db.upsert_document("docs", "a", &one_token, &json!({"v":2}))
        .unwrap();
    assert_eq!(db.document_count("docs").unwrap(), 1);
    assert_eq!(db.len("docs").unwrap(), 1);

    let stored = db.get_document("docs", "a", true).unwrap().unwrap();
    assert_eq!(stored.payload, Some(json!({"v":2})));
    assert_eq!(stored.vectors, Some(vec![vec![0.0, 0.0, 1.0]]));
}
4958
/// Single-vector calls on a multivector collection, and multivector calls on a
/// single-vector collection, must all fail with `Error::Unsupported`.
#[test]
fn single_and_multi_vector_apis_are_mutually_exclusive() {
    // True when the call was rejected as unsupported, whatever its success type.
    fn unsupported<T>(outcome: std::result::Result<T, Error>) -> bool {
        matches!(outcome, Err(Error::Unsupported(_)))
    }

    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());
    db.create_collection("mv", mv_desc()).unwrap();
    db.create_collection("sv", Descriptor::new(3, Dtype::F32, DistanceMetric::Cosine))
        .unwrap();

    // Single-vector API against the multivector collection.
    assert!(unsupported(db.upsert(
        "mv",
        "a",
        &[1.0, 0.0, 0.0],
        &json!({})
    )));
    assert!(unsupported(db.search(
        "mv",
        &[1.0, 0.0, 0.0],
        &SearchParams::default()
    )));

    // Multivector API against the single-vector collection.
    assert!(unsupported(db.upsert_document(
        "sv",
        "a",
        &[vec![1.0, 0.0, 0.0]],
        &json!({})
    )));
    assert!(unsupported(db.search_multi_vector(
        "sv",
        &[vec![1.0, 0.0, 0.0]],
        &SearchParams::default()
    )));
    assert!(unsupported(db.document_count("sv")));
}
4989
/// A multivector collection with the `L2` metric is rejected at creation, and
/// `upsert_document` rejects an id containing `U+001F`, an empty token list, and a token
/// of the wrong dimension — all with `Error::Unsupported`.
#[test]
fn multivector_rejects_l2_metric_and_bad_documents() {
    // True when the call was rejected as unsupported, whatever its success type.
    fn unsupported<T>(outcome: std::result::Result<T, Error>) -> bool {
        matches!(outcome, Err(Error::Unsupported(_)))
    }

    let tmp = tempfile::tempdir().unwrap();
    let mut db = open(tmp.path());

    let l2 = Descriptor::new(3, Dtype::F32, DistanceMetric::L2).with_multivector(true);
    assert!(unsupported(db.create_collection("bad", l2)));

    db.create_collection("docs", mv_desc()).unwrap();
    // Id containing the U+001F control character.
    assert!(unsupported(db.upsert_document(
        "docs",
        "a\u{1f}b",
        &[vec![1.0, 0.0, 0.0]],
        &json!({})
    )));
    // Document with no tokens at all.
    assert!(unsupported(db.upsert_document("docs", "a", &[], &json!({}))));
    // Two-component token in a three-dimensional collection.
    assert!(unsupported(db.upsert_document(
        "docs",
        "a",
        &[vec![1.0, 0.0]],
        &json!({})
    )));
}
5016
/// Takes a snapshot of a two-collection database, writes one more point to the source,
/// then opens the snapshot directory and checks it holds exactly the pre-snapshot state.
#[test]
fn snapshot_then_open_reproduces_the_database() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.create_collection("kb2", desc()).unwrap();
    let seed = [
        ("kb", "a", [1.0, 0.0, 0.0, 0.0], 1),
        ("kb", "b", [0.0, 1.0, 0.0, 0.0], 2),
        ("kb2", "z", [0.0, 0.0, 1.0, 0.0], 3),
    ];
    for (collection, id, vector, n) in seed {
        db.upsert(collection, id, &vector, &json!({ "n": n }))
            .unwrap();
    }

    // Snapshot into a not-yet-existing subdirectory of a fresh temp dir.
    let dest = tempfile::tempdir().unwrap();
    let snap_dir = dest.path().join("snap");
    let snap_info = db.snapshot(&snap_dir).unwrap();
    assert!(snap_info.files > 0 && snap_info.bytes > 0);
    assert_eq!(snap_info.manifest_version, db.manifest_version());

    // This write happens after the snapshot and must not appear in it.
    db.upsert("kb", "late", &[1.0, 1.0, 0.0, 0.0], &json!({ "n": 9 }))
        .unwrap();

    let restored = open(&snap_dir);
    let mut names = restored.collection_names();
    names.sort();
    assert_eq!(names, vec!["kb".to_owned(), "kb2".to_owned()]);
    assert_eq!(restored.len("kb").unwrap(), 2, "no post-snapshot write");
    let a = restored.get("kb", "a").unwrap().unwrap();
    assert_eq!(a.payload, Some(json!({ "n": 1 })));
    assert_eq!(restored.len("kb2").unwrap(), 1);
    assert!(restored.get("kb", "late").unwrap().is_none());
}
5052
/// Snapshotting into a directory that already exists must fail with
/// `CoreError::AlreadyExists`.
#[test]
fn snapshot_refuses_an_existing_destination() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    // `tempdir()` has already created this directory, so it exists before the snapshot.
    let dest = tempfile::tempdir().unwrap();
    let outcome = db.snapshot(dest.path());
    assert!(matches!(
        outcome,
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
}
5063
/// `restore_snapshot` must reproduce a snapshot into a fresh directory, refuse to
/// overwrite an existing destination (`AlreadyExists`), and reject a source directory
/// that is not a snapshot (`InvalidArgument`).
#[test]
fn restore_snapshot_roundtrips_and_guards() {
    let src = tempfile::tempdir().unwrap();
    let mut db = open(src.path());
    db.create_collection("kb", desc()).unwrap();
    db.upsert("kb", "a", &[1.0, 0.0, 0.0, 0.0], &json!({ "n": 1 }))
        .unwrap();
    let work = tempfile::tempdir().unwrap();
    let snap_dir = work.path().join("snap");
    db.snapshot(&snap_dir).unwrap();

    // Round trip: restore into a new directory and open it.
    let restored_dir = work.path().join("restored");
    let info = restore_snapshot(&snap_dir, &restored_dir).unwrap();
    assert!(info.files > 0);
    let restored = open(&restored_dir);
    assert_eq!(restored.len("kb").unwrap(), 1);

    // Guard 1: the destination now exists, so a second restore must be refused.
    assert!(matches!(
        restore_snapshot(&snap_dir, &restored_dir),
        Err(Error::Core(quiver_core::CoreError::AlreadyExists(_)))
    ));
    // Guard 2: an empty directory is not a valid snapshot source.
    let not_snap = work.path().join("not-a-snapshot");
    std::fs::create_dir_all(&not_snap).unwrap();
    assert!(matches!(
        restore_snapshot(&not_snap, &work.path().join("out")),
        Err(Error::Core(quiver_core::CoreError::InvalidArgument(_)))
    ));
}
5095}