1mod bool;
5mod decimal;
6mod extension;
7mod primitive;
8mod varbin;
9
10use std::sync::LazyLock;
11
12use vortex_error::VortexExpect;
13use vortex_error::VortexResult;
14use vortex_error::vortex_bail;
15
16use self::bool::accumulate_bool;
17use self::decimal::accumulate_decimal;
18use self::extension::accumulate_extension;
19use self::primitive::accumulate_primitive;
20use self::varbin::accumulate_varbinview;
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::Columnar;
24use crate::ExecutionCtx;
25use crate::aggregate_fn::Accumulator;
26use crate::aggregate_fn::AggregateFnId;
27use crate::aggregate_fn::AggregateFnVTable;
28use crate::aggregate_fn::DynAccumulator;
29use crate::aggregate_fn::EmptyOptions;
30use crate::dtype::DType;
31use crate::dtype::FieldNames;
32use crate::dtype::Nullability;
33use crate::dtype::StructFields;
34use crate::expr::stats::Precision;
35use crate::expr::stats::Stat;
36use crate::expr::stats::StatsProvider;
37use crate::partial_ord::partial_max;
38use crate::partial_ord::partial_min;
39use crate::scalar::Scalar;
40
41static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["min", "max"]));
42
43pub fn min_max(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<MinMaxResult>> {
48 let cached_min = array
50 .statistics()
51 .get(Stat::Min)
52 .and_then(Precision::as_exact);
53 let cached_max = array
54 .statistics()
55 .get(Stat::Max)
56 .and_then(Precision::as_exact);
57 if let Some((min, max)) = cached_min.zip(cached_max) {
58 let non_nullable_dtype = array.dtype().as_nonnullable();
59 return Ok(Some(MinMaxResult {
60 min: min.cast(&non_nullable_dtype)?,
61 max: max.cast(&non_nullable_dtype)?,
62 }));
63 }
64
65 if array.is_empty() || array.valid_count()? == 0 {
67 return Ok(None);
68 }
69
70 if MinMax.return_dtype(&EmptyOptions, array.dtype()).is_none() {
72 return Ok(None);
73 }
74
75 let mut acc = Accumulator::try_new(MinMax, EmptyOptions, array.dtype().clone())?;
77 acc.accumulate(array, ctx)?;
78 let result_scalar = acc.finish()?;
79 let result = MinMaxResult::from_scalar(result_scalar)?;
80
81 if let Some(ref r) = result {
83 if let Some(min_value) = r.min.value() {
84 array
85 .statistics()
86 .set(Stat::Min, Precision::Exact(min_value.clone()));
87 }
88 if let Some(max_value) = r.max.value() {
89 array
90 .statistics()
91 .set(Stat::Max, Precision::Exact(max_value.clone()));
92 }
93 }
94
95 Ok(result)
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct MinMaxResult {
101 pub min: Scalar,
102 pub max: Scalar,
103}
104
105impl MinMaxResult {
106 pub fn from_scalar(scalar: Scalar) -> VortexResult<Option<Self>> {
108 if scalar.is_null() {
109 Ok(None)
110 } else {
111 let min = scalar
112 .as_struct()
113 .field_by_idx(0)
114 .vortex_expect("missing min field");
115 let max = scalar
116 .as_struct()
117 .field_by_idx(1)
118 .vortex_expect("missing max field");
119 Ok(Some(MinMaxResult { min, max }))
120 }
121 }
122}
123
/// The `vortex.min_max` aggregate function.
///
/// It produces a nullable `{min, max}` struct, which is null when no value was merged into
/// the state.
#[derive(Debug, Clone)]
pub struct MinMax;
130
131pub struct MinMaxPartial {
133 min: Option<Scalar>,
134 max: Option<Scalar>,
135 element_dtype: DType,
136}
137
138impl MinMaxPartial {
139 fn merge(&mut self, local: Option<MinMaxResult>) {
141 let Some(MinMaxResult { min, max }) = local else {
142 return;
143 };
144
145 self.min = Some(match self.min.take() {
146 Some(current) => partial_min(min, current).vortex_expect("incomparable min scalars"),
147 None => min,
148 });
149
150 self.max = Some(match self.max.take() {
151 Some(current) => partial_max(max, current).vortex_expect("incomparable max scalars"),
152 None => max,
153 });
154 }
155}
156
157pub fn make_minmax_dtype(element_dtype: &DType) -> DType {
159 DType::Struct(
160 StructFields::new(
161 NAMES.clone(),
162 vec![
163 element_dtype.as_nonnullable(),
164 element_dtype.as_nonnullable(),
165 ],
166 ),
167 Nullability::Nullable,
168 )
169}
170
171impl AggregateFnVTable for MinMax {
172 type Options = EmptyOptions;
173 type Partial = MinMaxPartial;
174
175 fn id(&self) -> AggregateFnId {
176 AggregateFnId::new_ref("vortex.min_max")
177 }
178
179 fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
180 Ok(Some(vec![]))
181 }
182
183 fn deserialize(
184 &self,
185 _metadata: &[u8],
186 _session: &vortex_session::VortexSession,
187 ) -> VortexResult<Self::Options> {
188 Ok(EmptyOptions)
189 }
190
191 fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
192 match input_dtype {
193 DType::Bool(_)
194 | DType::Primitive(..)
195 | DType::Decimal(..)
196 | DType::Utf8(..)
197 | DType::Binary(..)
198 | DType::Extension(..) => Some(make_minmax_dtype(input_dtype)),
199 _ => None,
200 }
201 }
202
203 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
204 self.return_dtype(options, input_dtype)
205 }
206
207 fn empty_partial(
208 &self,
209 _options: &Self::Options,
210 input_dtype: &DType,
211 ) -> VortexResult<Self::Partial> {
212 Ok(MinMaxPartial {
213 min: None,
214 max: None,
215 element_dtype: input_dtype.clone(),
216 })
217 }
218
219 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
220 let local = MinMaxResult::from_scalar(other)?;
221 partial.merge(local);
222 Ok(())
223 }
224
225 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
226 let dtype = make_minmax_dtype(&partial.element_dtype);
227 Ok(match (&partial.min, &partial.max) {
228 (Some(min), Some(max)) => Scalar::struct_(dtype, vec![min.clone(), max.clone()]),
229 _ => Scalar::null(dtype),
230 })
231 }
232
233 fn reset(&self, partial: &mut Self::Partial) {
234 partial.min = None;
235 partial.max = None;
236 }
237
238 #[inline]
239 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
240 false
241 }
242
243 fn accumulate(
244 &self,
245 partial: &mut Self::Partial,
246 batch: &Columnar,
247 ctx: &mut ExecutionCtx,
248 ) -> VortexResult<()> {
249 match batch {
250 Columnar::Constant(c) => {
251 let scalar = c.scalar();
252 if scalar.is_null() {
253 return Ok(());
254 }
255 if scalar.as_primitive_opt().is_some_and(|p| p.is_nan()) {
257 return Ok(());
258 }
259 let non_nullable_dtype = scalar.dtype().as_nonnullable();
260 let cast = scalar.cast(&non_nullable_dtype)?;
261 partial.merge(Some(MinMaxResult {
262 min: cast.clone(),
263 max: cast,
264 }));
265 Ok(())
266 }
267 Columnar::Canonical(c) => match c {
268 Canonical::Primitive(p) => accumulate_primitive(partial, p),
269 Canonical::Bool(b) => accumulate_bool(partial, b),
270 Canonical::VarBinView(v) => accumulate_varbinview(partial, v),
271 Canonical::Decimal(d) => accumulate_decimal(partial, d),
272 Canonical::Extension(e) => accumulate_extension(partial, e, ctx),
273 Canonical::Null(_) => Ok(()),
274 Canonical::Struct(_)
275 | Canonical::List(_)
276 | Canonical::FixedSizeList(_)
277 | Canonical::Variant(_) => {
278 vortex_bail!("Unsupported canonical type for min_max: {}", batch.dtype())
279 }
280 },
281 }
282 }
283
284 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
285 Ok(partials)
286 }
287
288 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
289 self.to_scalar(partial)
290 }
291}
292
#[cfg(test)]
mod tests {
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;

    use crate::IntoArray as _;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::min_max::MinMax;
    use crate::aggregate_fn::fns::min_max::MinMaxResult;
    use crate::aggregate_fn::fns::min_max::make_minmax_dtype;
    use crate::aggregate_fn::fns::min_max::min_max;
    use crate::arrays::BoolArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ConstantArray;
    use crate::arrays::DecimalArray;
    use crate::arrays::NullArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinArray;
    use crate::dtype::DType;
    use crate::dtype::DecimalDType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::dtype::half::f16;
    use crate::scalar::DecimalValue;
    use crate::scalar::Scalar;
    use crate::scalar::ScalarValue;
    use crate::validity::Validity;

    #[test]
    fn test_prim_min_max() -> VortexResult<()> {
        let p = PrimitiveArray::new(buffer![1, 2, 3], Validity::NonNullable).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(
            min_max(&p, &mut ctx)?,
            Some(MinMaxResult {
                min: 1.into(),
                max: 3.into()
            })
        );
        Ok(())
    }

    /// The first call computes and caches exact Min/Max statistics. The second call should
    /// be answered from that cache and must agree with the computed result.
    #[test]
    fn test_cached_stats_roundtrip() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(buffer![4i32, -2, 7], Validity::NonNullable).into_array();
        let expected = Some(MinMaxResult {
            min: Scalar::from(-2i32),
            max: Scalar::from(7i32),
        });
        assert_eq!(min_max(&array, &mut ctx)?, expected);
        assert_eq!(min_max(&array, &mut ctx)?, expected);
        Ok(())
    }

    #[test]
    fn test_bool_min_max() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let all_true = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_true, &mut ctx)?,
            Some(MinMaxResult {
                min: true.into(),
                max: true.into()
            })
        );

        let all_false = BoolArray::new(
            BitBuffer::from([false, false, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&all_false, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: false.into()
            })
        );

        let mixed = BoolArray::new(
            BitBuffer::from([false, true, false].as_slice()),
            Validity::NonNullable,
        )
        .into_array();
        assert_eq!(
            min_max(&mixed, &mut ctx)?,
            Some(MinMaxResult {
                min: false.into(),
                max: true.into()
            })
        );
        Ok(())
    }

    #[test]
    fn test_null_array() -> VortexResult<()> {
        let p = NullArray::new(1).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p, &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_prim_nan() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::NAN, -f32::NAN, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, -1.0);
        assert_eq!(f32::try_from(&result.max)?, 1.0);
        Ok(())
    }

    #[test]
    fn test_prim_inf() -> VortexResult<()> {
        let array = PrimitiveArray::new(
            buffer![f32::INFINITY, f32::NEG_INFINITY, -1.0, 1.0],
            Validity::NonNullable,
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(f32::try_from(&result.min)?, f32::NEG_INFINITY);
        assert_eq!(f32::try_from(&result.max)?, f32::INFINITY);
        Ok(())
    }

    #[test]
    fn test_multi_batch() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;

        let batch2 = PrimitiveArray::new(buffer![3i32, 25], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;

        let result = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(3i32));
        assert_eq!(result.max, Scalar::from(25i32));
        Ok(())
    }

    #[test]
    fn test_finish_resets_state() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut acc = Accumulator::try_new(MinMax, EmptyOptions, dtype)?;

        let batch1 = PrimitiveArray::new(buffer![10i32, 20], Validity::NonNullable).into_array();
        acc.accumulate(&batch1, &mut ctx)?;
        let result1 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result1.min, Scalar::from(10i32));
        assert_eq!(result1.max, Scalar::from(20i32));

        let batch2 = PrimitiveArray::new(buffer![3i32, 6, 9], Validity::NonNullable).into_array();
        acc.accumulate(&batch2, &mut ctx)?;
        let result2 = MinMaxResult::from_scalar(acc.finish()?)?.vortex_expect("should have result");
        assert_eq!(result2.min, Scalar::from(3i32));
        assert_eq!(result2.max, Scalar::from(9i32));
        Ok(())
    }

    #[test]
    fn test_state_merge() -> VortexResult<()> {
        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        let mut state = MinMax.empty_partial(&EmptyOptions, &dtype)?;

        let struct_dtype = make_minmax_dtype(&dtype);
        let scalar1 = Scalar::struct_(
            struct_dtype.clone(),
            vec![Scalar::from(5i32), Scalar::from(15i32)],
        );
        MinMax.combine_partials(&mut state, scalar1)?;

        let scalar2 = Scalar::struct_(struct_dtype, vec![Scalar::from(2i32), Scalar::from(10i32)]);
        MinMax.combine_partials(&mut state, scalar2)?;

        let result = MinMaxResult::from_scalar(MinMax.to_scalar(&state)?)?
            .vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(2i32));
        assert_eq!(result.max, Scalar::from(15i32));
        Ok(())
    }

    #[test]
    fn test_constant_nan() -> VortexResult<()> {
        let scalar = Scalar::primitive(f16::NAN, Nullability::NonNullable);
        let array = ConstantArray::new(scalar, 2).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array, &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_chunked() -> VortexResult<()> {
        let chunk1 = PrimitiveArray::from_option_iter([Some(5i32), None, Some(1)]);
        let chunk2 = PrimitiveArray::from_option_iter([Some(10i32), Some(3), None]);
        let dtype = chunk1.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![chunk1.into_array(), chunk2.into_array()], dtype)?;
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&chunked.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(result.min, Scalar::from(1i32));
        assert_eq!(result.max, Scalar::from(10i32));
        Ok(())
    }

    #[test]
    fn test_all_null() -> VortexResult<()> {
        let p = PrimitiveArray::from_option_iter::<i32, _>([None, None, None]);
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&p.into_array(), &mut ctx)?, None);
        Ok(())
    }

    #[test]
    fn test_varbin() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![
                Some("hello world"),
                None,
                Some("hello world this is a long string"),
                None,
            ],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&array.into_array(), &mut ctx)?.vortex_expect("should have result");
        assert_eq!(
            result.min,
            Scalar::utf8("hello world", Nullability::NonNullable)
        );
        assert_eq!(
            result.max,
            Scalar::utf8(
                "hello world this is a long string",
                Nullability::NonNullable
            )
        );
        Ok(())
    }

    #[test]
    fn test_decimal() -> VortexResult<()> {
        let decimal = DecimalArray::new(
            buffer![100i32, 2000i32, 200i32],
            DecimalDType::new(4, 2),
            Validity::from_iter([true, false, true]),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let result = min_max(&decimal.into_array(), &mut ctx)?.vortex_expect("should have result");

        let non_nullable_dtype = DType::Decimal(DecimalDType::new(4, 2), Nullability::NonNullable);
        let expected_min = Scalar::try_new(
            non_nullable_dtype.clone(),
            Some(ScalarValue::from(DecimalValue::from(100i32))),
        )?;
        let expected_max = Scalar::try_new(
            non_nullable_dtype,
            Some(ScalarValue::from(DecimalValue::from(200i32))),
        )?;
        assert_eq!(result.min, expected_min);
        assert_eq!(result.max, expected_max);
        Ok(())
    }

    #[test]
    fn test_bool_with_nulls() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let result = min_max(
            &BoolArray::from_iter(vec![Some(true), Some(true), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true)]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![None, Some(true), Some(true), None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );

        let result = min_max(
            &BoolArray::from_iter(vec![Some(false), Some(false), None, None]).into_array(),
            &mut ctx,
        )?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(false, Nullability::NonNullable),
                max: Scalar::bool(false, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    #[test]
    fn test_bool_chunked_with_empty_chunk() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        let empty = BoolArray::new(BitBuffer::from([].as_slice()), Validity::NonNullable);
        let chunk1 = BoolArray::new(
            BitBuffer::from([true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunk2 = BoolArray::new(
            BitBuffer::from([true, true, true].as_slice()),
            Validity::NonNullable,
        );
        let chunked = ChunkedArray::try_new(
            vec![empty.into_array(), chunk1.into_array(), chunk2.into_array()],
            DType::Bool(Nullability::NonNullable),
        )?;

        let result = min_max(&chunked.into_array(), &mut ctx)?;
        assert_eq!(
            result,
            Some(MinMaxResult {
                min: Scalar::bool(true, Nullability::NonNullable),
                max: Scalar::bool(true, Nullability::NonNullable),
            })
        );
        Ok(())
    }

    #[test]
    fn test_varbin_all_nulls() -> VortexResult<()> {
        let array = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        );
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(min_max(&array.into_array(), &mut ctx)?, None);
        Ok(())
    }
}