Skip to main content

reddb_server/storage/unified/index/
manager_impl.rs

1use super::*;
2
3impl IntegratedIndexManager {
4    /// Create a new index manager with default config
5    pub fn new() -> Self {
6        Self::with_config(IntegratedIndexConfig::default())
7    }
8
9    /// Create with custom configuration
10    pub fn with_config(config: IntegratedIndexConfig) -> Self {
11        let now = SystemTime::now()
12            .duration_since(UNIX_EPOCH)
13            .map(|d| d.as_secs())
14            .unwrap_or(0);
15
16        let mut status = HashMap::new();
17
18        // Initialize status for each index type based on config
19        if config.enable_hnsw {
20            status.insert((IndexType::Hnsw, None), IndexStatus::Ready);
21        } else {
22            status.insert((IndexType::Hnsw, None), IndexStatus::Disabled);
23        }
24        if config.enable_fulltext {
25            status.insert((IndexType::Fulltext, None), IndexStatus::Ready);
26        } else {
27            status.insert((IndexType::Fulltext, None), IndexStatus::Disabled);
28        }
29        if config.enable_metadata {
30            status.insert((IndexType::Metadata, None), IndexStatus::Ready);
31        } else {
32            status.insert((IndexType::Metadata, None), IndexStatus::Disabled);
33        }
34        if config.enable_graph {
35            status.insert((IndexType::Graph, None), IndexStatus::Ready);
36        } else {
37            status.insert((IndexType::Graph, None), IndexStatus::Disabled);
38        }
39
40        Self {
41            config,
42            text_index: InvertedIndex::new(),
43            metadata_index: RwLock::new(MetadataStorage::new()),
44            hnsw_indices: RwLock::new(HashMap::new()),
45            graph_index: GraphAdjacencyIndex::new(),
46            index_status: RwLock::new(status),
47            event_history: RwLock::new(Vec::new()),
48            created_at: now,
49        }
50    }
51
52    /// Index a vector for similarity search
53    pub fn index_vector(&self, collection: &str, id: EntityId, vector: &[f32]) {
54        if !self.config.enable_hnsw {
55            return;
56        }
57
58        {
59            let mut indices = self.hnsw_indices.write();
60            let info = indices
61                .entry(collection.to_string())
62                .or_insert_with(|| HnswIndexInfo {
63                    dimension: vector.len(),
64                    vectors: HashMap::new(),
65                    entry_point: None,
66                });
67
68            // Verify dimension
69            if info.dimension != vector.len() && !info.vectors.is_empty() {
70                return; // Dimension mismatch
71            }
72
73            info.vectors.insert(id, vector.to_vec());
74            if info.entry_point.is_none() {
75                info.entry_point = Some(id);
76            }
77        }
78    }
79
80    /// Search for similar vectors
81    pub fn search_similar(
82        &self,
83        collection: &str,
84        query: &[f32],
85        k: usize,
86    ) -> Vec<VectorSearchResult> {
87        let indices = self.hnsw_indices.read();
88
89        let info = match indices.get(collection) {
90            Some(i) => i,
91            None => return Vec::new(),
92        };
93
94        if query.len() != info.dimension {
95            return Vec::new();
96        }
97
98        // Simple brute-force for now (in production, use actual HNSW)
99        let mut results: Vec<VectorSearchResult> = info
100            .vectors
101            .iter()
102            .map(|(id, vec)| {
103                let similarity = cosine_similarity(query, vec);
104                VectorSearchResult {
105                    entity_id: *id,
106                    collection: collection.to_string(),
107                    similarity,
108                }
109            })
110            .collect();
111
112        results.sort_by(|a, b| {
113            b.similarity
114                .partial_cmp(&a.similarity)
115                .unwrap_or(std::cmp::Ordering::Equal)
116                .then_with(|| a.entity_id.cmp(&b.entity_id))
117        });
118        results.truncate(k);
119        results
120    }
121
122    /// Index text content for full-text search
123    pub fn index_text(&self, collection: &str, id: EntityId, field: &str, content: &str) {
124        if !self.config.enable_fulltext {
125            return;
126        }
127        self.text_index
128            .index_document(collection, id, field, content);
129    }
130
131    /// Full-text search
132    pub fn search_text(&self, query: &str, limit: usize) -> Vec<TextSearchResult> {
133        self.text_index.search(query, limit)
134    }
135
136    /// Prefix search for autocomplete
137    pub fn autocomplete(&self, prefix: &str, limit: usize) -> Vec<String> {
138        self.text_index.search_prefix(prefix, limit)
139    }
140
141    /// Index metadata for range queries
142    pub fn index_metadata(&self, _collection: &str, id: EntityId, metadata: &Metadata) {
143        if !self.config.enable_metadata {
144            return;
145        }
146        // MetadataStorage handles this internally via set()
147        {
148            let mut storage = self.metadata_index.write();
149            for (key, value) in &metadata.fields {
150                storage.set(id, key.clone(), value.clone());
151            }
152        }
153    }
154
155    /// Query metadata with filters
156    pub fn query_metadata(&self, key: &str, filter: MetadataQueryFilter) -> Vec<EntityId> {
157        let storage = self.metadata_index.read();
158
159        match filter {
160            MetadataQueryFilter::Equals(ref value) => storage.filter_eq(key, value),
161            MetadataQueryFilter::Range { min, max } => {
162                // Handle int ranges
163                let min_int = min.as_ref().and_then(|v| {
164                    if let MetadataValue::Int(n) = v {
165                        Some(*n)
166                    } else {
167                        None
168                    }
169                });
170                let max_int = max.as_ref().and_then(|v| {
171                    if let MetadataValue::Int(n) = v {
172                        Some(*n)
173                    } else {
174                        None
175                    }
176                });
177                if min_int.is_some() || max_int.is_some() {
178                    return storage.filter_int_range(key, min_int, max_int);
179                }
180                Vec::new()
181            }
182            MetadataQueryFilter::Contains(ref substring) => {
183                storage.filter_string_prefix(key, substring)
184            }
185            MetadataQueryFilter::In(ref values) => {
186                // Collect entities matching any value
187                let mut result = Vec::new();
188                for value in values {
189                    result.extend(storage.filter_eq(key, value));
190                }
191                result.sort();
192                result.dedup();
193                result
194            }
195        }
196    }
197
198    /// Remove entity from all indices
199    pub fn remove_entity(&self, id: EntityId) {
200        // Remove from text index
201        self.text_index.remove_document(id);
202
203        // Remove from vector indices
204        {
205            let mut indices = self.hnsw_indices.write();
206            for info in indices.values_mut() {
207                info.vectors.remove(&id);
208            }
209        }
210
211        // Remove from graph index (if it's an edge)
212        self.graph_index.remove_edge(id);
213
214        // Metadata removal handled by storage layer
215    }
216
217    // =========================================================================
218    // Graph Index Operations
219    // =========================================================================
220
221    /// Index an edge in the graph adjacency index
222    pub fn index_edge(
223        &self,
224        edge_id: EntityId,
225        source_id: EntityId,
226        target_id: EntityId,
227        label: &str,
228        weight: f32,
229    ) {
230        if !self.config.enable_graph {
231            return;
232        }
233        self.graph_index
234            .index_edge(edge_id, source_id, target_id, label, weight);
235    }
236
237    /// Get neighbors of a node in a given direction
238    pub fn get_neighbors(
239        &self,
240        node_id: EntityId,
241        direction: EdgeDirection,
242        label_filter: Option<&str>,
243    ) -> Vec<AdjacencyEntry> {
244        self.graph_index
245            .get_neighbors(node_id, direction, label_filter)
246    }
247
248    /// Get all edges with a specific label
249    pub fn get_edges_by_label(&self, label: &str) -> Vec<EntityId> {
250        self.graph_index.get_edges_by_label(label)
251    }
252
253    /// Get degree of a node
254    pub fn node_degree(&self, node_id: EntityId, direction: EdgeDirection) -> usize {
255        match direction {
256            EdgeDirection::Outgoing => self.graph_index.out_degree(node_id),
257            EdgeDirection::Incoming => self.graph_index.in_degree(node_id),
258            EdgeDirection::Both => self.graph_index.degree(node_id),
259        }
260    }
261
262    /// Get a reference to the graph adjacency index
263    pub fn graph_index(&self) -> &GraphAdjacencyIndex {
264        &self.graph_index
265    }
266
267    // =========================================================================
268    // Index Lifecycle Management
269    // =========================================================================
270
271    /// Create a new index of the specified type for a collection
272    pub fn create_index(
273        &self,
274        index_type: IndexType,
275        collection: Option<&str>,
276    ) -> Result<(), String> {
277        let key = (index_type, collection.map(|s| s.to_string()));
278
279        // Check if already exists
280        {
281            let status = self.index_status.read();
282            if let Some(IndexStatus::Ready) = status.get(&key) {
283                return Err(format!("Index {:?} already exists", index_type));
284            }
285        }
286
287        // Set status to building
288        self.index_status
289            .write()
290            .insert(key.clone(), IndexStatus::Building { progress: 0.0 });
291
292        // For now, just mark as ready (actual building would be async)
293        self.index_status
294            .write()
295            .insert(key.clone(), IndexStatus::Ready);
296
297        // Record event
298        self.record_event(IndexEvent {
299            index_type,
300            collection: collection.map(|s| s.to_string()),
301            event: IndexEventKind::Created,
302            timestamp: Self::now(),
303        });
304
305        Ok(())
306    }
307
308    /// Drop an index
309    pub fn drop_index(
310        &self,
311        index_type: IndexType,
312        collection: Option<&str>,
313    ) -> Result<(), String> {
314        let key = (index_type, collection.map(|s| s.to_string()));
315
316        // Clear the index data
317        match index_type {
318            IndexType::Hnsw => {
319                if let Some(coll) = collection {
320                    self.hnsw_indices.write().remove(coll);
321                } else {
322                    self.hnsw_indices.write().clear();
323                }
324            }
325            IndexType::Graph => {
326                self.graph_index.clear();
327            }
328            // Fulltext and Metadata don't support per-collection drop yet
329            _ => {}
330        }
331
332        // Update status
333        self.index_status.write().remove(&key);
334
335        // Record event
336        self.record_event(IndexEvent {
337            index_type,
338            collection: collection.map(|s| s.to_string()),
339            event: IndexEventKind::Dropped,
340            timestamp: Self::now(),
341        });
342
343        Ok(())
344    }
345
346    /// Rebuild an index (clear and recreate)
347    pub fn rebuild_index(
348        &self,
349        index_type: IndexType,
350        collection: Option<&str>,
351    ) -> Result<(), String> {
352        let key = (index_type, collection.map(|s| s.to_string()));
353
354        // Set status to building
355        self.index_status
356            .write()
357            .insert(key.clone(), IndexStatus::Building { progress: 0.0 });
358
359        // Clear existing data
360        match index_type {
361            IndexType::Hnsw => {
362                if let Some(coll) = collection {
363                    let mut indices = self.hnsw_indices.write();
364                    if let Some(info) = indices.get_mut(coll) {
365                        info.vectors.clear();
366                        info.entry_point = None;
367                    }
368                }
369            }
370            IndexType::Graph => {
371                self.graph_index.clear();
372            }
373            _ => {}
374        }
375
376        // Mark as ready (actual rebuild would re-index all entities)
377        self.index_status
378            .write()
379            .insert(key.clone(), IndexStatus::Ready);
380
381        // Record event
382        self.record_event(IndexEvent {
383            index_type,
384            collection: collection.map(|s| s.to_string()),
385            event: IndexEventKind::Rebuilt,
386            timestamp: Self::now(),
387        });
388
389        Ok(())
390    }
391
392    /// Get the status of an index
393    pub fn index_status(&self, index_type: IndexType, collection: Option<&str>) -> IndexStatus {
394        let key = (index_type, collection.map(|s| s.to_string()));
395        self.index_status
396            .read()
397            .get(&key)
398            .cloned()
399            .unwrap_or(IndexStatus::Disabled)
400    }
401
402    /// Get all index statuses
403    pub fn all_index_statuses(&self) -> HashMap<(IndexType, Option<String>), IndexStatus> {
404        self.index_status.read().clone()
405    }
406
407    /// Get index event history
408    pub fn event_history(&self) -> Vec<IndexEvent> {
409        self.event_history.read().clone()
410    }
411
412    // =========================================================================
413    // Statistics
414    // =========================================================================
415
416    /// Get index statistics
417    pub fn stats(&self) -> IndexStats {
418        let now = Self::now();
419
420        let vector_count = self
421            .hnsw_indices
422            .read()
423            .values()
424            .map(|info| info.vectors.len())
425            .sum();
426
427        let (document_count, term_count) = {
428            let i = self.text_index.index.read();
429            let terms = i.len();
430            let docs: HashSet<EntityId> = i
431                .values()
432                .flat_map(|postings| postings.iter().map(|p| p.entity_id))
433                .collect();
434            (docs.len(), terms)
435        };
436
437        IndexStats {
438            vector_count,
439            document_count,
440            term_count,
441            metadata_entries: 0, // Would require adding a count method to MetadataStorage
442            graph_node_count: self.graph_index.node_count(),
443            graph_edge_count: self.graph_index.edge_count(),
444            created_at: self.created_at,
445            updated_at: now,
446        }
447    }
448
449    /// Get configuration
450    pub fn config(&self) -> &IntegratedIndexConfig {
451        &self.config
452    }
453
454    // =========================================================================
455    // Internal Helpers
456    // =========================================================================
457
458    fn record_event(&self, event: IndexEvent) {
459        let mut history = self.event_history.write();
460        history.push(event);
461        // Keep last 1000 events
462        if history.len() > 1000 {
463            history.drain(0..100);
464        }
465    }
466
467    fn now() -> u64 {
468        SystemTime::now()
469            .duration_since(UNIX_EPOCH)
470            .map(|d| d.as_secs())
471            .unwrap_or(0)
472    }
473}