differential_dataflow/algorithms/graphs/
scc.rs1use std::mem;
4use std::hash::Hash;
5
6use timely::dataflow::*;
7
8use crate::{Collection, ExchangeData};
9use crate::operators::*;
10use crate::lattice::Lattice;
11use crate::difference::{Abelian, Multiply};
12
13use super::propagate::propagate;
14
15pub fn trim<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
17where
18 G: Scope<Timestamp: Lattice+Ord>,
19 N: ExchangeData + Hash,
20 R: ExchangeData + Abelian,
21 R: Multiply<R, Output=R>,
22 R: From<i8>,
23{
24 graph.iterate(|edges| {
25 let active =
27 edges.map(|(_src,dst)| dst)
28 .threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });
29
30 graph.enter(&edges.scope())
31 .semijoin(&active)
32 })
33}
34
35pub fn strongly_connected<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
37where
38 G: Scope<Timestamp: Lattice + Ord>,
39 N: ExchangeData + Hash,
40 R: ExchangeData + Abelian,
41 R: Multiply<R, Output=R>,
42 R: From<i8>
43{
44 graph.iterate(|inner| {
45 let edges = graph.enter(&inner.scope());
46 let trans = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
47 trim_edges(&trim_edges(inner, &edges), &trans)
48 })
49}
50
51fn trim_edges<G, N, R>(cycle: &Collection<G, (N,N), R>, edges: &Collection<G, (N,N), R>)
52 -> Collection<G, (N,N), R>
53where
54 G: Scope<Timestamp: Lattice+Ord>,
55 N: ExchangeData + Hash,
56 R: ExchangeData + Abelian,
57 R: Multiply<R, Output=R>,
58 R: From<i8>
59{
60 let nodes = edges.map_in_place(|x| x.0 = x.1.clone())
61 .consolidate();
62
63 let labels = propagate(cycle, &nodes);
66
67 edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
68 .join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
69 .filter(|(_,(l1,l2))| l1 == l2)
70 .map(|((x1,x2),_)| (x2,x1))
71}