Skip to main content

differential_dataflow/algorithms/graphs/
scc.rs

1//! Strongly connected component structure.
2
3use std::mem;
4use std::hash::Hash;
5
6use timely::dataflow::*;
7
8use crate::{VecCollection, ExchangeData};
9use crate::lattice::Lattice;
10use crate::difference::{Abelian, Multiply};
11
12use super::propagate::propagate;
13
14/// Returns the subset of edges in the same strongly connected component.
15pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
16where
17    G: Scope<Timestamp: Lattice+Ord+Hash>,
18    N: ExchangeData + Hash,
19    R: ExchangeData + Abelian,
20    R: Multiply<R, Output=R>,
21    R: From<i8>
22{
23    use timely::order::Product;
24    graph.scope().scoped::<Product<_, usize>,_,_>("StronglyConnected", |scope| {
25        // Bring in edges and transposed edges.
26        let edges = graph.enter(&scope);
27        let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
28        // Create a new variable that will be intra-scc edges.
29        use crate::operators::iterate::Variable;
30        let (variable, inner) = Variable::new_from(edges.clone(), Product::new(Default::default(), 1));
31
32        let result = trim_edges(trim_edges(inner, edges), trans);
33        variable.set(result.clone());
34        result.leave()
35    })
36}
37
38fn trim_edges<G, N, R>(cycle: VecCollection<G, (N,N), R>, edges: VecCollection<G, (N,N), R>)
39    -> VecCollection<G, (N,N), R>
40where
41    G: Scope<Timestamp: Lattice+Ord+Hash>,
42    N: ExchangeData + Hash,
43    R: ExchangeData + Abelian,
44    R: Multiply<R, Output=R>,
45    R: From<i8>
46{
47    edges.inner.scope().region_named("TrimEdges", |region| {
48        let cycle = cycle.enter_region(region);
49        let edges = edges.enter_region(region);
50
51        let nodes = edges.clone()
52                         .map_in_place(|x| x.0 = x.1.clone())
53                         .consolidate();
54
55        // NOTE: With a node -> int function, can be improved by:
56        // let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
57        let labels = propagate(cycle, nodes).arrange_by_key();
58
59        edges.arrange_by_key()
60             .join_core(labels.clone(), |e1,e2,l1| [(e2.clone(),(e1.clone(),l1.clone()))])
61             .arrange_by_key()
62             .join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))])
63             .filter(|(_,(l1,l2))| l1 == l2)
64             .map(|((x1,x2),_)| (x2,x1))
65             .leave_region()
66    })
67}