Skip to main content

graphrag_core/graph/
temporal.rs

1//! Temporal Graph Analysis
2//!
3//! This module provides analysis capabilities for time-evolving graphs:
4//! - Temporal graph representation
5//! - Snapshot-based analysis
6//! - Evolution metrics
7//! - Temporal community detection
8//! - Time-aware path finding
9//!
10//! ## Use Cases
11//!
12//! - Social network evolution tracking
13//! - Knowledge graph versioning
14//! - Anomaly detection in dynamic networks
15//! - Trend analysis and forecasting
16//! - Event detection in temporal data
17
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet, BTreeMap};
20
21/// Temporal edge with timestamp
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct TemporalEdge {
24    /// Source node
25    pub source: String,
26    /// Target node
27    pub target: String,
28    /// Edge type/label
29    pub edge_type: String,
30    /// Timestamp (Unix timestamp)
31    pub timestamp: i64,
32    /// Edge weight
33    pub weight: f32,
34    /// Start time (optional, for interval-based edges)
35    pub start_time: Option<i64>,
36    /// End time (optional, for interval-based edges)
37    pub end_time: Option<i64>,
38}
39
40impl TemporalEdge {
41    /// Check if edge is active at given timestamp
42    pub fn is_active_at(&self, timestamp: i64) -> bool {
43        if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
44            timestamp >= start && timestamp <= end
45        } else {
46            // Point-in-time edge
47            self.timestamp == timestamp
48        }
49    }
50
51    /// Check if edge is active in time range
52    pub fn is_active_in_range(&self, start: i64, end: i64) -> bool {
53        if let (Some(edge_start), Some(edge_end)) = (self.start_time, self.end_time) {
54            // Interval overlap check
55            edge_start <= end && edge_end >= start
56        } else {
57            // Point-in-time edge
58            self.timestamp >= start && self.timestamp <= end
59        }
60    }
61}
62
63/// Graph snapshot at specific time
64#[derive(Debug, Clone)]
65pub struct Snapshot {
66    /// Snapshot timestamp
67    pub timestamp: i64,
68    /// Nodes active in this snapshot
69    pub nodes: HashSet<String>,
70    /// Edges active in this snapshot
71    pub edges: Vec<TemporalEdge>,
72    /// Edge count
73    pub edge_count: usize,
74    /// Node count
75    pub node_count: usize,
76}
77
78impl Snapshot {
79    /// Create snapshot from temporal edges
80    pub fn from_edges(timestamp: i64, edges: Vec<TemporalEdge>) -> Self {
81        let mut nodes = HashSet::new();
82
83        for edge in &edges {
84            nodes.insert(edge.source.clone());
85            nodes.insert(edge.target.clone());
86        }
87
88        let node_count = nodes.len();
89        let edge_count = edges.len();
90
91        Self {
92            timestamp,
93            nodes,
94            edges,
95            edge_count,
96            node_count,
97        }
98    }
99
100    /// Get node degree in snapshot
101    pub fn node_degree(&self, node: &str) -> usize {
102        self.edges
103            .iter()
104            .filter(|e| e.source == node || e.target == node)
105            .count()
106    }
107
108    /// Get graph density
109    pub fn density(&self) -> f32 {
110        if self.node_count < 2 {
111            return 0.0;
112        }
113
114        let max_edges = (self.node_count * (self.node_count - 1)) / 2;
115        self.edge_count as f32 / max_edges as f32
116    }
117}
118
119/// Temporal graph
120pub struct TemporalGraph {
121    /// All temporal edges
122    edges: Vec<TemporalEdge>,
123    /// Edge index by timestamp
124    edge_index: BTreeMap<i64, Vec<usize>>,
125    /// Node first appearance time
126    node_first_seen: HashMap<String, i64>,
127    /// Node last seen time
128    node_last_seen: HashMap<String, i64>,
129}
130
131impl TemporalGraph {
132    /// Create new temporal graph
133    pub fn new() -> Self {
134        Self {
135            edges: Vec::new(),
136            edge_index: BTreeMap::new(),
137            node_first_seen: HashMap::new(),
138            node_last_seen: HashMap::new(),
139        }
140    }
141
142    /// Add temporal edge
143    pub fn add_edge(&mut self, edge: TemporalEdge) {
144        let timestamp = edge.timestamp;
145        let edge_idx = self.edges.len();
146
147        // Update node timestamps
148        self.update_node_timestamp(&edge.source, timestamp);
149        self.update_node_timestamp(&edge.target, timestamp);
150
151        // Add to edge index
152        self.edge_index
153            .entry(timestamp)
154            .or_default()
155            .push(edge_idx);
156
157        self.edges.push(edge);
158    }
159
160    /// Update node first/last seen timestamps
161    fn update_node_timestamp(&mut self, node: &str, timestamp: i64) {
162        self.node_first_seen
163            .entry(node.to_string())
164            .and_modify(|t| *t = (*t).min(timestamp))
165            .or_insert(timestamp);
166
167        self.node_last_seen
168            .entry(node.to_string())
169            .and_modify(|t| *t = (*t).max(timestamp))
170            .or_insert(timestamp);
171    }
172
173    /// Get snapshot at specific timestamp
174    pub fn snapshot_at(&self, timestamp: i64) -> Snapshot {
175        let edges: Vec<TemporalEdge> = self
176            .edges
177            .iter()
178            .filter(|e| e.is_active_at(timestamp))
179            .cloned()
180            .collect();
181
182        Snapshot::from_edges(timestamp, edges)
183    }
184
185    /// Get snapshot for time range
186    pub fn snapshot_range(&self, start: i64, end: i64) -> Snapshot {
187        let edges: Vec<TemporalEdge> = self
188            .edges
189            .iter()
190            .filter(|e| e.is_active_in_range(start, end))
191            .cloned()
192            .collect();
193
194        Snapshot::from_edges((start + end) / 2, edges)
195    }
196
197    /// Get all timestamps (discrete time points)
198    pub fn timestamps(&self) -> Vec<i64> {
199        self.edge_index.keys().copied().collect()
200    }
201
202    /// Get time range
203    pub fn time_range(&self) -> Option<(i64, i64)> {
204        if self.edges.is_empty() {
205            return None;
206        }
207
208        let min = self.edges.iter().map(|e| e.timestamp).min().unwrap();
209        let max = self.edges.iter().map(|e| e.timestamp).max().unwrap();
210
211        Some((min, max))
212    }
213
214    /// Get node lifetime
215    pub fn node_lifetime(&self, node: &str) -> Option<(i64, i64)> {
216        let first = self.node_first_seen.get(node)?;
217        let last = self.node_last_seen.get(node)?;
218
219        Some((*first, *last))
220    }
221
222    /// Get all nodes
223    pub fn nodes(&self) -> HashSet<String> {
224        self.node_first_seen.keys().cloned().collect()
225    }
226
227    /// Get edge count
228    pub fn edge_count(&self) -> usize {
229        self.edges.len()
230    }
231
232    /// Get node count
233    pub fn node_count(&self) -> usize {
234        self.node_first_seen.len()
235    }
236}
237
238impl Default for TemporalGraph {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244/// Temporal query parameters
245#[derive(Debug, Clone, Serialize, Deserialize)]
246pub struct TemporalQuery {
247    /// Start timestamp
248    pub start_time: i64,
249    /// End timestamp
250    pub end_time: i64,
251    /// Granularity (e.g., daily, weekly)
252    pub granularity: i64,
253    /// Node filter (optional)
254    pub nodes: Option<Vec<String>>,
255    /// Edge type filter (optional)
256    pub edge_types: Option<Vec<String>>,
257}
258
259/// Temporal analytics engine
260pub struct TemporalAnalytics {
261    graph: TemporalGraph,
262}
263
264impl TemporalAnalytics {
265    /// Create analytics engine
266    pub fn new(graph: TemporalGraph) -> Self {
267        Self { graph }
268    }
269
270    /// Calculate evolution metrics over time
271    pub fn evolution_metrics(&self, query: &TemporalQuery) -> Vec<EvolutionMetrics> {
272        let mut metrics = Vec::new();
273        let mut current_time = query.start_time;
274
275        while current_time <= query.end_time {
276            let next_time = current_time + query.granularity;
277            let snapshot = self.graph.snapshot_range(current_time, next_time);
278
279            let metric = EvolutionMetrics {
280                timestamp: current_time,
281                node_count: snapshot.node_count,
282                edge_count: snapshot.edge_count,
283                density: snapshot.density(),
284                avg_degree: self.calculate_avg_degree(&snapshot),
285            };
286
287            metrics.push(metric);
288            current_time = next_time;
289        }
290
291        metrics
292    }
293
294    /// Calculate average node degree in snapshot
295    fn calculate_avg_degree(&self, snapshot: &Snapshot) -> f32 {
296        if snapshot.node_count == 0 {
297            return 0.0;
298        }
299
300        let total_degree: usize = snapshot
301            .nodes
302            .iter()
303            .map(|n| snapshot.node_degree(n))
304            .sum();
305
306        total_degree as f32 / snapshot.node_count as f32
307    }
308
309    /// Detect node churn (nodes appearing/disappearing)
310    pub fn node_churn(&self, query: &TemporalQuery) -> NodeChurn {
311        let start_snapshot = self.graph.snapshot_at(query.start_time);
312        let end_snapshot = self.graph.snapshot_at(query.end_time);
313
314        let added: HashSet<_> = end_snapshot
315            .nodes
316            .difference(&start_snapshot.nodes)
317            .cloned()
318            .collect();
319
320        let removed: HashSet<_> = start_snapshot
321            .nodes
322            .difference(&end_snapshot.nodes)
323            .cloned()
324            .collect();
325
326        let stable: HashSet<_> = start_snapshot
327            .nodes
328            .intersection(&end_snapshot.nodes)
329            .cloned()
330            .collect();
331
332        let added_count = added.len();
333        let removed_count = removed.len();
334        let stable_count = stable.len();
335
336        NodeChurn {
337            added: added.into_iter().collect(),
338            removed: removed.into_iter().collect(),
339            stable: stable.into_iter().collect(),
340            added_count,
341            removed_count,
342            stable_count,
343        }
344    }
345
346    /// Find nodes with highest activity growth
347    pub fn top_growing_nodes(&self, query: &TemporalQuery, top_k: usize) -> Vec<(String, f32)> {
348        let start_snapshot = self.graph.snapshot_range(query.start_time, query.start_time + query.granularity);
349        let end_snapshot = self.graph.snapshot_range(query.end_time - query.granularity, query.end_time);
350
351        let mut growth_scores: Vec<(String, f32)> = Vec::new();
352
353        for node in &end_snapshot.nodes {
354            let start_degree = start_snapshot.node_degree(node) as f32;
355            let end_degree = end_snapshot.node_degree(node) as f32;
356
357            let growth = if start_degree > 0.0 {
358                (end_degree - start_degree) / start_degree
359            } else {
360                end_degree
361            };
362
363            growth_scores.push((node.clone(), growth));
364        }
365
366        growth_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
367        growth_scores.truncate(top_k);
368
369        growth_scores
370    }
371
372    /// Get temporal centrality (activity over time)
373    pub fn temporal_centrality(&self, node: &str, query: &TemporalQuery) -> Vec<(i64, f32)> {
374        let mut centrality = Vec::new();
375        let mut current_time = query.start_time;
376
377        while current_time <= query.end_time {
378            let next_time = current_time + query.granularity;
379            let snapshot = self.graph.snapshot_range(current_time, next_time);
380
381            let degree = snapshot.node_degree(node) as f32;
382            let centrality_score = if snapshot.node_count > 1 {
383                degree / (snapshot.node_count - 1) as f32
384            } else {
385                0.0
386            };
387
388            centrality.push((current_time, centrality_score));
389            current_time = next_time;
390        }
391
392        centrality
393    }
394}
395
396/// Evolution metrics over time
397#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct EvolutionMetrics {
399    /// Timestamp
400    pub timestamp: i64,
401    /// Number of nodes
402    pub node_count: usize,
403    /// Number of edges
404    pub edge_count: usize,
405    /// Graph density
406    pub density: f32,
407    /// Average degree
408    pub avg_degree: f32,
409}
410
/// Result of comparing the node sets of two snapshots.
///
/// Each `*_count` field mirrors the length of the matching list.
#[derive(Clone, Debug)]
pub struct NodeChurn {
    /// Nodes present only in the later snapshot
    pub added: Vec<String>,
    /// Nodes present only in the earlier snapshot
    pub removed: Vec<String>,
    /// Nodes present in both snapshots
    pub stable: Vec<String>,
    /// Number of added nodes
    pub added_count: usize,
    /// Number of removed nodes
    pub removed_count: usize,
    /// Number of stable nodes
    pub stable_count: usize,
}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a unit-weight "knows" edge; `interval` sets both
    /// `start_time` and `end_time`, `None` makes a point-in-time edge.
    fn make_edge(
        source: &str,
        target: &str,
        timestamp: i64,
        interval: Option<(i64, i64)>,
    ) -> TemporalEdge {
        TemporalEdge {
            source: source.to_string(),
            target: target.to_string(),
            edge_type: "knows".to_string(),
            timestamp,
            weight: 1.0,
            start_time: interval.map(|(start, _)| start),
            end_time: interval.map(|(_, end)| end),
        }
    }

    /// Triangle A-B-C whose edges become active at 100, 150 and 200 and each
    /// stay active for 100 time units.
    fn create_test_temporal_graph() -> TemporalGraph {
        let mut graph = TemporalGraph::new();

        graph.add_edge(make_edge("A", "B", 100, Some((100, 200))));
        graph.add_edge(make_edge("B", "C", 150, Some((150, 250))));
        graph.add_edge(make_edge("A", "C", 200, Some((200, 300))));

        graph
    }

    #[test]
    fn test_temporal_graph_creation() {
        let graph = create_test_temporal_graph();
        assert_eq!(graph.edge_count(), 3);
        assert_eq!(graph.node_count(), 3);
    }

    #[test]
    fn test_snapshot_at_timestamp() {
        let graph = create_test_temporal_graph();
        let snapshot = graph.snapshot_at(150);

        // A-B (100..=200) and B-C (150..=250) are active; A-C starts at 200.
        assert_eq!(snapshot.timestamp, 150);
        assert_eq!(snapshot.node_count, 3);
        assert_eq!(snapshot.edge_count, 2);
    }

    #[test]
    fn test_snapshot_range() {
        let graph = create_test_temporal_graph();
        let snapshot = graph.snapshot_range(100, 200);

        // All three intervals overlap 100..=200 (A-C touches it at 200).
        assert_eq!(snapshot.timestamp, 150);
        assert_eq!(snapshot.node_count, 3);
        assert_eq!(snapshot.edge_count, 3);
    }

    #[test]
    fn test_time_range() {
        let graph = create_test_temporal_graph();
        let (min, max) = graph.time_range().unwrap();

        assert_eq!(min, 100);
        assert_eq!(max, 200);
        assert_eq!(TemporalGraph::new().time_range(), None);
    }

    #[test]
    fn test_node_lifetime() {
        let graph = create_test_temporal_graph();
        let (first, last) = graph.node_lifetime("A").unwrap();

        assert_eq!(first, 100);
        assert_eq!(last, 200);
        assert_eq!(graph.node_lifetime("missing"), None);
    }

    #[test]
    fn test_evolution_metrics() {
        let graph = create_test_temporal_graph();
        let analytics = TemporalAnalytics::new(graph);

        let query = TemporalQuery {
            start_time: 100,
            end_time: 300,
            granularity: 50,
            nodes: None,
            edge_types: None,
        };

        let metrics = analytics.evolution_metrics(&query);
        let stamps: Vec<i64> = metrics.iter().map(|m| m.timestamp).collect();
        assert_eq!(stamps, vec![100, 150, 200, 250, 300]);

        // First window 100..=150 sees A-B and B-C.
        assert_eq!(metrics[0].node_count, 3);
        assert_eq!(metrics[0].edge_count, 2);
    }

    #[test]
    fn test_node_churn() {
        let mut graph = TemporalGraph::new();

        // Initial nodes: A, B
        graph.add_edge(make_edge("A", "B", 100, None));

        // Later: B, C (A removed, C added)
        graph.add_edge(make_edge("B", "C", 200, None));

        let analytics = TemporalAnalytics::new(graph);
        let query = TemporalQuery {
            start_time: 100,
            end_time: 200,
            granularity: 50,
            nodes: None,
            edge_types: None,
        };

        let churn = analytics.node_churn(&query);
        assert_eq!(churn.added, vec!["C".to_string()]);
        assert_eq!(churn.removed, vec!["A".to_string()]);
        assert_eq!(churn.stable, vec!["B".to_string()]);
        assert_eq!(churn.added_count, 1);
        assert_eq!(churn.removed_count, 1);
        assert_eq!(churn.stable_count, 1);
    }

    #[test]
    fn test_temporal_edge_is_active() {
        let edge = make_edge("A", "B", 100, Some((100, 200)));

        assert!(edge.is_active_at(150));
        assert!(edge.is_active_at(100));
        assert!(edge.is_active_at(200));
        assert!(!edge.is_active_at(50));
        assert!(!edge.is_active_at(250));

        assert!(edge.is_active_in_range(90, 110));
        assert!(edge.is_active_in_range(150, 250));
        assert!(!edge.is_active_in_range(50, 90));

        // Without an interval only the edge's own timestamp counts.
        let point = make_edge("A", "B", 100, None);
        assert!(point.is_active_at(100));
        assert!(!point.is_active_at(101));
        assert!(point.is_active_in_range(90, 110));
        assert!(!point.is_active_in_range(101, 200));
    }
}