datafusion_functions_aggregate_common/aggregate/
groups_accumulator.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Utilities for implementing [`GroupsAccumulator`], including
//! [`GroupsAccumulatorAdapter`], which makes a [`GroupsAccumulator`] out of
//! any [`Accumulator`].
20
21pub mod accumulate;
22pub mod bool_op;
23pub mod nulls;
24pub mod prim_op;
25
26use std::mem::{size_of, size_of_val};
27
28use arrow::array::new_empty_array;
29use arrow::{
30    array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
31    compute,
32    compute::take_arrays,
33    datatypes::UInt32Type,
34};
35use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
36use datafusion_expr_common::accumulator::Accumulator;
37use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
38
39/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
40///
41/// While [`Accumulator`] are simpler to implement and can support
42/// more general calculations (like retractable window functions),
43/// they are not as fast as a specialized `GroupsAccumulator`. This
44/// interface bridges the gap so the group by operator only operates
45/// in terms of [`Accumulator`].
46///
47/// Internally, this adapter creates a new [`Accumulator`] for each group which
48/// stores the state for that group. This both requires an allocation for each
49/// Accumulator, internal indices, as well as whatever internal allocations the
50/// Accumulator itself requires.
51///
52/// For example, a `MinAccumulator` that computes the minimum string value with
53/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group
54/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`).
55///
56/// ```text
57///                       ┌─────────────────────────────────┐
58///                       │MinAccumulator {                 │
59///                ┌─────▶│ min: ScalarValue::Utf8("A")     │───────┐
60///                │      │}                                │       │
61///                │      └─────────────────────────────────┘       └───────▶   "A"
62///    ┌─────┐     │      ┌─────────────────────────────────┐
63///    │  0  │─────┘      │MinAccumulator {                 │
64///    ├─────┤     ┌─────▶│ min: ScalarValue::Utf8("Z")     │───────────────▶   "Z"
65///    │  1  │─────┘      │}                                │
66///    └─────┘            └─────────────────────────────────┘                   ...
67///      ...                 ...
68///    ┌─────┐            ┌────────────────────────────────┐
69///    │ N-2 │            │MinAccumulator {                │
70///    ├─────┤            │  min: ScalarValue::Utf8("A")   │────────────────▶   "A"
71///    │ N-1 │─────┐      │}                               │
72///    └─────┘     │      └────────────────────────────────┘
73///                │      ┌────────────────────────────────┐        ┌───────▶   "Q"
74///                │      │MinAccumulator {                │        │
75///                └─────▶│  min: ScalarValue::Utf8("Q")   │────────┘
76///                       │}                               │
77///                       └────────────────────────────────┘
78///
79///
80///  Logical group         Current Min/Max value for that group stored
81///     number             as a ScalarValue which points to an
82///                        individually allocated String
83/// ```
84///
85/// # Optimizations
86///
87/// The adapter minimizes the number of calls to [`Accumulator::update_batch`]
88/// by first collecting the input rows for each group into a contiguous array
89/// using [`compute::take`]
90pub struct GroupsAccumulatorAdapter {
91    factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
92
93    /// state for each group, stored in group_index order
94    states: Vec<AccumulatorState>,
95
96    /// Current memory usage, in bytes.
97    ///
98    /// Note this is incrementally updated with deltas to avoid the
99    /// call to size() being a bottleneck. We saw size() being a
100    /// bottleneck in earlier implementations when there were many
101    /// distinct groups.
102    allocation_bytes: usize,
103}
104
105struct AccumulatorState {
106    /// [`Accumulator`] that stores the per-group state
107    accumulator: Box<dyn Accumulator>,
108
109    /// scratch space: indexes in the input array that will be fed to
110    /// this accumulator. Stores indexes as `u32` to match the arrow
111    /// `take` kernel input.
112    indices: Vec<u32>,
113}
114
115impl AccumulatorState {
116    fn new(accumulator: Box<dyn Accumulator>) -> Self {
117        Self {
118            accumulator,
119            indices: vec![],
120        }
121    }
122
123    /// Returns the amount of memory taken by this structure and its accumulator
124    fn size(&self) -> usize {
125        self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
126    }
127}
128
129impl GroupsAccumulatorAdapter {
130    /// Create a new adapter that will create a new [`Accumulator`]
131    /// for each group, using the specified factory function
132    pub fn new<F>(factory: F) -> Self
133    where
134        F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
135    {
136        Self {
137            factory: Box::new(factory),
138            states: vec![],
139            allocation_bytes: 0,
140        }
141    }
142
143    /// Ensure that self.accumulators has total_num_groups
144    fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
145        // can't shrink
146        assert!(total_num_groups >= self.states.len());
147        let vec_size_pre = self.states.allocated_size();
148
149        // instantiate new accumulators
150        let new_accumulators = total_num_groups - self.states.len();
151        for _ in 0..new_accumulators {
152            let accumulator = (self.factory)()?;
153            let state = AccumulatorState::new(accumulator);
154            self.add_allocation(state.size());
155            self.states.push(state);
156        }
157
158        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
159        Ok(())
160    }
161
162    /// invokes f(accumulator, values) for each group that has values
163    /// in group_indices.
164    ///
165    /// This function first reorders the input and filter so that
166    /// values for each group_index are contiguous and then invokes f
167    /// on the contiguous ranges, to minimize per-row overhead
168    ///
169    /// ```text
170    /// ┌─────────┐   ┌─────────┐   ┌ ─ ─ ─ ─ ┐                       ┌─────────┐   ┌ ─ ─ ─ ─ ┐
171    /// │ ┌─────┐ │   │ ┌─────┐ │     ┌─────┐              ┏━━━━━┓    │ ┌─────┐ │     ┌─────┐
172    /// │ │  2  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  0  ┃    │ │ 200 │ │   │ │  t  │ │
173    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
174    /// │ │  2  │ │   │ │ 100 │ │   │ │  f  │ │            ┃  0  ┃    │ │ 300 │ │   │ │  t  │ │
175    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
176    /// │ │  0  │ │   │ │ 200 │ │   │ │  t  │ │            ┃  1  ┃    │ │ 200 │ │   │ │NULL │ │
177    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤   ────────▶  ┣━━━━━┫    │ ├─────┤ │     ├─────┤
178    /// │ │  1  │ │   │ │ 200 │ │   │ │NULL │ │            ┃  2  ┃    │ │ 200 │ │   │ │  t  │ │
179    /// │ ├─────┤ │   │ ├─────┤ │     ├─────┤              ┣━━━━━┫    │ ├─────┤ │     ├─────┤
180    /// │ │  0  │ │   │ │ 300 │ │   │ │  t  │ │            ┃  2  ┃    │ │ 100 │ │   │ │  f  │ │
181    /// │ └─────┘ │   │ └─────┘ │     └─────┘              ┗━━━━━┛    │ └─────┘ │     └─────┘
182    /// └─────────┘   └─────────┘   └ ─ ─ ─ ─ ┘                       └─────────┘   └ ─ ─ ─ ─ ┘
183    ///
184    /// logical group   values      opt_filter           logical group  values       opt_filter
185    /// ```
186    fn invoke_per_accumulator<F>(
187        &mut self,
188        values: &[ArrayRef],
189        group_indices: &[usize],
190        opt_filter: Option<&BooleanArray>,
191        total_num_groups: usize,
192        f: F,
193    ) -> Result<()>
194    where
195        F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
196    {
197        self.make_accumulators_if_needed(total_num_groups)?;
198
199        assert_eq!(values[0].len(), group_indices.len());
200
201        // figure out which input rows correspond to which groups.
202        // Note that self.state.indices starts empty for all groups
203        // (it is cleared out below)
204        for (idx, group_index) in group_indices.iter().enumerate() {
205            self.states[*group_index].indices.push(idx as u32);
206        }
207
208        // groups_with_rows holds a list of group indexes that have
209        // any rows that need to be accumulated, stored in order of
210        // group_index
211
212        let mut groups_with_rows = vec![];
213
214        // batch_indices holds indices into values, each group is contiguous
215        let mut batch_indices = vec![];
216
217        // offsets[i] is index into batch_indices where the rows for
218        // group_index i starts
219        let mut offsets = vec![0];
220
221        let mut offset_so_far = 0;
222        for (group_index, state) in self.states.iter_mut().enumerate() {
223            let indices = &state.indices;
224            if indices.is_empty() {
225                continue;
226            }
227
228            groups_with_rows.push(group_index);
229            batch_indices.extend_from_slice(indices);
230            offset_so_far += indices.len();
231            offsets.push(offset_so_far);
232        }
233        let batch_indices = batch_indices.into();
234
235        // reorder the values and opt_filter by batch_indices so that
236        // all values for each group are contiguous, then invoke the
237        // accumulator once per group with values
238        let values = take_arrays(values, &batch_indices, None)?;
239        let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
240
241        // invoke each accumulator with the appropriate rows, first
242        // pulling the input arguments for this group into their own
243        // RecordBatch(es)
244        let iter = groups_with_rows.iter().zip(offsets.windows(2));
245
246        let mut sizes_pre = 0;
247        let mut sizes_post = 0;
248        for (&group_idx, offsets) in iter {
249            let state = &mut self.states[group_idx];
250            sizes_pre += state.size();
251
252            let values_to_accumulate = slice_and_maybe_filter(
253                &values,
254                opt_filter.as_ref().map(|f| f.as_boolean()),
255                offsets,
256            )?;
257            f(state.accumulator.as_mut(), &values_to_accumulate)?;
258
259            // clear out the state so they are empty for next
260            // iteration
261            state.indices.clear();
262            sizes_post += state.size();
263        }
264
265        self.adjust_allocation(sizes_pre, sizes_post);
266        Ok(())
267    }
268
269    /// Increment the allocation by `n`
270    ///
271    /// See [`Self::allocation_bytes`] for rationale.
272    fn add_allocation(&mut self, size: usize) {
273        self.allocation_bytes += size;
274    }
275
276    /// Decrease the allocation by `n`
277    ///
278    /// See [`Self::allocation_bytes`] for rationale.
279    fn free_allocation(&mut self, size: usize) {
280        // use saturating sub to avoid errors if the accumulators
281        // report erroneous sizes
282        self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
283    }
284
285    /// Adjusts the allocation for something that started with
286    /// start_size and now has new_size avoiding overflow
287    ///
288    /// See [`Self::allocation_bytes`] for rationale.
289    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
290        if new_size > old_size {
291            self.add_allocation(new_size - old_size)
292        } else {
293            self.free_allocation(old_size - new_size)
294        }
295    }
296}
297
298impl GroupsAccumulator for GroupsAccumulatorAdapter {
299    fn update_batch(
300        &mut self,
301        values: &[ArrayRef],
302        group_indices: &[usize],
303        opt_filter: Option<&BooleanArray>,
304        total_num_groups: usize,
305    ) -> Result<()> {
306        self.invoke_per_accumulator(
307            values,
308            group_indices,
309            opt_filter,
310            total_num_groups,
311            |accumulator, values_to_accumulate| {
312                accumulator.update_batch(values_to_accumulate)
313            },
314        )?;
315        Ok(())
316    }
317
318    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
319        let vec_size_pre = self.states.allocated_size();
320
321        let states = emit_to.take_needed(&mut self.states);
322
323        let results: Vec<ScalarValue> = states
324            .into_iter()
325            .map(|mut state| {
326                self.free_allocation(state.size());
327                state.accumulator.evaluate()
328            })
329            .collect::<Result<_>>()?;
330
331        let result = ScalarValue::iter_to_array(results);
332
333        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
334
335        result
336    }
337
338    // filtered_null_mask(opt_filter, &values);
339    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
340        let vec_size_pre = self.states.allocated_size();
341        let states = emit_to.take_needed(&mut self.states);
342
343        // each accumulator produces a potential vector of values
344        // which we need to form into columns
345        let mut results: Vec<Vec<ScalarValue>> = vec![];
346
347        for mut state in states {
348            self.free_allocation(state.size());
349            let accumulator_state = state.accumulator.state()?;
350            results.resize_with(accumulator_state.len(), Vec::new);
351            for (idx, state_val) in accumulator_state.into_iter().enumerate() {
352                results[idx].push(state_val);
353            }
354        }
355
356        // create an array for each intermediate column
357        let arrays = results
358            .into_iter()
359            .map(ScalarValue::iter_to_array)
360            .collect::<Result<Vec<_>>>()?;
361
362        // double check each array has the same length (aka the
363        // accumulator was implemented correctly
364        if let Some(first_col) = arrays.first() {
365            for arr in &arrays {
366                assert_eq!(arr.len(), first_col.len())
367            }
368        }
369        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
370
371        Ok(arrays)
372    }
373
374    fn merge_batch(
375        &mut self,
376        values: &[ArrayRef],
377        group_indices: &[usize],
378        opt_filter: Option<&BooleanArray>,
379        total_num_groups: usize,
380    ) -> Result<()> {
381        self.invoke_per_accumulator(
382            values,
383            group_indices,
384            opt_filter,
385            total_num_groups,
386            |accumulator, values_to_accumulate| {
387                accumulator.merge_batch(values_to_accumulate)?;
388                Ok(())
389            },
390        )?;
391        Ok(())
392    }
393
394    fn size(&self) -> usize {
395        self.allocation_bytes
396    }
397
398    fn convert_to_state(
399        &self,
400        values: &[ArrayRef],
401        opt_filter: Option<&BooleanArray>,
402    ) -> Result<Vec<ArrayRef>> {
403        let num_rows = values[0].len();
404
405        // If there are no rows, return empty arrays
406        if num_rows == 0 {
407            // create empty accumulator to get the state types
408            let empty_state = (self.factory)()?.state()?;
409            let empty_arrays = empty_state
410                .into_iter()
411                .map(|state_val| new_empty_array(&state_val.data_type()))
412                .collect::<Vec<_>>();
413
414            return Ok(empty_arrays);
415        }
416
417        // Each row has its respective group
418        let mut results = vec![];
419        for row_idx in 0..num_rows {
420            // Create the empty accumulator for converting
421            let mut converted_accumulator = (self.factory)()?;
422
423            // Convert row to states
424            let values_to_accumulate =
425                slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
426            converted_accumulator.update_batch(&values_to_accumulate)?;
427            let states = converted_accumulator.state()?;
428
429            // Resize results to have enough columns according to the converted states
430            results.resize_with(states.len(), || Vec::with_capacity(num_rows));
431
432            // Add the states to results
433            for (idx, state_val) in states.into_iter().enumerate() {
434                results[idx].push(state_val);
435            }
436        }
437
438        let arrays = results
439            .into_iter()
440            .map(ScalarValue::iter_to_array)
441            .collect::<Result<Vec<_>>>()?;
442
443        Ok(arrays)
444    }
445
446    fn supports_convert_to_state(&self) -> bool {
447        true
448    }
449}
450
/// Extension trait for [`Vec`] that reports how much memory it has reserved.
pub trait VecAllocExt {
    /// Element type of the vector.
    type T;
    /// Return the amount of memory allocated by this Vec (not
    /// recursively counting any heap allocations contained within the
    /// structure). Does not include the size of `self`
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    /// Reserved capacity (not length) times the size of one element.
    fn allocated_size(&self) -> usize {
        let slot_bytes = size_of::<T>();
        self.capacity() * slot_bytes
    }
}
467
468fn get_filter_at_indices(
469    opt_filter: Option<&BooleanArray>,
470    indices: &PrimitiveArray<UInt32Type>,
471) -> Result<Option<ArrayRef>> {
472    opt_filter
473        .map(|filter| {
474            compute::take(
475                &filter, indices, None, // None: no index check
476            )
477        })
478        .transpose()
479        .map_err(|e| arrow_datafusion_err!(e))
480}
481
482// Copied from physical-plan
483pub(crate) fn slice_and_maybe_filter(
484    aggr_array: &[ArrayRef],
485    filter_opt: Option<&BooleanArray>,
486    offsets: &[usize],
487) -> Result<Vec<ArrayRef>> {
488    let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
489    let sliced_arrays: Vec<ArrayRef> = aggr_array
490        .iter()
491        .map(|array| array.slice(offset, length))
492        .collect();
493
494    if let Some(f) = filter_opt {
495        let filter = f.slice(offset, length);
496
497        sliced_arrays
498            .iter()
499            .map(|array| {
500                compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e))
501            })
502            .collect()
503    } else {
504        Ok(sliced_arrays)
505    }
506}