Skip to main content

datafusion_functions_aggregate_common/
min_max.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Basic min/max functionality shared across DataFusion aggregate functions
19
20use arrow::array::{
21    ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
22    Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
23    DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24    DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25    Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
26    IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27    LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35    DataFusionError, Result, ScalarValue, downcast_value, internal_err,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
// Maps the operation keyword (`min` / `max`) to the `Ordering` that signals
// "the candidate replaces the current value".
//
// The comparison is always performed as `current.cmp(candidate)`, so:
// * for `max` the candidate wins when the current value is `Less`;
// * for `min` the candidate wins when the current value is `Greater`.
macro_rules! choose_min_max {
    (max) => {
        std::cmp::Ordering::Less
    };
    (min) => {
        std::cmp::Ordering::Greater
    };
}
48
// Combines the current extreme (`$VALUE`) with a new candidate (`$DELTA`) for
// the operation named by `$OP` (`min` or `max`). Expands to a call to
// `min_max_scalar`, so the expression evaluates to a `Result<ScalarValue>`.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let replace_when = choose_min_max!($OP);
        min_max_scalar($VALUE, $DELTA, replace_when)
    }};
}
52
/// Picks between two optional, totally ordered values.
///
/// `None` is treated as "no value": if only one side is present it is
/// returned, and if both are absent the result is `None`. When both sides are
/// present, `rhs` is chosen only if `lhs.cmp(rhs) == ordering`; otherwise
/// (including ties) `lhs` is kept.
fn min_max_option<T: Clone + Ord>(
    lhs: &Option<T>,
    rhs: &Option<T>,
    ordering: Ordering,
) -> Option<T> {
    let winner = match (lhs.as_ref(), rhs.as_ref()) {
        (Some(current), Some(candidate)) => {
            if current.cmp(candidate) == ordering {
                candidate
            } else {
                current
            }
        }
        (Some(only), None) | (None, Some(only)) => only,
        (None, None) => return None,
    };
    Some(winner.clone())
}
66
/// Picks between two optional values using a caller-supplied comparator.
///
/// This mirrors `min_max_option` for types that are `Copy` but not `Ord`
/// (the callers in this file pass `total_cmp` for floating point values).
/// `rhs` is chosen only when both sides are present and
/// `cmp(lhs, rhs) == ordering`; a missing side yields the other side.
fn min_max_float_option<T: Copy>(
    lhs: &Option<T>,
    rhs: &Option<T>,
    ordering: Ordering,
    cmp: impl Fn(&T, &T) -> Ordering,
) -> Option<T> {
    match (*lhs, *rhs) {
        (Some(current), Some(candidate)) => {
            let candidate_wins = cmp(&current, &candidate) == ordering;
            Some(if candidate_wins { candidate } else { current })
        }
        // At most one side is present from here on: return whichever it is.
        (present, None) => present,
        (None, present) => present,
    }
}
81
82fn ensure_decimal_compatibility(
83    lhs: &ScalarValue,
84    rhs: &ScalarValue,
85    lhs_type: (u8, i8),
86    rhs_type: (u8, i8),
87) -> Result<()> {
88    if lhs_type == rhs_type {
89        Ok(())
90    } else {
91        internal_err!(
92            "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
93            (lhs, rhs)
94        )
95    }
96}
97
98fn min_max_generic_scalar(
99    lhs: &ScalarValue,
100    rhs: &ScalarValue,
101    ordering: Ordering,
102) -> ScalarValue {
103    if lhs.is_null() {
104        let mut rhs_copy = rhs.clone();
105        // When the new value won we want to compact it to
106        // avoid storing the entire input
107        rhs_copy.compact();
108        rhs_copy
109    } else if rhs.is_null() {
110        lhs.clone()
111    } else {
112        match lhs.partial_cmp(rhs) {
113            Some(order) if order == ordering => {
114                // When the new value won we want to compact it to
115                // avoid storing the entire input
116                let mut rhs_copy = rhs.clone();
117                rhs_copy.compact();
118                rhs_copy
119            }
120            _ => lhs.clone(),
121        }
122    }
123}
124
125fn min_max_interval_scalar(
126    lhs: &ScalarValue,
127    rhs: &ScalarValue,
128    ordering: Ordering,
129) -> Result<ScalarValue> {
130    match lhs.partial_cmp(rhs) {
131        Some(order) if order == ordering => Ok(rhs.clone()),
132        Some(_) => Ok(lhs.clone()),
133        None => internal_err!("Comparison error while computing interval min/max"),
134    }
135}
136
137fn min_max_dictionary_scalar(
138    lhs: &ScalarValue,
139    rhs: &ScalarValue,
140    ordering: Ordering,
141) -> Result<Option<ScalarValue>> {
142    match (lhs, rhs) {
143        (
144            ScalarValue::Dictionary(lhs_dict_key_type, lhs_dict_value),
145            ScalarValue::Dictionary(rhs_dict_key_type, rhs_dict_value),
146        ) => {
147            if lhs_dict_key_type != rhs_dict_key_type {
148                return internal_err!(
149                    "MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})",
150                    lhs_dict_key_type,
151                    rhs_dict_key_type
152                );
153            }
154
155            let result = min_max_scalar(
156                lhs_dict_value.as_ref(),
157                rhs_dict_value.as_ref(),
158                ordering,
159            )?;
160            Ok(Some(ScalarValue::Dictionary(
161                lhs_dict_key_type.clone(),
162                Box::new(result),
163            )))
164        }
165        (ScalarValue::Dictionary(_, lhs_dict_value), rhs_scalar) => {
166            min_max_scalar(lhs_dict_value.as_ref(), rhs_scalar, ordering).map(Some)
167        }
168        (lhs_scalar, ScalarValue::Dictionary(_, rhs_dict_value)) => {
169            min_max_scalar(lhs_scalar, rhs_dict_value.as_ref(), ordering).map(Some)
170        }
171        _ => Ok(None),
172    }
173}
174
175// min/max of two logically compatible scalar values.
176// Dictionary scalars participate by comparing their inner logical values.
177// When both inputs are dictionaries, matching key types are preserved in the
178// result; differing key types remain an unexpected invariant violation.
179fn min_max_scalar(
180    lhs: &ScalarValue,
181    rhs: &ScalarValue,
182    ordering: Ordering,
183) -> Result<ScalarValue> {
184    if ordering == Ordering::Equal {
185        unreachable!("min/max comparisons do not use equal ordering");
186    }
187
188    if let Some(result) = min_max_dictionary_scalar(lhs, rhs, ordering)? {
189        return Ok(result);
190    }
191
192    min_max_scalar_same_variant(lhs, rhs, ordering)
193}
194
/// Min/max of two non-dictionary scalars, dispatched on their variants.
///
/// `ordering` is the outcome of comparing `lhs` with `rhs` for which `rhs` is
/// selected; otherwise `lhs` is kept.
///
/// Behavior per group of variants:
/// * Primitive, string, binary, date/time, duration and same-unit interval
///   variants are combined with `min_max_option` on their inner `Option`s.
/// * Floating point variants use `min_max_float_option` with `total_cmp`.
/// * Decimal variants additionally require identical precision and scale.
/// * `FixedSizeBinary` additionally requires identical sizes.
/// * Timestamp variants keep the timezone of `lhs`; the timezone of `rhs` is
///   not inspected.
/// * Intervals of different units are delegated to `min_max_interval_scalar`.
/// * Struct and list variants are delegated to `min_max_generic_scalar`.
///
/// # Errors
///
/// Returns an internal error when the two scalars are of different
/// (unsupported) variants, or when the decimal / fixed-size-binary
/// compatibility checks fail.
fn min_max_scalar_same_variant(
    lhs: &ScalarValue,
    rhs: &ScalarValue,
    ordering: Ordering,
) -> Result<ScalarValue> {
    let result = match (lhs, rhs) {
        (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
        // Decimals: precision and scale must agree before values are compared.
        (
            ScalarValue::Decimal32(lhsv, lhsp, lhss),
            ScalarValue::Decimal32(rhsv, rhsp, rhss),
        ) => {
            ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
            ScalarValue::Decimal32(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
        }
        (
            ScalarValue::Decimal64(lhsv, lhsp, lhss),
            ScalarValue::Decimal64(rhsv, rhsp, rhss),
        ) => {
            ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
            ScalarValue::Decimal64(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
        }
        (
            ScalarValue::Decimal128(lhsv, lhsp, lhss),
            ScalarValue::Decimal128(rhsv, rhsp, rhss),
        ) => {
            ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
            ScalarValue::Decimal128(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
        }
        (
            ScalarValue::Decimal256(lhsv, lhsp, lhss),
            ScalarValue::Decimal256(rhsv, rhsp, rhss),
        ) => {
            ensure_decimal_compatibility(lhs, rhs, (*lhsp, *lhss), (*rhsp, *rhss))?;
            ScalarValue::Decimal256(min_max_option(lhsv, rhsv, ordering), *lhsp, *lhss)
        }
        (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
            ScalarValue::Boolean(min_max_option(lhs, rhs, ordering))
        }
        // Floats are compared with `total_cmp`, which gives NaN a defined position.
        (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
            ScalarValue::Float64(min_max_float_option(lhs, rhs, ordering, f64::total_cmp))
        }
        (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
            ScalarValue::Float32(min_max_float_option(lhs, rhs, ordering, f32::total_cmp))
        }
        (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
            ScalarValue::Float16(min_max_float_option(lhs, rhs, ordering, |a, b| {
                a.total_cmp(b)
            }))
        }
        (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
            ScalarValue::UInt64(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
            ScalarValue::UInt32(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
            ScalarValue::UInt16(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
            ScalarValue::UInt8(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
            ScalarValue::Int64(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
            ScalarValue::Int32(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
            ScalarValue::Int16(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
            ScalarValue::Int8(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
            ScalarValue::Utf8(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
            ScalarValue::LargeUtf8(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
            ScalarValue::Utf8View(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
            ScalarValue::Binary(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
            ScalarValue::LargeBinary(min_max_option(lhs, rhs, ordering))
        }
        // Fixed-size binary values are only comparable when their sizes match.
        (
            ScalarValue::FixedSizeBinary(lsize, lhs),
            ScalarValue::FixedSizeBinary(rsize, rhs),
        ) => {
            if lsize == rsize {
                ScalarValue::FixedSizeBinary(*lsize, min_max_option(lhs, rhs, ordering))
            } else {
                return internal_err!(
                    "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
                    (lsize, rsize)
                );
            }
        }
        (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
            ScalarValue::BinaryView(min_max_option(lhs, rhs, ordering))
        }
        // Timestamps: only the raw values are compared; the result carries the
        // left-hand timezone and the right-hand timezone is ignored.
        (
            ScalarValue::TimestampSecond(lhs, l_tz),
            ScalarValue::TimestampSecond(rhs, _),
        ) => {
            ScalarValue::TimestampSecond(min_max_option(lhs, rhs, ordering), l_tz.clone())
        }
        (
            ScalarValue::TimestampMillisecond(lhs, l_tz),
            ScalarValue::TimestampMillisecond(rhs, _),
        ) => ScalarValue::TimestampMillisecond(
            min_max_option(lhs, rhs, ordering),
            l_tz.clone(),
        ),
        (
            ScalarValue::TimestampMicrosecond(lhs, l_tz),
            ScalarValue::TimestampMicrosecond(rhs, _),
        ) => ScalarValue::TimestampMicrosecond(
            min_max_option(lhs, rhs, ordering),
            l_tz.clone(),
        ),
        (
            ScalarValue::TimestampNanosecond(lhs, l_tz),
            ScalarValue::TimestampNanosecond(rhs, _),
        ) => ScalarValue::TimestampNanosecond(
            min_max_option(lhs, rhs, ordering),
            l_tz.clone(),
        ),
        (ScalarValue::Date32(lhs), ScalarValue::Date32(rhs)) => {
            ScalarValue::Date32(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Date64(lhs), ScalarValue::Date64(rhs)) => {
            ScalarValue::Date64(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Time32Second(lhs), ScalarValue::Time32Second(rhs)) => {
            ScalarValue::Time32Second(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Time32Millisecond(lhs), ScalarValue::Time32Millisecond(rhs)) => {
            ScalarValue::Time32Millisecond(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Time64Microsecond(lhs), ScalarValue::Time64Microsecond(rhs)) => {
            ScalarValue::Time64Microsecond(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::Time64Nanosecond(lhs), ScalarValue::Time64Nanosecond(rhs)) => {
            ScalarValue::Time64Nanosecond(min_max_option(lhs, rhs, ordering))
        }
        (ScalarValue::IntervalYearMonth(lhs), ScalarValue::IntervalYearMonth(rhs)) => {
            ScalarValue::IntervalYearMonth(min_max_option(lhs, rhs, ordering))
        }
        (
            ScalarValue::IntervalMonthDayNano(lhs),
            ScalarValue::IntervalMonthDayNano(rhs),
        ) => ScalarValue::IntervalMonthDayNano(min_max_option(lhs, rhs, ordering)),
        (ScalarValue::IntervalDayTime(lhs), ScalarValue::IntervalDayTime(rhs)) => {
            ScalarValue::IntervalDayTime(min_max_option(lhs, rhs, ordering))
        }
        // Intervals of different units: defer to the scalar-level comparison,
        // which errors out if the pair cannot be ordered.
        (ScalarValue::IntervalYearMonth(_), ScalarValue::IntervalMonthDayNano(_))
        | (ScalarValue::IntervalYearMonth(_), ScalarValue::IntervalDayTime(_))
        | (ScalarValue::IntervalMonthDayNano(_), ScalarValue::IntervalDayTime(_))
        | (ScalarValue::IntervalMonthDayNano(_), ScalarValue::IntervalYearMonth(_))
        | (ScalarValue::IntervalDayTime(_), ScalarValue::IntervalYearMonth(_))
        | (ScalarValue::IntervalDayTime(_), ScalarValue::IntervalMonthDayNano(_)) => {
            return min_max_interval_scalar(lhs, rhs, ordering);
        }
        (ScalarValue::DurationSecond(lhs), ScalarValue::DurationSecond(rhs)) => {
            ScalarValue::DurationSecond(min_max_option(lhs, rhs, ordering))
        }
        (
            ScalarValue::DurationMillisecond(lhs),
            ScalarValue::DurationMillisecond(rhs),
        ) => ScalarValue::DurationMillisecond(min_max_option(lhs, rhs, ordering)),
        (
            ScalarValue::DurationMicrosecond(lhs),
            ScalarValue::DurationMicrosecond(rhs),
        ) => ScalarValue::DurationMicrosecond(min_max_option(lhs, rhs, ordering)),
        (ScalarValue::DurationNanosecond(lhs), ScalarValue::DurationNanosecond(rhs)) => {
            ScalarValue::DurationNanosecond(min_max_option(lhs, rhs, ordering))
        }
        // Nested types are compared as whole scalars.
        (ScalarValue::Struct(_), ScalarValue::Struct(_))
        | (ScalarValue::List(_), ScalarValue::List(_))
        | (ScalarValue::LargeList(_), ScalarValue::LargeList(_))
        | (ScalarValue::FixedSizeList(_), ScalarValue::FixedSizeList(_)) => {
            min_max_generic_scalar(lhs, rhs, ordering)
        }
        // Any remaining pairing is a mismatch the caller should have prevented.
        _ => {
            return internal_err!(
                "MIN/MAX is not expected to receive logically incompatible scalar values {:?}",
                (lhs, rhs)
            );
        }
    };

    Ok(result)
}
392
393/// An accumulator to compute the maximum value
394#[derive(Debug, Clone)]
395pub struct MaxAccumulator {
396    max: ScalarValue,
397}
398
399impl MaxAccumulator {
400    /// new max accumulator
401    pub fn try_new(datatype: &DataType) -> Result<Self> {
402        Ok(Self {
403            max: ScalarValue::try_from(datatype)?,
404        })
405    }
406}
407
408impl Accumulator for MaxAccumulator {
409    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
410        let values = &values[0];
411        let delta = &max_batch(values)?;
412        let new_max: Result<ScalarValue, DataFusionError> =
413            min_max!(&self.max, delta, max);
414        self.max = new_max?;
415        Ok(())
416    }
417
418    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
419        self.update_batch(states)
420    }
421
422    fn state(&mut self) -> Result<Vec<ScalarValue>> {
423        Ok(vec![self.evaluate()?])
424    }
425    fn evaluate(&mut self) -> Result<ScalarValue> {
426        Ok(self.max.clone())
427    }
428
429    fn size(&self) -> usize {
430        size_of_val(self) - size_of_val(&self.max) + self.max.size()
431    }
432}
433
434/// An accumulator to compute the minimum value
435#[derive(Debug, Clone)]
436pub struct MinAccumulator {
437    min: ScalarValue,
438}
439
440impl MinAccumulator {
441    /// new min accumulator
442    pub fn try_new(datatype: &DataType) -> Result<Self> {
443        Ok(Self {
444            min: ScalarValue::try_from(datatype)?,
445        })
446    }
447}
448
449impl Accumulator for MinAccumulator {
450    fn state(&mut self) -> Result<Vec<ScalarValue>> {
451        Ok(vec![self.evaluate()?])
452    }
453
454    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
455        let values = &values[0];
456        let delta = &min_batch(values)?;
457        let new_min: Result<ScalarValue, DataFusionError> =
458            min_max!(&self.min, delta, min);
459        self.min = new_min?;
460        Ok(())
461    }
462
463    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
464        self.update_batch(states)
465    }
466
467    fn evaluate(&mut self) -> Result<ScalarValue> {
468        Ok(self.min.clone())
469    }
470
471    fn size(&self) -> usize {
472        size_of_val(self) - size_of_val(&self.min) + self.min.size()
473    }
474}
475
// Statically-typed version of min/max(array) -> ScalarValue for string types.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, applies the arrow kernel
// `compute::$OP`, and wraps an owned copy of the winning string (if any) in
// `ScalarValue::$SCALAR`. The downcast uses `downcast_value!`, so this macro
// must be expanded inside a function that returns a `Result`.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // The kernel result borrows from `array`; copy it into an owned String.
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
485
// Statically-typed version of min/max(array) -> ScalarValue for binary types.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, applies the arrow kernel
// `compute::$OP`, and wraps an owned copy of the winning bytes (if any) in
// `ScalarValue::$SCALAR`. The downcast uses `downcast_value!`, so this macro
// must be expanded inside a function that returns a `Result`.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // The kernel result borrows from `array`; copy it into an owned Vec<u8>.
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
495
// Statically-typed version of min/max(array) -> ScalarValue for types whose
// kernel result can be stored in the scalar as-is (no owned copy needed).
//
// Any `$EXTRA_ARGS` identifiers (e.g. decimal precision/scale or a timestamp
// timezone) are cloned and appended to the `ScalarValue::$SCALAR` constructor.
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed_array = downcast_value!($VALUES, $ARRAYTYPE);
        let extreme = compute::$OP(typed_array);
        ScalarValue::$SCALAR(extreme, $($EXTRA_ARGS.clone()),*)
    }};
}
504
// Dynamically dispatches min/max(array) -> ScalarValue for the types handled
// by `typed_min_max_batch!` (decimals, floats, integers, timestamps, dates,
// times, intervals and durations), plus `DataType::Null`.
//
// This is a macro to support both operations: `$OP` is `min` or `max` and is
// forwarded as the name of the `arrow::compute` kernel to call.
//
// NOTE: the fallback arm uses `return`, so this macro must be expanded inside
// a function returning `Result`; unsupported types make that function return
// an internal error.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,
            // Decimals carry their precision and scale into the resulting scalar.
            DataType::Decimal32(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal32Array,
                    Decimal32,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal64(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal64Array,
                    Decimal64,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal128(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal128Array,
                    Decimal128,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal256(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal256Array,
                    Decimal256,
                    $OP,
                    precision,
                    scale
                )
            }
            // all types that have a natural order
            DataType::Float64 => {
                typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
            }
            DataType::Float32 => {
                typed_min_max_batch!($VALUES, Float32Array, Float32, $OP)
            }
            DataType::Float16 => {
                typed_min_max_batch!($VALUES, Float16Array, Float16, $OP)
            }
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            // Timestamps carry the array's timezone into the resulting scalar.
            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampSecondArray,
                    TimestampSecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMillisecondArray,
                TimestampMillisecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMicrosecondArray,
                TimestampMicrosecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampNanosecondArray,
                TimestampNanosecond,
                $OP,
                tz_opt
            ),
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time32MillisecondArray,
                    Time32Millisecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64MicrosecondArray,
                    Time64Microsecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64NanosecondArray,
                    Time64Nanosecond,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalYearMonthArray,
                    IntervalYearMonth,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMillisecondArray,
                    DurationMillisecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMicrosecondArray,
                    DurationMicrosecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationNanosecondArray,
                    DurationNanosecond,
                    $OP
                )
            }
            other => {
                // This should have been handled before
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {}",
                    other
                );
            }
        }
    }};
}
684
685/// dynamically-typed min(array) -> ScalarValue
686pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
687    Ok(match values.data_type() {
688        DataType::Utf8 => {
689            typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
690        }
691        DataType::LargeUtf8 => {
692            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
693        }
694        DataType::Utf8View => {
695            typed_min_max_batch_string!(
696                values,
697                StringViewArray,
698                Utf8View,
699                min_string_view
700            )
701        }
702        DataType::Boolean => {
703            typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
704        }
705        DataType::Binary => {
706            typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
707        }
708        DataType::LargeBinary => {
709            typed_min_max_batch_binary!(
710                &values,
711                LargeBinaryArray,
712                LargeBinary,
713                min_binary
714            )
715        }
716        DataType::FixedSizeBinary(size) => {
717            let array = downcast_value!(&values, FixedSizeBinaryArray);
718            let value = compute::min_fixed_size_binary(array);
719            let value = value.map(|e| e.to_vec());
720            ScalarValue::FixedSizeBinary(*size, value)
721        }
722        DataType::BinaryView => {
723            typed_min_max_batch_binary!(
724                &values,
725                BinaryViewArray,
726                BinaryView,
727                min_binary_view
728            )
729        }
730        DataType::Struct(_)
731        | DataType::List(_)
732        | DataType::LargeList(_)
733        | DataType::FixedSizeList(_, _)
734        | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
735        _ => min_max_batch!(values, min),
736    })
737}
738
739/// Generic min/max implementation for complex types
740fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
741    if array.len() == array.null_count() {
742        return ScalarValue::try_from(array.data_type());
743    }
744    let mut extreme = ScalarValue::try_from_array(array, 0)?;
745    for i in 1..array.len() {
746        let current = ScalarValue::try_from_array(array, i)?;
747        if current.is_null() {
748            continue;
749        }
750        if extreme.is_null() {
751            extreme = current;
752            continue;
753        }
754        let cmp = extreme.try_cmp(&current)?;
755        if cmp == ordering {
756            extreme = current;
757        }
758    }
759
760    Ok(extreme)
761}
762
763/// dynamically-typed max(array) -> ScalarValue
764pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
765    Ok(match values.data_type() {
766        DataType::Utf8 => {
767            typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
768        }
769        DataType::LargeUtf8 => {
770            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
771        }
772        DataType::Utf8View => {
773            typed_min_max_batch_string!(
774                values,
775                StringViewArray,
776                Utf8View,
777                max_string_view
778            )
779        }
780        DataType::Boolean => {
781            typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
782        }
783        DataType::Binary => {
784            typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
785        }
786        DataType::BinaryView => {
787            typed_min_max_batch_binary!(
788                &values,
789                BinaryViewArray,
790                BinaryView,
791                max_binary_view
792            )
793        }
794        DataType::LargeBinary => {
795            typed_min_max_batch_binary!(
796                &values,
797                LargeBinaryArray,
798                LargeBinary,
799                max_binary
800            )
801        }
802        DataType::FixedSizeBinary(size) => {
803            let array = downcast_value!(&values, FixedSizeBinaryArray);
804            let value = compute::max_fixed_size_binary(array);
805            let value = value.map(|e| e.to_vec());
806            ScalarValue::FixedSizeBinary(*size, value)
807        }
808        DataType::Struct(_)
809        | DataType::List(_)
810        | DataType::LargeList(_)
811        | DataType::FixedSizeList(_, _)
812        | DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
813        _ => min_max_batch!(values, max),
814    })
815}
816
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a `Float32` value in a dictionary scalar that uses `key_type`
    /// as its key type.
    fn float32_dictionary(key_type: DataType, value: f32) -> ScalarValue {
        ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::Float32(Some(value))),
        )
    }

    /// Calls `min_max_scalar` with `Ordering::Less`, requires the call to fail
    /// and returns the rendered error text.
    fn less_error_text(lhs: &ScalarValue, rhs: &ScalarValue) -> String {
        let error: DataFusionError =
            min_max_scalar(lhs, rhs, Ordering::Less).unwrap_err();
        error.to_string()
    }

    // Table-driven check of the basic integer, string and null-handling cases.
    #[test]
    fn min_max_scalar_preserves_core_behaviors() -> Result<()> {
        let int = |v: i32| ScalarValue::Int32(Some(v));
        let text = |s: &str| ScalarValue::Utf8(Some(s.to_string()));

        // (left operand, right operand, ordering, expected result)
        let table = [
            (int(1), int(2), Ordering::Less, int(2)),
            (int(1), int(2), Ordering::Greater, int(1)),
            (text("a"), text("b"), Ordering::Less, text("b")),
            (
                ScalarValue::Boolean(None),
                ScalarValue::Boolean(Some(true)),
                Ordering::Greater,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (left, right, direction, want) in table {
            let got = min_max_scalar(&left, &right, direction)?;
            assert_eq!(got, want);
        }

        Ok(())
    }

    // For every float width: NaN versus 1.0 must yield 1.0 under
    // `Ordering::Greater` and NaN under `Ordering::Less`.
    #[test]
    fn min_max_scalar_float_uses_total_cmp_for_nan() -> Result<()> {
        type F16 =
            <arrow::datatypes::Float16Type as arrow::datatypes::ArrowPrimitiveType>::Native;

        {
            let nan = ScalarValue::Float64(Some(f64::NAN));
            let one = ScalarValue::Float64(Some(1.0));

            let smaller = min_max_scalar(&nan, &one, Ordering::Greater)?;
            assert_eq!(smaller, one);

            let larger = min_max_scalar(&nan, &one, Ordering::Less)?;
            assert!(matches!(
                larger,
                ScalarValue::Float64(Some(value)) if value.is_nan()
            ));
        }

        {
            let nan = ScalarValue::Float32(Some(f32::NAN));
            let one = ScalarValue::Float32(Some(1.0));

            let smaller = min_max_scalar(&nan, &one, Ordering::Greater)?;
            assert_eq!(smaller, one);

            let larger = min_max_scalar(&nan, &one, Ordering::Less)?;
            assert!(matches!(
                larger,
                ScalarValue::Float32(Some(value)) if value.is_nan()
            ));
        }

        {
            let nan = ScalarValue::Float16(Some(F16::NAN));
            let one = ScalarValue::Float16(Some(F16::from_f32(1.0)));

            let smaller = min_max_scalar(&nan, &one, Ordering::Greater)?;
            assert_eq!(smaller, one);

            let larger = min_max_scalar(&nan, &one, Ordering::Less)?;
            assert!(matches!(
                larger,
                ScalarValue::Float16(Some(value)) if value.is_nan()
            ));
        }

        Ok(())
    }

    // Decimals that differ in precision must be rejected with the established
    // "incompatible types" message.
    #[test]
    fn min_max_decimal_mismatch_error_is_preserved() -> Result<()> {
        let narrow = ScalarValue::Decimal128(Some(1), 10, 2);
        let wide = ScalarValue::Decimal128(Some(2), 11, 2);

        let text = less_error_text(&narrow, &wide);
        let expected_prefix = format!(
            "Internal error: MIN/MAX is not expected to receive scalars of incompatible types {:?}",
            (&narrow, &wide)
        );

        assert!(text.starts_with(&expected_prefix));
        Ok(())
    }

    // Fixed-size binaries of different widths must be rejected with a message
    // naming both sizes.
    #[test]
    fn min_max_fixed_size_binary_mismatch_error_is_preserved() -> Result<()> {
        let two_bytes = ScalarValue::FixedSizeBinary(2, Some(vec![1, 2]));
        let three_bytes = ScalarValue::FixedSizeBinary(3, Some(vec![1, 2, 3]));

        let text = less_error_text(&two_bytes, &three_bytes);

        assert!(text.starts_with(
            "Internal error: MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes (2, 3)"
        ));
        Ok(())
    }

    // Mixing interval units must surface the interval comparison error.
    #[test]
    fn min_max_mixed_interval_error_is_preserved() -> Result<()> {
        let year_month = ScalarValue::IntervalYearMonth(Some(1));
        let day_time = ScalarValue::IntervalDayTime(Some(
            arrow::datatypes::IntervalDayTime::new(1, 0),
        ));

        let text = less_error_text(&year_month, &day_time);

        assert!(text.starts_with(
            "Internal error: Comparison error while computing interval min/max"
        ));
        Ok(())
    }

    // A dictionary scalar compared with a plain scalar of the same value type
    // yields the plain (unwrapped) winner.
    #[test]
    fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> {
        let wrapped = float32_dictionary(DataType::Int32, 1.0);
        let plain = ScalarValue::Float32(Some(2.0));

        let winner = min_max_scalar(&wrapped, &plain, Ordering::Less)?;

        assert_eq!(winner, ScalarValue::Float32(Some(2.0)));
        Ok(())
    }

    // Two dictionary scalars sharing a key type produce a dictionary scalar
    // with that same key type.
    #[test]
    fn min_max_dictionary_same_key_type_rewraps_result() -> Result<()> {
        let small = float32_dictionary(DataType::Int32, 1.0);
        let large = float32_dictionary(DataType::Int32, 2.0);

        let winner = min_max_scalar(&small, &large, Ordering::Less)?;

        assert_eq!(winner, float32_dictionary(DataType::Int32, 2.0));
        Ok(())
    }

    // Dictionary scalars whose key types differ must be rejected.
    #[test]
    fn min_max_dictionary_different_key_types_error() -> Result<()> {
        let int8_keyed = float32_dictionary(DataType::Int8, 1.0);
        let int32_keyed = float32_dictionary(DataType::Int32, 2.0);

        let text = less_error_text(&int8_keyed, &int32_keyed);

        assert!(text.contains("dictionary scalars with different key types"));
        Ok(())
    }

    // A dictionary over `Float32` compared with an `Int32` scalar must be
    // rejected as logically incompatible.
    #[test]
    fn min_max_dictionary_and_incompatible_scalar_error() -> Result<()> {
        let wrapped = float32_dictionary(DataType::Int32, 1.0);
        let plain = ScalarValue::Int32(Some(2));

        let text = less_error_text(&wrapped, &plain);

        assert!(text.contains("logically incompatible scalar values"));
        Ok(())
    }
}