differential_dataflow/algorithms/graphs/
propagate.rs

1//! Directed label reachability.
2
3use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{Collection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::difference::{Abelian, Multiply};
10use crate::operators::arrange::arrangement::ArrangeByKey;
11
12/// Propagates labels forward, retaining the minimum label.
13///
14/// This algorithm naively propagates all labels at once, much like standard label propagation.
15/// To more carefully control the label propagation, consider `propagate_core` which supports a
16/// method to limit the introduction of labels.
17pub fn propagate<G, N, L, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>) -> Collection<G,(N,L),R>
18where
19    G: Scope,
20    G::Timestamp: Lattice+Ord,
21    N: ExchangeData+Hash,
22    R: ExchangeData+Abelian,
23    R: Multiply<R, Output=R>,
24    R: From<i8>,
25    L: ExchangeData,
26{
27    propagate_core(&edges.arrange_by_key(), nodes, |_label| 0)
28}
29
30/// Propagates labels forward, retaining the minimum label.
31///
32/// This algorithm naively propagates all labels at once, much like standard label propagation.
33/// To more carefully control the label propagation, consider `propagate_core` which supports a
34/// method to limit the introduction of labels.
35pub fn propagate_at<G, N, L, F, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
36where
37    G: Scope,
38    G::Timestamp: Lattice+Ord,
39    N: ExchangeData+Hash,
40    R: ExchangeData+Abelian,
41    R: Multiply<R, Output=R>,
42    R: From<i8>,
43    L: ExchangeData,
44    F: Fn(&L)->u64+Clone+'static,
45{
46    propagate_core(&edges.arrange_by_key(), nodes, logic)
47}
48
49use crate::trace::TraceReader;
50use crate::operators::arrange::arrangement::Arranged;
51
52/// Propagates labels forward, retaining the minimum label.
53///
54/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
55/// a method `logic` to specify the rounds in which we introduce various labels. The output
56/// of `logic should be a number in the interval [0,64],
57pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
58where
59    G: Scope<Timestamp=Tr::Time>,
60    N: ExchangeData+Hash,
61    R: ExchangeData+Abelian,
62    R: Multiply<R, Output=R>,
63    R: From<i8>,
64    L: ExchangeData,
65    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
66    F: Fn(&L)->u64+Clone+'static,
67{
68    // Morally the code performs the following iterative computation. However, in the interest of a simplified
69    // dataflow graph and reduced memory footprint we instead have a wordier version below. The core differences
70    // between the two are that 1. the former filters its input and pretends to perform non-monotonic computation,
71    // whereas the latter creates an initially empty monotonic iteration variable, and 2. the latter rotates the
72    // iterative computation so that the arrangement produced by `reduce` can be re-used.
73
74    // nodes.filter(|_| false)
75    //      .iterate(|inner| {
76    //          let edges = edges.enter(&inner.scope());
77    //          let nodes = nodes.enter_at(&inner.scope(), move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
78    //          inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
79    //               .concat(&nodes)
80    //               .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
81    //      })
82
83    nodes.scope().iterative::<usize,_,_>(|scope| {
84
85        use crate::operators::reduce::ReduceCore;
86        use crate::operators::iterate::SemigroupVariable;
87        use crate::trace::implementations::{ValBuilder, ValSpine};
88
89        use timely::order::Product;
90
91        let edges = edges.enter(scope);
92        let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
93
94        let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));
95
96        let labels =
97        proposals
98            .concat(&nodes)
99            .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
100
101        let propagate: Collection<_, (N, L), R> =
102        labels
103            .join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
104
105        proposals.set(&propagate);
106
107        labels
108            .as_collection(|k,v| (k.clone(), v.clone()))
109            .leave()
110    })
111}