use arrow::array::{
Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray,
LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
};
use arrow::datatypes::DataType;
use datafusion_common::hash_map::Entry;
use datafusion_common::{HashMap, Result, internal_err};
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
use std::mem::size_of;
use std::sync::Arc;
/// Groups accumulator that tracks, for every group, the minimum or the
/// maximum value of a string or binary column.
///
/// The supported input types are the ones dispatched on in `update_batch`:
/// `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`, `LargeBinary` and `BinaryView`.
#[derive(Debug)]
pub(crate) struct MinMaxBytesAccumulator {
/// Per-group current min/max values, stored as raw bytes.
inner: MinMaxBytesState,
/// `true` when computing the minimum, `false` when computing the maximum.
is_min: bool,
}
impl MinMaxBytesAccumulator {
    /// Creates an accumulator that keeps the smallest value seen in each group
    /// for inputs of type `data_type`.
    pub fn new_min(data_type: DataType) -> Self {
        Self::with_direction(data_type, true)
    }

    /// Creates an accumulator that keeps the largest value seen in each group
    /// for inputs of type `data_type`.
    pub fn new_max(data_type: DataType) -> Self {
        Self::with_direction(data_type, false)
    }

    /// Shared constructor: starts from an empty state and records whether the
    /// accumulator computes a minimum (`is_min == true`) or a maximum.
    fn with_direction(data_type: DataType, is_min: bool) -> Self {
        let inner = MinMaxBytesState::new(data_type);
        Self { inner, is_min }
    }
}
impl GroupsAccumulator for MinMaxBytesAccumulator {
    /// Folds one batch of input values into the per-group minimums/maximums.
    ///
    /// `values[0]` is the input column; `group_indices[i]` is the group that row
    /// `i` belongs to. `total_num_groups` is forwarded to the state, which
    /// resizes its per-group storage to that length.
    ///
    /// # Panics
    ///
    /// Panics if `values` is empty, if the input length differs from
    /// `group_indices.len()`, or if the input's data type is not the type this
    /// accumulator was created with.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the accumulator's data type is not one of
    /// the supported string/binary types, and propagates any error from
    /// applying the filter.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        let array = &values[0];
        assert_eq!(array.len(), group_indices.len());
        assert_eq!(array.data_type(), &self.inner.data_type);

        // Fold the optional filter into the array itself (filtered-out rows are
        // presumably turned into nulls, per the helper's name); the state
        // skips null rows.
        let array = apply_filter_as_nulls(array, opt_filter)?;

        // `str` ordering is defined as the lexicographic ordering of the
        // underlying bytes, so comparing the raw byte slices gives exactly the
        // same answer as comparing the strings. One safe pair of comparators
        // therefore serves both the string and the binary types.
        fn bytes_min(a: &[u8], b: &[u8]) -> bool {
            a < b
        }
        fn bytes_max(a: &[u8], b: &[u8]) -> bool {
            a > b
        }

        // Adapts a string iterator to the byte-slice iterator the state expects.
        fn str_to_bytes<'a>(
            it: impl Iterator<Item = Option<&'a str>>,
        ) -> impl Iterator<Item = Option<&'a [u8]>> {
            it.map(|s| s.map(|s| s.as_bytes()))
        }

        // Each arm passes a concrete iterator type and a concrete comparator so
        // the generic `MinMaxBytesState::update_batch` is instantiated per case.
        match (self.is_min, &self.inner.data_type) {
            // Minimum of string types.
            (true, &DataType::Utf8) => self.inner.update_batch(
                str_to_bytes(array.as_string::<i32>().iter()),
                group_indices,
                total_num_groups,
                bytes_min,
            ),
            (true, &DataType::LargeUtf8) => self.inner.update_batch(
                str_to_bytes(array.as_string::<i64>().iter()),
                group_indices,
                total_num_groups,
                bytes_min,
            ),
            (true, &DataType::Utf8View) => self.inner.update_batch(
                str_to_bytes(array.as_string_view().iter()),
                group_indices,
                total_num_groups,
                bytes_min,
            ),

            // Maximum of string types.
            (false, &DataType::Utf8) => self.inner.update_batch(
                str_to_bytes(array.as_string::<i32>().iter()),
                group_indices,
                total_num_groups,
                bytes_max,
            ),
            (false, &DataType::LargeUtf8) => self.inner.update_batch(
                str_to_bytes(array.as_string::<i64>().iter()),
                group_indices,
                total_num_groups,
                bytes_max,
            ),
            (false, &DataType::Utf8View) => self.inner.update_batch(
                str_to_bytes(array.as_string_view().iter()),
                group_indices,
                total_num_groups,
                bytes_max,
            ),

            // Minimum of binary types.
            (true, &DataType::Binary) => self.inner.update_batch(
                array.as_binary::<i32>().iter(),
                group_indices,
                total_num_groups,
                bytes_min,
            ),
            (true, &DataType::LargeBinary) => self.inner.update_batch(
                array.as_binary::<i64>().iter(),
                group_indices,
                total_num_groups,
                bytes_min,
            ),
            (true, &DataType::BinaryView) => self.inner.update_batch(
                array.as_binary_view().iter(),
                group_indices,
                total_num_groups,
                bytes_min,
            ),

            // Maximum of binary types.
            (false, &DataType::Binary) => self.inner.update_batch(
                array.as_binary::<i32>().iter(),
                group_indices,
                total_num_groups,
                bytes_max,
            ),
            (false, &DataType::LargeBinary) => self.inner.update_batch(
                array.as_binary::<i64>().iter(),
                group_indices,
                total_num_groups,
                bytes_max,
            ),
            (false, &DataType::BinaryView) => self.inner.update_batch(
                array.as_binary_view().iter(),
                group_indices,
                total_num_groups,
                bytes_max,
            ),

            _ => internal_err!(
                "Unexpected combination for MinMaxBytesAccumulator: ({:?}, {:?})",
                self.is_min,
                self.inner.data_type
            ),
        }
    }

    /// Emits the current min/max of the groups selected by `emit_to` as an
    /// array of the accumulator's data type, removing those groups from the
    /// state. Groups that never saw a non-null value produce a null.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the accumulator's data type is not one of
    /// the supported string/binary types.
    ///
    /// # Panics
    ///
    /// Panics if the built array's data type differs from the accumulator's.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        // `data_capacity` is the total number of value bytes being emitted; it
        // is used to size the output builders up front.
        let (data_capacity, min_maxes) = self.inner.emit_to(emit_to);

        // Reinterprets the stored byte buffers as owned strings without
        // re-validating or copying them.
        fn bytes_to_str(
            min_maxes: Vec<Option<Vec<u8>>>,
        ) -> impl Iterator<Item = Option<String>> {
            min_maxes.into_iter().map(|opt| {
                opt.map(|bytes| {
                    // SAFETY: this helper is only used for the string data
                    // types. For those, values enter the state exclusively via
                    // `update_batch`, which asserts the input has the same
                    // string type and copies each value whole from a `&str`
                    // item of the array, so `bytes` is valid UTF-8.
                    unsafe { String::from_utf8_unchecked(bytes) }
                })
            })
        }

        let result: ArrayRef = match self.inner.data_type {
            DataType::Utf8 => {
                let mut builder =
                    StringBuilder::with_capacity(min_maxes.len(), data_capacity);
                for opt in bytes_to_str(min_maxes) {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_str()),
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::LargeUtf8 => {
                let mut builder =
                    LargeStringBuilder::with_capacity(min_maxes.len(), data_capacity);
                for opt in bytes_to_str(min_maxes) {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_str()),
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::Utf8View => {
                // View builders are sized by block rather than by total bytes.
                let block_size = capacity_to_view_block_size(data_capacity);
                let mut builder = StringViewBuilder::with_capacity(min_maxes.len())
                    .with_fixed_block_size(block_size);
                for opt in bytes_to_str(min_maxes) {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_str()),
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::Binary => {
                let mut builder =
                    BinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
                for opt in min_maxes {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_slice()),
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::LargeBinary => {
                let mut builder =
                    LargeBinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
                for opt in min_maxes {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_slice()),
                    }
                }
                Arc::new(builder.finish())
            }
            DataType::BinaryView => {
                // View builders are sized by block rather than by total bytes.
                let block_size = capacity_to_view_block_size(data_capacity);
                let mut builder = BinaryViewBuilder::with_capacity(min_maxes.len())
                    .with_fixed_block_size(block_size);
                for opt in min_maxes {
                    match opt {
                        None => builder.append_null(),
                        Some(s) => builder.append_value(s.as_slice()),
                    }
                }
                Arc::new(builder.finish())
            }
            _ => {
                return internal_err!(
                    "Unexpected data type for MinMaxBytesAccumulator: {:?}",
                    self.inner.data_type
                );
            }
        };

        assert_eq!(&self.inner.data_type, result.data_type());
        Ok(result)
    }

    /// Returns the intermediate state for the groups selected by `emit_to`.
    ///
    /// The intermediate state of a min/max is the min/max itself, so this is
    /// the output of `evaluate` wrapped in a single-element vector.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        self.evaluate(emit_to).map(|arr| vec![arr])
    }

    /// Merges intermediate states produced by `state`/`convert_to_state`.
    ///
    /// Because the state has the same shape as the input values, merging is
    /// the same operation as `update_batch`.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.update_batch(values, group_indices, opt_filter, total_num_groups)
    }

    /// Converts raw input values directly into intermediate state: the first
    /// input array with the optional filter applied to it.
    fn convert_to_state(
        &self,
        values: &[ArrayRef],
        opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<ArrayRef>> {
        let output = apply_filter_as_nulls(&values[0], opt_filter)?;
        Ok(vec![output])
    }

    /// Always `true`: `convert_to_state` is implemented for this accumulator.
    fn supports_convert_to_state(&self) -> bool {
        true
    }

    /// Size of the accumulator as reported by the underlying state.
    fn size(&self) -> usize {
        self.inner.size()
    }
}
/// Picks the block size to pass to a view builder's `with_fixed_block_size`
/// for an output holding `data_capacity` bytes of values.
///
/// The result is `data_capacity` clamped to the range `1..=2 MiB`:
/// * `0` maps to `1`, so the returned block size is never zero
///   (presumably because the builder does not accept an empty block size —
///   confirm against the builder's documentation);
/// * anything above 2 MiB, including values that do not fit in a `u32`,
///   maps to 2 MiB.
fn capacity_to_view_block_size(data_capacity: usize) -> u32 {
    const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

    match u32::try_from(data_capacity) {
        Ok(0) => 1,
        Ok(requested) => requested.min(MAX_BLOCK_SIZE),
        // Too large for `u32`, hence certainly above the cap.
        Err(_) => MAX_BLOCK_SIZE,
    }
}
/// Per-group storage of the current minimum or maximum value, kept as raw
/// bytes. Whether it holds minimums or maximums is decided by the comparison
/// function passed to `update_batch`.
#[derive(Debug)]
struct MinMaxBytesState {
/// Current min/max for each group, indexed by group index. `None` until a
/// non-null value has been recorded for the group.
min_max: Vec<Option<Vec<u8>>>,
/// Data type of the values being aggregated.
data_type: DataType,
/// Sum of the lengths, in bytes, of all values currently held in `min_max`.
total_data_bytes: usize,
}
impl MinMaxBytesState {
    /// Creates an empty state (no groups, no stored bytes) for values of
    /// `data_type`.
    fn new(data_type: DataType) -> Self {
        Self {
            data_type,
            min_max: Vec::new(),
            total_data_bytes: 0,
        }
    }

    /// Stores `new_val` as the value of `group_index`, keeping
    /// `total_data_bytes` in sync.
    ///
    /// # Panics
    ///
    /// Panics if `group_index` is out of bounds for the per-group storage.
    fn set_value(&mut self, group_index: usize, new_val: &[u8]) {
        let slot = &mut self.min_max[group_index];
        if let Some(buffer) = slot.as_mut() {
            // Overwrite in place so the existing allocation is reused.
            self.total_data_bytes -= buffer.len();
            buffer.clear();
            buffer.extend_from_slice(new_val);
        } else {
            *slot = Some(new_val.to_vec());
        }
        self.total_data_bytes += new_val.len();
    }

    /// Updates the stored value of every group touched by this batch.
    ///
    /// `iter` yields one optional value per row and is paired positionally
    /// with `group_indices`; `None` rows are ignored. `cmp(candidate, current)`
    /// must return `true` when `candidate` should replace `current`. The
    /// per-group storage is first resized to `total_num_groups`.
    ///
    /// The winner for each group is determined on borrowed slices first, so
    /// each group's owned value is overwritten at most once per batch.
    fn update_batch<'a, F, I>(
        &mut self,
        iter: I,
        group_indices: &[usize],
        total_num_groups: usize,
        mut cmp: F,
    ) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> bool + Send + Sync,
        I: IntoIterator<Item = Option<&'a [u8]>>,
    {
        self.min_max.resize(total_num_groups, None);

        // group index -> best value from this batch that beats the stored one.
        let mut winners = HashMap::<usize, &[u8]>::with_capacity(group_indices.len());

        let non_null_rows = iter
            .into_iter()
            .zip(group_indices.iter().copied())
            .filter_map(|(value, group)| value.map(|bytes| (group, bytes)));

        for (group, candidate) in non_null_rows {
            match winners.entry(group) {
                // A winner from this batch already exists: compare against it.
                Entry::Occupied(mut best) => {
                    if cmp(candidate, best.get()) {
                        best.insert(candidate);
                    }
                }
                // Nothing from this batch yet: compare against the stored
                // value, or accept unconditionally if the group has none.
                Entry::Vacant(vacancy) => {
                    let replaces_stored = match self.min_max[group].as_deref() {
                        Some(stored) => cmp(candidate, stored),
                        None => true,
                    };
                    if replaces_stored {
                        vacancy.insert(candidate);
                    }
                }
            }
        }

        // Copy each winning slice into owned storage.
        for (group, winner) in winners {
            self.set_value(group, winner);
        }
        Ok(())
    }

    /// Removes the groups selected by `emit_to` from the state and returns
    /// `(total bytes of the returned values, returned values)`.
    ///
    /// `EmitTo::All` leaves the state empty. `EmitTo::First(n)` removes the
    /// first `n` groups, so the remaining groups move down by `n` positions.
    ///
    /// # Panics
    ///
    /// Panics for `EmitTo::First(n)` when `n` exceeds the number of groups.
    fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec<Option<Vec<u8>>>) {
        match emit_to {
            EmitTo::All => {
                // Taking both fields resets the byte counter and the storage.
                let emitted_bytes = std::mem::take(&mut self.total_data_bytes);
                let emitted = std::mem::take(&mut self.min_max);
                (emitted_bytes, emitted)
            }
            EmitTo::First(n) => {
                let emitted: Vec<_> = self.min_max.drain(..n).collect();
                let emitted_bytes: usize = emitted.iter().flatten().map(Vec::len).sum();
                self.total_data_bytes -= emitted_bytes;
                (emitted_bytes, emitted)
            }
        }
    }

    /// Estimated memory use in bytes: the inline size of every per-group slot
    /// plus the total length of the stored values.
    fn size(&self) -> usize {
        let slot_bytes = self.min_max.len() * size_of::<Option<Vec<u8>>>();
        self.total_data_bytes + slot_bytes
    }
}