1use arrow::array::{
21 ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
22 Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
23 DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24 DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25 Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
26 IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27 LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28 Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35 DataFusionError, Result, ScalarValue, downcast_value, internal_err,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
// Translates the aggregate kind (`min` or `max`) into the `Ordering` that
// `lhs.cmp(rhs)` has to yield for `rhs` to replace `lhs`:
// `max` replaces when `lhs` is `Less`, `min` when `lhs` is `Greater`.
macro_rules! choose_min_max {
    (max) => {
        ::std::cmp::Ordering::Less
    };
    (min) => {
        ::std::cmp::Ordering::Greater
    };
}
48
// Combines the running value `$VALUE` with the new value `$DELTA` for the
// aggregate named by `$OP` (`min` or `max`) by forwarding to
// `min_max_scalar` with the ordering selected by `choose_min_max!`.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        min_max_scalar(
            $VALUE,
            $DELTA,
            choose_min_max!($OP),
        )
    }};
}
52
/// Picks between two optional, totally ordered values.
///
/// When only one side is `Some`, that side wins; when both are `None` the
/// result is `None`. When both are present, `rhs` wins only if
/// `lhs.cmp(rhs)` equals `ordering`; otherwise (including ties) `lhs` wins.
/// The winner is returned as an owned clone.
fn min_max_option<T: Clone + Ord>(
    lhs: &Option<T>,
    rhs: &Option<T>,
    ordering: Ordering,
) -> Option<T> {
    let winner = match (lhs.as_ref(), rhs.as_ref()) {
        (Some(left), Some(right)) => {
            if left.cmp(right) == ordering {
                right
            } else {
                left
            }
        }
        // At most one side holds a value: take whichever exists.
        (left, right) => return left.or(right).cloned(),
    };
    Some(winner.clone())
}
66
/// Picks between two optional `Copy` values using a caller-supplied comparator.
///
/// Used for floating-point scalars, which are not `Ord`; callers pass a total
/// ordering such as `f64::total_cmp` as `cmp`. When only one side is `Some`,
/// that side wins. When both are present, `rhs` wins only if `cmp(lhs, rhs)`
/// equals `ordering`; otherwise `lhs` wins.
fn min_max_float_option<T: Copy>(
    lhs: &Option<T>,
    rhs: &Option<T>,
    ordering: Ordering,
    cmp: impl Fn(&T, &T) -> Ordering,
) -> Option<T> {
    let (Some(left), Some(right)) = (lhs, rhs) else {
        // At most one side holds a value: take whichever exists.
        return (*lhs).or(*rhs);
    };

    if cmp(left, right) == ordering {
        Some(*right)
    } else {
        Some(*left)
    }
}
81
82fn ensure_decimal_compatibility(
83 lhs: &ScalarValue,
84 rhs: &ScalarValue,
85 lhs_type: (u8, i8),
86 rhs_type: (u8, i8),
87) -> Result<()> {
88 if lhs_type == rhs_type {
89 Ok(())
90 } else {
91 internal_err!(
92 "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
93 (lhs, rhs)
94 )
95 }
96}
97
98fn min_max_generic_scalar(
99 lhs: &ScalarValue,
100 rhs: &ScalarValue,
101 ordering: Ordering,
102) -> ScalarValue {
103 if lhs.is_null() {
104 let mut rhs_copy = rhs.clone();
105 rhs_copy.compact();
108 rhs_copy
109 } else if rhs.is_null() {
110 lhs.clone()
111 } else {
112 match lhs.partial_cmp(rhs) {
113 Some(order) if order == ordering => {
114 let mut rhs_copy = rhs.clone();
117 rhs_copy.compact();
118 rhs_copy
119 }
120 _ => lhs.clone(),
121 }
122 }
123}
124
125fn min_max_interval_scalar(
126 lhs: &ScalarValue,
127 rhs: &ScalarValue,
128 ordering: Ordering,
129) -> Result<ScalarValue> {
130 match lhs.partial_cmp(rhs) {
131 Some(order) if order == ordering => Ok(rhs.clone()),
132 Some(_) => Ok(lhs.clone()),
133 None => internal_err!("Comparison error while computing interval min/max"),
134 }
135}
136
137fn min_max_dictionary_scalar(
138 lhs: &ScalarValue,
139 rhs: &ScalarValue,
140 ordering: Ordering,
141) -> Result<Option<ScalarValue>> {
142 match (lhs, rhs) {
143 (
144 ScalarValue::Dictionary(lhs_dict_key_type, lhs_dict_value),
145 ScalarValue::Dictionary(rhs_dict_key_type, rhs_dict_value),
146 ) => {
147 if lhs_dict_key_type != rhs_dict_key_type {
148 return internal_err!(
149 "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})",
150 lhs_dict_key_type,
151 rhs_dict_key_type
152 );
153 }
154
155 let result = min_max_scalar(
156 lhs_dict_value.as_ref(),
157 rhs_dict_value.as_ref(),
158 ordering,
159 )?;
160 Ok(Some(ScalarValue::Dictionary(
161 lhs_dict_key_type.clone(),
162 Box::new(result),
163 )))
164 }
165 (ScalarValue::Dictionary(_, lhs_dict_value), rhs_scalar) => {
166 min_max_scalar(lhs_dict_value.as_ref(), rhs_scalar, ordering).map(Some)
167 }
168 (lhs_scalar, ScalarValue::Dictionary(_, rhs_dict_value)) => {
169 min_max_scalar(lhs_scalar, rhs_dict_value.as_ref(), ordering).map(Some)
170 }
171 _ => Ok(None),
172 }
173}
174
175fn min_max_scalar(
180 lhs: &ScalarValue,
181 rhs: &ScalarValue,
182 ordering: Ordering,
183) -> Result<ScalarValue> {
184 if ordering == Ordering::Equal {
185 unreachable!("min/max comparisons do not use equal ordering");
186 }
187
188 if let Some(result) = min_max_dictionary_scalar(lhs, rhs, ordering)? {
189 return Ok(result);
190 }
191
192 min_max_scalar_same_variant(lhs, rhs, ordering)
193}
194
195fn min_max_scalar_same_variant(
196 lhs: &ScalarValue,
197 rhs: &ScalarValue,
198 ordering: Ordering,
199) -> Result<ScalarValue> {
200 let result = match (lhs, rhs) {
201 (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
202 (
203 ScalarValue::Decimal32(lhsv, lhsp, lhss),
204 ScalarValue::Decimal32(rhsv, rhsp, rhss),
205 ) => {
206 ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
207 ScalarValue::Decimal32(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
208 }
209 (
210 ScalarValue::Decimal64(lhsv, lhsp, lhss),
211 ScalarValue::Decimal64(rhsv, rhsp, rhss),
212 ) => {
213 ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
214 ScalarValue::Decimal64(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
215 }
216 (
217 ScalarValue::Decimal128(lhsv, lhsp, lhss),
218 ScalarValue::Decimal128(rhsv, rhsp, rhss),
219 ) => {
220 ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
221 ScalarValue::Decimal128(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
222 }
223 (
224 ScalarValue::Decimal256(lhsv, lhsp, lhss),
225 ScalarValue::Decimal256(rhsv, rhsp, rhss),
226 ) => {
227 ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
228 ScalarValue::Decimal256(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
229 }
230 (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
231 ScalarValue::Boolean(min_max_option(lhs, rhs, ordering))
232 }
233 (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
234 ScalarValue::Float64(min_max_float_option(lhs, rhs, ordering, f64::total_cmp))
235 }
236 (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
237 ScalarValue::Float32(min_max_float_option(lhs, rhs, ordering, f32::total_cmp))
238 }
239 (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
240 ScalarValue::Float16(min_max_float_option(lhs, rhs, ordering, |a, b| {
241 a.total_cmp(b)
242 }))
243 }
244 (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
245 ScalarValue::UInt64(min_max_option(lhs, rhs, ordering))
246 }
247 (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
248 ScalarValue::UInt32(min_max_option(lhs, rhs, ordering))
249 }
250 (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
251 ScalarValue::UInt16(min_max_option(lhs, rhs, ordering))
252 }
253 (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
254 ScalarValue::UInt8(min_max_option(lhs, rhs, ordering))
255 }
256 (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
257 ScalarValue::Int64(min_max_option(lhs, rhs, ordering))
258 }
259 (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
260 ScalarValue::Int32(min_max_option(lhs, rhs, ordering))
261 }
262 (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
263 ScalarValue::Int16(min_max_option(lhs, rhs, ordering))
264 }
265 (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
266 ScalarValue::Int8(min_max_option(lhs, rhs, ordering))
267 }
268 (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
269 ScalarValue::Utf8(min_max_option(lhs, rhs, ordering))
270 }
271 (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
272 ScalarValue::LargeUtf8(min_max_option(lhs, rhs, ordering))
273 }
274 (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
275 ScalarValue::Utf8View(min_max_option(lhs, rhs, ordering))
276 }
277 (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
278 ScalarValue::Binary(min_max_option(lhs, rhs, ordering))
279 }
280 (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
281 ScalarValue::LargeBinary(min_max_option(lhs, rhs, ordering))
282 }
283 (
284 ScalarValue::FixedSizeBinary(lsize, lhs),
285 ScalarValue::FixedSizeBinary(rsize, rhs),
286 ) => {
287 if lsize == rsize {
288 ScalarValue::FixedSizeBinary(*lsize, min_max_option(lhs, rhs, ordering))
289 } else {
290 return internal_err!(
291 "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
292 (lsize, rsize)
293 );
294 }
295 }
296 (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
297 ScalarValue::BinaryView(min_max_option(lhs, rhs, ordering))
298 }
299 (
300 ScalarValue::TimestampSecond(lhs, l_tz),
301 ScalarValue::TimestampSecond(rhs, _),
302 ) => {
303 ScalarValue::TimestampSecond(min_max_option(lhs, rhs, ordering), l_tz.clone())
304 }
305 (
306 ScalarValue::TimestampMillisecond(lhs, l_tz),
307 ScalarValue::TimestampMillisecond(rhs, _),
308 ) => ScalarValue::TimestampMillisecond(
309 min_max_option(lhs, rhs, ordering),
310 l_tz.clone(),
311 ),
312 (
313 ScalarValue::TimestampMicrosecond(lhs, l_tz),
314 ScalarValue::TimestampMicrosecond(rhs, _),
315 ) => ScalarValue::TimestampMicrosecond(
316 min_max_option(lhs, rhs, ordering),
317 l_tz.clone(),
318 ),
319 (
320 ScalarValue::TimestampNanosecond(lhs, l_tz),
321 ScalarValue::TimestampNanosecond(rhs, _),
322 ) => ScalarValue::TimestampNanosecond(
323 min_max_option(lhs, rhs, ordering),
324 l_tz.clone(),
325 ),
326 (ScalarValue::Date32(lhs), ScalarValue::Date32(rhs)) => {
327 ScalarValue::Date32(min_max_option(lhs, rhs, ordering))
328 }
329 (ScalarValue::Date64(lhs), ScalarValue::Date64(rhs)) => {
330 ScalarValue::Date64(min_max_option(lhs, rhs, ordering))
331 }
332 (ScalarValue::Time32Second(lhs), ScalarValue::Time32Second(rhs)) => {
333 ScalarValue::Time32Second(min_max_option(lhs, rhs, ordering))
334 }
335 (ScalarValue::Time32Millisecond(lhs), ScalarValue::Time32Millisecond(rhs)) => {
336 ScalarValue::Time32Millisecond(min_max_option(lhs, rhs, ordering))
337 }
338 (ScalarValue::Time64Microsecond(lhs), ScalarValue::Time64Microsecond(rhs)) => {
339 ScalarValue::Time64Microsecond(min_max_option(lhs, rhs, ordering))
340 }
341 (ScalarValue::Time64Nanosecond(lhs), ScalarValue::Time64Nanosecond(rhs)) => {
342 ScalarValue::Time64Nanosecond(min_max_option(lhs, rhs, ordering))
343 }
344 (ScalarValue::IntervalYearMonth(lhs), ScalarValue::IntervalYearMonth(rhs)) => {
345 ScalarValue::IntervalYearMonth(min_max_option(lhs, rhs, ordering))
346 }
347 (
348 ScalarValue::IntervalMonthDayNano(lhs),
349 ScalarValue::IntervalMonthDayNano(rhs),
350 ) => ScalarValue::IntervalMonthDayNano(min_max_option(lhs, rhs, ordering)),
351 (ScalarValue::IntervalDayTime(lhs), ScalarValue::IntervalDayTime(rhs)) => {
352 ScalarValue::IntervalDayTime(min_max_option(lhs, rhs, ordering))
353 }
354 (ScalarValue::IntervalYearMonth(_), ScalarValue::IntervalMonthDayNano(_))
355 | (ScalarValue::IntervalYearMonth(_), ScalarValue::IntervalDayTime(_))
356 | (ScalarValue::IntervalMonthDayNano(_), ScalarValue::IntervalDayTime(_))
357 | (ScalarValue::IntervalMonthDayNano(_), ScalarValue::IntervalYearMonth(_))
358 | (ScalarValue::IntervalDayTime(_), ScalarValue::IntervalYearMonth(_))
359 | (ScalarValue::IntervalDayTime(_), ScalarValue::IntervalMonthDayNano(_)) => {
360 return min_max_interval_scalar(lhs, rhs, ordering);
361 }
362 (ScalarValue::DurationSecond(lhs), ScalarValue::DurationSecond(rhs)) => {
363 ScalarValue::DurationSecond(min_max_option(lhs, rhs, ordering))
364 }
365 (
366 ScalarValue::DurationMillisecond(lhs),
367 ScalarValue::DurationMillisecond(rhs),
368 ) => ScalarValue::DurationMillisecond(min_max_option(lhs, rhs, ordering)),
369 (
370 ScalarValue::DurationMicrosecond(lhs),
371 ScalarValue::DurationMicrosecond(rhs),
372 ) => ScalarValue::DurationMicrosecond(min_max_option(lhs, rhs, ordering)),
373 (ScalarValue::DurationNanosecond(lhs), ScalarValue::DurationNanosecond(rhs)) => {
374 ScalarValue::DurationNanosecond(min_max_option(lhs, rhs, ordering))
375 }
376 (ScalarValue::Struct(_), ScalarValue::Struct(_))
377 | (ScalarValue::List(_), ScalarValue::List(_))
378 | (ScalarValue::LargeList(_), ScalarValue::LargeList(_))
379 | (ScalarValue::FixedSizeList(_), ScalarValue::FixedSizeList(_)) => {
380 min_max_generic_scalar(lhs, rhs, ordering)
381 }
382 _ => {
383 return internal_err!(
384 "MIN/MAX is not expected to receive logically incompatible scalar values {:?}",
385 (lhs, rhs)
386 );
387 }
388 };
389
390 Ok(result)
391}
392
393#[derive(Debug, Clone)]
395pub struct MaxAccumulator {
396 max: ScalarValue,
397}
398
399impl MaxAccumulator {
400 pub fn try_new(datatype: &DataType) -> Result<Self> {
402 Ok(Self {
403 max: ScalarValue::try_from(datatype)?,
404 })
405 }
406}
407
408impl Accumulator for MaxAccumulator {
409 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
410 let values = &values[0];
411 let delta = &max_batch(values)?;
412 let new_max: Result<ScalarValue, DataFusionError> =
413 min_max!(&self.max, delta, max);
414 self.max = new_max?;
415 Ok(())
416 }
417
418 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
419 self.update_batch(states)
420 }
421
422 fn state(&mut self) -> Result<Vec<ScalarValue>> {
423 Ok(vec![self.evaluate()?])
424 }
425 fn evaluate(&mut self) -> Result<ScalarValue> {
426 Ok(self.max.clone())
427 }
428
429 fn size(&self) -> usize {
430 size_of_val(self) - size_of_val(&self.max) + self.max.size()
431 }
432}
433
434#[derive(Debug, Clone)]
436pub struct MinAccumulator {
437 min: ScalarValue,
438}
439
440impl MinAccumulator {
441 pub fn try_new(datatype: &DataType) -> Result<Self> {
443 Ok(Self {
444 min: ScalarValue::try_from(datatype)?,
445 })
446 }
447}
448
449impl Accumulator for MinAccumulator {
450 fn state(&mut self) -> Result<Vec<ScalarValue>> {
451 Ok(vec![self.evaluate()?])
452 }
453
454 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
455 let values = &values[0];
456 let delta = &min_batch(values)?;
457 let new_min: Result<ScalarValue, DataFusionError> =
458 min_max!(&self.min, delta, min);
459 self.min = new_min?;
460 Ok(())
461 }
462
463 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
464 self.update_batch(states)
465 }
466
467 fn evaluate(&mut self) -> Result<ScalarValue> {
468 Ok(self.min.clone())
469 }
470
471 fn size(&self) -> usize {
472 size_of_val(self) - size_of_val(&self.min) + self.min.size()
473 }
474}
475
// Statically typed min/max over a string array.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the `arrow::compute` kernel named
// `$OP` on it, and wraps the owned copy of the result (if any) in
// `ScalarValue::$SCALAR`. The enclosing function must return `Result`,
// because `downcast_value!` returns early on a type mismatch.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` instead of `and_then(|e| Some(..))`: the closure never fails.
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
485
// Statically typed min/max over a binary array.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the `arrow::compute` kernel named
// `$OP` on it, and wraps the owned byte vector of the result (if any) in
// `ScalarValue::$SCALAR`. The enclosing function must return `Result`,
// because `downcast_value!` returns early on a type mismatch.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` instead of `and_then(|e| Some(..))`: the closure never fails.
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
495
// Statically typed min/max over an array whose kernel result can be stored
// in the scalar directly.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, applies the `arrow::compute` kernel
// named `$OP`, and builds `ScalarValue::$SCALAR` from the result followed by
// a clone of each `$EXTRA_ARGS` identifier (e.g. precision/scale or timezone).
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed), $($EXTRA_ARGS.clone()),*)
    }};
}
504
// Dynamically typed min/max over an array.
//
// Dispatches on `$VALUES.data_type()` and forwards to `typed_min_max_batch!`
// with the matching array type and scalar variant. `$OP` is the name of the
// `arrow::compute` kernel to apply (callers in this file pass `min` or
// `max`). Any data type without an arm makes the enclosing function return
// an internal error.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,

            // Signed integers.
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),

            // Unsigned integers.
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),

            // Floats.
            DataType::Float16 => typed_min_max_batch!($VALUES, Float16Array, Float16, $OP),
            DataType::Float32 => typed_min_max_batch!($VALUES, Float32Array, Float32, $OP),
            DataType::Float64 => typed_min_max_batch!($VALUES, Float64Array, Float64, $OP),

            // Decimals carry their precision and scale into the scalar.
            DataType::Decimal32(p, s) => {
                typed_min_max_batch!($VALUES, Decimal32Array, Decimal32, $OP, p, s)
            }
            DataType::Decimal64(p, s) => {
                typed_min_max_batch!($VALUES, Decimal64Array, Decimal64, $OP, p, s)
            }
            DataType::Decimal128(p, s) => {
                typed_min_max_batch!($VALUES, Decimal128Array, Decimal128, $OP, p, s)
            }
            DataType::Decimal256(p, s) => {
                typed_min_max_batch!($VALUES, Decimal256Array, Decimal256, $OP, p, s)
            }

            // Timestamps carry their timezone into the scalar.
            DataType::Timestamp(TimeUnit::Second, tz) => {
                typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP, tz)
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMillisecondArray,
                    TimestampMillisecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMicrosecondArray,
                    TimestampMicrosecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampNanosecondArray,
                    TimestampNanosecond,
                    $OP,
                    tz
                )
            }

            // Dates and times of day.
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, Time32MillisecondArray, Time32Millisecond, $OP)
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, Time64MicrosecondArray, Time64Microsecond, $OP)
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64Nanosecond, $OP)
            }

            // Intervals.
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!($VALUES, IntervalYearMonthArray, IntervalYearMonth, $OP)
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }

            // Durations.
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMillisecondArray,
                    DurationMillisecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMicrosecondArray,
                    DurationMicrosecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, DurationNanosecondArray, DurationNanosecond, $OP)
            }

            unsupported => {
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {}",
                    unsupported
                );
            }
        }
    }};
}
684
685pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
687 Ok(match values.data_type() {
688 DataType::Utf8 => {
689 typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
690 }
691 DataType::LargeUtf8 => {
692 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
693 }
694 DataType::Utf8View => {
695 typed_min_max_batch_string!(
696 values,
697 StringViewArray,
698 Utf8View,
699 min_string_view
700 )
701 }
702 DataType::Boolean => {
703 typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
704 }
705 DataType::Binary => {
706 typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
707 }
708 DataType::LargeBinary => {
709 typed_min_max_batch_binary!(
710 &values,
711 LargeBinaryArray,
712 LargeBinary,
713 min_binary
714 )
715 }
716 DataType::FixedSizeBinary(size) => {
717 let array = downcast_value!(&values, FixedSizeBinaryArray);
718 let value = compute::min_fixed_size_binary(array);
719 let value = value.map(|e| e.to_vec());
720 ScalarValue::FixedSizeBinary(*size, value)
721 }
722 DataType::BinaryView => {
723 typed_min_max_batch_binary!(
724 &values,
725 BinaryViewArray,
726 BinaryView,
727 min_binary_view
728 )
729 }
730 DataType::Struct(_)
731 | DataType::List(_)
732 | DataType::LargeList(_)
733 | DataType::FixedSizeList(_, _)
734 | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
735 _ => min_max_batch!(values, min),
736 })
737}
738
739fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
741 if array.len() == array.null_count() {
742 return ScalarValue::try_from(array.data_type());
743 }
744 let mut extreme = ScalarValue::try_from_array(array, 0)?;
745 for i in 1..array.len() {
746 let current = ScalarValue::try_from_array(array, i)?;
747 if current.is_null() {
748 continue;
749 }
750 if extreme.is_null() {
751 extreme = current;
752 continue;
753 }
754 let cmp = extreme.try_cmp(¤t)?;
755 if cmp == ordering {
756 extreme = current;
757 }
758 }
759
760 Ok(extreme)
761}
762
763pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
765 Ok(match values.data_type() {
766 DataType::Utf8 => {
767 typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
768 }
769 DataType::LargeUtf8 => {
770 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
771 }
772 DataType::Utf8View => {
773 typed_min_max_batch_string!(
774 values,
775 StringViewArray,
776 Utf8View,
777 max_string_view
778 )
779 }
780 DataType::Boolean => {
781 typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
782 }
783 DataType::Binary => {
784 typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
785 }
786 DataType::BinaryView => {
787 typed_min_max_batch_binary!(
788 &values,
789 BinaryViewArray,
790 BinaryView,
791 max_binary_view
792 )
793 }
794 DataType::LargeBinary => {
795 typed_min_max_batch_binary!(
796 &values,
797 LargeBinaryArray,
798 LargeBinary,
799 max_binary
800 )
801 }
802 DataType::FixedSizeBinary(size) => {
803 let array = downcast_value!(&values, FixedSizeBinaryArray);
804 let value = compute::max_fixed_size_binary(array);
805 let value = value.map(|e| e.to_vec());
806 ScalarValue::FixedSizeBinary(*size, value)
807 }
808 DataType::Struct(_)
809 | DataType::List(_)
810 | DataType::LargeList(_)
811 | DataType::FixedSizeList(_, _)
812 | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
813 _ => min_max_batch!(values, max),
814 })
815}
816
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a `Float32` value in a dictionary scalar keyed by `key_type`.
    fn float32_dictionary(key_type: DataType, value: f32) -> ScalarValue {
        ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::Float32(Some(value))),
        )
    }

    /// Runs `min_max_scalar`, expecting it to fail, and returns the rendered error.
    fn failure_message(lhs: &ScalarValue, rhs: &ScalarValue, ordering: Ordering) -> String {
        let error: DataFusionError = min_max_scalar(lhs, rhs, ordering).unwrap_err();
        error.to_string()
    }

    #[test]
    fn min_max_scalar_preserves_core_behaviors() -> Result<()> {
        let int32 = |value: i32| ScalarValue::Int32(Some(value));
        let utf8 = |value: &str| ScalarValue::Utf8(Some(value.to_string()));

        // Each case is (lhs, rhs, ordering, expected).
        let cases = [
            (int32(1), int32(2), Ordering::Less, int32(2)),
            (int32(1), int32(2), Ordering::Greater, int32(1)),
            (utf8("a"), utf8("b"), Ordering::Less, utf8("b")),
            (
                ScalarValue::Boolean(None),
                ScalarValue::Boolean(Some(true)),
                Ordering::Greater,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (lhs, rhs, ordering, expected) in cases {
            let actual = min_max_scalar(&lhs, &rhs, ordering)?;
            assert_eq!(actual, expected);
        }

        Ok(())
    }

    #[test]
    fn min_max_scalar_float_uses_total_cmp_for_nan() -> Result<()> {
        type F16 =
            <arrow::datatypes::Float16Type as arrow::datatypes::ArrowPrimitiveType>::Native;

        // Float64: the minimum is the finite value, the maximum is NaN.
        let nan = ScalarValue::Float64(Some(f64::NAN));
        let one = ScalarValue::Float64(Some(1.0));
        assert_eq!(min_max_scalar(&nan, &one, Ordering::Greater)?, one);
        let largest = min_max_scalar(&nan, &one, Ordering::Less)?;
        assert!(matches!(
            largest,
            ScalarValue::Float64(Some(value)) if value.is_nan()
        ));

        // Float32.
        let nan = ScalarValue::Float32(Some(f32::NAN));
        let one = ScalarValue::Float32(Some(1.0));
        assert_eq!(min_max_scalar(&nan, &one, Ordering::Greater)?, one);
        let largest = min_max_scalar(&nan, &one, Ordering::Less)?;
        assert!(matches!(
            largest,
            ScalarValue::Float32(Some(value)) if value.is_nan()
        ));

        // Float16.
        let nan = ScalarValue::Float16(Some(F16::NAN));
        let one = ScalarValue::Float16(Some(F16::from_f32(1.0)));
        assert_eq!(min_max_scalar(&nan, &one, Ordering::Greater)?, one);
        let largest = min_max_scalar(&nan, &one, Ordering::Less)?;
        assert!(matches!(
            largest,
            ScalarValue::Float16(Some(value)) if value.is_nan()
        ));

        Ok(())
    }

    #[test]
    fn min_max_decimal_mismatch_error_is_preserved() -> Result<()> {
        let lhs = ScalarValue::Decimal128(Some(1), 10, 2);
        let rhs = ScalarValue::Decimal128(Some(2), 11, 2);

        let message = failure_message(&lhs, &rhs, Ordering::Less);
        let expected_prefix = format!(
            "Internal error: MIN/MAX is not expected to receive scalars of incompatible types {:?}",
            (&lhs, &rhs)
        );

        assert!(message.starts_with(&expected_prefix));
        Ok(())
    }

    #[test]
    fn min_max_fixed_size_binary_mismatch_error_is_preserved() -> Result<()> {
        let lhs = ScalarValue::FixedSizeBinary(2, Some(vec![1, 2]));
        let rhs = ScalarValue::FixedSizeBinary(3, Some(vec![1, 2, 3]));

        let message = failure_message(&lhs, &rhs, Ordering::Less);

        assert!(message.starts_with(
            "Internal error: MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes (2, 3)"
        ));
        Ok(())
    }

    #[test]
    fn min_max_mixed_interval_error_is_preserved() -> Result<()> {
        let lhs = ScalarValue::IntervalYearMonth(Some(1));
        let rhs = ScalarValue::IntervalDayTime(Some(
            arrow::datatypes::IntervalDayTime::new(1, 0),
        ));

        let message = failure_message(&lhs, &rhs, Ordering::Less);

        assert!(message.starts_with(
            "Internal error: Comparison error while computing interval min/max"
        ));
        Ok(())
    }

    #[test]
    fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> {
        let dictionary = float32_dictionary(DataType::Int32, 1.0);
        let scalar = ScalarValue::Float32(Some(2.0));

        let result = min_max_scalar(&dictionary, &scalar, Ordering::Less)?;

        assert_eq!(result, ScalarValue::Float32(Some(2.0)));
        Ok(())
    }

    #[test]
    fn min_max_dictionary_same_key_type_rewraps_result() -> Result<()> {
        let lhs = float32_dictionary(DataType::Int32, 1.0);
        let rhs = float32_dictionary(DataType::Int32, 2.0);

        let result = min_max_scalar(&lhs, &rhs, Ordering::Less)?;

        assert_eq!(result, float32_dictionary(DataType::Int32, 2.0));
        Ok(())
    }

    #[test]
    fn min_max_dictionary_different_key_types_error() -> Result<()> {
        let lhs = float32_dictionary(DataType::Int8, 1.0);
        let rhs = float32_dictionary(DataType::Int32, 2.0);

        let message = failure_message(&lhs, &rhs, Ordering::Less);

        assert!(message.contains("dictionary scalars with different key types"));
        Ok(())
    }

    #[test]
    fn min_max_dictionary_and_incompatible_scalar_error() -> Result<()> {
        let dictionary = float32_dictionary(DataType::Int32, 1.0);
        let scalar = ScalarValue::Int32(Some(2));

        let message = failure_message(&dictionary, &scalar, Ordering::Less);

        assert!(message.contains("logically incompatible scalar values"));
        Ok(())
    }
}