manifoldb_vector/index/
coordinator.rs

1//! Coordinator for HNSW indexes and separated vector storage.
2//!
3//! This module provides [`VectorIndexCoordinator`] which bridges the HNSW index
4//! system with the [`CollectionVectorStore`]. It handles:
5//!
6//! - Automatic HNSW updates when vectors are stored
7//! - Index rebuild from vector storage
8//! - Cascade operations (delete entity → delete from index)
9//! - Unified search across collections with named vectors
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌──────────────────────────────────────────────────────────────────────┐
15//! │                     VectorIndexCoordinator                           │
16//! ├──────────────────────────────────────────────────────────────────────┤
17//! │                                                                      │
18//! │  ┌────────────────────┐          ┌──────────────────────┐           │
19//! │  │ CollectionVector   │          │   HnswIndexManager   │           │
20//! │  │     Store          │          │                      │           │
21//! │  │                    │          │  ┌────────────────┐  │           │
22//! │  │ (collection_id,    │   ─────▶ │  │ documents_text │  │           │
23//! │  │  entity_id,        │          │  │   _hnsw        │  │           │
24//! │  │  vector_name)      │          │  ├────────────────┤  │           │
25//! │  │      ↓             │          │  │ documents_image│  │           │
26//! │  │   VectorData       │          │  │   _hnsw        │  │           │
27//! │  └────────────────────┘          │  └────────────────┘  │           │
28//! │                                   └──────────────────────┘           │
29//! │                                                                      │
30//! └──────────────────────────────────────────────────────────────────────┘
31//! ```
32//!
33//! # Example
34//!
35//! ```ignore
//! use manifoldb_vector::index::{VectorIndexCoordinator, HnswConfig};
//! use manifoldb_vector::distance::DistanceMetric;
//! use manifoldb_core::{CollectionId, EntityId};
//!
//! let coordinator = VectorIndexCoordinator::new(engine);
//!
//! // Create an HNSW index for a collection's named vector.
//! // `create_index` takes its own storage engine instance for index persistence.
//! coordinator.create_index(
//!     index_engine,
//!     "documents",
//!     "text_embedding",
//!     1536,
//!     DistanceMetric::Cosine,
//!     &HnswConfig::default(),
//! )?;
//!
//! // Store a vector and automatically update HNSW
//! let collection_id = CollectionId::new(1);
//! let entity_id = EntityId::new(42);
//! coordinator.upsert_vector(
//!     collection_id,
//!     entity_id,
//!     "documents",
//!     "text_embedding",
//!     &VectorData::Dense(vec![0.1; 1536]),
//! )?;
//!
//! // Search returns entity IDs with distances; `None` uses the default `ef_search`
//! let query = Embedding::new(vec![0.1; 1536])?;
//! let results = coordinator.search("documents", "text_embedding", &query, 10, None)?;
65//! ```
66
67use std::sync::Arc;
68
69use manifoldb_core::{CollectionId, EntityId};
70use manifoldb_storage::StorageEngine;
71
72use crate::distance::DistanceMetric;
73use crate::error::VectorError;
74use crate::store::CollectionVectorStore;
75use crate::types::{CollectionName, Embedding, VectorData};
76
77use super::config::HnswConfig;
78use super::manager::HnswIndexManager;
79use super::traits::{SearchResult, VectorIndex};
80
/// Coordinator for HNSW indexes with separated vector storage.
///
/// This struct provides a unified interface for:
/// - Storing vectors in [`CollectionVectorStore`]
/// - Maintaining HNSW indexes via [`HnswIndexManager`]
/// - Coordinating insert/update/delete operations
/// - Rebuilding indexes from stored vectors
///
/// The coordinator ensures that:
/// 1. Vectors are stored in the vector store for persistence
/// 2. HNSW indexes are updated automatically for fast similarity search
/// 3. Delete operations cascade properly to both stores
///
/// Write operations in this file always touch the vector store first and the
/// HNSW index second; the two updates are separate calls, not one transaction.
pub struct VectorIndexCoordinator<E: StorageEngine> {
    /// The vector store for persisted vector data.
    ///
    /// Constructed from the engine passed to `new` / `with_manager`.
    vector_store: CollectionVectorStore<E>,
    /// The HNSW index manager for similarity search.
    ///
    /// Held behind an `Arc` so the same manager can be handed to several
    /// coordinators (see `with_manager`).
    index_manager: Arc<HnswIndexManager<E>>,
}
99
100impl<E: StorageEngine> VectorIndexCoordinator<E> {
101    /// Create a new coordinator.
102    ///
103    /// # Arguments
104    ///
105    /// * `engine` - The storage engine
106    #[must_use]
107    pub fn new(engine: E) -> Self {
108        Self {
109            vector_store: CollectionVectorStore::new(engine),
110            index_manager: Arc::new(HnswIndexManager::new()),
111        }
112    }
113
114    /// Create a coordinator with an existing index manager.
115    ///
116    /// Use this when you need to share an index manager across multiple coordinators.
117    #[must_use]
118    pub fn with_manager(engine: E, index_manager: Arc<HnswIndexManager<E>>) -> Self {
119        Self { vector_store: CollectionVectorStore::new(engine), index_manager }
120    }
121
122    /// Get a reference to the vector store.
123    #[must_use]
124    pub fn vector_store(&self) -> &CollectionVectorStore<E> {
125        &self.vector_store
126    }
127
128    /// Get a reference to the index manager.
129    #[must_use]
130    pub fn index_manager(&self) -> &Arc<HnswIndexManager<E>> {
131        &self.index_manager
132    }
133
134    // ========================================================================
135    // Vector Operations (with automatic HNSW updates)
136    // ========================================================================
137
138    /// Upsert a vector, updating both storage and HNSW index.
139    ///
140    /// This method:
141    /// 1. Stores the vector in [`CollectionVectorStore`]
142    /// 2. Updates the HNSW index (if one exists for this named vector)
143    ///
144    /// # Arguments
145    ///
146    /// * `collection_id` - The collection ID
147    /// * `entity_id` - The entity ID
148    /// * `collection_name` - Collection name for index lookup
149    /// * `vector_name` - The vector name within the collection
150    /// * `data` - The vector data to store
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if storage or index update fails.
155    pub fn upsert_vector(
156        &self,
157        collection_id: CollectionId,
158        entity_id: EntityId,
159        collection_name: &str,
160        vector_name: &str,
161        data: &VectorData,
162    ) -> Result<(), VectorError> {
163        // 1. Store in vector store
164        self.vector_store.put_vector(collection_id, entity_id, vector_name, data)?;
165
166        // 2. Update HNSW index (if one exists and this is a dense vector)
167        if let Some(dense) = data.as_dense() {
168            // Get the index from the manager
169            if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
170                let embedding = Embedding::new(dense.to_vec())?;
171                let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
172                guard.insert(entity_id, &embedding)?;
173            }
174        }
175
176        Ok(())
177    }
178
179    /// Upsert multiple vectors in a batch.
180    ///
181    /// More efficient than calling `upsert_vector` multiple times as it
182    /// batches storage operations and HNSW insertions.
183    ///
184    /// # Arguments
185    ///
186    /// * `collection_id` - The collection ID
187    /// * `collection_name` - Collection name for index lookup
188    /// * `vectors` - List of (entity_id, vector_name, vector_data) tuples
189    pub fn upsert_vectors_batch(
190        &self,
191        collection_id: CollectionId,
192        collection_name: &str,
193        vectors: &[(EntityId, &str, &VectorData)],
194    ) -> Result<(), VectorError> {
195        if vectors.is_empty() {
196            return Ok(());
197        }
198
199        // 1. Batch store in vector store
200        self.vector_store.put_vectors_batch(collection_id, vectors)?;
201
202        // 2. Group vectors by name and update each HNSW index
203        use std::collections::HashMap;
204        let mut by_name: HashMap<&str, Vec<(EntityId, &VectorData)>> = HashMap::new();
205
206        for (entity_id, name, data) in vectors {
207            by_name.entry(*name).or_default().push((*entity_id, *data));
208        }
209
210        for (vector_name, entity_vectors) in by_name {
211            if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
212                // Filter to dense vectors only and convert to embeddings
213                let embeddings: Vec<(EntityId, Embedding)> = entity_vectors
214                    .into_iter()
215                    .filter_map(|(id, data)| {
216                        data.as_dense().map(|d| Embedding::new(d.to_vec()).map(|e| (id, e)))
217                    })
218                    .filter_map(Result::ok)
219                    .collect();
220
221                if !embeddings.is_empty() {
222                    let refs: Vec<(EntityId, &Embedding)> =
223                        embeddings.iter().map(|(id, e)| (*id, e)).collect();
224
225                    let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
226                    guard.insert_batch(&refs)?;
227                }
228            }
229        }
230
231        Ok(())
232    }
233
234    /// Delete a vector from both storage and HNSW index.
235    ///
236    /// # Returns
237    ///
238    /// Returns `true` if the vector was deleted from storage.
239    pub fn delete_vector(
240        &self,
241        collection_id: CollectionId,
242        entity_id: EntityId,
243        collection_name: &str,
244        vector_name: &str,
245    ) -> Result<bool, VectorError> {
246        // 1. Delete from vector store
247        let deleted = self.vector_store.delete_vector(collection_id, entity_id, vector_name)?;
248
249        // 2. Delete from HNSW index
250        if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
251            let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
252            let _ = guard.delete(entity_id);
253        }
254
255        Ok(deleted)
256    }
257
258    /// Delete all vectors for an entity.
259    ///
260    /// This cascades the delete to all HNSW indexes for the collection.
261    ///
262    /// # Returns
263    ///
264    /// Returns the number of vectors deleted from storage.
265    pub fn delete_entity_vectors(
266        &self,
267        collection_id: CollectionId,
268        entity_id: EntityId,
269        collection_name: &str,
270    ) -> Result<usize, VectorError> {
271        // 1. Get all vector names before deleting (for HNSW cleanup)
272        let vectors = self.vector_store.get_all_vectors(collection_id, entity_id)?;
273
274        // 2. Delete from vector store
275        let count = self.vector_store.delete_all_vectors(collection_id, entity_id)?;
276
277        // 3. Delete from all relevant HNSW indexes
278        for vector_name in vectors.keys() {
279            if let Ok(Some(index)) = self.index_manager.get_index(collection_name, vector_name) {
280                let mut guard = index.write().map_err(|_| VectorError::LockPoisoned)?;
281                let _ = guard.delete(entity_id);
282            }
283        }
284
285        Ok(count)
286    }
287
288    // ========================================================================
289    // Vector Retrieval
290    // ========================================================================
291
292    /// Get a vector from storage.
293    pub fn get_vector(
294        &self,
295        collection_id: CollectionId,
296        entity_id: EntityId,
297        vector_name: &str,
298    ) -> Result<Option<VectorData>, VectorError> {
299        self.vector_store.get_vector(collection_id, entity_id, vector_name)
300    }
301
302    /// Get all vectors for an entity.
303    pub fn get_all_vectors(
304        &self,
305        collection_id: CollectionId,
306        entity_id: EntityId,
307    ) -> Result<std::collections::HashMap<String, VectorData>, VectorError> {
308        self.vector_store.get_all_vectors(collection_id, entity_id)
309    }
310
311    // ========================================================================
312    // Similarity Search
313    // ========================================================================
314
315    /// Search for similar vectors using HNSW.
316    ///
317    /// # Arguments
318    ///
319    /// * `collection_name` - The collection to search
320    /// * `vector_name` - The named vector to search
321    /// * `query` - The query embedding
322    /// * `k` - Number of results to return
323    /// * `ef_search` - Optional beam width (uses default if None)
324    ///
325    /// # Returns
326    ///
327    /// A list of search results with entity IDs and distances.
328    pub fn search(
329        &self,
330        collection_name: &str,
331        vector_name: &str,
332        query: &Embedding,
333        k: usize,
334        ef_search: Option<usize>,
335    ) -> Result<Vec<SearchResult>, VectorError> {
336        let index =
337            self.index_manager.get_index(collection_name, vector_name)?.ok_or_else(|| {
338                VectorError::SpaceNotFound(format!(
339                    "no HNSW index for {}.{}",
340                    collection_name, vector_name
341                ))
342            })?;
343
344        let guard = index.read().map_err(|_| VectorError::LockPoisoned)?;
345        guard.search(query, k, ef_search)
346    }
347
348    /// Search with a filter predicate.
349    ///
350    /// The predicate is applied during graph traversal, not as a post-filter.
351    pub fn search_with_filter<F>(
352        &self,
353        collection_name: &str,
354        vector_name: &str,
355        query: &Embedding,
356        k: usize,
357        predicate: F,
358        ef_search: Option<usize>,
359    ) -> Result<Vec<SearchResult>, VectorError>
360    where
361        F: Fn(EntityId) -> bool,
362    {
363        let index =
364            self.index_manager.get_index(collection_name, vector_name)?.ok_or_else(|| {
365                VectorError::SpaceNotFound(format!(
366                    "no HNSW index for {}.{}",
367                    collection_name, vector_name
368                ))
369            })?;
370
371        let guard = index.read().map_err(|_| VectorError::LockPoisoned)?;
372        guard.search_with_filter(query, k, predicate, ef_search, None)
373    }
374
375    // ========================================================================
376    // Index Rebuild
377    // ========================================================================
378
379    /// Rebuild an HNSW index from the vector store.
380    ///
381    /// This is useful for:
382    /// - Recovering from index corruption
383    /// - Building an index for existing vectors
384    /// - Re-indexing after configuration changes
385    ///
386    /// # Arguments
387    ///
388    /// * `collection_id` - The collection ID
389    /// * `collection_name` - Collection name for index lookup
390    /// * `vector_name` - The named vector to rebuild
391    ///
392    /// # Returns
393    ///
394    /// The number of vectors indexed.
395    pub fn rebuild_index_from_store(
396        &self,
397        collection_id: CollectionId,
398        collection_name: &str,
399        vector_name: &str,
400    ) -> Result<usize, VectorError> {
401        // Get all entities with this vector
402        let entity_ids = self.vector_store.list_entities_with_vector(collection_id, vector_name)?;
403
404        // Collect vectors
405        let mut vectors = Vec::with_capacity(entity_ids.len());
406        for entity_id in entity_ids {
407            if let Some(data) =
408                self.vector_store.get_vector(collection_id, entity_id, vector_name)?
409            {
410                if let Some(dense) = data.as_dense() {
411                    vectors.push((entity_id, dense.to_vec()));
412                }
413            }
414        }
415
416        // Rebuild using the index manager
417        let points =
418            vectors.into_iter().map(|(id, v)| (manifoldb_core::PointId::new(id.as_u64()), v));
419
420        self.index_manager.rebuild_index(collection_name, vector_name, points)
421    }
422
423    // ========================================================================
424    // Index Status
425    // ========================================================================
426
427    /// Check if an index is loaded in memory.
428    pub fn is_index_loaded(&self, collection: &str, vector_name: &str) -> bool {
429        self.index_manager.is_index_loaded(collection, vector_name).unwrap_or(false)
430    }
431}
432
/// Index lifecycle methods that take a storage engine (owned or by reference) for index persistence.
434impl<E: StorageEngine> VectorIndexCoordinator<E> {
435    /// Create an HNSW index for a collection's named vector.
436    ///
437    /// Note: This method takes a separate engine instance for index persistence.
438    /// You may want to call this with a shared engine reference or a cloned engine.
439    ///
440    /// # Arguments
441    ///
442    /// * `engine` - Storage engine for the index persistence
443    /// * `collection` - Collection name (e.g., "documents")
444    /// * `vector_name` - Vector name within the collection (e.g., "text_embedding")
445    /// * `dimension` - Vector dimension
446    /// * `distance_metric` - Distance metric for similarity
447    /// * `config` - HNSW configuration
448    ///
449    /// # Returns
450    ///
451    /// The name of the created index (e.g., "documents_text_embedding_hnsw")
452    pub fn create_index(
453        &self,
454        engine: E,
455        collection: &str,
456        vector_name: &str,
457        dimension: usize,
458        distance_metric: DistanceMetric,
459        config: &HnswConfig,
460    ) -> Result<String, VectorError> {
461        let collection_name = CollectionName::new(collection)?;
462        self.index_manager.create_index_for_vector(
463            engine,
464            &collection_name,
465            vector_name,
466            dimension,
467            distance_metric,
468            config,
469        )
470    }
471
472    /// Drop an HNSW index.
473    pub fn drop_index(&self, engine: &E, index_name: &str) -> Result<bool, VectorError> {
474        self.index_manager.drop_index(engine, index_name)
475    }
476
477    /// Drop all HNSW indexes for a collection.
478    pub fn drop_collection_indexes(
479        &self,
480        engine: &E,
481        collection: &str,
482    ) -> Result<Vec<String>, VectorError> {
483        let collection_name = CollectionName::new(collection)?;
484        self.index_manager.drop_indexes_for_collection(engine, &collection_name)
485    }
486
487    /// Check if an HNSW index exists for a collection's named vector.
488    pub fn has_index(&self, engine: &E, collection: &str, vector_name: &str) -> bool {
489        self.index_manager.has_index(engine, collection, vector_name).unwrap_or(false)
490    }
491
492    /// Load an existing index into memory.
493    pub fn load_index(&self, engine: E, index_name: &str) -> Result<(), VectorError> {
494        self.index_manager.load_index(engine, index_name)
495    }
496
497    /// Rebuild an index from scratch with new configuration.
498    ///
499    /// This drops the existing index data and creates a fresh index.
500    ///
501    /// # Arguments
502    ///
503    /// * `engine` - Storage engine for the new index
504    /// * `collection_id` - The collection ID
505    /// * `collection_name` - Collection name
506    /// * `vector_name` - The named vector to rebuild
507    ///
508    /// # Returns
509    ///
510    /// The number of vectors indexed.
511    pub fn rebuild_index_from_scratch(
512        &self,
513        engine: E,
514        collection_id: CollectionId,
515        collection_name: &str,
516        vector_name: &str,
517    ) -> Result<usize, VectorError> {
518        // Get all entities with this vector
519        let entity_ids = self.vector_store.list_entities_with_vector(collection_id, vector_name)?;
520
521        // Collect vectors
522        let mut vectors = Vec::with_capacity(entity_ids.len());
523        for entity_id in entity_ids {
524            if let Some(data) =
525                self.vector_store.get_vector(collection_id, entity_id, vector_name)?
526            {
527                if let Some(dense) = data.as_dense() {
528                    vectors.push((entity_id, dense.to_vec()));
529                }
530            }
531        }
532
533        // Rebuild from scratch using the index manager
534        let points =
535            vectors.into_iter().map(|(id, v)| (manifoldb_core::PointId::new(id.as_u64()), v));
536
537        self.index_manager.rebuild_index_from_scratch(engine, collection_name, vector_name, points)
538    }
539}
540
#[cfg(test)]
mod tests {
    use super::*;
    use manifoldb_storage::backends::RedbEngine;

    /// Create two independent in-memory engines: one for the coordinator's
    /// vector store and one for index creation.
    ///
    /// Two instances are needed because `RedbEngine` cannot be cloned.
    fn create_test_engines() -> (RedbEngine, RedbEngine) {
        (RedbEngine::in_memory().unwrap(), RedbEngine::in_memory().unwrap())
    }

    /// Build a coordinator that already has a 4-dimensional Euclidean index
    /// registered for `docs.vec`, the setup shared by most tests below.
    fn coordinator_with_docs_index() -> VectorIndexCoordinator<RedbEngine> {
        let (coord_engine, index_engine) = create_test_engines();
        let coordinator = VectorIndexCoordinator::new(coord_engine);
        coordinator
            .create_index(
                index_engine,
                "docs",
                "vec",
                4,
                DistanceMetric::Euclidean,
                &HnswConfig::default(),
            )
            .unwrap();
        coordinator
    }

    #[test]
    fn test_create_coordinator() {
        let (coord_engine, _) = create_test_engines();
        let coordinator = VectorIndexCoordinator::new(coord_engine);
        // Basic sanity check: nothing is loaded on a fresh coordinator.
        assert!(!coordinator.is_index_loaded("test", "vec"));
    }

    #[test]
    fn test_create_index() {
        let (coord_engine, index_engine) = create_test_engines();
        let coordinator = VectorIndexCoordinator::new(coord_engine);

        let index_name = coordinator
            .create_index(
                index_engine,
                "documents",
                "text_embedding",
                384,
                DistanceMetric::Cosine,
                &HnswConfig::default(),
            )
            .unwrap();

        assert_eq!(index_name, "documents_text_embedding_hnsw");
        assert!(coordinator.is_index_loaded("documents", "text_embedding"));
    }

    #[test]
    fn test_upsert_and_search() {
        let coordinator = coordinator_with_docs_index();
        let collection_id = CollectionId::new(1);

        // Upsert vectors
        for i in 1..=5 {
            let data = VectorData::Dense(vec![i as f32; 4]);
            coordinator
                .upsert_vector(collection_id, EntityId::new(i), "docs", "vec", &data)
                .unwrap();
        }

        // Search
        let query = Embedding::new(vec![3.0; 4]).unwrap();
        let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();

        assert_eq!(results.len(), 3);
        // Entity 3 should be closest (exact match)
        assert_eq!(results[0].entity_id, EntityId::new(3));
    }

    #[test]
    fn test_delete_vector() {
        let coordinator = coordinator_with_docs_index();
        let collection_id = CollectionId::new(1);
        let entity_id = EntityId::new(1);

        // Upsert
        coordinator
            .upsert_vector(
                collection_id,
                entity_id,
                "docs",
                "vec",
                &VectorData::Dense(vec![1.0; 4]),
            )
            .unwrap();

        // Verify it exists
        assert!(coordinator.get_vector(collection_id, entity_id, "vec").unwrap().is_some());

        // Delete
        let deleted = coordinator.delete_vector(collection_id, entity_id, "docs", "vec").unwrap();
        assert!(deleted);

        // Verify it's gone
        assert!(coordinator.get_vector(collection_id, entity_id, "vec").unwrap().is_none());

        // Search should return no results
        let query = Embedding::new(vec![1.0; 4]).unwrap();
        let results = coordinator.search("docs", "vec", &query, 1, None).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_batch_upsert() {
        let coordinator = coordinator_with_docs_index();
        let collection_id = CollectionId::new(1);

        // Create batch data
        let data1 = VectorData::Dense(vec![1.0; 4]);
        let data2 = VectorData::Dense(vec![2.0; 4]);
        let data3 = VectorData::Dense(vec![3.0; 4]);

        let vectors: Vec<(EntityId, &str, &VectorData)> = vec![
            (EntityId::new(1), "vec", &data1),
            (EntityId::new(2), "vec", &data2),
            (EntityId::new(3), "vec", &data3),
        ];

        coordinator.upsert_vectors_batch(collection_id, "docs", &vectors).unwrap();

        // Verify all vectors are stored
        for i in 1..=3 {
            assert!(coordinator
                .get_vector(collection_id, EntityId::new(i), "vec")
                .unwrap()
                .is_some());
        }

        // Search should work
        let query = Embedding::new(vec![2.0; 4]).unwrap();
        let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_rebuild_from_store() {
        let (coord_engine, index_engine) = create_test_engines();
        let coordinator = VectorIndexCoordinator::new(coord_engine);

        let collection_id = CollectionId::new(1);

        // Store vectors directly (without HNSW index)
        for i in 1..=5 {
            coordinator
                .vector_store()
                .put_vector(
                    collection_id,
                    EntityId::new(i),
                    "vec",
                    &VectorData::Dense(vec![i as f32; 4]),
                )
                .unwrap();
        }

        // Create the index only after the vectors exist, so it starts out empty.
        coordinator
            .create_index(
                index_engine,
                "docs",
                "vec",
                4,
                DistanceMetric::Euclidean,
                &HnswConfig::default(),
            )
            .unwrap();

        // Rebuild from store
        let count = coordinator.rebuild_index_from_store(collection_id, "docs", "vec").unwrap();
        assert_eq!(count, 5);

        // Search should now work
        let query = Embedding::new(vec![3.0; 4]).unwrap();
        let results = coordinator.search("docs", "vec", &query, 3, None).unwrap();
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_search_with_filter() {
        let coordinator = coordinator_with_docs_index();
        let collection_id = CollectionId::new(1);

        // Upsert vectors
        for i in 1..=10 {
            let data = VectorData::Dense(vec![i as f32; 4]);
            coordinator
                .upsert_vector(collection_id, EntityId::new(i), "docs", "vec", &data)
                .unwrap();
        }

        // Search with filter: only even IDs
        let query = Embedding::new(vec![5.0; 4]).unwrap();
        let predicate = |id: EntityId| id.as_u64() % 2 == 0;

        let results =
            coordinator.search_with_filter("docs", "vec", &query, 3, predicate, None).unwrap();

        // Guard against a vacuous pass: the per-result loop below proves nothing
        // if the filtered search came back empty.
        assert!(!results.is_empty());
        assert!(results.len() <= 3);

        // All results should be even
        for result in &results {
            assert_eq!(result.entity_id.as_u64() % 2, 0);
        }
    }

    #[test]
    fn test_sparse_vector_ignored_for_hnsw() {
        let coordinator = coordinator_with_docs_index();
        let collection_id = CollectionId::new(1);

        // Upsert a sparse vector (should be stored but not indexed)
        let sparse = VectorData::Sparse(vec![(0, 1.0), (2, 0.5)]);
        coordinator.upsert_vector(collection_id, EntityId::new(1), "docs", "vec", &sparse).unwrap();

        // Vector should be stored
        let retrieved = coordinator.get_vector(collection_id, EntityId::new(1), "vec").unwrap();
        assert!(retrieved.is_some());
        assert!(retrieved.unwrap().is_sparse());

        // HNSW search should return nothing (sparse vectors not indexed)
        let query = Embedding::new(vec![1.0; 4]).unwrap();
        let results = coordinator.search("docs", "vec", &query, 1, None).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_multiple_named_vectors() {
        let (coord_engine, index_engine1) = create_test_engines();
        let index_engine2 = RedbEngine::in_memory().unwrap();
        let coordinator = VectorIndexCoordinator::new(coord_engine);

        // Create indexes for different named vectors
        coordinator
            .create_index(
                index_engine1,
                "docs",
                "text",
                4,
                DistanceMetric::Cosine,
                &HnswConfig::default(),
            )
            .unwrap();

        coordinator
            .create_index(
                index_engine2,
                "docs",
                "image",
                8,
                DistanceMetric::Euclidean,
                &HnswConfig::default(),
            )
            .unwrap();

        let collection_id = CollectionId::new(1);
        let entity_id = EntityId::new(1);

        // Upsert different vectors for the same entity
        coordinator
            .upsert_vector(
                collection_id,
                entity_id,
                "docs",
                "text",
                &VectorData::Dense(vec![0.5; 4]),
            )
            .unwrap();

        coordinator
            .upsert_vector(
                collection_id,
                entity_id,
                "docs",
                "image",
                &VectorData::Dense(vec![0.25; 8]),
            )
            .unwrap();

        // Both vectors should be stored
        let text_vec = coordinator.get_vector(collection_id, entity_id, "text").unwrap();
        let image_vec = coordinator.get_vector(collection_id, entity_id, "image").unwrap();

        assert!(text_vec.is_some());
        assert!(image_vec.is_some());
        assert_eq!(text_vec.unwrap().dimension(), 4);
        assert_eq!(image_vec.unwrap().dimension(), 8);

        // Search should work on each index
        let text_query = Embedding::new(vec![0.5; 4]).unwrap();
        let text_results = coordinator.search("docs", "text", &text_query, 1, None).unwrap();
        assert_eq!(text_results.len(), 1);

        let image_query = Embedding::new(vec![0.25; 8]).unwrap();
        let image_results = coordinator.search("docs", "image", &image_query, 1, None).unwrap();
        assert_eq!(image_results.len(), 1);
    }
}