// vortex_array/aggregate_fn/fns/min_max/mod.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

4mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43/// The minimum and maximum non-null values of an array, or `None` if there are no non-null values.
44///
45/// The result scalars have the non-nullable version of the array dtype.
46/// This will update the stats set of the array as a side effect.
47pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48    // Short-circuit using cached array statistics.
49    let cached_min = array
50        .statistics()
51        .get(Stat::Min)
52        .and_then(Precision::as_exact);
53    let cached_max = array
54        .statistics()
55        .get(Stat::Max)
56        .and_then(Precision::as_exact);
57    if let Some((min, max)) = cached_min.zip(cached_max) {
58        let non_nullable_dtype = array.dtype().as_nonnullable();
59        return Ok(Some(MinMaxResult {
60            min: min.cast(&non_nullable_dtype)?,
61            max: max.cast(&non_nullable_dtype)?,
62        }));
63    }
64
65    // Short-circuit for empty arrays or all-null arrays.
66    if array.is_empty() || array.valid_count(ctx)? == 0 {
67        return Ok(None);
68    }
69
70    // Short-circuit for unsupported dtypes.
71    if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72        return Ok(None);
73    }
74
75    // Compute using Accumulator<MinMax>.
76    let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77    acc.accumulate(array, ctx)?;
78    let result_scalar = acc.finish()?;
79    let result = MinMaxResult::from_scalar(result_scalar)?;
80
81    // Cache the computed min/max as statistics.
82    if let Some(r) = &result {
83        if let Some(min_value) = r.min.value() {
84            array
85                .statistics()
86                .set(Stat::Min, Precision::Exact(min_value.clone()));
87        }
88        if let Some(max_value) = r.max.value() {
89            array
90                .statistics()
91                .set(Stat::Max, Precision::Exact(max_value.clone()));
92        }
93    }
94
95    Ok(result)
96}
97
98/// The minimum and maximum non-null values of an array.
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct MinMaxResult {
101    pub min: Scalar,
102    pub max: Scalar,
103}
104
105impl MinMaxResult {
106    /// Extract a `MinMaxResult` from a struct scalar with `{min, max}` fields.
107    pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108        if scalar.is_null() {
109            Ok(None)
110        } else {
111            let min = scalar
112                .as_struct()
113                .field_by_idx(0)
114                .vortex_expect("missing min field");
115            let max = scalar
116                .as_struct()
117                .field_by_idx(1)
118                .vortex_expect("missing max field");
119            Ok(Some(MinMaxResult { min, max }))
120        }
121    }
122}
123
/// Aggregate function that tracks the minimum and maximum of an array.
///
/// Its result is a nullable struct scalar `{min: T, max: T}`, where `T` is the input dtype with
/// nullability removed. The struct is null when the input is empty or contains only nulls.
#[derive(Debug, Clone)]
pub struct MinMax;
130
131/// Partial accumulator state for min/max.
132pub struct MinMaxPartial {
133    min: Option<Scalar>,
134    max: Option<Scalar>,
135    element_dtype: DType,
136}
137
138impl MinMaxPartial {
139    /// Merge a local `MinMaxResult` into this partial state.
140    fn merge(&mut self, local: Option<MinMaxResult>) {
141        let Some(MinMaxResult { min, max }) = local else {
142            return;
143        };
144
145        self.min = Some(match self.min.take() {
146            Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147            None => min,
148        });
149
150        self.max = Some(match self.max.take() {
151            Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152            None => max,
153        });
154    }
155}
156
157/// Creates the struct dtype `{min: T, max: T}` (nullable) used for min/max aggregate results.
158pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159    DType::Struct(
160        StructFields::new(
161            NAMES.clone(),
162            vec![
163                element_dtype.as_nonnullable(),
164                element_dtype.as_nonnullable(),
165            ],
166        ),
167        Nullability::Nullable,
168    )
169}
170
171impl AggregateFnVTable for MinMax {
172    type Options = EmptyOptions;
173    type Partial = MinMaxPartial;
174
175    fn id(&self) -> AggregateFnId {
176        AggregateFnId::new("vortex.min_max")
177    }
178
179    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180        unimplemented!("MinMax is not yet serializable");
181    }
182
183    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
184        match input_dtype {
185            DType::Bool(_)
186            | DType::Primitive(..)
187            | DType::Decimal(..)
188            | DType::Utf8(..)
189            | DType::Binary(..)
190            | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
191            _ => None,
192        }
193    }
194
195    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
196        self.return_dtype(options, input_dtype)
197    }
198
199    fn empty_partial(
200        &self,
201        _options: &Self::Options,
202        input_dtype: &DType,
203    ) -> VortexResult<Self::Partial> {
204        Ok(MinMaxPartial {
205            min: None,
206            max: None,
207            element_dtype: input_dtype.clone(),
208        })
209    }
210
211    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
212        let local = MinMaxResult::from_scalar(other)?;
213        partial.merge(local);
214        Ok(())
215    }
216
217    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
218        let dtype = make_minmax_dtype(&partial.element_dtype);
219        Ok(match (&partial.min, &partial.max) {
220            (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
221            _ => Scalar::null(dtype),
222        })
223    }
224
225    fn reset(&self, partial: &mut Self::Partial) {
226        partial.min = None;
227        partial.max = None;
228    }
229
230    #[inline]
231    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
232        false
233    }
234
235    fn accumulate(
236        &self,
237        partial: &mut Self::Partial,
238        batch: &Columnar,
239        ctx: &mut ExecutionCtx,
240    ) -> VortexResult<()> {
241        match batch {
242            Columnar::Constant(c) => {
243                let scalar = c.scalar();
244                if scalar.is_null() {
245                    return Ok(());
246                }
247                // Skip NaN float constants
248                if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
249                    return Ok(());
250                }
251                let non_nullable_dtype = scalar.dtype().as_nonnullable();
252                let cast = scalar.cast(&non_nullable_dtype)?;
253                partial.merge(Some(MinMaxResult {
254                    min: cast.clone(),
255                    max: cast,
256                }));
257                Ok(())
258            }
259            Columnar::Canonical(c) => match c {
260                Canonical::Primitive(p) => accumulate_primitive(partial, p, ctx),
261                Canonical::Bool(b) => accumulate_bool(partial, b, ctx),
262                Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
263                Canonical::Decimal(d) => accumulate_decimal(partial, d, ctx),
264                Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
265                Canonical::Null(_) => Ok(()),
266                Canonical::Struct(_)
267                | Canonical::List(_)
268                | Canonical::FixedSizeList(_)
269                | Canonical::Variant(_) => {
270                    vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
271                }
272            },
273        }
274    }
275
276    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
277        Ok(partials)
278    }
279
280    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
281        self.to_scalar(partial)
282    }
283}
284
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::make_minmax_dtype;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::half::f16;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let p = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(
            min_max(&p, &mut ctx)?,
            Some(MinMaxResult {
                min: 1.into(),
                max: 3.into()
            })
        );
        Ok(())
    }

    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let all_true = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_true, &mut ctx)?,
            Some(MinMaxResult {
                min: true.into(),
                max: true.into()
            })
        );

        let all_false = BoolArray::new(
            BitBuffer::from([false, false, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_false, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: false.into()
            })
        );

        let mixed = BoolArray::new(
            BitBuffer::from([false, true, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&mixed, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: true.into()
            })
        );
        Ok(())
    }

    #[test]
    fn test_null_array() -> VortexResult<()> {
        let p = NullArray::new(1).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p, &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, -1.0);
        assert_eq!(f32::try_from(&result.max)?, 1.0);
        Ok(())
    }

    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&result.max)?, f32::INFINITY);
        Ok(())
    }

    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;

        let batch2 = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;

        let result = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(3i32));
        assert_eq!(result.max, Scalar::from(25i32));
        Ok(())
    }

    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;
        let result1 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result1.min, Scalar::from(10i32));
        assert_eq!(result1.max, Scalar::from(20i32));

        let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;
        let result2 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result2.min, Scalar::from(3i32));
        assert_eq!(result2.max, Scalar::from(9i32));
        Ok(())
    }

    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;

        let struct_dtype = make_minmax_dtype(&dtype);
        let scalar1 = Scalar::struct_(
            struct_dtype.clone(),
            vec![Scalar::from(5i32), Scalar::from(15i32)],
        );
        MinMax.combine_partials(&mut state, scalar1)?;

        let scalar2 = Scalar::struct_(struct_dtype, vec![Scalar::from(2i32), Scalar::from(10i32)]);
        MinMax.combine_partials(&mut state, scalar2)?;

        let result = MinMaxResult::from_scalar(MinMax.to_scalar(&state)?)?
            .vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(2i32));
        assert_eq!(result.max, Scalar::from(15i32));
        Ok(())
    }

    /// A partial that never saw any value must produce a null `{min, max}` struct.
    #[test]
    fn test_empty_partial_is_null() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let state = MinMax.empty_partial(&EmptyOptions, &dtype)?;
        assert!(MinMax.to_scalar(&state)?.is_null());
        Ok(())
    }

    /// A null `{min, max}` struct decodes to "no result".
    #[test]
    fn test_from_scalar_null() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let null_struct = Scalar::null(make_minmax_dtype(&dtype));
        assert_eq!(MinMaxResult::from_scalar(null_struct)?, None);
        Ok(())
    }

    /// Dtypes outside the supported set have no min/max result dtype.
    #[test]
    fn test_unsupported_dtype() {
        assert!(MinMax.return_dtype(&EmptyOptions, &DType::Null).is_none());
    }

    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let scalar = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(scalar, 2).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_chunked() -> VortexResult<()> {
        let chunk1 = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let chunk2 = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let dtype = chunk1.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&chunked.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(1i32));
        assert_eq!(result.max, Scalar::from(10i32));
        Ok(())
    }

    #[test]
    fn test_all_null() -> VortexResult<()> {
        let p = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p.into_array(), &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_varbin() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![
                Some("hello world"),
                None,
                Some("hello world this is a long string"),
                None,
            ],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(
            result.min,
            Scalar::utf8("hello world", Nullability::NonNullable)
        );
        assert_eq!(
            result.max,
            Scalar::utf8(
                "hello world this is a long string",
                Nullability::NonNullable
            )
        );
        Ok(())
    }

    #[test]
    fn test_decimal() -> VortexResult<()> {
        let decimal = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&decimal.into_array(), &mut ctx)?.vortex_expect("should have result");

        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let expected_min = Scalar::try_new(
            non_nullable_dtype.clone(),
            Some(ScalarValue::from(DecimalValue::from(100i32))),
        )?;
        let expected_max = Scalar::try_new(
            non_nullable_dtype,
            Some(ScalarValue::from(DecimalValue::from(200i32))),
        )?;
        assert_eq!(result.min, expected_min);
        assert_eq!(result.max, expected_max);
        Ok(())
    }

    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let result = min_max(
            &BoolArray::from_iter(vec![Some(true), Some(true), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true)]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true), None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![Some(false), Some(false), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(false, Nullability::NonNullable),
                max: Scalar::bool(false, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    /// Regression test for <https://github.com/vortex-data/vortex/issues/7074>.
    ///
    /// A chunked all-true bool array with an empty first chunk returned min=false because
    /// `accumulate_bool` on the empty chunk incorrectly merged min=false,max=false into the
    /// partial state.
    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
        let chunk1 = BoolArray::new(
            BitBuffer::from([true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunk2 = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunked = ChunkedArray::try_new(
            vec![empty.into_array(), chunk1.into_array(), chunk2.into_array()],
            DType::Bool(Nullability::NonNullable),
        )?;

        let result = min_max(&chunked.into_array(), &mut ctx)?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array.into_array(), &mut ctx)?, None);
        Ok(())
    }
}