Skip to main content

vortex_array/aggregate_fn/fns/min_max/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43/// The minimum and maximum non-null values of an array, or `None` if there are no non-null values.
44///
45/// The result scalars have the non-nullable version of the array dtype.
46/// This will update the stats set of the array as a side effect.
47pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48    // Short-circuit using cached array statistics.
49    let cached_min = array
50        .statistics()
51        .get(Stat::Min)
52        .and_then(Precision::as_exact);
53    let cached_max = array
54        .statistics()
55        .get(Stat::Max)
56        .and_then(Precision::as_exact);
57    if let Some((min, max)) = cached_min.zip(cached_max) {
58        let non_nullable_dtype = array.dtype().as_nonnullable();
59        return Ok(Some(MinMaxResult {
60            min: min.cast(&non_nullable_dtype)?,
61            max: max.cast(&non_nullable_dtype)?,
62        }));
63    }
64
65    // Short-circuit for empty arrays or all-null arrays.
66    if array.is_empty() || array.valid_count()? == 0 {
67        return Ok(None);
68    }
69
70    // Short-circuit for unsupported dtypes.
71    if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72        return Ok(None);
73    }
74
75    // Compute using Accumulator<MinMax>.
76    let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77    acc.accumulate(array, ctx)?;
78    let result_scalar = acc.finish()?;
79    let result = MinMaxResult::from_scalar(result_scalar)?;
80
81    // Cache the computed min/max as statistics.
82    if let Some(ref r) = result {
83        if let Some(min_value) = r.min.value() {
84            array
85                .statistics()
86                .set(Stat::Min, Precision::Exact(min_value.clone()));
87        }
88        if let Some(max_value) = r.max.value() {
89            array
90                .statistics()
91                .set(Stat::Max, Precision::Exact(max_value.clone()));
92        }
93    }
94
95    Ok(result)
96}
97
98/// The minimum and maximum non-null values of an array.
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct MinMaxResult {
101    pub min: Scalar,
102    pub max: Scalar,
103}
104
105impl MinMaxResult {
106    /// Extract a `MinMaxResult` from a struct scalar with `{min, max}` fields.
107    pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108        if scalar.is_null() {
109            Ok(None)
110        } else {
111            let min = scalar
112                .as_struct()
113                .field_by_idx(0)
114                .vortex_expect("missing min field");
115            let max = scalar
116                .as_struct()
117                .field_by_idx(1)
118                .vortex_expect("missing max field");
119            Ok(Some(MinMaxResult { min, max }))
120        }
121    }
122}
123
/// Aggregate function that computes both the minimum and the maximum of an array.
///
/// The result is a nullable struct scalar `{min: T, max: T}`, where `T` is the non-nullable
/// version of the input dtype. The struct is null when the array is empty or all-null.
#[derive(Debug, Clone)]
pub struct MinMax;
130
131/// Partial accumulator state for min/max.
132pub struct MinMaxPartial {
133    min: Option<Scalar>,
134    max: Option<Scalar>,
135    element_dtype: DType,
136}
137
138impl MinMaxPartial {
139    /// Merge a local `MinMaxResult` into this partial state.
140    fn merge(&mut self, local: Option<MinMaxResult>) {
141        let Some(MinMaxResult { min, max }) = local else {
142            return;
143        };
144
145        self.min = Some(match self.min.take() {
146            Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147            None => min,
148        });
149
150        self.max = Some(match self.max.take() {
151            Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152            None => max,
153        });
154    }
155}
156
157/// Creates the struct dtype `{min: T, max: T}` (nullable) used for min/max aggregate results.
158pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159    DType::Struct(
160        StructFields::new(
161            NAMES.clone(),
162            vec![
163                element_dtype.as_nonnullable(),
164                element_dtype.as_nonnullable(),
165            ],
166        ),
167        Nullability::Nullable,
168    )
169}
170
171impl AggregateFnVTable for MinMax {
172    type Options = EmptyOptions;
173    type Partial = MinMaxPartial;
174
175    fn id(&self) -> AggregateFnId {
176        AggregateFnId::new_ref("vortex.min_max")
177    }
178
179    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180        unimplemented!("MinMax is not yet serializable");
181    }
182
183    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
184        match input_dtype {
185            DType::Bool(_)
186            | DType::Primitive(..)
187            | DType::Decimal(..)
188            | DType::Utf8(..)
189            | DType::Binary(..)
190            | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
191            _ => None,
192        }
193    }
194
195    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
196        self.return_dtype(options, input_dtype)
197    }
198
199    fn empty_partial(
200        &self,
201        _options: &Self::Options,
202        input_dtype: &DType,
203    ) -> VortexResult<Self::Partial> {
204        Ok(MinMaxPartial {
205            min: None,
206            max: None,
207            element_dtype: input_dtype.clone(),
208        })
209    }
210
211    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
212        let local = MinMaxResult::from_scalar(other)?;
213        partial.merge(local);
214        Ok(())
215    }
216
217    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
218        let dtype = make_minmax_dtype(&partial.element_dtype);
219        Ok(match (&partial.min, &partial.max) {
220            (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
221            _ => Scalar::null(dtype),
222        })
223    }
224
225    fn reset(&self, partial: &mut Self::Partial) {
226        partial.min = None;
227        partial.max = None;
228    }
229
230    #[inline]
231    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
232        false
233    }
234
235    fn accumulate(
236        &self,
237        partial: &mut Self::Partial,
238        batch: &Columnar,
239        ctx: &mut ExecutionCtx,
240    ) -> VortexResult<()> {
241        match batch {
242            Columnar::Constant(c) => {
243                let scalar = c.scalar();
244                if scalar.is_null() {
245                    return Ok(());
246                }
247                // Skip NaN float constants
248                if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
249                    return Ok(());
250                }
251                let non_nullable_dtype = scalar.dtype().as_nonnullable();
252                let cast = scalar.cast(&non_nullable_dtype)?;
253                partial.merge(Some(MinMaxResult {
254                    min: cast.clone(),
255                    max: cast,
256                }));
257                Ok(())
258            }
259            Columnar::Canonical(c) => match c {
260                Canonical::Primitive(p) => accumulate_primitive(partial, p),
261                Canonical::Bool(b) => accumulate_bool(partial, b),
262                Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
263                Canonical::Decimal(d) => accumulate_decimal(partial, d),
264                Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
265                Canonical::Null(_) => Ok(()),
266                Canonical::Struct(_)
267                | Canonical::List(_)
268                | Canonical::FixedSizeList(_)
269                | Canonical::Variant(_) => {
270                    vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
271                }
272            },
273        }
274    }
275
276    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
277        Ok(partials)
278    }
279
280    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
281        self.to_scalar(partial)
282    }
283}
284
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::half::f16;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    /// Min and max of a small non-nullable integer array.
    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
        let expected = MinMaxResult {
            min: 1.into(),
            max: 3.into(),
        };
        assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        Ok(())
    }

    /// All-true, all-false and mixed non-nullable bool arrays.
    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let cases = [
            ([true, true, true], true, true),
            ([false, false, false], false, false),
            ([false, true, false], false, true),
        ];
        for (values, min, max) in cases {
            let array = BoolArray::new(BitBuffer::from(values.as_slice()), Validity::NonNullable)
                .into_array();
            let expected = MinMaxResult {
                min: min.into(),
                max: max.into(),
            };
            assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        }
        Ok(())
    }

    /// A `NullArray` has no min/max.
    #[test]
    fn test_null_array() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = NullArray::new(1).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// NaN values are ignored when computing the bounds.
    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(
            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
            Validity::NonNullable,
        )
        .into_array();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, -1.0);
        assert_eq!(f32::try_from(&bounds.max)?, 1.0);
        Ok(())
    }

    /// Infinities participate in the bounds.
    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(
            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
            Validity::NonNullable,
        )
        .into_array();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&bounds.max)?, f32::INFINITY);
        Ok(())
    }

    /// Accumulating several batches yields the bounds over all of them.
    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        for values in [buffer![10i32, 20, 5], buffer![3i32, 25]] {
            let batch = PrimitiveArray::new(values, Validity::NonNullable).into_array();
            acc.accumulate(&batch, &mut ctx)?;
        }

        let bounds = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(3i32));
        assert_eq!(bounds.max, Scalar::from(25i32));
        Ok(())
    }

    /// After `finish`, the accumulator starts over from an empty state.
    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let first = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&first, &mut ctx)?;
        let first_bounds =
            MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(first_bounds.min, Scalar::from(10i32));
        assert_eq!(first_bounds.max, Scalar::from(20i32));

        let second = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&second, &mut ctx)?;
        let second_bounds =
            MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(second_bounds.min, Scalar::from(3i32));
        assert_eq!(second_bounds.max, Scalar::from(9i32));
        Ok(())
    }

    /// Combining partial struct scalars keeps the overall min and max.
    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;

        let struct_dtype = crate::aggregate_fn::fns::min_max::make_minmax_dtype(&dtype);
        for (lo, hi) in [(5i32, 15i32), (2, 10)] {
            let partial = Scalar::struct_(
                struct_dtype.clone(),
                vec![Scalar::from(lo), Scalar::from(hi)],
            );
            MinMax.combine_partials(&mut state, partial)?;
        }

        let bounds = MinMaxResult::from_scalar(MinMax.to_scalar(&state)?)?
            .vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(2i32));
        assert_eq!(bounds.max, Scalar::from(15i32));
        Ok(())
    }

    /// A constant NaN array has no min/max.
    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let nan = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(nan, 2).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// Bounds are computed across chunks, skipping nulls.
    #[test]
    fn test_chunked() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let chunk1 = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let chunk2 = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let dtype = chunk1.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?
            .into_array();
        let bounds = min_max(&chunked, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(1i32));
        assert_eq!(bounds.max, Scalar::from(10i32));
        Ok(())
    }

    /// An all-null primitive array has no min/max.
    #[test]
    fn test_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// String bounds on a nullable utf8 array.
    #[test]
    fn test_varbin() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = VarBinArray::from_iter(
            vec![
                Some("hello world"),
                None,
                Some("hello world this is a long string"),
                None,
            ],
            DType::Utf8(Nullability::Nullable),
        )
        .into_array();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");

        let expected_min = Scalar::utf8("hello world", Nullability::NonNullable);
        let expected_max = Scalar::utf8(
            "hello world this is a long string",
            Nullability::NonNullable,
        );
        assert_eq!(bounds.min, expected_min);
        assert_eq!(bounds.max, expected_max);
        Ok(())
    }

    /// Decimal bounds ignore the invalid (null) middle element.
    #[test]
    fn test_decimal() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        )
        .into_array();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");

        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let expected_min = Scalar::try_new(
            non_nullable_dtype.clone(),
            Some(ScalarValue::from(DecimalValue::from(100i32))),
        )?;
        let expected_max = Scalar::try_new(
            non_nullable_dtype,
            Some(ScalarValue::from(DecimalValue::from(200i32))),
        )?;
        assert_eq!(bounds.min, expected_min);
        assert_eq!(bounds.max, expected_max);
        Ok(())
    }

    /// Nullable bool arrays whose valid values are all equal.
    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let cases = [
            (vec![Some(true), Some(true), None, None], true),
            (vec![None, Some(true), Some(true)], true),
            (vec![None, Some(true), Some(true), None], true),
            (vec![Some(false), Some(false), None, None], false),
        ];
        for (values, only_value) in cases {
            let array = BoolArray::from_iter(values).into_array();
            let expected = MinMaxResult {
                min: Scalar::bool(only_value, Nullability::NonNullable),
                max: Scalar::bool(only_value, Nullability::NonNullable),
            };
            assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        }
        Ok(())
    }

    /// Regression test for <https://github.com/vortex-data/vortex/issues/7074>.
    ///
    /// A chunked all-true bool array with an empty first chunk returned min=false because
    /// `accumulate_bool` on the empty chunk incorrectly merged min=false,max=false into the
    /// partial state.
    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
        let chunk1 = BoolArray::new(
            BitBuffer::from([true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunk2 = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunked = ChunkedArray::try_new(
            vec![empty.into_array(), chunk1.into_array(), chunk2.into_array()],
            DType::Bool(Nullability::NonNullable),
        )?
        .into_array();

        let expected = MinMaxResult {
            min: Scalar::bool(true, Nullability::NonNullable),
            max: Scalar::bool(true, Nullability::NonNullable),
        };
        assert_eq!(min_max(&chunked, &mut ctx)?, Some(expected));
        Ok(())
    }

    /// An all-null utf8 array has no min/max.
    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        )
        .into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }
}