datafusion_functions_aggregate/
min_max.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function
19//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function
20
21mod min_max_bytes;
22mod min_max_struct;
23
24use arrow::array::{
25    ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
26    Date64Array, Decimal128Array, Decimal256Array, DurationMicrosecondArray,
27    DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray, Float16Array,
28    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
29    IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
30    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
31    Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
32    Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
34    UInt64Array, UInt8Array,
35};
36use arrow::compute;
37use arrow::datatypes::{
38    DataType, Decimal128Type, Decimal256Type, DurationMicrosecondType,
39    DurationMillisecondType, DurationNanosecondType, DurationSecondType, Float16Type,
40    Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit,
41    UInt16Type, UInt32Type, UInt64Type, UInt8Type,
42};
43use datafusion_common::stats::Precision;
44use datafusion_common::{
45    downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result,
46};
47use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
48use datafusion_physical_expr::expressions;
49use std::cmp::Ordering;
50use std::fmt::Debug;
51
52use arrow::datatypes::i256;
53use arrow::datatypes::{
54    Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
55    Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
56    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
57};
58
59use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
60use crate::min_max::min_max_struct::MinMaxStructAccumulator;
61use datafusion_common::ScalarValue;
62use datafusion_expr::{
63    function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Documentation,
64    SetMonotonicity, Signature, Volatility,
65};
66use datafusion_expr::{GroupsAccumulator, StatisticsArgs};
67use datafusion_macros::user_doc;
68use half::f16;
69use std::mem::size_of_val;
70use std::ops::Deref;
71
72fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
73    // make sure that the input types only has one element.
74    if input_types.len() != 1 {
75        return exec_err!(
76            "min/max was called with {} arguments. It requires only 1.",
77            input_types.len()
78        );
79    }
80    // min and max support the dictionary data type
81    // unpack the dictionary to get the value
82    match &input_types[0] {
83        DataType::Dictionary(_, dict_value_type) => {
84            // TODO add checker, if the value type is complex data type
85            Ok(vec![dict_value_type.deref().clone()])
86        }
87        // TODO add checker for datatype which min and max supported
88        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
89        _ => Ok(input_types.to_vec()),
90    }
91}
92
// User-facing documentation metadata for `max`.
// NOTE(review): `#[user_doc]` is presumably what provides the `doc()` accessor
// that `documentation()` calls further down — confirm against the macro.
#[user_doc(
    doc_section(label = "General Functions"),
    description = "Returns the maximum value in the specified column.",
    syntax_example = "max(expression)",
    sql_example = r#"```sql
> SELECT max(column_name) FROM table_name;
+----------------------+
| max(column_name)      |
+----------------------+
| 150                  |
+----------------------+
```"#,
    standard_argument(name = "expression",)
)]
// MAX aggregate UDF: the type that implements `AggregateUDFImpl` for `max`.
#[derive(Debug)]
pub struct Max {
    // Signature handed out by `AggregateUDFImpl::signature`.
    signature: Signature,
}
112
113impl Max {
114    pub fn new() -> Self {
115        Self {
116            signature: Signature::user_defined(Volatility::Immutable),
117        }
118    }
119}
120
121impl Default for Max {
122    fn default() -> Self {
123        Self::new()
124    }
125}
/// Creates a [`PrimitiveGroupsAccumulator`] that computes `MAX` for
/// the specified [`ArrowPrimitiveType`].
///
/// Every group starts at `$NATIVE::MIN`. An incoming value replaces the
/// current one when it compares greater, or when the two are unordered
/// (`partial_cmp` yields `None`, e.g. a float NaN is involved).
///
/// NOTE(review): for floating-point natives `$NATIVE::MIN` is the lowest
/// finite value rather than negative infinity; confirm that groups holding
/// only `-inf` are handled as intended.
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! primitive_max_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        let accumulator =
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
                // Overwrite when `new` is greater or the pair is unordered (NaN)
                if matches!(new.partial_cmp(cur), Some(Ordering::Greater) | None) {
                    *cur = new;
                }
            })
            // Initialize each accumulator to $NATIVE::MIN
            .with_starting_value($NATIVE::MIN);
        Ok(Box::new(accumulator))
    }};
}
147
/// Creates a [`PrimitiveGroupsAccumulator`] that computes `MIN` for
/// the specified [`ArrowPrimitiveType`].
///
/// Every group starts at `$NATIVE::MAX`. An incoming value replaces the
/// current one when it compares less, or when the two are unordered
/// (`partial_cmp` yields `None`, e.g. a float NaN is involved).
///
/// NOTE(review): for floating-point natives `$NATIVE::MAX` is the highest
/// finite value rather than positive infinity; confirm that groups holding
/// only `+inf` are handled as intended.
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! primitive_min_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        let accumulator =
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
                // Overwrite when `new` is less or the pair is unordered (NaN)
                if matches!(new.partial_cmp(cur), Some(Ordering::Less) | None) {
                    *cur = new;
                }
            })
            // Initialize each accumulator to $NATIVE::MAX
            .with_starting_value($NATIVE::MAX);
        Ok(Box::new(accumulator))
    }};
}
170
171trait FromColumnStatistics {
172    fn value_from_column_statistics(
173        &self,
174        stats: &ColumnStatistics,
175    ) -> Option<ScalarValue>;
176
177    fn value_from_statistics(
178        &self,
179        statistics_args: &StatisticsArgs,
180    ) -> Option<ScalarValue> {
181        if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows {
182            match *num_rows {
183                0 => return ScalarValue::try_from(statistics_args.return_type).ok(),
184                value if value > 0 => {
185                    let col_stats = &statistics_args.statistics.column_statistics;
186                    if statistics_args.exprs.len() == 1 {
187                        // TODO optimize with exprs other than Column
188                        if let Some(col_expr) = statistics_args.exprs[0]
189                            .as_any()
190                            .downcast_ref::<expressions::Column>()
191                        {
192                            return self.value_from_column_statistics(
193                                &col_stats[col_expr.index()],
194                            );
195                        }
196                    }
197                }
198                _ => {}
199            }
200        }
201        None
202    }
203}
204
205impl FromColumnStatistics for Max {
206    fn value_from_column_statistics(
207        &self,
208        col_stats: &ColumnStatistics,
209    ) -> Option<ScalarValue> {
210        if let Precision::Exact(ref val) = col_stats.max_value {
211            if !val.is_null() {
212                return Some(val.clone());
213            }
214        }
215        None
216    }
217}
218
// `max` as an aggregate UDF: row-at-a-time, grouped and sliding-window
// accumulators plus the planner-facing metadata.
impl AggregateUDFImpl for Max {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "max"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result type is the type of the first argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        // NOTE(review): indexes without a length check — presumably the
        // argument list has already been validated by `coerce_types`.
        Ok(arg_types[0].to_owned())
    }

    /// Builds a [`MaxAccumulator`] for the return field's data type.
    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        Ok(Box::new(MaxAccumulator::try_new(
            acc_args.return_field.data_type(),
        )?))
    }

    /// `max` has no aliases.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Whether a specialised [`GroupsAccumulator`] exists for the return type.
    ///
    /// This list must stay in sync with the arms of
    /// `create_groups_accumulator`, which returns an internal error for any
    /// type it does not handle.
    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
        use DataType::*;
        matches!(
            args.return_field.data_type(),
            Int8 | Int16
                | Int32
                | Int64
                | UInt8
                | UInt16
                | UInt32
                | UInt64
                | Float16
                | Float32
                | Float64
                | Decimal128(_, _)
                | Decimal256(_, _)
                | Date32
                | Date64
                | Time32(_)
                | Time64(_)
                | Timestamp(_, _)
                | Utf8
                | LargeUtf8
                | Utf8View
                | Binary
                | LargeBinary
                | BinaryView
                | Duration(_)
                | Struct(_)
        )
    }

    /// Creates the grouped accumulator matching the return type.
    ///
    /// Primitive types go through `primitive_max_accumulator!` with the
    /// native Rust type and the Arrow primitive type; string/binary types use
    /// [`MinMaxBytesAccumulator`]; structs use [`MinMaxStructAccumulator`].
    fn create_groups_accumulator(
        &self,
        args: AccumulatorArgs,
    ) -> Result<Box<dyn GroupsAccumulator>> {
        use DataType::*;
        use TimeUnit::*;
        let data_type = args.return_field.data_type();
        match data_type {
            Int8 => primitive_max_accumulator!(data_type, i8, Int8Type),
            Int16 => primitive_max_accumulator!(data_type, i16, Int16Type),
            Int32 => primitive_max_accumulator!(data_type, i32, Int32Type),
            Int64 => primitive_max_accumulator!(data_type, i64, Int64Type),
            UInt8 => primitive_max_accumulator!(data_type, u8, UInt8Type),
            UInt16 => primitive_max_accumulator!(data_type, u16, UInt16Type),
            UInt32 => primitive_max_accumulator!(data_type, u32, UInt32Type),
            UInt64 => primitive_max_accumulator!(data_type, u64, UInt64Type),
            Float16 => {
                primitive_max_accumulator!(data_type, f16, Float16Type)
            }
            Float32 => {
                primitive_max_accumulator!(data_type, f32, Float32Type)
            }
            Float64 => {
                primitive_max_accumulator!(data_type, f64, Float64Type)
            }
            Date32 => primitive_max_accumulator!(data_type, i32, Date32Type),
            Date64 => primitive_max_accumulator!(data_type, i64, Date64Type),
            Time32(Second) => {
                primitive_max_accumulator!(data_type, i32, Time32SecondType)
            }
            Time32(Millisecond) => {
                primitive_max_accumulator!(data_type, i32, Time32MillisecondType)
            }
            Time64(Microsecond) => {
                primitive_max_accumulator!(data_type, i64, Time64MicrosecondType)
            }
            Time64(Nanosecond) => {
                primitive_max_accumulator!(data_type, i64, Time64NanosecondType)
            }
            Timestamp(Second, _) => {
                primitive_max_accumulator!(data_type, i64, TimestampSecondType)
            }
            Timestamp(Millisecond, _) => {
                primitive_max_accumulator!(data_type, i64, TimestampMillisecondType)
            }
            Timestamp(Microsecond, _) => {
                primitive_max_accumulator!(data_type, i64, TimestampMicrosecondType)
            }
            Timestamp(Nanosecond, _) => {
                primitive_max_accumulator!(data_type, i64, TimestampNanosecondType)
            }
            Duration(Second) => {
                primitive_max_accumulator!(data_type, i64, DurationSecondType)
            }
            Duration(Millisecond) => {
                primitive_max_accumulator!(data_type, i64, DurationMillisecondType)
            }
            Duration(Microsecond) => {
                primitive_max_accumulator!(data_type, i64, DurationMicrosecondType)
            }
            Duration(Nanosecond) => {
                primitive_max_accumulator!(data_type, i64, DurationNanosecondType)
            }
            Decimal128(_, _) => {
                primitive_max_accumulator!(data_type, i128, Decimal128Type)
            }
            Decimal256(_, _) => {
                primitive_max_accumulator!(data_type, i256, Decimal256Type)
            }
            Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
                Ok(Box::new(MinMaxBytesAccumulator::new_max(data_type.clone())))
            }
            Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_max(
                data_type.clone(),
            ))),
            // This is only reached if groups_accumulator_supported is out of sync
            _ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
        }
    }

    /// Builds a [`SlidingMaxAccumulator`] for the return field's data type.
    fn create_sliding_accumulator(
        &self,
        args: AccumulatorArgs,
    ) -> Result<Box<dyn Accumulator>> {
        Ok(Box::new(SlidingMaxAccumulator::try_new(
            args.return_field.data_type(),
        )?))
    }

    /// Always reports `Some(true)` for `max`.
    fn is_descending(&self) -> Option<bool> {
        Some(true)
    }

    /// The result does not depend on the order in which rows are seen.
    fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
        datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
    }

    /// Delegates to `get_min_max_result_type`: exactly one argument is
    /// accepted and dictionary types are unpacked to their value type.
    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        get_min_max_result_type(arg_types)
    }
    /// Reversing the input leaves `max` unchanged.
    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
        datafusion_expr::ReversedUDAF::Identical
    }
    /// Answers from statistics when possible; see
    /// `FromColumnStatistics::value_from_statistics`.
    fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
        self.value_from_statistics(statistics_args)
    }

    /// Returns `self.doc()`.
    // NOTE(review): `doc` is not defined in this part of the file —
    // presumably generated by the `#[user_doc]` attribute on `Max`.
    fn documentation(&self) -> Option<&Documentation> {
        self.doc()
    }

    fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
        // `MAX` is monotonically increasing as it always increases or stays
        // the same as new values are seen.
        SetMonotonicity::Increasing
    }
}
395
// Statically-typed version of min/max(array) -> ScalarValue for string types.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the `compute::$OP` kernel on it and
// wraps an owned `String` copy of the result (if any) in `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed).map(|extreme| extreme.to_string()))
    }};
}
// Statically-typed version of min/max(array) -> ScalarValue for binary types.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the `compute::$OP` kernel on it and
// wraps an owned byte-vector copy of the result (if any) in `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed).map(|extreme| extreme.to_vec()))
    }};
}
414
// Statically-typed version of min/max(array) -> ScalarValue for non-string types.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the `compute::$OP` kernel on it and
// stores the result in `ScalarValue::$SCALAR`. Any `$EXTRA_ARGS` (for example
// decimal precision/scale or a timestamp time zone) are cloned into the
// remaining fields of the variant.
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed), $($EXTRA_ARGS.clone()),*)
    }};
}
423
// Statically-typed version of min/max(array) -> ScalarValue for non-string types.
// This is a macro to support both operations (min and max).
//
// `$VALUES` is the array to reduce and `$OP` names the `compute` kernel
// (`min` or `max`) that `typed_min_max_batch!` invokes. The macro dispatches on
// `$VALUES.data_type()`; types without an arm hit the fallback, which does an
// early `return` of an internal error, so the macro can only be expanded
// inside a function that returns `Result`.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,
            // Decimals carry precision and scale over into the scalar
            DataType::Decimal128(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal128Array,
                    Decimal128,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal256(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal256Array,
                    Decimal256,
                    $OP,
                    precision,
                    scale
                )
            }
            // all types that have a natural order
            DataType::Float64 => {
                typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
            }
            DataType::Float32 => {
                typed_min_max_batch!($VALUES, Float32Array, Float32, $OP)
            }
            DataType::Float16 => {
                typed_min_max_batch!($VALUES, Float16Array, Float16, $OP)
            }
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            // Timestamps carry their optional time zone over into the scalar
            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampSecondArray,
                    TimestampSecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMillisecondArray,
                TimestampMillisecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMicrosecondArray,
                TimestampMicrosecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampNanosecondArray,
                TimestampNanosecond,
                $OP,
                tz_opt
            ),
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time32MillisecondArray,
                    Time32Millisecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64MicrosecondArray,
                    Time64Microsecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64NanosecondArray,
                    Time64Nanosecond,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalYearMonthArray,
                    IntervalYearMonth,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMillisecondArray,
                    DurationMillisecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMicrosecondArray,
                    DurationMicrosecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationNanosecondArray,
                    DurationNanosecond,
                    $OP
                )
            }
            other => {
                // This should have been handled before
                return internal_err!(
                    "Min/Max accumulator not implemented for type {:?}",
                    other
                );
            }
        }
    }};
}
583
584/// dynamically-typed min(array) -> ScalarValue
585fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
586    Ok(match values.data_type() {
587        DataType::Utf8 => {
588            typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
589        }
590        DataType::LargeUtf8 => {
591            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
592        }
593        DataType::Utf8View => {
594            typed_min_max_batch_string!(
595                values,
596                StringViewArray,
597                Utf8View,
598                min_string_view
599            )
600        }
601        DataType::Boolean => {
602            typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
603        }
604        DataType::Binary => {
605            typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
606        }
607        DataType::LargeBinary => {
608            typed_min_max_batch_binary!(
609                &values,
610                LargeBinaryArray,
611                LargeBinary,
612                min_binary
613            )
614        }
615        DataType::BinaryView => {
616            typed_min_max_batch_binary!(
617                &values,
618                BinaryViewArray,
619                BinaryView,
620                min_binary_view
621            )
622        }
623        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
624        DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
625        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
626        DataType::FixedSizeList(_, _) => {
627            min_max_batch_generic(values, Ordering::Greater)?
628        }
629        DataType::Dictionary(_, _) => {
630            let values = values.as_any_dictionary().values();
631            min_batch(values)?
632        }
633        _ => min_max_batch!(values, min),
634    })
635}
636
637fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
638    if array.len() == array.null_count() {
639        return ScalarValue::try_from(array.data_type());
640    }
641    let mut extreme = ScalarValue::try_from_array(array, 0)?;
642    for i in 1..array.len() {
643        let current = ScalarValue::try_from_array(array, i)?;
644        if current.is_null() {
645            continue;
646        }
647        if extreme.is_null() {
648            extreme = current;
649            continue;
650        }
651        if let Some(cmp) = extreme.partial_cmp(&current) {
652            if cmp == ordering {
653                extreme = current;
654            }
655        }
656    }
657
658    Ok(extreme)
659}
660
// min/max of two scalar values using their `partial_cmp` ordering.
//
// `$VALUE` is the current result and `$DELTA` the incoming value. `$DELTA`
// wins when `$VALUE` is null, or when both are non-null and comparing
// `$VALUE` to `$DELTA` gives the ordering selected by `choose_min_max!($OP)`.
// In every other case (including unordered pairs) `$VALUE` is kept.
macro_rules! min_max_generic {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let delta_wins = $VALUE.is_null()
            || (!$DELTA.is_null()
                && $VALUE.partial_cmp(&$DELTA) == Some(choose_min_max!($OP)));
        if delta_wins {
            // When the new value won we want to compact it to
            // avoid storing the entire input
            let mut compacted = $DELTA.clone();
            compacted.compact();
            compacted
        } else {
            $VALUE.clone()
        }
    }};
}
685
686/// dynamically-typed max(array) -> ScalarValue
687pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
688    Ok(match values.data_type() {
689        DataType::Utf8 => {
690            typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
691        }
692        DataType::LargeUtf8 => {
693            typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
694        }
695        DataType::Utf8View => {
696            typed_min_max_batch_string!(
697                values,
698                StringViewArray,
699                Utf8View,
700                max_string_view
701            )
702        }
703        DataType::Boolean => {
704            typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
705        }
706        DataType::Binary => {
707            typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
708        }
709        DataType::BinaryView => {
710            typed_min_max_batch_binary!(
711                &values,
712                BinaryViewArray,
713                BinaryView,
714                max_binary_view
715            )
716        }
717        DataType::LargeBinary => {
718            typed_min_max_batch_binary!(
719                &values,
720                LargeBinaryArray,
721                LargeBinary,
722                max_binary
723            )
724        }
725        DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
726        DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
727        DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
728        DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
729        DataType::Dictionary(_, _) => {
730            let values = values.as_any_dictionary().values();
731            max_batch(values)?
732        }
733        _ => min_max_batch!(values, max),
734    })
735}
736
// min/max of two non-string scalar values.
//
// `$VALUE` and `$DELTA` are references to the `Option` payloads of two scalars
// of the same variant. A missing side yields the other side; when both are
// present the method `$OP` (`min` or `max`) picks the winner. Any
// `$EXTRA_ARGS` are cloned into the remaining fields of `ScalarValue::$SCALAR`.
macro_rules! typed_min_max {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let merged = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => Some((*a).$OP(*b)),
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(merged, $($EXTRA_ARGS.clone()),*)
    }};
}
// min/max of two floating-point scalar values.
//
// Works like `typed_min_max!`, but when both sides are present they are
// compared with `total_cmp`, so NaN takes part in a total order instead of
// being unordered. `$DELTA` wins only when comparing `$VALUE` to it gives the
// ordering selected by `choose_min_max!($OP)`.
macro_rules! typed_min_max_float {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let picked = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => {
                if a.total_cmp(b) == choose_min_max!($OP) {
                    Some(*b)
                } else {
                    Some(*a)
                }
            }
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(picked)
    }};
}
764
// min/max of two scalar string values.
//
// `$VALUE` and `$DELTA` are references to the `Option` payloads of two scalars
// of the same variant. A missing side yields a clone of the other side; when
// both are present the method `$OP` (`min` or `max`) picks the one to clone.
macro_rules! typed_min_max_string {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let chosen = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => Some(a.$OP(b).clone()),
            (Some(only), None) | (None, Some(only)) => Some(only.clone()),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(chosen)
    }};
}
776
// Maps an operation name to the result of `current.cmp(candidate)` for which
// the candidate should replace the current value: for `max` the current value
// must be Less, for `min` it must be Greater. Usable both as an expression and
// as a pattern.
macro_rules! choose_min_max {
    (max) => { std::cmp::Ordering::Less };
    (min) => { std::cmp::Ordering::Greater };
}
785
// min/max of two interval values.
//
// Yields a clone of `$RHS` when comparing `$LHS` to it gives the ordering
// selected by `choose_min_max!($OP)`, and a clone of `$LHS` for any other
// ordering. If the two values are unordered the macro does an early `return`
// of an internal error, so it must be expanded inside a function returning
// `Result`.
macro_rules! interval_min_max {
    ($OP:tt, $LHS:expr, $RHS:expr) => {{
        match $LHS.partial_cmp(&$RHS) {
            None => {
                return internal_err!("Comparison error while computing interval min/max")
            }
            Some(ordering) if ordering == choose_min_max!($OP) => $RHS.clone(),
            Some(_) => $LHS.clone(),
        }
    }};
}
797
// Computes the min or max (`$OP` is `min` or `max`) of two scalar values of
// the same type.
//
// Expands to `Ok(<scalar>)`. Any pair of variants not listed below makes the
// *enclosing function* return an internal error, so the macro must be used
// inside a function returning a compatible `Result`.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        Ok(match ($VALUE, $DELTA) {
            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,

            // Decimals: both sides must agree on precision and scale.
            (
                left @ ScalarValue::Decimal128(l_val, l_prec, l_scale),
                right @ ScalarValue::Decimal128(r_val, r_prec, r_scale),
            ) => {
                if l_prec != r_prec || l_scale != r_scale {
                    return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    (left, right)
                );
                }
                typed_min_max!(l_val, r_val, Decimal128, $OP, l_prec, l_scale)
            }
            (
                left @ ScalarValue::Decimal256(l_val, l_prec, l_scale),
                right @ ScalarValue::Decimal256(r_val, r_prec, r_scale),
            ) => {
                if l_prec != r_prec || l_scale != r_scale {
                    return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    (left, right)
                );
                }
                typed_min_max!(l_val, r_val, Decimal256, $OP, l_prec, l_scale)
            }

            // Booleans.
            (ScalarValue::Boolean(left), ScalarValue::Boolean(right)) => {
                typed_min_max!(left, right, Boolean, $OP)
            }

            // Floating point values go through the float-specific helper.
            (ScalarValue::Float64(left), ScalarValue::Float64(right)) => {
                typed_min_max_float!(left, right, Float64, $OP)
            }
            (ScalarValue::Float32(left), ScalarValue::Float32(right)) => {
                typed_min_max_float!(left, right, Float32, $OP)
            }
            (ScalarValue::Float16(left), ScalarValue::Float16(right)) => {
                typed_min_max_float!(left, right, Float16, $OP)
            }

            // Unsigned and signed integers.
            (ScalarValue::UInt64(left), ScalarValue::UInt64(right)) => {
                typed_min_max!(left, right, UInt64, $OP)
            }
            (ScalarValue::UInt32(left), ScalarValue::UInt32(right)) => {
                typed_min_max!(left, right, UInt32, $OP)
            }
            (ScalarValue::UInt16(left), ScalarValue::UInt16(right)) => {
                typed_min_max!(left, right, UInt16, $OP)
            }
            (ScalarValue::UInt8(left), ScalarValue::UInt8(right)) => {
                typed_min_max!(left, right, UInt8, $OP)
            }
            (ScalarValue::Int64(left), ScalarValue::Int64(right)) => {
                typed_min_max!(left, right, Int64, $OP)
            }
            (ScalarValue::Int32(left), ScalarValue::Int32(right)) => {
                typed_min_max!(left, right, Int32, $OP)
            }
            (ScalarValue::Int16(left), ScalarValue::Int16(right)) => {
                typed_min_max!(left, right, Int16, $OP)
            }
            (ScalarValue::Int8(left), ScalarValue::Int8(right)) => {
                typed_min_max!(left, right, Int8, $OP)
            }

            // Strings and binary values share the string helper.
            (ScalarValue::Utf8(left), ScalarValue::Utf8(right)) => {
                typed_min_max_string!(left, right, Utf8, $OP)
            }
            (ScalarValue::LargeUtf8(left), ScalarValue::LargeUtf8(right)) => {
                typed_min_max_string!(left, right, LargeUtf8, $OP)
            }
            (ScalarValue::Utf8View(left), ScalarValue::Utf8View(right)) => {
                typed_min_max_string!(left, right, Utf8View, $OP)
            }
            (ScalarValue::Binary(left), ScalarValue::Binary(right)) => {
                typed_min_max_string!(left, right, Binary, $OP)
            }
            (ScalarValue::LargeBinary(left), ScalarValue::LargeBinary(right)) => {
                typed_min_max_string!(left, right, LargeBinary, $OP)
            }
            (ScalarValue::BinaryView(left), ScalarValue::BinaryView(right)) => {
                typed_min_max_string!(left, right, BinaryView, $OP)
            }

            // Timestamps: the result carries the left-hand side's time zone;
            // the right-hand side's time zone is not inspected.
            (
                ScalarValue::TimestampSecond(left, l_tz),
                ScalarValue::TimestampSecond(right, _),
            ) => {
                typed_min_max!(left, right, TimestampSecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMillisecond(left, l_tz),
                ScalarValue::TimestampMillisecond(right, _),
            ) => {
                typed_min_max!(left, right, TimestampMillisecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMicrosecond(left, l_tz),
                ScalarValue::TimestampMicrosecond(right, _),
            ) => {
                typed_min_max!(left, right, TimestampMicrosecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampNanosecond(left, l_tz),
                ScalarValue::TimestampNanosecond(right, _),
            ) => {
                typed_min_max!(left, right, TimestampNanosecond, $OP, l_tz)
            }

            // Dates and times of day.
            (ScalarValue::Date32(left), ScalarValue::Date32(right)) => {
                typed_min_max!(left, right, Date32, $OP)
            }
            (ScalarValue::Date64(left), ScalarValue::Date64(right)) => {
                typed_min_max!(left, right, Date64, $OP)
            }
            (ScalarValue::Time32Second(left), ScalarValue::Time32Second(right)) => {
                typed_min_max!(left, right, Time32Second, $OP)
            }
            (
                ScalarValue::Time32Millisecond(left),
                ScalarValue::Time32Millisecond(right),
            ) => {
                typed_min_max!(left, right, Time32Millisecond, $OP)
            }
            (
                ScalarValue::Time64Microsecond(left),
                ScalarValue::Time64Microsecond(right),
            ) => {
                typed_min_max!(left, right, Time64Microsecond, $OP)
            }
            (
                ScalarValue::Time64Nanosecond(left),
                ScalarValue::Time64Nanosecond(right),
            ) => {
                typed_min_max!(left, right, Time64Nanosecond, $OP)
            }

            // Intervals of the same unit.
            (
                ScalarValue::IntervalYearMonth(left),
                ScalarValue::IntervalYearMonth(right),
            ) => {
                typed_min_max!(left, right, IntervalYearMonth, $OP)
            }
            (
                ScalarValue::IntervalMonthDayNano(left),
                ScalarValue::IntervalMonthDayNano(right),
            ) => {
                typed_min_max!(left, right, IntervalMonthDayNano, $OP)
            }
            (
                ScalarValue::IntervalDayTime(left),
                ScalarValue::IntervalDayTime(right),
            ) => {
                typed_min_max!(left, right, IntervalDayTime, $OP)
            }

            // Intervals of differing units are compared through the scalars'
            // own `partial_cmp`.
            (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalMonthDayNano(_) | ScalarValue::IntervalDayTime(_),
            )
            | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalDayTime(_) | ScalarValue::IntervalYearMonth(_),
            )
            | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalMonthDayNano(_),
            ) => {
                interval_min_max!($OP, $VALUE, $DELTA)
            }

            // Durations.
            (ScalarValue::DurationSecond(left), ScalarValue::DurationSecond(right)) => {
                typed_min_max!(left, right, DurationSecond, $OP)
            }
            (
                ScalarValue::DurationMillisecond(left),
                ScalarValue::DurationMillisecond(right),
            ) => {
                typed_min_max!(left, right, DurationMillisecond, $OP)
            }
            (
                ScalarValue::DurationMicrosecond(left),
                ScalarValue::DurationMicrosecond(right),
            ) => {
                typed_min_max!(left, right, DurationMicrosecond, $OP)
            }
            (
                ScalarValue::DurationNanosecond(left),
                ScalarValue::DurationNanosecond(right),
            ) => {
                typed_min_max!(left, right, DurationNanosecond, $OP)
            }

            // Nested values (structs and the list flavours) use the generic helper.
            (left @ ScalarValue::Struct(_), right @ ScalarValue::Struct(_))
            | (left @ ScalarValue::List(_), right @ ScalarValue::List(_))
            | (left @ ScalarValue::LargeList(_), right @ ScalarValue::LargeList(_))
            | (
                left @ ScalarValue::FixedSizeList(_),
                right @ ScalarValue::FixedSizeList(_),
            ) => {
                min_max_generic!(left, right, $OP)
            }

            // Anything else is a type mismatch between the two operands.
            mismatched => {
                return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    mismatched
                )
            }
        })
    }};
}
1043
1044/// An accumulator to compute the maximum value
1045#[derive(Debug)]
1046pub struct MaxAccumulator {
1047    max: ScalarValue,
1048}
1049
1050impl MaxAccumulator {
1051    /// new max accumulator
1052    pub fn try_new(datatype: &DataType) -> Result<Self> {
1053        Ok(Self {
1054            max: ScalarValue::try_from(datatype)?,
1055        })
1056    }
1057}
1058
1059impl Accumulator for MaxAccumulator {
1060    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1061        let values = &values[0];
1062        let delta = &max_batch(values)?;
1063        let new_max: Result<ScalarValue, DataFusionError> =
1064            min_max!(&self.max, delta, max);
1065        self.max = new_max?;
1066        Ok(())
1067    }
1068
1069    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1070        self.update_batch(states)
1071    }
1072
1073    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1074        Ok(vec![self.evaluate()?])
1075    }
1076    fn evaluate(&mut self) -> Result<ScalarValue> {
1077        Ok(self.max.clone())
1078    }
1079
1080    fn size(&self) -> usize {
1081        size_of_val(self) - size_of_val(&self.max) + self.max.size()
1082    }
1083}
1084
1085#[derive(Debug)]
1086pub struct SlidingMaxAccumulator {
1087    max: ScalarValue,
1088    moving_max: MovingMax<ScalarValue>,
1089}
1090
1091impl SlidingMaxAccumulator {
1092    /// new max accumulator
1093    pub fn try_new(datatype: &DataType) -> Result<Self> {
1094        Ok(Self {
1095            max: ScalarValue::try_from(datatype)?,
1096            moving_max: MovingMax::<ScalarValue>::new(),
1097        })
1098    }
1099}
1100
1101impl Accumulator for SlidingMaxAccumulator {
1102    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1103        for idx in 0..values[0].len() {
1104            let val = ScalarValue::try_from_array(&values[0], idx)?;
1105            self.moving_max.push(val);
1106        }
1107        if let Some(res) = self.moving_max.max() {
1108            self.max = res.clone();
1109        }
1110        Ok(())
1111    }
1112
1113    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1114        for _idx in 0..values[0].len() {
1115            (self.moving_max).pop();
1116        }
1117        if let Some(res) = self.moving_max.max() {
1118            self.max = res.clone();
1119        }
1120        Ok(())
1121    }
1122
1123    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1124        self.update_batch(states)
1125    }
1126
1127    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1128        Ok(vec![self.max.clone()])
1129    }
1130
1131    fn evaluate(&mut self) -> Result<ScalarValue> {
1132        Ok(self.max.clone())
1133    }
1134
1135    fn supports_retract_batch(&self) -> bool {
1136        true
1137    }
1138
1139    fn size(&self) -> usize {
1140        size_of_val(self) - size_of_val(&self.max) + self.max.size()
1141    }
1142}
1143
1144#[user_doc(
1145    doc_section(label = "General Functions"),
1146    description = "Returns the minimum value in the specified column.",
1147    syntax_example = "min(expression)",
1148    sql_example = r#"```sql
1149> SELECT min(column_name) FROM table_name;
1150+----------------------+
1151| min(column_name)      |
1152+----------------------+
1153| 12                   |
1154+----------------------+
1155```"#,
1156    standard_argument(name = "expression",)
1157)]
1158#[derive(Debug)]
1159pub struct Min {
1160    signature: Signature,
1161}
1162
1163impl Min {
1164    pub fn new() -> Self {
1165        Self {
1166            signature: Signature::user_defined(Volatility::Immutable),
1167        }
1168    }
1169}
1170
1171impl Default for Min {
1172    fn default() -> Self {
1173        Self::new()
1174    }
1175}
1176
1177impl FromColumnStatistics for Min {
1178    fn value_from_column_statistics(
1179        &self,
1180        col_stats: &ColumnStatistics,
1181    ) -> Option<ScalarValue> {
1182        if let Precision::Exact(ref val) = col_stats.min_value {
1183            if !val.is_null() {
1184                return Some(val.clone());
1185            }
1186        }
1187        None
1188    }
1189}
1190
1191impl AggregateUDFImpl for Min {
1192    fn as_any(&self) -> &dyn std::any::Any {
1193        self
1194    }
1195
1196    fn name(&self) -> &str {
1197        "min"
1198    }
1199
1200    fn signature(&self) -> &Signature {
1201        &self.signature
1202    }
1203
1204    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1205        Ok(arg_types[0].to_owned())
1206    }
1207
1208    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
1209        Ok(Box::new(MinAccumulator::try_new(
1210            acc_args.return_field.data_type(),
1211        )?))
1212    }
1213
1214    fn aliases(&self) -> &[String] {
1215        &[]
1216    }
1217
1218    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
1219        use DataType::*;
1220        matches!(
1221            args.return_field.data_type(),
1222            Int8 | Int16
1223                | Int32
1224                | Int64
1225                | UInt8
1226                | UInt16
1227                | UInt32
1228                | UInt64
1229                | Float16
1230                | Float32
1231                | Float64
1232                | Decimal128(_, _)
1233                | Decimal256(_, _)
1234                | Date32
1235                | Date64
1236                | Time32(_)
1237                | Time64(_)
1238                | Timestamp(_, _)
1239                | Utf8
1240                | LargeUtf8
1241                | Utf8View
1242                | Binary
1243                | LargeBinary
1244                | BinaryView
1245                | Duration(_)
1246                | Struct(_)
1247        )
1248    }
1249
1250    fn create_groups_accumulator(
1251        &self,
1252        args: AccumulatorArgs,
1253    ) -> Result<Box<dyn GroupsAccumulator>> {
1254        use DataType::*;
1255        use TimeUnit::*;
1256        let data_type = args.return_field.data_type();
1257        match data_type {
1258            Int8 => primitive_min_accumulator!(data_type, i8, Int8Type),
1259            Int16 => primitive_min_accumulator!(data_type, i16, Int16Type),
1260            Int32 => primitive_min_accumulator!(data_type, i32, Int32Type),
1261            Int64 => primitive_min_accumulator!(data_type, i64, Int64Type),
1262            UInt8 => primitive_min_accumulator!(data_type, u8, UInt8Type),
1263            UInt16 => primitive_min_accumulator!(data_type, u16, UInt16Type),
1264            UInt32 => primitive_min_accumulator!(data_type, u32, UInt32Type),
1265            UInt64 => primitive_min_accumulator!(data_type, u64, UInt64Type),
1266            Float16 => {
1267                primitive_min_accumulator!(data_type, f16, Float16Type)
1268            }
1269            Float32 => {
1270                primitive_min_accumulator!(data_type, f32, Float32Type)
1271            }
1272            Float64 => {
1273                primitive_min_accumulator!(data_type, f64, Float64Type)
1274            }
1275            Date32 => primitive_min_accumulator!(data_type, i32, Date32Type),
1276            Date64 => primitive_min_accumulator!(data_type, i64, Date64Type),
1277            Time32(Second) => {
1278                primitive_min_accumulator!(data_type, i32, Time32SecondType)
1279            }
1280            Time32(Millisecond) => {
1281                primitive_min_accumulator!(data_type, i32, Time32MillisecondType)
1282            }
1283            Time64(Microsecond) => {
1284                primitive_min_accumulator!(data_type, i64, Time64MicrosecondType)
1285            }
1286            Time64(Nanosecond) => {
1287                primitive_min_accumulator!(data_type, i64, Time64NanosecondType)
1288            }
1289            Timestamp(Second, _) => {
1290                primitive_min_accumulator!(data_type, i64, TimestampSecondType)
1291            }
1292            Timestamp(Millisecond, _) => {
1293                primitive_min_accumulator!(data_type, i64, TimestampMillisecondType)
1294            }
1295            Timestamp(Microsecond, _) => {
1296                primitive_min_accumulator!(data_type, i64, TimestampMicrosecondType)
1297            }
1298            Timestamp(Nanosecond, _) => {
1299                primitive_min_accumulator!(data_type, i64, TimestampNanosecondType)
1300            }
1301            Duration(Second) => {
1302                primitive_min_accumulator!(data_type, i64, DurationSecondType)
1303            }
1304            Duration(Millisecond) => {
1305                primitive_min_accumulator!(data_type, i64, DurationMillisecondType)
1306            }
1307            Duration(Microsecond) => {
1308                primitive_min_accumulator!(data_type, i64, DurationMicrosecondType)
1309            }
1310            Duration(Nanosecond) => {
1311                primitive_min_accumulator!(data_type, i64, DurationNanosecondType)
1312            }
1313            Decimal128(_, _) => {
1314                primitive_min_accumulator!(data_type, i128, Decimal128Type)
1315            }
1316            Decimal256(_, _) => {
1317                primitive_min_accumulator!(data_type, i256, Decimal256Type)
1318            }
1319            Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
1320                Ok(Box::new(MinMaxBytesAccumulator::new_min(data_type.clone())))
1321            }
1322            Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_min(
1323                data_type.clone(),
1324            ))),
1325            // This is only reached if groups_accumulator_supported is out of sync
1326            _ => internal_err!("GroupsAccumulator not supported for min({})", data_type),
1327        }
1328    }
1329
1330    fn create_sliding_accumulator(
1331        &self,
1332        args: AccumulatorArgs,
1333    ) -> Result<Box<dyn Accumulator>> {
1334        Ok(Box::new(SlidingMinAccumulator::try_new(
1335            args.return_field.data_type(),
1336        )?))
1337    }
1338
1339    fn is_descending(&self) -> Option<bool> {
1340        Some(false)
1341    }
1342
1343    fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
1344        self.value_from_statistics(statistics_args)
1345    }
1346    fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
1347        datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
1348    }
1349
1350    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
1351        get_min_max_result_type(arg_types)
1352    }
1353
1354    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
1355        datafusion_expr::ReversedUDAF::Identical
1356    }
1357
1358    fn documentation(&self) -> Option<&Documentation> {
1359        self.doc()
1360    }
1361
1362    fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
1363        // `MIN` is monotonically decreasing as it always decreases or stays
1364        // the same as new values are seen.
1365        SetMonotonicity::Decreasing
1366    }
1367}
1368
1369/// An accumulator to compute the minimum value
1370#[derive(Debug)]
1371pub struct MinAccumulator {
1372    min: ScalarValue,
1373}
1374
1375impl MinAccumulator {
1376    /// new min accumulator
1377    pub fn try_new(datatype: &DataType) -> Result<Self> {
1378        Ok(Self {
1379            min: ScalarValue::try_from(datatype)?,
1380        })
1381    }
1382}
1383
1384impl Accumulator for MinAccumulator {
1385    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1386        Ok(vec![self.evaluate()?])
1387    }
1388
1389    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1390        let values = &values[0];
1391        let delta = &min_batch(values)?;
1392        let new_min: Result<ScalarValue, DataFusionError> =
1393            min_max!(&self.min, delta, min);
1394        self.min = new_min?;
1395        Ok(())
1396    }
1397
1398    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1399        self.update_batch(states)
1400    }
1401
1402    fn evaluate(&mut self) -> Result<ScalarValue> {
1403        Ok(self.min.clone())
1404    }
1405
1406    fn size(&self) -> usize {
1407        size_of_val(self) - size_of_val(&self.min) + self.min.size()
1408    }
1409}
1410
1411#[derive(Debug)]
1412pub struct SlidingMinAccumulator {
1413    min: ScalarValue,
1414    moving_min: MovingMin<ScalarValue>,
1415}
1416
1417impl SlidingMinAccumulator {
1418    pub fn try_new(datatype: &DataType) -> Result<Self> {
1419        Ok(Self {
1420            min: ScalarValue::try_from(datatype)?,
1421            moving_min: MovingMin::<ScalarValue>::new(),
1422        })
1423    }
1424}
1425
1426impl Accumulator for SlidingMinAccumulator {
1427    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1428        Ok(vec![self.min.clone()])
1429    }
1430
1431    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1432        for idx in 0..values[0].len() {
1433            let val = ScalarValue::try_from_array(&values[0], idx)?;
1434            if !val.is_null() {
1435                self.moving_min.push(val);
1436            }
1437        }
1438        if let Some(res) = self.moving_min.min() {
1439            self.min = res.clone();
1440        }
1441        Ok(())
1442    }
1443
1444    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1445        for idx in 0..values[0].len() {
1446            let val = ScalarValue::try_from_array(&values[0], idx)?;
1447            if !val.is_null() {
1448                (self.moving_min).pop();
1449            }
1450        }
1451        if let Some(res) = self.moving_min.min() {
1452            self.min = res.clone();
1453        }
1454        Ok(())
1455    }
1456
1457    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1458        self.update_batch(states)
1459    }
1460
1461    fn evaluate(&mut self) -> Result<ScalarValue> {
1462        Ok(self.min.clone())
1463    }
1464
1465    fn supports_retract_batch(&self) -> bool {
1466        true
1467    }
1468
1469    fn size(&self) -> usize {
1470        size_of_val(self) - size_of_val(&self.min) + self.min.size()
1471    }
1472}
1473
/// Keep track of the minimum value in a sliding window.
///
/// The implementation is taken from <https://github.com/spebern/moving_min_max/blob/master/src/lib.rs>
///
/// `moving min max` provides one data structure for keeping track of the
/// minimum value and one for keeping track of the maximum value in a sliding
/// window.
///
/// The window behaves as a FIFO queue built from two stacks. Every stack entry
/// stores a value together with the minimum of all entries at or below it in
/// that stack. New values go onto the push stack; values are removed from the
/// pop stack. When the pop stack is empty, all entries of the push stack are
/// moved over (which reverses their order) while their running minimum is
/// recomputed. The minimum of the whole window is the smaller of the running
/// minima stored at the tops of the two stacks.
///
/// The complexity of the operations are
/// - O(1) for getting the minimum/maximum
/// - O(1) for push
/// - amortized O(1) for pop
///
/// ```
/// # use datafusion_functions_aggregate::min_max::MovingMin;
/// let mut moving_min = MovingMin::<i32>::new();
/// moving_min.push(2);
/// moving_min.push(1);
/// moving_min.push(3);
///
/// assert_eq!(moving_min.min(), Some(&1));
/// assert_eq!(moving_min.pop(), Some(2));
///
/// assert_eq!(moving_min.min(), Some(&1));
/// assert_eq!(moving_min.pop(), Some(1));
///
/// assert_eq!(moving_min.min(), Some(&3));
/// assert_eq!(moving_min.pop(), Some(3));
///
/// assert_eq!(moving_min.min(), None);
/// assert_eq!(moving_min.pop(), None);
/// ```
#[derive(Debug)]
pub struct MovingMin<T> {
    /// Entries `(value, running minimum)` in insertion order.
    push_stack: Vec<(T, T)>,
    /// Entries `(value, running minimum)` with the oldest value on top.
    pop_stack: Vec<(T, T)>,
}

// Building an empty window needs no capabilities from `T`, so no bounds here.
impl<T> Default for MovingMin<T> {
    fn default() -> Self {
        Self {
            push_stack: Vec::new(),
            pop_stack: Vec::new(),
        }
    }
}

impl<T: Clone + PartialOrd> MovingMin<T> {
    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
    /// window.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
    /// window with `capacity` allocated slots.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            push_stack: Vec::with_capacity(capacity),
            pop_stack: Vec::with_capacity(capacity),
        }
    }

    /// Returns the minimum of the sliding window or `None` if the window is
    /// empty.
    #[inline]
    pub fn min(&self) -> Option<&T> {
        match (self.push_stack.last(), self.pop_stack.last()) {
            (None, None) => None,
            (Some((_, min)), None) => Some(min),
            (None, Some((_, min))) => Some(min),
            (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }),
        }
    }

    /// Pushes a new element into the sliding window.
    #[inline]
    pub fn push(&mut self, val: T) {
        let entry = match self.push_stack.last() {
            // The previous running minimum survives only if `val` is larger.
            Some((_, min)) if val > *min => (val, min.clone()),
            // Otherwise (or for the first entry) `val` is the running minimum.
            _ => (val.clone(), val),
        };
        self.push_stack.push(entry);
    }

    /// Removes and returns the oldest value of the sliding window (the one
    /// pushed first), or `None` if the window is empty.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        if self.pop_stack.is_empty() {
            // Move everything over; popping reverses the order, so the oldest
            // value ends up on top. Each entry needs exactly one clone: the
            // running minimum stored next to the moved value.
            while let Some((val, _)) = self.push_stack.pop() {
                let min = match self.pop_stack.last() {
                    Some((_, running)) if *running < val => running.clone(),
                    _ => val.clone(),
                };
                self.pop_stack.push((val, min));
            }
        }
        self.pop_stack.pop().map(|(val, _)| val)
    }

    /// Returns the number of elements stored in the sliding window.
    #[inline]
    pub fn len(&self) -> usize {
        self.push_stack.len() + self.pop_stack.len()
    }

    /// Returns `true` if the moving window contains no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
1607
/// Keep track of the maximum value in a sliding window.
///
/// See [`MovingMin`] for more details.
///
/// ```
/// # use datafusion_functions_aggregate::min_max::MovingMax;
/// let mut moving_max = MovingMax::<i32>::new();
/// moving_max.push(2);
/// moving_max.push(3);
/// moving_max.push(1);
///
/// assert_eq!(moving_max.max(), Some(&3));
/// assert_eq!(moving_max.pop(), Some(2));
///
/// assert_eq!(moving_max.max(), Some(&3));
/// assert_eq!(moving_max.pop(), Some(3));
///
/// assert_eq!(moving_max.max(), Some(&1));
/// assert_eq!(moving_max.pop(), Some(1));
///
/// assert_eq!(moving_max.max(), None);
/// assert_eq!(moving_max.pop(), None);
/// ```
#[derive(Debug)]
pub struct MovingMax<T> {
    /// Entries `(value, running maximum)` in insertion order.
    push_stack: Vec<(T, T)>,
    /// Entries `(value, running maximum)` with the oldest value on top.
    pop_stack: Vec<(T, T)>,
}

// Building an empty window needs no capabilities from `T`, so no bounds here.
impl<T> Default for MovingMax<T> {
    fn default() -> Self {
        Self {
            push_stack: Vec::new(),
            pop_stack: Vec::new(),
        }
    }
}

impl<T: Clone + PartialOrd> MovingMax<T> {
    /// Creates a new `MovingMax` to keep track of the maximum in a sliding window.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new `MovingMax` to keep track of the maximum in a sliding window with
    /// `capacity` allocated slots.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            push_stack: Vec::with_capacity(capacity),
            pop_stack: Vec::with_capacity(capacity),
        }
    }

    /// Returns the maximum of the sliding window or `None` if the window is empty.
    #[inline]
    pub fn max(&self) -> Option<&T> {
        match (self.push_stack.last(), self.pop_stack.last()) {
            (None, None) => None,
            (Some((_, max)), None) => Some(max),
            (None, Some((_, max))) => Some(max),
            (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }),
        }
    }

    /// Pushes a new element into the sliding window.
    #[inline]
    pub fn push(&mut self, val: T) {
        let entry = match self.push_stack.last() {
            // The previous running maximum survives only if `val` is smaller.
            Some((_, max)) if val < *max => (val, max.clone()),
            // Otherwise (or for the first entry) `val` is the running maximum.
            _ => (val.clone(), val),
        };
        self.push_stack.push(entry);
    }

    /// Removes and returns the oldest value of the sliding window (the one
    /// pushed first), or `None` if the window is empty.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        if self.pop_stack.is_empty() {
            // Move everything over; popping reverses the order, so the oldest
            // value ends up on top. Each entry needs exactly one clone: the
            // running maximum stored next to the moved value.
            while let Some((val, _)) = self.push_stack.pop() {
                let max = match self.pop_stack.last() {
                    Some((_, running)) if *running > val => running.clone(),
                    _ => val.clone(),
                };
                self.pop_stack.push((val, max));
            }
        }
        self.pop_stack.pop().map(|(val, _)| val)
    }

    /// Returns the number of elements stored in the sliding window.
    #[inline]
    pub fn len(&self) -> usize {
        self.push_stack.len() + self.pop_stack.len()
    }

    /// Returns `true` if the moving window contains no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
1725
// Expands `make_udaf_expr_and_func!` for the `Max` aggregate.
// NOTE(review): the macro is defined outside this section of the file; judging
// by its arguments it presumably generates a `max(expression)` expression
// helper carrying the description below plus a `max_udaf()` accessor — confirm
// against the macro definition.
make_udaf_expr_and_func!(
    Max,
    max,
    expression,
    "Returns the maximum of a group of values.",
    max_udaf
);
1733
// Expands `make_udaf_expr_and_func!` for the `Min` aggregate.
// NOTE(review): the macro is defined outside this section of the file; judging
// by its arguments it presumably generates a `min(expression)` expression
// helper carrying the description below plus a `min_udaf()` accessor — confirm
// against the macro definition.
make_udaf_expr_and_func!(
    Min,
    min,
    expression,
    "Returns the minimum of a group of values.",
    min_udaf
);
1741
1742#[cfg(test)]
1743mod tests {
1744    use super::*;
1745    use arrow::{
1746        array::DictionaryArray,
1747        datatypes::{
1748            IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
1749        },
1750    };
1751    use std::sync::Arc;
1752
1753    #[test]
1754    fn interval_min_max() {
1755        // IntervalYearMonth
1756        let b = IntervalYearMonthArray::from(vec![
1757            IntervalYearMonthType::make_value(0, 1),
1758            IntervalYearMonthType::make_value(5, 34),
1759            IntervalYearMonthType::make_value(-2, 4),
1760            IntervalYearMonthType::make_value(7, -4),
1761            IntervalYearMonthType::make_value(0, 1),
1762        ]);
1763        let b: ArrayRef = Arc::new(b);
1764
1765        let mut min =
1766            MinAccumulator::try_new(&DataType::Interval(IntervalUnit::YearMonth))
1767                .unwrap();
1768        min.update_batch(&[Arc::clone(&b)]).unwrap();
1769        let min_res = min.evaluate().unwrap();
1770        assert_eq!(
1771            min_res,
1772            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
1773                -2, 4,
1774            )))
1775        );
1776
1777        let mut max =
1778            MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::YearMonth))
1779                .unwrap();
1780        max.update_batch(&[Arc::clone(&b)]).unwrap();
1781        let max_res = max.evaluate().unwrap();
1782        assert_eq!(
1783            max_res,
1784            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
1785                5, 34,
1786            )))
1787        );
1788
1789        // IntervalDayTime
1790        let b = IntervalDayTimeArray::from(vec![
1791            IntervalDayTimeType::make_value(0, 0),
1792            IntervalDayTimeType::make_value(5, 454000),
1793            IntervalDayTimeType::make_value(-34, 0),
1794            IntervalDayTimeType::make_value(7, -4000),
1795            IntervalDayTimeType::make_value(1, 0),
1796        ]);
1797        let b: ArrayRef = Arc::new(b);
1798
1799        let mut min =
1800            MinAccumulator::try_new(&DataType::Interval(IntervalUnit::DayTime)).unwrap();
1801        min.update_batch(&[Arc::clone(&b)]).unwrap();
1802        let min_res = min.evaluate().unwrap();
1803        assert_eq!(
1804            min_res,
1805            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(-34, 0)))
1806        );
1807
1808        let mut max =
1809            MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::DayTime)).unwrap();
1810        max.update_batch(&[Arc::clone(&b)]).unwrap();
1811        let max_res = max.evaluate().unwrap();
1812        assert_eq!(
1813            max_res,
1814            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(7, -4000)))
1815        );
1816
1817        // IntervalMonthDayNano
1818        let b = IntervalMonthDayNanoArray::from(vec![
1819            IntervalMonthDayNanoType::make_value(1, 0, 0),
1820            IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000),
1821            IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000),
1822            IntervalMonthDayNanoType::make_value(5, 2, 493_000_000_000),
1823            IntervalMonthDayNanoType::make_value(1, 0, 0),
1824        ]);
1825        let b: ArrayRef = Arc::new(b);
1826
1827        let mut min =
1828            MinAccumulator::try_new(&DataType::Interval(IntervalUnit::MonthDayNano))
1829                .unwrap();
1830        min.update_batch(&[Arc::clone(&b)]).unwrap();
1831        let min_res = min.evaluate().unwrap();
1832        assert_eq!(
1833            min_res,
1834            ScalarValue::IntervalMonthDayNano(Some(
1835                IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000)
1836            ))
1837        );
1838
1839        let mut max =
1840            MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::MonthDayNano))
1841                .unwrap();
1842        max.update_batch(&[Arc::clone(&b)]).unwrap();
1843        let max_res = max.evaluate().unwrap();
1844        assert_eq!(
1845            max_res,
1846            ScalarValue::IntervalMonthDayNano(Some(
1847                IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000)
1848            ))
1849        );
1850    }
1851
1852    #[test]
1853    fn float_min_max_with_nans() {
1854        let pos_nan = f32::NAN;
1855        let zero = 0_f32;
1856        let neg_inf = f32::NEG_INFINITY;
1857
1858        let check = |acc: &mut dyn Accumulator, values: &[&[f32]], expected: f32| {
1859            for batch in values.iter() {
1860                let batch =
1861                    Arc::new(Float32Array::from_iter_values(batch.iter().copied()));
1862                acc.update_batch(&[batch]).unwrap();
1863            }
1864            let result = acc.evaluate().unwrap();
1865            assert_eq!(result, ScalarValue::Float32(Some(expected)));
1866        };
1867
1868        // This test checks both comparison between batches (which uses the min_max macro
1869        // defined above) and within a batch (which uses the arrow min/max compute function
1870        // and verifies both respect the total order comparison for floats)
1871
1872        let min = || MinAccumulator::try_new(&DataType::Float32).unwrap();
1873        let max = || MaxAccumulator::try_new(&DataType::Float32).unwrap();
1874
1875        check(&mut min(), &[&[zero], &[pos_nan]], zero);
1876        check(&mut min(), &[&[zero, pos_nan]], zero);
1877        check(&mut min(), &[&[zero], &[neg_inf]], neg_inf);
1878        check(&mut min(), &[&[zero, neg_inf]], neg_inf);
1879        check(&mut max(), &[&[zero], &[pos_nan]], pos_nan);
1880        check(&mut max(), &[&[zero, pos_nan]], pos_nan);
1881        check(&mut max(), &[&[zero], &[neg_inf]], zero);
1882        check(&mut max(), &[&[zero, neg_inf]], zero);
1883    }
1884
1885    use datafusion_common::Result;
1886    use rand::Rng;
1887
1888    fn get_random_vec_i32(len: usize) -> Vec<i32> {
1889        let mut rng = rand::rng();
1890        let mut input = Vec::with_capacity(len);
1891        for _i in 0..len {
1892            input.push(rng.random_range(0..100));
1893        }
1894        input
1895    }
1896
1897    fn moving_min_i32(len: usize, n_sliding_window: usize) -> Result<()> {
1898        let data = get_random_vec_i32(len);
1899        let mut expected = Vec::with_capacity(len);
1900        let mut moving_min = MovingMin::<i32>::new();
1901        let mut res = Vec::with_capacity(len);
1902        for i in 0..len {
1903            let start = i.saturating_sub(n_sliding_window);
1904            expected.push(*data[start..i + 1].iter().min().unwrap());
1905
1906            moving_min.push(data[i]);
1907            if i > n_sliding_window {
1908                moving_min.pop();
1909            }
1910            res.push(*moving_min.min().unwrap());
1911        }
1912        assert_eq!(res, expected);
1913        Ok(())
1914    }
1915
1916    fn moving_max_i32(len: usize, n_sliding_window: usize) -> Result<()> {
1917        let data = get_random_vec_i32(len);
1918        let mut expected = Vec::with_capacity(len);
1919        let mut moving_max = MovingMax::<i32>::new();
1920        let mut res = Vec::with_capacity(len);
1921        for i in 0..len {
1922            let start = i.saturating_sub(n_sliding_window);
1923            expected.push(*data[start..i + 1].iter().max().unwrap());
1924
1925            moving_max.push(data[i]);
1926            if i > n_sliding_window {
1927                moving_max.pop();
1928            }
1929            res.push(*moving_max.max().unwrap());
1930        }
1931        assert_eq!(res, expected);
1932        Ok(())
1933    }
1934
1935    #[test]
1936    fn moving_min_tests() -> Result<()> {
1937        moving_min_i32(100, 10)?;
1938        moving_min_i32(100, 20)?;
1939        moving_min_i32(100, 50)?;
1940        moving_min_i32(100, 100)?;
1941        Ok(())
1942    }
1943
1944    #[test]
1945    fn moving_max_tests() -> Result<()> {
1946        moving_max_i32(100, 10)?;
1947        moving_max_i32(100, 20)?;
1948        moving_max_i32(100, 50)?;
1949        moving_max_i32(100, 100)?;
1950        Ok(())
1951    }
1952
1953    #[test]
1954    fn test_min_max_coerce_types() {
1955        // the coerced types is same with input types
1956        let funs: Vec<Box<dyn AggregateUDFImpl>> =
1957            vec![Box::new(Min::new()), Box::new(Max::new())];
1958        let input_types = vec![
1959            vec![DataType::Int32],
1960            vec![DataType::Decimal128(10, 2)],
1961            vec![DataType::Decimal256(1, 1)],
1962            vec![DataType::Utf8],
1963        ];
1964        for fun in funs {
1965            for input_type in &input_types {
1966                let result = fun.coerce_types(input_type);
1967                assert_eq!(*input_type, result.unwrap());
1968            }
1969        }
1970    }
1971
1972    #[test]
1973    fn test_get_min_max_return_type_coerce_dictionary() -> Result<()> {
1974        let data_type =
1975            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1976        let result = get_min_max_result_type(&[data_type])?;
1977        assert_eq!(result, vec![DataType::Utf8]);
1978        Ok(())
1979    }
1980
1981    #[test]
1982    fn test_min_max_dictionary() -> Result<()> {
1983        let values = StringArray::from(vec!["b", "c", "a", "🦀", "d"]);
1984        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(4)]);
1985        let dict_array =
1986            DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap();
1987        let dict_array_ref = Arc::new(dict_array) as ArrayRef;
1988        let rt_type =
1989            get_min_max_result_type(&[dict_array_ref.data_type().clone()])?[0].clone();
1990
1991        let mut min_acc = MinAccumulator::try_new(&rt_type)?;
1992        min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?;
1993        let min_result = min_acc.evaluate()?;
1994        assert_eq!(min_result, ScalarValue::Utf8(Some("a".to_string())));
1995
1996        let mut max_acc = MaxAccumulator::try_new(&rt_type)?;
1997        max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?;
1998        let max_result = max_acc.evaluate()?;
1999        assert_eq!(max_result, ScalarValue::Utf8(Some("🦀".to_string())));
2000        Ok(())
2001    }
2002}