Skip to main content

differential_dataflow/algorithms/graphs/
propagate.rs

1//! Directed label reachability.
2
3use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{VecCollection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::difference::{Abelian, Multiply};
10
11/// Propagates labels forward, retaining the minimum label.
12///
13/// This algorithm naively propagates all labels at once, much like standard label propagation.
14/// To more carefully control the label propagation, consider `propagate_core` which supports a
15/// method to limit the introduction of labels.
16pub fn propagate<G, N, L, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
17where
18    G: Scope<Timestamp: Lattice+Ord+Hash>,
19    N: ExchangeData+Hash,
20    R: ExchangeData+Abelian,
21    R: Multiply<R, Output=R>,
22    R: From<i8>,
23    L: ExchangeData,
24{
25    propagate_core(edges.arrange_by_key(), nodes, |_label| 0)
26}
27
28/// Propagates labels forward, retaining the minimum label.
29///
30/// This algorithm naively propagates all labels at once, much like standard label propagation.
31/// To more carefully control the label propagation, consider `propagate_core` which supports a
32/// method to limit the introduction of labels.
33pub fn propagate_at<G, N, L, F, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
34where
35    G: Scope<Timestamp: Lattice+Ord+Hash>,
36    N: ExchangeData+Hash,
37    R: ExchangeData+Abelian,
38    R: Multiply<R, Output=R>,
39    R: From<i8>,
40    L: ExchangeData,
41    F: Fn(&L)->u64+Clone+'static,
42{
43    propagate_core(edges.arrange_by_key(), nodes, logic)
44}
45
46use crate::trace::TraceReader;
47use crate::operators::arrange::arrangement::Arranged;
48
49/// Propagates labels forward, retaining the minimum label.
50///
51/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
52/// a method `logic` to specify the rounds in which we introduce various labels. The output
53/// of `logic should be a number in the interval \[0,64\],
54pub fn propagate_core<G, N, L, Tr, F, R>(edges: Arranged<G,Tr>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
55where
56    G: Scope<Timestamp=Tr::Time>,
57    N: ExchangeData+Hash,
58    R: ExchangeData+Abelian,
59    R: Multiply<R, Output=R>,
60    R: From<i8>,
61    L: ExchangeData,
62    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
63    F: Fn(&L)->u64+Clone+'static,
64{
65    // Morally the code performs the following iterative computation. However, in the interest of a simplified
66    // dataflow graph and reduced memory footprint we instead have a wordier version below. The core differences
67    // between the two are that 1. the former filters its input and pretends to perform non-monotonic computation,
68    // whereas the latter creates an initially empty monotonic iteration variable, and 2. the latter rotates the
69    // iterative computation so that the arrangement produced by `reduce` can be re-used.
70
71    // nodes.filter(|_| false)
72    //      .iterate(|scope, inner| {
73    //          let edges = edges.enter(&scope);
74    //          let nodes = nodes.enter_at(&scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
75    //          inner.join_map(edges, |_k,l,d| (d.clone(),l.clone()))
76    //               .concat(nodes)
77    //               .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
78    //      })
79
80    nodes.scope().iterative::<usize,_,_>(|scope| {
81
82        use crate::operators::iterate::Variable;
83        use crate::trace::implementations::{ValBuilder, ValSpine};
84
85        use timely::order::Product;
86
87        let edges = edges.enter(scope);
88        let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
89
90        let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));
91
92        let labels =
93        proposals
94            .concat(nodes)
95            .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
96
97        let propagate: VecCollection<_, (N, L), R> =
98        labels
99            .clone()
100            .join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
101
102        proposals_bind.set(propagate);
103
104        labels
105            .as_collection(|k,v| (k.clone(), v.clone()))
106            .leave()
107    })
108}