datafusion_functions_aggregate_common/
min_max.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Basic min/max functionality shared across DataFusion aggregate functions
19
20use arrow::array::{
21    ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22    Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
23    DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24    DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25    Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
26    IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27    LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28    Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35    DataFusionError, Result, ScalarValue, downcast_value, internal_err,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
// min/max of two optional non-string (`Copy`) scalar values.
//
// Builds `ScalarValue::$SCALAR` from whichever side is present, or from
// `lhs.$OP(rhs)` when both are. Any $EXTRA_ARGS (e.g. decimal precision and
// scale, timestamp timezone) are cloned into the trailing variant fields.
macro_rules! typed_min_max {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let picked = match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => Some((*lhs).$OP(*rhs)),
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(picked, $($EXTRA_ARGS.clone()),*)
    }};
}
54
// min/max of two optional floating point scalar values.
//
// The two values are ordered with `total_cmp`; the right-hand side replaces
// the left-hand side only when the comparison yields the ordering selected
// by `choose_min_max!($OP)`, so the left-hand side is kept on ties.
macro_rules! typed_min_max_float {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let winner = match ($VALUE, $DELTA) {
            (Some(current), Some(candidate)) => {
                Some(match current.total_cmp(candidate) {
                    choose_min_max!($OP) => *candidate,
                    _ => *current,
                })
            }
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(winner)
    }};
}
68
// min/max of two optional string-like (owned, `Clone`) scalar values.
//
// The winner is picked by reference with `lhs.$OP(rhs)` and only then cloned
// into the resulting `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_string {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let chosen = match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => Some(lhs.$OP(rhs).clone()),
            (Some(only), None) | (None, Some(only)) => Some(only.clone()),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(chosen)
    }};
}
80
// min/max of two optional string-like scalar values whose `ScalarValue`
// variant takes a leading argument ($ARG) before the value itself.
macro_rules! typed_min_max_string_arg {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $ARG:expr) => {{
        // $ARG is evaluated before the values are inspected
        let leading = $ARG;
        let chosen = match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => Some(lhs.$OP(rhs).clone()),
            (Some(only), None) | (None, Some(only)) => Some(only.clone()),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(leading, chosen)
    }};
}
95
// Maps an operation name to the outcome of `current.cmp(candidate)` for which
// the candidate must replace the current value:
// `min` -> the current value is `Greater`, `max` -> the current value is `Less`.
macro_rules! choose_min_max {
    (max) => {
        std::cmp::Ordering::Less
    };
    (min) => {
        std::cmp::Ordering::Greater
    };
}
104
// min/max of two interval scalars compared through `partial_cmp`.
//
// Evaluates to a clone of $RHS when it wins the comparison and to a clone of
// $LHS otherwise. When the two values are not comparable the *enclosing
// function* returns an internal error.
macro_rules! interval_min_max {
    ($OP:tt, $LHS:expr, $RHS:expr) => {{
        match $LHS.partial_cmp(&$RHS) {
            None => {
                return internal_err!(
                    "Comparison error while computing interval min/max"
                );
            }
            Some(choose_min_max!($OP)) => $RHS.clone(),
            Some(_) => $LHS.clone(),
        }
    }};
}
118
// min/max of two scalars that are only comparable via `partial_cmp`
// (used for nested types such as structs and lists).
//
// A null $VALUE always loses to $DELTA; otherwise a null $DELTA always loses
// to $VALUE. Incomparable values keep $VALUE.
macro_rules! min_max_generic {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let delta_wins = if $VALUE.is_null() {
            true
        } else if $DELTA.is_null() {
            false
        } else {
            matches!($VALUE.partial_cmp(&$DELTA), Some(choose_min_max!($OP)))
        };

        if delta_wins {
            // When the new value won we want to compact it to
            // avoid storing the entire input
            let mut compacted = $DELTA.clone();
            compacted.compact();
            compacted
        } else {
            $VALUE.clone()
        }
    }};
}
143
// min/max of two scalar values of the same type.
//
// Expands to `Ok(ScalarValue)`. When the two scalars cannot be combined
// (different variants, decimals with different precision/scale,
// FixedSizeBinary with different sizes) the *enclosing function* returns an
// internal error instead.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        Ok(match ($VALUE, $DELTA) {
            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,

            // Decimals are only combined when precision and scale agree; a
            // mismatching pair falls through to the catch-all error arm below.
            (
                ScalarValue::Decimal32(lhs, precision, scale),
                ScalarValue::Decimal32(rhs, rhs_precision, rhs_scale),
            ) if precision == rhs_precision && scale == rhs_scale => {
                typed_min_max!(lhs, rhs, Decimal32, $OP, precision, scale)
            }
            (
                ScalarValue::Decimal64(lhs, precision, scale),
                ScalarValue::Decimal64(rhs, rhs_precision, rhs_scale),
            ) if precision == rhs_precision && scale == rhs_scale => {
                typed_min_max!(lhs, rhs, Decimal64, $OP, precision, scale)
            }
            (
                ScalarValue::Decimal128(lhs, precision, scale),
                ScalarValue::Decimal128(rhs, rhs_precision, rhs_scale),
            ) if precision == rhs_precision && scale == rhs_scale => {
                typed_min_max!(lhs, rhs, Decimal128, $OP, precision, scale)
            }
            (
                ScalarValue::Decimal256(lhs, precision, scale),
                ScalarValue::Decimal256(rhs, rhs_precision, rhs_scale),
            ) if precision == rhs_precision && scale == rhs_scale => {
                typed_min_max!(lhs, rhs, Decimal256, $OP, precision, scale)
            }

            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
                typed_min_max!(lhs, rhs, Boolean, $OP)
            }

            // Floats are ordered with `total_cmp`
            (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float64, $OP)
            }
            (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float32, $OP)
            }
            (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float16, $OP)
            }

            // Integers
            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
                typed_min_max!(lhs, rhs, UInt64, $OP)
            }
            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
                typed_min_max!(lhs, rhs, UInt32, $OP)
            }
            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
                typed_min_max!(lhs, rhs, UInt16, $OP)
            }
            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
                typed_min_max!(lhs, rhs, UInt8, $OP)
            }
            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
                typed_min_max!(lhs, rhs, Int64, $OP)
            }
            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
                typed_min_max!(lhs, rhs, Int32, $OP)
            }
            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
                typed_min_max!(lhs, rhs, Int16, $OP)
            }
            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                typed_min_max!(lhs, rhs, Int8, $OP)
            }

            // Strings and binaries (the winner is cloned)
            (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8, $OP)
            }
            (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
            }
            (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8View, $OP)
            }
            (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
                typed_min_max_string!(lhs, rhs, Binary, $OP)
            }
            (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeBinary, $OP)
            }
            (
                ScalarValue::FixedSizeBinary(lhs_size, lhs),
                ScalarValue::FixedSizeBinary(rhs_size, rhs),
            ) => {
                if lhs_size != rhs_size {
                    return internal_err!(
                        "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
                        (lhs_size, rhs_size)
                    );
                }
                typed_min_max_string_arg!(lhs, rhs, FixedSizeBinary, $OP, *lhs_size)
            }
            (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
                typed_min_max_string!(lhs, rhs, BinaryView, $OP)
            }

            // Timestamps: the result carries the timezone of the left-hand
            // side; the right-hand side's timezone is not inspected.
            (ScalarValue::TimestampSecond(lhs, tz), ScalarValue::TimestampSecond(rhs, _)) => {
                typed_min_max!(lhs, rhs, TimestampSecond, $OP, tz)
            }
            (
                ScalarValue::TimestampMillisecond(lhs, tz),
                ScalarValue::TimestampMillisecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, tz)
            }
            (
                ScalarValue::TimestampMicrosecond(lhs, tz),
                ScalarValue::TimestampMicrosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, tz)
            }
            (
                ScalarValue::TimestampNanosecond(lhs, tz),
                ScalarValue::TimestampNanosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, tz)
            }

            // Dates and times
            (ScalarValue::Date32(lhs), ScalarValue::Date32(rhs)) => {
                typed_min_max!(lhs, rhs, Date32, $OP)
            }
            (ScalarValue::Date64(lhs), ScalarValue::Date64(rhs)) => {
                typed_min_max!(lhs, rhs, Date64, $OP)
            }
            (ScalarValue::Time32Second(lhs), ScalarValue::Time32Second(rhs)) => {
                typed_min_max!(lhs, rhs, Time32Second, $OP)
            }
            (ScalarValue::Time32Millisecond(lhs), ScalarValue::Time32Millisecond(rhs)) => {
                typed_min_max!(lhs, rhs, Time32Millisecond, $OP)
            }
            (ScalarValue::Time64Microsecond(lhs), ScalarValue::Time64Microsecond(rhs)) => {
                typed_min_max!(lhs, rhs, Time64Microsecond, $OP)
            }
            (ScalarValue::Time64Nanosecond(lhs), ScalarValue::Time64Nanosecond(rhs)) => {
                typed_min_max!(lhs, rhs, Time64Nanosecond, $OP)
            }

            // Intervals of the same unit
            (ScalarValue::IntervalYearMonth(lhs), ScalarValue::IntervalYearMonth(rhs)) => {
                typed_min_max!(lhs, rhs, IntervalYearMonth, $OP)
            }
            (
                ScalarValue::IntervalMonthDayNano(lhs),
                ScalarValue::IntervalMonthDayNano(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalMonthDayNano, $OP)
            }
            (ScalarValue::IntervalDayTime(lhs), ScalarValue::IntervalDayTime(rhs)) => {
                typed_min_max!(lhs, rhs, IntervalDayTime, $OP)
            }

            // Intervals of different units are compared as whole scalars
            (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalMonthDayNano(_) | ScalarValue::IntervalDayTime(_),
            )
            | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalDayTime(_) | ScalarValue::IntervalYearMonth(_),
            )
            | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalMonthDayNano(_),
            ) => {
                interval_min_max!($OP, $VALUE, $DELTA)
            }

            // Durations
            (ScalarValue::DurationSecond(lhs), ScalarValue::DurationSecond(rhs)) => {
                typed_min_max!(lhs, rhs, DurationSecond, $OP)
            }
            (
                ScalarValue::DurationMillisecond(lhs),
                ScalarValue::DurationMillisecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMillisecond, $OP)
            }
            (
                ScalarValue::DurationMicrosecond(lhs),
                ScalarValue::DurationMicrosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMicrosecond, $OP)
            }
            (
                ScalarValue::DurationNanosecond(lhs),
                ScalarValue::DurationNanosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationNanosecond, $OP)
            }

            // Nested types are compared as whole scalars
            (lhs @ ScalarValue::Struct(_), rhs @ ScalarValue::Struct(_))
            | (lhs @ ScalarValue::List(_), rhs @ ScalarValue::List(_))
            | (lhs @ ScalarValue::LargeList(_), rhs @ ScalarValue::LargeList(_))
            | (lhs @ ScalarValue::FixedSizeList(_), rhs @ ScalarValue::FixedSizeList(_)) => {
                min_max_generic!(lhs, rhs, $OP)
            }

            // Everything else: mismatching variants, mismatching decimal
            // precision/scale, or a type without min/max support.
            e => {
                return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    e
                )
            }
        })
    }};
}
425
426/// An accumulator to compute the maximum value
427#[derive(Debug, Clone)]
428pub struct MaxAccumulator {
429    max: ScalarValue,
430}
431
432impl MaxAccumulator {
433    /// new max accumulator
434    pub fn try_new(datatype: &DataType) -> Result<Self> {
435        Ok(Self {
436            max: ScalarValue::try_from(datatype)?,
437        })
438    }
439}
440
441impl Accumulator for MaxAccumulator {
442    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
443        let values = &values[0];
444        let delta = &max_batch(values)?;
445        let new_max: Result<ScalarValue, DataFusionError> =
446            min_max!(&self.max, delta, max);
447        self.max = new_max?;
448        Ok(())
449    }
450
451    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
452        self.update_batch(states)
453    }
454
455    fn state(&mut self) -> Result<Vec<ScalarValue>> {
456        Ok(vec![self.evaluate()?])
457    }
458    fn evaluate(&mut self) -> Result<ScalarValue> {
459        Ok(self.max.clone())
460    }
461
462    fn size(&self) -> usize {
463        size_of_val(self) - size_of_val(&self.max) + self.max.size()
464    }
465}
466
467/// An accumulator to compute the minimum value
468#[derive(Debug, Clone)]
469pub struct MinAccumulator {
470    min: ScalarValue,
471}
472
473impl MinAccumulator {
474    /// new min accumulator
475    pub fn try_new(datatype: &DataType) -> Result<Self> {
476        Ok(Self {
477            min: ScalarValue::try_from(datatype)?,
478        })
479    }
480}
481
482impl Accumulator for MinAccumulator {
483    fn state(&mut self) -> Result<Vec<ScalarValue>> {
484        Ok(vec![self.evaluate()?])
485    }
486
487    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
488        let values = &values[0];
489        let delta = &min_batch(values)?;
490        let new_min: Result<ScalarValue, DataFusionError> =
491            min_max!(&self.min, delta, min);
492        self.min = new_min?;
493        Ok(())
494    }
495
496    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
497        self.update_batch(states)
498    }
499
500    fn evaluate(&mut self) -> Result<ScalarValue> {
501        Ok(self.min.clone())
502    }
503
504    fn size(&self) -> usize {
505        size_of_val(self) - size_of_val(&self.min) + self.min.size()
506    }
507}
508
// Statically-typed version of min/max(array) -> ScalarValue for string types.
//
// Downcasts $VALUES to $ARRAYTYPE, runs the `compute::$OP` kernel on it and
// converts the result (if any) into an owned `String` for
// `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: the closure never yields `None`
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
518
// Statically-typed version of min/max(array) -> ScalarValue for binary types.
//
// Downcasts $VALUES to $ARRAYTYPE, runs the `compute::$OP` kernel on it and
// copies the result (if any) into an owned `Vec` for `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: the closure never yields `None`
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
528
// Statically-typed version of min/max(array) -> ScalarValue for non-string types.
//
// The kernel result is stored as-is in `ScalarValue::$SCALAR`, followed by a
// clone of every $EXTRA_ARGS (e.g. decimal precision/scale, timestamp timezone).
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed_array = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed_array), $($EXTRA_ARGS.clone()),*)
    }};
}
537
// Dynamically-typed min/max(array) -> ScalarValue for `Null`, decimal, float,
// integer, timestamp, date, time, interval and duration arrays.
//
// This is a macro to support both operations: $OP is the name of the
// `compute` kernel to run (`min` or `max`). The array's `DataType` selects the
// concrete array type to downcast to. For any other data type the *enclosing
// function* returns an internal error.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,

            // Decimals: precision and scale are carried into the result
            DataType::Decimal32(precision, scale) => {
                typed_min_max_batch!($VALUES, Decimal32Array, Decimal32, $OP, precision, scale)
            }
            DataType::Decimal64(precision, scale) => {
                typed_min_max_batch!($VALUES, Decimal64Array, Decimal64, $OP, precision, scale)
            }
            DataType::Decimal128(precision, scale) => {
                typed_min_max_batch!($VALUES, Decimal128Array, Decimal128, $OP, precision, scale)
            }
            DataType::Decimal256(precision, scale) => {
                typed_min_max_batch!($VALUES, Decimal256Array, Decimal256, $OP, precision, scale)
            }

            // all types that have a natural order
            DataType::Float64 => typed_min_max_batch!($VALUES, Float64Array, Float64, $OP),
            DataType::Float32 => typed_min_max_batch!($VALUES, Float32Array, Float32, $OP),
            DataType::Float16 => typed_min_max_batch!($VALUES, Float16Array, Float16, $OP),
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),

            // Timestamps: the timezone of the data type is carried into the result
            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
                typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP, tz_opt)
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMillisecondArray,
                    TimestampMillisecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMicrosecondArray,
                    TimestampMicrosecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampNanosecondArray,
                    TimestampNanosecond,
                    $OP,
                    tz_opt
                )
            }

            // Dates and times
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, Time32MillisecondArray, Time32Millisecond, $OP)
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, Time64MicrosecondArray, Time64Microsecond, $OP)
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64Nanosecond, $OP)
            }

            // Intervals
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!($VALUES, IntervalYearMonthArray, IntervalYearMonth, $OP)
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }

            // Durations
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, DurationMillisecondArray, DurationMillisecond, $OP)
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, DurationMicrosecondArray, DurationMicrosecond, $OP)
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, DurationNanosecondArray, DurationNanosecond, $OP)
            }

            other => {
                // This should have been handled before
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {}",
                    other
                );
            }
        }
    }};
}
717
718/// dynamically-typed min(array) -> ScalarValue
719pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
720    Ok(match values.data_type() {
721        DataType::Utf8 => {
722            typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
723        }
724        DataType::LargeUtf8 => {
725            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
726        }
727        DataType::Utf8View => {
728            typed_min_max_batch_string!(
729                values,
730                StringViewArray,
731                Utf8View,
732                min_string_view
733            )
734        }
735        DataType::Boolean => {
736            typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
737        }
738        DataType::Binary => {
739            typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
740        }
741        DataType::LargeBinary => {
742            typed_min_max_batch_binary!(
743                &values,
744                LargeBinaryArray,
745                LargeBinary,
746                min_binary
747            )
748        }
749        DataType::FixedSizeBinary(size) => {
750            let array = downcast_value!(&values, FixedSizeBinaryArray);
751            let value = compute::min_fixed_size_binary(array);
752            let value = value.map(|e| e.to_vec());
753            ScalarValue::FixedSizeBinary(*size, value)
754        }
755        DataType::BinaryView => {
756            typed_min_max_batch_binary!(
757                &values,
758                BinaryViewArray,
759                BinaryView,
760                min_binary_view
761            )
762        }
763        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
764        DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
765        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
766        DataType::FixedSizeList(_, _) => {
767            min_max_batch_generic(values, Ordering::Greater)?
768        }
769        DataType::Dictionary(_, _) => {
770            let values = values.as_any_dictionary().values();
771            min_batch(values)?
772        }
773        _ => min_max_batch!(values, min),
774    })
775}
776
777/// Generic min/max implementation for complex types
778fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
779    if array.len() == array.null_count() {
780        return ScalarValue::try_from(array.data_type());
781    }
782    let mut extreme = ScalarValue::try_from_array(array, 0)?;
783    for i in 1..array.len() {
784        let current = ScalarValue::try_from_array(array, i)?;
785        if current.is_null() {
786            continue;
787        }
788        if extreme.is_null() {
789            extreme = current;
790            continue;
791        }
792        let cmp = extreme.try_cmp(&current)?;
793        if cmp == ordering {
794            extreme = current;
795        }
796    }
797
798    Ok(extreme)
799}
800
801/// dynamically-typed max(array) -> ScalarValue
802pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
803    Ok(match values.data_type() {
804        DataType::Utf8 => {
805            typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
806        }
807        DataType::LargeUtf8 => {
808            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
809        }
810        DataType::Utf8View => {
811            typed_min_max_batch_string!(
812                values,
813                StringViewArray,
814                Utf8View,
815                max_string_view
816            )
817        }
818        DataType::Boolean => {
819            typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
820        }
821        DataType::Binary => {
822            typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
823        }
824        DataType::BinaryView => {
825            typed_min_max_batch_binary!(
826                &values,
827                BinaryViewArray,
828                BinaryView,
829                max_binary_view
830            )
831        }
832        DataType::LargeBinary => {
833            typed_min_max_batch_binary!(
834                &values,
835                LargeBinaryArray,
836                LargeBinary,
837                max_binary
838            )
839        }
840        DataType::FixedSizeBinary(size) => {
841            let array = downcast_value!(&values, FixedSizeBinaryArray);
842            let value = compute::max_fixed_size_binary(array);
843            let value = value.map(|e| e.to_vec());
844            ScalarValue::FixedSizeBinary(*size, value)
845        }
846        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
847        DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
848        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
849        DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
850        DataType::Dictionary(_, _) => {
851            let values = values.as_any_dictionary().values();
852            max_batch(values)?
853        }
854        _ => min_max_batch!(values, max),
855    })
856}