1mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48 let cached_min = array.statistics().get(Stat::Min).as_exact();
50 let cached_max = array.statistics().get(Stat::Max).as_exact();
51 if let Some((min, max)) = cached_min.zip(cached_max) {
52 let non_nullable_dtype = array.dtype().as_nonnullable();
53 return Ok(Some(MinMaxResult {
54 min: min.cast(&non_nullable_dtype)?,
55 max: max.cast(&non_nullable_dtype)?,
56 }));
57 }
58
59 if array.is_empty() || array.valid_count(ctx)? == 0 {
61 return Ok(None);
62 }
63
64 if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
66 return Ok(None);
67 }
68
69 let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
71 acc.accumulate(array, ctx)?;
72 let result_scalar = acc.finish()?;
73 let result = MinMaxResult::from_scalar(result_scalar)?;
74
75 if let Some(r) = &result {
77 if let Some(min_value) = r.min.value() {
78 array
79 .statistics()
80 .set(Stat::Min, Precision::Exact(min_value.clone()));
81 }
82 if let Some(max_value) = r.max.value() {
83 array
84 .statistics()
85 .set(Stat::Max, Precision::Exact(max_value.clone()));
86 }
87 }
88
89 Ok(result)
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct MinMaxResult {
95 pub min: Scalar,
96 pub max: Scalar,
97}
98
99impl MinMaxResult {
100 pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
102 if scalar.is_null() {
103 Ok(None)
104 } else {
105 let min = scalar
106 .as_struct()
107 .field_by_idx(0)
108 .vortex_expect("missing min field");
109 let max = scalar
110 .as_struct()
111 .field_by_idx(1)
112 .vortex_expect("missing max field");
113 Ok(Some(MinMaxResult { min, max }))
114 }
115 }
116}
117
/// Aggregate function that produces both the minimum and the maximum of its input.
///
/// It is identified as `vortex.min_max`. For supported input dtypes its result is a nullable
/// struct with two non-nullable fields, `min` and `max`.
#[derive(Debug, Clone)]
pub struct MinMax;
124
125pub struct MinMaxPartial {
127 min: Option<Scalar>,
128 max: Option<Scalar>,
129 element_dtype: DType,
130}
131
132impl MinMaxPartial {
133 fn merge(&mut self, local: Option<MinMaxResult>) {
135 let Some(MinMaxResult { min, max }) = local else {
136 return;
137 };
138
139 self.min = Some(match self.min.take() {
140 Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
141 None => min,
142 });
143
144 self.max = Some(match self.max.take() {
145 Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
146 None => max,
147 });
148 }
149}
150
151pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
153 DType::Struct(
154 StructFields::new(
155 NAMES.clone(),
156 vec![
157 element_dtype.as_nonnullable(),
158 element_dtype.as_nonnullable(),
159 ],
160 ),
161 Nullability::Nullable,
162 )
163}
164
165impl AggregateFnVTable for MinMax {
166 type Options = EmptyOptions;
167 type Partial = MinMaxPartial;
168
169 fn id(&self) -> AggregateFnId {
170 AggregateFnId::new("vortex.min_max")
171 }
172
173 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
174 Ok(None)
175 }
176
177 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
178 match input_dtype {
179 DType::Bool(_)
180 | DType::Primitive(..)
181 | DType::Decimal(..)
182 | DType::Utf8(..)
183 | DType::Binary(..)
184 | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
185 _ => None,
186 }
187 }
188
189 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
190 self.return_dtype(options, input_dtype)
191 }
192
193 fn empty_partial(
194 &self,
195 _options: &Self::Options,
196 input_dtype: &DType,
197 ) -> VortexResult<Self::Partial> {
198 Ok(MinMaxPartial {
199 min: None,
200 max: None,
201 element_dtype: input_dtype.clone(),
202 })
203 }
204
205 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
206 let local = MinMaxResult::from_scalar(other)?;
207 partial.merge(local);
208 Ok(())
209 }
210
211 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
212 let dtype = make_minmax_dtype(&partial.element_dtype);
213 Ok(match (&partial.min, &partial.max) {
214 (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
215 _ => Scalar::null(dtype),
216 })
217 }
218
219 fn reset(&self, partial: &mut Self::Partial) {
220 partial.min = None;
221 partial.max = None;
222 }
223
224 #[inline]
225 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
226 false
227 }
228
229 fn accumulate(
230 &self,
231 partial: &mut Self::Partial,
232 batch: &Columnar,
233 ctx: &mut ExecutionCtx,
234 ) -> VortexResult<()> {
235 match batch {
236 Columnar::Constant(c) => {
237 let scalar = c.scalar();
238 if scalar.is_null() {
239 return Ok(());
240 }
241 if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
243 return Ok(());
244 }
245 let non_nullable_dtype = scalar.dtype().as_nonnullable();
246 let cast = scalar.cast(&non_nullable_dtype)?;
247 partial.merge(Some(MinMaxResult {
248 min: cast.clone(),
249 max: cast,
250 }));
251 Ok(())
252 }
253 Columnar::Canonical(c) => match c {
254 Canonical::Primitive(p) => accumulate_primitive(partial, p, ctx),
255 Canonical::Bool(b) => accumulate_bool(partial, b, ctx),
256 Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
257 Canonical::Decimal(d) => accumulate_decimal(partial, d, ctx),
258 Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
259 Canonical::Null(_) => Ok(()),
260 Canonical::Struct(_)
261 | Canonical::List(_)
262 | Canonical::FixedSizeList(_)
263 | Canonical::Variant(_) => {
264 vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
265 }
266 },
267 }
268 }
269
270 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
271 Ok(partials)
272 }
273
274 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
275 self.to_scalar(partial)
276 }
277}
278
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::make_minmax_dtype;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::half::f16;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    /// Expected result for a bool input whose minimum and maximum are both `value`.
    fn uniform_bool(value: bool) -> Option<MinMaxResult> {
        Some(MinMaxResult {
            min: Scalar::bool(value, Nullability::NonNullable),
            max: Scalar::bool(value, Nullability::NonNullable),
        })
    }

    /// Non-nullable integers: the bounds are the smallest and largest element.
    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();

        let expected = MinMaxResult {
            min: 1.into(),
            max: 3.into(),
        };
        assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        Ok(())
    }

    /// Valid values separated by several runs of nulls are all taken into account.
    #[test]
    fn test_prim_min_max_multiple_null_runs() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let values = [
            Some(5i32),
            Some(3),
            None,
            None,
            Some(9),
            None,
            Some(1),
            Some(7),
        ];
        let array = PrimitiveArray::from_option_iter(values).into_array();

        let expected = MinMaxResult {
            min: 1.into(),
            max: 9.into(),
        };
        assert_eq!(min_max(&array, &mut ctx)?, Some(expected));
        Ok(())
    }

    /// Non-nullable bools: all-true, all-false and mixed inputs.
    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        // (input bits, expected min, expected max)
        let cases = [
            ([true, true, true], true, true),
            ([false, false, false], false, false),
            ([false, true, false], false, true),
        ];

        for (bits, expected_min, expected_max) in cases {
            let array =
                BoolArray::new(BitBuffer::from(bits.as_slice()), Validity::NonNullable).into_array();
            assert_eq!(
                min_max(&array, &mut ctx)?,
                Some(MinMaxResult {
                    min: expected_min.into(),
                    max: expected_max.into()
                })
            );
        }
        Ok(())
    }

    /// A null array has no min/max.
    #[test]
    fn test_null_array() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = NullArray::new(1).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// NaNs of either sign are ignored when other values are present.
    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let floats = PrimitiveArray::new(
            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
            Validity::NonNullable,
        );

        let bounds = min_max(&floats.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, -1.0);
        assert_eq!(f32::try_from(&bounds.max)?, 1.0);
        Ok(())
    }

    /// Infinities take part in the comparison like ordinary values.
    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let floats = PrimitiveArray::new(
            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
            Validity::NonNullable,
        );

        let bounds = min_max(&floats.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&bounds.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&bounds.max)?, f32::INFINITY);
        Ok(())
    }

    /// Bounds are tracked across several `accumulate` calls on one accumulator.
    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let element_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut accumulator = Accumulator::try_new(MinMax, EmptyOptions, element_dtype)?;

        let first = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        accumulator.accumulate(&first, &mut ctx)?;

        let second = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
        accumulator.accumulate(&second, &mut ctx)?;

        let bounds = MinMaxResult::from_scalar(accumulator.finish()?)?
            .vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(3i32));
        assert_eq!(bounds.max, Scalar::from(25i32));
        Ok(())
    }

    /// After `finish`, earlier batches no longer influence the next result.
    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let element_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut accumulator = Accumulator::try_new(MinMax, EmptyOptions, element_dtype)?;

        let first = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        accumulator.accumulate(&first, &mut ctx)?;
        let first_bounds = MinMaxResult::from_scalar(accumulator.finish()?)?
            .vortex_expect("should have result");
        assert_eq!(first_bounds.min, Scalar::from(10i32));
        assert_eq!(first_bounds.max, Scalar::from(20i32));

        let second = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        accumulator.accumulate(&second, &mut ctx)?;
        let second_bounds = MinMaxResult::from_scalar(accumulator.finish()?)?
            .vortex_expect("should have result");
        assert_eq!(second_bounds.min, Scalar::from(3i32));
        assert_eq!(second_bounds.max, Scalar::from(9i32));
        Ok(())
    }

    /// Combining two partial scalars keeps the smaller min and the larger max.
    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let element_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut partial = MinMax.empty_partial(&EmptyOptions, &element_dtype)?;
        let partial_dtype = make_minmax_dtype(&element_dtype);

        let first = Scalar::struct_(
            partial_dtype.clone(),
            vec![Scalar::from(5i32), Scalar::from(15i32)],
        );
        MinMax.combine_partials(&mut partial, first)?;

        let second = Scalar::struct_(
            partial_dtype,
            vec![Scalar::from(2i32), Scalar::from(10i32)],
        );
        MinMax.combine_partials(&mut partial, second)?;

        let bounds = MinMaxResult::from_scalar(MinMax.to_scalar(&partial)?)?
            .vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(2i32));
        assert_eq!(bounds.max, Scalar::from(15i32));
        Ok(())
    }

    /// A constant array holding NaN has no min/max.
    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let nan = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(nan, 2).into_array();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// Bounds are computed across all chunks of a chunked array, skipping nulls.
    #[test]
    fn test_chunked() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let first = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let second = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let chunk_dtype = first.dtype().clone();
        let chunked =
            ChunkedArray::try_new(vec![first.into_array(), second.into_array()], chunk_dtype)?;

        let bounds = min_max(&chunked.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::from(1i32));
        assert_eq!(bounds.max, Scalar::from(10i32));
        Ok(())
    }

    /// An all-null primitive array has no min/max.
    #[test]
    fn test_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        assert_eq!(min_max(&array.into_array(), &mut ctx)?, None);
        Ok(())
    }

    /// Nullable UTF-8 strings: nulls are skipped and results are non-nullable.
    #[test]
    fn test_varbin() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let short = "hello world";
        let long = "hello world this is a long string";
        let strings = VarBinArray::from_iter(
            vec![Some(short), None, Some(long), None],
            DType::Utf8(Nullability::Nullable),
        );

        let bounds = min_max(&strings.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(bounds.min, Scalar::utf8(short, Nullability::NonNullable));
        assert_eq!(bounds.max, Scalar::utf8(long, Nullability::NonNullable));
        Ok(())
    }

    /// Decimals: the invalid middle element (2000) must not become the maximum.
    #[test]
    fn test_decimal() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let decimals = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        );
        let bounds =
            min_max(&decimals.into_array(), &mut ctx)?.vortex_expect("should have result");

        let result_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let expected_min = Scalar::try_new(
            result_dtype.clone(),
            Some(ScalarValue::from(DecimalValue::from(100i32))),
        )?;
        let expected_max = Scalar::try_new(
            result_dtype,
            Some(ScalarValue::from(DecimalValue::from(200i32))),
        )?;
        assert_eq!(bounds.min, expected_min);
        assert_eq!(bounds.max, expected_max);
        Ok(())
    }

    /// Nullable bools with nulls at the end, the start, or both ends.
    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        // (input values, the single value expected for both min and max)
        let cases = [
            (vec![Some(true), Some(true), None, None], true),
            (vec![None, Some(true), Some(true)], true),
            (vec![None, Some(true), Some(true), None], true),
            (vec![Some(false), Some(false), None, None], false),
        ];

        for (values, expected) in cases {
            let array = BoolArray::from_iter(values).into_array();
            let result = min_max(&array, &mut ctx)?;
            assert_eq!(result, uniform_bool(expected));
        }
        Ok(())
    }

    /// A leading empty chunk must not affect the result of the remaining chunks.
    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
        let two_true = BoolArray::new(
            BitBuffer::from([true, true].as_slice()),
            Validity::NonNullable,
        );
        let three_true = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunked = ChunkedArray::try_new(
            vec![
                empty.into_array(),
                two_true.into_array(),
                three_true.into_array(),
            ],
            DType::Bool(Nullability::NonNullable),
        )?;

        let result = min_max(&chunked.into_array(), &mut ctx)?;
        assert_eq!(result, uniform_bool(true));
        Ok(())
    }

    /// An all-null UTF-8 array has no min/max.
    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let strings = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        );
        assert_eq!(min_max(&strings.into_array(), &mut ctx)?, None);
        Ok(())
    }
}