use std::sync::Arc;
use arrow::array::AsArray;
use arrow_array::{ArrayRef, BooleanArray};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use datafusion_common::Result;
use crate::GroupsAccumulator;
use super::{accumulate::NullState, EmitTo};
/// A grouped accumulator that keeps one boolean running value per group.
///
/// The running value of a group is combined with each incoming value using
/// the caller-supplied binary function `F` (for example logical AND or OR).
#[derive(Debug)]
pub struct BooleanGroupsAccumulator<F: Fn(bool, bool) -> bool + Send + Sync> {
    /// Running value of every group, one bit per group, addressed by group index.
    values: BooleanBufferBuilder,

    /// Helper that accumulates the batches and builds the null mask that is
    /// attached to the emitted array.
    null_state: NullState,

    /// Binary function called as `bool_fn(current_value, new_value)`; its
    /// result becomes the group's new running value.
    bool_fn: F,
}
impl<F: Fn(bool, bool) -> bool + Send + Sync> BooleanGroupsAccumulator<F> {
    /// Creates an accumulator with no groups yet.
    ///
    /// `bitop_fn` is stored and later invoked as
    /// `bitop_fn(current_value, new_value)` to fold each incoming value into
    /// the running value of its group.
    pub fn new(bitop_fn: F) -> Self {
        // Start with an empty bit buffer; it is grown when batches arrive.
        let values = BooleanBufferBuilder::new(0);
        let null_state = NullState::new();

        Self {
            values,
            null_state,
            bool_fn: bitop_fn,
        }
    }
}
impl<F> GroupsAccumulator for BooleanGroupsAccumulator<F>
where
    F: Fn(bool, bool) -> bool + Send + Sync,
{
    /// Folds one batch of boolean input into the per-group running values.
    ///
    /// * `values` - must contain exactly one array, which is read as a
    ///   boolean array.
    /// * `group_indices` - group index for each input row.
    /// * `opt_filter` - optional row filter, forwarded to the null-state helper.
    /// * `total_num_groups` - number of groups known so far; the value buffer
    ///   is grown to this length before accumulating.
    ///
    /// # Panics
    ///
    /// Panics if `values` does not contain exactly one array.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        assert_eq!(values.len(), 1, "single argument to update_batch");
        let values = values[0].as_boolean();

        if self.values.len() < total_num_groups {
            let new_groups = total_num_groups - self.values.len();

            // New groups must start from the identity element of `bool_fn`
            // (the `e` with `bool_fn(e, x) == x`), so that the first value
            // folded into a group is kept unchanged. Seeding with a fixed
            // `false` makes an AND-style function return `false` for every
            // group, no matter what the input is.
            //
            // Probe `false` first so functions for which it is an identity
            // (OR, XOR) keep the previous seed; functions that have no
            // identity at all also fall back to `false`.
            let is_identity =
                |e: bool| !(self.bool_fn)(e, false) && (self.bool_fn)(e, true);
            let identity = !is_identity(false) && is_identity(true);

            self.values.append_n(new_groups, identity);
        }

        // The null-state helper decides which rows reach the closure
        // (presumably only non-null rows that pass `opt_filter`) and records
        // what it needs to build the null mask later.
        self.null_state.accumulate_boolean(
            group_indices,
            values,
            opt_filter,
            total_num_groups,
            |group_index, new_value| {
                let current_value = self.values.get_bit(group_index);
                let value = (self.bool_fn)(current_value, new_value);
                self.values.set_bit(group_index, value);
            },
        );

        Ok(())
    }

    /// Emits the accumulated values as a `BooleanArray`.
    ///
    /// With `EmitTo::All` every group is emitted and the internal buffer is
    /// left empty. With `EmitTo::First(n)` only the first `n` groups are
    /// emitted and the remaining values are kept, shifted to the front.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        // `finish` hands back the accumulated bits and resets the builder.
        let values = self.values.finish();

        let values = match emit_to {
            EmitTo::All => values,
            EmitTo::First(n) => {
                let first_n: BooleanBuffer = values.iter().take(n).collect();
                // Put the groups that are not emitted back into the builder
                // so they now start at index 0.
                for v in values.iter().skip(n) {
                    self.values.append(v);
                }
                first_n
            }
        };

        let nulls = self.null_state.build(emit_to);
        let values = BooleanArray::new(values, Some(nulls));
        Ok(Arc::new(values))
    }

    /// Returns the intermediate state, which is the evaluated array itself
    /// wrapped in a single-element vector.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        self.evaluate(emit_to).map(|arr| vec![arr])
    }

    /// Merges intermediate state produced by `state`.
    ///
    /// The state has the same shape as the raw input, so merging is the same
    /// operation as `update_batch`.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.update_batch(values, group_indices, opt_filter, total_num_groups)
    }

    /// Approximate number of bytes used by this accumulator: the value
    /// buffer (its capacity divided by 8, assuming the builder reports
    /// capacity in bits) plus the size reported by the null-state helper.
    fn size(&self) -> usize {
        self.values.capacity() / 8 + self.null_state.size()
    }
}