use super::{EmitTo, GroupsAccumulator};
use arrow::{
array::{AsArray, UInt32Builder},
compute,
datatypes::UInt32Type,
};
use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
use datafusion_common::{
utils::get_arrayref_at_indices, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::Accumulator;
/// Adapter that implements `GroupsAccumulator` by keeping one boxed
/// `Accumulator` per group and forwarding each group's rows to it.
pub struct GroupsAccumulatorAdapter {
/// Called once for every new group to create that group's accumulator.
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
/// Per-group state; the position in this vector is the group index.
states: Vec<AccumulatorState>,
/// Running estimate, in bytes, of the memory held by this adapter.
/// This is the value returned by `size()`.
allocation_bytes: usize,
}
/// State kept by the adapter for a single group.
struct AccumulatorState {
/// Accumulator that aggregates the values belonging to this group.
accumulator: Box<dyn Accumulator>,
/// Scratch list of row positions (within the batch currently being
/// processed) that belong to this group. It is filled and emptied again
/// inside `invoke_per_accumulator`.
indices: Vec<u32>,
}
impl AccumulatorState {
    /// Wraps `accumulator` in a new state with no pending row indices.
    fn new(accumulator: Box<dyn Accumulator>) -> Self {
        let indices = Vec::new();
        Self {
            accumulator,
            indices,
        }
    }

    /// Returns the size in bytes attributed to this state: the size reported
    /// by the accumulator, the size of this struct, and the capacity reserved
    /// by the `indices` buffer.
    fn size(&self) -> usize {
        let accumulator_bytes = self.accumulator.size();
        let struct_bytes = std::mem::size_of_val(self);
        let indices_bytes = self.indices.allocated_size();
        accumulator_bytes + struct_bytes + indices_bytes
    }
}
impl GroupsAccumulatorAdapter {
pub fn new<F>(factory: F) -> Self
where
F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
{
Self {
factory: Box::new(factory),
states: vec![],
allocation_bytes: 0,
}
}
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
assert!(total_num_groups >= self.states.len());
let vec_size_pre = self.states.allocated_size();
let new_accumulators = total_num_groups - self.states.len();
for _ in 0..new_accumulators {
let accumulator = (self.factory)()?;
let state = AccumulatorState::new(accumulator);
self.add_allocation(state.size());
self.states.push(state);
}
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
Ok(())
}
fn invoke_per_accumulator<F>(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
f: F,
) -> Result<()>
where
F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
{
self.make_accumulators_if_needed(total_num_groups)?;
assert_eq!(values[0].len(), group_indices.len());
for (idx, group_index) in group_indices.iter().enumerate() {
self.states[*group_index].indices.push(idx as u32);
}
let mut groups_with_rows = vec![];
let mut batch_indices = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for (group_index, state) in self.states.iter_mut().enumerate() {
let indices = &state.indices;
if indices.is_empty() {
continue;
}
groups_with_rows.push(group_index);
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
let values = get_arrayref_at_indices(values, &batch_indices)?;
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
let iter = groups_with_rows.iter().zip(offsets.windows(2));
let mut sizes_pre = 0;
let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
let state = &mut self.states[group_idx];
sizes_pre += state.size();
let values_to_accumulate =
slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;
state.indices.clear();
sizes_post += state.size();
}
self.adjust_allocation(sizes_pre, sizes_post);
Ok(())
}
fn add_allocation(&mut self, size: usize) {
self.allocation_bytes += size;
}
fn free_allocation(&mut self, size: usize) {
self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
}
fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
if new_size > old_size {
self.add_allocation(new_size - old_size)
} else {
self.free_allocation(old_size - new_size)
}
}
}
impl GroupsAccumulator for GroupsAccumulatorAdapter {
    /// Passes each group's rows of `values` to that group's accumulator
    /// through `Accumulator::update_batch`.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |group_accumulator, group_rows| group_accumulator.update_batch(group_rows),
        )
    }

    /// Takes the states selected by `emit_to`, evaluates each accumulator and
    /// assembles the resulting scalars into a single array.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        let states_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        let mut scalars: Vec<ScalarValue> = Vec::new();
        for group_state in emitted {
            // The state is being dropped, so release what was tracked for it.
            self.free_allocation(group_state.size());
            scalars.push(group_state.accumulator.evaluate()?);
        }

        let array = ScalarValue::iter_to_array(scalars);

        // Bookkeeping for the `states` vector is updated before the array
        // conversion result is handed back.
        self.adjust_allocation(states_bytes_before, self.states.allocated_size());
        array
    }

    /// Takes the states selected by `emit_to` and returns their intermediate
    /// state as one array per state field.
    ///
    /// # Panics
    /// Panics if the produced arrays do not all have the same length.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let states_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        // `columns[i]` collects the i-th state value of every emitted group.
        let mut columns: Vec<Vec<ScalarValue>> = Vec::new();
        for group_state in emitted {
            self.free_allocation(group_state.size());
            let fields = group_state.accumulator.state()?;
            columns.resize_with(fields.len(), Vec::new);
            for (column, field) in columns.iter_mut().zip(fields) {
                column.push(field);
            }
        }

        let mut arrays = Vec::new();
        for column in columns {
            arrays.push(ScalarValue::iter_to_array(column)?);
        }

        // Every column must contain one value per emitted group.
        if let Some((first_col, _)) = arrays.split_first() {
            for arr in &arrays {
                assert_eq!(arr.len(), first_col.len())
            }
        }

        self.adjust_allocation(states_bytes_before, self.states.allocated_size());
        Ok(arrays)
    }

    /// Passes each group's rows of `values` to that group's accumulator
    /// through `Accumulator::merge_batch`.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |group_accumulator, group_rows| group_accumulator.merge_batch(group_rows),
        )
    }

    /// Returns the number of bytes currently tracked for this adapter.
    fn size(&self) -> usize {
        self.allocation_bytes
    }
}
/// Extension trait that reports how many bytes a collection has reserved.
pub trait VecAllocExt {
    /// Element type stored in the collection.
    type T;

    /// Returns the number of bytes reserved for elements, computed from the
    /// capacity rather than the current length.
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn allocated_size(&self) -> usize {
        let bytes_per_element = std::mem::size_of::<T>();
        self.capacity() * bytes_per_element
    }
}
/// Reorders the optional filter with `compute::take` so that it lines up
/// with values gathered at the same `indices`.
///
/// Returns `Ok(None)` when no filter was supplied; an arrow error from
/// `take` is wrapped in `DataFusionError::ArrowError`.
fn get_filter_at_indices(
    opt_filter: Option<&BooleanArray>,
    indices: &PrimitiveArray<UInt32Type>,
) -> Result<Option<ArrayRef>> {
    match opt_filter {
        None => Ok(None),
        Some(filter) => compute::take(filter, indices, None)
            .map(Some)
            .map_err(DataFusionError::ArrowError),
    }
}
/// Slices every array in `aggr_array` to the range `offsets[0]..offsets[1]`
/// and, when a filter is supplied, keeps only the rows selected by the same
/// range of the filter.
///
/// # Errors
/// Returns `DataFusionError::ArrowError` if filtering fails.
///
/// # Panics
/// Panics if `offsets` has fewer than two elements.
pub(crate) fn slice_and_maybe_filter(
    aggr_array: &[ArrayRef],
    filter_opt: Option<&ArrayRef>,
    offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
    let start = offsets[0];
    let row_count = offsets[1] - start;

    // Slice all inputs first; filtering (if any) happens afterwards.
    let mut sliced = Vec::with_capacity(aggr_array.len());
    for array in aggr_array {
        sliced.push(array.slice(start, row_count));
    }

    match filter_opt {
        None => Ok(sliced),
        Some(filter) => {
            let filter_slice = filter.slice(start, row_count);
            let mask = filter_slice.as_boolean();

            let mut filtered = Vec::with_capacity(sliced.len());
            for array in &sliced {
                let kept =
                    compute::filter(array, mask).map_err(DataFusionError::ArrowError)?;
                filtered.push(kept);
            }
            Ok(filtered)
        }
    }
}