// declarative_dataflow/plan/aggregate_neu.rs

//! Aggregate expression plan.

use std::convert::TryFrom;

use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::Scope;
use timely::progress::Timestamp;

use differential_dataflow::difference::DiffPair;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Join as JoinMap;
use differential_dataflow::operators::{Count, Reduce};

use crate::binding::{AsBinding, Binding};
use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Value, Var, VariableMap};

use num_rational::{Ratio, Rational32};

18/// Permitted aggregation function.
19#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
20pub enum AggregationFn {
21    /// Minimum
22    MIN,
23    /// Maximum
24    MAX,
25    /// MEDIAN
26    MEDIAN,
27    /// Count
28    COUNT,
29    /// Sum
30    SUM,
31    /// Average
32    AVG,
33    /// Variance
34    VARIANCE,
35    // /// Standard deviation
36    // STDDEV,
37}
38
39/// [WIP] A plan stage applying the specified aggregation functions to
40/// bindings for the specified variables. Given multiple aggregations
41/// we iterate and n-1 joins are applied to the results.
42#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
43pub struct Aggregate<P: Implementable> {
44    /// TODO
45    pub variables: Vec<Var>,
46    /// Plan for the data source.
47    pub plan: Box<P>,
48    /// Logical predicate to apply.
49    pub aggregation_fns: Vec<AggregationFn>,
50    /// Relation variables that determine the grouping.
51    pub key_variables: Vec<Var>,
52    /// Aggregation variables
53    pub aggregation_variables: Vec<Var>,
54    /// With variables
55    pub with_variables: Vec<Var>,
56}
57
58impl<P: Implementable> Implementable for Aggregate<P> {
59    fn dependencies(&self) -> Dependencies {
60        self.plan.dependencies()
61    }
62
63    fn into_bindings(&self) -> Vec<Binding> {
64        self.plan.into_bindings()
65    }
66
67    fn implement<'b, T, I, S>(
68        &self,
69        nested: &mut Iterative<'b, S, u64>,
70        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
71        context: &mut I,
72    ) -> (Implemented<'b, S>, ShutdownHandle)
73    where
74        T: Timestamp + Lattice,
75        I: ImplContext<T>,
76        S: Scope<Timestamp = T>,
77    {
78        let (relation, mut shutdown_handle) =
79            self.plan.implement(nested, local_arrangements, context);
80
81        // We split the incoming tuples into their (key, value) parts.
82        let tuples = {
83            let (tuples, shutdown) =
84                relation.tuples_by_variables(nested, context, &self.key_variables);
85            shutdown_handle.merge_with(shutdown);
86            tuples
87        };
88
89        // For each aggregation function that is to be applied, we
90        // need to determine the index (into the value part of each
91        // tuple) at which its argument is to be found.
92
93        let mut value_offsets = Vec::new();
94        let mut seen = Vec::new();
95
96        for variable in self.aggregation_variables.iter() {
97            if !seen.contains(variable) {
98                seen.push(*variable);
99                value_offsets.push(seen.len() - 1);
100            } else {
101                value_offsets.push(AsBinding::binds(&seen, *variable).unwrap());
102            }
103        }
104
105        // Users can specify weird find clauses like [:find ?key1 (min ?v1) ?key2]
106        // and we would like to avoid an extra projection. Thus, we pre-compute
107        // the correct output offset for each aggregation.
108
109        let mut variables = self.variables.clone();
110        let mut output_offsets = Vec::new();
111
112        for variable in self.aggregation_variables.iter() {
113            let output_index = AsBinding::binds(&variables, *variable).unwrap();
114            output_offsets.push(output_index);
115
116            variables[output_index] = 0;
117        }
118
119        let mut collections = Vec::new();
120
121        // We iterate over all aggregations and keep track of the
122        // resulting collections, s.t. they can be joined afterwards.
123        for (i, aggregation_fn) in self.aggregation_fns.iter().enumerate() {
124            let value_offset = value_offsets[i];
125            let with_length = self.with_variables.len();
126
127            // Access the right value for the given iteration loop and extend possible with-values.
128            let prepare_unary = move |(key, tuple): (Vec<Value>, Vec<Value>)| {
129                let value = &tuple[value_offset];
130                let mut v = vec![value.clone()];
131
132                // With-variables are always the last elements in the
133                // value part of each tuple, given they are specified.
134                // We append these, s.t. we consolidate correctly.
135                if with_length > 0 {
136                    v.extend(tuple.iter().rev().take(with_length).cloned());
137                }
138
139                (key, v)
140            };
141
142            match aggregation_fn {
143                AggregationFn::MIN => {
144                    let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
145                        let min = &vals[0].0[0];
146                        output.push((vec![min.clone()], 1));
147                    });
148                    collections.push(tuples);
149                }
150                AggregationFn::MAX => {
151                    let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
152                        let max = &vals[vals.len() - 1].0[0];
153                        output.push((vec![max.clone()], 1));
154                    });
155                    collections.push(tuples);
156                }
157                AggregationFn::MEDIAN => {
158                    let tuples = tuples.map(prepare_unary).reduce(|_key, vals, output| {
159                        let median = &vals[vals.len() / 2].0[0];
160                        output.push((vec![median.clone()], 1));
161                    });
162                    collections.push(tuples);
163                }
164                AggregationFn::COUNT => {
165                    let tuples = tuples.map(prepare_unary).reduce(|_key, input, output| {
166                        let mut total_count = 0;
167                        for (_, count) in input.iter() {
168                            total_count += count;
169                        }
170
171                        output.push((vec![Value::Number(total_count as i64)], 1))
172                    });
173                    collections.push(tuples);
174                }
175                AggregationFn::SUM => {
176                    let tuples = tuples
177                        .map(prepare_unary)
178                        .explode(|(key, val)| {
179                            let v = match val[0] {
180                                Value::Number(num) => num,
181                                _ => panic!("SUM can only be applied on type Number."),
182                            };
183                            Some((key, v as isize))
184                        })
185                        .count()
186                        .map(move |(key, count)| (key, vec![Value::Number(count as i64)]));
187                    collections.push(tuples);
188                }
189                AggregationFn::AVG => {
190                    let tuples = tuples
191                        .map(prepare_unary)
192                        .explode(move |(key, val)| {
193                            let v = match val[0] {
194                                Value::Number(num) => num,
195                                _ => panic!("AVG can only be applied on type Number."),
196                            };
197                            Some((key, DiffPair::new(v as isize, 1)))
198                        })
199                        .count()
200                        .map(move |(key, diff_pair)| {
201                            (
202                                key,
203                                vec![Value::Rational32(Ratio::new(
204                                    diff_pair.element1 as i32,
205                                    diff_pair.element2 as i32,
206                                ))],
207                            )
208                        });
209                    collections.push(tuples);
210                }
211                AggregationFn::VARIANCE => {
212                    let tuples = tuples
213                        .map(prepare_unary)
214                        .explode(move |(key, val)| {
215                            let v = match val[0] {
216                                Value::Number(num) => num,
217                                _ => panic!("VARIANCE can only be applied on type Number."),
218                            };
219                            Some((
220                                key,
221                                DiffPair::new(
222                                    DiffPair::new(v as isize * v as isize, v as isize),
223                                    1,
224                                ),
225                            ))
226                        })
227                        .count()
228                        .map(move |(key, diff_pair)| {
229                            let sum_square = diff_pair.element1.element1 as i32;
230                            let sum = diff_pair.element1.element2 as i32;
231                            let c = diff_pair.element2 as i32;
232                            (
233                                key,
234                                vec![Value::Rational32(
235                                    Rational32::new(sum_square, c) - Rational32::new(sum, c).pow(2),
236                                )],
237                            )
238                        });
239                    collections.push(tuples);
240                }
241            };
242        }
243
244        if collections.len() == 1 {
245            let output_index = output_offsets[0];
246            let relation = CollectionRelation {
247                variables: self.variables.to_vec(),
248                tuples: collections[0].map(move |(key, val)| {
249                    let mut k = key.clone();
250                    let v = val[0].clone();
251                    k.insert(output_index, v);
252                    k
253                }),
254            };
255
256            (Implemented::Collection(relation), shutdown_handle)
257        } else {
258            // @TODO replace this with a join application
259            let left = collections.remove(0);
260            let tuples = collections.iter().fold(left, |coll, next| {
261                coll.join_map(&next, |key, v1, v2| {
262                    let mut val = v1.clone();
263                    val.append(&mut v2.clone());
264                    (key.clone(), val)
265                })
266            });
267
268            let relation = CollectionRelation {
269                variables: self.variables.to_vec(),
270                tuples: tuples.map(move |(key, vals)| {
271                    let mut v = key.clone();
272                    for (i, val) in vals.iter().enumerate() {
273                        v.insert(output_offsets[i], val.clone())
274                    }
275                    v
276                }),
277            };
278
279            (Implemented::Collection(relation), shutdown_handle)
280        }
281    }
282}