1use std::collections::{BTreeMap, HashMap, HashSet};
29use std::time::{SystemTime, UNIX_EPOCH};
30
31use parking_lot::RwLock;
32
33use super::entity::EntityId;
34use super::metadata::{Metadata, MetadataStorage, MetadataValue};
35
/// Selects which edges incident to a node are considered by a graph query.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EdgeDirection {
    /// Edges for which the queried node is the source.
    Outgoing,
    /// Edges for which the queried node is the target.
    Incoming,
    /// Outgoing and incoming edges together.
    Both,
}
47
48#[derive(Debug, Clone)]
50pub struct AdjacencyEntry {
51 pub edge_id: EntityId,
53 pub neighbor_id: EntityId,
55 pub label: String,
57 pub weight: f32,
59}
60
61pub struct GraphAdjacencyIndex {
68 outgoing: RwLock<HashMap<EntityId, Vec<AdjacencyEntry>>>,
70 incoming: RwLock<HashMap<EntityId, Vec<AdjacencyEntry>>>,
72 by_label: RwLock<HashMap<String, HashSet<EntityId>>>,
74 edge_count: RwLock<usize>,
76 node_count: RwLock<usize>,
78}
79
80impl GraphAdjacencyIndex {
81 pub fn new() -> Self {
83 Self {
84 outgoing: RwLock::new(HashMap::new()),
85 incoming: RwLock::new(HashMap::new()),
86 by_label: RwLock::new(HashMap::new()),
87 edge_count: RwLock::new(0),
88 node_count: RwLock::new(0),
89 }
90 }
91
92 pub fn index_edge(
94 &self,
95 edge_id: EntityId,
96 source_id: EntityId,
97 target_id: EntityId,
98 label: &str,
99 weight: f32,
100 ) {
101 {
103 let mut outgoing = self.outgoing.write();
104 let entry = AdjacencyEntry {
105 edge_id,
106 neighbor_id: target_id,
107 label: label.to_string(),
108 weight,
109 };
110 outgoing.entry(source_id).or_default().push(entry);
111 }
112
113 {
115 let mut incoming = self.incoming.write();
116 let entry = AdjacencyEntry {
117 edge_id,
118 neighbor_id: source_id,
119 label: label.to_string(),
120 weight,
121 };
122 incoming.entry(target_id).or_default().push(entry);
123 }
124
125 {
127 let mut by_label = self.by_label.write();
128 by_label
129 .entry(label.to_string())
130 .or_default()
131 .insert(edge_id);
132 }
133
134 {
136 let mut count = self.edge_count.write();
137 *count += 1;
138 }
139
140 self.update_node_count();
142 }
143
144 pub fn remove_edge(&self, edge_id: EntityId) {
146 {
148 let mut outgoing = self.outgoing.write();
149 for entries in outgoing.values_mut() {
150 entries.retain(|e| e.edge_id != edge_id);
151 }
152 }
153
154 {
156 let mut incoming = self.incoming.write();
157 for entries in incoming.values_mut() {
158 entries.retain(|e| e.edge_id != edge_id);
159 }
160 }
161
162 {
164 let mut by_label = self.by_label.write();
165 for edges in by_label.values_mut() {
166 edges.remove(&edge_id);
167 }
168 }
169
170 {
172 let mut count = self.edge_count.write();
173 *count = count.saturating_sub(1);
174 }
175 }
176
177 pub fn get_neighbors(
179 &self,
180 node_id: EntityId,
181 direction: EdgeDirection,
182 label_filter: Option<&str>,
183 ) -> Vec<AdjacencyEntry> {
184 let mut results = Vec::new();
185
186 if matches!(direction, EdgeDirection::Outgoing | EdgeDirection::Both) {
187 let outgoing = self.outgoing.read();
188 if let Some(entries) = outgoing.get(&node_id) {
189 for entry in entries {
190 if label_filter.is_none_or(|l| entry.label == l) {
191 results.push(entry.clone());
192 }
193 }
194 }
195 }
196
197 if matches!(direction, EdgeDirection::Incoming | EdgeDirection::Both) {
198 let incoming = self.incoming.read();
199 if let Some(entries) = incoming.get(&node_id) {
200 for entry in entries {
201 if label_filter.is_none_or(|l| entry.label == l) {
202 results.push(entry.clone());
203 }
204 }
205 }
206 }
207
208 results
209 }
210
211 pub fn get_edges_by_label(&self, label: &str) -> Vec<EntityId> {
213 let idx = self.by_label.read();
214 idx.get(label)
215 .map(|s| s.iter().copied().collect())
216 .unwrap_or_default()
217 }
218
219 pub fn out_degree(&self, node_id: EntityId) -> usize {
221 self.outgoing
222 .read()
223 .get(&node_id)
224 .map(|v| v.len())
225 .unwrap_or(0)
226 }
227
228 pub fn in_degree(&self, node_id: EntityId) -> usize {
230 self.incoming
231 .read()
232 .get(&node_id)
233 .map(|v| v.len())
234 .unwrap_or(0)
235 }
236
237 pub fn degree(&self, node_id: EntityId) -> usize {
239 self.out_degree(node_id) + self.in_degree(node_id)
240 }
241
242 pub fn edge_count(&self) -> usize {
244 *self.edge_count.read()
245 }
246
247 pub fn node_count(&self) -> usize {
249 *self.node_count.read()
250 }
251
252 pub fn clear(&self) {
254 self.outgoing.write().clear();
255 self.incoming.write().clear();
256 self.by_label.write().clear();
257 *self.edge_count.write() = 0;
258 *self.node_count.write() = 0;
259 }
260
261 fn update_node_count(&self) {
262 let out_nodes: HashSet<_> = self.outgoing.read().keys().copied().collect();
263 let in_nodes: HashSet<_> = self.incoming.read().keys().copied().collect();
264
265 let total: HashSet<_> = out_nodes.union(&in_nodes).collect();
266 *self.node_count.write() = total.len();
267 }
268}
269
270impl Default for GraphAdjacencyIndex {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
276#[derive(Debug, Clone)]
282pub struct TokenPosition {
283 pub entity_id: EntityId,
284 pub field: String,
285 pub position: u32,
286}
287
288#[derive(Debug, Clone)]
290pub struct PostingEntry {
291 pub entity_id: EntityId,
292 pub collection: String,
293 pub field: String,
294 pub positions: Vec<u32>,
295 pub term_frequency: f32,
296}
297
298pub struct InvertedIndex {
300 index: RwLock<BTreeMap<String, Vec<PostingEntry>>>,
302 doc_count: RwLock<usize>,
304 indexed_fields: RwLock<HashMap<String, HashSet<String>>>,
306}
307
308impl InvertedIndex {
309 pub fn new() -> Self {
311 Self {
312 index: RwLock::new(BTreeMap::new()),
313 doc_count: RwLock::new(0),
314 indexed_fields: RwLock::new(HashMap::new()),
315 }
316 }
317
318 pub fn add_indexed_field(&self, collection: &str, field: &str) {
320 self.indexed_fields
321 .write()
322 .entry(collection.to_string())
323 .or_default()
324 .insert(field.to_string());
325 }
326
327 pub fn index_document(
329 &self,
330 collection: &str,
331 entity_id: EntityId,
332 field: &str,
333 content: &str,
334 ) {
335 let tokens = self.tokenize(content);
336 let term_count = tokens.len() as f32;
337
338 let mut term_freqs: HashMap<String, Vec<u32>> = HashMap::new();
340 for (position, token) in tokens.iter().enumerate() {
341 term_freqs
342 .entry(token.clone())
343 .or_default()
344 .push(position as u32);
345 }
346
347 {
349 let mut index = self.index.write();
350 for (term, positions) in term_freqs {
351 let tf = positions.len() as f32 / term_count.max(1.0);
352
353 let entry = PostingEntry {
354 entity_id,
355 collection: collection.to_string(),
356 field: field.to_string(),
357 positions,
358 term_frequency: tf,
359 };
360
361 index.entry(term).or_default().push(entry);
362 }
363 }
364
365 *self.doc_count.write() += 1;
367 }
368
369 pub fn remove_document(&self, entity_id: EntityId) {
371 let mut index = self.index.write();
372 for postings in index.values_mut() {
373 postings.retain(|p| p.entity_id != entity_id);
374 }
375 }
376
377 pub fn search(&self, query: &str, limit: usize) -> Vec<TextSearchResult> {
379 let terms = self.tokenize(query);
380 if terms.is_empty() {
381 return Vec::new();
382 }
383
384 let index = self.index.read();
385
386 let doc_count = *self.doc_count.read();
387
388 let mut term_postings: Vec<&Vec<PostingEntry>> = Vec::new();
390 for term in &terms {
391 if let Some(postings) = index.get(term) {
392 term_postings.push(postings);
393 } else {
394 return Vec::new();
396 }
397 }
398
399 let mut scores: HashMap<EntityId, f32> = HashMap::new();
401
402 if let Some(first_postings) = term_postings.first() {
404 for posting in *first_postings {
405 let idf = ((doc_count as f32) / (first_postings.len() as f32 + 1.0)).ln();
406 scores.insert(posting.entity_id, posting.term_frequency * idf);
407 }
408 }
409
410 for postings in term_postings.iter().skip(1) {
412 let idf = ((doc_count as f32) / (postings.len() as f32 + 1.0)).ln();
413 let entities_in_term: HashSet<EntityId> =
414 postings.iter().map(|p| p.entity_id).collect();
415
416 scores.retain(|id, _| entities_in_term.contains(id));
418
419 for posting in *postings {
421 if let Some(score) = scores.get_mut(&posting.entity_id) {
422 *score += posting.term_frequency * idf;
423 }
424 }
425 }
426
427 let mut results: Vec<TextSearchResult> = scores
429 .into_iter()
430 .map(|(entity_id, score)| {
431 let collection = term_postings
433 .first()
434 .and_then(|p| p.iter().find(|e| e.entity_id == entity_id))
435 .map(|p| p.collection.clone())
436 .unwrap_or_default();
437
438 TextSearchResult {
439 entity_id,
440 collection,
441 score,
442 matched_terms: terms.clone(),
443 }
444 })
445 .collect();
446
447 results.sort_by(|a, b| {
448 b.score
449 .partial_cmp(&a.score)
450 .unwrap_or(std::cmp::Ordering::Equal)
451 });
452 results.truncate(limit);
453 results
454 }
455
456 pub fn search_prefix(&self, prefix: &str, limit: usize) -> Vec<String> {
458 let prefix_lower = prefix.to_lowercase();
459
460 let index = self.index.read();
461
462 index
463 .range(prefix_lower.clone()..)
464 .take_while(|(term, _)| term.starts_with(&prefix_lower))
465 .take(limit)
466 .map(|(term, _)| term.clone())
467 .collect()
468 }
469
470 fn tokenize(&self, text: &str) -> Vec<String> {
472 text.to_lowercase()
473 .split(|c: char| !c.is_alphanumeric())
474 .filter(|s| s.len() >= 2)
475 .map(|s| s.to_string())
476 .collect()
477 }
478}
479
480impl Default for InvertedIndex {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486#[derive(Debug, Clone)]
488pub struct TextSearchResult {
489 pub entity_id: EntityId,
490 pub collection: String,
491 pub score: f32,
492 pub matched_terms: Vec<String>,
493}
494
/// Feature switches and HNSW tuning values for the integrated index manager.
#[derive(Debug, Clone)]
pub struct IntegratedIndexConfig {
    /// Whether the HNSW (vector) index is enabled.
    pub enable_hnsw: bool,
    /// Whether the full-text index is enabled.
    pub enable_fulltext: bool,
    /// Whether the metadata index is enabled.
    pub enable_metadata: bool,
    /// Whether the graph adjacency index is enabled.
    pub enable_graph: bool,
    /// HNSW `M` parameter — presumably the per-node connection limit; confirm
    /// against the HNSW implementation.
    pub hnsw_m: usize,
    /// HNSW `ef_construction` parameter — presumably the candidate-list size
    /// used while building; confirm against the HNSW implementation.
    pub hnsw_ef_construction: usize,
    /// HNSW `ef_search` parameter — presumably the candidate-list size used
    /// while querying; confirm against the HNSW implementation.
    pub hnsw_ef_search: usize,
}
517
518impl Default for IntegratedIndexConfig {
519 fn default() -> Self {
520 Self {
521 enable_hnsw: true,
522 enable_fulltext: true,
523 enable_metadata: true,
524 enable_graph: true,
525 hnsw_m: 16,
526 hnsw_ef_construction: 100,
527 hnsw_ef_search: 50,
528 }
529 }
530}
531
/// Snapshot of counters describing the contents of the indices.
///
/// `Default` yields all-zero values.
#[derive(Debug, Clone, Default)]
pub struct IndexStats {
    /// Number of indexed vectors.
    pub vector_count: usize,
    /// Number of indexed text documents.
    pub document_count: usize,
    /// Number of distinct terms in the text index.
    pub term_count: usize,
    /// Number of metadata entries.
    pub metadata_entries: usize,
    /// Number of nodes known to the graph index.
    pub graph_node_count: usize,
    /// Number of edges known to the graph index.
    pub graph_edge_count: usize,
    /// Creation timestamp. NOTE(review): unit and epoch are not visible here —
    /// confirm where the value is produced.
    pub created_at: u64,
    /// Last-update timestamp, same unit as `created_at`.
    pub updated_at: u64,
}
552
/// The kinds of index handled by the integrated index manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexType {
    /// Vector similarity index.
    Hnsw,
    /// Full-text inverted index.
    Fulltext,
    /// Metadata index.
    Metadata,
    /// Graph adjacency index.
    Graph,
}
565
/// Lifecycle state of an index.
#[derive(Debug, Clone)]
pub enum IndexStatus {
    /// The index is usable.
    Ready,
    /// The index is being built. NOTE(review): the scale of `progress`
    /// (0–1 or 0–100) is not visible here — confirm where it is set.
    Building { progress: f32 },
    /// The index exists but is out of date.
    Stale,
    /// The index is switched off.
    Disabled,
    /// The index is in a failed state; the payload is the error message.
    Error(String),
}
580
581#[derive(Debug, Clone)]
583pub struct IndexEvent {
584 pub index_type: IndexType,
585 pub collection: Option<String>,
586 pub event: IndexEventKind,
587 pub timestamp: u64,
588}
589
/// The kinds of change an [`IndexEvent`] can describe.
#[derive(Debug, Clone)]
pub enum IndexEventKind {
    /// An index was created.
    Created,
    /// An index was dropped.
    Dropped,
    /// An index was rebuilt.
    Rebuilt,
    /// Entries of an index were changed; `entries_affected` is how many.
    Updated { entries_affected: usize },
}
597
598pub struct IntegratedIndexManager {
600 config: IntegratedIndexConfig,
602 text_index: InvertedIndex,
604 metadata_index: RwLock<MetadataStorage>,
606 hnsw_indices: RwLock<HashMap<String, HnswIndexInfo>>,
609 graph_index: GraphAdjacencyIndex,
611 index_status: RwLock<HashMap<(IndexType, Option<String>), IndexStatus>>,
613 event_history: RwLock<Vec<IndexEvent>>,
615 created_at: u64,
617}
618
619struct HnswIndexInfo {
621 dimension: usize,
623 vectors: HashMap<EntityId, Vec<f32>>,
625 entry_point: Option<EntityId>,
628}
629
630pub mod incremental;
631mod manager_impl;
632pub use incremental::{IncrementalIndexMaintainer, IndexDeltaOp, SecondaryIndexHandle};
633impl Default for IntegratedIndexManager {
634 fn default() -> Self {
635 Self::new()
636 }
637}
638
639#[derive(Debug, Clone)]
641pub struct VectorSearchResult {
642 pub entity_id: EntityId,
643 pub collection: String,
644 pub similarity: f32,
645}
646
647#[derive(Debug, Clone)]
649pub enum MetadataQueryFilter {
650 Equals(MetadataValue),
652 Range {
654 min: Option<MetadataValue>,
655 max: Option<MetadataValue>,
656 },
657 Contains(String),
659 In(Vec<MetadataValue>),
661}
662
/// Cosine of the angle between `a` and `b`.
///
/// Returns `0.0` when the slices differ in length, when they are empty, or
/// when either has a Euclidean norm of exactly zero.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let euclidean_norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();

    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }

    let (norm_a, norm_b) = (euclidean_norm(a), euclidean_norm(b));
    if norm_a == 0.0 || norm_b == 0.0 {
        // A zero vector has no direction; report "no similarity" rather than
        // dividing by zero.
        return 0.0;
    }

    let dot = a.iter().zip(b).map(|(x, y)| x * y).sum::<f32>();
    dot / (norm_a * norm_b)
}
683
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Inverted index ----

    // A single-term query returns every document that contains the term.
    #[test]
    fn test_inverted_index_basic() {
        let text_index = InvertedIndex::new();

        let documents = [
            (1, "hello world rust programming"),
            (2, "rust is fast and safe"),
            (3, "python is easy to learn"),
        ];
        for (id, content) in documents {
            text_index.index_document("docs", EntityId(id), "content", content);
        }

        let hits = text_index.search("rust", 10);
        assert_eq!(hits.len(), 2);
        assert!(hits.iter().any(|hit| hit.entity_id == EntityId(1)));
        assert!(hits.iter().any(|hit| hit.entity_id == EntityId(2)));
    }

    // Multi-term queries are conjunctive: only documents containing every
    // term are returned.
    #[test]
    fn test_inverted_index_and_query() {
        let text_index = InvertedIndex::new();

        let documents = [
            (1, "rust programming language"),
            (2, "rust systems programming"),
            (3, "python programming language"),
        ];
        for (id, content) in documents {
            text_index.index_document("docs", EntityId(id), "content", content);
        }

        // Documents 1 and 2 contain both "rust" and "programming".
        let hits = text_index.search("rust programming", 10);
        assert_eq!(hits.len(), 2);

        // Documents 1 and 3 contain both "language" and "programming".
        let hits = text_index.search("language programming", 10);
        assert_eq!(hits.len(), 2);
    }

    // Prefix lookup returns the exact term as well as longer terms.
    #[test]
    fn test_prefix_search() {
        let text_index = InvertedIndex::new();

        text_index.index_document("docs", EntityId(1), "content", "programming rust rustacean");

        let completions = text_index.search_prefix("rust", 10);
        assert!(completions.contains(&"rust".to_string()));
        assert!(completions.contains(&"rustacean".to_string()));
    }

    // ---- Vector search ----

    // The vector identical to the query ranks first.
    #[test]
    fn test_vector_search() {
        let manager = IntegratedIndexManager::new();

        manager.index_vector("embeddings", EntityId(1), &[1.0, 0.0, 0.0]);
        manager.index_vector("embeddings", EntityId(2), &[0.9, 0.1, 0.0]);
        manager.index_vector("embeddings", EntityId(3), &[0.0, 1.0, 0.0]);

        let hits = manager.search_similar("embeddings", &[1.0, 0.0, 0.0], 2);
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].entity_id, EntityId(1));
        assert!(hits[0].similarity > 0.99);
    }

    // Identical vectors score 1, orthogonal vectors score 0.
    #[test]
    fn test_cosine_similarity() {
        let x_axis = [1.0, 0.0, 0.0];
        let same = [1.0, 0.0, 0.0];
        assert!((cosine_similarity(&x_axis, &same) - 1.0).abs() < 0.001);

        let y_axis = [0.0, 1.0, 0.0];
        assert!(cosine_similarity(&x_axis, &y_axis).abs() < 0.001);
    }

    // ---- Graph adjacency index ----

    // Chain 1 -> 2 -> 3 queried in each direction.
    #[test]
    fn test_graph_adjacency_basic() {
        let graph = GraphAdjacencyIndex::new();

        graph.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
        graph.index_edge(EntityId(101), EntityId(2), EntityId(3), "KNOWS", 1.0);

        // Node 1 has a single outgoing edge, to node 2.
        let out_of_1 = graph.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
        assert_eq!(out_of_1.len(), 1);
        assert_eq!(out_of_1[0].neighbor_id, EntityId(2));

        // Node 2 has a single incoming edge, from node 1.
        let into_2 = graph.get_neighbors(EntityId(2), EdgeDirection::Incoming, None);
        assert_eq!(into_2.len(), 1);
        assert_eq!(into_2[0].neighbor_id, EntityId(1));

        // Node 2 has one edge in each direction.
        let around_2 = graph.get_neighbors(EntityId(2), EdgeDirection::Both, None);
        assert_eq!(around_2.len(), 2);
    }

    // The label filter keeps only edges carrying exactly that label.
    #[test]
    fn test_graph_adjacency_label_filter() {
        let graph = GraphAdjacencyIndex::new();

        graph.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
        graph.index_edge(EntityId(101), EntityId(1), EntityId(3), "WORKS_WITH", 1.0);
        graph.index_edge(EntityId(102), EntityId(1), EntityId(4), "KNOWS", 1.0);

        let knows = graph.get_neighbors(EntityId(1), EdgeDirection::Outgoing, Some("KNOWS"));
        assert_eq!(knows.len(), 2);

        let works_with =
            graph.get_neighbors(EntityId(1), EdgeDirection::Outgoing, Some("WORKS_WITH"));
        assert_eq!(works_with.len(), 1);
        assert_eq!(works_with[0].neighbor_id, EntityId(3));
    }

    // Star graph: node 1 points at nodes 2 through 5.
    #[test]
    fn test_graph_adjacency_degree() {
        let graph = GraphAdjacencyIndex::new();

        for (edge, target) in [(100, 2), (101, 3), (102, 4), (103, 5)] {
            graph.index_edge(EntityId(edge), EntityId(1), EntityId(target), "LINK", 1.0);
        }

        assert_eq!(graph.out_degree(EntityId(1)), 4);
        assert_eq!(graph.in_degree(EntityId(1)), 0);
        assert_eq!(graph.degree(EntityId(1)), 4);

        // A leaf node has exactly one incoming edge and none outgoing.
        assert_eq!(graph.in_degree(EntityId(2)), 1);
        assert_eq!(graph.out_degree(EntityId(2)), 0);
    }

    // Edge ids can be looked up by label.
    #[test]
    fn test_graph_adjacency_edge_by_label() {
        let graph = GraphAdjacencyIndex::new();

        graph.index_edge(EntityId(100), EntityId(1), EntityId(2), "A", 1.0);
        graph.index_edge(EntityId(101), EntityId(2), EntityId(3), "B", 1.0);
        graph.index_edge(EntityId(102), EntityId(3), EntityId(4), "A", 1.0);

        let labelled_a = graph.get_edges_by_label("A");
        assert_eq!(labelled_a.len(), 2);
        assert!(labelled_a.contains(&EntityId(100)));
        assert!(labelled_a.contains(&EntityId(102)));

        let labelled_b = graph.get_edges_by_label("B");
        assert_eq!(labelled_b.len(), 1);
        assert!(labelled_b.contains(&EntityId(101)));
    }

    // Removing an edge lowers the edge count and drops it from the
    // adjacency lists.
    #[test]
    fn test_graph_adjacency_remove() {
        let graph = GraphAdjacencyIndex::new();

        graph.index_edge(EntityId(100), EntityId(1), EntityId(2), "LINK", 1.0);
        graph.index_edge(EntityId(101), EntityId(1), EntityId(3), "LINK", 1.0);

        assert_eq!(graph.edge_count(), 2);

        graph.remove_edge(EntityId(100));

        assert_eq!(graph.edge_count(), 1);
        let remaining = graph.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].neighbor_id, EntityId(3));
    }

    // ---- Index lifecycle ----

    // An index can be created, reports `Ready`, and can be dropped again.
    #[test]
    fn test_index_lifecycle_create_drop() {
        let manager = IntegratedIndexManager::new();

        let created = manager.create_index(IndexType::Hnsw, Some("my_collection"));
        assert!(created.is_ok());

        let status = manager.index_status(IndexType::Hnsw, Some("my_collection"));
        assert!(matches!(status, IndexStatus::Ready));

        let dropped = manager.drop_index(IndexType::Hnsw, Some("my_collection"));
        assert!(dropped.is_ok());
    }

    // Rebuilding a populated index succeeds and leaves it `Ready`.
    #[test]
    fn test_index_lifecycle_rebuild() {
        let manager = IntegratedIndexManager::new();

        manager.index_vector("test", EntityId(1), &[1.0, 0.0, 0.0]);
        manager.index_vector("test", EntityId(2), &[0.0, 1.0, 0.0]);

        let rebuilt = manager.rebuild_index(IndexType::Hnsw, Some("test"));
        assert!(rebuilt.is_ok());

        let status = manager.index_status(IndexType::Hnsw, Some("test"));
        assert!(matches!(status, IndexStatus::Ready));
    }

    // Manager statistics include the graph counters.
    #[test]
    fn test_index_stats_with_graph() {
        let manager = IntegratedIndexManager::new();

        manager.index_edge(EntityId(100), EntityId(1), EntityId(2), "LINK", 1.0);
        manager.index_edge(EntityId(101), EntityId(2), EntityId(3), "LINK", 1.0);

        let stats = manager.stats();
        assert_eq!(stats.graph_edge_count, 2);
        assert!(stats.graph_node_count >= 2);
    }

    // Graph queries issued through the manager reach the adjacency index.
    #[test]
    fn test_integrated_manager_graph_operations() {
        let manager = IntegratedIndexManager::new();

        manager.index_edge(EntityId(100), EntityId(1), EntityId(2), "KNOWS", 1.0);
        manager.index_edge(EntityId(101), EntityId(2), EntityId(3), "KNOWS", 0.5);

        let out_of_1 = manager.get_neighbors(EntityId(1), EdgeDirection::Outgoing, None);
        assert_eq!(out_of_1.len(), 1);
        assert_eq!(out_of_1[0].neighbor_id, EntityId(2));
        assert_eq!(out_of_1[0].weight, 1.0);

        // Node 2 has one incoming and one outgoing edge.
        assert_eq!(manager.node_degree(EntityId(2), EdgeDirection::Both), 2);
    }
}