// differential_dataflow/algorithms/prefix_sum.rs

use timely::dataflow::Scope;

use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::operators::*;
8
9pub trait PrefixSum<G: Scope, K, D> {
11 fn prefix_sum<F>(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static;
17
18 fn prefix_sum_at<F>(self, locations: VecCollection<G, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static;
20}
21
22impl<G, K, D> PrefixSum<G, K, D> for VecCollection<G, ((usize, K), D)>
23where
24 G: Scope<Timestamp: Lattice>,
25 K: ExchangeData + ::std::hash::Hash,
26 D: ExchangeData + ::std::hash::Hash,
27{
28 fn prefix_sum<F>(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static {
29 self.clone().prefix_sum_at(self.map(|(x,_)| x), zero, combine)
30 }
31
32 fn prefix_sum_at<F>(self, locations: VecCollection<G, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static {
33
34 let combine1 = ::std::rc::Rc::new(combine);
35 let combine2 = combine1.clone();
36
37 let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
38 broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y))
39 }
40}
41
42pub fn aggregate<G, K, D, F>(collection: VecCollection<G, ((usize, K), D)>, combine: F) -> VecCollection<G, ((usize, usize, K), D)>
44where
45 G: Scope<Timestamp: Lattice>,
46 K: ExchangeData + ::std::hash::Hash,
47 D: ExchangeData + ::std::hash::Hash,
48 F: Fn(&K,&D,&D)->D + 'static,
49{
50 let unit_ranges = collection.map(|((index, key), data)| ((index, 0, key), data));
52
53 unit_ranges
54 .clone()
55 .iterate(|scope, ranges| {
56
57 let unit_ranges = unit_ranges.enter(&scope);
63 ranges
64 .filter(|&((_pos, log, _), _)| log < 64)
65 .map(|((pos, log, key), data)| ((pos >> 1, log + 1, key), (pos, data)))
66 .reduce(move |&(_pos, _log, ref key), input, output| {
67 let mut result = (input[0].0).1.clone();
68 if input.len() > 1 { result = combine(key, &result, &(input[1].0).1); }
69 output.push((result, 1));
70 })
71 .concat(unit_ranges)
72 })
73}
74
75pub fn broadcast<G, K, D, F>(
77 ranges: VecCollection<G, ((usize, usize, K), D)>,
78 queries: VecCollection<G, (usize, K)>,
79 zero: D,
80 combine: F) -> VecCollection<G, ((usize, K), D)>
81where
82 G: Scope<Timestamp: Lattice + Ord + ::std::fmt::Debug>,
83 K: ExchangeData + ::std::hash::Hash,
84 D: ExchangeData + ::std::hash::Hash,
85 F: Fn(&K,&D,&D)->D + 'static,
86{
87
88 let zero0 = zero.clone();
89 let zero1 = zero.clone();
90 let zero2 = zero.clone();
91
92 let requests =
109 queries
110 .clone()
111 .flat_map(|(idx, key)|
112 (0 .. 64)
113 .filter(move |i| (idx & (1usize << i)) != 0) .map(move |i| ((idx >> i) - 1, i, key.clone())) )
116 .distinct();
117
118 let full_ranges =
120 ranges
121 .semijoin(requests.clone());
122
123 let zero_ranges =
125 full_ranges
126 .clone()
127 .map(move |((idx, log, key), _)| ((idx, log, key), zero0.clone()))
128 .negate()
129 .concat(requests.map(move |(idx, log, key)| ((idx, log, key), zero1.clone())));
130
131 let used_ranges = full_ranges.concat(zero_ranges);
133
134 let init_states =
136 queries
137 .clone()
138 .map(move |(_, key)| ((0, key), zero2.clone()))
139 .distinct();
140
141 init_states
143 .clone()
144 .iterate(|scope, states| {
145 let init_states = init_states.enter(&scope);
146 used_ranges
147 .enter(&scope)
148 .map(|((pos, log, key), data)| ((pos << log, key), (log, data)))
149 .join_map(states, move |&(pos, ref key), &(log, ref data), state|
150 ((pos + (1 << log), key.clone()), combine(key, state, data)))
151 .concat(init_states)
152 .distinct()
153 })
154 .semijoin(queries)
155}