use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnSatisfaction;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::bounded_min::BoundedMin;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::min_max::min_max;
use crate::dtype::DType;
use crate::partial_ord::partial_min;
use crate::scalar::Scalar;
#[derive(Clone, Debug)]
pub struct Min;
pub struct MinPartial {
min: Option<Scalar>,
element_dtype: DType,
}
impl MinPartial {
fn merge(&mut self, min: Scalar) {
if min.is_null() {
return;
}
self.min = Some(match self.min.take() {
Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
None => min,
});
}
}
impl AggregateFnVTable for Min {
type Options = EmptyOptions;
type Partial = MinPartial;
fn id(&self) -> AggregateFnId {
AggregateFnId::new("vortex.min")
}
fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(None)
}
fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
MinMax
.return_dtype(&EmptyOptions, input_dtype)
.map(|_| input_dtype.as_nullable())
}
fn can_satisfy(
&self,
_options: &Self::Options,
requested: &AggregateFnRef,
) -> AggregateFnSatisfaction {
if requested.is::<Self>() {
AggregateFnSatisfaction::Exact
} else if requested.is::<BoundedMin>() {
AggregateFnSatisfaction::Approximate
} else {
AggregateFnSatisfaction::No
}
}
fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
self.return_dtype(options, input_dtype)
}
fn empty_partial(
&self,
_options: &Self::Options,
input_dtype: &DType,
) -> VortexResult<Self::Partial> {
Ok(MinPartial {
min: None,
element_dtype: input_dtype.clone(),
})
}
fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
partial.merge(other);
Ok(())
}
fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
let dtype = partial.element_dtype.as_nullable();
match &partial.min {
Some(min) => min.cast(&dtype),
None => Ok(Scalar::null(dtype)),
}
}
fn reset(&self, partial: &mut Self::Partial) {
partial.min = None;
}
fn is_saturated(&self, _partial: &Self::Partial) -> bool {
false
}
fn accumulate(
&self,
partial: &mut Self::Partial,
batch: &Columnar,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let array = match batch {
Columnar::Canonical(canonical) => canonical.clone().into_array(),
Columnar::Constant(constant) => constant.clone().into_array(),
};
if let Some(result) = min_max(&array, ctx)? {
partial.merge(result.min);
}
Ok(())
}
fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
Ok(partials)
}
fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
self.to_scalar(partial)
}
}
#[cfg(test)]
mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use crate::IntoArray as _;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::aggregate_fn::Accumulator;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::min::Min;
use crate::arrays::PrimitiveArray;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::expr::stats::Precision;
use crate::expr::stats::Stat;
use crate::scalar::Scalar;
use crate::scalar::ScalarValue;
use crate::validity::Validity;
#[test]
fn min_aggregate_fn() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
let mut acc = Accumulator::try_new(Min, EmptyOptions, dtype)?;
let batch1 = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
acc.accumulate(&batch1, &mut ctx)?;
let batch2 = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
acc.accumulate(&batch2, &mut ctx)?;
assert_eq!(
acc.finish()?,
Scalar::primitive(3i32, Nullability::Nullable)
);
Ok(())
}
#[test]
fn min_empty_group_returns_null() -> VortexResult<()> {
let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
let mut acc = Accumulator::try_new(Min, EmptyOptions, dtype)?;
assert_eq!(
acc.finish()?,
Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable))
);
Ok(())
}
#[test]
fn min_casts_nonnullable_legacy_stat_to_nullable_partial() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let batch = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
batch
.statistics()
.set(Stat::Min, Precision::Exact(ScalarValue::from(3i32)));
let mut acc = Accumulator::try_new(Min, EmptyOptions, batch.dtype().clone())?;
acc.accumulate(&batch, &mut ctx)?;
assert_eq!(
acc.finish()?,
Scalar::primitive(3i32, Nullability::Nullable)
);
Ok(())
}
}