datafusion_functions_aggregate_common/aggregate/groups_accumulator.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for implementing GroupsAccumulator
19//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
20
21pub mod accumulate;
22pub mod bool_op;
23pub mod nulls;
24pub mod prim_op;
25
26use std::mem::{size_of, size_of_val};
27
28use arrow::array::new_empty_array;
29use arrow::{
30 array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
31 compute,
32 compute::take_arrays,
33 datatypes::UInt32Type,
34};
35use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
36use datafusion_expr_common::accumulator::Accumulator;
37use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
38
39/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
40///
41/// While [`Accumulator`] are simpler to implement and can support
42/// more general calculations (like retractable window functions),
43/// they are not as fast as a specialized `GroupsAccumulator`. This
44/// interface bridges the gap so the group by operator only operates
45/// in terms of [`Accumulator`].
46///
47/// Internally, this adapter creates a new [`Accumulator`] for each group which
48/// stores the state for that group. This both requires an allocation for each
49/// Accumulator, internal indices, as well as whatever internal allocations the
50/// Accumulator itself requires.
51///
52/// For example, a `MinAccumulator` that computes the minimum string value with
53/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group
54/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`).
55///
56/// ```text
57/// ┌─────────────────────────────────┐
58/// │MinAccumulator { │
59/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐
60/// │ │} │ │
61/// │ └─────────────────────────────────┘ └───────▶ "A"
62/// ┌─────┐ │ ┌─────────────────────────────────┐
63/// │ 0 │─────┘ │MinAccumulator { │
64/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z"
65/// │ 1 │─────┘ │} │
66/// └─────┘ └─────────────────────────────────┘ ...
67/// ... ...
68/// ┌─────┐ ┌────────────────────────────────┐
69/// │ N-2 │ │MinAccumulator { │
70/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A"
71/// │ N-1 │─────┐ │} │
72/// └─────┘ │ └────────────────────────────────┘
73/// │ ┌────────────────────────────────┐ ┌───────▶ "Q"
74/// │ │MinAccumulator { │ │
75/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘
76/// │} │
77/// └────────────────────────────────┘
78///
79///
80/// Logical group Current Min/Max value for that group stored
81/// number as a ScalarValue which points to an
82/// individually allocated String
83///
84///```
85///
86/// # Optimizations
87///
88/// The adapter minimizes the number of calls to [`Accumulator::update_batch`]
89/// by first collecting the input rows for each group into a contiguous array
90/// using [`compute::take`]
91///
92pub struct GroupsAccumulatorAdapter {
93 factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
94
95 /// state for each group, stored in group_index order
96 states: Vec<AccumulatorState>,
97
98 /// Current memory usage, in bytes.
99 ///
100 /// Note this is incrementally updated with deltas to avoid the
101 /// call to size() being a bottleneck. We saw size() being a
102 /// bottleneck in earlier implementations when there were many
103 /// distinct groups.
104 allocation_bytes: usize,
105}
106
107struct AccumulatorState {
108 /// [`Accumulator`] that stores the per-group state
109 accumulator: Box<dyn Accumulator>,
110
111 /// scratch space: indexes in the input array that will be fed to
112 /// this accumulator. Stores indexes as `u32` to match the arrow
113 /// `take` kernel input.
114 indices: Vec<u32>,
115}
116
117impl AccumulatorState {
118 fn new(accumulator: Box<dyn Accumulator>) -> Self {
119 Self {
120 accumulator,
121 indices: vec![],
122 }
123 }
124
125 /// Returns the amount of memory taken by this structure and its accumulator
126 fn size(&self) -> usize {
127 self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
128 }
129}
130
131impl GroupsAccumulatorAdapter {
132 /// Create a new adapter that will create a new [`Accumulator`]
133 /// for each group, using the specified factory function
134 pub fn new<F>(factory: F) -> Self
135 where
136 F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
137 {
138 Self {
139 factory: Box::new(factory),
140 states: vec![],
141 allocation_bytes: 0,
142 }
143 }
144
145 /// Ensure that self.accumulators has total_num_groups
146 fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
147 // can't shrink
148 assert!(total_num_groups >= self.states.len());
149 let vec_size_pre = self.states.allocated_size();
150
151 // instantiate new accumulators
152 let new_accumulators = total_num_groups - self.states.len();
153 for _ in 0..new_accumulators {
154 let accumulator = (self.factory)()?;
155 let state = AccumulatorState::new(accumulator);
156 self.add_allocation(state.size());
157 self.states.push(state);
158 }
159
160 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
161 Ok(())
162 }
163
164 /// invokes f(accumulator, values) for each group that has values
165 /// in group_indices.
166 ///
167 /// This function first reorders the input and filter so that
168 /// values for each group_index are contiguous and then invokes f
169 /// on the contiguous ranges, to minimize per-row overhead
170 ///
171 /// ```text
172 /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐
173 /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐
174 /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │
175 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
176 /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │
177 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
178 /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │
179 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
180 /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │
181 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
182 /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │
183 /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘
184 /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘
185 ///
186 /// logical group values opt_filter logical group values opt_filter
187 ///
188 /// ```
189 fn invoke_per_accumulator<F>(
190 &mut self,
191 values: &[ArrayRef],
192 group_indices: &[usize],
193 opt_filter: Option<&BooleanArray>,
194 total_num_groups: usize,
195 f: F,
196 ) -> Result<()>
197 where
198 F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
199 {
200 self.make_accumulators_if_needed(total_num_groups)?;
201
202 assert_eq!(values[0].len(), group_indices.len());
203
204 // figure out which input rows correspond to which groups.
205 // Note that self.state.indices starts empty for all groups
206 // (it is cleared out below)
207 for (idx, group_index) in group_indices.iter().enumerate() {
208 self.states[*group_index].indices.push(idx as u32);
209 }
210
211 // groups_with_rows holds a list of group indexes that have
212 // any rows that need to be accumulated, stored in order of
213 // group_index
214
215 let mut groups_with_rows = vec![];
216
217 // batch_indices holds indices into values, each group is contiguous
218 let mut batch_indices = vec![];
219
220 // offsets[i] is index into batch_indices where the rows for
221 // group_index i starts
222 let mut offsets = vec![0];
223
224 let mut offset_so_far = 0;
225 for (group_index, state) in self.states.iter_mut().enumerate() {
226 let indices = &state.indices;
227 if indices.is_empty() {
228 continue;
229 }
230
231 groups_with_rows.push(group_index);
232 batch_indices.extend_from_slice(indices);
233 offset_so_far += indices.len();
234 offsets.push(offset_so_far);
235 }
236 let batch_indices = batch_indices.into();
237
238 // reorder the values and opt_filter by batch_indices so that
239 // all values for each group are contiguous, then invoke the
240 // accumulator once per group with values
241 let values = take_arrays(values, &batch_indices, None)?;
242 let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
243
244 // invoke each accumulator with the appropriate rows, first
245 // pulling the input arguments for this group into their own
246 // RecordBatch(es)
247 let iter = groups_with_rows.iter().zip(offsets.windows(2));
248
249 let mut sizes_pre = 0;
250 let mut sizes_post = 0;
251 for (&group_idx, offsets) in iter {
252 let state = &mut self.states[group_idx];
253 sizes_pre += state.size();
254
255 let values_to_accumulate = slice_and_maybe_filter(
256 &values,
257 opt_filter.as_ref().map(|f| f.as_boolean()),
258 offsets,
259 )?;
260 f(state.accumulator.as_mut(), &values_to_accumulate)?;
261
262 // clear out the state so they are empty for next
263 // iteration
264 state.indices.clear();
265 sizes_post += state.size();
266 }
267
268 self.adjust_allocation(sizes_pre, sizes_post);
269 Ok(())
270 }
271
272 /// Increment the allocation by `n`
273 ///
274 /// See [`Self::allocation_bytes`] for rationale.
275 fn add_allocation(&mut self, size: usize) {
276 self.allocation_bytes += size;
277 }
278
279 /// Decrease the allocation by `n`
280 ///
281 /// See [`Self::allocation_bytes`] for rationale.
282 fn free_allocation(&mut self, size: usize) {
283 // use saturating sub to avoid errors if the accumulators
284 // report erroneous sizes
285 self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
286 }
287
288 /// Adjusts the allocation for something that started with
289 /// start_size and now has new_size avoiding overflow
290 ///
291 /// See [`Self::allocation_bytes`] for rationale.
292 fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
293 if new_size > old_size {
294 self.add_allocation(new_size - old_size)
295 } else {
296 self.free_allocation(old_size - new_size)
297 }
298 }
299}
300
301impl GroupsAccumulator for GroupsAccumulatorAdapter {
302 fn update_batch(
303 &mut self,
304 values: &[ArrayRef],
305 group_indices: &[usize],
306 opt_filter: Option<&BooleanArray>,
307 total_num_groups: usize,
308 ) -> Result<()> {
309 self.invoke_per_accumulator(
310 values,
311 group_indices,
312 opt_filter,
313 total_num_groups,
314 |accumulator, values_to_accumulate| {
315 accumulator.update_batch(values_to_accumulate)
316 },
317 )?;
318 Ok(())
319 }
320
321 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
322 let vec_size_pre = self.states.allocated_size();
323
324 let states = emit_to.take_needed(&mut self.states);
325
326 let results: Vec<ScalarValue> = states
327 .into_iter()
328 .map(|mut state| {
329 self.free_allocation(state.size());
330 state.accumulator.evaluate()
331 })
332 .collect::<Result<_>>()?;
333
334 let result = ScalarValue::iter_to_array(results);
335
336 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
337
338 result
339 }
340
341 // filtered_null_mask(opt_filter, &values);
342 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
343 let vec_size_pre = self.states.allocated_size();
344 let states = emit_to.take_needed(&mut self.states);
345
346 // each accumulator produces a potential vector of values
347 // which we need to form into columns
348 let mut results: Vec<Vec<ScalarValue>> = vec![];
349
350 for mut state in states {
351 self.free_allocation(state.size());
352 let accumulator_state = state.accumulator.state()?;
353 results.resize_with(accumulator_state.len(), Vec::new);
354 for (idx, state_val) in accumulator_state.into_iter().enumerate() {
355 results[idx].push(state_val);
356 }
357 }
358
359 // create an array for each intermediate column
360 let arrays = results
361 .into_iter()
362 .map(ScalarValue::iter_to_array)
363 .collect::<Result<Vec<_>>>()?;
364
365 // double check each array has the same length (aka the
366 // accumulator was implemented correctly
367 if let Some(first_col) = arrays.first() {
368 for arr in &arrays {
369 assert_eq!(arr.len(), first_col.len())
370 }
371 }
372 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
373
374 Ok(arrays)
375 }
376
377 fn merge_batch(
378 &mut self,
379 values: &[ArrayRef],
380 group_indices: &[usize],
381 opt_filter: Option<&BooleanArray>,
382 total_num_groups: usize,
383 ) -> Result<()> {
384 self.invoke_per_accumulator(
385 values,
386 group_indices,
387 opt_filter,
388 total_num_groups,
389 |accumulator, values_to_accumulate| {
390 accumulator.merge_batch(values_to_accumulate)?;
391 Ok(())
392 },
393 )?;
394 Ok(())
395 }
396
397 fn size(&self) -> usize {
398 self.allocation_bytes
399 }
400
401 fn convert_to_state(
402 &self,
403 values: &[ArrayRef],
404 opt_filter: Option<&BooleanArray>,
405 ) -> Result<Vec<ArrayRef>> {
406 let num_rows = values[0].len();
407
408 // If there are no rows, return empty arrays
409 if num_rows == 0 {
410 // create empty accumulator to get the state types
411 let empty_state = (self.factory)()?.state()?;
412 let empty_arrays = empty_state
413 .into_iter()
414 .map(|state_val| new_empty_array(&state_val.data_type()))
415 .collect::<Vec<_>>();
416
417 return Ok(empty_arrays);
418 }
419
420 // Each row has its respective group
421 let mut results = vec![];
422 for row_idx in 0..num_rows {
423 // Create the empty accumulator for converting
424 let mut converted_accumulator = (self.factory)()?;
425
426 // Convert row to states
427 let values_to_accumulate =
428 slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
429 converted_accumulator.update_batch(&values_to_accumulate)?;
430 let states = converted_accumulator.state()?;
431
432 // Resize results to have enough columns according to the converted states
433 results.resize_with(states.len(), || Vec::with_capacity(num_rows));
434
435 // Add the states to results
436 for (idx, state_val) in states.into_iter().enumerate() {
437 results[idx].push(state_val);
438 }
439 }
440
441 let arrays = results
442 .into_iter()
443 .map(ScalarValue::iter_to_array)
444 .collect::<Result<Vec<_>>>()?;
445
446 Ok(arrays)
447 }
448
449 fn supports_convert_to_state(&self) -> bool {
450 true
451 }
452}
453
454/// Extension trait for [`Vec`] to account for allocations.
455pub trait VecAllocExt {
456 /// Item type.
457 type T;
458 /// Return the amount of memory allocated by this Vec (not
459 /// recursively counting any heap allocations contained within the
460 /// structure). Does not include the size of `self`
461 fn allocated_size(&self) -> usize;
462}
463
464impl<T> VecAllocExt for Vec<T> {
465 type T = T;
466 fn allocated_size(&self) -> usize {
467 size_of::<T>() * self.capacity()
468 }
469}
470
471fn get_filter_at_indices(
472 opt_filter: Option<&BooleanArray>,
473 indices: &PrimitiveArray<UInt32Type>,
474) -> Result<Option<ArrayRef>> {
475 opt_filter
476 .map(|filter| {
477 compute::take(
478 &filter, indices, None, // None: no index check
479 )
480 })
481 .transpose()
482 .map_err(|e| arrow_datafusion_err!(e))
483}
484
485// Copied from physical-plan
486pub(crate) fn slice_and_maybe_filter(
487 aggr_array: &[ArrayRef],
488 filter_opt: Option<&BooleanArray>,
489 offsets: &[usize],
490) -> Result<Vec<ArrayRef>> {
491 let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
492 let sliced_arrays: Vec<ArrayRef> = aggr_array
493 .iter()
494 .map(|array| array.slice(offset, length))
495 .collect();
496
497 if let Some(f) = filter_opt {
498 let filter = f.slice(offset, length);
499
500 sliced_arrays
501 .iter()
502 .map(|array| {
503 compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e))
504 })
505 .collect()
506 } else {
507 Ok(sliced_arrays)
508 }
509}