differential_dataflow/algorithms/graphs/
propagate.rs1use std::hash::Hash;
4
5use timely::progress::Timestamp;
6
7use crate::{VecCollection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::difference::{Abelian, Multiply};
10
11pub fn propagate<'scope, T, N, L, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>) -> VecCollection<'scope, T,(N,L),R>
17where
18 T: Timestamp + Lattice + Ord + Hash,
19 N: ExchangeData+Hash,
20 R: ExchangeData+Abelian,
21 R: Multiply<R, Output=R>,
22 R: From<i8>,
23 L: ExchangeData,
24{
25 propagate_core(edges.arrange_by_key(), nodes, |_label| 0)
26}
27
28pub fn propagate_at<'scope, T, N, L, F, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>, logic: F) -> VecCollection<'scope, T,(N,L),R>
34where
35 T: Timestamp + Lattice + Ord + Hash,
36 N: ExchangeData+Hash,
37 R: ExchangeData+Abelian,
38 R: Multiply<R, Output=R>,
39 R: From<i8>,
40 L: ExchangeData,
41 F: Fn(&L)->u64+Clone+'static,
42{
43 propagate_core(edges.arrange_by_key(), nodes, logic)
44}
45
46use crate::trace::TraceReader;
47use crate::operators::arrange::arrangement::Arranged;
48
49pub fn propagate_core<'scope, N, L, Tr, F, R>(edges: Arranged<'scope, Tr>, nodes: VecCollection<'scope, Tr::Time,(N,L),R>, logic: F) -> VecCollection<'scope, Tr::Time,(N,L),R>
55where
56 N: ExchangeData+Hash,
57 R: ExchangeData+Abelian,
58 R: Multiply<R, Output=R>,
59 R: From<i8>,
60 L: ExchangeData,
61 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R, Time: Hash>+Clone+'static,
62 F: Fn(&L)->u64+Clone+'static,
63{
64 use timely::order::Product;
80 let outer = nodes.scope();
81 outer.scoped::<Product<_, usize>,_,_>("Propagate", |scope| {
82
83 use crate::operators::iterate::Variable;
84 use crate::trace::implementations::{ValBuilder, ValSpine};
85
86 use timely::order::Product;
87
88 let edges = edges.enter(scope);
89 let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
90
91 let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));
92
93 let labels =
94 proposals
95 .concat(nodes)
96 .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
97
98 let propagate: VecCollection<_, (N, L), R> =
99 labels
100 .clone()
101 .join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
102
103 proposals_bind.set(propagate);
104
105 labels
106 .as_collection(|k,v| (k.clone(), v.clone()))
107 .leave(outer)
108 })
109}