differential_dataflow/algorithms/graphs/
propagate.rs1use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{Collection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::difference::{Abelian, Multiply};
10use crate::operators::arrange::arrangement::ArrangeByKey;
11
12pub fn propagate<G, N, L, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>) -> Collection<G,(N,L),R>
18where
19 G: Scope,
20 G::Timestamp: Lattice+Ord,
21 N: ExchangeData+Hash,
22 R: ExchangeData+Abelian,
23 R: Multiply<R, Output=R>,
24 R: From<i8>,
25 L: ExchangeData,
26{
27 propagate_core(&edges.arrange_by_key(), nodes, |_label| 0)
28}
29
30pub fn propagate_at<G, N, L, F, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
36where
37 G: Scope,
38 G::Timestamp: Lattice+Ord,
39 N: ExchangeData+Hash,
40 R: ExchangeData+Abelian,
41 R: Multiply<R, Output=R>,
42 R: From<i8>,
43 L: ExchangeData,
44 F: Fn(&L)->u64+Clone+'static,
45{
46 propagate_core(&edges.arrange_by_key(), nodes, logic)
47}
48
49use crate::trace::TraceReader;
50use crate::operators::arrange::arrangement::Arranged;
51
52pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
58where
59 G: Scope<Timestamp=Tr::Time>,
60 N: ExchangeData+Hash,
61 R: ExchangeData+Abelian,
62 R: Multiply<R, Output=R>,
63 R: From<i8>,
64 L: ExchangeData,
65 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
66 F: Fn(&L)->u64+Clone+'static,
67{
68 nodes.scope().iterative::<usize,_,_>(|scope| {
84
85 use crate::operators::reduce::ReduceCore;
86 use crate::operators::iterate::SemigroupVariable;
87 use crate::trace::implementations::{ValBuilder, ValSpine};
88
89 use timely::order::Product;
90
91 let edges = edges.enter(scope);
92 let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
93
94 let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));
95
96 let labels =
97 proposals
98 .concat(&nodes)
99 .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
100
101 let propagate: Collection<_, (N, L), R> =
102 labels
103 .join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
104
105 proposals.set(&propagate);
106
107 labels
108 .as_collection(|k,v| (k.clone(), v.clone()))
109 .leave()
110 })
111}