// differential_dataflow/algorithms/graphs/scc.rs
use std::mem;
use std::hash::Hash;

use timely::progress::Timestamp;

use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};

use super::propagate::propagate;

14pub fn strongly_connected<'scope, T, N, R>(graph: VecCollection<'scope, T, (N,N), R>) -> VecCollection<'scope, T, (N,N), R>
16where
17 T: Timestamp + Lattice + Ord + Hash,
18 N: ExchangeData + Hash,
19 R: ExchangeData + Abelian,
20 R: Multiply<R, Output=R>,
21 R: From<i8>
22{
23 use timely::order::Product;
24 let outer = graph.scope();
25 outer.scoped::<Product<_, usize>,_,_>("StronglyConnected", |scope| {
26 let edges = graph.enter(scope);
28 let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
29 use crate::operators::iterate::Variable;
31 let (variable, inner) = Variable::new_from(edges.clone(), Product::new(Default::default(), 1));
32
33 let result = trim_edges(trim_edges(inner, edges), trans);
34 variable.set(result.clone());
35 result.leave(outer)
36 })
37}
38
39fn trim_edges<'scope, T, N, R>(cycle: VecCollection<'scope, T, (N,N), R>, edges: VecCollection<'scope, T, (N,N), R>)
40 -> VecCollection<'scope, T, (N,N), R>
41where
42 T: Timestamp + Lattice + Ord + Hash,
43 N: ExchangeData + Hash,
44 R: ExchangeData + Abelian,
45 R: Multiply<R, Output=R>,
46 R: From<i8>
47{
48 let outer = edges.inner.scope();
49 outer.region_named("TrimEdges", |region| {
50 let cycle = cycle.enter_region(region);
51 let edges = edges.enter_region(region);
52
53 let nodes = edges.clone()
54 .map_in_place(|x| x.0 = x.1.clone())
55 .consolidate();
56
57 let labels = propagate(cycle, nodes).arrange_by_key();
60
61 edges.arrange_by_key()
62 .join_core(labels.clone(), |e1,e2,l1| [(e2.clone(),(e1.clone(),l1.clone()))])
63 .arrange_by_key()
64 .join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))])
65 .filter(|(_,(l1,l2))| l1 == l2)
66 .map(|((x1,x2),_)| (x2,x1))
67 .leave_region(outer)
68 })
69}