Skip to main content

vortex_array/aggregate_fn/fns/min_max/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43/// The minimum and maximum non-null values of an array, or `None` if there are no non-null values.
44///
45/// The result scalars have the non-nullable version of the array dtype.
46/// This will update the stats set of the array as a side effect.
47pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48    // Short-circuit using cached array statistics.
49    let cached_min = array
50        .statistics()
51        .get(Stat::Min)
52        .and_then(Precision::as_exact);
53    let cached_max = array
54        .statistics()
55        .get(Stat::Max)
56        .and_then(Precision::as_exact);
57    if let Some((min, max)) = cached_min.zip(cached_max) {
58        let non_nullable_dtype = array.dtype().as_nonnullable();
59        return Ok(Some(MinMaxResult {
60            min: min.cast(&non_nullable_dtype)?,
61            max: max.cast(&non_nullable_dtype)?,
62        }));
63    }
64
65    // Short-circuit for empty arrays or all-null arrays.
66    if array.is_empty() || array.valid_count()? == 0 {
67        return Ok(None);
68    }
69
70    // Short-circuit for unsupported dtypes.
71    if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72        return Ok(None);
73    }
74
75    // Compute using Accumulator<MinMax>.
76    let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77    acc.accumulate(array, ctx)?;
78    let result_scalar = acc.finish()?;
79    let result = MinMaxResult::from_scalar(result_scalar)?;
80
81    // Cache the computed min/max as statistics.
82    if let Some(ref r) = result {
83        if let Some(min_value) = r.min.value() {
84            array
85                .statistics()
86                .set(Stat::Min, Precision::Exact(min_value.clone()));
87        }
88        if let Some(max_value) = r.max.value() {
89            array
90                .statistics()
91                .set(Stat::Max, Precision::Exact(max_value.clone()));
92        }
93    }
94
95    Ok(result)
96}
97
98/// The minimum and maximum non-null values of an array.
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct MinMaxResult {
101    pub min: Scalar,
102    pub max: Scalar,
103}
104
105impl MinMaxResult {
106    /// Extract a `MinMaxResult` from a struct scalar with `{min, max}` fields.
107    pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108        if scalar.is_null() {
109            Ok(None)
110        } else {
111            let min = scalar
112                .as_struct()
113                .field_by_idx(0)
114                .vortex_expect("missing min field");
115            let max = scalar
116                .as_struct()
117                .field_by_idx(1)
118                .vortex_expect("missing max field");
119            Ok(Some(MinMaxResult { min, max }))
120        }
121    }
122}
123
/// Aggregate function that computes the min and max of an array.
///
/// The result is a nullable struct scalar `{min: T, max: T}`, where `T` is the non-nullable
/// version of the input dtype. The struct is null when no min/max value was accumulated, e.g.
/// for an empty or all-null array.
#[derive(Debug, Clone)]
pub struct MinMax;
130
131/// Partial accumulator state for min/max.
132pub struct MinMaxPartial {
133    min: Option<Scalar>,
134    max: Option<Scalar>,
135    element_dtype: DType,
136}
137
138impl MinMaxPartial {
139    /// Merge a local `MinMaxResult` into this partial state.
140    fn merge(&mut self, local: Option<MinMaxResult>) {
141        let Some(MinMaxResult { min, max }) = local else {
142            return;
143        };
144
145        self.min = Some(match self.min.take() {
146            Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147            None => min,
148        });
149
150        self.max = Some(match self.max.take() {
151            Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152            None => max,
153        });
154    }
155}
156
157/// Creates the struct dtype `{min: T, max: T}` (nullable) used for min/max aggregate results.
158pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159    DType::Struct(
160        StructFields::new(
161            NAMES.clone(),
162            vec![
163                element_dtype.as_nonnullable(),
164                element_dtype.as_nonnullable(),
165            ],
166        ),
167        Nullability::Nullable,
168    )
169}
170
171impl AggregateFnVTable for MinMax {
172    type Options = EmptyOptions;
173    type Partial = MinMaxPartial;
174
175    fn id(&self) -> AggregateFnId {
176        AggregateFnId::new_ref("vortex.min_max")
177    }
178
179    fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180        Ok(Some(vec![]))
181    }
182
183    fn deserialize(
184        &self,
185        _metadata: &[u8],
186        _session: &vortex_session::VortexSession,
187    ) -> VortexResult<Self::Options> {
188        Ok(EmptyOptions)
189    }
190
191    fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
192        match input_dtype {
193            DType::Bool(_)
194            | DType::Primitive(..)
195            | DType::Decimal(..)
196            | DType::Utf8(..)
197            | DType::Binary(..)
198            | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
199            _ => None,
200        }
201    }
202
203    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
204        self.return_dtype(options, input_dtype)
205    }
206
207    fn empty_partial(
208        &self,
209        _options: &Self::Options,
210        input_dtype: &DType,
211    ) -> VortexResult<Self::Partial> {
212        Ok(MinMaxPartial {
213            min: None,
214            max: None,
215            element_dtype: input_dtype.clone(),
216        })
217    }
218
219    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
220        let local = MinMaxResult::from_scalar(other)?;
221        partial.merge(local);
222        Ok(())
223    }
224
225    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
226        let dtype = make_minmax_dtype(&partial.element_dtype);
227        Ok(match (&partial.min, &partial.max) {
228            (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
229            _ => Scalar::null(dtype),
230        })
231    }
232
233    fn reset(&self, partial: &mut Self::Partial) {
234        partial.min = None;
235        partial.max = None;
236    }
237
238    #[inline]
239    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
240        false
241    }
242
243    fn accumulate(
244        &self,
245        partial: &mut Self::Partial,
246        batch: &Columnar,
247        ctx: &mut ExecutionCtx,
248    ) -> VortexResult<()> {
249        match batch {
250            Columnar::Constant(c) => {
251                let scalar = c.scalar();
252                if scalar.is_null() {
253                    return Ok(());
254                }
255                // Skip NaN float constants
256                if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
257                    return Ok(());
258                }
259                let non_nullable_dtype = scalar.dtype().as_nonnullable();
260                let cast = scalar.cast(&non_nullable_dtype)?;
261                partial.merge(Some(MinMaxResult {
262                    min: cast.clone(),
263                    max: cast,
264                }));
265                Ok(())
266            }
267            Columnar::Canonical(c) => match c {
268                Canonical::Primitive(p) => accumulate_primitive(partial, p),
269                Canonical::Bool(b) => accumulate_bool(partial, b),
270                Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
271                Canonical::Decimal(d) => accumulate_decimal(partial, d),
272                Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
273                Canonical::Null(_) => Ok(()),
274                Canonical::Struct(_)
275                | Canonical::List(_)
276                | Canonical::FixedSizeList(_)
277                | Canonical::Variant(_) => {
278                    vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
279                }
280            },
281        }
282    }
283
284    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
285        Ok(partials)
286    }
287
288    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
289        self.to_scalar(partial)
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use vortex_buffer::BitBuffer;
296    use vortex_buffer::buffer;
297    use vortex_error::VortexExpect;
298    use vortex_error::VortexResult;
299
300    use crate::IntoArray as _;
301    use crate::LEGACY_SESSION;
302    use crate::VortexSessionExecute;
303    use crate::aggregate_fn::Accumulator;
304    use crate::aggregate_fn::AggregateFnVTable;
305    use crate::aggregate_fn::DynAccumulator;
306    use crate::aggregate_fn::EmptyOptions;
307    use crate::aggregate_fn::fns::min_max::MinMax;
308    use crate::aggregate_fn::fns::min_max::MinMaxResult;
309    use crate::aggregate_fn::fns::min_max::min_max;
310    use crate::arrays::BoolArray;
311    use crate::arrays::ChunkedArray;
312    use crate::arrays::ConstantArray;
313    use crate::arrays::DecimalArray;
314    use crate::arrays::NullArray;
315    use crate::arrays::PrimitiveArray;
316    use crate::arrays::VarBinArray;
317    use crate::dtype::DType;
318    use crate::dtype::DecimalDType;
319    use crate::dtype::Nullability;
320    use crate::dtype::PType;
321    use crate::scalar::DecimalValue;
322    use crate::scalar::Scalar;
323    use crate::scalar::ScalarValue;
324    use crate::validity::Validity;
325
326    #[test]
327    fn test_prim_min_max() -> VortexResult<()> {
328        let p = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
329        let mut ctx = LEGACY_SESSION.create_execution_ctx();
330        assert_eq!(
331            min_max(&p, &mut ctx)?,
332            Some(MinMaxResult {
333                min: 1.into(),
334                max: 3.into()
335            })
336        );
337        Ok(())
338    }
339
340    #[test]
341    fn test_bool_min_max() -> VortexResult<()> {
342        let mut ctx = LEGACY_SESSION.create_execution_ctx();
343
344        let all_true = BoolArray::new(
345            BitBuffer::from([true, true, true].as_slice()),
346            Validity::NonNullable,
347        )
348        .into_array();
349        assert_eq!(
350            min_max(&all_true, &mut ctx)?,
351            Some(MinMaxResult {
352                min: true.into(),
353                max: true.into()
354            })
355        );
356
357        let all_false = BoolArray::new(
358            BitBuffer::from([false, false, false].as_slice()),
359            Validity::NonNullable,
360        )
361        .into_array();
362        assert_eq!(
363            min_max(&all_false, &mut ctx)?,
364            Some(MinMaxResult {
365                min: false.into(),
366                max: false.into()
367            })
368        );
369
370        let mixed = BoolArray::new(
371            BitBuffer::from([false, true, false].as_slice()),
372            Validity::NonNullable,
373        )
374        .into_array();
375        assert_eq!(
376            min_max(&mixed, &mut ctx)?,
377            Some(MinMaxResult {
378                min: false.into(),
379                max: true.into()
380            })
381        );
382        Ok(())
383    }
384
385    #[test]
386    fn test_null_array() -> VortexResult<()> {
387        let p = NullArray::new(1).into_array();
388        let mut ctx = LEGACY_SESSION.create_execution_ctx();
389        assert_eq!(min_max(&p, &mut ctx)?, None);
390        Ok(())
391    }
392
393    #[test]
394    fn test_prim_nan() -> VortexResult<()> {
395        let array = PrimitiveArray::new(
396            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
397            Validity::NonNullable,
398        );
399        let mut ctx = LEGACY_SESSION.create_execution_ctx();
400        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
401        assert_eq!(f32::try_from(&result.min)?, -1.0);
402        assert_eq!(f32::try_from(&result.max)?, 1.0);
403        Ok(())
404    }
405
406    #[test]
407    fn test_prim_inf() -> VortexResult<()> {
408        let array = PrimitiveArray::new(
409            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
410            Validity::NonNullable,
411        );
412        let mut ctx = LEGACY_SESSION.create_execution_ctx();
413        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
414        assert_eq!(f32::try_from(&result.min)?, f32::NEG_INFINITY);
415        assert_eq!(f32::try_from(&result.max)?, f32::INFINITY);
416        Ok(())
417    }
418
419    #[test]
420    fn test_multi_batch() -> VortexResult<()> {
421        let mut ctx = LEGACY_SESSION.create_execution_ctx();
422        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
423        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;
424
425        let batch1 = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
426        acc.accumulate(&batch1, &mut ctx)?;
427
428        let batch2 = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
429        acc.accumulate(&batch2, &mut ctx)?;
430
431        let result = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
432        assert_eq!(result.min, Scalar::from(3i32));
433        assert_eq!(result.max, Scalar::from(25i32));
434        Ok(())
435    }
436
437    #[test]
438    fn test_finish_resets_state() -> VortexResult<()> {
439        let mut ctx = LEGACY_SESSION.create_execution_ctx();
440        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
441        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;
442
443        let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
444        acc.accumulate(&batch1, &mut ctx)?;
445        let result1 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
446        assert_eq!(result1.min, Scalar::from(10i32));
447        assert_eq!(result1.max, Scalar::from(20i32));
448
449        let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
450        acc.accumulate(&batch2, &mut ctx)?;
451        let result2 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
452        assert_eq!(result2.min, Scalar::from(3i32));
453        assert_eq!(result2.max, Scalar::from(9i32));
454        Ok(())
455    }
456
457    #[test]
458    fn test_state_merge() -> VortexResult<()> {
459        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
460        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;
461
462        let struct_dtype = crate::aggregate_fn::fns::min_max::make_minmax_dtype(&dtype);
463        let scalar1 = Scalar::struct_(
464            struct_dtype.clone(),
465            vec![Scalar::from(5i32), Scalar::from(15i32)],
466        );
467        MinMax.combine_partials(&mut state, scalar1)?;
468
469        let scalar2 = Scalar::struct_(struct_dtype, vec![Scalar::from(2i32), Scalar::from(10i32)]);
470        MinMax.combine_partials(&mut state, scalar2)?;
471
472        let result = MinMaxResult::from_scalar(MinMax.to_scalar(&state)?)?
473            .vortex_expect("should have result");
474        assert_eq!(result.min, Scalar::from(2i32));
475        assert_eq!(result.max, Scalar::from(15i32));
476        Ok(())
477    }
478
479    #[test]
480    fn test_constant_nan() -> VortexResult<()> {
481        let scalar = Scalar::primitive(f16::NAN, Nullability::NonNullable);
482        let array = ConstantArray::new(scalar, 2).into_array();
483        let mut ctx = LEGACY_SESSION.create_execution_ctx();
484        assert_eq!(min_max(&array, &mut ctx)?, None);
485        Ok(())
486    }
487
488    #[test]
489    fn test_chunked() -> VortexResult<()> {
490        let chunk1 = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
491        let chunk2 = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
492        let dtype = chunk1.dtype().clone();
493        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
494        let mut ctx = LEGACY_SESSION.create_execution_ctx();
495        let result = min_max(&chunked.into_array(), &mut ctx)?.vortex_expect("should have result");
496        assert_eq!(result.min, Scalar::from(1i32));
497        assert_eq!(result.max, Scalar::from(10i32));
498        Ok(())
499    }
500
501    #[test]
502    fn test_all_null() -> VortexResult<()> {
503        let p = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
504        let mut ctx = LEGACY_SESSION.create_execution_ctx();
505        assert_eq!(min_max(&p.into_array(), &mut ctx)?, None);
506        Ok(())
507    }
508
509    #[test]
510    fn test_varbin() -> VortexResult<()> {
511        let array = VarBinArray::from_iter(
512            vec![
513                Some("hello world"),
514                None,
515                Some("hello world this is a long string"),
516                None,
517            ],
518            DType::Utf8(Nullability::Nullable),
519        );
520        let mut ctx = LEGACY_SESSION.create_execution_ctx();
521        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
522        assert_eq!(
523            result.min,
524            Scalar::utf8("hello world", Nullability::NonNullable)
525        );
526        assert_eq!(
527            result.max,
528            Scalar::utf8(
529                "hello world this is a long string",
530                Nullability::NonNullable
531            )
532        );
533        Ok(())
534    }
535
536    #[test]
537    fn test_decimal() -> VortexResult<()> {
538        let decimal = DecimalArray::new(
539            buffer![100i32, 2000i32, 200i32],
540            DecimalDType::new(4, 2),
541            Validity::from_iter([true, false, true]),
542        );
543        let mut ctx = LEGACY_SESSION.create_execution_ctx();
544        let result = min_max(&decimal.into_array(), &mut ctx)?.vortex_expect("should have result");
545
546        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
547        let expected_min = Scalar::try_new(
548            non_nullable_dtype.clone(),
549            Some(ScalarValue::from(DecimalValue::from(100i32))),
550        )?;
551        let expected_max = Scalar::try_new(
552            non_nullable_dtype,
553            Some(ScalarValue::from(DecimalValue::from(200i32))),
554        )?;
555        assert_eq!(result.min, expected_min);
556        assert_eq!(result.max, expected_max);
557        Ok(())
558    }
559
560    use crate::dtype::half::f16;
561
562    #[test]
563    fn test_bool_with_nulls() -> VortexResult<()> {
564        let mut ctx = LEGACY_SESSION.create_execution_ctx();
565
566        let result = min_max(
567            &BoolArray::from_iter(vec![Some(true), Some(true), None, None]).into_array(),
568            &mut ctx,
569        )?;
570        assert_eq!(
571            result,
572            Some(MinMaxResult {
573                min: Scalar::bool(true, Nullability::NonNullable),
574                max: Scalar::bool(true, Nullability::NonNullable),
575            })
576        );
577
578        let result = min_max(
579            &BoolArray::from_iter(vec![None, Some(true), Some(true)]).into_array(),
580            &mut ctx,
581        )?;
582        assert_eq!(
583            result,
584            Some(MinMaxResult {
585                min: Scalar::bool(true, Nullability::NonNullable),
586                max: Scalar::bool(true, Nullability::NonNullable),
587            })
588        );
589
590        let result = min_max(
591            &BoolArray::from_iter(vec![None, Some(true), Some(true), None]).into_array(),
592            &mut ctx,
593        )?;
594        assert_eq!(
595            result,
596            Some(MinMaxResult {
597                min: Scalar::bool(true, Nullability::NonNullable),
598                max: Scalar::bool(true, Nullability::NonNullable),
599            })
600        );
601
602        let result = min_max(
603            &BoolArray::from_iter(vec![Some(false), Some(false), None, None]).into_array(),
604            &mut ctx,
605        )?;
606        assert_eq!(
607            result,
608            Some(MinMaxResult {
609                min: Scalar::bool(false, Nullability::NonNullable),
610                max: Scalar::bool(false, Nullability::NonNullable),
611            })
612        );
613        Ok(())
614    }
615
616    /// Regression test for <https://github.com/vortex-data/vortex/issues/7074>.
617    ///
618    /// A chunked all-true bool array with an empty first chunk returned min=false because
619    /// `accumulate_bool` on the empty chunk incorrectly merged min=false,max=false into the
620    /// partial state.
621    #[test]
622    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
623        let mut ctx = LEGACY_SESSION.create_execution_ctx();
624
625        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
626        let chunk1 = BoolArray::new(
627            BitBuffer::from([true, true].as_slice()),
628            Validity::NonNullable,
629        );
630        let chunk2 = BoolArray::new(
631            BitBuffer::from([true, true, true].as_slice()),
632            Validity::NonNullable,
633        );
634        let chunked = ChunkedArray::try_new(
635            vec![empty.into_array(), chunk1.into_array(), chunk2.into_array()],
636            DType::Bool(Nullability::NonNullable),
637        )?;
638
639        let result = min_max(&chunked.into_array(), &mut ctx)?;
640        assert_eq!(
641            result,
642            Some(MinMaxResult {
643                min: Scalar::bool(true, Nullability::NonNullable),
644                max: Scalar::bool(true, Nullability::NonNullable),
645            })
646        );
647        Ok(())
648    }
649
650    #[test]
651    fn test_varbin_all_nulls() -> VortexResult<()> {
652        let array = VarBinArray::from_iter(
653            vec![Option::<&str>::None, None, None],
654            DType::Utf8(Nullability::Nullable),
655        );
656        let mut ctx = LEGACY_SESSION.create_execution_ctx();
657        assert_eq!(min_max(&array.into_array(), &mut ctx)?, None);
658        Ok(())
659    }
660}