1use std::collections::{HashMap, HashSet, VecDeque};
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
24pub enum SubgraphEventType {
25 NodeAdded,
27 NodeRemoved,
29 EdgeAdded,
31 EdgeRemoved,
33 SubgraphMatch(String),
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SubgraphEvent {
40 pub event_type: SubgraphEventType,
42 pub node: String,
44 pub timestamp: i64,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub enum SubgraphFilter {
55 HasLabel(String),
57 MaxDegree(usize),
59 MinDegree(usize),
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct SubgraphPattern {
66 pub anchor: String,
68 pub predicates: Vec<String>,
70 pub depth: usize,
72 pub filter: Option<SubgraphFilter>,
74}
75
76type AdjList = HashMap<String, Vec<(String, String)>>;
82
83pub struct StreamingSubgraphExtractor {
85 graph: AdjList,
87 buffer: VecDeque<SubgraphEvent>,
89 max_depth: usize,
91 max_nodes: usize,
93 events_total: u64,
95}
96
97impl StreamingSubgraphExtractor {
98 pub fn new(max_depth: usize, max_nodes: usize) -> Self {
103 Self {
104 graph: HashMap::new(),
105 buffer: VecDeque::new(),
106 max_depth,
107 max_nodes,
108 events_total: 0,
109 }
110 }
111
112 pub fn add_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
116 let now = unix_ms();
117 let mut events = Vec::new();
118
119 let subject_is_new = !self.graph.contains_key(s);
121 let edges = self.graph.entry(s.to_string()).or_default();
122
123 if !edges.iter().any(|(ep, eo)| ep == p && eo == o) {
125 edges.push((p.to_string(), o.to_string()));
126
127 if subject_is_new {
128 let ev = SubgraphEvent {
129 event_type: SubgraphEventType::NodeAdded,
130 node: s.to_string(),
131 timestamp: now,
132 };
133 events.push(ev.clone());
134 self.buffer.push_back(ev);
135 }
136
137 let ev = SubgraphEvent {
138 event_type: SubgraphEventType::EdgeAdded,
139 node: s.to_string(),
140 timestamp: now,
141 };
142 events.push(ev.clone());
143 self.buffer.push_back(ev);
144
145 if !self.graph.contains_key(o) {
147 self.graph.entry(o.to_string()).or_default();
149 let ev = SubgraphEvent {
150 event_type: SubgraphEventType::NodeAdded,
151 node: o.to_string(),
152 timestamp: now,
153 };
154 events.push(ev.clone());
155 self.buffer.push_back(ev);
156 }
157 }
158
159 self.events_total += events.len() as u64;
160 events
161 }
162
163 pub fn remove_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
167 let now = unix_ms();
168 let mut events = Vec::new();
169
170 let removed = if let Some(edges) = self.graph.get_mut(s) {
171 let before = edges.len();
172 edges.retain(|(ep, eo)| !(ep == p && eo == o));
173 edges.len() < before
174 } else {
175 false
176 };
177
178 if removed {
179 let ev = SubgraphEvent {
180 event_type: SubgraphEventType::EdgeRemoved,
181 node: s.to_string(),
182 timestamp: now,
183 };
184 events.push(ev.clone());
185 self.buffer.push_back(ev);
186
187 if self.graph.get(s).map(|e| e.is_empty()).unwrap_or(false) {
189 self.graph.remove(s);
190 let ev = SubgraphEvent {
191 event_type: SubgraphEventType::NodeRemoved,
192 node: s.to_string(),
193 timestamp: now,
194 };
195 events.push(ev.clone());
196 self.buffer.push_back(ev);
197 }
198 }
199
200 self.events_total += events.len() as u64;
201 events
202 }
203
204 pub fn extract_subgraph(&self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
209 let anchors: Vec<&str> = if pattern.anchor == "?x" {
210 self.graph.keys().map(|s| s.as_str()).collect()
211 } else {
212 vec![pattern.anchor.as_str()]
213 };
214
215 let pred_filter: Option<HashSet<&str>> = if pattern.predicates.is_empty() {
216 None
217 } else {
218 Some(pattern.predicates.iter().map(|s| s.as_str()).collect())
219 };
220
221 let mut visited: HashSet<String> = HashSet::new();
222 let mut result: Vec<(String, String, String)> = Vec::new();
223 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
224
225 for anchor in anchors {
226 if visited.len() >= self.max_nodes {
227 break;
228 }
229 if !visited.contains(anchor) {
230 queue.push_back((anchor.to_string(), 0));
231 visited.insert(anchor.to_string());
232 }
233 }
234
235 while let Some((node, depth)) = queue.pop_front() {
236 if visited.len() > self.max_nodes {
237 break;
238 }
239
240 if let Some(edges) = self.graph.get(&node) {
241 for (pred, obj) in edges {
242 if let Some(ref pf) = pred_filter {
244 if !pf.contains(pred.as_str()) {
245 continue;
246 }
247 }
248
249 if let Some(ref filter) = pattern.filter {
251 if !self.node_passes_filter(&node, filter) {
252 continue;
253 }
254 }
255
256 result.push((node.clone(), pred.clone(), obj.clone()));
257
258 if depth < pattern.depth && !visited.contains(obj) {
259 visited.insert(obj.clone());
260 queue.push_back((obj.clone(), depth + 1));
261 }
262 }
263 }
264 }
265
266 result
267 }
268
269 pub fn extract_neighborhood(&self, node: &str, depth: usize) -> Vec<(String, String, String)> {
271 let pattern = SubgraphPattern {
272 anchor: node.to_string(),
273 predicates: vec![],
274 depth,
275 filter: None,
276 };
277 self.extract_subgraph(&pattern)
278 }
279
280 pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
282 self.buffer.drain(..).collect()
283 }
284
285 pub fn node_count(&self) -> usize {
287 self.graph.len()
288 }
289
290 pub fn edge_count(&self) -> usize {
292 self.graph.values().map(|e| e.len()).sum()
293 }
294
295 fn node_passes_filter(&self, node: &str, filter: &SubgraphFilter) -> bool {
298 match filter {
299 SubgraphFilter::HasLabel(label) => {
300 self.graph
303 .get(node)
304 .map(|edges| edges.iter().any(|(_, o)| o.contains(label.as_str())))
305 .unwrap_or(false)
306 }
307 SubgraphFilter::MaxDegree(max) => {
308 self.graph.get(node).map(|e| e.len()).unwrap_or(0) <= *max
309 }
310 SubgraphFilter::MinDegree(min) => {
311 self.graph.get(node).map(|e| e.len()).unwrap_or(0) >= *min
312 }
313 }
314 }
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct StreamingStats {
324 pub nodes: usize,
326 pub edges: usize,
328 pub cache_hit_rate: f64,
330 pub events_processed: u64,
332}
333
334pub struct StreamingGraphRag {
341 extractor: StreamingSubgraphExtractor,
342 query_cache: HashMap<String, Vec<(String, String, String)>>,
343 cache_ttl_ms: i64,
344 cache_timestamps: HashMap<String, i64>,
345 cache_hits: u64,
346 cache_misses: u64,
347 events_processed: u64,
348}
349
350impl StreamingGraphRag {
351 pub fn new(max_depth: usize) -> Self {
356 Self::with_cache_ttl(max_depth, 60_000) }
358
359 pub fn with_cache_ttl(max_depth: usize, cache_ttl_ms: i64) -> Self {
361 Self {
362 extractor: StreamingSubgraphExtractor::new(max_depth, 10_000),
363 query_cache: HashMap::new(),
364 cache_ttl_ms,
365 cache_timestamps: HashMap::new(),
366 cache_hits: 0,
367 cache_misses: 0,
368 events_processed: 0,
369 }
370 }
371
372 pub fn process_event(&mut self, s: &str, p: &str, o: &str) {
374 let events = self.extractor.add_triple(s, p, o);
375 self.events_processed += events.len() as u64;
376
377 let dirty: Vec<String> = self
379 .query_cache
380 .keys()
381 .filter(|k| k.contains(s) || k.contains(o))
382 .cloned()
383 .collect();
384 for key in dirty {
385 self.query_cache.remove(&key);
386 self.cache_timestamps.remove(&key);
387 }
388 }
389
390 pub fn query_live(&mut self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
392 let cache_key = pattern_cache_key(pattern);
393 let now = unix_ms();
394
395 if let Some(result) = self.query_cache.get(&cache_key) {
396 let cached_at = self.cache_timestamps.get(&cache_key).copied().unwrap_or(0);
397 let still_fresh = self.cache_ttl_ms == 0 || now - cached_at < self.cache_ttl_ms;
398 if still_fresh {
399 self.cache_hits += 1;
400 return result.clone();
401 }
402 }
403
404 self.cache_misses += 1;
405 let result = self.extractor.extract_subgraph(pattern);
406 self.query_cache.insert(cache_key.clone(), result.clone());
407 self.cache_timestamps.insert(cache_key, now);
408 result
409 }
410
411 pub fn stats(&self) -> StreamingStats {
413 let total = self.cache_hits + self.cache_misses;
414 let cache_hit_rate = if total == 0 {
415 f64::NAN
416 } else {
417 self.cache_hits as f64 / total as f64
418 };
419 StreamingStats {
420 nodes: self.extractor.node_count(),
421 edges: self.extractor.edge_count(),
422 cache_hit_rate,
423 events_processed: self.events_processed,
424 }
425 }
426
427 pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
429 self.extractor.drain_events()
430 }
431}
432
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of silently wrapping if the millisecond
/// count does not fit in an `i64`.
fn unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamp before narrowing so the cast is lossless.
        Ok(elapsed) => elapsed.as_millis().min(i64::MAX as u128) as i64,
        Err(_) => 0,
    }
}
443
444fn pattern_cache_key(p: &SubgraphPattern) -> String {
445 format!("{}|{}|{}", p.anchor, p.predicates.join(","), p.depth)
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    type Triple = (String, String, String);

    fn make_extractor() -> StreamingSubgraphExtractor {
        StreamingSubgraphExtractor::new(3, 1000)
    }

    /// Builds a pattern from borrowed parts to keep the tests compact.
    fn pattern(
        anchor: &str,
        predicates: &[&str],
        depth: usize,
        filter: Option<SubgraphFilter>,
    ) -> SubgraphPattern {
        SubgraphPattern {
            anchor: anchor.to_string(),
            predicates: predicates.iter().map(|p| p.to_string()).collect(),
            depth,
            filter,
        }
    }

    fn triple(s: &str, p: &str, o: &str) -> Triple {
        (s.to_string(), p.to_string(), o.to_string())
    }

    /// Sorts triples so assertions do not depend on traversal order.
    fn sorted(mut triples: Vec<Triple>) -> Vec<Triple> {
        triples.sort();
        triples
    }

    fn has_event(events: &[SubgraphEvent], event_type: SubgraphEventType, node: &str) -> bool {
        events
            .iter()
            .any(|e| e.event_type == event_type && e.node == node)
    }

    // --- add_triple ---------------------------------------------------------

    #[test]
    fn test_add_triple_emits_node_added_for_new_subject() {
        let mut ext = make_extractor();
        let events = ext.add_triple("Alice", "knows", "Bob");
        assert!(has_event(&events, SubgraphEventType::NodeAdded, "Alice"));
    }

    #[test]
    fn test_add_triple_emits_edge_added() {
        let mut ext = make_extractor();
        let events = ext.add_triple("Alice", "knows", "Bob");
        assert!(has_event(&events, SubgraphEventType::EdgeAdded, "Alice"));
    }

    #[test]
    fn test_add_triple_event_order() {
        let mut ext = make_extractor();
        let events = ext.add_triple("Alice", "knows", "Bob");
        let types: Vec<SubgraphEventType> = events.iter().map(|e| e.event_type.clone()).collect();
        let nodes: Vec<&str> = events.iter().map(|e| e.node.as_str()).collect();
        assert_eq!(
            types,
            vec![
                SubgraphEventType::NodeAdded,
                SubgraphEventType::EdgeAdded,
                SubgraphEventType::NodeAdded,
            ]
        );
        assert_eq!(nodes, vec!["Alice", "Alice", "Bob"]);
    }

    #[test]
    fn test_add_triple_does_not_duplicate_existing_edge() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        let events = ext.add_triple("Alice", "knows", "Bob");
        assert!(events.is_empty());
        assert_eq!(ext.edge_count(), 1);
    }

    #[test]
    fn test_add_triple_increases_node_and_edge_count() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        assert_eq!(ext.node_count(), 2);
        assert_eq!(ext.edge_count(), 1);
    }

    #[test]
    fn test_add_triple_new_object_emits_node_added() {
        let mut ext = make_extractor();
        let events = ext.add_triple("A", "p", "B");
        assert!(has_event(&events, SubgraphEventType::NodeAdded, "B"));
    }

    #[test]
    fn test_add_triple_known_object_emits_no_second_node_added() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        let events = ext.add_triple("C", "p", "B");
        assert!(!has_event(&events, SubgraphEventType::NodeAdded, "B"));
        assert!(has_event(&events, SubgraphEventType::NodeAdded, "C"));
    }

    // --- remove_triple ------------------------------------------------------

    #[test]
    fn test_remove_triple_emits_edge_removed() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        ext.drain_events();
        let events = ext.remove_triple("Alice", "knows", "Bob");
        assert!(has_event(&events, SubgraphEventType::EdgeRemoved, "Alice"));
        assert_eq!(events[0].event_type, SubgraphEventType::EdgeRemoved);
    }

    #[test]
    fn test_remove_triple_emits_node_removed_when_no_edges_left() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        ext.drain_events();
        let events = ext.remove_triple("Alice", "knows", "Bob");
        assert!(has_event(&events, SubgraphEventType::NodeRemoved, "Alice"));
        assert_eq!(ext.edge_count(), 0);
    }

    #[test]
    fn test_remove_nonexistent_triple_emits_nothing() {
        let mut ext = make_extractor();
        let events = ext.remove_triple("X", "p", "Y");
        assert!(events.is_empty());
        assert!(ext.drain_events().is_empty());
    }

    #[test]
    fn test_remove_triple_decreases_edge_count() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        let events = ext.remove_triple("A", "p", "B");
        assert_eq!(ext.edge_count(), 1);
        // "A" still has an outgoing edge, so it must not be reported as removed.
        assert!(!has_event(&events, SubgraphEventType::NodeRemoved, "A"));
    }

    // --- extract_subgraph ---------------------------------------------------

    #[test]
    fn test_extract_subgraph_direct_anchor() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        let triples = ext.extract_subgraph(&pattern("A", &[], 1, None));
        assert_eq!(
            sorted(triples),
            vec![triple("A", "p", "B"), triple("A", "q", "C")]
        );
    }

    #[test]
    fn test_extract_subgraph_predicate_filter() {
        let mut ext = make_extractor();
        ext.add_triple("A", "knows", "B");
        ext.add_triple("A", "likes", "C");
        let triples = ext.extract_subgraph(&pattern("A", &["knows"], 1, None));
        assert_eq!(triples, vec![triple("A", "knows", "B")]);
    }

    #[test]
    fn test_extract_subgraph_variable_anchor() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("C", "p", "D");
        let triples = ext.extract_subgraph(&pattern("?x", &[], 0, None));
        assert_eq!(
            sorted(triples),
            vec![triple("A", "p", "B"), triple("C", "p", "D")]
        );
    }

    #[test]
    fn test_extract_subgraph_depth_limits_expansion() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("B", "p", "C");
        ext.add_triple("C", "p", "D");
        let triples = ext.extract_subgraph(&pattern("A", &[], 1, None));
        assert_eq!(
            triples,
            vec![triple("A", "p", "B"), triple("B", "p", "C")],
            "Should not expand past depth 1"
        );
    }

    #[test]
    fn test_extract_subgraph_handles_cycles() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("B", "p", "A");
        let triples = ext.extract_subgraph(&pattern("A", &[], 3, None));
        assert_eq!(
            triples,
            vec![triple("A", "p", "B"), triple("B", "p", "A")]
        );
    }

    // --- extract_neighborhood -----------------------------------------------

    #[test]
    fn test_extract_neighborhood_basic() {
        let mut ext = make_extractor();
        ext.add_triple("Root", "p", "Child");
        let triples = ext.extract_neighborhood("Root", 1);
        assert_eq!(triples, vec![triple("Root", "p", "Child")]);
    }

    #[test]
    fn test_extract_neighborhood_unknown_node_returns_empty() {
        let ext = make_extractor();
        let triples = ext.extract_neighborhood("Unknown", 2);
        assert!(triples.is_empty());
    }

    // --- node filters -------------------------------------------------------

    #[test]
    fn test_filter_min_degree() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        let triples =
            ext.extract_subgraph(&pattern("A", &[], 1, Some(SubgraphFilter::MinDegree(2))));
        assert_eq!(triples.len(), 2);
    }

    #[test]
    fn test_filter_max_degree() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p1", "B");
        ext.add_triple("A", "p2", "C");
        ext.add_triple("A", "p3", "D");
        let triples =
            ext.extract_subgraph(&pattern("A", &[], 1, Some(SubgraphFilter::MaxDegree(1))));
        assert!(triples.is_empty());
    }

    #[test]
    fn test_filter_has_label() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "Bob");
        let hit = ext.extract_subgraph(&pattern(
            "A",
            &[],
            1,
            Some(SubgraphFilter::HasLabel("Bo".to_string())),
        ));
        assert_eq!(hit, vec![triple("A", "p", "Bob")]);

        let miss = ext.extract_subgraph(&pattern(
            "A",
            &[],
            1,
            Some(SubgraphFilter::HasLabel("Zed".to_string())),
        ));
        assert!(miss.is_empty());
    }

    // --- event buffer -------------------------------------------------------

    #[test]
    fn test_drain_events_clears_buffer() {
        let mut ext = make_extractor();
        let emitted = ext.add_triple("A", "p", "B");
        assert_eq!(ext.drain_events().len(), emitted.len());
        assert!(ext.drain_events().is_empty());
    }

    // --- StreamingGraphRag --------------------------------------------------

    #[test]
    fn test_streaming_rag_process_event_updates_graph() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("Alice", "knows", "Bob");
        let stats = rag.stats();
        assert_eq!(stats.nodes, 2);
        assert_eq!(stats.edges, 1);
        assert_eq!(stats.events_processed, 3);
    }

    #[test]
    fn test_streaming_rag_cache_hit() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("A", "p", "B");

        let query = pattern("A", &[], 1, None);
        let first = rag.query_live(&query); // miss
        let second = rag.query_live(&query); // hit
        assert_eq!(first, second);

        let stats = rag.stats();
        assert!((stats.cache_hit_rate - 0.5).abs() < 1e-12);
    }

    #[test]
    fn test_streaming_rag_cache_invalidated_on_event() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("A", "p", "B");

        let query = pattern("A", &[], 1, None);
        let r1 = rag.query_live(&query);
        assert_eq!(r1.len(), 1);

        rag.process_event("A", "q", "C");
        // A stale cache entry would still report a single triple here.
        let r2 = rag.query_live(&query);
        assert_eq!(
            sorted(r2),
            vec![triple("A", "p", "B"), triple("A", "q", "C")]
        );
    }

    #[test]
    fn test_streaming_rag_stats_initial_nan_hit_rate() {
        let rag = StreamingGraphRag::new(3);
        let stats = rag.stats();
        assert!(stats.cache_hit_rate.is_nan());
    }

    #[test]
    fn test_streaming_rag_drain_events() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("X", "p", "Y");
        let events = rag.drain_events();
        assert_eq!(events.len(), 3);
        assert!(rag.drain_events().is_empty());
    }

    #[test]
    fn test_streaming_stats_fields() {
        let mut rag = StreamingGraphRag::new(2);
        rag.process_event("A", "edge", "B");
        let stats = rag.stats();
        assert_eq!(stats.nodes, 2);
        assert_eq!(stats.edges, 1);
    }
}