Skip to main content

scirs2_graph/
streaming.rs

//! Streaming graph processing for dynamic and large-scale graphs
//!
//! This module provides data structures and algorithms for processing graph
//! streams where edges arrive (and optionally depart) over time. It supports:
//!
//! - **Edge stream processing**: Incremental edge additions and deletions
//! - **Approximate degree distribution**: Maintained incrementally
//! - **Streaming triangle counting**: Doulion-style sampling and MASCOT-style
//!   edge sampling for approximate triangle counts
//! - **Sliding window model**: Maintain a graph over the most recent W edges
//! - **Memory-bounded processing**: Configurable memory limits with eviction
//!
//! # Design
//!
//! The streaming model assumes edges arrive one at a time. Algorithms maintain
//! approximate statistics without storing the entire graph, making them suitable
//! for graphs that do not fit in memory.
18
19use crate::compressed::CsrGraph;
20use crate::error::{GraphError, Result};
21use scirs2_core::random::prelude::*;
22use std::collections::{HashMap, HashSet, VecDeque};
23
// ────────────────────────────────────────────────────────────────────────────
// StreamEdge
// ────────────────────────────────────────────────────────────────────────────
27
/// One weighted edge observed in a graph stream.
///
/// Plain `Copy` data; equality compares all four fields, including the
/// floating-point weight.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StreamEdge {
    /// Node the edge starts from.
    pub src: usize,
    /// Node the edge points to.
    pub dst: usize,
    /// Weight attached to the edge.
    pub weight: f64,
    /// Arrival timestamp (expected to be monotonically increasing).
    pub timestamp: u64,
}
40
/// Kind of change a stream event applies to the graph.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamOp {
    /// The edge is added to the graph.
    Insert,
    /// The edge is removed from the graph.
    Delete,
}
49
50/// A stream event: an edge with an operation type.
51#[derive(Debug, Clone, Copy, PartialEq)]
52pub struct StreamEvent {
53    /// The edge
54    pub edge: StreamEdge,
55    /// Whether this is an insertion or deletion
56    pub op: StreamOp,
57}
58
// ────────────────────────────────────────────────────────────────────────────
// StreamingGraph
// ────────────────────────────────────────────────────────────────────────────
62
/// Incrementally maintained graph built from a stream of edge insertions and
/// deletions.
///
/// Edges are kept in a nested hash map (`node -> neighbor -> weight`). Next to
/// it the struct tracks an edge counter, an event counter and the largest node
/// id seen so far.
#[derive(Debug)]
pub struct StreamingGraph {
    /// Adjacency map: for each node, its neighbors and the connecting weight.
    adjacency: HashMap<usize, HashMap<usize, f64>>,
    /// Count of edges currently stored.
    num_edges: usize,
    /// Count of stream events handled so far.
    events_processed: u64,
    /// `true` for a directed graph, `false` for an undirected one.
    directed: bool,
    /// Largest node id observed so far.
    max_node_id: usize,
}
80
81impl StreamingGraph {
82    /// Create a new empty streaming graph.
83    pub fn new(directed: bool) -> Self {
84        Self {
85            adjacency: HashMap::new(),
86            num_edges: 0,
87            events_processed: 0,
88            directed,
89            max_node_id: 0,
90        }
91    }
92
93    /// Process a stream event (insert or delete an edge).
94    pub fn process_event(&mut self, event: &StreamEvent) {
95        self.events_processed += 1;
96        match event.op {
97            StreamOp::Insert => self.insert_edge(event.edge.src, event.edge.dst, event.edge.weight),
98            StreamOp::Delete => self.delete_edge(event.edge.src, event.edge.dst),
99        }
100    }
101
102    /// Insert an edge into the graph.
103    pub fn insert_edge(&mut self, src: usize, dst: usize, weight: f64) {
104        self.max_node_id = self.max_node_id.max(src).max(dst);
105
106        self.adjacency.entry(src).or_default().insert(dst, weight);
107
108        if !self.directed {
109            self.adjacency.entry(dst).or_default().insert(src, weight);
110        }
111        self.num_edges += 1;
112    }
113
114    /// Delete an edge from the graph.
115    pub fn delete_edge(&mut self, src: usize, dst: usize) {
116        if let Some(neighbors) = self.adjacency.get_mut(&src) {
117            if neighbors.remove(&dst).is_some() {
118                self.num_edges = self.num_edges.saturating_sub(1);
119            }
120        }
121        if !self.directed {
122            if let Some(neighbors) = self.adjacency.get_mut(&dst) {
123                neighbors.remove(&src);
124            }
125        }
126    }
127
128    /// Check if an edge exists.
129    pub fn has_edge(&self, src: usize, dst: usize) -> bool {
130        self.adjacency
131            .get(&src)
132            .is_some_and(|neighbors| neighbors.contains_key(&dst))
133    }
134
135    /// Get the degree of a node.
136    pub fn degree(&self, node: usize) -> usize {
137        self.adjacency
138            .get(&node)
139            .map_or(0, |neighbors| neighbors.len())
140    }
141
142    /// Get the number of nodes.
143    pub fn num_nodes(&self) -> usize {
144        self.adjacency.len()
145    }
146
147    /// Get the number of edges.
148    pub fn num_edges(&self) -> usize {
149        self.num_edges
150    }
151
152    /// Get the number of events processed.
153    pub fn events_processed(&self) -> u64 {
154        self.events_processed
155    }
156
157    /// Get neighbors of a node.
158    pub fn neighbors(&self, node: usize) -> Vec<(usize, f64)> {
159        self.adjacency
160            .get(&node)
161            .map_or_else(Vec::new, |neighbors| {
162                neighbors.iter().map(|(&n, &w)| (n, w)).collect()
163            })
164    }
165
166    /// Get the degree distribution as a histogram.
167    pub fn degree_distribution(&self) -> DegreeDistribution {
168        let mut dist = HashMap::new();
169        let mut max_degree = 0;
170        let mut total_degree = 0;
171
172        for neighbors in self.adjacency.values() {
173            let deg = neighbors.len();
174            *dist.entry(deg).or_insert(0usize) += 1;
175            max_degree = max_degree.max(deg);
176            total_degree += deg;
177        }
178
179        let n = self.adjacency.len();
180        let avg_degree = if n > 0 {
181            total_degree as f64 / n as f64
182        } else {
183            0.0
184        };
185
186        DegreeDistribution {
187            histogram: dist,
188            max_degree,
189            avg_degree,
190            num_nodes: n,
191        }
192    }
193
194    /// Snapshot the current graph state as a CSR graph.
195    pub fn to_csr(&self) -> Result<CsrGraph> {
196        let num_nodes = if self.adjacency.is_empty() {
197            0
198        } else {
199            self.max_node_id + 1
200        };
201
202        let mut edges = Vec::with_capacity(self.num_edges);
203        for (&src, neighbors) in &self.adjacency {
204            for (&dst, &weight) in neighbors {
205                if self.directed || src <= dst {
206                    edges.push((src, dst, weight));
207                }
208            }
209        }
210
211        CsrGraph::from_edges(num_nodes, edges, self.directed)
212    }
213}
214
/// Summary of how node degrees are distributed in a graph.
#[derive(Debug, Clone)]
pub struct DegreeDistribution {
    /// Maps each degree to the number of nodes having that degree.
    pub histogram: HashMap<usize, usize>,
    /// Largest degree observed.
    pub max_degree: usize,
    /// Mean degree over all counted nodes.
    pub avg_degree: f64,
    /// Number of nodes the statistics were computed over.
    pub num_nodes: usize,
}
227
// ────────────────────────────────────────────────────────────────────────────
// Streaming Triangle Counter (Doulion-style)
// ────────────────────────────────────────────────────────────────────────────
231
232/// Approximate streaming triangle counter using edge sampling (Doulion algorithm).
233///
234/// The Doulion algorithm samples each edge with probability `p` and counts
235/// triangles in the sampled subgraph. The triangle count is then scaled by `1/p^3`
236/// to estimate the total.
237///
238/// # Reference
239/// Tsourakakis et al., "Doulion: Counting Triangles in Massive Graphs with
240/// a Coin", KDD 2009.
241#[derive(Debug)]
242pub struct DoulionTriangleCounter {
243    /// Sampling probability
244    sample_prob: f64,
245    /// Sampled edges as adjacency sets
246    sampled_adj: HashMap<usize, HashSet<usize>>,
247    /// Number of triangles found in sampled subgraph
248    sampled_triangles: usize,
249    /// Number of edges processed
250    edges_processed: u64,
251    /// Number of edges sampled
252    edges_sampled: u64,
253    /// RNG for sampling
254    rng: StdRng,
255}
256
257impl DoulionTriangleCounter {
258    /// Create a new Doulion triangle counter with given sampling probability.
259    ///
260    /// Lower `sample_prob` uses less memory but gives less accurate estimates.
261    /// A value of 0.1 is reasonable for graphs with millions of edges.
262    pub fn new(sample_prob: f64, seed: u64) -> Result<Self> {
263        if !(0.0..=1.0).contains(&sample_prob) {
264            return Err(GraphError::InvalidGraph(
265                "sample_prob must be in [0, 1]".to_string(),
266            ));
267        }
268        Ok(Self {
269            sample_prob,
270            sampled_adj: HashMap::new(),
271            sampled_triangles: 0,
272            edges_processed: 0,
273            edges_sampled: 0,
274            rng: StdRng::seed_from_u64(seed),
275        })
276    }
277
278    /// Process a new edge from the stream.
279    ///
280    /// With probability `p`, the edge is sampled. If both endpoints are already
281    /// in the sample, we check for triangles formed.
282    pub fn process_edge(&mut self, src: usize, dst: usize) {
283        self.edges_processed += 1;
284
285        // Sample this edge with probability p
286        if self.rng.random::<f64>() >= self.sample_prob {
287            return;
288        }
289
290        self.edges_sampled += 1;
291
292        // Before adding the edge, count new triangles formed
293        // A triangle is formed if there exists a node w such that
294        // w is a neighbor of both src and dst in the sampled graph
295        let neighbors_src: HashSet<usize> = self.sampled_adj.get(&src).cloned().unwrap_or_default();
296        let neighbors_dst: HashSet<usize> = self.sampled_adj.get(&dst).cloned().unwrap_or_default();
297
298        // Count common neighbors
299        let common = neighbors_src.intersection(&neighbors_dst).count();
300        self.sampled_triangles += common;
301
302        // Add the edge to the sample
303        self.sampled_adj.entry(src).or_default().insert(dst);
304        self.sampled_adj.entry(dst).or_default().insert(src);
305    }
306
307    /// Get the estimated total number of triangles.
308    pub fn estimated_triangles(&self) -> f64 {
309        if self.sample_prob <= 0.0 {
310            return 0.0;
311        }
312        // Scale by 1/p^3
313        let p3 = self.sample_prob * self.sample_prob * self.sample_prob;
314        self.sampled_triangles as f64 / p3
315    }
316
317    /// Get the number of sampled triangles (unscaled).
318    pub fn sampled_triangles(&self) -> usize {
319        self.sampled_triangles
320    }
321
322    /// Get statistics about the counter.
323    pub fn stats(&self) -> TriangleCounterStats {
324        TriangleCounterStats {
325            edges_processed: self.edges_processed,
326            edges_sampled: self.edges_sampled,
327            sampled_triangles: self.sampled_triangles,
328            estimated_triangles: self.estimated_triangles(),
329            sample_prob: self.sample_prob,
330            memory_nodes: self.sampled_adj.len(),
331        }
332    }
333}
334
/// Snapshot of the statistics reported by a streaming triangle counter.
#[derive(Debug, Clone)]
pub struct TriangleCounterStats {
    /// Number of edges read from the stream.
    pub edges_processed: u64,
    /// Number of edges kept in the sample.
    pub edges_sampled: u64,
    /// Triangle count inside the sample (unscaled).
    pub sampled_triangles: usize,
    /// Scaled estimate of the total triangle count.
    pub estimated_triangles: f64,
    /// Sampling probability that was used.
    pub sample_prob: f64,
    /// Number of nodes held in memory.
    pub memory_nodes: usize,
}
351
// ────────────────────────────────────────────────────────────────────────────
// MASCOT-style Triangle Counter
// ────────────────────────────────────────────────────────────────────────────
355
356/// MASCOT (Memory-efficient Accurate Sampling for Counting Local Triangles)
357/// streaming triangle counter.
358///
359/// Maintains a fixed-size edge reservoir sample and updates triangle counts
360/// as new edges arrive. More memory-efficient than Doulion for fixed memory budgets.
361///
362/// # Reference
363/// Lim & Kang, "MASCOT: Memory-efficient and Accurate Sampling for Counting
364/// Local Triangles in Graph Streams", KDD 2015.
365#[derive(Debug)]
366pub struct MascotTriangleCounter {
367    /// Maximum number of edges to store
368    max_edges: usize,
369    /// Current edge reservoir
370    edges: Vec<(usize, usize)>,
371    /// Adjacency sets for quick triangle checks
372    adj: HashMap<usize, HashSet<usize>>,
373    /// Global triangle count estimate (with scaling)
374    triangle_estimate: f64,
375    /// Number of edges processed
376    edges_processed: u64,
377    /// RNG for reservoir sampling
378    rng: StdRng,
379}
380
381impl MascotTriangleCounter {
382    /// Create a new MASCOT counter with a fixed edge budget.
383    pub fn new(max_edges: usize, seed: u64) -> Self {
384        Self {
385            max_edges,
386            edges: Vec::with_capacity(max_edges),
387            adj: HashMap::new(),
388            triangle_estimate: 0.0,
389            edges_processed: 0,
390            rng: StdRng::seed_from_u64(seed),
391        }
392    }
393
394    /// Process a new edge from the stream.
395    pub fn process_edge(&mut self, src: usize, dst: usize) {
396        self.edges_processed += 1;
397
398        // Count triangles formed with current sample
399        let neighbors_src: Vec<usize> = self
400            .adj
401            .get(&src)
402            .map_or_else(Vec::new, |s| s.iter().copied().collect());
403        let neighbors_dst: HashSet<usize> = self.adj.get(&dst).cloned().unwrap_or_default();
404
405        let common_count = neighbors_src
406            .iter()
407            .filter(|w| neighbors_dst.contains(w))
408            .count();
409
410        // Scale factor: probability that both other edges are in the sample
411        let t = self.edges_processed;
412        let m = self.max_edges;
413        if t <= m as u64 {
414            // All edges are stored, no scaling needed
415            self.triangle_estimate += common_count as f64;
416        } else {
417            // Reservoir sampling probability for an edge to be in sample
418            let p = m as f64 / t as f64;
419            // Both other edges of triangle must be in sample: scale by 1/p^2
420            if p > 0.0 {
421                self.triangle_estimate += common_count as f64 / (p * p);
422            }
423        }
424
425        // Reservoir sampling: decide whether to include this edge
426        if self.edges.len() < self.max_edges {
427            // Sample not full yet, always include
428            self.edges.push((src, dst));
429            self.adj.entry(src).or_default().insert(dst);
430            self.adj.entry(dst).or_default().insert(src);
431        } else {
432            // Replace a random edge with probability max_edges / edges_processed
433            let j = self.rng.random_range(0..self.edges_processed as usize);
434            if j < self.max_edges {
435                // Remove old edge
436                let (old_src, old_dst) = self.edges[j];
437                if let Some(set) = self.adj.get_mut(&old_src) {
438                    set.remove(&old_dst);
439                }
440                if let Some(set) = self.adj.get_mut(&old_dst) {
441                    set.remove(&old_src);
442                }
443
444                // Insert new edge
445                self.edges[j] = (src, dst);
446                self.adj.entry(src).or_default().insert(dst);
447                self.adj.entry(dst).or_default().insert(src);
448            }
449        }
450    }
451
452    /// Get the estimated triangle count.
453    pub fn estimated_triangles(&self) -> f64 {
454        self.triangle_estimate
455    }
456
457    /// Get counter statistics.
458    pub fn stats(&self) -> TriangleCounterStats {
459        TriangleCounterStats {
460            edges_processed: self.edges_processed,
461            edges_sampled: self.edges.len() as u64,
462            sampled_triangles: 0, // MASCOT tracks scaled estimate directly
463            estimated_triangles: self.triangle_estimate,
464            sample_prob: if self.edges_processed > 0 {
465                self.edges.len() as f64 / self.edges_processed as f64
466            } else {
467                1.0
468            },
469            memory_nodes: self.adj.len(),
470        }
471    }
472}
473
// ────────────────────────────────────────────────────────────────────────────
// Sliding Window Graph
// ────────────────────────────────────────────────────────────────────────────
477
/// Graph restricted to the most recent `W` edges of a stream.
///
/// When a new edge arrives and the window is full, the oldest edge is evicted
/// automatically, which keeps the graph bounded over an unbounded stream.
#[derive(Debug)]
pub struct SlidingWindowGraph {
    /// Maximum number of edges retained.
    window_size: usize,
    /// Edges in arrival order; the front is the oldest.
    edge_queue: VecDeque<(usize, usize, f64)>,
    /// Adjacency map of the edges currently inside the window.
    adjacency: HashMap<usize, HashMap<usize, f64>>,
    /// Multiplicity of each `(src, dst)` pair, so repeated edges are only
    /// dropped from the adjacency map when their last copy leaves.
    edge_counts: HashMap<(usize, usize), usize>,
    /// `true` for a directed graph, `false` for an undirected one.
    directed: bool,
    /// Number of edges processed so far.
    events_processed: u64,
}
497
498impl SlidingWindowGraph {
499    /// Create a new sliding window graph.
500    pub fn new(window_size: usize, directed: bool) -> Self {
501        Self {
502            window_size,
503            edge_queue: VecDeque::with_capacity(window_size),
504            adjacency: HashMap::new(),
505            edge_counts: HashMap::new(),
506            directed,
507            events_processed: 0,
508        }
509    }
510
511    /// Process a new edge. If the window is full, the oldest edge is evicted.
512    pub fn process_edge(&mut self, src: usize, dst: usize, weight: f64) {
513        self.events_processed += 1;
514
515        // Evict oldest edge if window is full
516        if self.edge_queue.len() >= self.window_size {
517            if let Some((old_src, old_dst, _old_weight)) = self.edge_queue.pop_front() {
518                self.remove_edge_internal(old_src, old_dst);
519            }
520        }
521
522        // Add new edge
523        self.edge_queue.push_back((src, dst, weight));
524        self.add_edge_internal(src, dst, weight);
525    }
526
527    fn add_edge_internal(&mut self, src: usize, dst: usize, weight: f64) {
528        self.adjacency.entry(src).or_default().insert(dst, weight);
529        *self.edge_counts.entry((src, dst)).or_insert(0) += 1;
530
531        if !self.directed {
532            self.adjacency.entry(dst).or_default().insert(src, weight);
533            *self.edge_counts.entry((dst, src)).or_insert(0) += 1;
534        }
535    }
536
537    fn remove_edge_internal(&mut self, src: usize, dst: usize) {
538        let key = (src, dst);
539        if let Some(count) = self.edge_counts.get_mut(&key) {
540            *count = count.saturating_sub(1);
541            if *count == 0 {
542                self.edge_counts.remove(&key);
543                if let Some(neighbors) = self.adjacency.get_mut(&src) {
544                    neighbors.remove(&dst);
545                    if neighbors.is_empty() {
546                        self.adjacency.remove(&src);
547                    }
548                }
549            }
550        }
551
552        if !self.directed {
553            let rev_key = (dst, src);
554            if let Some(count) = self.edge_counts.get_mut(&rev_key) {
555                *count = count.saturating_sub(1);
556                if *count == 0 {
557                    self.edge_counts.remove(&rev_key);
558                    if let Some(neighbors) = self.adjacency.get_mut(&dst) {
559                        neighbors.remove(&src);
560                        if neighbors.is_empty() {
561                            self.adjacency.remove(&dst);
562                        }
563                    }
564                }
565            }
566        }
567    }
568
569    /// Get the current number of edges in the window.
570    pub fn num_edges(&self) -> usize {
571        self.edge_queue.len()
572    }
573
574    /// Get the current number of active nodes.
575    pub fn num_nodes(&self) -> usize {
576        self.adjacency.len()
577    }
578
579    /// Get the window size.
580    pub fn window_size(&self) -> usize {
581        self.window_size
582    }
583
584    /// Get neighbors of a node in the current window.
585    pub fn neighbors(&self, node: usize) -> Vec<(usize, f64)> {
586        self.adjacency
587            .get(&node)
588            .map_or_else(Vec::new, |neighbors| {
589                neighbors.iter().map(|(&n, &w)| (n, w)).collect()
590            })
591    }
592
593    /// Get the degree of a node.
594    pub fn degree(&self, node: usize) -> usize {
595        self.adjacency
596            .get(&node)
597            .map_or(0, |neighbors| neighbors.len())
598    }
599
600    /// Check if an edge exists in the current window.
601    pub fn has_edge(&self, src: usize, dst: usize) -> bool {
602        self.adjacency
603            .get(&src)
604            .is_some_and(|n| n.contains_key(&dst))
605    }
606
607    /// Get total events processed.
608    pub fn events_processed(&self) -> u64 {
609        self.events_processed
610    }
611
612    /// Take a snapshot of the current window as a CSR graph.
613    pub fn to_csr(&self) -> Result<CsrGraph> {
614        let max_node = self
615            .adjacency
616            .keys()
617            .chain(self.adjacency.values().flat_map(|n| n.keys()))
618            .copied()
619            .max()
620            .map_or(0, |m| m + 1);
621
622        let mut edges = Vec::new();
623        for (&src, neighbors) in &self.adjacency {
624            for (&dst, &weight) in neighbors {
625                if self.directed || src <= dst {
626                    edges.push((src, dst, weight));
627                }
628            }
629        }
630
631        CsrGraph::from_edges(max_node, edges, self.directed)
632    }
633}
634
// ────────────────────────────────────────────────────────────────────────────
// Memory-Bounded Stream Processor
// ────────────────────────────────────────────────────────────────────────────
638
639/// Configuration for memory-bounded stream processing.
640#[derive(Debug, Clone)]
641pub struct MemoryBoundedConfig {
642    /// Maximum memory budget in bytes
643    pub max_memory_bytes: usize,
644    /// Eviction strategy when memory is exceeded
645    pub eviction_strategy: EvictionStrategy,
646    /// Whether to track degree distribution
647    pub track_degrees: bool,
648    /// Whether to count triangles
649    pub count_triangles: bool,
650    /// Triangle counting sample probability
651    pub triangle_sample_prob: f64,
652}
653
654impl Default for MemoryBoundedConfig {
655    fn default() -> Self {
656        Self {
657            max_memory_bytes: 100 * 1024 * 1024, // 100 MB
658            eviction_strategy: EvictionStrategy::LeastRecentEdge,
659            track_degrees: true,
660            count_triangles: false,
661            triangle_sample_prob: 0.1,
662        }
663    }
664}
665
/// Policy used to pick what is evicted once the memory budget is exceeded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvictionStrategy {
    /// Evict the oldest edges first (FIFO).
    LeastRecentEdge,
    /// Evict the edges of the lowest-degree nodes first.
    LowestDegreeNode,
    /// Evict a randomly chosen edge.
    RandomEdge,
}
676
677/// A memory-bounded stream processor that enforces a memory budget.
678///
679/// Processes edge events and maintains approximate graph statistics
680/// while staying within a configurable memory limit.
681#[derive(Debug)]
682pub struct MemoryBoundedProcessor {
683    /// Configuration
684    config: MemoryBoundedConfig,
685    /// The underlying streaming graph
686    graph: StreamingGraph,
687    /// Edge insertion order (for LeastRecentEdge eviction)
688    insertion_order: VecDeque<(usize, usize)>,
689    /// Approximate memory usage in bytes
690    estimated_memory: usize,
691    /// Edges evicted
692    edges_evicted: u64,
693    /// RNG for random eviction
694    rng: StdRng,
695}
696
697impl MemoryBoundedProcessor {
698    /// Create a new memory-bounded processor.
699    pub fn new(config: MemoryBoundedConfig) -> Self {
700        Self {
701            graph: StreamingGraph::new(false),
702            insertion_order: VecDeque::new(),
703            estimated_memory: 0,
704            edges_evicted: 0,
705            rng: StdRng::seed_from_u64(42),
706            config,
707        }
708    }
709
710    /// Process an edge event, evicting old edges if memory budget is exceeded.
711    pub fn process_edge(&mut self, src: usize, dst: usize, weight: f64) {
712        // Estimate memory for this edge (~80 bytes for HashMap entries + overhead)
713        let edge_memory_estimate = 80;
714
715        // Evict edges if over budget
716        while self.estimated_memory + edge_memory_estimate > self.config.max_memory_bytes
717            && !self.insertion_order.is_empty()
718        {
719            self.evict_one();
720        }
721
722        // Insert the new edge
723        self.graph.insert_edge(src, dst, weight);
724        self.insertion_order.push_back((src, dst));
725        self.estimated_memory += edge_memory_estimate;
726    }
727
728    fn evict_one(&mut self) {
729        match self.config.eviction_strategy {
730            EvictionStrategy::LeastRecentEdge => {
731                if let Some((src, dst)) = self.insertion_order.pop_front() {
732                    self.graph.delete_edge(src, dst);
733                    self.estimated_memory = self.estimated_memory.saturating_sub(80);
734                    self.edges_evicted += 1;
735                }
736            }
737            EvictionStrategy::LowestDegreeNode => {
738                // Find the node with the lowest degree
739                if let Some((&node, _)) = self
740                    .graph
741                    .adjacency
742                    .iter()
743                    .min_by_key(|(_, neighbors)| neighbors.len())
744                {
745                    let neighbors: Vec<usize> = self
746                        .graph
747                        .adjacency
748                        .get(&node)
749                        .map_or_else(Vec::new, |n| n.keys().copied().collect());
750                    for neighbor in neighbors {
751                        self.graph.delete_edge(node, neighbor);
752                        self.estimated_memory = self.estimated_memory.saturating_sub(80);
753                        self.edges_evicted += 1;
754                    }
755                }
756            }
757            EvictionStrategy::RandomEdge => {
758                if !self.insertion_order.is_empty() {
759                    let idx = self.rng.random_range(0..self.insertion_order.len());
760                    if let Some((src, dst)) = self.insertion_order.remove(idx) {
761                        self.graph.delete_edge(src, dst);
762                        self.estimated_memory = self.estimated_memory.saturating_sub(80);
763                        self.edges_evicted += 1;
764                    }
765                }
766            }
767        }
768    }
769
770    /// Get the current streaming graph.
771    pub fn graph(&self) -> &StreamingGraph {
772        &self.graph
773    }
774
775    /// Get the number of edges evicted.
776    pub fn edges_evicted(&self) -> u64 {
777        self.edges_evicted
778    }
779
780    /// Get estimated memory usage in bytes.
781    pub fn estimated_memory(&self) -> usize {
782        self.estimated_memory
783    }
784
785    /// Get the degree distribution of the current graph.
786    pub fn degree_distribution(&self) -> DegreeDistribution {
787        self.graph.degree_distribution()
788    }
789}
790
// ────────────────────────────────────────────────────────────────────────────
// Tests
// ────────────────────────────────────────────────────────────────────────────
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798
799    // ── StreamingGraph Tests ──
800
801    #[test]
802    fn test_streaming_graph_insert() {
803        let mut g = StreamingGraph::new(false);
804        g.insert_edge(0, 1, 1.0);
805        g.insert_edge(1, 2, 2.0);
806
807        assert_eq!(g.num_edges(), 2);
808        assert_eq!(g.num_nodes(), 3);
809        assert!(g.has_edge(0, 1));
810        assert!(g.has_edge(1, 0)); // undirected
811        assert!(g.has_edge(1, 2));
812    }
813
814    #[test]
815    fn test_streaming_graph_delete() {
816        let mut g = StreamingGraph::new(false);
817        g.insert_edge(0, 1, 1.0);
818        g.insert_edge(1, 2, 2.0);
819        g.delete_edge(0, 1);
820
821        assert_eq!(g.num_edges(), 1);
822        assert!(!g.has_edge(0, 1));
823        assert!(!g.has_edge(1, 0)); // undirected deletion
824        assert!(g.has_edge(1, 2));
825    }
826
827    #[test]
828    fn test_streaming_graph_directed() {
829        let mut g = StreamingGraph::new(true);
830        g.insert_edge(0, 1, 1.0);
831
832        assert!(g.has_edge(0, 1));
833        assert!(!g.has_edge(1, 0)); // directed
834    }
835
836    #[test]
837    fn test_streaming_graph_process_event() {
838        let mut g = StreamingGraph::new(false);
839        let event = StreamEvent {
840            edge: StreamEdge {
841                src: 0,
842                dst: 1,
843                weight: 1.0,
844                timestamp: 0,
845            },
846            op: StreamOp::Insert,
847        };
848        g.process_event(&event);
849        assert_eq!(g.num_edges(), 1);
850        assert_eq!(g.events_processed(), 1);
851
852        let del_event = StreamEvent {
853            edge: StreamEdge {
854                src: 0,
855                dst: 1,
856                weight: 1.0,
857                timestamp: 1,
858            },
859            op: StreamOp::Delete,
860        };
861        g.process_event(&del_event);
862        assert_eq!(g.num_edges(), 0);
863        assert_eq!(g.events_processed(), 2);
864    }
865
866    #[test]
867    fn test_streaming_graph_degree() {
868        let mut g = StreamingGraph::new(false);
869        g.insert_edge(0, 1, 1.0);
870        g.insert_edge(0, 2, 1.0);
871        g.insert_edge(0, 3, 1.0);
872
873        assert_eq!(g.degree(0), 3);
874        assert_eq!(g.degree(1), 1);
875        assert_eq!(g.degree(4), 0); // non-existent node
876    }
877
/// `neighbors(0)` must list exactly nodes 1 and 2 after edges 0-1 and 0-2
/// have been inserted.
#[test]
fn test_streaming_graph_neighbors() {
    let mut graph = StreamingGraph::new(false);
    for (other, weight) in [(1, 1.0), (2, 2.0)] {
        graph.insert_edge(0, other, weight);
    }

    // Sort by node id first so the comparison does not depend on the order
    // in which `neighbors` returns its entries.
    let mut adjacent = graph.neighbors(0);
    adjacent.sort_by_key(|&(node, _)| node);

    // Length and both ids are checked in one comparison.
    let ids: Vec<_> = adjacent.iter().map(|&(node, _)| node).collect();
    assert_eq!(ids, vec![1, 2]);
}
890
/// Degree distribution of a star graph with centre 0 and spokes 1 through 4.
#[test]
fn test_streaming_graph_degree_distribution() {
    let mut star = StreamingGraph::new(false);
    for spoke in 1..=4 {
        star.insert_edge(0, spoke, 1.0);
    }

    let distribution = star.degree_distribution();

    // Five nodes in total; the centre carries the maximum degree.
    assert_eq!(distribution.max_degree, 4);
    assert_eq!(distribution.num_nodes, 5);

    // One node (the centre) has degree 4, four nodes (the spokes) degree 1.
    let centre_bucket = distribution.histogram.get(&4);
    let spoke_bucket = distribution.histogram.get(&1);
    assert_eq!(centre_bucket, Some(&1));
    assert_eq!(spoke_bucket, Some(&4));
}
907
/// Converting a two-edge graph to CSR keeps all three nodes and exposes the
/// edge 0-1 in both directions.
#[test]
fn test_streaming_graph_to_csr() {
    let mut graph = StreamingGraph::new(false);
    for (src, dst, weight) in [(0, 1, 1.0), (1, 2, 2.0)] {
        graph.insert_edge(src, dst, weight);
    }

    let snapshot = graph.to_csr().expect("to_csr");

    assert_eq!(snapshot.num_nodes(), 3);
    // Edge 0-1 is checked both ways, edge 1-2 in the inserted direction.
    assert!(snapshot.has_edge(0, 1));
    assert!(snapshot.has_edge(1, 0));
    assert!(snapshot.has_edge(1, 2));
}
920
921    // ── Doulion Triangle Counter Tests ──
922
/// With sampling probability 1.0 the Doulion counter must report the exact
/// triangle count of the complete graph K4, which is 4.
#[test]
fn test_doulion_basic() {
    let mut doulion = DoulionTriangleCounter::new(1.0, 42).expect("new");

    // Feed every edge u-v of K4 with u < v, in lexicographic order:
    // (0,1), (0,2), (0,3), (1,2), (1,3), (2,3).
    for u in 0..4 {
        for v in (u + 1)..4 {
            doulion.process_edge(u, v);
        }
    }

    // Nothing is dropped at p = 1.0, so the estimate should be exact.
    let est = doulion.estimated_triangles();
    assert!((est - 4.0).abs() < 1e-6, "expected 4 triangles, got {est}");

    // All six edges were both seen and sampled.
    let summary = doulion.stats();
    assert_eq!(summary.edges_processed, 6);
    assert_eq!(summary.edges_sampled, 6);
}
944
/// A path graph contains no triangles, so the estimate must be zero up to
/// floating-point tolerance.
#[test]
fn test_doulion_no_triangles() {
    let mut doulion = DoulionTriangleCounter::new(1.0, 42).expect("new");

    // Path 0-1-2-3.
    for node in 0..3 {
        doulion.process_edge(node, node + 1);
    }

    let estimate = doulion.estimated_triangles();
    assert!(estimate.abs() < 1e-6);
}
956
/// With sampling probability 0.5 the estimate may deviate from the exact
/// count, so only non-negativity is checked.
#[test]
fn test_doulion_sampling() {
    let mut doulion = DoulionTriangleCounter::new(0.5, 42).expect("new");

    // K4 edges in the order (0,1), (0,2), (0,3), (1,2), (1,3), (2,3).
    for u in 0..4 {
        for v in (u + 1)..4 {
            doulion.process_edge(u, v);
        }
    }

    // The estimate should never be negative.
    let estimate = doulion.estimated_triangles();
    assert!(estimate >= 0.0);
}
973
/// Construction must fail for sampling probabilities above 1 or below 0.
#[test]
fn test_doulion_invalid_prob() {
    for bad_probability in [1.5, -0.1] {
        let outcome = DoulionTriangleCounter::new(bad_probability, 42);
        assert!(outcome.is_err());
    }
}
979
980    // ── MASCOT Triangle Counter Tests ──
981
/// With a budget of 100 and only the six edges of K4, the MASCOT counter
/// stores every edge and must report exactly 4 triangles.
#[test]
fn test_mascot_basic() {
    let mut mascot = MascotTriangleCounter::new(100, 42);

    // K4 edges in the order (0,1), (0,2), (0,3), (1,2), (1,3), (2,3).
    for u in 0..4 {
        for v in (u + 1)..4 {
            mascot.process_edge(u, v);
        }
    }

    // Budget exceeds the edge count, so the count is exact.
    let est = mascot.estimated_triangles();
    assert!((est - 4.0).abs() < 1e-6, "expected 4 triangles, got {est}");
}
998
/// The MASCOT statistics report two processed and two sampled edges after
/// two edges have been fed in.
#[test]
fn test_mascot_stats() {
    let mut mascot = MascotTriangleCounter::new(100, 42);
    for (src, dst) in [(0, 1), (1, 2)] {
        mascot.process_edge(src, dst);
    }

    let summary = mascot.stats();
    assert_eq!(summary.edges_processed, 2);
    assert_eq!(summary.edges_sampled, 2);
}
1009
1010    // ── Sliding Window Tests ──
1011
/// A window of size 3 keeps the three most recent edges and drops the
/// oldest one when a fourth edge arrives.
#[test]
fn test_sliding_window_basic() {
    let mut window = SlidingWindowGraph::new(3, false);

    // Fill the window exactly to its size.
    for (src, dst, weight) in [(0, 1, 1.0), (1, 2, 2.0), (2, 3, 3.0)] {
        window.process_edge(src, dst, weight);
    }
    assert_eq!(window.num_edges(), 3);
    for (src, dst) in [(0, 1), (1, 2), (2, 3)] {
        assert!(window.has_edge(src, dst));
    }

    // A fourth edge pushes out the oldest one (0-1).
    window.process_edge(3, 4, 4.0);
    assert_eq!(window.num_edges(), 3);
    assert!(!window.has_edge(0, 1)); // evicted
    for (src, dst) in [(1, 2), (2, 3), (3, 4)] {
        assert!(window.has_edge(src, dst));
    }
}
1033
/// A sliding window created with the directed flag set does not report the
/// reverse of an inserted edge.
#[test]
fn test_sliding_window_directed() {
    let mut window = SlidingWindowGraph::new(5, true);
    window.process_edge(0, 1, 1.0);

    let forward = window.has_edge(0, 1);
    let backward = window.has_edge(1, 0);
    assert!(forward);
    assert!(!backward); // directed
}
1042
/// Degree queries inside a window large enough to hold every processed edge.
#[test]
fn test_sliding_window_degree() {
    let mut window = SlidingWindowGraph::new(10, false);

    // Node 0 is joined to nodes 1, 2 and 3.
    for leaf in 1..=3 {
        window.process_edge(0, leaf, 1.0);
    }

    assert_eq!(window.degree(0), 3);
    assert_eq!(window.degree(1), 1);
}
1053
/// The processed-event counter keeps counting edges that have since been
/// evicted, while the edge count stays at the window size.
#[test]
fn test_sliding_window_events_processed() {
    let mut window = SlidingWindowGraph::new(2, false);

    // Three edges along the path 0-1-2-3; the third one evicts 0-1.
    for node in 0..3 {
        window.process_edge(node, node + 1, 1.0);
    }

    assert_eq!(window.events_processed(), 3);
    assert_eq!(window.num_edges(), 2);
}
1064
/// The current window contents can be exported to CSR form.
#[test]
fn test_sliding_window_to_csr() {
    let mut window = SlidingWindowGraph::new(10, false);
    for (src, dst, weight) in [(0, 1, 1.0), (1, 2, 2.0)] {
        window.process_edge(src, dst, weight);
    }

    let snapshot = window.to_csr().expect("to_csr");

    // Nodes 0, 1 and 2 are present, and edge 0-1 survived the export.
    assert_eq!(snapshot.num_nodes(), 3);
    assert!(snapshot.has_edge(0, 1));
}
1075
1076    // ── Memory-Bounded Processor Tests ──
1077
/// Under a 400-byte budget, processing twenty edges must trigger eviction
/// and leave the estimated memory within the budget.
#[test]
fn test_memory_bounded_basic() {
    let mut processor = MemoryBoundedProcessor::new(MemoryBoundedConfig {
        max_memory_bytes: 400, // Very small: ~5 edges
        eviction_strategy: EvictionStrategy::LeastRecentEdge,
        track_degrees: true,
        count_triangles: false,
        triangle_sample_prob: 0.1,
    });

    // Path 0-1-2-...-20, one edge at a time.
    for src in 0..20 {
        let dst = src + 1;
        processor.process_edge(src, dst, 1.0);
    }

    // Some edges should have been evicted to stay under the limit.
    let evicted = processor.edges_evicted();
    assert!(evicted > 0);
    let footprint = processor.estimated_memory();
    assert!(footprint <= 400);
}
1097
/// The memory-bounded processor reports a non-empty degree distribution
/// after a few edges have been processed under a 10 000-byte budget.
#[test]
fn test_memory_bounded_degree_dist() {
    // Only the memory limit is overridden; every other setting is default.
    let mut processor = MemoryBoundedProcessor::new(MemoryBoundedConfig {
        max_memory_bytes: 10_000,
        ..Default::default()
    });

    // Node 0 joined to nodes 1, 2 and 3.
    for leaf in 1..=3 {
        processor.process_edge(0, leaf, 1.0);
    }

    let distribution = processor.degree_distribution();
    assert!(distribution.num_nodes > 0);
}
1113
/// Runs the same ten-edge stream under each listed eviction strategy with a
/// 200-byte budget.
#[test]
fn test_memory_bounded_eviction_strategies() {
    let strategies = [
        EvictionStrategy::LeastRecentEdge,
        EvictionStrategy::RandomEdge,
    ];

    for strategy in strategies {
        let mut processor = MemoryBoundedProcessor::new(MemoryBoundedConfig {
            max_memory_bytes: 200,
            eviction_strategy: strategy,
            ..Default::default()
        });

        // Path 0-1-2-...-10.
        for src in 0..10 {
            processor.process_edge(src, src + 1, 1.0);
        }

        // Should not crash; passes if anything was evicted or the estimated
        // memory is within the budget.
        let evicted_any = processor.edges_evicted() > 0;
        assert!(evicted_any || processor.estimated_memory() <= 200);
    }
}
1135
/// A freshly constructed graph has no nodes and no edges, and queries on a
/// node that was never inserted return empty or zero results.
#[test]
fn test_streaming_graph_empty() {
    let empty = StreamingGraph::new(false);

    // Global counters.
    assert_eq!(empty.num_nodes(), 0);
    assert_eq!(empty.num_edges(), 0);

    // Per-node queries on node 0, which was never added.
    let adjacent = empty.neighbors(0);
    assert!(adjacent.is_empty());
    assert_eq!(empty.degree(0), 0);
}
1144
/// Deleting an edge that was never inserted must not panic and must leave
/// the existing edge in place.
#[test]
fn test_streaming_graph_delete_nonexistent() {
    let mut graph = StreamingGraph::new(false);
    graph.insert_edge(0, 1, 1.0);

    // Edge 5-6 was never added; this call should not crash.
    graph.delete_edge(5, 6);

    let remaining = graph.num_edges();
    assert_eq!(remaining, 1);
}
1152
/// A window of size 1 holds only the most recently processed edge.
#[test]
fn test_sliding_window_single_capacity() {
    let mut window = SlidingWindowGraph::new(1, false);

    // The first edge occupies the single slot.
    window.process_edge(0, 1, 1.0);
    assert_eq!(window.num_edges(), 1);
    assert!(window.has_edge(0, 1));

    // The second edge replaces it.
    window.process_edge(2, 3, 2.0);
    assert_eq!(window.num_edges(), 1);
    let old_present = window.has_edge(0, 1);
    let new_present = window.has_edge(2, 3);
    assert!(!old_present); // evicted
    assert!(new_present);
}
1165}