pub mod accumulate;
pub mod bool_op;
pub mod nulls;
pub mod prim_op;
use std::mem::{size_of, size_of_val};
use arrow::array::new_empty_array;
use arrow::{
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
compute::take_arrays,
datatypes::UInt32Type,
};
use datafusion_common::{Result, ScalarValue, arrow_datafusion_err};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
/// Implements [`GroupsAccumulator`] on top of row-oriented [`Accumulator`]s
/// by keeping one accumulator instance per group.
pub struct GroupsAccumulatorAdapter {
/// Builds a new accumulator on each call. Invoked once per newly seen group
/// and once per input row in `convert_to_state`.
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
/// Per-group state; the position in this vector is the group index.
states: Vec<AccumulatorState>,
/// Running byte estimate maintained by the `*_allocation` helpers and
/// returned from `size()`.
allocation_bytes: usize,
}
/// State tracked for a single group.
struct AccumulatorState {
/// The accumulator that aggregates this group's rows.
accumulator: Box<dyn Accumulator>,
/// Scratch list of row positions (within the batch currently being
/// processed) that belong to this group. It is filled and then cleared
/// again inside `invoke_per_accumulator`.
indices: Vec<u32>,
}
impl AccumulatorState {
    /// Wraps `accumulator` together with an empty row-index scratch buffer.
    fn new(accumulator: Box<dyn Accumulator>) -> Self {
        Self {
            accumulator,
            indices: Vec::new(),
        }
    }

    /// Byte estimate for this state: what the accumulator reports for itself,
    /// plus the inline size of this struct, plus the capacity reserved by the
    /// `indices` buffer.
    fn size(&self) -> usize {
        let accumulator_bytes = self.accumulator.size();
        let inline_bytes = size_of_val(self);
        let index_buffer_bytes = self.indices.allocated_size();
        accumulator_bytes + inline_bytes + index_buffer_bytes
    }
}
impl GroupsAccumulatorAdapter {
pub fn new<F>(factory: F) -> Self
where
F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
{
Self {
factory: Box::new(factory),
states: vec![],
allocation_bytes: 0,
}
}
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
assert!(total_num_groups >= self.states.len());
let vec_size_pre = self.states.allocated_size();
let new_accumulators = total_num_groups - self.states.len();
for _ in 0..new_accumulators {
let accumulator = (self.factory)()?;
let state = AccumulatorState::new(accumulator);
self.add_allocation(state.size());
self.states.push(state);
}
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
Ok(())
}
fn invoke_per_accumulator<F>(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
f: F,
) -> Result<()>
where
F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
{
self.make_accumulators_if_needed(total_num_groups)?;
assert_eq!(values[0].len(), group_indices.len());
for (idx, group_index) in group_indices.iter().enumerate() {
self.states[*group_index].indices.push(idx as u32);
}
let mut groups_with_rows = vec![];
let mut batch_indices = vec![];
let mut offsets = vec![0];
let mut offset_so_far = 0;
for (group_index, state) in self.states.iter_mut().enumerate() {
let indices = &state.indices;
if indices.is_empty() {
continue;
}
groups_with_rows.push(group_index);
batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.into();
let values = take_arrays(values, &batch_indices, None)?;
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
let iter = groups_with_rows.iter().zip(offsets.windows(2));
let mut sizes_pre = 0;
let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
let state = &mut self.states[group_idx];
sizes_pre += state.size();
let values_to_accumulate = slice_and_maybe_filter(
&values,
opt_filter.as_ref().map(|f| f.as_boolean()),
offsets,
)?;
f(state.accumulator.as_mut(), &values_to_accumulate)?;
state.indices.clear();
sizes_post += state.size();
}
self.adjust_allocation(sizes_pre, sizes_post);
Ok(())
}
fn add_allocation(&mut self, size: usize) {
self.allocation_bytes += size;
}
fn free_allocation(&mut self, size: usize) {
self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
}
fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
if new_size > old_size {
self.add_allocation(new_size - old_size)
} else {
self.free_allocation(old_size - new_size)
}
}
}
impl GroupsAccumulator for GroupsAccumulatorAdapter {
    /// Feeds each group's rows of `values` to that group's accumulator via
    /// `Accumulator::update_batch`.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |acc, group_rows| acc.update_batch(group_rows),
        )
    }

    /// Removes the groups selected by `emit_to` and returns one evaluated
    /// value per removed group as a single array.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        let capacity_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        let mut scalars: Vec<ScalarValue> = Vec::with_capacity(emitted.len());
        for mut state in emitted {
            // The state leaves the adapter, so stop tracking its bytes
            // before asking it for the final value.
            self.free_allocation(state.size());
            scalars.push(state.accumulator.evaluate()?);
        }

        let array = ScalarValue::iter_to_array(scalars);

        let capacity_bytes_after = self.states.allocated_size();
        self.adjust_allocation(capacity_bytes_before, capacity_bytes_after);
        array
    }

    /// Removes the groups selected by `emit_to` and returns their
    /// intermediate state, one array per state field.
    ///
    /// # Panics
    /// Panics if the produced arrays do not all have the same length.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let capacity_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        // Transpose: each accumulator yields one value per state field, and
        // the output needs one column per field.
        let mut columns: Vec<Vec<ScalarValue>> = Vec::new();
        for mut state in emitted {
            self.free_allocation(state.size());
            let fields = state.accumulator.state()?;
            columns.resize_with(fields.len(), Vec::new);
            for (column, value) in columns.iter_mut().zip(fields) {
                column.push(value);
            }
        }

        let arrays = columns
            .into_iter()
            .map(ScalarValue::iter_to_array)
            .collect::<Result<Vec<_>>>()?;

        // Sanity check: every state column must cover the same number of groups.
        if let Some(first_col) = arrays.first() {
            let expected_len = first_col.len();
            for arr in arrays.iter() {
                assert_eq!(arr.len(), expected_len)
            }
        }

        let capacity_bytes_after = self.states.allocated_size();
        self.adjust_allocation(capacity_bytes_before, capacity_bytes_after);
        Ok(arrays)
    }

    /// Feeds each group's rows of intermediate state in `values` to that
    /// group's accumulator via `Accumulator::merge_batch`.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |acc, group_states| acc.merge_batch(group_states),
        )
    }

    /// Returns the byte estimate tracked by this adapter.
    fn size(&self) -> usize {
        self.allocation_bytes
    }

    /// Converts input rows directly into intermediate state, treating every
    /// row as its own group: a fresh accumulator is created per row, updated
    /// with that single (optionally filtered) row, and its state collected.
    ///
    /// For an input with zero rows, returns empty arrays whose data types
    /// come from the state of a freshly created accumulator.
    ///
    /// # Panics
    /// Panics if `values` is empty.
    fn convert_to_state(
        &self,
        values: &[ArrayRef],
        opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<ArrayRef>> {
        let num_rows = values[0].len();

        if num_rows == 0 {
            // Ask an untouched accumulator for its state only to learn the
            // state column types.
            let mut probe = (self.factory)()?;
            let empty_arrays = probe
                .state()?
                .into_iter()
                .map(|state_val| new_empty_array(&state_val.data_type()))
                .collect::<Vec<_>>();
            return Ok(empty_arrays);
        }

        let mut columns: Vec<Vec<ScalarValue>> = Vec::new();
        for row in 0..num_rows {
            let mut row_accumulator = (self.factory)()?;

            let single_row = slice_and_maybe_filter(values, opt_filter, &[row, row + 1])?;
            row_accumulator.update_batch(&single_row)?;
            let fields = row_accumulator.state()?;

            // Make sure there is one output column per state field.
            columns.resize_with(fields.len(), || Vec::with_capacity(num_rows));
            for (column, value) in columns.iter_mut().zip(fields) {
                column.push(value);
            }
        }

        columns
            .into_iter()
            .map(ScalarValue::iter_to_array)
            .collect()
    }

    /// This adapter always supports `convert_to_state`.
    fn supports_convert_to_state(&self) -> bool {
        true
    }
}
/// Extension trait for reporting how many bytes a vector has reserved.
pub trait VecAllocExt {
    /// Element type of the vector.
    type T;

    /// Bytes reserved by the vector's buffer: element size times capacity.
    /// Heap data owned by the elements themselves is not counted, and
    /// neither is the size of the vector header.
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn allocated_size(&self) -> usize {
        let slots = self.capacity();
        let bytes_per_slot = size_of::<T>();
        bytes_per_slot * slots
    }
}
/// Reorders an optional boolean filter with the same `indices` used to
/// reorder the value arrays, so the filter stays aligned with them.
///
/// Returns `Ok(None)` when there is no filter. Arrow errors from the `take`
/// kernel are converted into DataFusion errors.
fn get_filter_at_indices(
    opt_filter: Option<&BooleanArray>,
    indices: &PrimitiveArray<UInt32Type>,
) -> Result<Option<ArrayRef>> {
    match opt_filter {
        None => Ok(None),
        // `None` for the options argument selects the kernel's default `TakeOptions`.
        Some(filter) => match compute::take(filter, indices, None) {
            Ok(reordered) => Ok(Some(reordered)),
            Err(e) => Err(arrow_datafusion_err!(e)),
        },
    }
}
/// Slices every array in `aggr_array` to the row range
/// `offsets[0]..offsets[1]` and, when a filter is supplied, applies the
/// same slice of the filter to each sliced array.
///
/// # Panics
/// Panics if `offsets` has fewer than two elements or if
/// `offsets[1] < offsets[0]`.
///
/// # Errors
/// Arrow errors from the `filter` kernel are converted into DataFusion errors.
pub(crate) fn slice_and_maybe_filter(
    aggr_array: &[ArrayRef],
    filter_opt: Option<&BooleanArray>,
    offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
    let start = offsets[0];
    let row_count = offsets[1] - start;

    let mut windows: Vec<ArrayRef> = Vec::with_capacity(aggr_array.len());
    for column in aggr_array {
        windows.push(column.slice(start, row_count));
    }

    match filter_opt {
        None => Ok(windows),
        Some(full_filter) => {
            // Narrow the filter to the same row range as the values.
            let window_filter = full_filter.slice(start, row_count);
            windows
                .iter()
                .map(|window| {
                    compute::filter(window, &window_filter)
                        .map_err(|e| arrow_datafusion_err!(e))
                })
                .collect()
        }
    }
}