datafusion_expr_common/
groups_accumulator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Vectorized [`GroupsAccumulator`]
19
20use arrow::array::{ArrayRef, BooleanArray};
21use datafusion_common::{not_impl_err, Result};
22
/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
    /// Emit all groups
    All,
    /// Emit only the first `n` groups and shift all existing group
    /// indexes down by `n`.
    ///
    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
    /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
    First(usize),
}

impl EmitTo {
    /// Removes from `v` the elements that must be emitted and returns them
    /// as a new `Vec`; the elements that are not emitted yet stay in `v`.
    ///
    /// * [`EmitTo::All`]: the whole vector is moved out without copying and
    ///   `v` is left empty.
    /// * [`EmitTo::First`]: the first `n` elements are returned and the
    ///   remaining elements are kept in `v`, now starting at index `0`.
    ///   If `n` is larger than `v.len()`, every element is returned and `v`
    ///   is left empty (this never panics).
    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
        match *self {
            // Take the entire vector, leaving a new (empty) vector behind
            Self::All => std::mem::take(v),
            Self::First(n) => {
                // `Vec::split_off` panics if the index is past the end, so
                // clamp `n` to the number of elements actually present.
                let split_at = n.min(v.len());
                // `split_off` keeps `[0, split_at)` in `v` and returns the tail
                let mut emitted = v.split_off(split_at);
                // Swap so that `v` holds the tail and `emitted` the head
                std::mem::swap(v, &mut emitted);
                emitted
            }
        }
    }
}
58
59/// `GroupsAccumulator` implements a single aggregate (e.g. AVG) and
60/// stores the state for *all* groups internally.
61///
62/// Logically, a [`GroupsAccumulator`] stores a mapping from each group index to
63/// the state of the aggregate for that group. For example an implementation for
64/// `min` might look like
65///
66/// ```text
67///    ┌─────┐
68///    │  0  │───────────▶   100
69///    ├─────┤
70///    │  1  │───────────▶   200
71///    └─────┘
72///      ...                 ...
73///    ┌─────┐
74///    │ N-2 │───────────▶    50
75///    ├─────┤
76///    │ N-1 │───────────▶   200
77///    └─────┘
78///
79///
80///  Logical group      Current Min
81///     number          value for that
82///                     group
83/// ```
84///
85/// # Notes on Implementing `GroupsAccumulator`
86///
87/// All aggregates must first implement the simpler [`Accumulator`] trait, which
88/// handles state for a single group. Implementing `GroupsAccumulator` is
89/// optional and is harder to implement than `Accumulator`, but can be much
90/// faster for queries with many group values.  See the [Aggregating Millions of
91/// Groups Fast blog] for more background.
92///
93/// [`NullState`] can help keep the state for groups that have not seen any
94/// values and produce the correct output for those groups.
95///
96/// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html
97///
98/// # Details
99/// Each group is assigned a `group_index` by the hash table and each
100/// accumulator manages the specific state, one per `group_index`.
101///
102/// `group_index`es are contiguous (there aren't gaps), and thus it is
103/// expected that each `GroupsAccumulator` will use something like `Vec<..>`
104/// to store the group states.
105///
106/// [`Accumulator`]: crate::accumulator::Accumulator
107/// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
108pub trait GroupsAccumulator: Send {
109    /// Updates the accumulator's state from its arguments, encoded as
110    /// a vector of [`ArrayRef`]s.
111    ///
112    /// * `values`: the input arguments to the accumulator
113    ///
114    /// * `group_indices`: The group indices to which each row in `values` belongs.
115    ///
116    /// * `opt_filter`: if present, only update aggregate state using
117    ///   `values[i]` if `opt_filter[i]` is true
118    ///
119    /// * `total_num_groups`: the number of groups (the largest
120    ///   group_index is thus `total_num_groups - 1`).
121    ///
122    /// Note that subsequent calls to update_batch may have larger
123    /// total_num_groups as new groups are seen.
124    ///
125    /// See [`NullState`] to help keep the state for groups that have not seen any
126    /// values and produce the correct output for those groups.
127    ///
128    /// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html
129    fn update_batch(
130        &mut self,
131        values: &[ArrayRef],
132        group_indices: &[usize],
133        opt_filter: Option<&BooleanArray>,
134        total_num_groups: usize,
135    ) -> Result<()>;
136
137    /// Returns the final aggregate value for each group as a single
138    /// `RecordBatch`, resetting the internal state.
139    ///
140    /// The rows returned *must* be in group_index order: The value
141    /// for group_index 0, followed by 1, etc.  Any group_index that
142    /// did not have values, should be null.
143    ///
144    /// For example, a `SUM` accumulator maintains a running sum for
145    /// each group, and `evaluate` will produce that running sum as
146    /// its output for all groups, in group_index order
147    ///
148    /// If `emit_to` is [`EmitTo::All`], the accumulator should
149    /// return all groups and release / reset its internal state
150    /// equivalent to when it was first created.
151    ///
152    /// If `emit_to` is [`EmitTo::First`], only the first `n` groups
153    /// should be emitted and the state for those first groups
154    /// removed. State for the remaining groups must be retained for
155    /// future use. The group_indices on subsequent calls to
156    /// `update_batch` or `merge_batch` will be shifted down by
157    /// `n`. See [`EmitTo::First`] for more details.
158    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
159
160    /// Returns the intermediate aggregate state for this accumulator,
161    /// used for multi-phase grouping, resetting its internal state.
162    ///
163    /// See [`Accumulator::state`] for more information on multi-phase
164    /// aggregation.
165    ///
166    /// For example, `AVG` might return two arrays: `SUM` and `COUNT`
167    /// but the `MIN` aggregate would just return a single array.
168    ///
169    /// Note more sophisticated internal state can be passed as
170    /// single `StructArray` rather than multiple arrays.
171    ///
172    /// See [`Self::evaluate`] for details on the required output
173    /// order and `emit_to`.
174    ///
175    /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
176    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
177
178    /// Merges intermediate state (the output from [`Self::state`])
179    /// into this accumulator's current state.
180    ///
181    /// For some aggregates (such as `SUM`), `merge_batch` is the same
182    /// as `update_batch`, but for some aggregates (such as `COUNT`,
183    /// where the partial counts must be summed) the operations
184    /// differ. See [`Self::state`] for more details on how state is
185    /// used and merged.
186    ///
187    /// * `values`: arrays produced from previously calling `state` on other accumulators.
188    ///
189    /// Other arguments are the same as for [`Self::update_batch`].
190    fn merge_batch(
191        &mut self,
192        values: &[ArrayRef],
193        group_indices: &[usize],
194        opt_filter: Option<&BooleanArray>,
195        total_num_groups: usize,
196    ) -> Result<()>;
197
198    /// Converts an input batch directly to the intermediate aggregate state.
199    ///
200    /// This is the equivalent of treating each input row as its own group. It
201    /// is invoked when the Partial phase of a multi-phase aggregation is not
202    /// reducing the cardinality enough to warrant spending more effort on
203    /// pre-aggregation (see `Background` section below), and switches to
204    /// passing intermediate state directly on to the next aggregation phase.
205    ///
206    /// Examples:
207    /// * `COUNT`: an array of 1s for each row in the input batch.
208    /// * `SUM/MIN/MAX`: the input values themselves.
209    ///
210    /// # Arguments
211    /// * `values`: the input arguments to the accumulator
212    /// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
213    ///
214    /// # Background
215    ///
216    /// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
217    /// Partial phase reduces the cardinality of the input data as soon as
218    /// possible in the plan.
219    ///
220    /// This strategy is very effective for queries with a small number of
221    /// groups, as most of the data is aggregated immediately and only a small
222    /// amount of data must be repartitioned (see [`Accumulator::state`] for
223    /// background)
224    ///
225    /// However, for queries with a large number of groups, the Partial phase
226    /// often does not reduce the cardinality enough to warrant the memory and
227    /// CPU cost of actually performing the aggregation. For such cases, the
228    /// HashAggregate operator will dynamically switch to passing intermediate
229    /// state directly to the next aggregation phase with minimal processing
230    /// using this method.
231    ///
232    /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
233    fn convert_to_state(
234        &self,
235        _values: &[ArrayRef],
236        _opt_filter: Option<&BooleanArray>,
237    ) -> Result<Vec<ArrayRef>> {
238        not_impl_err!("Input batch conversion to state not implemented")
239    }
240
241    /// Returns `true` if [`Self::convert_to_state`] is implemented to support
242    /// intermediate aggregate state conversion.
243    fn supports_convert_to_state(&self) -> bool {
244        false
245    }
246
247    /// Amount of memory used to store the state of this accumulator,
248    /// in bytes.
249    ///
250    /// This function is called once per batch, so it should be `O(n)` to
251    /// compute, not `O(num_groups)`
252    fn size(&self) -> usize;
253}