1mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48 let cached_min = array
50 .statistics()
51 .get(Stat::Min)
52 .and_then(Precision::as_exact);
53 let cached_max = array
54 .statistics()
55 .get(Stat::Max)
56 .and_then(Precision::as_exact);
57 if let Some((min, max)) = cached_min.zip(cached_max) {
58 let non_nullable_dtype = array.dtype().as_nonnullable();
59 return Ok(Some(MinMaxResult {
60 min: min.cast(&non_nullable_dtype)?,
61 max: max.cast(&non_nullable_dtype)?,
62 }));
63 }
64
65 if array.is_empty() || array.valid_count(ctx)? == 0 {
67 return Ok(None);
68 }
69
70 if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72 return Ok(None);
73 }
74
75 let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77 acc.accumulate(array, ctx)?;
78 let result_scalar = acc.finish()?;
79 let result = MinMaxResult::from_scalar(result_scalar)?;
80
81 if let Some(r) = &result {
83 if let Some(min_value) = r.min.value() {
84 array
85 .statistics()
86 .set(Stat::Min, Precision::Exact(min_value.clone()));
87 }
88 if let Some(max_value) = r.max.value() {
89 array
90 .statistics()
91 .set(Stat::Max, Precision::Exact(max_value.clone()));
92 }
93 }
94
95 Ok(result)
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct MinMaxResult {
101 pub min: Scalar,
102 pub max: Scalar,
103}
104
105impl MinMaxResult {
106 pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108 if scalar.is_null() {
109 Ok(None)
110 } else {
111 let min = scalar
112 .as_struct()
113 .field_by_idx(0)
114 .vortex_expect("missing min field");
115 let max = scalar
116 .as_struct()
117 .field_by_idx(1)
118 .vortex_expect("missing max field");
119 Ok(Some(MinMaxResult { min, max }))
120 }
121 }
122}
123
/// Aggregate function that computes the minimum and maximum of its input in a single pass.
///
/// The result is a struct scalar with `min` and `max` fields; see [`make_minmax_dtype`].
#[derive(Debug, Clone)]
pub struct MinMax;
130
131pub struct MinMaxPartial {
133 min: Option<Scalar>,
134 max: Option<Scalar>,
135 element_dtype: DType,
136}
137
138impl MinMaxPartial {
139 fn merge(&mut self, local: Option<MinMaxResult>) {
141 let Some(MinMaxResult { min, max }) = local else {
142 return;
143 };
144
145 self.min = Some(match self.min.take() {
146 Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147 None => min,
148 });
149
150 self.max = Some(match self.max.take() {
151 Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152 None => max,
153 });
154 }
155}
156
157pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159 DType::Struct(
160 StructFields::new(
161 NAMES.clone(),
162 vec![
163 element_dtype.as_nonnullable(),
164 element_dtype.as_nonnullable(),
165 ],
166 ),
167 Nullability::Nullable,
168 )
169}
170
171impl AggregateFnVTable for MinMax {
172 type Options = EmptyOptions;
173 type Partial = MinMaxPartial;
174
175 fn id(&self) -> AggregateFnId {
176 AggregateFnId::new("vortex.min_max")
177 }
178
179 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180 unimplemented!("MinMax is not yet serializable");
181 }
182
183 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
184 match input_dtype {
185 DType::Bool(_)
186 | DType::Primitive(..)
187 | DType::Decimal(..)
188 | DType::Utf8(..)
189 | DType::Binary(..)
190 | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
191 _ => None,
192 }
193 }
194
195 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
196 self.return_dtype(options, input_dtype)
197 }
198
199 fn empty_partial(
200 &self,
201 _options: &Self::Options,
202 input_dtype: &DType,
203 ) -> VortexResult<Self::Partial> {
204 Ok(MinMaxPartial {
205 min: None,
206 max: None,
207 element_dtype: input_dtype.clone(),
208 })
209 }
210
211 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
212 let local = MinMaxResult::from_scalar(other)?;
213 partial.merge(local);
214 Ok(())
215 }
216
217 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
218 let dtype = make_minmax_dtype(&partial.element_dtype);
219 Ok(match (&partial.min, &partial.max) {
220 (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
221 _ => Scalar::null(dtype),
222 })
223 }
224
225 fn reset(&self, partial: &mut Self::Partial) {
226 partial.min = None;
227 partial.max = None;
228 }
229
230 #[inline]
231 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
232 false
233 }
234
235 fn accumulate(
236 &self,
237 partial: &mut Self::Partial,
238 batch: &Columnar,
239 ctx: &mut ExecutionCtx,
240 ) -> VortexResult<()> {
241 match batch {
242 Columnar::Constant(c) => {
243 let scalar = c.scalar();
244 if scalar.is_null() {
245 return Ok(());
246 }
247 if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
249 return Ok(());
250 }
251 let non_nullable_dtype = scalar.dtype().as_nonnullable();
252 let cast = scalar.cast(&non_nullable_dtype)?;
253 partial.merge(Some(MinMaxResult {
254 min: cast.clone(),
255 max: cast,
256 }));
257 Ok(())
258 }
259 Columnar::Canonical(c) => match c {
260 Canonical::Primitive(p) => accumulate_primitive(partial, p, ctx),
261 Canonical::Bool(b) => accumulate_bool(partial, b, ctx),
262 Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
263 Canonical::Decimal(d) => accumulate_decimal(partial, d, ctx),
264 Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
265 Canonical::Null(_) => Ok(()),
266 Canonical::Struct(_)
267 | Canonical::List(_)
268 | Canonical::FixedSizeList(_)
269 | Canonical::Variant(_) => {
270 vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
271 }
272 },
273 }
274 }
275
276 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
277 Ok(partials)
278 }
279
280 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
281 self.to_scalar(partial)
282 }
283}
284
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::make_minmax_dtype;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::half::f16;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    /// Min and max of a small non-nullable integer array.
    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
        let expected = MinMaxResult {
            min: 1.into(),
            max: 3.into(),
        };
        assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        Ok(())
    }

    /// All-true, all-false and mixed non-nullable bool arrays.
    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let cases = [
            ([true, true, true], true, true),
            ([false, false, false], false, false),
            ([false, true, false], false, true),
        ];
        for (bits, expected_min, expected_max) in cases {
            let array = BoolArray::new(BitBuffer::from(bits.as_slice()), Validity::NonNullable)
                .into_array();
            assert_eq!(
                min_max(&array, &mut ctx)?,
                Some(MinMaxResult {
                    min: expected_min.into(),
                    max: expected_max.into()
                })
            );
        }
        Ok(())
    }

    /// A null-typed array has no min/max.
    #[test]
    fn test_null_array() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = NullArray::new(1).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// NaN values are skipped when computing float bounds.
    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let values = buffer![f32::NAN, -f32::NAN, -1.0, 1.0];
        let array = PrimitiveArray::new(values, Validity::NonNullable).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, -1.0);
        assert_eq!(f32::try_from(&bounds.max)?, 1.0);
        Ok(())
    }

    /// Infinities take part in the comparison.
    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let values = buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0];
        let array = PrimitiveArray::new(values, Validity::NonNullable).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&bounds.max)?, f32::INFINITY);
        Ok(())
    }

    /// Bounds are tracked across several accumulated batches.
    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        for values in [buffer![10i32, 20, 5], buffer![3i32, 25]] {
            let batch = PrimitiveArray::new(values, Validity::NonNullable).into_array();
            acc.accumulate(&batch, &mut ctx)?;
        }

        let bounds = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(3i32));
        assert_eq!(bounds.max, Scalar::from(25i32));
        Ok(())
    }

    /// After `finish`, the accumulator starts over from an empty state.
    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let first = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&first, &mut ctx)?;
        let first_bounds =
            MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(first_bounds.min, Scalar::from(10i32));
        assert_eq!(first_bounds.max, Scalar::from(20i32));

        let second = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&second, &mut ctx)?;
        let second_bounds =
            MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(second_bounds.min, Scalar::from(3i32));
        assert_eq!(second_bounds.max, Scalar::from(9i32));
        Ok(())
    }

    /// Combining partial struct scalars keeps the overall min and max.
    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;
        let struct_dtype = make_minmax_dtype(&dtype);

        for (lo, hi) in [(5i32, 15i32), (2, 10)] {
            let partial = Scalar::struct_(
                struct_dtype.clone(),
                vec![Scalar::from(lo), Scalar::from(hi)],
            );
            MinMax.combine_partials(&mut state, partial)?;
        }

        let merged = MinMax.to_scalar(&state)?;
        let bounds = MinMaxResult::from_scalar(merged)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(2i32));
        assert_eq!(bounds.max, Scalar::from(15i32));
        Ok(())
    }

    /// A constant NaN array yields no result.
    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let nan = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(nan, 2).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// Chunked nullable input: nulls are ignored and bounds span all chunks.
    #[test]
    fn test_chunked() -> VortexResult<()> {
        let first = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let second = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let dtype = first.dtype().clone();
        let chunks = vec![first.into_array(), second.into_array()];
        let chunked = ChunkedArray::try_new(chunks, dtype)?.into_array();

        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bounds = min_max(&chunked, &mut ctx)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(1i32));
        assert_eq!(bounds.max, Scalar::from(10i32));
        Ok(())
    }

    /// An all-null primitive array yields no result.
    #[test]
    fn test_all_null() -> VortexResult<()> {
        let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// Nullable UTF-8 input: bounds are non-nullable strings.
    #[test]
    fn test_varbin() -> VortexResult<()> {
        let strings = vec![
            Some("hello world"),
            None,
            Some("hello world this is a long string"),
            None,
        ];
        let array = VarBinArray::from_iter(strings, DType::Utf8(Nullability::Nullable));
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bounds = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");

        let expected_min = Scalar::utf8("hello world", Nullability::NonNullable);
        let expected_max = Scalar::utf8(
            "hello world this is a long string",
            Nullability::NonNullable,
        );
        assert_eq!(bounds.min, expected_min);
        assert_eq!(bounds.max, expected_max);
        Ok(())
    }

    /// Decimal input with an invalid (masked-out) middle element.
    #[test]
    fn test_decimal() -> VortexResult<()> {
        let array = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        )
        .into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bounds = min_max(&array, &mut ctx)?.vortex_expect("should have result");

        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let decimal_scalar = |raw: i32| {
            Scalar::try_new(
                non_nullable_dtype.clone(),
                Some(ScalarValue::from(DecimalValue::from(raw))),
            )
        };
        assert_eq!(bounds.min, decimal_scalar(100i32)?);
        assert_eq!(bounds.max, decimal_scalar(200i32)?);
        Ok(())
    }

    /// Bool arrays whose valid values are all equal, with nulls in various positions.
    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let cases = [
            (vec![Some(true), Some(true), None, None], true),
            (vec![None, Some(true), Some(true)], true),
            (vec![None, Some(true), Some(true), None], true),
            (vec![Some(false), Some(false), None, None], false),
        ];
        for (values, expected) in cases {
            let array = BoolArray::from_iter(values).into_array();
            let result = min_max(&array, &mut ctx)?;
            assert_eq!(
                result,
                Some(MinMaxResult {
                    min: Scalar::bool(expected, Nullability::NonNullable),
                    max: Scalar::bool(expected, Nullability::NonNullable),
                })
            );
        }
        Ok(())
    }

    /// A leading empty chunk must not affect the bounds of the remaining chunks.
    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let bool_chunk = |bits: &[bool]| {
            BoolArray::new(BitBuffer::from(bits), Validity::NonNullable).into_array()
        };
        let chunks = vec![
            bool_chunk(&[]),
            bool_chunk(&[true, true]),
            bool_chunk(&[true, true, true]),
        ];
        let chunked =
            ChunkedArray::try_new(chunks, DType::Bool(Nullability::NonNullable))?.into_array();

        let result = min_max(&chunked, &mut ctx)?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    /// An all-null UTF-8 array yields no result.
    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let strings = vec![Option::<&str>::None, None, None];
        let array =
            VarBinArray::from_iter(strings, DType::Utf8(Nullability::Nullable)).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }
}