use std::sync::Arc;
use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
use arrow::buffer::BooleanBuffer;
use datafusion_common::Result;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
use super::accumulate::NullState;
/// Groups accumulator that reduces boolean inputs to one boolean per group.
///
/// Each group's running value starts at `identity` and is combined, via
/// `bool_fn`, with every value that `NullState::accumulate_boolean` hands to
/// the update callback.
#[derive(Debug)]
pub struct BooleanGroupsAccumulator<F>
where
F: Fn(bool, bool) -> bool + Send + Sync,
{
/// Running value of each group, stored as one bit per group index.
values: BooleanBufferBuilder,
/// Per-group null bookkeeping; the buffer returned by its `build` method is
/// used as the validity buffer of the emitted array.
null_state: NullState,
/// Combines a group's current value (first argument) with a new input
/// value (second argument) to produce the group's updated value.
bool_fn: F,
/// Value that newly allocated group slots are filled with.
identity: bool,
}
impl<F> BooleanGroupsAccumulator<F>
where
    F: Fn(bool, bool) -> bool + Send + Sync,
{
    /// Creates an accumulator that tracks no groups yet.
    ///
    /// * `bool_fn` - combines a group's current value with a new input value.
    /// * `identity` - value each group slot holds when it is first allocated.
    pub fn new(bool_fn: F, identity: bool) -> Self {
        // Start with an empty bit buffer; it grows on demand in `update_batch`.
        let values = BooleanBufferBuilder::new(0);
        let null_state = NullState::new();

        Self {
            values,
            null_state,
            bool_fn,
            identity,
        }
    }
}
impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
where
    F: Fn(bool, bool) -> bool + Send + Sync,
{
    /// Folds one batch of boolean inputs into the per-group running values.
    ///
    /// `values` must contain exactly one boolean array. `group_indices`,
    /// `opt_filter` and `total_num_groups` are forwarded unchanged to
    /// `NullState::accumulate_boolean`, which decides which
    /// `(group_index, value)` pairs reach the update callback.
    ///
    /// # Panics
    ///
    /// Panics if `values` does not hold exactly one array.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        assert_eq!(values.len(), 1, "single argument to update_batch");
        let input = values[0].as_boolean();

        // Allocate a slot, pre-filled with the identity, for every group that
        // does not have one yet.
        let missing_groups = total_num_groups.saturating_sub(self.values.len());
        if missing_groups > 0 {
            self.values.append_n(missing_groups, self.identity);
        }

        self.null_state.accumulate_boolean(
            group_indices,
            input,
            opt_filter,
            total_num_groups,
            |group_index, new_value| {
                // Combine the stored bit with the incoming value and write it back.
                let combined =
                    (self.bool_fn)(self.values.get_bit(group_index), new_value);
                self.values.set_bit(group_index, combined);
            },
        );

        Ok(())
    }

    /// Emits the accumulated values as a single `BooleanArray`.
    ///
    /// With `EmitTo::All` every group is emitted and the internal value
    /// buffer is left empty. With `EmitTo::First(n)` only the first `n`
    /// groups are emitted; the remaining values are written back to the
    /// internal buffer, so they now start at index 0. The validity buffer
    /// comes from `NullState::build`.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        // `finish` hands over the accumulated bits and resets the builder.
        let all_bits = self.values.finish();

        let emitted = match emit_to {
            EmitTo::First(n) => {
                let head: BooleanBuffer = all_bits.iter().take(n).collect();
                // Keep the groups that were not emitted for later calls.
                all_bits
                    .iter()
                    .skip(n)
                    .for_each(|bit| self.values.append(bit));
                head
            }
            EmitTo::All => all_bits,
        };

        let validity = self.null_state.build(emit_to);
        let result = BooleanArray::new(emitted, Some(validity));
        Ok(Arc::new(result))
    }

    /// Returns the intermediate state: the same array `evaluate` produces,
    /// wrapped in a one-element vector.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let array = self.evaluate(emit_to)?;
        Ok(vec![array])
    }

    /// Merges intermediate states by treating them exactly like raw input:
    /// this simply delegates to `update_batch`.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        Self::update_batch(self, values, group_indices, opt_filter, total_num_groups)
    }

    /// Size estimate: the value buffer's capacity divided by 8, plus
    /// whatever the null state reports.
    fn size(&self) -> usize {
        // `BooleanBufferBuilder::capacity` is reported in bits (per the arrow
        // documentation), hence the division by 8 to get bytes.
        let value_bytes = self.values.capacity() / 8;
        value_bytes + self.null_state.size()
    }

    /// Converts a raw input batch directly into intermediate state.
    ///
    /// The value bits of `values[0]` are kept unchanged; only the null
    /// buffer is replaced with the result of `filtered_null_mask` applied to
    /// `opt_filter` and the input array.
    fn convert_to_state(
        &self,
        values: &[ArrayRef],
        opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<ArrayRef>> {
        let input = values[0].as_boolean().clone();

        let filtered_nulls = filtered_null_mask(opt_filter, &input);
        // Discard the input's own null buffer in favour of the filtered one.
        let (value_bits, _) = input.into_parts();

        let state: ArrayRef = Arc::new(BooleanArray::new(value_bits, filtered_nulls));
        Ok(vec![state])
    }

    /// This accumulator implements `convert_to_state`.
    fn supports_convert_to_state(&self) -> bool {
        true
    }
}