use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::aggregate_fn::AggregateFnRef;
use vortex_array::aggregate_fn::fns::min_max::MinMax;
use vortex_array::aggregate_fn::fns::min_max::make_minmax_dtype;
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::match_each_pvalue;
use vortex_array::scalar::PValue;
use vortex_array::scalar::Scalar;
use vortex_array::scalar::ScalarValue;
use vortex_error::VortexResult;
use crate::Sequence;
use crate::SequenceData;
/// Aggregate kernel that answers `MinMax` requests for `Sequence` arrays.
///
/// The kernel is stateless; its `DynAggregateKernel` implementation derives
/// the result from the sequence's base, multiplier and length rather than
/// from materialised element values.
#[derive(Debug)]
pub(crate) struct SequenceMinMaxKernel;
impl DynAggregateKernel for SequenceMinMaxKernel {
    /// Computes the min/max struct scalar for a `Sequence` batch.
    ///
    /// Returns `Ok(None)` when `aggregate_fn` is not `MinMax` or when `batch`
    /// is not a `Sequence` array, which declines the request. An empty
    /// sequence produces a null scalar of the min/max struct dtype. Otherwise
    /// the two extremes are the sequence's base and the value returned by
    /// `SequenceData::try_last`, ordered according to the sign of the
    /// multiplier.
    ///
    /// # Errors
    ///
    /// Propagates any error from `SequenceData::try_last` or from
    /// `Scalar::try_new`.
    ///
    /// # Panics
    ///
    /// Panics if the multiplier is a floating-point value.
    fn aggregate(
        &self,
        aggregate_fn: &AggregateFnRef,
        batch: &ArrayRef,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Scalar>> {
        // Only MinMax is handled here; decline every other aggregate.
        if !aggregate_fn.is::<MinMax>() {
            return Ok(None);
        }
        let Some(sequence) = batch.as_opt::<Sequence>() else {
            return Ok(None);
        };

        let result_dtype = make_minmax_dtype(batch.dtype());
        if sequence.is_empty() {
            return Ok(Some(Scalar::null(result_dtype)));
        }

        let first = sequence.base();
        let step = sequence.multiplier();
        let final_value = SequenceData::try_last(first, step, sequence.ptype(), sequence.len())?;

        // A negative step means the sequence descends, so the endpoints swap roles.
        let (lowest, highest) = match_each_pvalue!(
            step,
            uint: |_v| { (first, final_value) },
            int: |v| {
                if v < 0 {
                    (final_value, first)
                } else {
                    (first, final_value)
                }
            },
            float: |_v| { unreachable!("float multiplier not supported for SequenceArray") }
        );

        // Both struct fields are built as non-nullable primitives of the sequence's ptype.
        let element_dtype = DType::Primitive(sequence.ptype(), Nullability::NonNullable);
        let as_scalar =
            |value| Scalar::try_new(element_dtype.clone(), Some(ScalarValue::Primitive(value)));
        let fields = vec![as_scalar(lowest)?, as_scalar(highest)?];

        Ok(Some(Scalar::struct_(result_dtype, fields)))
    }
}