datafusion_functions_aggregate_common/aggregate/groups_accumulator.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Utilities for implementing [`GroupsAccumulator`], including
//! [`GroupsAccumulatorAdapter`], which builds a [`GroupsAccumulator`] out of
//! any [`Accumulator`].
20
21pub mod accumulate;
22pub mod bool_op;
23pub mod nulls;
24pub mod prim_op;
25
26use std::mem::{size_of, size_of_val};
27
28use arrow::array::new_empty_array;
29use arrow::{
30 array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
31 compute,
32 compute::take_arrays,
33 datatypes::UInt32Type,
34};
35use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
36use datafusion_expr_common::accumulator::Accumulator;
37use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
38
39/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
40///
41/// While [`Accumulator`] are simpler to implement and can support
42/// more general calculations (like retractable window functions),
43/// they are not as fast as a specialized `GroupsAccumulator`. This
44/// interface bridges the gap so the group by operator only operates
45/// in terms of [`Accumulator`].
46///
47/// Internally, this adapter creates a new [`Accumulator`] for each group which
48/// stores the state for that group. This both requires an allocation for each
49/// Accumulator, internal indices, as well as whatever internal allocations the
50/// Accumulator itself requires.
51///
52/// For example, a `MinAccumulator` that computes the minimum string value with
53/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group
54/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`).
55///
56/// ```text
57/// ┌─────────────────────────────────┐
58/// │MinAccumulator { │
59/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐
60/// │ │} │ │
61/// │ └─────────────────────────────────┘ └───────▶ "A"
62/// ┌─────┐ │ ┌─────────────────────────────────┐
63/// │ 0 │─────┘ │MinAccumulator { │
64/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z"
65/// │ 1 │─────┘ │} │
66/// └─────┘ └─────────────────────────────────┘ ...
67/// ... ...
68/// ┌─────┐ ┌────────────────────────────────┐
69/// │ N-2 │ │MinAccumulator { │
70/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A"
71/// │ N-1 │─────┐ │} │
72/// └─────┘ │ └────────────────────────────────┘
73/// │ ┌────────────────────────────────┐ ┌───────▶ "Q"
74/// │ │MinAccumulator { │ │
75/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘
76/// │} │
77/// └────────────────────────────────┘
78///
79///
80/// Logical group Current Min/Max value for that group stored
81/// number as a ScalarValue which points to an
82/// individually allocated String
83/// ```
84///
85/// # Optimizations
86///
87/// The adapter minimizes the number of calls to [`Accumulator::update_batch`]
88/// by first collecting the input rows for each group into a contiguous array
89/// using [`compute::take`]
90pub struct GroupsAccumulatorAdapter {
91 factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
92
93 /// state for each group, stored in group_index order
94 states: Vec<AccumulatorState>,
95
96 /// Current memory usage, in bytes.
97 ///
98 /// Note this is incrementally updated with deltas to avoid the
99 /// call to size() being a bottleneck. We saw size() being a
100 /// bottleneck in earlier implementations when there were many
101 /// distinct groups.
102 allocation_bytes: usize,
103}
104
105struct AccumulatorState {
106 /// [`Accumulator`] that stores the per-group state
107 accumulator: Box<dyn Accumulator>,
108
109 /// scratch space: indexes in the input array that will be fed to
110 /// this accumulator. Stores indexes as `u32` to match the arrow
111 /// `take` kernel input.
112 indices: Vec<u32>,
113}
114
115impl AccumulatorState {
116 fn new(accumulator: Box<dyn Accumulator>) -> Self {
117 Self {
118 accumulator,
119 indices: vec![],
120 }
121 }
122
123 /// Returns the amount of memory taken by this structure and its accumulator
124 fn size(&self) -> usize {
125 self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
126 }
127}
128
129impl GroupsAccumulatorAdapter {
130 /// Create a new adapter that will create a new [`Accumulator`]
131 /// for each group, using the specified factory function
132 pub fn new<F>(factory: F) -> Self
133 where
134 F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
135 {
136 Self {
137 factory: Box::new(factory),
138 states: vec![],
139 allocation_bytes: 0,
140 }
141 }
142
143 /// Ensure that self.accumulators has total_num_groups
144 fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
145 // can't shrink
146 assert!(total_num_groups >= self.states.len());
147 let vec_size_pre = self.states.allocated_size();
148
149 // instantiate new accumulators
150 let new_accumulators = total_num_groups - self.states.len();
151 for _ in 0..new_accumulators {
152 let accumulator = (self.factory)()?;
153 let state = AccumulatorState::new(accumulator);
154 self.add_allocation(state.size());
155 self.states.push(state);
156 }
157
158 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
159 Ok(())
160 }
161
162 /// invokes f(accumulator, values) for each group that has values
163 /// in group_indices.
164 ///
165 /// This function first reorders the input and filter so that
166 /// values for each group_index are contiguous and then invokes f
167 /// on the contiguous ranges, to minimize per-row overhead
168 ///
169 /// ```text
170 /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐
171 /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐
172 /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │
173 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
174 /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │
175 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
176 /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │
177 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
178 /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │
179 /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
180 /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │
181 /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘
182 /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘
183 ///
184 /// logical group values opt_filter logical group values opt_filter
185 /// ```
186 fn invoke_per_accumulator<F>(
187 &mut self,
188 values: &[ArrayRef],
189 group_indices: &[usize],
190 opt_filter: Option<&BooleanArray>,
191 total_num_groups: usize,
192 f: F,
193 ) -> Result<()>
194 where
195 F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
196 {
197 self.make_accumulators_if_needed(total_num_groups)?;
198
199 assert_eq!(values[0].len(), group_indices.len());
200
201 // figure out which input rows correspond to which groups.
202 // Note that self.state.indices starts empty for all groups
203 // (it is cleared out below)
204 for (idx, group_index) in group_indices.iter().enumerate() {
205 self.states[*group_index].indices.push(idx as u32);
206 }
207
208 // groups_with_rows holds a list of group indexes that have
209 // any rows that need to be accumulated, stored in order of
210 // group_index
211
212 let mut groups_with_rows = vec![];
213
214 // batch_indices holds indices into values, each group is contiguous
215 let mut batch_indices = vec![];
216
217 // offsets[i] is index into batch_indices where the rows for
218 // group_index i starts
219 let mut offsets = vec![0];
220
221 let mut offset_so_far = 0;
222 for (group_index, state) in self.states.iter_mut().enumerate() {
223 let indices = &state.indices;
224 if indices.is_empty() {
225 continue;
226 }
227
228 groups_with_rows.push(group_index);
229 batch_indices.extend_from_slice(indices);
230 offset_so_far += indices.len();
231 offsets.push(offset_so_far);
232 }
233 let batch_indices = batch_indices.into();
234
235 // reorder the values and opt_filter by batch_indices so that
236 // all values for each group are contiguous, then invoke the
237 // accumulator once per group with values
238 let values = take_arrays(values, &batch_indices, None)?;
239 let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
240
241 // invoke each accumulator with the appropriate rows, first
242 // pulling the input arguments for this group into their own
243 // RecordBatch(es)
244 let iter = groups_with_rows.iter().zip(offsets.windows(2));
245
246 let mut sizes_pre = 0;
247 let mut sizes_post = 0;
248 for (&group_idx, offsets) in iter {
249 let state = &mut self.states[group_idx];
250 sizes_pre += state.size();
251
252 let values_to_accumulate = slice_and_maybe_filter(
253 &values,
254 opt_filter.as_ref().map(|f| f.as_boolean()),
255 offsets,
256 )?;
257 f(state.accumulator.as_mut(), &values_to_accumulate)?;
258
259 // clear out the state so they are empty for next
260 // iteration
261 state.indices.clear();
262 sizes_post += state.size();
263 }
264
265 self.adjust_allocation(sizes_pre, sizes_post);
266 Ok(())
267 }
268
269 /// Increment the allocation by `n`
270 ///
271 /// See [`Self::allocation_bytes`] for rationale.
272 fn add_allocation(&mut self, size: usize) {
273 self.allocation_bytes += size;
274 }
275
276 /// Decrease the allocation by `n`
277 ///
278 /// See [`Self::allocation_bytes`] for rationale.
279 fn free_allocation(&mut self, size: usize) {
280 // use saturating sub to avoid errors if the accumulators
281 // report erroneous sizes
282 self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
283 }
284
285 /// Adjusts the allocation for something that started with
286 /// start_size and now has new_size avoiding overflow
287 ///
288 /// See [`Self::allocation_bytes`] for rationale.
289 fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
290 if new_size > old_size {
291 self.add_allocation(new_size - old_size)
292 } else {
293 self.free_allocation(old_size - new_size)
294 }
295 }
296}
297
298impl GroupsAccumulator for GroupsAccumulatorAdapter {
299 fn update_batch(
300 &mut self,
301 values: &[ArrayRef],
302 group_indices: &[usize],
303 opt_filter: Option<&BooleanArray>,
304 total_num_groups: usize,
305 ) -> Result<()> {
306 self.invoke_per_accumulator(
307 values,
308 group_indices,
309 opt_filter,
310 total_num_groups,
311 |accumulator, values_to_accumulate| {
312 accumulator.update_batch(values_to_accumulate)
313 },
314 )?;
315 Ok(())
316 }
317
318 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
319 let vec_size_pre = self.states.allocated_size();
320
321 let states = emit_to.take_needed(&mut self.states);
322
323 let results: Vec<ScalarValue> = states
324 .into_iter()
325 .map(|mut state| {
326 self.free_allocation(state.size());
327 state.accumulator.evaluate()
328 })
329 .collect::<Result<_>>()?;
330
331 let result = ScalarValue::iter_to_array(results);
332
333 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
334
335 result
336 }
337
338 // filtered_null_mask(opt_filter, &values);
339 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
340 let vec_size_pre = self.states.allocated_size();
341 let states = emit_to.take_needed(&mut self.states);
342
343 // each accumulator produces a potential vector of values
344 // which we need to form into columns
345 let mut results: Vec<Vec<ScalarValue>> = vec![];
346
347 for mut state in states {
348 self.free_allocation(state.size());
349 let accumulator_state = state.accumulator.state()?;
350 results.resize_with(accumulator_state.len(), Vec::new);
351 for (idx, state_val) in accumulator_state.into_iter().enumerate() {
352 results[idx].push(state_val);
353 }
354 }
355
356 // create an array for each intermediate column
357 let arrays = results
358 .into_iter()
359 .map(ScalarValue::iter_to_array)
360 .collect::<Result<Vec<_>>>()?;
361
362 // double check each array has the same length (aka the
363 // accumulator was implemented correctly
364 if let Some(first_col) = arrays.first() {
365 for arr in &arrays {
366 assert_eq!(arr.len(), first_col.len())
367 }
368 }
369 self.adjust_allocation(vec_size_pre, self.states.allocated_size());
370
371 Ok(arrays)
372 }
373
374 fn merge_batch(
375 &mut self,
376 values: &[ArrayRef],
377 group_indices: &[usize],
378 opt_filter: Option<&BooleanArray>,
379 total_num_groups: usize,
380 ) -> Result<()> {
381 self.invoke_per_accumulator(
382 values,
383 group_indices,
384 opt_filter,
385 total_num_groups,
386 |accumulator, values_to_accumulate| {
387 accumulator.merge_batch(values_to_accumulate)?;
388 Ok(())
389 },
390 )?;
391 Ok(())
392 }
393
394 fn size(&self) -> usize {
395 self.allocation_bytes
396 }
397
398 fn convert_to_state(
399 &self,
400 values: &[ArrayRef],
401 opt_filter: Option<&BooleanArray>,
402 ) -> Result<Vec<ArrayRef>> {
403 let num_rows = values[0].len();
404
405 // If there are no rows, return empty arrays
406 if num_rows == 0 {
407 // create empty accumulator to get the state types
408 let empty_state = (self.factory)()?.state()?;
409 let empty_arrays = empty_state
410 .into_iter()
411 .map(|state_val| new_empty_array(&state_val.data_type()))
412 .collect::<Vec<_>>();
413
414 return Ok(empty_arrays);
415 }
416
417 // Each row has its respective group
418 let mut results = vec![];
419 for row_idx in 0..num_rows {
420 // Create the empty accumulator for converting
421 let mut converted_accumulator = (self.factory)()?;
422
423 // Convert row to states
424 let values_to_accumulate =
425 slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
426 converted_accumulator.update_batch(&values_to_accumulate)?;
427 let states = converted_accumulator.state()?;
428
429 // Resize results to have enough columns according to the converted states
430 results.resize_with(states.len(), || Vec::with_capacity(num_rows));
431
432 // Add the states to results
433 for (idx, state_val) in states.into_iter().enumerate() {
434 results[idx].push(state_val);
435 }
436 }
437
438 let arrays = results
439 .into_iter()
440 .map(ScalarValue::iter_to_array)
441 .collect::<Result<Vec<_>>>()?;
442
443 Ok(arrays)
444 }
445
446 fn supports_convert_to_state(&self) -> bool {
447 true
448 }
449}
450
/// Extension trait for [`Vec`] that reports the size of its heap buffer.
pub trait VecAllocExt {
    /// Element type of the collection.
    type T;

    /// Number of bytes reserved by this Vec's buffer, i.e. capacity times
    /// element size. Heap allocations owned by the elements are not counted
    /// (no recursion), and neither is the size of `self`.
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn allocated_size(&self) -> usize {
        let element_bytes = size_of::<T>();
        self.capacity() * element_bytes
    }
}
467
468fn get_filter_at_indices(
469 opt_filter: Option<&BooleanArray>,
470 indices: &PrimitiveArray<UInt32Type>,
471) -> Result<Option<ArrayRef>> {
472 opt_filter
473 .map(|filter| {
474 compute::take(
475 &filter, indices, None, // None: no index check
476 )
477 })
478 .transpose()
479 .map_err(|e| arrow_datafusion_err!(e))
480}
481
482// Copied from physical-plan
483pub(crate) fn slice_and_maybe_filter(
484 aggr_array: &[ArrayRef],
485 filter_opt: Option<&BooleanArray>,
486 offsets: &[usize],
487) -> Result<Vec<ArrayRef>> {
488 let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
489 let sliced_arrays: Vec<ArrayRef> = aggr_array
490 .iter()
491 .map(|array| array.slice(offset, length))
492 .collect();
493
494 if let Some(f) = filter_opt {
495 let filter = f.slice(offset, length);
496
497 sliced_arrays
498 .iter()
499 .map(|array| {
500 compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e))
501 })
502 .collect()
503 } else {
504 Ok(sliced_arrays)
505 }
506}