Skip to main content

oxirs_graphrag/streaming/
mod.rs

1//! Streaming subgraph extraction using SPARQL-like patterns.
2//!
3//! # Overview
4//!
5//! [`StreamingSubgraphExtractor`] maintains an in-memory adjacency graph built
6//! from `(subject, predicate, object)` triples added or removed at runtime.
7//! It emits [`SubgraphEvent`]s whenever the graph changes, and provides pattern-
8//! based subgraph extraction.
9//!
10//! [`StreamingGraphRag`] wraps the extractor and adds a query-result cache with
11//! a configurable TTL, tracking hit-rate statistics.
12
13use std::collections::{HashMap, HashSet, VecDeque};
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use serde::{Deserialize, Serialize};
17
18// ─────────────────────────────────────────────────────────────────────────────
19// Event types
20// ─────────────────────────────────────────────────────────────────────────────
21
22/// Describes what changed in the streaming subgraph.
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
24pub enum SubgraphEventType {
25    /// A node (entity) was added to the graph for the first time
26    NodeAdded,
27    /// The last edge touching a node was removed, making it isolated
28    NodeRemoved,
29    /// An edge (triple) was added
30    EdgeAdded,
31    /// An edge (triple) was removed
32    EdgeRemoved,
33    /// A node matched a named subgraph pattern
34    SubgraphMatch(String),
35}
36
37/// A single event emitted by [`StreamingSubgraphExtractor`].
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SubgraphEvent {
40    /// The kind of change
41    pub event_type: SubgraphEventType,
42    /// The focal node for this event
43    pub node: String,
44    /// Unix-ms timestamp of the event
45    pub timestamp: i64,
46}
47
48// ─────────────────────────────────────────────────────────────────────────────
49// Subgraph pattern
50// ─────────────────────────────────────────────────────────────────────────────
51
52/// Predicate filter for neighbourhood expansion.
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub enum SubgraphFilter {
55    /// Include only nodes that have the given label predicate object
56    HasLabel(String),
57    /// Include only nodes whose out-degree is ≤ limit
58    MaxDegree(usize),
59    /// Include only nodes whose out-degree is ≥ limit
60    MinDegree(usize),
61}
62
63/// Describes which subgraph to extract from the live graph.
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct SubgraphPattern {
66    /// Anchor node URI, or `"?x"` to indicate "any reachable node"
67    pub anchor: String,
68    /// Predicates to follow during expansion (empty = follow all)
69    pub predicates: Vec<String>,
70    /// Maximum BFS depth from the anchor
71    pub depth: usize,
72    /// Optional additional filter applied to each candidate node
73    pub filter: Option<SubgraphFilter>,
74}
75
76// ─────────────────────────────────────────────────────────────────────────────
77// StreamingSubgraphExtractor
78// ─────────────────────────────────────────────────────────────────────────────
79
80/// Edge storage: subject → [(predicate, object)]
81type AdjList = HashMap<String, Vec<(String, String)>>;
82
83/// In-memory streaming subgraph manager.
84pub struct StreamingSubgraphExtractor {
85    /// Forward adjacency: subject → [(predicate, object)]
86    graph: AdjList,
87    /// Pending events not yet consumed by the caller
88    buffer: VecDeque<SubgraphEvent>,
89    /// Maximum BFS depth when extracting subgraphs
90    max_depth: usize,
91    /// Maximum number of nodes to include in a subgraph extraction
92    max_nodes: usize,
93    /// Total events ever emitted (including consumed ones)
94    events_total: u64,
95}
96
97impl StreamingSubgraphExtractor {
98    /// Create a new extractor.
99    ///
100    /// * `max_depth` – BFS depth limit for subgraph extraction
101    /// * `max_nodes` – node cap for subgraph extraction
102    pub fn new(max_depth: usize, max_nodes: usize) -> Self {
103        Self {
104            graph: HashMap::new(),
105            buffer: VecDeque::new(),
106            max_depth,
107            max_nodes,
108            events_total: 0,
109        }
110    }
111
112    /// Add a triple to the graph and emit the corresponding events.
113    ///
114    /// Returns the events produced by this mutation (also buffered internally).
115    pub fn add_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
116        let now = unix_ms();
117        let mut events = Vec::new();
118
119        // Detect if subject is a brand-new node
120        let subject_is_new = !self.graph.contains_key(s);
121        let edges = self.graph.entry(s.to_string()).or_default();
122
123        // Only add if not already present
124        if !edges.iter().any(|(ep, eo)| ep == p && eo == o) {
125            edges.push((p.to_string(), o.to_string()));
126
127            if subject_is_new {
128                let ev = SubgraphEvent {
129                    event_type: SubgraphEventType::NodeAdded,
130                    node: s.to_string(),
131                    timestamp: now,
132                };
133                events.push(ev.clone());
134                self.buffer.push_back(ev);
135            }
136
137            let ev = SubgraphEvent {
138                event_type: SubgraphEventType::EdgeAdded,
139                node: s.to_string(),
140                timestamp: now,
141            };
142            events.push(ev.clone());
143            self.buffer.push_back(ev);
144
145            // If object is new, emit NodeAdded for it too
146            if !self.graph.contains_key(o) {
147                // Insert empty adjacency to mark the node as known
148                self.graph.entry(o.to_string()).or_default();
149                let ev = SubgraphEvent {
150                    event_type: SubgraphEventType::NodeAdded,
151                    node: o.to_string(),
152                    timestamp: now,
153                };
154                events.push(ev.clone());
155                self.buffer.push_back(ev);
156            }
157        }
158
159        self.events_total += events.len() as u64;
160        events
161    }
162
163    /// Remove a triple from the graph and emit the corresponding events.
164    ///
165    /// Returns the events produced by this mutation.
166    pub fn remove_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
167        let now = unix_ms();
168        let mut events = Vec::new();
169
170        let removed = if let Some(edges) = self.graph.get_mut(s) {
171            let before = edges.len();
172            edges.retain(|(ep, eo)| !(ep == p && eo == o));
173            edges.len() < before
174        } else {
175            false
176        };
177
178        if removed {
179            let ev = SubgraphEvent {
180                event_type: SubgraphEventType::EdgeRemoved,
181                node: s.to_string(),
182                timestamp: now,
183            };
184            events.push(ev.clone());
185            self.buffer.push_back(ev);
186
187            // If subject now has no edges, emit NodeRemoved
188            if self.graph.get(s).map(|e| e.is_empty()).unwrap_or(false) {
189                self.graph.remove(s);
190                let ev = SubgraphEvent {
191                    event_type: SubgraphEventType::NodeRemoved,
192                    node: s.to_string(),
193                    timestamp: now,
194                };
195                events.push(ev.clone());
196                self.buffer.push_back(ev);
197            }
198        }
199
200        self.events_total += events.len() as u64;
201        events
202    }
203
204    /// Extract `(subject, predicate, object)` triples matching `pattern`.
205    ///
206    /// If `pattern.anchor` is `"?x"` the extractor uses all known subjects
207    /// as starting points (up to `max_nodes`).
208    pub fn extract_subgraph(&self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
209        let anchors: Vec<&str> = if pattern.anchor == "?x" {
210            self.graph.keys().map(|s| s.as_str()).collect()
211        } else {
212            vec![pattern.anchor.as_str()]
213        };
214
215        let pred_filter: Option<HashSet<&str>> = if pattern.predicates.is_empty() {
216            None
217        } else {
218            Some(pattern.predicates.iter().map(|s| s.as_str()).collect())
219        };
220
221        let mut visited: HashSet<String> = HashSet::new();
222        let mut result: Vec<(String, String, String)> = Vec::new();
223        let mut queue: VecDeque<(String, usize)> = VecDeque::new();
224
225        for anchor in anchors {
226            if visited.len() >= self.max_nodes {
227                break;
228            }
229            if !visited.contains(anchor) {
230                queue.push_back((anchor.to_string(), 0));
231                visited.insert(anchor.to_string());
232            }
233        }
234
235        while let Some((node, depth)) = queue.pop_front() {
236            if visited.len() > self.max_nodes {
237                break;
238            }
239
240            if let Some(edges) = self.graph.get(&node) {
241                for (pred, obj) in edges {
242                    // Predicate filter
243                    if let Some(ref pf) = pred_filter {
244                        if !pf.contains(pred.as_str()) {
245                            continue;
246                        }
247                    }
248
249                    // Degree filter on the target node
250                    if let Some(ref filter) = pattern.filter {
251                        if !self.node_passes_filter(&node, filter) {
252                            continue;
253                        }
254                    }
255
256                    result.push((node.clone(), pred.clone(), obj.clone()));
257
258                    if depth < pattern.depth && !visited.contains(obj) {
259                        visited.insert(obj.clone());
260                        queue.push_back((obj.clone(), depth + 1));
261                    }
262                }
263            }
264        }
265
266        result
267    }
268
269    /// Extract the BFS neighbourhood of `node` up to `depth` hops.
270    pub fn extract_neighborhood(&self, node: &str, depth: usize) -> Vec<(String, String, String)> {
271        let pattern = SubgraphPattern {
272            anchor: node.to_string(),
273            predicates: vec![],
274            depth,
275            filter: None,
276        };
277        self.extract_subgraph(&pattern)
278    }
279
280    /// Drain and return all buffered events.
281    pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
282        self.buffer.drain(..).collect()
283    }
284
285    /// Number of nodes currently in the graph.
286    pub fn node_count(&self) -> usize {
287        self.graph.len()
288    }
289
290    /// Number of directed edges currently in the graph.
291    pub fn edge_count(&self) -> usize {
292        self.graph.values().map(|e| e.len()).sum()
293    }
294
295    // ── helpers ──────────────────────────────────────────────────────────────
296
297    fn node_passes_filter(&self, node: &str, filter: &SubgraphFilter) -> bool {
298        match filter {
299            SubgraphFilter::HasLabel(label) => {
300                // A node "has a label" if it has at least one edge whose object
301                // contains the label string (simplified heuristic)
302                self.graph
303                    .get(node)
304                    .map(|edges| edges.iter().any(|(_, o)| o.contains(label.as_str())))
305                    .unwrap_or(false)
306            }
307            SubgraphFilter::MaxDegree(max) => {
308                self.graph.get(node).map(|e| e.len()).unwrap_or(0) <= *max
309            }
310            SubgraphFilter::MinDegree(min) => {
311                self.graph.get(node).map(|e| e.len()).unwrap_or(0) >= *min
312            }
313        }
314    }
315}
316
317// ─────────────────────────────────────────────────────────────────────────────
318// StreamingStats
319// ─────────────────────────────────────────────────────────────────────────────
320
321/// Runtime statistics for [`StreamingGraphRag`].
322#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct StreamingStats {
324    /// Number of unique nodes in the live graph
325    pub nodes: usize,
326    /// Number of directed edges in the live graph
327    pub edges: usize,
328    /// Cache hit rate: hits / total queries (NaN if no queries yet)
329    pub cache_hit_rate: f64,
330    /// Total number of events processed since creation
331    pub events_processed: u64,
332}
333
334// ─────────────────────────────────────────────────────────────────────────────
335// StreamingGraphRag
336// ─────────────────────────────────────────────────────────────────────────────
337
338/// A live-updating GraphRAG engine that processes streaming triple events and
339/// serves subgraph queries from a cache backed by the live graph.
340pub struct StreamingGraphRag {
341    extractor: StreamingSubgraphExtractor,
342    query_cache: HashMap<String, Vec<(String, String, String)>>,
343    cache_ttl_ms: i64,
344    cache_timestamps: HashMap<String, i64>,
345    cache_hits: u64,
346    cache_misses: u64,
347    events_processed: u64,
348}
349
350impl StreamingGraphRag {
351    /// Create a new engine.
352    ///
353    /// * `max_depth` – BFS depth for subgraph extraction
354    /// * `cache_ttl_ms` is ignored if 0 (no expiry)
355    pub fn new(max_depth: usize) -> Self {
356        Self::with_cache_ttl(max_depth, 60_000) // default 60-second TTL
357    }
358
359    /// Create with an explicit cache TTL in milliseconds (0 = no expiry).
360    pub fn with_cache_ttl(max_depth: usize, cache_ttl_ms: i64) -> Self {
361        Self {
362            extractor: StreamingSubgraphExtractor::new(max_depth, 10_000),
363            query_cache: HashMap::new(),
364            cache_ttl_ms,
365            cache_timestamps: HashMap::new(),
366            cache_hits: 0,
367            cache_misses: 0,
368            events_processed: 0,
369        }
370    }
371
372    /// Process an incoming triple event (always treated as "add").
373    pub fn process_event(&mut self, s: &str, p: &str, o: &str) {
374        let events = self.extractor.add_triple(s, p, o);
375        self.events_processed += events.len() as u64;
376
377        // Invalidate any cached entries whose anchor matches the modified nodes
378        let dirty: Vec<String> = self
379            .query_cache
380            .keys()
381            .filter(|k| k.contains(s) || k.contains(o))
382            .cloned()
383            .collect();
384        for key in dirty {
385            self.query_cache.remove(&key);
386            self.cache_timestamps.remove(&key);
387        }
388    }
389
390    /// Query the live graph using `pattern`, returning cached results when fresh.
391    pub fn query_live(&mut self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
392        let cache_key = pattern_cache_key(pattern);
393        let now = unix_ms();
394
395        if let Some(result) = self.query_cache.get(&cache_key) {
396            let cached_at = self.cache_timestamps.get(&cache_key).copied().unwrap_or(0);
397            let still_fresh = self.cache_ttl_ms == 0 || now - cached_at < self.cache_ttl_ms;
398            if still_fresh {
399                self.cache_hits += 1;
400                return result.clone();
401            }
402        }
403
404        self.cache_misses += 1;
405        let result = self.extractor.extract_subgraph(pattern);
406        self.query_cache.insert(cache_key.clone(), result.clone());
407        self.cache_timestamps.insert(cache_key, now);
408        result
409    }
410
411    /// Return current statistics.
412    pub fn stats(&self) -> StreamingStats {
413        let total = self.cache_hits + self.cache_misses;
414        let cache_hit_rate = if total == 0 {
415            f64::NAN
416        } else {
417            self.cache_hits as f64 / total as f64
418        };
419        StreamingStats {
420            nodes: self.extractor.node_count(),
421            edges: self.extractor.edge_count(),
422            cache_hit_rate,
423            events_processed: self.events_processed,
424        }
425    }
426
427    /// Drain buffered events from the underlying extractor.
428    pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
429        self.extractor.drain_events()
430    }
431}
432
433// ─────────────────────────────────────────────────────────────────────────────
434// Helpers
435// ─────────────────────────────────────────────────────────────────────────────
436
437fn unix_ms() -> i64 {
438    SystemTime::now()
439        .duration_since(UNIX_EPOCH)
440        .map(|d| d.as_millis() as i64)
441        .unwrap_or(0)
442}
443
444fn pattern_cache_key(p: &SubgraphPattern) -> String {
445    format!("{}|{}|{}", p.anchor, p.predicates.join(","), p.depth)
446}
447
448// ─────────────────────────────────────────────────────────────────────────────
449// Tests
450// ─────────────────────────────────────────────────────────────────────────────
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455
456    fn make_extractor() -> StreamingSubgraphExtractor {
457        StreamingSubgraphExtractor::new(3, 1000)
458    }
459
460    // ── add_triple ────────────────────────────────────────────────────────
461
462    #[test]
463    fn test_add_triple_emits_node_added_for_new_subject() {
464        let mut ext = make_extractor();
465        let events = ext.add_triple("Alice", "knows", "Bob");
466        let types: Vec<_> = events.iter().map(|e| &e.event_type).collect();
467        assert!(types.contains(&&SubgraphEventType::NodeAdded));
468    }
469
470    #[test]
471    fn test_add_triple_emits_edge_added() {
472        let mut ext = make_extractor();
473        let events = ext.add_triple("Alice", "knows", "Bob");
474        assert!(events
475            .iter()
476            .any(|e| e.event_type == SubgraphEventType::EdgeAdded));
477    }
478
479    #[test]
480    fn test_add_triple_does_not_duplicate_existing_edge() {
481        let mut ext = make_extractor();
482        ext.add_triple("Alice", "knows", "Bob");
483        let events = ext.add_triple("Alice", "knows", "Bob"); // duplicate
484        assert!(events.is_empty()); // nothing changed
485    }
486
487    #[test]
488    fn test_add_triple_increases_node_and_edge_count() {
489        let mut ext = make_extractor();
490        ext.add_triple("A", "p", "B");
491        assert_eq!(ext.node_count(), 2);
492        assert_eq!(ext.edge_count(), 1);
493    }
494
495    #[test]
496    fn test_add_triple_new_object_emits_node_added() {
497        let mut ext = make_extractor();
498        let events = ext.add_triple("A", "p", "B");
499        let node_added_nodes: Vec<_> = events
500            .iter()
501            .filter(|e| e.event_type == SubgraphEventType::NodeAdded)
502            .map(|e| e.node.as_str())
503            .collect();
504        assert!(node_added_nodes.contains(&"B"));
505    }
506
507    // ── remove_triple ─────────────────────────────────────────────────────
508
509    #[test]
510    fn test_remove_triple_emits_edge_removed() {
511        let mut ext = make_extractor();
512        ext.add_triple("Alice", "knows", "Bob");
513        ext.drain_events();
514        let events = ext.remove_triple("Alice", "knows", "Bob");
515        assert!(events
516            .iter()
517            .any(|e| e.event_type == SubgraphEventType::EdgeRemoved));
518    }
519
520    #[test]
521    fn test_remove_triple_emits_node_removed_when_no_edges_left() {
522        let mut ext = make_extractor();
523        ext.add_triple("Alice", "knows", "Bob");
524        ext.drain_events();
525        let events = ext.remove_triple("Alice", "knows", "Bob");
526        assert!(events
527            .iter()
528            .any(|e| e.event_type == SubgraphEventType::NodeRemoved));
529    }
530
531    #[test]
532    fn test_remove_nonexistent_triple_emits_nothing() {
533        let mut ext = make_extractor();
534        let events = ext.remove_triple("X", "p", "Y");
535        assert!(events.is_empty());
536    }
537
538    #[test]
539    fn test_remove_triple_decreases_edge_count() {
540        let mut ext = make_extractor();
541        ext.add_triple("A", "p", "B");
542        ext.add_triple("A", "q", "C");
543        ext.remove_triple("A", "p", "B");
544        assert_eq!(ext.edge_count(), 1);
545    }
546
547    // ── extract_subgraph ──────────────────────────────────────────────────
548
549    #[test]
550    fn test_extract_subgraph_direct_anchor() {
551        let mut ext = make_extractor();
552        ext.add_triple("A", "p", "B");
553        ext.add_triple("A", "q", "C");
554        let pattern = SubgraphPattern {
555            anchor: "A".to_string(),
556            predicates: vec![],
557            depth: 1,
558            filter: None,
559        };
560        let triples = ext.extract_subgraph(&pattern);
561        assert_eq!(triples.len(), 2);
562    }
563
564    #[test]
565    fn test_extract_subgraph_predicate_filter() {
566        let mut ext = make_extractor();
567        ext.add_triple("A", "knows", "B");
568        ext.add_triple("A", "likes", "C");
569        let pattern = SubgraphPattern {
570            anchor: "A".to_string(),
571            predicates: vec!["knows".to_string()],
572            depth: 1,
573            filter: None,
574        };
575        let triples = ext.extract_subgraph(&pattern);
576        assert_eq!(triples.len(), 1);
577        assert_eq!(triples[0].1, "knows");
578    }
579
580    #[test]
581    fn test_extract_subgraph_variable_anchor() {
582        let mut ext = make_extractor();
583        ext.add_triple("A", "p", "B");
584        ext.add_triple("C", "p", "D");
585        let pattern = SubgraphPattern {
586            anchor: "?x".to_string(),
587            predicates: vec![],
588            depth: 0,
589            filter: None,
590        };
591        let triples = ext.extract_subgraph(&pattern);
592        // Should find edges from all subjects
593        assert!(!triples.is_empty());
594    }
595
596    #[test]
597    fn test_extract_subgraph_depth_limits_expansion() {
598        let mut ext = make_extractor();
599        ext.add_triple("A", "p", "B");
600        ext.add_triple("B", "p", "C");
601        ext.add_triple("C", "p", "D");
602        // depth 1 → only A→B, then B→C (depth 1 from A)
603        let pattern = SubgraphPattern {
604            anchor: "A".to_string(),
605            predicates: vec![],
606            depth: 1,
607            filter: None,
608        };
609        let triples = ext.extract_subgraph(&pattern);
610        // Should contain A→B and B→C but not C→D
611        let has_cd = triples.iter().any(|(s, _, o)| s == "C" && o == "D");
612        assert!(!has_cd, "Should not expand past depth 1");
613    }
614
615    // ── extract_neighborhood ──────────────────────────────────────────────
616
617    #[test]
618    fn test_extract_neighborhood_basic() {
619        let mut ext = make_extractor();
620        ext.add_triple("Root", "p", "Child");
621        let triples = ext.extract_neighborhood("Root", 1);
622        assert!(!triples.is_empty());
623        assert_eq!(triples[0].0, "Root");
624    }
625
626    #[test]
627    fn test_extract_neighborhood_unknown_node_returns_empty() {
628        let ext = make_extractor();
629        let triples = ext.extract_neighborhood("Unknown", 2);
630        assert!(triples.is_empty());
631    }
632
633    // ── SubgraphFilter ────────────────────────────────────────────────────
634
635    #[test]
636    fn test_filter_min_degree() {
637        let mut ext = make_extractor();
638        // A has 2 edges, B has 0
639        ext.add_triple("A", "p", "B");
640        ext.add_triple("A", "q", "C");
641        let pattern = SubgraphPattern {
642            anchor: "A".to_string(),
643            predicates: vec![],
644            depth: 1,
645            filter: Some(SubgraphFilter::MinDegree(2)),
646        };
647        let triples = ext.extract_subgraph(&pattern);
648        // A has ≥ 2 edges → passes filter
649        assert!(!triples.is_empty());
650    }
651
652    #[test]
653    fn test_filter_max_degree() {
654        let mut ext = make_extractor();
655        ext.add_triple("A", "p1", "B");
656        ext.add_triple("A", "p2", "C");
657        ext.add_triple("A", "p3", "D");
658        let pattern = SubgraphPattern {
659            anchor: "A".to_string(),
660            predicates: vec![],
661            depth: 1,
662            filter: Some(SubgraphFilter::MaxDegree(1)),
663        };
664        // A has 3 edges, fails MaxDegree(1)
665        let triples = ext.extract_subgraph(&pattern);
666        assert!(triples.is_empty());
667    }
668
669    // ── drain_events ──────────────────────────────────────────────────────
670
671    #[test]
672    fn test_drain_events_clears_buffer() {
673        let mut ext = make_extractor();
674        ext.add_triple("A", "p", "B");
675        assert!(!ext.drain_events().is_empty());
676        assert!(ext.drain_events().is_empty());
677    }
678
679    // ── StreamingGraphRag ─────────────────────────────────────────────────
680
681    #[test]
682    fn test_streaming_rag_process_event_updates_graph() {
683        let mut rag = StreamingGraphRag::new(3);
684        rag.process_event("Alice", "knows", "Bob");
685        let stats = rag.stats();
686        assert!(stats.nodes >= 2);
687        assert!(stats.edges >= 1);
688    }
689
690    #[test]
691    fn test_streaming_rag_cache_hit() {
692        let mut rag = StreamingGraphRag::new(3);
693        rag.process_event("A", "p", "B");
694
695        let pattern = SubgraphPattern {
696            anchor: "A".to_string(),
697            predicates: vec![],
698            depth: 1,
699            filter: None,
700        };
701
702        let _ = rag.query_live(&pattern); // miss
703        let _ = rag.query_live(&pattern); // hit
704        let stats = rag.stats();
705        assert!(stats.cache_hit_rate > 0.0);
706    }
707
708    #[test]
709    fn test_streaming_rag_cache_invalidated_on_event() {
710        let mut rag = StreamingGraphRag::new(3);
711        rag.process_event("A", "p", "B");
712
713        let pattern = SubgraphPattern {
714            anchor: "A".to_string(),
715            predicates: vec![],
716            depth: 1,
717            filter: None,
718        };
719
720        let r1 = rag.query_live(&pattern);
721        // Add a new edge touching A → should invalidate cache
722        rag.process_event("A", "q", "C");
723        let r2 = rag.query_live(&pattern);
724        // r2 should have more or equal triples than r1
725        assert!(r2.len() >= r1.len());
726    }
727
728    #[test]
729    fn test_streaming_rag_stats_initial_nan_hit_rate() {
730        let rag = StreamingGraphRag::new(3);
731        let stats = rag.stats();
732        assert!(stats.cache_hit_rate.is_nan());
733    }
734
735    #[test]
736    fn test_streaming_rag_drain_events() {
737        let mut rag = StreamingGraphRag::new(3);
738        rag.process_event("X", "p", "Y");
739        let events = rag.drain_events();
740        assert!(!events.is_empty());
741    }
742
743    #[test]
744    fn test_streaming_stats_fields() {
745        let mut rag = StreamingGraphRag::new(2);
746        rag.process_event("A", "edge", "B");
747        let stats = rag.stats();
748        assert_eq!(stats.nodes, 2);
749        assert_eq!(stats.edges, 1);
750    }
751}