declarative_dataflow/plan/
aggregate_neu.rs1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::difference::DiffPair;
8use differential_dataflow::lattice::Lattice;
9use differential_dataflow::operators::Join as JoinMap;
10use differential_dataflow::operators::{Count, Reduce};
11
12use crate::binding::{AsBinding, Binding};
13use crate::plan::{Dependencies, ImplContext, Implementable};
14use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};
15
16use num_rational::{Ratio, Rational32};
17
/// Aggregation functions that an `Aggregate` plan stage can apply to the
/// values of a group.
///
/// The semantics noted on each variant describe what `Aggregate::implement`
/// does for it.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum AggregationFn {
    /// Emits the first entry of the group's value list as handed to `reduce`
    /// (the minimum, assuming `reduce` presents values in ascending order —
    /// TODO confirm against differential-dataflow).
    MIN,
    /// Emits the last entry of the group's value list as handed to `reduce`
    /// (the maximum, under the same ordering assumption as `MIN`).
    MAX,
    /// Emits the entry at index `len / 2` of the group's value list.
    /// Multiplicities of the entries are not consulted.
    MEDIAN,
    /// Emits the sum of the multiplicities of the group's entries as a
    /// `Value::Number`.
    COUNT,
    /// Emits the sum of the group's values as a `Value::Number`.
    /// Panics if a value is not a `Value::Number`.
    SUM,
    /// Emits `sum / count` of the group's values as a `Value::Rational32`.
    /// Panics if a value is not a `Value::Number`.
    AVG,
    /// Emits `sum_of_squares / count - (sum / count)^2` of the group's values
    /// as a `Value::Rational32`. Panics if a value is not a `Value::Number`.
    VARIANCE,
}
38
/// A plan stage that groups the tuples produced by `plan` and applies one or
/// more aggregation functions to each group.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Aggregate<P: Implementable> {
    /// Variables of the resulting relation, in output order. Each aggregation
    /// variable is looked up in this list to decide at which position its
    /// aggregated value is inserted into the output tuple.
    pub variables: Vec<Var>,
    /// Plan producing the relation that is aggregated.
    pub plan: Box<P>,
    /// Aggregation functions to apply; the i-th function reads the value
    /// selected by the i-th entry of `aggregation_variables`.
    pub aggregation_fns: Vec<AggregationFn>,
    /// Variables the source relation is keyed by before aggregating; they
    /// form the grouping key.
    pub key_variables: Vec<Var>,
    /// Variables whose values are fed to the aggregation functions. A variable
    /// may appear more than once, in which case the same value position is
    /// reused for each occurrence.
    pub aggregation_variables: Vec<Var>,
    /// Additional variables carried along with each aggregated value. Only
    /// the length of this list is used by `implement`: that many trailing
    /// values of each tuple are appended to the value being aggregated
    /// (presumably to keep otherwise-equal values distinct — TODO confirm).
    pub with_variables: Vec<Var>,
}
57
58impl<P: Implementable> Implementable for Aggregate<P> {
59 fn dependencies(&self) -> Dependencies {
60 self.plan.dependencies()
61 }
62
63 fn into_bindings(&self) -> Vec<Binding> {
64 self.plan.into_bindings()
65 }
66
67 fn implement<'b, T, I, S>(
68 &self,
69 nested: &mut Iterative<'b, S, u64>,
70 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
71 context: &mut I,
72 ) -> (Implemented<'b, S>, ShutdownHandle)
73 where
74 T: Timestamp + Lattice,
75 I: ImplContext<T>,
76 S: Scope<Timestamp = T>,
77 {
78 let (relation, mut shutdown_handle) =
79 self.plan.implement(nested, local_arrangements, context);
80
81 let tuples = {
83 let (tuples, shutdown) =
84 relation.tuples_by_variables(nested, context, &self.key_variables);
85 shutdown_handle.merge_with(shutdown);
86 tuples
87 };
88
89 let mut value_offsets = Vec::new();
94 let mut seen = Vec::new();
95
96 for variable in self.aggregation_variables.iter() {
97 if !seen.contains(variable) {
98 seen.push(*variable);
99 value_offsets.push(seen.len() - 1);
100 } else {
101 value_offsets.push(AsBinding::binds(&seen, *variable).unwrap());
102 }
103 }
104
105 let mut variables = self.variables.clone();
110 let mut output_offsets = Vec::new();
111
112 for variable in self.aggregation_variables.iter() {
113 let output_index = AsBinding::binds(&variables, *variable).unwrap();
114 output_offsets.push(output_index);
115
116 variables[output_index] = 0;
117 }
118
119 let mut collections = Vec::new();
120
121 for (i, aggregation_fn) in self.aggregation_fns.iter().enumerate() {
124 let value_offset = value_offsets[i];
125 let with_length = self.with_variables.len();
126
127 let prepare_unary = move |(key, tuple): (Vec<Value>, Vec<Value>)| {
129 let value = &tuple[value_offset];
130 let mut v = vec![value.clone()];
131
132 if with_length > 0 {
136 v.extend(tuple.iter().rev().take(with_length).cloned());
137 }
138
139 (key, v)
140 };
141
142 match aggregation_fn {
143 AggregationFn::MIN => {
144 let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
145 let min = &vals[0].0[0];
146 output.push((vec![min.clone()], 1));
147 });
148 collections.push(tuples);
149 }
150 AggregationFn::MAX => {
151 let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
152 let max = &vals[vals.len() - 1].0[0];
153 output.push((vec![max.clone()], 1));
154 });
155 collections.push(tuples);
156 }
157 AggregationFn::MEDIAN => {
158 let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
159 let median = &vals[vals.len() / 2].0[0];
160 output.push((vec![median.clone()], 1));
161 });
162 collections.push(tuples);
163 }
164 AggregationFn::COUNT => {
165 let tuples = tuples.map(prepare_unary).reduce(|_key, input, output| {
166 let mut total_count = 0;
167 for (_, count) in input.iter() {
168 total_count += count;
169 }
170
171 output.push((vec![Value::Number(total_count as i64)], 1))
172 });
173 collections.push(tuples);
174 }
175 AggregationFn::SUM => {
176 let tuples = tuples
177 .map(prepare_unary)
178 .explode(|(key, val)| {
179 let v = match val[0] {
180 Value::Number(num) => num,
181 _ => panic!("SUM can only be applied on type Number."),
182 };
183 Some((key, v as isize))
184 })
185 .count()
186 .map(move |(key, count)| (key, vec![Value::Number(count as i64)]));
187 collections.push(tuples);
188 }
189 AggregationFn::AVG => {
190 let tuples = tuples
191 .map(prepare_unary)
192 .explode(move |(key, val)| {
193 let v = match val[0] {
194 Value::Number(num) => num,
195 _ => panic!("AVG can only be applied on type Number."),
196 };
197 Some((key, DiffPair::new(v as isize, 1)))
198 })
199 .count()
200 .map(move |(key, diff_pair)| {
201 (
202 key,
203 vec![Value::Rational32(Ratio::new(
204 diff_pair.element1 as i32,
205 diff_pair.element2 as i32,
206 ))],
207 )
208 });
209 collections.push(tuples);
210 }
211 AggregationFn::VARIANCE => {
212 let tuples = tuples
213 .map(prepare_unary)
214 .explode(move |(key, val)| {
215 let v = match val[0] {
216 Value::Number(num) => num,
217 _ => panic!("VARIANCE can only be applied on type Number."),
218 };
219 Some((
220 key,
221 DiffPair::new(
222 DiffPair::new(v as isize * v as isize, v as isize),
223 1,
224 ),
225 ))
226 })
227 .count()
228 .map(move |(key, diff_pair)| {
229 let sum_square = diff_pair.element1.element1 as i32;
230 let sum = diff_pair.element1.element2 as i32;
231 let c = diff_pair.element2 as i32;
232 (
233 key,
234 vec![Value::Rational32(
235 Rational32::new(sum_square, c) - Rational32::new(sum, c).pow(2),
236 )],
237 )
238 });
239 collections.push(tuples);
240 }
241 };
242 }
243
244 if collections.len() == 1 {
245 let output_index = output_offsets[0];
246 let relation = CollectionRelation {
247 variables: self.variables.to_vec(),
248 tuples: collections[0].map(move |(key, val)| {
249 let mut k = key.clone();
250 let v = val[0].clone();
251 k.insert(output_index, v);
252 k
253 }),
254 };
255
256 (Implemented::Collection(relation), shutdown_handle)
257 } else {
258 let left = collections.remove(0);
260 let tuples = collections.iter().fold(left, |coll, next| {
261 coll.join_map(&next, |key, v1, v2| {
262 let mut val = v1.clone();
263 val.append(&mut v2.clone());
264 (key.clone(), val)
265 })
266 });
267
268 let relation = CollectionRelation {
269 variables: self.variables.to_vec(),
270 tuples: tuples.map(move |(key, vals)| {
271 let mut v = key.clone();
272 for (i, val) in vals.iter().enumerate() {
273 v.insert(output_offsets[i], val.clone())
274 }
275 v
276 }),
277 };
278
279 (Implemented::Collection(relation), shutdown_handle)
280 }
281 }
282}