use std::collections::{HashMap, HashSet, VecDeque};
/// Shared configuration for the streaming estimators in this file.
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Number of most-recent edges kept in the sliding window of `StreamingTriangleCounter`.
    pub window_size: usize,
    /// Requested log2 width of the count-min sketch in `StreamingDegreeEstimator`
    /// (that estimator clamps it to at most 16).
    pub n_sketch_bits: usize,
    /// Base seed from which `StreamingDegreeEstimator` derives its per-row hash seeds.
    pub seed: u64,
}

impl Default for StreamConfig {
    /// A 1000-edge window, 64 requested sketch bits, and seed 42.
    fn default() -> Self {
        StreamConfig {
            seed: 42,
            n_sketch_bits: 64,
            window_size: 1000,
        }
    }
}
/// A single-use, pull-based stream of undirected edges `(u, v)`.
///
/// Edges are consumed as they are read; the stream cannot be rewound.
pub struct GraphStream {
    inner: Box<dyn Iterator<Item = (usize, usize)>>,
}

impl GraphStream {
    /// Creates a stream that yields `edges` in order.
    pub fn from_edges(edges: Vec<(usize, usize)>) -> Self {
        Self {
            inner: Box::new(edges.into_iter()),
        }
    }

    /// Creates a stream that calls `f` to produce each edge; `None` signals the end.
    pub fn from_fn(f: impl FnMut() -> Option<(usize, usize)> + 'static) -> Self {
        Self {
            inner: Box::new(std::iter::from_fn(f)),
        }
    }

    /// Returns the next edge, or `None` once the stream is exhausted.
    pub fn next_edge(&mut self) -> Option<(usize, usize)> {
        self.inner.next()
    }
}

/// Lets a `GraphStream` be used directly with `for` loops and iterator adaptors.
impl Iterator for GraphStream {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        self.next_edge()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
#[derive(Debug)]
pub struct StreamingTriangleCounter {
adjacency_sketch: HashMap<usize, HashSet<usize>>,
triangle_count: f64,
n_edges: usize,
reservoir: VecDeque<(usize, usize)>,
window_size: usize,
}
impl StreamingTriangleCounter {
pub fn new(config: StreamConfig) -> Self {
Self {
adjacency_sketch: HashMap::new(),
triangle_count: 0.0,
n_edges: 0,
reservoir: VecDeque::new(),
window_size: config.window_size,
}
}
pub fn process_edge(&mut self, u: usize, v: usize) {
self.n_edges += 1;
let neighbours_u: HashSet<usize> =
self.adjacency_sketch.get(&u).cloned().unwrap_or_default();
let neighbours_v: HashSet<usize> =
self.adjacency_sketch.get(&v).cloned().unwrap_or_default();
let common = neighbours_u.intersection(&neighbours_v).count();
let scale = if self.n_edges <= self.window_size {
1.0
} else {
let m = self.window_size as f64;
let n = self.n_edges as f64;
(n / m) * (n / m)
};
self.triangle_count += common as f64 * scale;
self.adjacency_sketch.entry(u).or_default().insert(v);
self.adjacency_sketch.entry(v).or_default().insert(u);
self.reservoir.push_back((u, v));
if self.reservoir.len() > self.window_size {
if let Some((eu, ev)) = self.reservoir.pop_front() {
if let Some(set) = self.adjacency_sketch.get_mut(&eu) {
set.remove(&ev);
}
if let Some(set) = self.adjacency_sketch.get_mut(&ev) {
set.remove(&eu);
}
}
}
}
pub fn estimate_triangles(&self) -> f64 {
self.triangle_count
}
pub fn process_stream(&mut self, stream: &mut GraphStream) -> f64 {
while let Some((u, v)) = stream.next_edge() {
self.process_edge(u, v);
}
self.estimate_triangles()
}
}
/// Parameters for `streaming_bfs`.
#[derive(Clone, Debug)]
pub struct StreamingBfsConfig {
    /// Vertex the search starts from.
    pub source: usize,
    /// Maximum number of BFS layers to expand; farther vertices are not reached.
    pub max_dist: usize,
    /// Upper bound on the number of stored distances (the source is always stored).
    pub memory_limit: usize,
}

impl Default for StreamingBfsConfig {
    /// Starts at vertex 0 with no distance bound and room for 10 000 vertices.
    fn default() -> Self {
        let memory_limit = 10_000;
        let max_dist = usize::MAX;
        Self {
            source: 0,
            max_dist,
            memory_limit,
        }
    }
}
/// Output of `streaming_bfs`.
#[derive(Clone, Debug)]
pub struct StreamBfsResult {
    /// Hop distance from the source for every vertex reached (the source maps to 0).
    pub distances: HashMap<usize, usize>,
    /// Number of scans over the buffered edge list that the search performed.
    pub n_passes: usize,
    /// Number of entries in `distances`.
    pub n_vertices_reached: usize,
}
/// Breadth-first search from `config.source` over an edge stream, one layer per pass.
///
/// `GraphStream` cannot be rewound, so the whole stream is buffered in memory first. Each
/// BFS layer is then found by one full scan ("pass") of the buffered edges, treating every
/// edge as undirected. The search stops when a pass discovers nothing new or after
/// `config.max_dist` layers. No new vertex is recorded once `config.memory_limit`
/// distances are stored; the source is always stored.
///
/// `n_passes` counts every scan performed, including a final one that found nothing.
pub fn streaming_bfs(stream: &mut GraphStream, config: &StreamingBfsConfig) -> StreamBfsResult {
    let buffered: Vec<(usize, usize)> = std::iter::from_fn(|| stream.next_edge()).collect();

    let mut distances: HashMap<usize, usize> = HashMap::from([(config.source, 0)]);
    let mut layer: HashSet<usize> = HashSet::from([config.source]);
    let mut depth = 0usize;
    let mut passes = 0usize;

    while depth < config.max_dist && !layer.is_empty() {
        let mut discovered: HashSet<usize> = HashSet::new();
        for &(x, y) in &buffered {
            // Visit both orientations so the edge is treated as undirected.
            for (from, to) in [(x, y), (y, x)] {
                let expandable = layer.contains(&from)
                    && !distances.contains_key(&to)
                    && distances.len() < config.memory_limit;
                if expandable {
                    distances.insert(to, depth + 1);
                    discovered.insert(to);
                }
            }
        }
        passes += 1;
        depth += 1;
        layer = discovered;
    }

    let n_vertices_reached = distances.len();
    StreamBfsResult {
        distances,
        n_passes: passes,
        n_vertices_reached,
    }
}
/// Count-min sketch that estimates vertex degrees from an edge stream in fixed memory.
///
/// The sketch has 4 rows of `2^min(n_sketch_bits, 16)` `u32` counters. Each edge `(u, v)`
/// increments one counter per row for `u` and one for `v`, so a self-loop adds 2 to its
/// vertex. Counters only grow (saturating at `u32::MAX`) and colliding vertices share
/// counters. An estimate is therefore never below the true degree, short of saturation,
/// but it may exceed it.
#[derive(Debug)]
pub struct StreamingDegreeEstimator {
    /// `d` rows of `w` counters each.
    count_min: Vec<Vec<u32>>,
    /// Number of rows (one hash seed per row).
    d: usize,
    /// Number of counters per row (a power of two).
    w: usize,
    /// Number of edges processed so far.
    n_edges: usize,
    /// Per-row hash seeds derived from `StreamConfig::seed`.
    seeds: Vec<u64>,
}

impl StreamingDegreeEstimator {
    /// Builds an all-zero sketch with 4 rows and `2^min(config.n_sketch_bits, 16)` columns.
    pub fn new(config: StreamConfig) -> Self {
        let d = 4usize;
        // Cap the width at 2^16 counters per row regardless of the requested bits.
        let bits = config.n_sketch_bits.min(16);
        let w = 1usize << bits;
        // Offset each row's seed by a multiple of the 64-bit golden-ratio constant so every
        // row hashes with a different seed.
        let seeds: Vec<u64> = (0..d as u64)
            .map(|i| config.seed.wrapping_add(i.wrapping_mul(0x9e37_79b9_7f4a_7c15)))
            .collect();
        Self {
            count_min: vec![vec![0u32; w]; d],
            d,
            w,
            n_edges: 0,
            seeds,
        }
    }

    /// Maps `vertex` to a column in `0..self.w`, mixing it with `seed` through the
    /// MurmurHash3 64-bit finalizer (`fmix64`).
    fn hash_vertex(&self, vertex: usize, seed: u64) -> usize {
        let mut h = seed ^ (vertex as u64).wrapping_mul(0x9e37_79b9_7f4a_7c15);
        h ^= h >> 33;
        h = h.wrapping_mul(0xff51_afd7_ed55_8ccd);
        h ^= h >> 33;
        h = h.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
        h ^= h >> 33;
        // Truncation on 32-bit targets is harmless: only the low bits survive `% w` anyway,
        // since `w` is a power of two no larger than 2^16.
        (h as usize) % self.w
    }

    /// Records one edge: in every row, the counters for `u` and `v` each go up by one
    /// (saturating). A self-loop hits the same counter twice.
    pub fn process_edge(&mut self, u: usize, v: usize) {
        self.n_edges += 1;
        for row in 0..self.d {
            let seed = self.seeds[row];
            let col_u = self.hash_vertex(u, seed);
            let col_v = self.hash_vertex(v, seed);
            let counters = &mut self.count_min[row];
            counters[col_u] = counters[col_u].saturating_add(1);
            counters[col_v] = counters[col_v].saturating_add(1);
        }
    }

    /// Estimated degree of `vertex`: the minimum of its counters across all rows.
    pub fn estimate_degree(&self, vertex: usize) -> u32 {
        (0..self.d)
            .map(|row| {
                let col = self.hash_vertex(vertex, self.seeds[row]);
                self.count_min[row][col]
            })
            .min()
            .unwrap_or(0)
    }

    /// Estimated degrees of vertices `0..n_vertices`, indexed by vertex id.
    pub fn approximate_degree_distribution(&self, n_vertices: usize) -> Vec<u32> {
        (0..n_vertices).map(|v| self.estimate_degree(v)).collect()
    }

    /// Number of edges processed so far (self-loops included).
    pub fn n_edges(&self) -> usize {
        self.n_edges
    }
}
/// Incremental connected-components tracker over an edge stream (union-find with union by
/// rank and path compression).
///
/// Vertices are registered lazily the first time they are seen. This happens either in an
/// edge or in a call to [`find`](Self::find) / [`component_id`](Self::component_id), so
/// querying an unseen vertex adds it as a new singleton component.
#[derive(Debug, Default)]
pub struct StreamingUnionFind {
    /// vertex -> parent; a vertex is a root when it is its own parent.
    parent: HashMap<usize, usize>,
    /// vertex -> rank, used by union by rank to decide which root becomes the child.
    rank: HashMap<usize, usize>,
}

impl StreamingUnionFind {
    /// Creates an empty structure with no registered vertices.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `x` as a singleton set if it has not been seen before.
    fn make_set(&mut self, x: usize) {
        self.parent.entry(x).or_insert(x);
        self.rank.entry(x).or_insert(0);
    }

    /// Returns the root of `x`'s set, registering `x` if unseen. It also re-points every
    /// vertex on the path from `x` directly at that root.
    pub fn find(&mut self, x: usize) -> usize {
        self.make_set(x);

        // First walk: locate the root.
        let mut root = x;
        while let Some(&p) = self.parent.get(&root) {
            if p == root {
                break;
            }
            root = p;
        }

        // Second walk: path compression. `insert` hands back the previous parent, which is
        // the next vertex on the path; every vertex on it is registered, so `None` cannot
        // occur, and falling back to `root` simply ends the walk.
        let mut current = x;
        while current != root {
            current = self.parent.insert(current, root).unwrap_or(root);
        }
        root
    }

    /// Merges the components containing `u` and `v` (registering either if unseen).
    pub fn process_edge(&mut self, u: usize, v: usize) {
        // `find` registers unseen vertices, so no separate `make_set` is needed.
        let ru = self.find(u);
        let rv = self.find(v);
        if ru == rv {
            return;
        }
        let rank_u = self.rank.get(&ru).copied().unwrap_or(0);
        let rank_v = self.rank.get(&rv).copied().unwrap_or(0);
        match rank_u.cmp(&rank_v) {
            std::cmp::Ordering::Less => {
                self.parent.insert(ru, rv);
            }
            std::cmp::Ordering::Greater => {
                self.parent.insert(rv, ru);
            }
            std::cmp::Ordering::Equal => {
                self.parent.insert(rv, ru);
                self.rank.insert(ru, rank_u + 1);
            }
        }
    }

    /// Number of distinct components among all registered vertices (linear in their count).
    pub fn n_components(&self) -> usize {
        self.parent.iter().filter(|(&node, &p)| node == p).count()
    }

    /// Representative vertex of `x`'s component; equal ids mean the same component.
    pub fn component_id(&mut self, x: usize) -> usize {
        self.find(x)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_streaming_triangle_k4() {
        let edges = vec![(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)];
        let config = StreamConfig {
            window_size: 100,
            ..Default::default()
        };
        let mut counter = StreamingTriangleCounter::new(config);
        let mut stream = GraphStream::from_edges(edges);
        let estimate = counter.process_stream(&mut stream);
        // The whole stream fits in the window, so no weighting applies and the count is exact.
        assert!(
            (estimate - 4.0).abs() < 1e-9,
            "Expected exactly 4 triangles in K4, got {estimate}"
        );
        assert!((counter.estimate_triangles() - estimate).abs() < 1e-12);
    }

    #[test]
    fn test_streaming_triangle_weighted_past_window() {
        // Window of 2: when (0, 2) arrives 3 edges have been seen, so the triangle it closes
        // is weighted by (3 / 2)^2.
        let config = StreamConfig {
            window_size: 2,
            ..Default::default()
        };
        let mut counter = StreamingTriangleCounter::new(config);
        let mut stream = GraphStream::from_edges(vec![(0, 1), (1, 2), (0, 2)]);
        let estimate = counter.process_stream(&mut stream);
        assert!((estimate - 2.25).abs() < 1e-9, "Expected 2.25, got {estimate}");
    }

    #[test]
    fn test_streaming_union_find_path_graph() {
        let mut uf = StreamingUnionFind::new();
        for i in 0..4usize {
            uf.process_edge(i, i + 1);
        }
        assert_eq!(uf.n_components(), 1, "Path graph should be one component");
        let c0 = uf.component_id(0);
        for v in 1..5usize {
            assert_eq!(uf.component_id(v), c0);
        }
    }

    #[test]
    fn test_streaming_union_find_disconnected() {
        let mut uf = StreamingUnionFind::new();
        uf.process_edge(0, 1);
        uf.process_edge(2, 3);
        uf.process_edge(4, 5);
        assert_eq!(uf.n_components(), 3);
    }

    #[test]
    fn test_streaming_union_find_merges_components() {
        let mut uf = StreamingUnionFind::new();
        uf.process_edge(0, 1);
        uf.process_edge(2, 3);
        uf.process_edge(1, 2);
        assert_eq!(uf.n_components(), 1);
        let a = uf.component_id(0);
        let b = uf.component_id(3);
        assert_eq!(a, b);
    }

    #[test]
    fn test_streaming_bfs_small_graph() {
        let edges = vec![(0, 1), (1, 2), (2, 3), (1, 3)];
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        assert_eq!(result.distances[&1], 1);
        assert_eq!(result.distances[&2], 2);
        assert_eq!(result.distances[&3], 2);
        assert_eq!(result.n_vertices_reached, 4);
        // Two passes discover layers 1 and 2; a third finds nothing and ends the search.
        assert_eq!(result.n_passes, 3);
    }

    #[test]
    fn test_streaming_bfs_star() {
        let edges: Vec<(usize, usize)> = (1..=5).map(|i| (0, i)).collect();
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            source: 0,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.distances[&0], 0);
        for leaf in 1..=5usize {
            assert_eq!(result.distances[&leaf], 1);
        }
        assert_eq!(result.n_passes, 2);
    }

    #[test]
    fn test_streaming_bfs_max_dist() {
        let mut stream = GraphStream::from_edges(vec![(0, 1), (1, 2), (2, 3)]);
        let config = StreamingBfsConfig {
            max_dist: 1,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.n_vertices_reached, 2);
        assert!(!result.distances.contains_key(&2));
    }

    #[test]
    fn test_streaming_bfs_memory_limit() {
        let edges: Vec<(usize, usize)> = (1..=5).map(|i| (0, i)).collect();
        let mut stream = GraphStream::from_edges(edges);
        let config = StreamingBfsConfig {
            memory_limit: 3,
            ..Default::default()
        };
        let result = streaming_bfs(&mut stream, &config);
        assert_eq!(result.n_vertices_reached, 3);
    }

    #[test]
    fn test_degree_estimator_known_degrees() {
        let edges = vec![(0, 1), (0, 2), (0, 3), (0, 4)];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        // A count-min sketch never underestimates, so the true degree is a hard lower bound.
        let est_deg_0 = estimator.estimate_degree(0);
        assert!(
            est_deg_0 >= 4,
            "Degree estimate for vertex 0 should be >= 4 (true=4), got {est_deg_0}"
        );
        for v in 1..=4usize {
            let est = estimator.estimate_degree(v);
            assert!(
                est >= 1,
                "Degree estimate for leaf {v} should be >= 1, got {est}"
            );
        }
    }

    #[test]
    fn test_degree_estimator_distribution() {
        let edges = vec![(0, 1), (1, 2), (2, 3), (3, 4)];
        let config = StreamConfig {
            n_sketch_bits: 8,
            ..Default::default()
        };
        let mut estimator = StreamingDegreeEstimator::new(config);
        for (u, v) in &edges {
            estimator.process_edge(*u, *v);
        }
        let dist = estimator.approximate_degree_distribution(5);
        // Each row receives 2 increments per edge, so no counter can exceed 8 here.
        for (v, &est) in dist.iter().enumerate() {
            assert!(est >= 1, "Vertex {v} degree estimate {est} should be >= 1");
            assert!(est <= 8, "Vertex {v} degree estimate {est} should be <= 8");
        }
    }

    #[test]
    fn test_graph_stream_from_fn() {
        let data = vec![(0usize, 1usize), (1, 2)];
        let mut iter = data.into_iter();
        let mut stream = GraphStream::from_fn(move || iter.next());
        assert_eq!(stream.next_edge(), Some((0, 1)));
        assert_eq!(stream.next_edge(), Some((1, 2)));
        assert_eq!(stream.next_edge(), None);
    }
}