use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
/// Kind of change reported by a [`SubgraphEvent`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SubgraphEventType {
/// A node was inserted into the graph.
NodeAdded,
/// A node was dropped from the graph.
NodeRemoved,
/// A new edge was appended to a node's edge list.
EdgeAdded,
/// An existing edge was deleted.
EdgeRemoved,
/// NOTE(review): not constructed anywhere in the visible code; the meaning
/// of the `String` payload cannot be determined from here — confirm with
/// whoever produces this variant.
SubgraphMatch(String),
}
/// A single timestamped graph change, as returned by the extractor's mutating
/// methods and queued in its event buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubgraphEvent {
/// What kind of change happened.
pub event_type: SubgraphEventType,
/// Node the event refers to; for edge events this is the edge's subject.
pub node: String,
/// Time the change was recorded, in milliseconds since the Unix epoch.
pub timestamp: i64,
}
/// Per-node condition evaluated during subgraph extraction against the node
/// whose outgoing edges are being emitted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SubgraphFilter {
/// Passes when at least one outgoing edge's object contains this text as a
/// substring.
HasLabel(String),
/// Passes when the node has at most this many outgoing edges.
MaxDegree(usize),
/// Passes when the node has at least this many outgoing edges.
MinDegree(usize),
}
/// Description of a breadth-first subgraph query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubgraphPattern {
/// Node the traversal starts from. The literal `"?x"` is a wildcard that
/// starts from every node currently in the graph.
pub anchor: String,
/// Predicates to follow; an empty list accepts every predicate.
pub predicates: Vec<String>,
/// Requested number of hops to expand outward from the anchor(s).
pub depth: usize,
/// Optional condition a node must satisfy for its outgoing edges to be
/// returned.
pub filter: Option<SubgraphFilter>,
}
/// Adjacency list: node name -> outgoing edges as `(predicate, object)` pairs.
type AdjList = HashMap<String, Vec<(String, String)>>;
pub struct StreamingSubgraphExtractor {
graph: AdjList,
buffer: VecDeque<SubgraphEvent>,
max_depth: usize,
max_nodes: usize,
events_total: u64,
}
impl StreamingSubgraphExtractor {
pub fn new(max_depth: usize, max_nodes: usize) -> Self {
Self {
graph: HashMap::new(),
buffer: VecDeque::new(),
max_depth,
max_nodes,
events_total: 0,
}
}
pub fn add_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
let now = unix_ms();
let mut events = Vec::new();
let subject_is_new = !self.graph.contains_key(s);
let edges = self.graph.entry(s.to_string()).or_default();
if !edges.iter().any(|(ep, eo)| ep == p && eo == o) {
edges.push((p.to_string(), o.to_string()));
if subject_is_new {
let ev = SubgraphEvent {
event_type: SubgraphEventType::NodeAdded,
node: s.to_string(),
timestamp: now,
};
events.push(ev.clone());
self.buffer.push_back(ev);
}
let ev = SubgraphEvent {
event_type: SubgraphEventType::EdgeAdded,
node: s.to_string(),
timestamp: now,
};
events.push(ev.clone());
self.buffer.push_back(ev);
if !self.graph.contains_key(o) {
self.graph.entry(o.to_string()).or_default();
let ev = SubgraphEvent {
event_type: SubgraphEventType::NodeAdded,
node: o.to_string(),
timestamp: now,
};
events.push(ev.clone());
self.buffer.push_back(ev);
}
}
self.events_total += events.len() as u64;
events
}
pub fn remove_triple(&mut self, s: &str, p: &str, o: &str) -> Vec<SubgraphEvent> {
let now = unix_ms();
let mut events = Vec::new();
let removed = if let Some(edges) = self.graph.get_mut(s) {
let before = edges.len();
edges.retain(|(ep, eo)| !(ep == p && eo == o));
edges.len() < before
} else {
false
};
if removed {
let ev = SubgraphEvent {
event_type: SubgraphEventType::EdgeRemoved,
node: s.to_string(),
timestamp: now,
};
events.push(ev.clone());
self.buffer.push_back(ev);
if self.graph.get(s).map(|e| e.is_empty()).unwrap_or(false) {
self.graph.remove(s);
let ev = SubgraphEvent {
event_type: SubgraphEventType::NodeRemoved,
node: s.to_string(),
timestamp: now,
};
events.push(ev.clone());
self.buffer.push_back(ev);
}
}
self.events_total += events.len() as u64;
events
}
pub fn extract_subgraph(&self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
let anchors: Vec<&str> = if pattern.anchor == "?x" {
self.graph.keys().map(|s| s.as_str()).collect()
} else {
vec![pattern.anchor.as_str()]
};
let pred_filter: Option<HashSet<&str>> = if pattern.predicates.is_empty() {
None
} else {
Some(pattern.predicates.iter().map(|s| s.as_str()).collect())
};
let mut visited: HashSet<String> = HashSet::new();
let mut result: Vec<(String, String, String)> = Vec::new();
let mut queue: VecDeque<(String, usize)> = VecDeque::new();
for anchor in anchors {
if visited.len() >= self.max_nodes {
break;
}
if !visited.contains(anchor) {
queue.push_back((anchor.to_string(), 0));
visited.insert(anchor.to_string());
}
}
while let Some((node, depth)) = queue.pop_front() {
if visited.len() > self.max_nodes {
break;
}
if let Some(edges) = self.graph.get(&node) {
for (pred, obj) in edges {
if let Some(ref pf) = pred_filter {
if !pf.contains(pred.as_str()) {
continue;
}
}
if let Some(ref filter) = pattern.filter {
if !self.node_passes_filter(&node, filter) {
continue;
}
}
result.push((node.clone(), pred.clone(), obj.clone()));
if depth < pattern.depth && !visited.contains(obj) {
visited.insert(obj.clone());
queue.push_back((obj.clone(), depth + 1));
}
}
}
}
result
}
pub fn extract_neighborhood(&self, node: &str, depth: usize) -> Vec<(String, String, String)> {
let pattern = SubgraphPattern {
anchor: node.to_string(),
predicates: vec![],
depth,
filter: None,
};
self.extract_subgraph(&pattern)
}
pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
self.buffer.drain(..).collect()
}
pub fn node_count(&self) -> usize {
self.graph.len()
}
pub fn edge_count(&self) -> usize {
self.graph.values().map(|e| e.len()).sum()
}
fn node_passes_filter(&self, node: &str, filter: &SubgraphFilter) -> bool {
match filter {
SubgraphFilter::HasLabel(label) => {
self.graph
.get(node)
.map(|edges| edges.iter().any(|(_, o)| o.contains(label.as_str())))
.unwrap_or(false)
}
SubgraphFilter::MaxDegree(max) => {
self.graph.get(node).map(|e| e.len()).unwrap_or(0) <= *max
}
SubgraphFilter::MinDegree(min) => {
self.graph.get(node).map(|e| e.len()).unwrap_or(0) >= *min
}
}
}
}
/// Snapshot of graph size and cache effectiveness reported by
/// `StreamingGraphRag::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingStats {
/// Number of nodes in the underlying graph.
pub nodes: usize,
/// Number of edges in the underlying graph.
pub edges: usize,
/// Cache hits divided by total lookups; `NaN` when no lookup has happened yet.
pub cache_hit_rate: f64,
/// Total number of graph events produced by processed triples.
pub events_processed: u64,
}
/// Wraps a `StreamingSubgraphExtractor` with a time-limited cache of query
/// results.
pub struct StreamingGraphRag {
// Graph that stores the triples and performs the actual extraction.
extractor: StreamingSubgraphExtractor,
// Cached extraction results, keyed by the string from `pattern_cache_key`.
query_cache: HashMap<String, Vec<(String, String, String)>>,
// Lifetime of a cache entry in milliseconds; 0 means entries never expire by age.
cache_ttl_ms: i64,
// Time (ms since the Unix epoch) each entry was stored; same keys as `query_cache`.
cache_timestamps: HashMap<String, i64>,
// Number of queries answered from the cache.
cache_hits: u64,
// Number of queries that had to run an extraction.
cache_misses: u64,
// Running total of events returned by the extractor for processed triples.
events_processed: u64,
}
impl StreamingGraphRag {
    /// Creates an instance whose cache entries live for 60 seconds.
    pub fn new(max_depth: usize) -> Self {
        Self::with_cache_ttl(max_depth, 60_000)
    }

    /// Creates an instance with an explicit cache lifetime in milliseconds.
    ///
    /// A `cache_ttl_ms` of `0` disables age-based expiry. The underlying
    /// extractor is limited to 10 000 nodes per extraction.
    pub fn with_cache_ttl(max_depth: usize, cache_ttl_ms: i64) -> Self {
        Self {
            extractor: StreamingSubgraphExtractor::new(max_depth, 10_000),
            query_cache: HashMap::new(),
            cache_ttl_ms,
            cache_timestamps: HashMap::new(),
            cache_hits: 0,
            cache_misses: 0,
            events_processed: 0,
        }
    }

    /// Adds the triple `s -p-> o` to the graph and discards every cached
    /// result the new edge could change.
    ///
    /// A triple that is already present produces no events and leaves the
    /// cache untouched. Otherwise only the edge list of `s` has changed, so a
    /// cached query is affected exactly when its traversal can reach `s`:
    /// it is anchored at `s`, it uses the `"?x"` wildcard anchor, or `s`
    /// already appears in its cached triples.
    pub fn process_event(&mut self, s: &str, p: &str, o: &str) {
        let events = self.extractor.add_triple(s, p, o);
        if events.is_empty() {
            return;
        }
        self.events_processed += events.len() as u64;

        // Cache keys begin with the anchor followed by '|'.
        let subject_prefix = format!("{}|", s);
        self.query_cache.retain(|key, triples| {
            let anchored_here = key.starts_with("?x|") || key.starts_with(&subject_prefix);
            let reaches_subject = triples.iter().any(|(from, _, to)| from == s || to == s);
            !(anchored_here || reaches_subject)
        });
        // Keep the timestamp map in lock-step with the result map.
        let live = &self.query_cache;
        self.cache_timestamps.retain(|key, _| live.contains_key(key));
    }

    /// Returns the triples matching `pattern`, served from the cache when a
    /// fresh entry exists and recomputed (and cached) otherwise.
    pub fn query_live(&mut self, pattern: &SubgraphPattern) -> Vec<(String, String, String)> {
        let cache_key = pattern_cache_key(pattern);
        let now = unix_ms();
        if let Some(cached) = self.query_cache.get(&cache_key) {
            // An entry without a timestamp is treated as stored at the epoch.
            let stored_at = self.cache_timestamps.get(&cache_key).copied().unwrap_or(0);
            if self.cache_ttl_ms == 0 || now - stored_at < self.cache_ttl_ms {
                self.cache_hits += 1;
                return cached.clone();
            }
        }
        self.cache_misses += 1;
        let result = self.extractor.extract_subgraph(pattern);
        self.query_cache.insert(cache_key.clone(), result.clone());
        self.cache_timestamps.insert(cache_key, now);
        result
    }

    /// Reports graph size, cache hit rate and the number of events processed.
    ///
    /// The hit rate is `NaN` until the first query has been made.
    pub fn stats(&self) -> StreamingStats {
        let total = self.cache_hits + self.cache_misses;
        let cache_hit_rate = if total == 0 {
            f64::NAN
        } else {
            self.cache_hits as f64 / total as f64
        };
        StreamingStats {
            nodes: self.extractor.node_count(),
            edges: self.extractor.edge_count(),
            cache_hit_rate,
            events_processed: self.events_processed,
        }
    }

    /// Returns and clears the events buffered by the underlying extractor.
    pub fn drain_events(&mut self) -> Vec<SubgraphEvent> {
        self.extractor.drain_events()
    }
}
/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Plain narrowing cast from `u128`, unchanged from the chained form.
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Builds the cache key for a query pattern.
///
/// Every field that influences the extraction result is part of the key,
/// including `filter`, so two patterns that differ only in their filter do
/// not share a cache slot. Predicates and filter are rendered with their
/// `Debug` form, which quotes each string and therefore keeps `["a,b"]`
/// distinct from `["a", "b"]`. The anchor is kept verbatim at the front of
/// the key, followed by `|`.
fn pattern_cache_key(p: &SubgraphPattern) -> String {
    format!("{}|{:?}|{}|{:?}", p.anchor, p.predicates, p.depth, p.filter)
}
/// Unit tests for the streaming extractor and the caching query layer.
#[cfg(test)]
mod tests {
    use super::*;

    /// Extractor with generous limits so the tests are bounded by the pattern.
    fn make_extractor() -> StreamingSubgraphExtractor {
        StreamingSubgraphExtractor::new(3, 1000)
    }

    #[test]
    fn test_add_triple_emits_node_added_for_new_subject() {
        let mut ext = make_extractor();
        let events = ext.add_triple("Alice", "knows", "Bob");
        let types: Vec<_> = events.iter().map(|e| &e.event_type).collect();
        assert!(types.contains(&&SubgraphEventType::NodeAdded));
    }

    #[test]
    fn test_add_triple_emits_edge_added() {
        let mut ext = make_extractor();
        let events = ext.add_triple("Alice", "knows", "Bob");
        assert!(events
            .iter()
            .any(|e| e.event_type == SubgraphEventType::EdgeAdded));
    }

    #[test]
    fn test_add_triple_does_not_duplicate_existing_edge() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        let events = ext.add_triple("Alice", "knows", "Bob");
        assert!(events.is_empty());
    }

    #[test]
    fn test_add_triple_increases_node_and_edge_count() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        assert_eq!(ext.node_count(), 2);
        assert_eq!(ext.edge_count(), 1);
    }

    #[test]
    fn test_add_triple_new_object_emits_node_added() {
        let mut ext = make_extractor();
        let events = ext.add_triple("A", "p", "B");
        let node_added_nodes: Vec<_> = events
            .iter()
            .filter(|e| e.event_type == SubgraphEventType::NodeAdded)
            .map(|e| e.node.as_str())
            .collect();
        assert!(node_added_nodes.contains(&"B"));
    }

    #[test]
    fn test_remove_triple_emits_edge_removed() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        ext.drain_events();
        let events = ext.remove_triple("Alice", "knows", "Bob");
        assert!(events
            .iter()
            .any(|e| e.event_type == SubgraphEventType::EdgeRemoved));
    }

    #[test]
    fn test_remove_triple_emits_node_removed_when_no_edges_left() {
        let mut ext = make_extractor();
        ext.add_triple("Alice", "knows", "Bob");
        ext.drain_events();
        let events = ext.remove_triple("Alice", "knows", "Bob");
        assert!(events
            .iter()
            .any(|e| e.event_type == SubgraphEventType::NodeRemoved));
    }

    #[test]
    fn test_remove_nonexistent_triple_emits_nothing() {
        let mut ext = make_extractor();
        let events = ext.remove_triple("X", "p", "Y");
        assert!(events.is_empty());
    }

    #[test]
    fn test_remove_triple_decreases_edge_count() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        ext.remove_triple("A", "p", "B");
        assert_eq!(ext.edge_count(), 1);
    }

    #[test]
    fn test_extract_subgraph_direct_anchor() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: None,
        };
        let triples = ext.extract_subgraph(&pattern);
        assert_eq!(triples.len(), 2);
    }

    #[test]
    fn test_extract_subgraph_predicate_filter() {
        let mut ext = make_extractor();
        ext.add_triple("A", "knows", "B");
        ext.add_triple("A", "likes", "C");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec!["knows".to_string()],
            depth: 1,
            filter: None,
        };
        let triples = ext.extract_subgraph(&pattern);
        assert_eq!(triples.len(), 1);
        assert_eq!(triples[0].1, "knows");
    }

    #[test]
    fn test_extract_subgraph_variable_anchor() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("C", "p", "D");
        let pattern = SubgraphPattern {
            anchor: "?x".to_string(),
            predicates: vec![],
            depth: 0,
            filter: None,
        };
        let triples = ext.extract_subgraph(&pattern);
        // Every node is an anchor, so both edges must be returned exactly once.
        assert_eq!(triples.len(), 2);
    }

    #[test]
    fn test_extract_subgraph_depth_limits_expansion() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("B", "p", "C");
        ext.add_triple("C", "p", "D");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: None,
        };
        let triples = ext.extract_subgraph(&pattern);
        let has_cd = triples.iter().any(|(s, _, o)| s == "C" && o == "D");
        assert!(!has_cd, "Should not expand past depth 1");
    }

    #[test]
    fn test_extract_neighborhood_basic() {
        let mut ext = make_extractor();
        ext.add_triple("Root", "p", "Child");
        let triples = ext.extract_neighborhood("Root", 1);
        assert!(!triples.is_empty());
        assert_eq!(triples[0].0, "Root");
    }

    #[test]
    fn test_extract_neighborhood_unknown_node_returns_empty() {
        let ext = make_extractor();
        let triples = ext.extract_neighborhood("Unknown", 2);
        assert!(triples.is_empty());
    }

    #[test]
    fn test_filter_min_degree() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        ext.add_triple("A", "q", "C");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: Some(SubgraphFilter::MinDegree(2)),
        };
        let triples = ext.extract_subgraph(&pattern);
        assert!(!triples.is_empty());
    }

    #[test]
    fn test_filter_max_degree() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p1", "B");
        ext.add_triple("A", "p2", "C");
        ext.add_triple("A", "p3", "D");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: Some(SubgraphFilter::MaxDegree(1)),
        };
        let triples = ext.extract_subgraph(&pattern);
        assert!(triples.is_empty());
    }

    #[test]
    fn test_drain_events_clears_buffer() {
        let mut ext = make_extractor();
        ext.add_triple("A", "p", "B");
        assert!(!ext.drain_events().is_empty());
        assert!(ext.drain_events().is_empty());
    }

    #[test]
    fn test_streaming_rag_process_event_updates_graph() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("Alice", "knows", "Bob");
        let stats = rag.stats();
        assert!(stats.nodes >= 2);
        assert!(stats.edges >= 1);
    }

    #[test]
    fn test_streaming_rag_cache_hit() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("A", "p", "B");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: None,
        };
        let _ = rag.query_live(&pattern);
        let _ = rag.query_live(&pattern);
        let stats = rag.stats();
        // One miss followed by one hit.
        assert!((stats.cache_hit_rate - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_streaming_rag_cache_invalidated_on_event() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("A", "p", "B");
        let pattern = SubgraphPattern {
            anchor: "A".to_string(),
            predicates: vec![],
            depth: 1,
            filter: None,
        };
        let r1 = rag.query_live(&pattern);
        rag.process_event("A", "q", "C");
        let r2 = rag.query_live(&pattern);
        // A stale cache would return the old single-edge result, so pin the
        // exact sizes instead of only checking that the result did not shrink.
        assert_eq!(r1.len(), 1);
        assert_eq!(r2.len(), 2);
    }

    #[test]
    fn test_streaming_rag_stats_initial_nan_hit_rate() {
        let rag = StreamingGraphRag::new(3);
        let stats = rag.stats();
        assert!(stats.cache_hit_rate.is_nan());
    }

    #[test]
    fn test_streaming_rag_drain_events() {
        let mut rag = StreamingGraphRag::new(3);
        rag.process_event("X", "p", "Y");
        let events = rag.drain_events();
        assert!(!events.is_empty());
    }

    #[test]
    fn test_streaming_stats_fields() {
        let mut rag = StreamingGraphRag::new(2);
        rag.process_event("A", "edge", "B");
        let stats = rag.stats();
        assert_eq!(stats.nodes, 2);
        assert_eq!(stats.edges, 1);
    }
}