1use std::sync::Arc;
68
69use manifoldb_core::{CollectionId, EntityId};
70use manifoldb_storage::StorageEngine;
71
72use crate::distance::DistanceMetric;
73use crate::error::VectorError;
74use crate::store::CollectionVectorStore;
75use crate::types::{CollectionName, Embedding, VectorData};
76
77use super::config::HnswConfig;
78use super::manager::HnswIndexManager;
79use super::traits::{SearchResult, VectorIndex};
80
/// Pairs a [`CollectionVectorStore`] with a shared [`HnswIndexManager`] so that
/// vector writes and deletes can be mirrored into whichever HNSW indexes the
/// manager currently returns for a collection / vector name.
pub struct VectorIndexCoordinator<E: StorageEngine> {
    /// Store that vectors are written to, read back from and deleted from.
    vector_store: CollectionVectorStore<E>,
    /// Shared manager used to look up, create, load, drop and rebuild indexes.
    index_manager: Arc<HnswIndexManager<E>>,
}
99
100impl<E: StorageEngine> VectorIndexCoordinator<E> {
101 #[must_use]
107 pub fn new(engine: E) -> Self {
108 Self {
109 vector_store: CollectionVectorStore::new(engine),
110 index_manager: Arc::new(HnswIndexManager::new()),
111 }
112 }
113
114 #[must_use]
118 pub fn with_manager(engine: E, index_manager: Arc<HnswIndexManager<E>>) -> Self {
119 Self { vector_store: CollectionVectorStore::new(engine), index_manager }
120 }
121
122 #[must_use]
124 pub fn vector_store(&self) -> &CollectionVectorStore<E> {
125 &self.vector_store
126 }
127
128 #[must_use]
130 pub fn index_manager(&self) -> &Arc<HnswIndexManager<E>> {
131 &self.index_manager
132 }
133
134 pub fn upsert_vector(
156 &self,
157 collection_id: CollectionId,
158 entity_id: EntityId,
159 collection_name: &str,
160 vector_name: &str,
161 data: &VectorData,
162 ) -> Result<(), VectorError> {
163 self.vector_store.put_vector(collection_id, entity_id, vector_name, data)?;
165
166 if let Some(dense) = data.as_dense() {
168 if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
170 let embedding = Embedding::new(dense.to_vec())?;
171 let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
172 guard.insert(entity_id, &embedding)?;
173 }
174 }
175
176 Ok(())
177 }
178
179 pub fn upsert_vectors_batch(
190 &self,
191 collection_id: CollectionId,
192 collection_name: &str,
193 vectors: &[(EntityId, &str, &VectorData)],
194 ) -> Result<(), VectorError> {
195 if vectors.is_empty() {
196 return Ok(());
197 }
198
199 self.vector_store.put_vectors_batch(collection_id, vectors)?;
201
202 use std::collections::HashMap;
204 let mut by_name: HashMap<&str, Vec<(EntityId, &VectorData)>> = HashMap::new();
205
206 for (entity_id, name, data) in vectors {
207 by_name.entry(*name).or_default().push((*entity_id, *data));
208 }
209
210 for (vector_name, entity_vectors) in by_name {
211 if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
212 let embeddings: Vec<(EntityId, Embedding)> = entity_vectors
214 .into_iter()
215 .filter_map(|(id, data)| {
216 data.as_dense().map(|d| Embedding::new(d.to_vec()).map(|e| (id, e)))
217 })
218 .filter_map(Result::ok)
219 .collect();
220
221 if !embeddings.is_empty() {
222 let refs: Vec<(EntityId, &Embedding)> =
223 embeddings.iter().map(|(id, e)| (*id, e)).collect();
224
225 let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
226 guard.insert_batch(&refs)?;
227 }
228 }
229 }
230
231 Ok(())
232 }
233
234 pub fn delete_vector(
240 &self,
241 collection_id: CollectionId,
242 entity_id: EntityId,
243 collection_name: &str,
244 vector_name: &str,
245 ) -> Result<bool, VectorError> {
246 let deleted = self.vector_store.delete_vector(collection_id, entity_id, vector_name)?;
248
249 if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
251 let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
252 let _ = guard.delete(entity_id);
253 }
254
255 Ok(deleted)
256 }
257
258 pub fn delete_entity_vectors(
266 &self,
267 collection_id: CollectionId,
268 entity_id: EntityId,
269 collection_name: &str,
270 ) -> Result<usize, VectorError> {
271 let vectors = self.vector_store.get_all_vectors(collection_id, entity_id)?;
273
274 let count = self.vector_store.delete_all_vectors(collection_id, entity_id)?;
276
277 for vector_name in vectors.keys() {
279 if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
280 let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
281 let _ = guard.delete(entity_id);
282 }
283 }
284
285 Ok(count)
286 }
287
288 pub fn get_vector(
294 &self,
295 collection_id: CollectionId,
296 entity_id: EntityId,
297 vector_name: &str,
298 ) -> Result<Option<VectorData>, VectorError> {
299 self.vector_store.get_vector(collection_id, entity_id, vector_name)
300 }
301
302 pub fn get_all_vectors(
304 &self,
305 collection_id: CollectionId,
306 entity_id: EntityId,
307 ) -> Result<std::collections::HashMap<String, VectorData>, VectorError> {
308 self.vector_store.get_all_vectors(collection_id, entity_id)
309 }
310
311 pub fn search(
329 &self,
330 collection_name: &str,
331 vector_name: &str,
332 query: &Embedding,
333 k: usize,
334 ef_search: Option<usize>,
335 ) -> Result<Vec<SearchResult>, VectorError> {
336 let index =
337 self.index_manager.get_index(collection_name, vector_name)?.ok_or_else(|| {
338 VectorError::SpaceNotFound(format!(
339 "no HNSW index for {}.{}",
340 collection_name, vector_name
341 ))
342 })?;
343
344 let guard = index.read().map_err(|_| VectorError::LockPoisoned)?;
345 guard.search(query, k, ef_search)
346 }
347
348 pub fn search_with_filter<F>(
352 &self,
353 collection_name: &str,
354 vector_name: &str,
355 query: &Embedding,
356 k: usize,
357 predicate: F,
358 ef_search: Option<usize>,
359 ) -> Result<Vec<SearchResult>, VectorError>
360 where
361 F: Fn(EntityId) -> bool,
362 {
363 let index =
364 self.index_manager.get_index(collection_name, vector_name)?.ok_or_else(|| {
365 VectorError::SpaceNotFound(format!(
366 "no HNSW index for {}.{}",
367 collection_name, vector_name
368 ))
369 })?;
370
371 let guard = index.read().map_err(|_| VectorError::LockPoisoned)?;
372 guard.search_with_filter(query, k, predicate, ef_search, None)
373 }
374
375 pub fn rebuild_index_from_store(
396 &self,
397 collection_id: CollectionId,
398 collection_name: &str,
399 vector_name: &str,
400 ) -> Result<usize, VectorError> {
401 let entity_ids = self.vector_store.list_entities_with_vector(collection_id, vector_name)?;
403
404 let mut vectors = Vec::with_capacity(entity_ids.len());
406 for entity_id in entity_ids {
407 if let Some(data) =
408 self.vector_store.get_vector(collection_id, entity_id, vector_name)?
409 {
410 if let Some(dense) = data.as_dense() {
411 vectors.push((entity_id, dense.to_vec()));
412 }
413 }
414 }
415
416 let points =
418 vectors.into_iter().map(|(id, v)| (manifoldb_core::PointId::new(id.as_u64()), v));
419
420 self.index_manager.rebuild_index(collection_name, vector_name, points)
421 }
422
423 pub fn is_index_loaded(&self, collection: &str, vector_name: &str) -> bool {
429 self.index_manager.is_index_loaded(collection, vector_name).unwrap_or(false)
430 }
431}
432
433impl<E: StorageEngine> VectorIndexCoordinator<E> {
435 pub fn create_index(
453 &self,
454 engine: E,
455 collection: &str,
456 vector_name: &str,
457 dimension: usize,
458 distance_metric: DistanceMetric,
459 config: &HnswConfig,
460 ) -> Result<String, VectorError> {
461 let collection_name = CollectionName::new(collection)?;
462 self.index_manager.create_index_for_vector(
463 engine,
464 &collection_name,
465 vector_name,
466 dimension,
467 distance_metric,
468 config,
469 )
470 }
471
472 pub fn drop_index(&self, engine: &E, index_name: &str) -> Result<bool, VectorError> {
474 self.index_manager.drop_index(engine, index_name)
475 }
476
477 pub fn drop_collection_indexes(
479 &self,
480 engine: &E,
481 collection: &str,
482 ) -> Result<Vec<String>, VectorError> {
483 let collection_name = CollectionName::new(collection)?;
484 self.index_manager.drop_indexes_for_collection(engine, &collection_name)
485 }
486
487 pub fn has_index(&self, engine: &E, collection: &str, vector_name: &str) -> bool {
489 self.index_manager.has_index(engine, collection, vector_name).unwrap_or(false)
490 }
491
492 pub fn load_index(&self, engine: E, index_name: &str) -> Result<(), VectorError> {
494 self.index_manager.load_index(engine, index_name)
495 }
496
497 pub fn rebuild_index_from_scratch(
512 &self,
513 engine: E,
514 collection_id: CollectionId,
515 collection_name: &str,
516 vector_name: &str,
517 ) -> Result<usize, VectorError> {
518 let entity_ids = self.vector_store.list_entities_with_vector(collection_id, vector_name)?;
520
521 let mut vectors = Vec::with_capacity(entity_ids.len());
523 for entity_id in entity_ids {
524 if let Some(data) =
525 self.vector_store.get_vector(collection_id, entity_id, vector_name)?
526 {
527 if let Some(dense) = data.as_dense() {
528 vectors.push((entity_id, dense.to_vec()));
529 }
530 }
531 }
532
533 let points =
535 vectors.into_iter().map(|(id, v)| (manifoldb_core::PointId::new(id.as_u64()), v));
536
537 self.index_manager.rebuild_index_from_scratch(engine, collection_name, vector_name, points)
538 }
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544 use manifoldb_storage::backends::RedbEngine;
545
546 fn create_test_engines() -> (RedbEngine, RedbEngine) {
547 (RedbEngine::in_memory().unwrap(), RedbEngine::in_memory().unwrap())
550 }
551
552 #[test]
553 fn test_create_coordinator() {
554 let (coord_engine, _) = create_test_engines();
555 let coordinator = VectorIndexCoordinator::new(coord_engine);
556 assert!(!coordinator.is_index_loaded("test", "vec"));
558 }
559
560 #[test]
561 fn test_create_index() {
562 let (coord_engine, index_engine) = create_test_engines();
563 let coordinator = VectorIndexCoordinator::new(coord_engine);
564
565 let index_name = coordinator
566 .create_index(
567 index_engine,
568 "documents",
569 "text_embedding",
570 384,
571 DistanceMetric::Cosine,
572 &HnswConfig::default(),
573 )
574 .unwrap();
575
576 assert_eq!(index_name, "documents_text_embedding_hnsw");
577 assert!(coordinator.is_index_loaded("documents", "text_embedding"));
578 }
579
580 #[test]
581 fn test_upsert_and_search() {
582 let (coord_engine, index_engine) = create_test_engines();
583 let coordinator = VectorIndexCoordinator::new(coord_engine);
584
585 coordinator
587 .create_index(
588 index_engine,
589 "docs",
590 "vec",
591 4,
592 DistanceMetric::Euclidean,
593 &HnswConfig::default(),
594 )
595 .unwrap();
596
597 let collection_id = CollectionId::new(1);
598
599 for i in 1..=5 {
601 let data = VectorData::Dense(vec![i as f32; 4]);
602 coordinator
603 .upsert_vector(collection_id, EntityId::new(i), "docs", "vec", &data)
604 .unwrap();
605 }
606
607 let query = Embedding::new(vec![3.0; 4]).unwrap();
609 let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();
610
611 assert_eq!(results.len(), 3);
612 assert_eq!(results[0].entity_id, EntityId::new(3));
614 }
615
616 #[test]
617 fn test_delete_vector() {
618 let (coord_engine, index_engine) = create_test_engines();
619 let coordinator = VectorIndexCoordinator::new(coord_engine);
620
621 coordinator
622 .create_index(
623 index_engine,
624 "docs",
625 "vec",
626 4,
627 DistanceMetric::Euclidean,
628 &HnswConfig::default(),
629 )
630 .unwrap();
631
632 let collection_id = CollectionId::new(1);
633 let entity_id = EntityId::new(1);
634
635 coordinator
637 .upsert_vector(
638 collection_id,
639 entity_id,
640 "docs",
641 "vec",
642 &VectorData::Dense(vec![1.0; 4]),
643 )
644 .unwrap();
645
646 assert!(coordinator.get_vector(collection_id, entity_id, "vec").unwrap().is_some());
648
649 let deleted = coordinator.delete_vector(collection_id, entity_id, "docs", "vec").unwrap();
651 assert!(deleted);
652
653 assert!(coordinator.get_vector(collection_id, entity_id, "vec").unwrap().is_none());
655
656 let query = Embedding::new(vec![1.0; 4]).unwrap();
658 let results = coordinator.search("docs", "vec", &query, 1, None).unwrap();
659 assert!(results.is_empty());
660 }
661
662 #[test]
663 fn test_batch_upsert() {
664 let (coord_engine, index_engine) = create_test_engines();
665 let coordinator = VectorIndexCoordinator::new(coord_engine);
666
667 coordinator
668 .create_index(
669 index_engine,
670 "docs",
671 "vec",
672 4,
673 DistanceMetric::Euclidean,
674 &HnswConfig::default(),
675 )
676 .unwrap();
677
678 let collection_id = CollectionId::new(1);
679
680 let data1 = VectorData::Dense(vec![1.0; 4]);
682 let data2 = VectorData::Dense(vec![2.0; 4]);
683 let data3 = VectorData::Dense(vec![3.0; 4]);
684
685 let vectors: Vec<(EntityId, &str, &VectorData)> = vec![
686 (EntityId::new(1), "vec", &data1),
687 (EntityId::new(2), "vec", &data2),
688 (EntityId::new(3), "vec", &data3),
689 ];
690
691 coordinator.upsert_vectors_batch(collection_id, "docs", &vectors).unwrap();
692
693 for i in 1..=3 {
695 assert!(coordinator
696 .get_vector(collection_id, EntityId::new(i), "vec")
697 .unwrap()
698 .is_some());
699 }
700
701 let query = Embedding::new(vec![2.0; 4]).unwrap();
703 let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();
704 assert_eq!(results.len(), 3);
705 }
706
707 #[test]
708 fn test_rebuild_from_store() {
709 let (coord_engine, index_engine) = create_test_engines();
710 let coordinator = VectorIndexCoordinator::new(coord_engine);
711
712 let collection_id = CollectionId::new(1);
713
714 for i in 1..=5 {
716 coordinator
717 .vector_store()
718 .put_vector(
719 collection_id,
720 EntityId::new(i),
721 "vec",
722 &VectorData::Dense(vec![i as f32; 4]),
723 )
724 .unwrap();
725 }
726
727 coordinator
729 .create_index(
730 index_engine,
731 "docs",
732 "vec",
733 4,
734 DistanceMetric::Euclidean,
735 &HnswConfig::default(),
736 )
737 .unwrap();
738
739 let count = coordinator.rebuild_index_from_store(collection_id, "docs", "vec").unwrap();
741 assert_eq!(count, 5);
742
743 let query = Embedding::new(vec![3.0; 4]).unwrap();
745 let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();
746 assert_eq!(results.len(), 3);
747 }
748
749 #[test]
750 fn test_search_with_filter() {
751 let (coord_engine, index_engine) = create_test_engines();
752 let coordinator = VectorIndexCoordinator::new(coord_engine);
753
754 coordinator
755 .create_index(
756 index_engine,
757 "docs",
758 "vec",
759 4,
760 DistanceMetric::Euclidean,
761 &HnswConfig::default(),
762 )
763 .unwrap();
764
765 let collection_id = CollectionId::new(1);
766
767 for i in 1..=10 {
769 let data = VectorData::Dense(vec![i as f32; 4]);
770 coordinator
771 .upsert_vector(collection_id, EntityId::new(i), "docs", "vec", &data)
772 .unwrap();
773 }
774
775 let query = Embedding::new(vec![5.0; 4]).unwrap();
777 let predicate = |id: EntityId| id.as_u64() % 2 == 0;
778
779 let results =
780 coordinator.search_with_filter("docs", "vec", &query, 3, predicate, None).unwrap();
781
782 for result in &results {
784 assert_eq!(result.entity_id.as_u64() % 2, 0);
785 }
786 }
787
788 #[test]
789 fn test_sparse_vector_ignored_for_hnsw() {
790 let (coord_engine, index_engine) = create_test_engines();
791 let coordinator = VectorIndexCoordinator::new(coord_engine);
792
793 coordinator
794 .create_index(
795 index_engine,
796 "docs",
797 "vec",
798 4,
799 DistanceMetric::Euclidean,
800 &HnswConfig::default(),
801 )
802 .unwrap();
803
804 let collection_id = CollectionId::new(1);
805
806 let sparse = VectorData::Sparse(vec![(0, 1.0), (2, 0.5)]);
808 coordinator.upsert_vector(collection_id, EntityId::new(1), "docs", "vec", &sparse).unwrap();
809
810 let retrieved = coordinator.get_vector(collection_id, EntityId::new(1), "vec").unwrap();
812 assert!(retrieved.is_some());
813 assert!(retrieved.unwrap().is_sparse());
814
815 let query = Embedding::new(vec![1.0; 4]).unwrap();
817 let results = coordinator.search("docs", "vec", &query, 1, None).unwrap();
818 assert!(results.is_empty());
819 }
820
821 #[test]
822 fn test_multiple_named_vectors() {
823 let (coord_engine, index_engine1) = create_test_engines();
824 let index_engine2 = RedbEngine::in_memory().unwrap();
825 let coordinator = VectorIndexCoordinator::new(coord_engine);
826
827 coordinator
829 .create_index(
830 index_engine1,
831 "docs",
832 "text",
833 4,
834 DistanceMetric::Cosine,
835 &HnswConfig::default(),
836 )
837 .unwrap();
838
839 coordinator
840 .create_index(
841 index_engine2,
842 "docs",
843 "image",
844 8,
845 DistanceMetric::Euclidean,
846 &HnswConfig::default(),
847 )
848 .unwrap();
849
850 let collection_id = CollectionId::new(1);
851 let entity_id = EntityId::new(1);
852
853 coordinator
855 .upsert_vector(
856 collection_id,
857 entity_id,
858 "docs",
859 "text",
860 &VectorData::Dense(vec![0.5; 4]),
861 )
862 .unwrap();
863
864 coordinator
865 .upsert_vector(
866 collection_id,
867 entity_id,
868 "docs",
869 "image",
870 &VectorData::Dense(vec![0.25; 8]),
871 )
872 .unwrap();
873
874 let text_vec = coordinator.get_vector(collection_id, entity_id, "text").unwrap();
876 let image_vec = coordinator.get_vector(collection_id, entity_id, "image").unwrap();
877
878 assert!(text_vec.is_some());
879 assert!(image_vec.is_some());
880 assert_eq!(text_vec.unwrap().dimension(), 4);
881 assert_eq!(image_vec.unwrap().dimension(), 8);
882
883 let text_query = Embedding::new(vec![0.5; 4]).unwrap();
885 let text_results = coordinator.search("docs", "text", &text_query, 1, None).unwrap();
886 assert_eq!(text_results.len(), 1);
887
888 let image_query = Embedding::new(vec![0.25; 8]).unwrap();
889 let image_results = coordinator.search("docs", "image", &image_query, 1, None).unwrap();
890 assert_eq!(image_results.len(), 1);
891 }
892}