differential_dataflow/algorithms/graphs/
propagate.rs1use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{VecCollection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::difference::{Abelian, Multiply};
10
11pub fn propagate<G, N, L, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
17where
18 G: Scope<Timestamp: Lattice+Ord+Hash>,
19 N: ExchangeData+Hash,
20 R: ExchangeData+Abelian,
21 R: Multiply<R, Output=R>,
22 R: From<i8>,
23 L: ExchangeData,
24{
25 propagate_core(edges.arrange_by_key(), nodes, |_label| 0)
26}
27
28pub fn propagate_at<G, N, L, F, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
34where
35 G: Scope<Timestamp: Lattice+Ord+Hash>,
36 N: ExchangeData+Hash,
37 R: ExchangeData+Abelian,
38 R: Multiply<R, Output=R>,
39 R: From<i8>,
40 L: ExchangeData,
41 F: Fn(&L)->u64+Clone+'static,
42{
43 propagate_core(edges.arrange_by_key(), nodes, logic)
44}
45
46use crate::trace::TraceReader;
47use crate::operators::arrange::arrangement::Arranged;
48
49pub fn propagate_core<G, N, L, Tr, F, R>(edges: Arranged<G,Tr>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
55where
56 G: Scope<Timestamp=Tr::Time>,
57 N: ExchangeData+Hash,
58 R: ExchangeData+Abelian,
59 R: Multiply<R, Output=R>,
60 R: From<i8>,
61 L: ExchangeData,
62 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
63 F: Fn(&L)->u64+Clone+'static,
64{
65 nodes.scope().iterative::<usize,_,_>(|scope| {
81
82 use crate::operators::iterate::Variable;
83 use crate::trace::implementations::{ValBuilder, ValSpine};
84
85 use timely::order::Product;
86
87 let edges = edges.enter(scope);
88 let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
89
90 let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));
91
92 let labels =
93 proposals
94 .concat(nodes)
95 .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
96
97 let propagate: VecCollection<_, (N, L), R> =
98 labels
99 .clone()
100 .join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
101
102 proposals_bind.set(propagate);
103
104 labels
105 .as_collection(|k,v| (k.clone(), v.clone()))
106 .leave()
107 })
108}