datafusion_functions_aggregate_common/
min_max.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Basic min/max functionality shared across DataFusion aggregate functions
19
20use arrow::array::{
21    ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22    Date64Array, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array,
23    DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24    DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25    Int16Array, Int32Array, Int64Array, Int8Array, IntervalDayTimeArray,
26    IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27    LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35    downcast_value, internal_err, DataFusionError, Result, ScalarValue,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
40// min/max of two non-string scalar values.
41macro_rules! typed_min_max {
42    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
43        ScalarValue::$SCALAR(
44            match ($VALUE, $DELTA) {
45                (None, None) => None,
46                (Some(a), None) => Some(*a),
47                (None, Some(b)) => Some(*b),
48                (Some(a), Some(b)) => Some((*a).$OP(*b)),
49            },
50            $($EXTRA_ARGS.clone()),*
51        )
52    }};
53}
54
55macro_rules! typed_min_max_float {
56    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
57        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
58            (None, None) => None,
59            (Some(a), None) => Some(*a),
60            (None, Some(b)) => Some(*b),
61            (Some(a), Some(b)) => match a.total_cmp(b) {
62                choose_min_max!($OP) => Some(*b),
63                _ => Some(*a),
64            },
65        })
66    }};
67}
68
69// min/max of two scalar string values.
70macro_rules! typed_min_max_string {
71    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
72        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
73            (None, None) => None,
74            (Some(a), None) => Some(a.clone()),
75            (None, Some(b)) => Some(b.clone()),
76            (Some(a), Some(b)) => Some((a).$OP(b).clone()),
77        })
78    }};
79}
80
81// min/max of two scalar string values with a prefix argument.
82macro_rules! typed_min_max_string_arg {
83    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $ARG:expr) => {{
84        ScalarValue::$SCALAR(
85            $ARG,
86            match ($VALUE, $DELTA) {
87                (None, None) => None,
88                (Some(a), None) => Some(a.clone()),
89                (None, Some(b)) => Some(b.clone()),
90                (Some(a), Some(b)) => Some((a).$OP(b).clone()),
91            },
92        )
93    }};
94}
95
96macro_rules! choose_min_max {
97    (min) => {
98        std::cmp::Ordering::Greater
99    };
100    (max) => {
101        std::cmp::Ordering::Less
102    };
103}
104
105macro_rules! interval_min_max {
106    ($OP:tt, $LHS:expr, $RHS:expr) => {{
107        match $LHS.partial_cmp(&$RHS) {
108            Some(choose_min_max!($OP)) => $RHS.clone(),
109            Some(_) => $LHS.clone(),
110            None => {
111                return internal_err!("Comparison error while computing interval min/max")
112            }
113        }
114    }};
115}
116
117macro_rules! min_max_generic {
118    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
119        if $VALUE.is_null() {
120            let mut delta_copy = $DELTA.clone();
121            // When the new value won we want to compact it to
122            // avoid storing the entire input
123            delta_copy.compact();
124            delta_copy
125        } else if $DELTA.is_null() {
126            $VALUE.clone()
127        } else {
128            match $VALUE.partial_cmp(&$DELTA) {
129                Some(choose_min_max!($OP)) => {
130                    // When the new value won we want to compact it to
131                    // avoid storing the entire input
132                    let mut delta_copy = $DELTA.clone();
133                    delta_copy.compact();
134                    delta_copy
135                }
136                _ => $VALUE.clone(),
137            }
138        }
139    }};
140}
141
142// min/max of two scalar values of the same type
143macro_rules! min_max {
144    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
145        Ok(match ($VALUE, $DELTA) {
146            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
147            (
148                lhs @ ScalarValue::Decimal32(lhsv, lhsp, lhss),
149                rhs @ ScalarValue::Decimal32(rhsv, rhsp, rhss)
150            ) => {
151                if lhsp.eq(rhsp) && lhss.eq(rhss) {
152                    typed_min_max!(lhsv, rhsv, Decimal32, $OP, lhsp, lhss)
153                } else {
154                    return internal_err!(
155                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
156                    (lhs, rhs)
157                );
158                }
159            }
160            (
161                lhs @ ScalarValue::Decimal64(lhsv, lhsp, lhss),
162                rhs @ ScalarValue::Decimal64(rhsv, rhsp, rhss)
163            ) => {
164                if lhsp.eq(rhsp) && lhss.eq(rhss) {
165                    typed_min_max!(lhsv, rhsv, Decimal64, $OP, lhsp, lhss)
166                } else {
167                    return internal_err!(
168                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
169                    (lhs, rhs)
170                );
171                }
172            }
173            (
174                lhs @ ScalarValue::Decimal128(lhsv, lhsp, lhss),
175                rhs @ ScalarValue::Decimal128(rhsv, rhsp, rhss)
176            ) => {
177                if lhsp.eq(rhsp) && lhss.eq(rhss) {
178                    typed_min_max!(lhsv, rhsv, Decimal128, $OP, lhsp, lhss)
179                } else {
180                    return internal_err!(
181                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
182                    (lhs, rhs)
183                );
184                }
185            }
186            (
187                lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss),
188                rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss)
189            ) => {
190                if lhsp.eq(rhsp) && lhss.eq(rhss) {
191                    typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss)
192                } else {
193                    return internal_err!(
194                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
195                    (lhs, rhs)
196                );
197                }
198            }
199            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
200                typed_min_max!(lhs, rhs, Boolean, $OP)
201            }
202            (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
203                typed_min_max_float!(lhs, rhs, Float64, $OP)
204            }
205            (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
206                typed_min_max_float!(lhs, rhs, Float32, $OP)
207            }
208            (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
209                typed_min_max_float!(lhs, rhs, Float16, $OP)
210            }
211            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
212                typed_min_max!(lhs, rhs, UInt64, $OP)
213            }
214            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
215                typed_min_max!(lhs, rhs, UInt32, $OP)
216            }
217            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
218                typed_min_max!(lhs, rhs, UInt16, $OP)
219            }
220            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
221                typed_min_max!(lhs, rhs, UInt8, $OP)
222            }
223            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
224                typed_min_max!(lhs, rhs, Int64, $OP)
225            }
226            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
227                typed_min_max!(lhs, rhs, Int32, $OP)
228            }
229            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
230                typed_min_max!(lhs, rhs, Int16, $OP)
231            }
232            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
233                typed_min_max!(lhs, rhs, Int8, $OP)
234            }
235            (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
236                typed_min_max_string!(lhs, rhs, Utf8, $OP)
237            }
238            (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
239                typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
240            }
241            (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
242                typed_min_max_string!(lhs, rhs, Utf8View, $OP)
243            }
244            (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
245                typed_min_max_string!(lhs, rhs, Binary, $OP)
246            }
247            (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
248                typed_min_max_string!(lhs, rhs, LargeBinary, $OP)
249            }
250            (ScalarValue::FixedSizeBinary(lsize, lhs), ScalarValue::FixedSizeBinary(rsize, rhs)) => {
251                if lsize == rsize {
252                    typed_min_max_string_arg!(lhs, rhs, FixedSizeBinary, $OP, *lsize)
253                }
254                else {
255                    return internal_err!(
256                        "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
257                        (lsize, rsize))
258                }
259            }
260            (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
261                typed_min_max_string!(lhs, rhs, BinaryView, $OP)
262            }
263            (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
264                typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
265            }
266            (
267                ScalarValue::TimestampMillisecond(lhs, l_tz),
268                ScalarValue::TimestampMillisecond(rhs, _),
269            ) => {
270                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz)
271            }
272            (
273                ScalarValue::TimestampMicrosecond(lhs, l_tz),
274                ScalarValue::TimestampMicrosecond(rhs, _),
275            ) => {
276                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz)
277            }
278            (
279                ScalarValue::TimestampNanosecond(lhs, l_tz),
280                ScalarValue::TimestampNanosecond(rhs, _),
281            ) => {
282                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz)
283            }
284            (
285                ScalarValue::Date32(lhs),
286                ScalarValue::Date32(rhs),
287            ) => {
288                typed_min_max!(lhs, rhs, Date32, $OP)
289            }
290            (
291                ScalarValue::Date64(lhs),
292                ScalarValue::Date64(rhs),
293            ) => {
294                typed_min_max!(lhs, rhs, Date64, $OP)
295            }
296            (
297                ScalarValue::Time32Second(lhs),
298                ScalarValue::Time32Second(rhs),
299            ) => {
300                typed_min_max!(lhs, rhs, Time32Second, $OP)
301            }
302            (
303                ScalarValue::Time32Millisecond(lhs),
304                ScalarValue::Time32Millisecond(rhs),
305            ) => {
306                typed_min_max!(lhs, rhs, Time32Millisecond, $OP)
307            }
308            (
309                ScalarValue::Time64Microsecond(lhs),
310                ScalarValue::Time64Microsecond(rhs),
311            ) => {
312                typed_min_max!(lhs, rhs, Time64Microsecond, $OP)
313            }
314            (
315                ScalarValue::Time64Nanosecond(lhs),
316                ScalarValue::Time64Nanosecond(rhs),
317            ) => {
318                typed_min_max!(lhs, rhs, Time64Nanosecond, $OP)
319            }
320            (
321                ScalarValue::IntervalYearMonth(lhs),
322                ScalarValue::IntervalYearMonth(rhs),
323            ) => {
324                typed_min_max!(lhs, rhs, IntervalYearMonth, $OP)
325            }
326            (
327                ScalarValue::IntervalMonthDayNano(lhs),
328                ScalarValue::IntervalMonthDayNano(rhs),
329            ) => {
330                typed_min_max!(lhs, rhs, IntervalMonthDayNano, $OP)
331            }
332            (
333                ScalarValue::IntervalDayTime(lhs),
334                ScalarValue::IntervalDayTime(rhs),
335            ) => {
336                typed_min_max!(lhs, rhs, IntervalDayTime, $OP)
337            }
338            (
339                ScalarValue::IntervalYearMonth(_),
340                ScalarValue::IntervalMonthDayNano(_),
341            ) | (
342                ScalarValue::IntervalYearMonth(_),
343                ScalarValue::IntervalDayTime(_),
344            ) | (
345                ScalarValue::IntervalMonthDayNano(_),
346                ScalarValue::IntervalDayTime(_),
347            ) | (
348                ScalarValue::IntervalMonthDayNano(_),
349                ScalarValue::IntervalYearMonth(_),
350            ) | (
351                ScalarValue::IntervalDayTime(_),
352                ScalarValue::IntervalYearMonth(_),
353            ) | (
354                ScalarValue::IntervalDayTime(_),
355                ScalarValue::IntervalMonthDayNano(_),
356            ) => {
357                interval_min_max!($OP, $VALUE, $DELTA)
358            }
359                    (
360                ScalarValue::DurationSecond(lhs),
361                ScalarValue::DurationSecond(rhs),
362            ) => {
363                typed_min_max!(lhs, rhs, DurationSecond, $OP)
364            }
365                                (
366                ScalarValue::DurationMillisecond(lhs),
367                ScalarValue::DurationMillisecond(rhs),
368            ) => {
369                typed_min_max!(lhs, rhs, DurationMillisecond, $OP)
370            }
371                                (
372                ScalarValue::DurationMicrosecond(lhs),
373                ScalarValue::DurationMicrosecond(rhs),
374            ) => {
375                typed_min_max!(lhs, rhs, DurationMicrosecond, $OP)
376            }
377                                        (
378                ScalarValue::DurationNanosecond(lhs),
379                ScalarValue::DurationNanosecond(rhs),
380            ) => {
381                typed_min_max!(lhs, rhs, DurationNanosecond, $OP)
382            }
383
384            (
385                lhs @ ScalarValue::Struct(_),
386                rhs @ ScalarValue::Struct(_),
387            ) => {
388                min_max_generic!(lhs, rhs, $OP)
389            }
390
391            (
392                lhs @ ScalarValue::List(_),
393                rhs @ ScalarValue::List(_),
394            ) => {
395                min_max_generic!(lhs, rhs, $OP)
396            }
397
398
399            (
400                lhs @ ScalarValue::LargeList(_),
401                rhs @ ScalarValue::LargeList(_),
402            ) => {
403                min_max_generic!(lhs, rhs, $OP)
404            }
405
406
407            (
408                lhs @ ScalarValue::FixedSizeList(_),
409                rhs @ ScalarValue::FixedSizeList(_),
410            ) => {
411                min_max_generic!(lhs, rhs, $OP)
412            }
413
414            e => {
415                return internal_err!(
416                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
417                    e
418                )
419            }
420        })
421    }};
422}
423
424/// An accumulator to compute the maximum value
425#[derive(Debug, Clone)]
426pub struct MaxAccumulator {
427    max: ScalarValue,
428}
429
430impl MaxAccumulator {
431    /// new max accumulator
432    pub fn try_new(datatype: &DataType) -> Result<Self> {
433        Ok(Self {
434            max: ScalarValue::try_from(datatype)?,
435        })
436    }
437}
438
439impl Accumulator for MaxAccumulator {
440    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
441        let values = &values[0];
442        let delta = &max_batch(values)?;
443        let new_max: Result<ScalarValue, DataFusionError> =
444            min_max!(&self.max, delta, max);
445        self.max = new_max?;
446        Ok(())
447    }
448
449    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
450        self.update_batch(states)
451    }
452
453    fn state(&mut self) -> Result<Vec<ScalarValue>> {
454        Ok(vec![self.evaluate()?])
455    }
456    fn evaluate(&mut self) -> Result<ScalarValue> {
457        Ok(self.max.clone())
458    }
459
460    fn size(&self) -> usize {
461        size_of_val(self) - size_of_val(&self.max) + self.max.size()
462    }
463}
464
465/// An accumulator to compute the minimum value
466#[derive(Debug, Clone)]
467pub struct MinAccumulator {
468    min: ScalarValue,
469}
470
471impl MinAccumulator {
472    /// new min accumulator
473    pub fn try_new(datatype: &DataType) -> Result<Self> {
474        Ok(Self {
475            min: ScalarValue::try_from(datatype)?,
476        })
477    }
478}
479
480impl Accumulator for MinAccumulator {
481    fn state(&mut self) -> Result<Vec<ScalarValue>> {
482        Ok(vec![self.evaluate()?])
483    }
484
485    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
486        let values = &values[0];
487        let delta = &min_batch(values)?;
488        let new_min: Result<ScalarValue, DataFusionError> =
489            min_max!(&self.min, delta, min);
490        self.min = new_min?;
491        Ok(())
492    }
493
494    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
495        self.update_batch(states)
496    }
497
498    fn evaluate(&mut self) -> Result<ScalarValue> {
499        Ok(self.min.clone())
500    }
501
502    fn size(&self) -> usize {
503        size_of_val(self) - size_of_val(&self.min) + self.min.size()
504    }
505}
506
507// Statically-typed version of min/max(array) -> ScalarValue for string types
508macro_rules! typed_min_max_batch_string {
509    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
510        let array = downcast_value!($VALUES, $ARRAYTYPE);
511        let value = compute::$OP(array);
512        let value = value.and_then(|e| Some(e.to_string()));
513        ScalarValue::$SCALAR(value)
514    }};
515}
516
517// Statically-typed version of min/max(array) -> ScalarValue for binary types.
518macro_rules! typed_min_max_batch_binary {
519    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
520        let array = downcast_value!($VALUES, $ARRAYTYPE);
521        let value = compute::$OP(array);
522        let value = value.and_then(|e| Some(e.to_vec()));
523        ScalarValue::$SCALAR(value)
524    }};
525}
526
527// Statically-typed version of min/max(array) -> ScalarValue for non-string types.
528macro_rules! typed_min_max_batch {
529    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
530        let array = downcast_value!($VALUES, $ARRAYTYPE);
531        let value = compute::$OP(array);
532        ScalarValue::$SCALAR(value, $($EXTRA_ARGS.clone()),*)
533    }};
534}
535
536// Statically-typed version of min/max(array) -> ScalarValue  for non-string types.
537// this is a macro to support both operations (min and max).
538macro_rules! min_max_batch {
539    ($VALUES:expr, $OP:ident) => {{
540        match $VALUES.data_type() {
541            DataType::Null => ScalarValue::Null,
542            DataType::Decimal32(precision, scale) => {
543                typed_min_max_batch!(
544                    $VALUES,
545                    Decimal32Array,
546                    Decimal32,
547                    $OP,
548                    precision,
549                    scale
550                )
551            }
552            DataType::Decimal64(precision, scale) => {
553                typed_min_max_batch!(
554                    $VALUES,
555                    Decimal64Array,
556                    Decimal64,
557                    $OP,
558                    precision,
559                    scale
560                )
561            }
562            DataType::Decimal128(precision, scale) => {
563                typed_min_max_batch!(
564                    $VALUES,
565                    Decimal128Array,
566                    Decimal128,
567                    $OP,
568                    precision,
569                    scale
570                )
571            }
572            DataType::Decimal256(precision, scale) => {
573                typed_min_max_batch!(
574                    $VALUES,
575                    Decimal256Array,
576                    Decimal256,
577                    $OP,
578                    precision,
579                    scale
580                )
581            }
582            // all types that have a natural order
583            DataType::Float64 => {
584                typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
585            }
586            DataType::Float32 => {
587                typed_min_max_batch!($VALUES, Float32Array, Float32, $OP)
588            }
589            DataType::Float16 => {
590                typed_min_max_batch!($VALUES, Float16Array, Float16, $OP)
591            }
592            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
593            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
594            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
595            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
596            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
597            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
598            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
599            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
600            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
601                typed_min_max_batch!(
602                    $VALUES,
603                    TimestampSecondArray,
604                    TimestampSecond,
605                    $OP,
606                    tz_opt
607                )
608            }
609            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
610                $VALUES,
611                TimestampMillisecondArray,
612                TimestampMillisecond,
613                $OP,
614                tz_opt
615            ),
616            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
617                $VALUES,
618                TimestampMicrosecondArray,
619                TimestampMicrosecond,
620                $OP,
621                tz_opt
622            ),
623            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
624                $VALUES,
625                TimestampNanosecondArray,
626                TimestampNanosecond,
627                $OP,
628                tz_opt
629            ),
630            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
631            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
632            DataType::Time32(TimeUnit::Second) => {
633                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
634            }
635            DataType::Time32(TimeUnit::Millisecond) => {
636                typed_min_max_batch!(
637                    $VALUES,
638                    Time32MillisecondArray,
639                    Time32Millisecond,
640                    $OP
641                )
642            }
643            DataType::Time64(TimeUnit::Microsecond) => {
644                typed_min_max_batch!(
645                    $VALUES,
646                    Time64MicrosecondArray,
647                    Time64Microsecond,
648                    $OP
649                )
650            }
651            DataType::Time64(TimeUnit::Nanosecond) => {
652                typed_min_max_batch!(
653                    $VALUES,
654                    Time64NanosecondArray,
655                    Time64Nanosecond,
656                    $OP
657                )
658            }
659            DataType::Interval(IntervalUnit::YearMonth) => {
660                typed_min_max_batch!(
661                    $VALUES,
662                    IntervalYearMonthArray,
663                    IntervalYearMonth,
664                    $OP
665                )
666            }
667            DataType::Interval(IntervalUnit::DayTime) => {
668                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
669            }
670            DataType::Interval(IntervalUnit::MonthDayNano) => {
671                typed_min_max_batch!(
672                    $VALUES,
673                    IntervalMonthDayNanoArray,
674                    IntervalMonthDayNano,
675                    $OP
676                )
677            }
678            DataType::Duration(TimeUnit::Second) => {
679                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
680            }
681            DataType::Duration(TimeUnit::Millisecond) => {
682                typed_min_max_batch!(
683                    $VALUES,
684                    DurationMillisecondArray,
685                    DurationMillisecond,
686                    $OP
687                )
688            }
689            DataType::Duration(TimeUnit::Microsecond) => {
690                typed_min_max_batch!(
691                    $VALUES,
692                    DurationMicrosecondArray,
693                    DurationMicrosecond,
694                    $OP
695                )
696            }
697            DataType::Duration(TimeUnit::Nanosecond) => {
698                typed_min_max_batch!(
699                    $VALUES,
700                    DurationNanosecondArray,
701                    DurationNanosecond,
702                    $OP
703                )
704            }
705            other => {
706                // This should have been handled before
707                return datafusion_common::internal_err!(
708                    "Min/Max accumulator not implemented for type {}",
709                    other
710                );
711            }
712        }
713    }};
714}
715
716/// dynamically-typed min(array) -> ScalarValue
717pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
718    Ok(match values.data_type() {
719        DataType::Utf8 => {
720            typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
721        }
722        DataType::LargeUtf8 => {
723            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
724        }
725        DataType::Utf8View => {
726            typed_min_max_batch_string!(
727                values,
728                StringViewArray,
729                Utf8View,
730                min_string_view
731            )
732        }
733        DataType::Boolean => {
734            typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
735        }
736        DataType::Binary => {
737            typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
738        }
739        DataType::LargeBinary => {
740            typed_min_max_batch_binary!(
741                &values,
742                LargeBinaryArray,
743                LargeBinary,
744                min_binary
745            )
746        }
747        DataType::FixedSizeBinary(size) => {
748            let array = downcast_value!(&values, FixedSizeBinaryArray);
749            let value = compute::min_fixed_size_binary(array);
750            let value = value.map(|e| e.to_vec());
751            ScalarValue::FixedSizeBinary(*size, value)
752        }
753        DataType::BinaryView => {
754            typed_min_max_batch_binary!(
755                &values,
756                BinaryViewArray,
757                BinaryView,
758                min_binary_view
759            )
760        }
761        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
762        DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
763        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
764        DataType::FixedSizeList(_, _) => {
765            min_max_batch_generic(values, Ordering::Greater)?
766        }
767        DataType::Dictionary(_, _) => {
768            let values = values.as_any_dictionary().values();
769            min_batch(values)?
770        }
771        _ => min_max_batch!(values, min),
772    })
773}
774
775/// Generic min/max implementation for complex types
776fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
777    if array.len() == array.null_count() {
778        return ScalarValue::try_from(array.data_type());
779    }
780    let mut extreme = ScalarValue::try_from_array(array, 0)?;
781    for i in 1..array.len() {
782        let current = ScalarValue::try_from_array(array, i)?;
783        if current.is_null() {
784            continue;
785        }
786        if extreme.is_null() {
787            extreme = current;
788            continue;
789        }
790        let cmp = extreme.try_cmp(&current)?;
791        if cmp == ordering {
792            extreme = current;
793        }
794    }
795
796    Ok(extreme)
797}
798
799/// dynamically-typed max(array) -> ScalarValue
800pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
801    Ok(match values.data_type() {
802        DataType::Utf8 => {
803            typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
804        }
805        DataType::LargeUtf8 => {
806            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
807        }
808        DataType::Utf8View => {
809            typed_min_max_batch_string!(
810                values,
811                StringViewArray,
812                Utf8View,
813                max_string_view
814            )
815        }
816        DataType::Boolean => {
817            typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
818        }
819        DataType::Binary => {
820            typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
821        }
822        DataType::BinaryView => {
823            typed_min_max_batch_binary!(
824                &values,
825                BinaryViewArray,
826                BinaryView,
827                max_binary_view
828            )
829        }
830        DataType::LargeBinary => {
831            typed_min_max_batch_binary!(
832                &values,
833                LargeBinaryArray,
834                LargeBinary,
835                max_binary
836            )
837        }
838        DataType::FixedSizeBinary(size) => {
839            let array = downcast_value!(&values, FixedSizeBinaryArray);
840            let value = compute::max_fixed_size_binary(array);
841            let value = value.map(|e| e.to_vec());
842            ScalarValue::FixedSizeBinary(*size, value)
843        }
844        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
845        DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
846        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
847        DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
848        DataType::Dictionary(_, _) => {
849            let values = values.as_any_dictionary().values();
850            max_batch(values)?
851        }
852        _ => min_max_batch!(values, max),
853    })
854}