Skip to main content

differential_dataflow/algorithms/graphs/
scc.rs

1//! Strongly connected component structure.
2
3use std::mem;
4use std::hash::Hash;
5
6use timely::dataflow::*;
7
8use crate::{VecCollection, ExchangeData};
9use crate::operators::*;
10use crate::lattice::Lattice;
11use crate::difference::{Abelian, Multiply};
12
13use super::propagate::propagate;
14
15/// Iteratively removes nodes with no in-edges.
16pub fn trim<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
17where
18    G: Scope<Timestamp: Lattice+Ord>,
19    N: ExchangeData + Hash,
20    R: ExchangeData + Abelian,
21    R: Multiply<R, Output=R>,
22    R: From<i8>,
23{
24    graph.clone().iterate(|scope, edges| {
25        // keep edges from active edge destinations.
26        let graph = graph.enter(&scope);
27        let active =
28        edges.map(|(_src,dst)| dst)
29             .threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });
30
31        graph.semijoin(active)
32    })
33}
34
35/// Returns the subset of edges in the same strongly connected component.
36pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
37where
38    G: Scope<Timestamp: Lattice+Ord+Hash>,
39    N: ExchangeData + Hash,
40    R: ExchangeData + Abelian,
41    R: Multiply<R, Output=R>,
42    R: From<i8>
43{
44    graph.clone().iterate(|scope, inner| {
45        let edges = graph.enter(&scope);
46        let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
47        trim_edges(trim_edges(inner, edges), trans)
48    })
49}
50
51fn trim_edges<G, N, R>(cycle: VecCollection<G, (N,N), R>, edges: VecCollection<G, (N,N), R>)
52    -> VecCollection<G, (N,N), R>
53where
54    G: Scope<Timestamp: Lattice+Ord+Hash>,
55    N: ExchangeData + Hash,
56    R: ExchangeData + Abelian,
57    R: Multiply<R, Output=R>,
58    R: From<i8>
59{
60    let nodes = edges.clone()
61                     .map_in_place(|x| x.0 = x.1.clone())
62                     .consolidate();
63
64    // NOTE: With a node -> int function, can be improved by:
65    // let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
66    let labels = propagate(cycle, nodes);
67
68    edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
69         .join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
70         .filter(|(_,(l1,l2))| l1 == l2)
71         .map(|((x1,x2),_)| (x2,x1))
72}