use std::mem::size_of;
use crate::aggregates::group_values::GroupValues;
use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
use datafusion_common::Result;
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
/// Group-by state for a single byte-like (string/binary) column whose offsets
/// are of type `O`.
///
/// Every distinct value is assigned a dense group index, handed out in order
/// of first appearance starting at 0.
pub struct GroupValuesBytes<O>
where
    O: OffsetSizeTrait,
{
    /// Distinct values interned so far, each mapped to its group index.
    map: ArrowBytesMap<O, usize>,
    /// Number of groups handed out so far; also the next index to assign.
    num_groups: usize,
}
impl<O> GroupValuesBytes<O>
where
    O: OffsetSizeTrait,
{
    /// Creates an empty instance that holds no groups.
    ///
    /// `output_type` is forwarded unchanged to the underlying `ArrowBytesMap`.
    pub fn new(output_type: OutputType) -> Self {
        let map = ArrowBytesMap::new(output_type);
        Self { map, num_groups: 0 }
    }
}
impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
    /// Maps every row of the single input column to a group index, creating a
    /// new group (with the next dense index) for each value not seen before.
    ///
    /// On return `groups` holds exactly one group index per row of `cols[0]`.
    ///
    /// # Panics
    ///
    /// Panics if `cols` does not contain exactly one column, or if the map did
    /// not report a group index for every input row.
    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
        assert_eq!(cols.len(), 1);
        let arr = &cols[0];

        groups.clear();
        self.map.insert_if_new(
            arr,
            // Invoked for a value not yet in the map: hand out the next dense
            // group index.
            |_value| {
                let group_idx = self.num_groups;
                self.num_groups += 1;
                group_idx
            },
            // Receives the group index for an input row (new or existing group).
            |group_idx| {
                groups.push(group_idx);
            },
        );

        // Every input row must have been assigned a group.
        assert_eq!(groups.len(), arr.len());
        Ok(())
    }

    /// Approximate memory footprint in bytes: the map's reported size plus the
    /// inline size of this struct.
    fn size(&self) -> usize {
        self.map.size() + size_of::<Self>()
    }

    /// Returns `true` when no groups are currently held.
    fn is_empty(&self) -> bool {
        self.num_groups == 0
    }

    /// Number of groups currently held.
    fn len(&self) -> usize {
        self.num_groups
    }

    /// Emits the distinct group values as a single array.
    ///
    /// - `EmitTo::All`, or `EmitTo::First(n)` with `n == self.len()`, drains
    ///   every group and leaves this instance empty.
    /// - `EmitTo::First(n)` with a smaller `n` emits groups `0..n`. The
    ///   remaining values are re-interned, so they are renumbered from 0.
    ///
    /// # Panics
    ///
    /// `EmitTo::First(n)` with `n > self.len()` is not supported. It is
    /// expected to panic inside the `slice` call. The re-intern step also
    /// asserts that the first remaining group gets index 0.
    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        // Move the map's contents out, leaving an empty map behind.
        // NOTE(review): `slice(0, n)` below assumes `into_state()` yields values
        // in group-index order, i.e. order of first insertion. Confirm against
        // the `ArrowBytesMap` docs.
        let map_contents = self.map.take().into_state();
        let group_values = match emit_to {
            EmitTo::All => {
                self.num_groups -= map_contents.len();
                map_contents
            }
            EmitTo::First(n) if n == self.len() => {
                self.num_groups -= map_contents.len();
                map_contents
            }
            EmitTo::First(n) => {
                let emit_group_values = map_contents.slice(0, n);
                let remaining_group_values =
                    map_contents.slice(n, map_contents.len() - n);

                // Rebuild the (now empty) map from the values that stay, so
                // they get fresh indexes starting at 0.
                self.num_groups = 0;
                let mut group_indexes = vec![];
                self.intern(&[remaining_group_values], &mut group_indexes)?;

                // The first retained value must become group 0 again.
                assert_eq!(0, group_indexes[0]);
                emit_group_values
            }
        };
        Ok(vec![group_values])
    }

    /// Discards all groups by replacing the map with a fresh one.
    ///
    /// The group counter is reset too, so `len()` / `is_empty()` agree with
    /// the now-empty map. The next `intern` call then hands out indexes from 0,
    /// matching the positions that `emit` slices out of the map state.
    fn clear_shrink(&mut self, _num_rows: usize) {
        // The row-count hint is not used; the old map is simply dropped.
        self.map.take();
        self.num_groups = 0;
    }
}