datafusion_functions_aggregate_common/aggregate/
groups_accumulator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Utilities for implementing [`GroupsAccumulator`], including an adapter
//! that makes a [`GroupsAccumulator`] out of any [`Accumulator`].
20
21pub mod accumulate;
22pub mod bool_op;
23pub mod nulls;
24pub mod prim_op;
25
26use std::mem::{size_of, size_of_val};
27
28use arrow::array::new_empty_array;
29use arrow::{
30    array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
31    compute,
32    compute::take_arrays,
33    datatypes::UInt32Type,
34};
35use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
36use datafusion_expr_common::accumulator::Accumulator;
37use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
38
39/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
40///
41/// While [`Accumulator`] are simpler to implement and can support
42/// more general calculations (like retractable window functions),
43/// they are not as fast as a specialized `GroupsAccumulator`. This
44/// interface bridges the gap so the group by operator only operates
45/// in terms of [`Accumulator`].
46///
47/// Internally, this adapter creates a new [`Accumulator`] for each group which
48/// stores the state for that group. This both requires an allocation for each
49/// Accumulator, internal indices, as well as whatever internal allocations the
50/// Accumulator itself requires.
51///
52/// For example, a `MinAccumulator` that computes the minimum string value with
53/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group
54/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`).
55///
56/// ```text
57///                       ┌─────────────────────────────────┐
58///                       │MinAccumulator {                 │
59///                ┌─────▶│ min: ScalarValue::Utf8("A")     │───────┐
60///                │      │}                                │       │
61///                │      └─────────────────────────────────┘       └───────▶   "A"
62///    ┌─────┐     │      ┌─────────────────────────────────┐
63///    │  0  │─────┘      │MinAccumulator {                 │
64///    ├─────┤     ┌─────▶│ min: ScalarValue::Utf8("Z")     │───────────────▶   "Z"
65///    │  1  │─────┘      │}                                │
66///    └─────┘            └─────────────────────────────────┘                   ...
67///      ...                 ...
68///    ┌─────┐            ┌────────────────────────────────┐
69///    │ N-2 │            │MinAccumulator {                │
70///    ├─────┤            │  min: ScalarValue::Utf8("A")   │────────────────▶   "A"
71///    │ N-1 │─────┐      │}                               │
72///    └─────┘     │      └────────────────────────────────┘
73///                │      ┌────────────────────────────────┐        ┌───────▶   "Q"
74///                │      │MinAccumulator {                │        │
75///                └─────▶│  min: ScalarValue::Utf8("Q")   │────────┘
76///                       │}                               │
77///                       └────────────────────────────────┘
78///
79///
80///  Logical group         Current Min/Max value for that group stored
81///     number             as a ScalarValue which points to an
82///                        individually allocated String
83///
84///```
85///
86/// # Optimizations
87///
88/// The adapter minimizes the number of calls to [`Accumulator::update_batch`]
89/// by first collecting the input rows for each group into a contiguous array
90/// using [`compute::take`]
91///
92pub struct GroupsAccumulatorAdapter {
93    factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
94
95    /// state for each group, stored in group_index order
96    states: Vec<AccumulatorState>,
97
98    /// Current memory usage, in bytes.
99    ///
100    /// Note this is incrementally updated with deltas to avoid the
101    /// call to size() being a bottleneck. We saw size() being a
102    /// bottleneck in earlier implementations when there were many
103    /// distinct groups.
104    allocation_bytes: usize,
105}
106
107struct AccumulatorState {
108    /// [`Accumulator`] that stores the per-group state
109    accumulator: Box<dyn Accumulator>,
110
111    /// scratch space: indexes in the input array that will be fed to
112    /// this accumulator. Stores indexes as `u32` to match the arrow
113    /// `take` kernel input.
114    indices: Vec<u32>,
115}
116
117impl AccumulatorState {
118    fn new(accumulator: Box<dyn Accumulator>) -> Self {
119        Self {
120            accumulator,
121            indices: vec![],
122        }
123    }
124
125    /// Returns the amount of memory taken by this structure and its accumulator
126    fn size(&self) -> usize {
127        self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
128    }
129}
130
131impl GroupsAccumulatorAdapter {
132    /// Create a new adapter that will create a new [`Accumulator`]
133    /// for each group, using the specified factory function
134    pub fn new<F>(factory: F) -> Self
135    where
136        F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
137    {
138        Self {
139            factory: Box::new(factory),
140            states: vec![],
141            allocation_bytes: 0,
142        }
143    }
144
145    /// Ensure that self.accumulators has total_num_groups
146    fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
147        // can't shrink
148        assert!(total_num_groups >= self.states.len());
149        let vec_size_pre = self.states.allocated_size();
150
151        // instantiate new accumulators
152        let new_accumulators = total_num_groups - self.states.len();
153        for _ in 0..new_accumulators {
154            let accumulator = (self.factory)()?;
155            let state = AccumulatorState::new(accumulator);
156            self.add_allocation(state.size());
157            self.states.push(state);
158        }
159
160        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
161        Ok(())
162    }
163
164    /// invokes f(accumulator, values) for each group that has values
165    /// in group_indices.
166    ///
167    /// This function first reorders the input and filter so that
168    /// values for each group_index are contiguous and then invokes f
169    /// on the contiguous ranges, to minimize per-row overhead
170    ///
171    /// ```text
172    /// ┌─────────┐   ┌─────────┐   ┌ ─ ─ ─ ─ ┐                       ┌─────────┐   ┌ ─ ─ ─ ─ ┐
173    /// │ ┌─────┐ │   │ ┌─────┐ │     ┌─────┐              ┏━━━━━┓    │ ┌─────┐ │     ┌─────┐
174    /// │ │  2  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  0  ┃    │ │ 200 │ │   │ │  t  │ │
175    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
176    /// │ │  2  │ │   │ │ 100 │ │   │ │  f  │ │            ┃  0  ┃    │ │ 300 │ │   │ │  t  │ │
177    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
178    /// │ │  0  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  1  ┃    │ │ 200 │ │   │ │NULL │ │
179    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤   ────────▶  ┣━━━━━┫    │ ├─────┤ │     ├─────┤
180    /// │ │  1  │ │   │ │ 200 │ │   │ │NULL │ │            ┃  2  ┃    │ │ 200 │ │   │ │  t  │ │
181    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
182    /// │ │  0  │ │   │ │ 300 │ │   │ │  t  │ │            ┃  2  ┃    │ │ 100 │ │   │ │  f  │ │
183    /// │ └─────┘ │   │ └─────┘ │     └─────┘              ┗━━━━━┛    │ └─────┘ │     └─────┘
184    /// └─────────┘   └─────────┘   └ ─ ─ ─ ─ ┘                       └─────────┘   └ ─ ─ ─ ─ ┘
185    ///
186    /// logical group   values      opt_filter           logical group  values       opt_filter
187    ///
188    /// ```
189    fn invoke_per_accumulator<F>(
190        &mut self,
191        values: &[ArrayRef],
192        group_indices: &[usize],
193        opt_filter: Option<&BooleanArray>,
194        total_num_groups: usize,
195        f: F,
196    ) -> Result<()>
197    where
198        F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
199    {
200        self.make_accumulators_if_needed(total_num_groups)?;
201
202        assert_eq!(values[0].len(), group_indices.len());
203
204        // figure out which input rows correspond to which groups.
205        // Note that self.state.indices starts empty for all groups
206        // (it is cleared out below)
207        for (idx, group_index) in group_indices.iter().enumerate() {
208            self.states[*group_index].indices.push(idx as u32);
209        }
210
211        // groups_with_rows holds a list of group indexes that have
212        // any rows that need to be accumulated, stored in order of
213        // group_index
214
215        let mut groups_with_rows = vec![];
216
217        // batch_indices holds indices into values, each group is contiguous
218        let mut batch_indices = vec![];
219
220        // offsets[i] is index into batch_indices where the rows for
221        // group_index i starts
222        let mut offsets = vec![0];
223
224        let mut offset_so_far = 0;
225        for (group_index, state) in self.states.iter_mut().enumerate() {
226            let indices = &state.indices;
227            if indices.is_empty() {
228                continue;
229            }
230
231            groups_with_rows.push(group_index);
232            batch_indices.extend_from_slice(indices);
233            offset_so_far += indices.len();
234            offsets.push(offset_so_far);
235        }
236        let batch_indices = batch_indices.into();
237
238        // reorder the values and opt_filter by batch_indices so that
239        // all values for each group are contiguous, then invoke the
240        // accumulator once per group with values
241        let values = take_arrays(values, &batch_indices, None)?;
242        let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
243
244        // invoke each accumulator with the appropriate rows, first
245        // pulling the input arguments for this group into their own
246        // RecordBatch(es)
247        let iter = groups_with_rows.iter().zip(offsets.windows(2));
248
249        let mut sizes_pre = 0;
250        let mut sizes_post = 0;
251        for (&group_idx, offsets) in iter {
252            let state = &mut self.states[group_idx];
253            sizes_pre += state.size();
254
255            let values_to_accumulate = slice_and_maybe_filter(
256                &values,
257                opt_filter.as_ref().map(|f| f.as_boolean()),
258                offsets,
259            )?;
260            f(state.accumulator.as_mut(), &values_to_accumulate)?;
261
262            // clear out the state so they are empty for next
263            // iteration
264            state.indices.clear();
265            sizes_post += state.size();
266        }
267
268        self.adjust_allocation(sizes_pre, sizes_post);
269        Ok(())
270    }
271
272    /// Increment the allocation by `n`
273    ///
274    /// See [`Self::allocation_bytes`] for rationale.
275    fn add_allocation(&mut self, size: usize) {
276        self.allocation_bytes += size;
277    }
278
279    /// Decrease the allocation by `n`
280    ///
281    /// See [`Self::allocation_bytes`] for rationale.
282    fn free_allocation(&mut self, size: usize) {
283        // use saturating sub to avoid errors if the accumulators
284        // report erroneous sizes
285        self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
286    }
287
288    /// Adjusts the allocation for something that started with
289    /// start_size and now has new_size avoiding overflow
290    ///
291    /// See [`Self::allocation_bytes`] for rationale.
292    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
293        if new_size > old_size {
294            self.add_allocation(new_size - old_size)
295        } else {
296            self.free_allocation(old_size - new_size)
297        }
298    }
299}
300
301impl GroupsAccumulator for GroupsAccumulatorAdapter {
302    fn update_batch(
303        &mut self,
304        values: &[ArrayRef],
305        group_indices: &[usize],
306        opt_filter: Option<&BooleanArray>,
307        total_num_groups: usize,
308    ) -> Result<()> {
309        self.invoke_per_accumulator(
310            values,
311            group_indices,
312            opt_filter,
313            total_num_groups,
314            |accumulator, values_to_accumulate| {
315                accumulator.update_batch(values_to_accumulate)
316            },
317        )?;
318        Ok(())
319    }
320
321    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
322        let vec_size_pre = self.states.allocated_size();
323
324        let states = emit_to.take_needed(&mut self.states);
325
326        let results: Vec<ScalarValue> = states
327            .into_iter()
328            .map(|mut state| {
329                self.free_allocation(state.size());
330                state.accumulator.evaluate()
331            })
332            .collect::<Result<_>>()?;
333
334        let result = ScalarValue::iter_to_array(results);
335
336        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
337
338        result
339    }
340
341    // filtered_null_mask(opt_filter, &values);
342    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
343        let vec_size_pre = self.states.allocated_size();
344        let states = emit_to.take_needed(&mut self.states);
345
346        // each accumulator produces a potential vector of values
347        // which we need to form into columns
348        let mut results: Vec<Vec<ScalarValue>> = vec![];
349
350        for mut state in states {
351            self.free_allocation(state.size());
352            let accumulator_state = state.accumulator.state()?;
353            results.resize_with(accumulator_state.len(), Vec::new);
354            for (idx, state_val) in accumulator_state.into_iter().enumerate() {
355                results[idx].push(state_val);
356            }
357        }
358
359        // create an array for each intermediate column
360        let arrays = results
361            .into_iter()
362            .map(ScalarValue::iter_to_array)
363            .collect::<Result<Vec<_>>>()?;
364
365        // double check each array has the same length (aka the
366        // accumulator was implemented correctly
367        if let Some(first_col) = arrays.first() {
368            for arr in &arrays {
369                assert_eq!(arr.len(), first_col.len())
370            }
371        }
372        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
373
374        Ok(arrays)
375    }
376
377    fn merge_batch(
378        &mut self,
379        values: &[ArrayRef],
380        group_indices: &[usize],
381        opt_filter: Option<&BooleanArray>,
382        total_num_groups: usize,
383    ) -> Result<()> {
384        self.invoke_per_accumulator(
385            values,
386            group_indices,
387            opt_filter,
388            total_num_groups,
389            |accumulator, values_to_accumulate| {
390                accumulator.merge_batch(values_to_accumulate)?;
391                Ok(())
392            },
393        )?;
394        Ok(())
395    }
396
397    fn size(&self) -> usize {
398        self.allocation_bytes
399    }
400
401    fn convert_to_state(
402        &self,
403        values: &[ArrayRef],
404        opt_filter: Option<&BooleanArray>,
405    ) -> Result<Vec<ArrayRef>> {
406        let num_rows = values[0].len();
407
408        // If there are no rows, return empty arrays
409        if num_rows == 0 {
410            // create empty accumulator to get the state types
411            let empty_state = (self.factory)()?.state()?;
412            let empty_arrays = empty_state
413                .into_iter()
414                .map(|state_val| new_empty_array(&state_val.data_type()))
415                .collect::<Vec<_>>();
416
417            return Ok(empty_arrays);
418        }
419
420        // Each row has its respective group
421        let mut results = vec![];
422        for row_idx in 0..num_rows {
423            // Create the empty accumulator for converting
424            let mut converted_accumulator = (self.factory)()?;
425
426            // Convert row to states
427            let values_to_accumulate =
428                slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
429            converted_accumulator.update_batch(&values_to_accumulate)?;
430            let states = converted_accumulator.state()?;
431
432            // Resize results to have enough columns according to the converted states
433            results.resize_with(states.len(), || Vec::with_capacity(num_rows));
434
435            // Add the states to results
436            for (idx, state_val) in states.into_iter().enumerate() {
437                results[idx].push(state_val);
438            }
439        }
440
441        let arrays = results
442            .into_iter()
443            .map(ScalarValue::iter_to_array)
444            .collect::<Result<Vec<_>>>()?;
445
446        Ok(arrays)
447    }
448
449    fn supports_convert_to_state(&self) -> bool {
450        true
451    }
452}
453
/// Extension trait reporting how much heap memory a [`Vec`] has reserved.
pub trait VecAllocExt {
    /// Item type.
    type T;
    /// Bytes reserved by this Vec's buffer, i.e. `capacity * size_of::<T>()`.
    ///
    /// The count is shallow: heap allocations owned by the elements
    /// themselves are not included, nor is the size of the `Vec` header
    /// (`self`).
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn allocated_size(&self) -> usize {
        let bytes_per_slot = size_of::<T>();
        let slots = self.capacity();
        bytes_per_slot * slots
    }
}
470
471fn get_filter_at_indices(
472    opt_filter: Option<&BooleanArray>,
473    indices: &PrimitiveArray<UInt32Type>,
474) -> Result<Option<ArrayRef>> {
475    opt_filter
476        .map(|filter| {
477            compute::take(
478                &filter, indices, None, // None: no index check
479            )
480        })
481        .transpose()
482        .map_err(|e| arrow_datafusion_err!(e))
483}
484
485// Copied from physical-plan
486pub(crate) fn slice_and_maybe_filter(
487    aggr_array: &[ArrayRef],
488    filter_opt: Option<&BooleanArray>,
489    offsets: &[usize],
490) -> Result<Vec<ArrayRef>> {
491    let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
492    let sliced_arrays: Vec<ArrayRef> = aggr_array
493        .iter()
494        .map(|array| array.slice(offset, length))
495        .collect();
496
497    if let Some(f) = filter_opt {
498        let filter = f.slice(offset, length);
499
500        sliced_arrays
501            .iter()
502            .map(|array| {
503                compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e))
504            })
505            .collect()
506    } else {
507        Ok(sliced_arrays)
508    }
509}