differential_dataflow/algorithms/
prefix_sum.rs1use timely::progress::Timestamp;
4
5use crate::{VecCollection, ExchangeData};
6use crate::lattice::Lattice;
7use crate::operators::*;
8
9pub trait PrefixSum<'scope, T: Timestamp, K, D> {
11 fn prefix_sum<F>(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static;
17
18 fn prefix_sum_at<F>(self, locations: VecCollection<'scope, T, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static;
20}
21
22impl<'scope, T, K, D> PrefixSum<'scope, T, K, D> for VecCollection<'scope, T, ((usize, K), D)>
23where
24 T: Timestamp + Lattice,
25 K: ExchangeData + ::std::hash::Hash,
26 D: ExchangeData + ::std::hash::Hash,
27{
28 fn prefix_sum<F>(self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static {
29 self.clone().prefix_sum_at(self.map(|(x,_)| x), zero, combine)
30 }
31
32 fn prefix_sum_at<F>(self, locations: VecCollection<'scope, T, (usize, K)>, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static {
33
34 let combine1 = ::std::rc::Rc::new(combine);
35 let combine2 = combine1.clone();
36
37 let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
38 broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y))
39 }
40}
41
42pub fn aggregate<'scope, T, K, D, F>(collection: VecCollection<'scope, T, ((usize, K), D)>, combine: F) -> VecCollection<'scope, T, ((usize, usize, K), D)>
44where
45 T: Timestamp + Lattice,
46 K: ExchangeData + ::std::hash::Hash,
47 D: ExchangeData + ::std::hash::Hash,
48 F: Fn(&K,&D,&D)->D + 'static,
49{
50 let unit_ranges = collection.map(|((index, key), data)| ((index, 0, key), data));
52
53 unit_ranges
54 .clone()
55 .iterate(|scope, ranges| {
56
57 let unit_ranges = unit_ranges.enter(scope);
63 ranges
64 .filter(|&((_pos, log, _), _)| log < 64)
65 .map(|((pos, log, key), data)| ((pos >> 1, log + 1, key), (pos, data)))
66 .reduce(move |&(_pos, _log, ref key), input, output| {
67 let mut result = (input[0].0).1.clone();
68 if input.len() > 1 { result = combine(key, &result, &(input[1].0).1); }
69 output.push((result, 1));
70 })
71 .concat(unit_ranges)
72 })
73}
74
75pub fn broadcast<'scope, T, K, D, F>(
77 ranges: VecCollection<'scope, T, ((usize, usize, K), D)>,
78 queries: VecCollection<'scope, T, (usize, K)>,
79 zero: D,
80 combine: F) -> VecCollection<'scope, T, ((usize, K), D)>
81where
82 T: Timestamp + Lattice + Ord + ::std::fmt::Debug,
83 K: ExchangeData + ::std::hash::Hash,
84 D: ExchangeData + ::std::hash::Hash,
85 F: Fn(&K,&D,&D)->D + 'static,
86{
87 let zero0 = zero.clone();
88 let zero1 = zero.clone();
89 let zero2 = zero.clone();
90
91 let requests =
108 queries
109 .clone()
110 .flat_map(|(idx, key)|
111 (0 .. 64)
112 .filter(move |i| (idx & (1usize << i)) != 0) .map(move |i| ((idx >> i) - 1, i, key.clone())) )
115 .distinct();
116
117 let full_ranges =
119 ranges
120 .semijoin(requests.clone());
121
122 let zero_ranges =
124 full_ranges
125 .clone()
126 .map(move |((idx, log, key), _)| ((idx, log, key), zero0.clone()))
127 .negate()
128 .concat(requests.map(move |(idx, log, key)| ((idx, log, key), zero1.clone())));
129
130 let used_ranges = full_ranges.concat(zero_ranges);
132
133 let init_states =
135 queries
136 .clone()
137 .map(move |(_, key)| ((0, key), zero2.clone()))
138 .distinct();
139
140 init_states
142 .clone()
143 .iterate(|scope, states| {
144 let init_states = init_states.enter(scope);
145 used_ranges
146 .enter(scope)
147 .map(|((pos, log, key), data)| ((pos << log, key), (log, data)))
148 .join_map(states, move |&(pos, ref key), &(log, ref data), state|
149 ((pos + (1 << log), key.clone()), combine(key, state, data)))
150 .concat(init_states)
151 .distinct()
152 })
153 .semijoin(queries)
154}