1mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48 let cached_min = array
50 .statistics()
51 .get(Stat::Min)
52 .and_then(Precision::as_exact);
53 let cached_max = array
54 .statistics()
55 .get(Stat::Max)
56 .and_then(Precision::as_exact);
57 if let Some((min, max)) = cached_min.zip(cached_max) {
58 let non_nullable_dtype = array.dtype().as_nonnullable();
59 return Ok(Some(MinMaxResult {
60 min: min.cast(&non_nullable_dtype)?,
61 max: max.cast(&non_nullable_dtype)?,
62 }));
63 }
64
65 if array.is_empty() || array.valid_count()? == 0 {
67 return Ok(None);
68 }
69
70 if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72 return Ok(None);
73 }
74
75 let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77 acc.accumulate(array, ctx)?;
78 let result_scalar = acc.finish()?;
79 let result = MinMaxResult::from_scalar(result_scalar)?;
80
81 if let Some(ref r) = result {
83 if let Some(min_value) = r.min.value() {
84 array
85 .statistics()
86 .set(Stat::Min, Precision::Exact(min_value.clone()));
87 }
88 if let Some(max_value) = r.max.value() {
89 array
90 .statistics()
91 .set(Stat::Max, Precision::Exact(max_value.clone()));
92 }
93 }
94
95 Ok(result)
96}
97
/// The minimum and maximum values produced by a min/max aggregation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MinMaxResult {
    /// The minimum value.
    pub min: Scalar,
    /// The maximum value.
    pub max: Scalar,
}
104
105impl MinMaxResult {
106 pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108 if scalar.is_null() {
109 Ok(None)
110 } else {
111 let min = scalar
112 .as_struct()
113 .field_by_idx(0)
114 .vortex_expect("missing min field");
115 let max = scalar
116 .as_struct()
117 .field_by_idx(1)
118 .vortex_expect("missing max field");
119 Ok(Some(MinMaxResult { min, max }))
120 }
121 }
122}
123
/// Aggregate function that computes the minimum and maximum of its input.
///
/// Its behavior is defined by the `AggregateFnVTable` implementation for this type.
#[derive(Clone, Debug)]
pub struct MinMax;
130
/// Partial (in-progress) accumulator state for the [`MinMax`] aggregate function.
pub struct MinMaxPartial {
    /// Minimum merged so far; `None` until a result has been merged.
    min: Option<Scalar>,
    /// Maximum merged so far; `None` until a result has been merged.
    max: Option<Scalar>,
    /// Dtype of the input elements, used to build the dtype of the output struct scalar.
    element_dtype: DType,
}
137
138impl MinMaxPartial {
139 fn merge(&mut self, local: Option<MinMaxResult>) {
141 let Some(MinMaxResult { min, max }) = local else {
142 return;
143 };
144
145 self.min = Some(match self.min.take() {
146 Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147 None => min,
148 });
149
150 self.max = Some(match self.max.take() {
151 Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152 None => max,
153 });
154 }
155}
156
157pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159 DType::Struct(
160 StructFields::new(
161 NAMES.clone(),
162 vec![
163 element_dtype.as_nonnullable(),
164 element_dtype.as_nonnullable(),
165 ],
166 ),
167 Nullability::Nullable,
168 )
169}
170
171impl AggregateFnVTable for MinMax {
172 type Options = EmptyOptions;
173 type Partial = MinMaxPartial;
174
175 fn id(&self) -> AggregateFnId {
176 AggregateFnId::new_ref("vortex.min_max")
177 }
178
179 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180 unimplemented!("MinMax is not yet serializable");
181 }
182
183 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
184 match input_dtype {
185 DType::Bool(_)
186 | DType::Primitive(..)
187 | DType::Decimal(..)
188 | DType::Utf8(..)
189 | DType::Binary(..)
190 | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
191 _ => None,
192 }
193 }
194
195 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
196 self.return_dtype(options, input_dtype)
197 }
198
199 fn empty_partial(
200 &self,
201 _options: &Self::Options,
202 input_dtype: &DType,
203 ) -> VortexResult<Self::Partial> {
204 Ok(MinMaxPartial {
205 min: None,
206 max: None,
207 element_dtype: input_dtype.clone(),
208 })
209 }
210
211 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
212 let local = MinMaxResult::from_scalar(other)?;
213 partial.merge(local);
214 Ok(())
215 }
216
217 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
218 let dtype = make_minmax_dtype(&partial.element_dtype);
219 Ok(match (&partial.min, &partial.max) {
220 (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
221 _ => Scalar::null(dtype),
222 })
223 }
224
225 fn reset(&self, partial: &mut Self::Partial) {
226 partial.min = None;
227 partial.max = None;
228 }
229
230 #[inline]
231 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
232 false
233 }
234
235 fn accumulate(
236 &self,
237 partial: &mut Self::Partial,
238 batch: &Columnar,
239 ctx: &mut ExecutionCtx,
240 ) -> VortexResult<()> {
241 match batch {
242 Columnar::Constant(c) => {
243 let scalar = c.scalar();
244 if scalar.is_null() {
245 return Ok(());
246 }
247 if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
249 return Ok(());
250 }
251 let non_nullable_dtype = scalar.dtype().as_nonnullable();
252 let cast = scalar.cast(&non_nullable_dtype)?;
253 partial.merge(Some(MinMaxResult {
254 min: cast.clone(),
255 max: cast,
256 }));
257 Ok(())
258 }
259 Columnar::Canonical(c) => match c {
260 Canonical::Primitive(p) => accumulate_primitive(partial, p),
261 Canonical::Bool(b) => accumulate_bool(partial, b),
262 Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
263 Canonical::Decimal(d) => accumulate_decimal(partial, d),
264 Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
265 Canonical::Null(_) => Ok(()),
266 Canonical::Struct(_)
267 | Canonical::List(_)
268 | Canonical::FixedSizeList(_)
269 | Canonical::Variant(_) => {
270 vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
271 }
272 },
273 }
274 }
275
276 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
277 Ok(partials)
278 }
279
280 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
281 self.to_scalar(partial)
282 }
283}
284
/// Unit tests for `min_max` and the `MinMax` aggregate function.
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    /// Min and max of a small non-nullable integer array.
    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let p = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(
            min_max(&p, &mut ctx)?,
            Some(MinMaxResult {
                min: 1.into(),
                max: 3.into()
            })
        );
        Ok(())
    }

    /// Bool arrays: all-true, all-false, and mixed values.
    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let all_true = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_true, &mut ctx)?,
            Some(MinMaxResult {
                min: true.into(),
                max: true.into()
            })
        );

        let all_false = BoolArray::new(
            BitBuffer::from([false, false, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_false, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: false.into()
            })
        );

        let mixed = BoolArray::new(
            BitBuffer::from([false, true, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&mixed, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: true.into()
            })
        );
        Ok(())
    }

    /// A `NullArray` has no min/max.
    #[test]
    fn test_null_array() -> VortexResult<()> {
        let p = NullArray::new(1).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p, &mut ctx)?, None);
        Ok(())
    }

    /// NaN values in a float array do not become the min or max.
    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, -1.0);
        assert_eq!(f32::try_from(&result.max)?, 1.0);
        Ok(())
    }

    /// Infinities are reported as the min and max.
    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&result.max)?, f32::INFINITY);
        Ok(())
    }

    /// Accumulating two batches yields the min/max across both.
    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;

        let batch2 = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;

        let result = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(3i32));
        assert_eq!(result.max, Scalar::from(25i32));
        Ok(())
    }

    /// After `finish`, a second accumulation is independent of the first batch.
    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;
        let result1 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result1.min, Scalar::from(10i32));
        assert_eq!(result1.max, Scalar::from(20i32));

        let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;
        let result2 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result2.min, Scalar::from(3i32));
        assert_eq!(result2.max, Scalar::from(9i32));
        Ok(())
    }

    /// Combining two partial struct scalars keeps the smaller min and the larger max.
    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;

        let struct_dtype = crate::aggregate_fn::fns::min_max::make_minmax_dtype(&dtype);
        let scalar1 = Scalar::struct_(
            struct_dtype.clone(),
            vec![Scalar::from(5i32), Scalar::from(15i32)],
        );
        MinMax.combine_partials(&mut state, scalar1)?;

        let scalar2 = Scalar::struct_(struct_dtype, vec![Scalar::from(2i32), Scalar::from(10i32)]);
        MinMax.combine_partials(&mut state, scalar2)?;

        let result = MinMaxResult::from_scalar(MinMax.to_scalar(&state)?)?
            .vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(2i32));
        assert_eq!(result.max, Scalar::from(15i32));
        Ok(())
    }

    /// A constant NaN array has no min/max.
    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let scalar = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(scalar, 2).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    /// Chunked nullable input: nulls are skipped and the result spans both chunks.
    #[test]
    fn test_chunked() -> VortexResult<()> {
        let chunk1 = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let chunk2 = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let dtype = chunk1.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&chunked.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(1i32));
        assert_eq!(result.max, Scalar::from(10i32));
        Ok(())
    }

    /// An all-null primitive array has no min/max.
    #[test]
    fn test_all_null() -> VortexResult<()> {
        let p = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p.into_array(), &mut ctx)?, None);
        Ok(())
    }

    /// Nullable UTF-8 input: results are non-nullable string scalars.
    #[test]
    fn test_varbin() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![
                Some("hello world"),
                None,
                Some("hello world this is a long string"),
                None,
            ],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(
            result.min,
            Scalar::utf8("hello world", Nullability::NonNullable)
        );
        assert_eq!(
            result.max,
            Scalar::utf8(
                "hello world this is a long string",
                Nullability::NonNullable
            )
        );
        Ok(())
    }

    /// Decimal input with an invalid (null) entry: the null slot's value is ignored.
    #[test]
    fn test_decimal() -> VortexResult<()> {
        let decimal = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&decimal.into_array(), &mut ctx)?.vortex_expect("should have result");

        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let expected_min = Scalar::try_new(
            non_nullable_dtype.clone(),
            Some(ScalarValue::from(DecimalValue::from(100i32))),
        )?;
        let expected_max = Scalar::try_new(
            non_nullable_dtype,
            Some(ScalarValue::from(DecimalValue::from(200i32))),
        )?;
        assert_eq!(result.min, expected_min);
        assert_eq!(result.max, expected_max);
        Ok(())
    }

    // `f16` is used by `test_constant_nan` above.
    use crate::dtype::half::f16;

    /// Nullable bool arrays with nulls at the start, end, or both.
    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let result = min_max(
            &BoolArray::from_iter(vec![Some(true), Some(true), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true)]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true), None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![Some(false), Some(false), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(false, Nullability::NonNullable),
                max: Scalar::bool(false, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    /// A chunked bool array whose first chunk is empty still reports the min/max of the
    /// non-empty chunks.
    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
        let chunk1 = BoolArray::new(
            BitBuffer::from([true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunk2 = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunked = ChunkedArray::try_new(
            vec![empty.into_array(), chunk1.into_array(), chunk2.into_array()],
            DType::Bool(Nullability::NonNullable),
        )?;

        let result = min_max(&chunked.into_array(), &mut ctx)?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    /// An all-null UTF-8 array has no min/max.
    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array.into_array(), &mut ctx)?, None);
        Ok(())
    }
}