use arrow::array::types::{
Date32Type, Date64Type, Decimal128Type, Time32MillisecondType, Time32SecondType,
Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{ArrayRef, downcast_primitive};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datafusion_common::Result;
use datafusion_expr::EmitTo;
pub mod multi_group_by;
mod row;
mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
use row::GroupValuesRows;
pub(crate) use single_group_by::primitive::HashValue;
use crate::aggregates::{
group_values::single_group_by::{
boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
},
order::GroupOrdering,
};
mod metrics;
mod null_builder;
pub(crate) use metrics::GroupByMetrics;
/// Common interface of the group-key stores built by `new_group_values`.
///
/// The `Send` supertrait means every implementor can be moved to another
/// thread.
///
/// NOTE(review): the method contracts below are inferred from names and
/// signatures only; confirm them against the concrete implementations.
pub trait GroupValues: Send {
/// Processes the input arrays `cols` and reports its result through `groups`.
///
/// NOTE(review): presumably this assigns a group index to every input row,
/// reusing the index of an already-seen key and allocating a new one
/// otherwise, leaving one entry per row in `groups` — TODO confirm.
///
/// Returns an error if the implementation fails.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
/// Returns a size for this store.
///
/// NOTE(review): presumably the memory footprint in bytes — confirm.
fn size(&self) -> usize;
/// Returns `true` when the store is empty.
///
/// NOTE(review): presumably equivalent to `len() == 0` — confirm.
fn is_empty(&self) -> bool;
/// Returns the length of the store.
///
/// NOTE(review): presumably the number of distinct groups held — confirm.
fn len(&self) -> usize;
/// Emits stored values as a list of arrays, as directed by `emit_to`.
///
/// Takes `&mut self`, so implementations may modify the store while
/// emitting. Returns an error if the implementation fails.
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
/// Clears the store and shrinks it, using `num_rows` as a sizing hint.
///
/// NOTE(review): presumably retained capacity is reduced to about
/// `num_rows` entries to release memory — confirm.
fn clear_shrink(&mut self, num_rows: usize);
}
pub fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();
macro_rules! downcast_helper {
($t:ty, $d:ident) => {
return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
};
}
downcast_primitive! {
d => (downcast_helper, d),
_ => {}
}
match d {
DataType::Date32 => {
downcast_helper!(Date32Type, d);
}
DataType::Date64 => {
downcast_helper!(Date64Type, d);
}
DataType::Time32(t) => match t {
TimeUnit::Second => downcast_helper!(Time32SecondType, d),
TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d),
_ => {}
},
DataType::Time64(t) => match t {
TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d),
TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d),
_ => {}
},
DataType::Timestamp(t, _tz) => match t {
TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d),
TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d),
},
DataType::Decimal128(_, _) => {
downcast_helper!(Decimal128Type, d);
}
DataType::Utf8 => {
return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
}
DataType::LargeUtf8 => {
return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
}
DataType::Utf8View => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
}
DataType::Binary => {
return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
}
DataType::LargeBinary => {
return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
}
DataType::BinaryView => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
}
DataType::Boolean => {
return Ok(Box::new(GroupValuesBoolean::new()));
}
_ => {}
}
}
if multi_group_by::supported_schema(schema.as_ref()) {
if matches!(group_ordering, GroupOrdering::None) {
Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
} else {
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
}
} else {
Ok(Box::new(GroupValuesRows::try_new(schema)?))
}
}