use std::any::Any;
use std::mem::size_of_val;
use std::sync::Arc;
use datafusion::arrow::array::{
new_null_array, Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, UInt32Array,
};
use datafusion::arrow::compute::{concat, take};
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::{exec_err, Result, ScalarValue};
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion::logical_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature, Volatility,
};
/// Builds the `hamelin_any_value` aggregate UDF, ready for registration
/// with a DataFusion session.
pub fn any_value_udaf() -> AggregateUDF {
    AggregateUDF::new_from_impl(AnyValueUdaf::default())
}
/// `ANY_VALUE`-style aggregate UDF: returns an arbitrary (first observed)
/// non-null value of its single argument. Registered as `hamelin_any_value`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct AnyValueUdaf {
    // Accepts exactly one argument of any type (see `new`).
    signature: Signature,
}
impl Default for AnyValueUdaf {
fn default() -> Self {
Self::new()
}
}
impl AnyValueUdaf {
    /// Creates the UDAF with a signature that accepts exactly one argument
    /// of any type; the aggregate is deterministic per-group once an input
    /// ordering is fixed, hence `Immutable` volatility.
    pub fn new() -> Self {
        let signature = Signature::any(1, Volatility::Immutable);
        Self { signature }
    }
}
impl AggregateUDFImpl for AnyValueUdaf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hamelin_any_value"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The output type mirrors the single input argument's type.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    /// Intermediate state is `(<name>_value, <name>_is_set)`.
    ///
    /// The value field is forced nullable: both accumulators emit a typed
    /// null in this column whenever a group has not seen any row yet (see
    /// `AnyValueAccumulator::state`), which would produce an invalid state
    /// batch if the field inherited a non-nullable return field.
    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Arc<Field>>> {
        Ok(vec![
            Arc::new(
                args.return_field
                    .as_ref()
                    .clone()
                    .with_name(format!("{}_value", args.name))
                    .with_nullable(true),
            ),
            Arc::new(Field::new(
                format!("{}_is_set", args.name),
                DataType::Boolean,
                true,
            )),
        ])
    }

    /// Row-at-a-time fallback accumulator.
    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        Ok(Box::new(AnyValueAccumulator::new(
            acc_args.return_field.data_type().clone(),
        )))
    }

    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
        true
    }

    /// Vectorized accumulator used by the grouped hash aggregation path.
    fn create_groups_accumulator(
        &self,
        args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        Ok(Box::new(AnyValueGroupsAccumulator::new(
            args.return_field.data_type().clone(),
        )))
    }
}
/// Row-at-a-time accumulator: remembers the first non-null value it sees.
#[derive(Debug)]
struct AnyValueAccumulator {
    // First non-null value captured so far; `None` until one is seen.
    value: Option<ScalarValue>,
    // Output type, used to produce a typed null when nothing was captured.
    data_type: DataType,
}
impl AnyValueAccumulator {
    /// Creates an accumulator that has not yet captured any value.
    fn new(data_type: DataType) -> Self {
        Self {
            data_type,
            value: None,
        }
    }
}
impl Accumulator for AnyValueAccumulator {
    /// Captures the first non-null input value; once set, later batches
    /// are no-ops.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        if self.value.is_none() {
            let arr = &values[0];
            if let Some(i) = (0..arr.len()).find(|&i| arr.is_valid(i)) {
                self.value = Some(ScalarValue::try_from_array(arr, i)?);
            }
        }
        Ok(())
    }

    /// Merges partial `[value, is_set]` states: adopts the first row whose
    /// `is_set` flag is true and whose value is non-null.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        if self.value.is_none() {
            let value_arr = &states[0];
            let is_set_arr = states[1].as_boolean();
            let hit = (0..value_arr.len()).find(|&i| {
                is_set_arr.is_valid(i) && is_set_arr.value(i) && value_arr.is_valid(i)
            });
            if let Some(i) = hit {
                self.value = Some(ScalarValue::try_from_array(value_arr, i)?);
            }
        }
        Ok(())
    }

    /// Serializes state as `[value, is_set]`; a typed null plus `false`
    /// when no value has been captured.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        if let Some(v) = &self.value {
            Ok(vec![v.clone(), ScalarValue::Boolean(Some(true))])
        } else {
            Ok(vec![
                ScalarValue::try_new_null(&self.data_type)?,
                ScalarValue::Boolean(Some(false)),
            ])
        }
    }

    /// Final result: the captured value, or a typed null if none was seen.
    fn evaluate(&mut self) -> Result<ScalarValue> {
        self.value
            .clone()
            .map_or_else(|| ScalarValue::try_new_null(&self.data_type), Ok)
    }

    /// Shallow size plus the heap footprint of the captured scalar.
    fn size(&self) -> usize {
        let value_bytes = match &self.value {
            Some(v) => v.size(),
            None => 0,
        };
        size_of_val(self) + value_bytes
    }
}
/// A borrowed cell: `source[index]` is the value captured for one group.
/// Holding an `ArrayRef` plus an index avoids materializing a per-group
/// `ScalarValue`; many groups may share one `source` array.
#[derive(Debug, Clone)]
struct ValueRef {
    // Array that owns the value (kept alive by this Arc).
    source: ArrayRef,
    // Row position of the value within `source`.
    index: u32,
}
/// Vectorized accumulator: one optional `ValueRef` per group, plus an
/// epoch-stamped "claimed in current batch" marker per group.
#[derive(Debug)]
struct AnyValueGroupsAccumulator {
    // Captured value per group; `None` until the group's first hit.
    values: Vec<Option<ValueRef>>,
    // Epoch at which each group was last claimed; equals `epoch` only when
    // the group was already claimed within the batch being processed.
    seen_epoch: Vec<u64>,
    // Per-batch counter (see `next_epoch`); avoids clearing `seen_epoch`.
    epoch: u64,
    // Output type, used to build typed null arrays on emit.
    data_type: DataType,
}
impl AnyValueGroupsAccumulator {
    /// Creates an empty accumulator. `epoch` starts at 1 so that freshly
    /// zeroed `seen_epoch` slots can never match the current epoch.
    fn new(data_type: DataType) -> Self {
        Self {
            values: Vec::new(),
            seen_epoch: Vec::new(),
            epoch: 1,
            data_type,
        }
    }

    /// Grows both per-group vectors to at least `total_num_groups` slots.
    /// New slots start as `None` / epoch 0 ("never claimed").
    fn ensure_capacity(&mut self, total_num_groups: usize) {
        if self.values.len() < total_num_groups {
            self.values.resize(total_num_groups, None);
        }
        if self.seen_epoch.len() < total_num_groups {
            self.seen_epoch.resize(total_num_groups, 0);
        }
    }

    /// Bumps the epoch counter so `seen_epoch` never needs clearing between
    /// batches. On u64 wraparound the whole `seen_epoch` vector is reset and
    /// the counter restarts at 1, so stale stamps cannot collide.
    fn next_epoch(&mut self) -> u64 {
        let next = self.epoch.wrapping_add(1);
        if next == 0 {
            self.seen_epoch.fill(0);
            self.epoch = 1;
            1
        } else {
            self.epoch = next;
            next
        }
    }

    /// Returns `(group_index, row_index)` for the first qualifying row of
    /// each group in this batch that does not yet hold a value. A row
    /// qualifies when it passes the optional filter and is non-null.
    fn first_hits(
        &mut self,
        values: &ArrayRef,
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
    ) -> Vec<(usize, u32)> {
        let epoch = self.next_epoch();
        // Only pay per-row validity checks when the array actually has nulls.
        let has_nulls = values.null_count() > 0;
        let mut hits = Vec::new();
        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
            // Skip groups already holding a value from a previous batch, or
            // already claimed earlier in this batch (same epoch stamp).
            if self.values[group_idx].is_some() || self.seen_epoch[group_idx] == epoch {
                continue;
            }
            // A null or false filter entry excludes the row.
            if let Some(filter) = opt_filter {
                if !filter.is_valid(row_idx) || !filter.value(row_idx) {
                    continue;
                }
            }
            if has_nulls && !values.is_valid(row_idx) {
                continue;
            }
            self.seen_epoch[group_idx] = epoch;
            hits.push((group_idx, row_idx as u32));
        }
        hits
    }

    /// Gathers the hit rows out of `arr` into one compact array (so the full
    /// input batch is not kept alive) and points each hit group's slot at it.
    fn gather_and_assign(&mut self, arr: &ArrayRef, hits: &[(usize, u32)]) -> Result<()> {
        if hits.is_empty() {
            return Ok(());
        }
        let indices: UInt32Array = hits.iter().map(|&(_, row)| Some(row)).collect();
        let gathered = take(arr.as_ref(), &indices, None)?;
        for (offset, &(group_idx, _)) in hits.iter().enumerate() {
            // All hit groups share the gathered array; `offset` is this
            // hit's position within it.
            self.values[group_idx] = Some(ValueRef {
                source: gathered.clone(),
                index: offset as u32,
            });
        }
        Ok(())
    }

    /// Turns emitted `ValueRef` slots into one output array of
    /// `self.data_type`; `None` slots become nulls.
    ///
    /// Fast paths: when every value comes from the same source array, emit a
    /// zero-copy slice (contiguous ascending indices, no null slots) or a
    /// single `take`. Otherwise, concatenate the distinct source arrays and
    /// remap each slot index into the concatenated array.
    fn materialize_emitted(&self, emitted: &[Option<ValueRef>]) -> Result<ArrayRef> {
        if emitted.is_empty() {
            return Ok(new_null_array(&self.data_type, 0));
        }
        let non_null_count = emitted.iter().filter(|slot| slot.is_some()).count();
        if non_null_count == 0 {
            return Ok(new_null_array(&self.data_type, emitted.len()));
        }
        let Some(first_value) = emitted.iter().flatten().next() else {
            // Unreachable given non_null_count > 0; kept as a safe fallback.
            return Ok(new_null_array(&self.data_type, emitted.len()));
        };
        // Source arrays are distinguished by Arc pointer identity.
        let first_ptr = Arc::as_ptr(&first_value.source) as *const () as usize;
        let single_source = emitted
            .iter()
            .flatten()
            .all(|v| (Arc::as_ptr(&v.source) as *const () as usize) == first_ptr);
        if single_source {
            if non_null_count == emitted.len() {
                // No null slots: check for one contiguous ascending index
                // run, which allows a zero-copy slice of the shared source.
                let mut contiguous = true;
                let start = first_value.index as usize;
                let mut expected = start;
                for value in emitted.iter().flatten() {
                    if value.index as usize != expected {
                        contiguous = false;
                        break;
                    }
                    expected = expected.saturating_add(1);
                }
                // `expected` ends at start + emitted.len(), so this bound
                // check guarantees the slice stays inside the source.
                if contiguous && expected <= first_value.source.len() {
                    return Ok(first_value.source.slice(start, emitted.len()));
                }
            }
            // Single source with gaps or reordering: one take suffices;
            // `None` indices yield null output rows.
            let indices: UInt32Array = emitted
                .iter()
                .map(|slot| slot.as_ref().map(|v| v.index))
                .collect();
            return Ok(take(first_value.source.as_ref(), &indices, None)?);
        }
        // General case. Entry layout:
        // (source ptr, emitted slot, index within source, source array).
        let mut entries: Vec<(usize, usize, u32, ArrayRef)> = Vec::with_capacity(non_null_count);
        for (slot_idx, slot) in emitted.iter().enumerate() {
            if let Some(value) = slot {
                let ptr = Arc::as_ptr(&value.source) as *const () as usize;
                entries.push((ptr, slot_idx, value.index, value.source.clone()));
            }
        }
        // Group entries that share a source by sorting on pointer identity.
        entries.sort_unstable_by_key(|(ptr, _, _, _)| *ptr);
        // Position 0 of the concatenation is a single null element; take
        // indices left as `None` then resolve to a valid (null) position.
        let null_singleton = new_null_array(&self.data_type, 1);
        let mut sources: Vec<ArrayRef> = vec![null_singleton];
        let mut mapped_indices: Vec<Option<u32>> = vec![None; emitted.len()];
        let mut running: usize = 1; let mut i = 0usize;
        while i < entries.len() {
            let run_ptr = entries[i].0;
            let source = entries[i].3.clone();
            let source_offset = running;
            running = running.saturating_add(source.len());
            sources.push(source);
            // Remap every entry of this pointer-run into concatenated space.
            while i < entries.len() && entries[i].0 == run_ptr {
                let (_, slot_idx, source_idx, _) = &entries[i];
                let global = source_offset.saturating_add(*source_idx as usize);
                // Guard: take indices are u32, so oversized offsets must
                // fail loudly rather than truncate.
                if global > u32::MAX as usize {
                    return exec_err!(
                        "hamelin_any_value emitted array too large for UInt32 take indices: {}",
                        global
                    );
                }
                mapped_indices[*slot_idx] = Some(global as u32);
                i += 1;
            }
        }
        if running > u32::MAX as usize {
            return exec_err!(
                "hamelin_any_value emitted array too large for UInt32 take indices: {}",
                running
            );
        }
        let refs: Vec<&dyn Array> = sources.iter().map(|a| a.as_ref()).collect();
        let concatenated = concat(&refs)?;
        let indices: UInt32Array = mapped_indices.into_iter().collect();
        Ok(take(concatenated.as_ref(), &indices, None)?)
    }

    /// Removes and returns the requested group slots, keeping `values` and
    /// `seen_epoch` aligned. For `EmitTo::First(n)` the remaining groups are
    /// renumbered from 0, matching DataFusion's emit contract.
    fn emit(&mut self, emit_to: EmitTo) -> Vec<Option<ValueRef>> {
        match emit_to {
            EmitTo::All => {
                self.seen_epoch.clear();
                std::mem::take(&mut self.values)
            }
            EmitTo::First(n) => {
                let n = n.min(self.values.len());
                // split_off + swap: keep the tail, hand back the first n.
                let mut rest_values = self.values.split_off(n);
                std::mem::swap(&mut self.values, &mut rest_values);
                let m = n.min(self.seen_epoch.len());
                let mut rest_epoch = self.seen_epoch.split_off(m);
                std::mem::swap(&mut self.seen_epoch, &mut rest_epoch);
                rest_values
            }
        }
    }
}
impl GroupsAccumulator for AnyValueGroupsAccumulator {
    /// Claims the first qualifying input row for each unset group, then
    /// compacts the claimed rows via `gather_and_assign` so the full input
    /// batch is not retained.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.ensure_capacity(total_num_groups);
        let hits = self.first_hits(&values[0], group_indices, opt_filter);
        self.gather_and_assign(&values[0], &hits)
    }

    /// Produces the final per-group output for the emitted groups.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        let emitted = self.emit(emit_to);
        self.materialize_emitted(&emitted)
    }

    /// Serializes intermediate state as `[value, is_set]`, mirroring
    /// `AnyValueUdaf::state_fields`.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let emitted = self.emit(emit_to);
        let value_array = self.materialize_emitted(&emitted)?;
        let mut is_set_builder = BooleanBufferBuilder::new(emitted.len());
        for slot in &emitted {
            is_set_builder.append(slot.is_some());
        }
        // is_set carries no null bitmap: every slot is definitively set/unset.
        let is_set_array: ArrayRef = Arc::new(BooleanArray::new(is_set_builder.finish(), None));
        Ok(vec![value_array, is_set_array])
    }

    /// Merges partial `[value, is_set]` states: the first row with
    /// `is_set == true` and a non-null value claims each unset group.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        if values.len() != 2 {
            return exec_err!(
                "hamelin_any_value merge expects 2 state arrays, got {}",
                values.len()
            );
        }
        self.ensure_capacity(total_num_groups);
        let value_arr = &values[0];
        let is_set_arr = values[1].as_boolean();
        let has_set_nulls = is_set_arr.null_count() > 0;
        let has_value_nulls = value_arr.null_count() > 0;
        let epoch = self.next_epoch();
        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
            // Same claim protocol as `first_hits`: skip groups that already
            // hold a value or were already claimed within this batch.
            if self.values[group_idx].is_some() || self.seen_epoch[group_idx] == epoch {
                continue;
            }
            if let Some(filter) = opt_filter {
                if !filter.is_valid(row_idx) || !filter.value(row_idx) {
                    continue;
                }
            }
            // A null is_set entry is treated as "not set".
            if has_set_nulls && !is_set_arr.is_valid(row_idx) {
                continue;
            }
            if is_set_arr.value(row_idx) && !(has_value_nulls && !value_arr.is_valid(row_idx)) {
                self.seen_epoch[group_idx] = epoch;
                // Unlike update_batch, state rows are referenced in place
                // (no gather): each ValueRef pins its whole state array.
                self.values[group_idx] = Some(ValueRef {
                    source: value_arr.clone(),
                    index: row_idx as u32,
                });
            }
        }
        Ok(())
    }

    /// Approximate heap footprint of the bookkeeping vectors.
    /// NOTE(review): bytes retained through `ValueRef::source` arrays are
    /// not counted, so retained memory may be underestimated — confirm
    /// whether the memory-accounting contract requires including them.
    fn size(&self) -> usize {
        size_of_val(self)
            + self.values.capacity() * std::mem::size_of::<Option<ValueRef>>()
            + self.seen_epoch.capacity() * std::mem::size_of::<u64>()
    }
}