Skip to main content

datafusion_functions_aggregate/
min_max.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function
19//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function
20
21mod min_max_bytes;
22mod min_max_struct;
23
24use arrow::array::ArrayRef;
25use arrow::datatypes::{
26    DataType, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
27    DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
28    DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
29    Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
30};
31use datafusion_common::stats::Precision;
32use datafusion_common::{ColumnStatistics, Result, exec_err, internal_err};
33use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
34use datafusion_physical_expr::expressions;
35use std::cmp::Ordering;
36use std::fmt::Debug;
37
38use arrow::datatypes::i256;
39use arrow::datatypes::{
40    Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
41    Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
42    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
43};
44
45use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
46use crate::min_max::min_max_struct::MinMaxStructAccumulator;
47use datafusion_common::ScalarValue;
48use datafusion_expr::{
49    Accumulator, AggregateUDFImpl, Documentation, SetMonotonicity, Signature, Volatility,
50    function::AccumulatorArgs,
51};
52use datafusion_expr::{GroupsAccumulator, StatisticsArgs};
53use datafusion_macros::user_doc;
54use half::f16;
55use std::mem::size_of_val;
56use std::ops::Deref;
57
58fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
59    // make sure that the input types only has one element.
60    if input_types.len() != 1 {
61        return exec_err!(
62            "min/max was called with {} arguments. It requires only 1.",
63            input_types.len()
64        );
65    }
66    // min and max support the dictionary data type
67    // unpack the dictionary to get the value
68    match &input_types[0] {
69        DataType::Dictionary(_, dict_value_type) => {
70            // TODO add checker, if the value type is complex data type
71            Ok(vec![dict_value_type.deref().clone()])
72        }
73        // TODO add checker for datatype which min and max supported
74        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
75        _ => Ok(input_types.to_vec()),
76    }
77}
78
79#[user_doc(
80    doc_section(label = "General Functions"),
81    description = "Returns the maximum value in the specified column.",
82    syntax_example = "max(expression)",
83    sql_example = r#"```sql
84> SELECT max(column_name) FROM table_name;
85+----------------------+
86| max(column_name)      |
87+----------------------+
88| 150                  |
89+----------------------+
90```"#,
91    standard_argument(name = "expression",)
92)]
93// MAX aggregate UDF
94#[derive(Debug, PartialEq, Eq, Hash)]
95pub struct Max {
96    signature: Signature,
97}
98
99impl Max {
100    pub fn new() -> Self {
101        Self {
102            signature: Signature::user_defined(Volatility::Immutable),
103        }
104    }
105}
106
107impl Default for Max {
108    fn default() -> Self {
109        Self::new()
110    }
111}
/// Creates a [`PrimitiveGroupsAccumulator`] that computes `MAX` for
/// the specified [`ArrowPrimitiveType`].
///
/// Arguments: the identifier holding the data type, the native Rust type
/// (its `MIN` constant seeds every group) and the Arrow primitive type.
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! primitive_max_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        Ok(Box::new(
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
                // Take `new` when it is greater, or when the two values are not
                // comparable (`partial_cmp` returned `None`, e.g. a NaN is involved)
                if matches!(new.partial_cmp(cur), Some(Ordering::Greater) | None) {
                    *cur = new;
                }
            })
            // Initialize each accumulator to $NATIVE::MIN
            .with_starting_value($NATIVE::MIN),
        ))
    }};
}
133
/// Creates a [`PrimitiveGroupsAccumulator`] that computes `MIN` for
/// the specified [`ArrowPrimitiveType`].
///
/// Arguments: the identifier holding the data type, the native Rust type
/// (its `MAX` constant seeds every group) and the Arrow primitive type.
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! primitive_min_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        Ok(Box::new(
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
                // Take `new` when it is smaller, or when the two values are not
                // comparable (`partial_cmp` returned `None`, e.g. a NaN is involved)
                if matches!(new.partial_cmp(cur), Some(Ordering::Less) | None) {
                    *cur = new;
                }
            })
            // Initialize each accumulator to $NATIVE::MAX
            .with_starting_value($NATIVE::MAX),
        ))
    }};
}
156
157trait FromColumnStatistics {
158    fn value_from_column_statistics(
159        &self,
160        stats: &ColumnStatistics,
161    ) -> Option<ScalarValue>;
162
163    fn value_from_statistics(
164        &self,
165        statistics_args: &StatisticsArgs,
166    ) -> Option<ScalarValue> {
167        if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows {
168            match *num_rows {
169                0 => return ScalarValue::try_from(statistics_args.return_type).ok(),
170                value if value > 0 => {
171                    let col_stats = &statistics_args.statistics.column_statistics;
172                    if statistics_args.exprs.len() == 1 {
173                        // TODO optimize with exprs other than Column
174                        if let Some(col_expr) =
175                            statistics_args.exprs[0].downcast_ref::<expressions::Column>()
176                        {
177                            return self.value_from_column_statistics(
178                                &col_stats[col_expr.index()],
179                            );
180                        }
181                    }
182                }
183                _ => {}
184            }
185        }
186        None
187    }
188}
189
190impl FromColumnStatistics for Max {
191    fn value_from_column_statistics(
192        &self,
193        col_stats: &ColumnStatistics,
194    ) -> Option<ScalarValue> {
195        if let Precision::Exact(ref val) = col_stats.max_value
196            && !val.is_null()
197        {
198            return Some(val.clone());
199        }
200        None
201    }
202}
203
204impl AggregateUDFImpl for Max {
205    fn name(&self) -> &str {
206        "max"
207    }
208
209    fn signature(&self) -> &Signature {
210        &self.signature
211    }
212
213    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
214        Ok(arg_types[0].to_owned())
215    }
216
217    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
218        Ok(Box::new(MaxAccumulator::try_new(
219            acc_args.return_field.data_type(),
220        )?))
221    }
222
223    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
224        use DataType::*;
225        matches!(
226            args.return_field.data_type(),
227            Int8 | Int16
228                | Int32
229                | Int64
230                | UInt8
231                | UInt16
232                | UInt32
233                | UInt64
234                | Float16
235                | Float32
236                | Float64
237                | Decimal32(_, _)
238                | Decimal64(_, _)
239                | Decimal128(_, _)
240                | Decimal256(_, _)
241                | Date32
242                | Date64
243                | Time32(_)
244                | Time64(_)
245                | Timestamp(_, _)
246                | Utf8
247                | LargeUtf8
248                | Utf8View
249                | Binary
250                | LargeBinary
251                | BinaryView
252                | Duration(_)
253                | Struct(_)
254        )
255    }
256
257    fn create_groups_accumulator(
258        &self,
259        args: AccumulatorArgs,
260    ) -> Result<Box<dyn GroupsAccumulator>> {
261        use DataType::*;
262        use TimeUnit::*;
263        let data_type = args.return_field.data_type();
264        match data_type {
265            Int8 => primitive_max_accumulator!(data_type, i8, Int8Type),
266            Int16 => primitive_max_accumulator!(data_type, i16, Int16Type),
267            Int32 => primitive_max_accumulator!(data_type, i32, Int32Type),
268            Int64 => primitive_max_accumulator!(data_type, i64, Int64Type),
269            UInt8 => primitive_max_accumulator!(data_type, u8, UInt8Type),
270            UInt16 => primitive_max_accumulator!(data_type, u16, UInt16Type),
271            UInt32 => primitive_max_accumulator!(data_type, u32, UInt32Type),
272            UInt64 => primitive_max_accumulator!(data_type, u64, UInt64Type),
273            Float16 => {
274                primitive_max_accumulator!(data_type, f16, Float16Type)
275            }
276            Float32 => {
277                primitive_max_accumulator!(data_type, f32, Float32Type)
278            }
279            Float64 => {
280                primitive_max_accumulator!(data_type, f64, Float64Type)
281            }
282            Date32 => primitive_max_accumulator!(data_type, i32, Date32Type),
283            Date64 => primitive_max_accumulator!(data_type, i64, Date64Type),
284            Time32(Second) => {
285                primitive_max_accumulator!(data_type, i32, Time32SecondType)
286            }
287            Time32(Millisecond) => {
288                primitive_max_accumulator!(data_type, i32, Time32MillisecondType)
289            }
290            Time64(Microsecond) => {
291                primitive_max_accumulator!(data_type, i64, Time64MicrosecondType)
292            }
293            Time64(Nanosecond) => {
294                primitive_max_accumulator!(data_type, i64, Time64NanosecondType)
295            }
296            Timestamp(Second, _) => {
297                primitive_max_accumulator!(data_type, i64, TimestampSecondType)
298            }
299            Timestamp(Millisecond, _) => {
300                primitive_max_accumulator!(data_type, i64, TimestampMillisecondType)
301            }
302            Timestamp(Microsecond, _) => {
303                primitive_max_accumulator!(data_type, i64, TimestampMicrosecondType)
304            }
305            Timestamp(Nanosecond, _) => {
306                primitive_max_accumulator!(data_type, i64, TimestampNanosecondType)
307            }
308            Duration(Second) => {
309                primitive_max_accumulator!(data_type, i64, DurationSecondType)
310            }
311            Duration(Millisecond) => {
312                primitive_max_accumulator!(data_type, i64, DurationMillisecondType)
313            }
314            Duration(Microsecond) => {
315                primitive_max_accumulator!(data_type, i64, DurationMicrosecondType)
316            }
317            Duration(Nanosecond) => {
318                primitive_max_accumulator!(data_type, i64, DurationNanosecondType)
319            }
320            Decimal32(_, _) => {
321                primitive_max_accumulator!(data_type, i32, Decimal32Type)
322            }
323            Decimal64(_, _) => {
324                primitive_max_accumulator!(data_type, i64, Decimal64Type)
325            }
326            Decimal128(_, _) => {
327                primitive_max_accumulator!(data_type, i128, Decimal128Type)
328            }
329            Decimal256(_, _) => {
330                primitive_max_accumulator!(data_type, i256, Decimal256Type)
331            }
332            Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
333                Ok(Box::new(MinMaxBytesAccumulator::new_max(data_type.clone())))
334            }
335            Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_max(
336                data_type.clone(),
337            ))),
338            // This is only reached if groups_accumulator_supported is out of sync
339            _ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
340        }
341    }
342
343    fn create_sliding_accumulator(
344        &self,
345        args: AccumulatorArgs,
346    ) -> Result<Box<dyn Accumulator>> {
347        Ok(Box::new(SlidingMaxAccumulator::try_new(
348            args.return_field.data_type(),
349        )?))
350    }
351
352    fn is_descending(&self) -> Option<bool> {
353        Some(true)
354    }
355
356    fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
357        datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
358    }
359
360    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
361        get_min_max_result_type(arg_types)
362    }
363    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
364        datafusion_expr::ReversedUDAF::Identical
365    }
366    fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
367        self.value_from_statistics(statistics_args)
368    }
369
370    fn documentation(&self) -> Option<&Documentation> {
371        self.doc()
372    }
373
374    fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
375        // `MAX` is monotonically increasing as it always increases or stays
376        // the same as new values are seen.
377        SetMonotonicity::Increasing
378    }
379}
380
381#[derive(Debug)]
382pub struct SlidingMaxAccumulator {
383    max: ScalarValue,
384    moving_max: MovingMax<ScalarValue>,
385}
386
387impl SlidingMaxAccumulator {
388    /// new max accumulator
389    pub fn try_new(datatype: &DataType) -> Result<Self> {
390        Ok(Self {
391            max: ScalarValue::try_from(datatype)?,
392            moving_max: MovingMax::<ScalarValue>::new(),
393        })
394    }
395}
396
397impl Accumulator for SlidingMaxAccumulator {
398    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
399        for idx in 0..values[0].len() {
400            let val = ScalarValue::try_from_array(&values[0], idx)?;
401            self.moving_max.push(val);
402        }
403        if let Some(res) = self.moving_max.max() {
404            self.max = res.clone();
405        }
406        Ok(())
407    }
408
409    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
410        for _idx in 0..values[0].len() {
411            (self.moving_max).pop();
412        }
413        if let Some(res) = self.moving_max.max() {
414            self.max = res.clone();
415        }
416        Ok(())
417    }
418
419    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
420        self.update_batch(states)
421    }
422
423    fn state(&mut self) -> Result<Vec<ScalarValue>> {
424        Ok(vec![self.max.clone()])
425    }
426
427    fn evaluate(&mut self) -> Result<ScalarValue> {
428        Ok(self.max.clone())
429    }
430
431    fn supports_retract_batch(&self) -> bool {
432        true
433    }
434
435    fn size(&self) -> usize {
436        size_of_val(self) - size_of_val(&self.max) + self.max.size()
437    }
438}
439
440#[user_doc(
441    doc_section(label = "General Functions"),
442    description = "Returns the minimum value in the specified column.",
443    syntax_example = "min(expression)",
444    sql_example = r#"```sql
445> SELECT min(column_name) FROM table_name;
446+----------------------+
447| min(column_name)      |
448+----------------------+
449| 12                   |
450+----------------------+
451```"#,
452    standard_argument(name = "expression",)
453)]
454#[derive(Debug, PartialEq, Eq, Hash)]
455pub struct Min {
456    signature: Signature,
457}
458
459impl Min {
460    pub fn new() -> Self {
461        Self {
462            signature: Signature::user_defined(Volatility::Immutable),
463        }
464    }
465}
466
467impl Default for Min {
468    fn default() -> Self {
469        Self::new()
470    }
471}
472
473impl FromColumnStatistics for Min {
474    fn value_from_column_statistics(
475        &self,
476        col_stats: &ColumnStatistics,
477    ) -> Option<ScalarValue> {
478        if let Precision::Exact(ref val) = col_stats.min_value
479            && !val.is_null()
480        {
481            return Some(val.clone());
482        }
483        None
484    }
485}
486
487impl AggregateUDFImpl for Min {
488    fn name(&self) -> &str {
489        "min"
490    }
491
492    fn signature(&self) -> &Signature {
493        &self.signature
494    }
495
496    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
497        Ok(arg_types[0].to_owned())
498    }
499
500    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
501        Ok(Box::new(MinAccumulator::try_new(
502            acc_args.return_field.data_type(),
503        )?))
504    }
505
506    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
507        use DataType::*;
508        matches!(
509            args.return_field.data_type(),
510            Int8 | Int16
511                | Int32
512                | Int64
513                | UInt8
514                | UInt16
515                | UInt32
516                | UInt64
517                | Float16
518                | Float32
519                | Float64
520                | Decimal32(_, _)
521                | Decimal64(_, _)
522                | Decimal128(_, _)
523                | Decimal256(_, _)
524                | Date32
525                | Date64
526                | Time32(_)
527                | Time64(_)
528                | Timestamp(_, _)
529                | Utf8
530                | LargeUtf8
531                | Utf8View
532                | Binary
533                | LargeBinary
534                | BinaryView
535                | Duration(_)
536                | Struct(_)
537        )
538    }
539
540    fn create_groups_accumulator(
541        &self,
542        args: AccumulatorArgs,
543    ) -> Result<Box<dyn GroupsAccumulator>> {
544        use DataType::*;
545        use TimeUnit::*;
546        let data_type = args.return_field.data_type();
547        match data_type {
548            Int8 => primitive_min_accumulator!(data_type, i8, Int8Type),
549            Int16 => primitive_min_accumulator!(data_type, i16, Int16Type),
550            Int32 => primitive_min_accumulator!(data_type, i32, Int32Type),
551            Int64 => primitive_min_accumulator!(data_type, i64, Int64Type),
552            UInt8 => primitive_min_accumulator!(data_type, u8, UInt8Type),
553            UInt16 => primitive_min_accumulator!(data_type, u16, UInt16Type),
554            UInt32 => primitive_min_accumulator!(data_type, u32, UInt32Type),
555            UInt64 => primitive_min_accumulator!(data_type, u64, UInt64Type),
556            Float16 => {
557                primitive_min_accumulator!(data_type, f16, Float16Type)
558            }
559            Float32 => {
560                primitive_min_accumulator!(data_type, f32, Float32Type)
561            }
562            Float64 => {
563                primitive_min_accumulator!(data_type, f64, Float64Type)
564            }
565            Date32 => primitive_min_accumulator!(data_type, i32, Date32Type),
566            Date64 => primitive_min_accumulator!(data_type, i64, Date64Type),
567            Time32(Second) => {
568                primitive_min_accumulator!(data_type, i32, Time32SecondType)
569            }
570            Time32(Millisecond) => {
571                primitive_min_accumulator!(data_type, i32, Time32MillisecondType)
572            }
573            Time64(Microsecond) => {
574                primitive_min_accumulator!(data_type, i64, Time64MicrosecondType)
575            }
576            Time64(Nanosecond) => {
577                primitive_min_accumulator!(data_type, i64, Time64NanosecondType)
578            }
579            Timestamp(Second, _) => {
580                primitive_min_accumulator!(data_type, i64, TimestampSecondType)
581            }
582            Timestamp(Millisecond, _) => {
583                primitive_min_accumulator!(data_type, i64, TimestampMillisecondType)
584            }
585            Timestamp(Microsecond, _) => {
586                primitive_min_accumulator!(data_type, i64, TimestampMicrosecondType)
587            }
588            Timestamp(Nanosecond, _) => {
589                primitive_min_accumulator!(data_type, i64, TimestampNanosecondType)
590            }
591            Duration(Second) => {
592                primitive_min_accumulator!(data_type, i64, DurationSecondType)
593            }
594            Duration(Millisecond) => {
595                primitive_min_accumulator!(data_type, i64, DurationMillisecondType)
596            }
597            Duration(Microsecond) => {
598                primitive_min_accumulator!(data_type, i64, DurationMicrosecondType)
599            }
600            Duration(Nanosecond) => {
601                primitive_min_accumulator!(data_type, i64, DurationNanosecondType)
602            }
603            Decimal32(_, _) => {
604                primitive_min_accumulator!(data_type, i32, Decimal32Type)
605            }
606            Decimal64(_, _) => {
607                primitive_min_accumulator!(data_type, i64, Decimal64Type)
608            }
609            Decimal128(_, _) => {
610                primitive_min_accumulator!(data_type, i128, Decimal128Type)
611            }
612            Decimal256(_, _) => {
613                primitive_min_accumulator!(data_type, i256, Decimal256Type)
614            }
615            Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
616                Ok(Box::new(MinMaxBytesAccumulator::new_min(data_type.clone())))
617            }
618            Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_min(
619                data_type.clone(),
620            ))),
621            // This is only reached if groups_accumulator_supported is out of sync
622            _ => internal_err!("GroupsAccumulator not supported for min({})", data_type),
623        }
624    }
625
626    fn create_sliding_accumulator(
627        &self,
628        args: AccumulatorArgs,
629    ) -> Result<Box<dyn Accumulator>> {
630        Ok(Box::new(SlidingMinAccumulator::try_new(
631            args.return_field.data_type(),
632        )?))
633    }
634
635    fn is_descending(&self) -> Option<bool> {
636        Some(false)
637    }
638
639    fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
640        self.value_from_statistics(statistics_args)
641    }
642    fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
643        datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
644    }
645
646    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
647        get_min_max_result_type(arg_types)
648    }
649
650    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
651        datafusion_expr::ReversedUDAF::Identical
652    }
653
654    fn documentation(&self) -> Option<&Documentation> {
655        self.doc()
656    }
657
658    fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
659        // `MIN` is monotonically decreasing as it always decreases or stays
660        // the same as new values are seen.
661        SetMonotonicity::Decreasing
662    }
663}
664
665#[derive(Debug)]
666pub struct SlidingMinAccumulator {
667    min: ScalarValue,
668    moving_min: MovingMin<ScalarValue>,
669}
670
671impl SlidingMinAccumulator {
672    pub fn try_new(datatype: &DataType) -> Result<Self> {
673        Ok(Self {
674            min: ScalarValue::try_from(datatype)?,
675            moving_min: MovingMin::<ScalarValue>::new(),
676        })
677    }
678}
679
680impl Accumulator for SlidingMinAccumulator {
681    fn state(&mut self) -> Result<Vec<ScalarValue>> {
682        Ok(vec![self.min.clone()])
683    }
684
685    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
686        for idx in 0..values[0].len() {
687            let val = ScalarValue::try_from_array(&values[0], idx)?;
688            if !val.is_null() {
689                self.moving_min.push(val);
690            }
691        }
692        if let Some(res) = self.moving_min.min() {
693            self.min = res.clone();
694        }
695        Ok(())
696    }
697
698    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
699        for idx in 0..values[0].len() {
700            let val = ScalarValue::try_from_array(&values[0], idx)?;
701            if !val.is_null() {
702                (self.moving_min).pop();
703            }
704        }
705        if let Some(res) = self.moving_min.min() {
706            self.min = res.clone();
707        }
708        Ok(())
709    }
710
711    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
712        self.update_batch(states)
713    }
714
715    fn evaluate(&mut self) -> Result<ScalarValue> {
716        Ok(self.min.clone())
717    }
718
719    fn supports_retract_batch(&self) -> bool {
720        true
721    }
722
723    fn size(&self) -> usize {
724        size_of_val(self) - size_of_val(&self.min) + self.min.size()
725    }
726}
727
/// Keep track of the minimum value in a sliding window.
///
/// The implementation is taken from <https://github.com/spebern/moving_min_max/blob/master/src/lib.rs>
///
/// `moving min max` provides one data structure for keeping track of the
/// minimum value and one for keeping track of the maximum value in a sliding
/// window.
///
/// The window is a FIFO queue built from two stacks. Every stack entry stores
/// a value together with the minimum of all entries at or below it in that
/// stack. New values are pushed onto the push stack; values are removed from
/// the pop stack. When the pop stack is empty, all entries of the push stack
/// are moved over (which reverses their order) while their running minimum is
/// recomputed. The minimum of the whole queue is the smaller of the running
/// minima on top of the two stacks.
///
/// The complexity of the operations is
/// - O(1) for getting the minimum/maximum
/// - O(1) for push
/// - amortized O(1) for pop
///
/// ```
/// # use datafusion_functions_aggregate::min_max::MovingMin;
/// let mut moving_min = MovingMin::<i32>::new();
/// moving_min.push(2);
/// moving_min.push(1);
/// moving_min.push(3);
///
/// assert_eq!(moving_min.min(), Some(&1));
/// assert_eq!(moving_min.pop(), Some(2));
///
/// assert_eq!(moving_min.min(), Some(&1));
/// assert_eq!(moving_min.pop(), Some(1));
///
/// assert_eq!(moving_min.min(), Some(&3));
/// assert_eq!(moving_min.pop(), Some(3));
///
/// assert_eq!(moving_min.min(), None);
/// assert_eq!(moving_min.pop(), None);
/// ```
#[derive(Debug)]
pub struct MovingMin<T> {
    /// Entries in insertion order: `(value, minimum of this and all earlier entries)`
    push_stack: Vec<(T, T)>,
    /// Entries in removal order (oldest on top): `(value, minimum of this and all entries below)`
    pop_stack: Vec<(T, T)>,
}

impl<T: Clone + PartialOrd> Default for MovingMin<T> {
    fn default() -> Self {
        Self {
            push_stack: Vec::new(),
            pop_stack: Vec::new(),
        }
    }
}

impl<T: Clone + PartialOrd> MovingMin<T> {
    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
    /// window.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new `MovingMin` to keep track of the minimum in a sliding
    /// window with `capacity` allocated slots.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            push_stack: Vec::with_capacity(capacity),
            pop_stack: Vec::with_capacity(capacity),
        }
    }

    /// Returns the minimum of the sliding window or `None` if the window is
    /// empty.
    #[inline]
    pub fn min(&self) -> Option<&T> {
        let pushed = self.push_stack.last().map(|(_, min)| min);
        let popped = self.pop_stack.last().map(|(_, min)| min);
        match (pushed, popped) {
            (Some(a), Some(b)) => Some(if a < b { a } else { b }),
            // At most one side is present here
            (a, b) => a.or(b),
        }
    }

    /// Pushes a new element into the sliding window.
    #[inline]
    pub fn push(&mut self, val: T) {
        // The running minimum only carries over when it is strictly smaller
        // than (i.e. `val` is greater than) the new value.
        let running_min = match self.push_stack.last() {
            Some((_, min)) if val > *min => min.clone(),
            _ => val.clone(),
        };
        self.push_stack.push((val, running_min));
    }

    /// Removes and returns the oldest value of the sliding window (the one
    /// that was pushed first), or `None` if the window is empty.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        if self.pop_stack.is_empty() {
            // Move everything over to the pop stack, newest entry first, and
            // rebuild the running minimum along the way. Each value is moved,
            // only its running minimum needs a clone.
            while let Some((val, _)) = self.push_stack.pop() {
                let running_min = match self.pop_stack.last() {
                    Some((_, min)) if *min < val => min.clone(),
                    _ => val.clone(),
                };
                self.pop_stack.push((val, running_min));
            }
        }
        self.pop_stack.pop().map(|(val, _)| val)
    }

    /// Returns the number of elements stored in the sliding window.
    #[inline]
    pub fn len(&self) -> usize {
        self.push_stack.len() + self.pop_stack.len()
    }

    /// Returns `true` if the moving window contains no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
861
/// Keep track of the maximum value in a sliding window.
///
/// See [`MovingMin`] for more details.
///
/// ```
/// # use datafusion_functions_aggregate::min_max::MovingMax;
/// let mut moving_max = MovingMax::<i32>::new();
/// moving_max.push(2);
/// moving_max.push(3);
/// moving_max.push(1);
///
/// assert_eq!(moving_max.max(), Some(&3));
/// assert_eq!(moving_max.pop(), Some(2));
///
/// assert_eq!(moving_max.max(), Some(&3));
/// assert_eq!(moving_max.pop(), Some(3));
///
/// assert_eq!(moving_max.max(), Some(&1));
/// assert_eq!(moving_max.pop(), Some(1));
///
/// assert_eq!(moving_max.max(), None);
/// assert_eq!(moving_max.pop(), None);
/// ```
#[derive(Debug)]
pub struct MovingMax<T> {
    /// Entries as `(value, running maximum)`; new values are pushed here
    push_stack: Vec<(T, T)>,
    /// Entries as `(value, running maximum)`; values are removed from here
    pop_stack: Vec<(T, T)>,
}

impl<T: Clone + PartialOrd> Default for MovingMax<T> {
    /// Creates an empty window; nothing is allocated up front.
    fn default() -> Self {
        let (push_stack, pop_stack) = (Vec::new(), Vec::new());
        Self {
            push_stack,
            pop_stack,
        }
    }
}
899
900impl<T: Clone + PartialOrd> MovingMax<T> {
901    /// Creates a new `MovingMax` to keep track of the maximum in a sliding window.
902    #[inline]
903    pub fn new() -> Self {
904        Self::default()
905    }
906
907    /// Creates a new `MovingMax` to keep track of the maximum in a sliding window with
908    /// `capacity` allocated slots.
909    #[inline]
910    pub fn with_capacity(capacity: usize) -> Self {
911        Self {
912            push_stack: Vec::with_capacity(capacity),
913            pop_stack: Vec::with_capacity(capacity),
914        }
915    }
916
917    /// Returns the maximum of the sliding window or `None` if the window is empty.
918    #[inline]
919    pub fn max(&self) -> Option<&T> {
920        match (self.push_stack.last(), self.pop_stack.last()) {
921            (None, None) => None,
922            (Some((_, max)), None) => Some(max),
923            (None, Some((_, max))) => Some(max),
924            (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }),
925        }
926    }
927
928    /// Pushes a new element into the sliding window.
929    #[inline]
930    pub fn push(&mut self, val: T) {
931        self.push_stack.push(match self.push_stack.last() {
932            Some((_, max)) => {
933                if val < *max {
934                    (val, max.clone())
935                } else {
936                    (val.clone(), val)
937                }
938            }
939            None => (val.clone(), val),
940        });
941    }
942
943    /// Removes and returns the last value of the sliding window.
944    #[inline]
945    pub fn pop(&mut self) -> Option<T> {
946        if self.pop_stack.is_empty() {
947            match self.push_stack.pop() {
948                Some((val, _)) => {
949                    let mut last = (val.clone(), val);
950                    self.pop_stack.push(last.clone());
951                    while let Some((val, _)) = self.push_stack.pop() {
952                        let max = if last.1 > val {
953                            last.1.clone()
954                        } else {
955                            val.clone()
956                        };
957                        last = (val.clone(), max);
958                        self.pop_stack.push(last.clone());
959                    }
960                }
961                None => return None,
962            }
963        }
964        self.pop_stack.pop().map(|(val, _)| val)
965    }
966
967    /// Returns the number of elements stored in the sliding window.
968    #[inline]
969    pub fn len(&self) -> usize {
970        self.push_stack.len() + self.pop_stack.len()
971    }
972
973    /// Returns `true` if the moving window contains no elements.
974    #[inline]
975    pub fn is_empty(&self) -> bool {
976        self.len() == 0
977    }
978}
979
// Wires up the `max` aggregate through `make_udaf_expr_and_func!`.
// NOTE(review): the macro is defined outside this view; presumably it emits an
// expression builder named `max` (taking `expression`) plus a `max_udaf`
// accessor for the `Max` UDAF, documented with the string below. Confirm
// against the macro definition.
980make_udaf_expr_and_func!(
981    Max,
982    max,
983    expression,
984    "Returns the maximum of a group of values.",
985    max_udaf
986);
987
// Wires up the `min` aggregate through `make_udaf_expr_and_func!`.
// NOTE(review): the macro is defined outside this view; presumably it emits an
// expression builder named `min` (taking `expression`) plus a `min_udaf`
// accessor for the `Min` UDAF, documented with the string below. Confirm
// against the macro definition.
988make_udaf_expr_and_func!(
989    Min,
990    min,
991    expression,
992    "Returns the minimum of a group of values.",
993    min_udaf
994);
995
996// Re-export accumulators from the common module for backwards compatibility
997pub use datafusion_functions_aggregate_common::min_max::{
998    MaxAccumulator, MinAccumulator,
999};
1000
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{
            Array, DictionaryArray, Float32Array, Int8Array, Int32Array,
            IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
            PrimitiveArray, StringArray,
        },
        datatypes::{
            ArrowDictionaryKeyType, IntervalDayTimeType, IntervalMonthDayNanoType,
            IntervalUnit, IntervalYearMonthType,
        },
    };
    use rand::Rng;
    use std::sync::Arc;

    /// The min and max accumulators pick the expected extreme for each of the
    /// three interval units.
    #[test]
    fn interval_min_max() {
        // IntervalYearMonth
        let year_month_type = DataType::Interval(IntervalUnit::YearMonth);
        let year_month: ArrayRef = Arc::new(IntervalYearMonthArray::from(vec![
            IntervalYearMonthType::make_value(0, 1),
            IntervalYearMonthType::make_value(5, 34),
            IntervalYearMonthType::make_value(-2, 4),
            IntervalYearMonthType::make_value(7, -4),
            IntervalYearMonthType::make_value(0, 1),
        ]));

        let mut min_acc = MinAccumulator::try_new(&year_month_type).unwrap();
        min_acc.update_batch(&[Arc::clone(&year_month)]).unwrap();
        assert_eq!(
            min_acc.evaluate().unwrap(),
            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
                -2, 4,
            )))
        );

        let mut max_acc = MaxAccumulator::try_new(&year_month_type).unwrap();
        max_acc.update_batch(&[Arc::clone(&year_month)]).unwrap();
        assert_eq!(
            max_acc.evaluate().unwrap(),
            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
                5, 34,
            )))
        );

        // IntervalDayTime
        let day_time_type = DataType::Interval(IntervalUnit::DayTime);
        let day_time: ArrayRef = Arc::new(IntervalDayTimeArray::from(vec![
            IntervalDayTimeType::make_value(0, 0),
            IntervalDayTimeType::make_value(5, 454000),
            IntervalDayTimeType::make_value(-34, 0),
            IntervalDayTimeType::make_value(7, -4000),
            IntervalDayTimeType::make_value(1, 0),
        ]));

        let mut min_acc = MinAccumulator::try_new(&day_time_type).unwrap();
        min_acc.update_batch(&[Arc::clone(&day_time)]).unwrap();
        assert_eq!(
            min_acc.evaluate().unwrap(),
            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(-34, 0)))
        );

        let mut max_acc = MaxAccumulator::try_new(&day_time_type).unwrap();
        max_acc.update_batch(&[Arc::clone(&day_time)]).unwrap();
        assert_eq!(
            max_acc.evaluate().unwrap(),
            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(7, -4000)))
        );

        // IntervalMonthDayNano
        let month_day_nano_type = DataType::Interval(IntervalUnit::MonthDayNano);
        let month_day_nano: ArrayRef = Arc::new(IntervalMonthDayNanoArray::from(vec![
            IntervalMonthDayNanoType::make_value(1, 0, 0),
            IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000),
            IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000),
            IntervalMonthDayNanoType::make_value(5, 2, 493_000_000_000),
            IntervalMonthDayNanoType::make_value(1, 0, 0),
        ]));

        let mut min_acc = MinAccumulator::try_new(&month_day_nano_type).unwrap();
        min_acc
            .update_batch(&[Arc::clone(&month_day_nano)])
            .unwrap();
        assert_eq!(
            min_acc.evaluate().unwrap(),
            ScalarValue::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000)
            ))
        );

        let mut max_acc = MaxAccumulator::try_new(&month_day_nano_type).unwrap();
        max_acc
            .update_batch(&[Arc::clone(&month_day_nano)])
            .unwrap();
        assert_eq!(
            max_acc.evaluate().unwrap(),
            ScalarValue::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000)
            ))
        );
    }

    /// NaN and negative infinity are ordered the same way whether the values
    /// arrive in one batch or are spread over several batches.
    #[test]
    fn float_min_max_with_nans() {
        /// Feeds every batch to `acc` and asserts the final evaluated value.
        fn check(acc: &mut dyn Accumulator, batches: &[&[f32]], expected: f32) {
            for values in batches {
                let array: ArrayRef =
                    Arc::new(Float32Array::from_iter_values(values.iter().copied()));
                acc.update_batch(&[array]).unwrap();
            }
            assert_eq!(
                acc.evaluate().unwrap(),
                ScalarValue::Float32(Some(expected))
            );
        }

        let nan = f32::NAN;
        let zero = 0_f32;
        let neg_inf = f32::NEG_INFINITY;

        // This test checks both comparison between batches (which uses the min_max macro
        // defined above) and within a batch (which uses the arrow min/max compute function
        // and verifies both respect the total order comparison for floats)

        let new_min = || MinAccumulator::try_new(&DataType::Float32).unwrap();
        let new_max = || MaxAccumulator::try_new(&DataType::Float32).unwrap();

        check(&mut new_min(), &[&[zero], &[nan]], zero);
        check(&mut new_min(), &[&[zero, nan]], zero);
        check(&mut new_min(), &[&[zero], &[neg_inf]], neg_inf);
        check(&mut new_min(), &[&[zero, neg_inf]], neg_inf);
        check(&mut new_max(), &[&[zero], &[nan]], nan);
        check(&mut new_max(), &[&[zero, nan]], nan);
        check(&mut new_max(), &[&[zero], &[neg_inf]], zero);
        check(&mut new_max(), &[&[zero, neg_inf]], zero);
    }

    /// Builds `len` random integers drawn from `0..100`.
    fn get_random_vec_i32(len: usize) -> Vec<i32> {
        let mut rng = rand::rng();
        (0..len).map(|_| rng.random_range(0..100)).collect()
    }

    /// Compares `MovingMin` against a brute-force minimum over the trailing
    /// `n_sliding_window + 1` values of random input.
    fn moving_min_i32(len: usize, n_sliding_window: usize) -> Result<()> {
        let data = get_random_vec_i32(len);
        let mut window = MovingMin::<i32>::new();
        let mut expected = Vec::with_capacity(len);
        let mut actual = Vec::with_capacity(len);
        for (idx, &value) in data.iter().enumerate() {
            let first = idx.saturating_sub(n_sliding_window);
            expected.push(*data[first..=idx].iter().min().unwrap());

            window.push(value);
            if idx > n_sliding_window {
                window.pop();
            }
            actual.push(*window.min().unwrap());
        }
        assert_eq!(actual, expected);
        Ok(())
    }

    /// Compares `MovingMax` against a brute-force maximum over the trailing
    /// `n_sliding_window + 1` values of random input.
    fn moving_max_i32(len: usize, n_sliding_window: usize) -> Result<()> {
        let data = get_random_vec_i32(len);
        let mut window = MovingMax::<i32>::new();
        let mut expected = Vec::with_capacity(len);
        let mut actual = Vec::with_capacity(len);
        for (idx, &value) in data.iter().enumerate() {
            let first = idx.saturating_sub(n_sliding_window);
            expected.push(*data[first..=idx].iter().max().unwrap());

            window.push(value);
            if idx > n_sliding_window {
                window.pop();
            }
            actual.push(*window.max().unwrap());
        }
        assert_eq!(actual, expected);
        Ok(())
    }

    /// Runs the `MovingMin` comparison for several window sizes.
    #[test]
    fn moving_min_tests() -> Result<()> {
        for n_sliding_window in [10, 20, 50, 100] {
            moving_min_i32(100, n_sliding_window)?;
        }
        Ok(())
    }

    /// Runs the `MovingMax` comparison for several window sizes.
    #[test]
    fn moving_max_tests() -> Result<()> {
        for n_sliding_window in [10, 20, 50, 100] {
            moving_max_i32(100, n_sliding_window)?;
        }
        Ok(())
    }

    /// `coerce_types` hands back the input types unchanged for `Min` and `Max`.
    #[test]
    fn test_min_max_coerce_types() {
        // the coerced types are the same as the input types
        let input_types = vec![
            vec![DataType::Int32],
            vec![DataType::Decimal128(10, 2)],
            vec![DataType::Decimal256(1, 1)],
            vec![DataType::Utf8],
        ];
        let funs: Vec<Box<dyn AggregateUDFImpl>> =
            vec![Box::new(Min::new()), Box::new(Max::new())];
        for fun in &funs {
            for input_type in &input_types {
                let coerced = fun.coerce_types(input_type);
                assert_eq!(*input_type, coerced.unwrap());
            }
        }
    }

    /// A dictionary input type resolves to its value type.
    #[test]
    fn test_get_min_max_return_type_coerce_dictionary() -> Result<()> {
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let resolved = get_min_max_result_type(&[dict_type])?;
        assert_eq!(resolved, vec![DataType::Utf8]);
        Ok(())
    }

    /// Accumulators created for the resolved value type accept a dictionary
    /// batch and skip the value whose key is null.
    #[test]
    fn test_min_max_dictionary() -> Result<()> {
        let values: ArrayRef =
            Arc::new(StringArray::from(vec!["b", "c", "a", "🦀", "d"]));
        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(4)]);
        let dictionary: ArrayRef =
            Arc::new(DictionaryArray::try_new(keys, values).unwrap());
        let result_types = get_min_max_result_type(&[dictionary.data_type().clone()])?;
        let rt_type = result_types[0].clone();

        let mut min_acc = MinAccumulator::try_new(&rt_type)?;
        min_acc.update_batch(&[Arc::clone(&dictionary)])?;
        assert_eq!(
            min_acc.evaluate()?,
            ScalarValue::Utf8(Some("a".to_string()))
        );

        let mut max_acc = MaxAccumulator::try_new(&rt_type)?;
        max_acc.update_batch(&[Arc::clone(&dictionary)])?;
        assert_eq!(
            max_acc.evaluate()?,
            ScalarValue::Utf8(Some("d".to_string()))
        );
        Ok(())
    }

    /// Wraps `inner` in a dictionary scalar with the given key type.
    fn dict_scalar(key_type: DataType, inner: ScalarValue) -> ScalarValue {
        let boxed_key = Box::new(key_type);
        let boxed_value = Box::new(inner);
        ScalarValue::Dictionary(boxed_key, boxed_value)
    }

    /// Builds a dictionary scalar holding a non-null UTF-8 value.
    fn utf8_dict_scalar(key_type: DataType, value: &str) -> ScalarValue {
        let inner = ScalarValue::Utf8(Some(value.to_string()));
        dict_scalar(key_type, inner)
    }

    /// Builds an `Int32`-keyed string dictionary array.
    fn string_dictionary_batch(values: &[&str], keys: &[Option<i32>]) -> ArrayRef {
        let keys = Int32Array::from(keys.to_vec());
        string_dictionary_batch_with_keys(keys, values)
    }

    /// Builds a string dictionary array from an already constructed key array.
    fn string_dictionary_batch_with_keys<K: ArrowDictionaryKeyType>(
        keys: PrimitiveArray<K>,
        values: &[&str],
    ) -> ArrayRef {
        let values: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
        let dictionary = DictionaryArray::try_new(keys, values).unwrap();
        Arc::new(dictionary)
    }

    /// Builds an `Int32`-keyed string dictionary array whose values may be null.
    fn optional_string_dictionary_batch(
        values: &[Option<&str>],
        keys: &[Option<i32>],
    ) -> ArrayRef {
        let values: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
        let keys = Int32Array::from(keys.to_vec());
        Arc::new(DictionaryArray::try_new(keys, values).unwrap())
    }

    /// Builds an `Int32`-keyed `Float32` dictionary array.
    fn float_dictionary_batch(values: &[f32], keys: &[Option<i32>]) -> ArrayRef {
        let values: ArrayRef = Arc::new(Float32Array::from(values.to_vec()));
        let keys = Int32Array::from(keys.to_vec());
        Arc::new(DictionaryArray::try_new(keys, values).unwrap())
    }

    /// Feeds every batch to `acc` in order, stopping at the first error, and
    /// returns the evaluated result.
    fn evaluate_dictionary_accumulator(
        mut acc: impl Accumulator,
        batches: &[ArrayRef],
    ) -> Result<ScalarValue> {
        batches
            .iter()
            .try_for_each(|batch| acc.update_batch(&[Arc::clone(batch)]))?;
        acc.evaluate()
    }

    /// Asserts that min and max over `batches` evaluate to UTF-8 dictionary
    /// scalars holding `expected_min` and `expected_max`.
    ///
    /// Panics when `dict_type` is not a dictionary type.
    fn assert_dictionary_min_max(
        dict_type: &DataType,
        batches: &[ArrayRef],
        expected_min: &str,
        expected_max: &str,
    ) -> Result<()> {
        let key_type = match dict_type {
            DataType::Dictionary(key_type, _) => key_type.as_ref().clone(),
            other => panic!("expected dictionary type, got {other:?}"),
        };

        let min_acc = MinAccumulator::try_new(dict_type)?;
        let min_result = evaluate_dictionary_accumulator(min_acc, batches)?;
        assert_eq!(min_result, utf8_dict_scalar(key_type.clone(), expected_min));

        let max_acc = MaxAccumulator::try_new(dict_type)?;
        let max_result = evaluate_dictionary_accumulator(max_acc, batches)?;
        assert_eq!(max_result, utf8_dict_scalar(key_type, expected_max));

        Ok(())
    }

    /// Accumulators created directly for the dictionary type work on a batch
    /// where every value is referenced.
    #[test]
    fn test_min_max_dictionary_without_coercion() -> Result<()> {
        let keys = [Some(0), Some(1), Some(2), Some(3)];
        let batch = string_dictionary_batch(&["b", "c", "a", "d"], &keys);
        let dict_type = batch.data_type().clone();

        assert_dictionary_min_max(&dict_type, &[batch], "a", "d")
    }

    /// Null keys are skipped.
    #[test]
    fn test_min_max_dictionary_with_nulls() -> Result<()> {
        let keys = [None, Some(0), None, Some(1), Some(2)];
        let batch = string_dictionary_batch(&["b", "c", "a"], &keys);
        let dict_type = batch.data_type().clone();

        assert_dictionary_min_max(&dict_type, &[batch], "a", "c")
    }

    /// Dictionary values that no key points at do not influence the result.
    #[test]
    fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> {
        let keys = [Some(1), Some(1), None];
        let batch = string_dictionary_batch(&["a", "z", "zz_unused"], &keys);
        let dict_type = batch.data_type().clone();

        assert_dictionary_min_max(&dict_type, &[batch], "z", "z")
    }

    /// A key that points at a null dictionary value is skipped.
    #[test]
    fn test_min_max_dictionary_ignores_referenced_null_values() -> Result<()> {
        let values = [Some("b"), None, Some("a"), Some("d")];
        let keys = [Some(0), Some(1), Some(2), Some(3)];
        let batch = optional_string_dictionary_batch(&values, &keys);
        let dict_type = batch.data_type().clone();

        assert_dictionary_min_max(&dict_type, &[batch], "a", "d")
    }

    /// Results are combined across batches that carry different dictionaries.
    #[test]
    fn test_min_max_dictionary_multi_batch() -> Result<()> {
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let first = string_dictionary_batch(&["b", "c"], &[Some(0), Some(1)]);
        let second = string_dictionary_batch(&["a", "d"], &[Some(0), Some(1)]);

        assert_dictionary_min_max(&dict_type, &[first, second], "a", "d")
    }

    /// Dictionaries keyed by `Int8` are handled as well.
    #[test]
    fn test_min_max_dictionary_int8_keys() -> Result<()> {
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
        let keys = Int8Array::from(vec![Some(0), Some(1), Some(2), Some(3)]);
        let batch = string_dictionary_batch_with_keys(keys, &["b", "c", "a", "d"]);

        assert_dictionary_min_max(&dict_type, &[batch], "a", "d")
    }

    /// Float dictionaries order NaN and negative infinity as the plain float
    /// accumulators do: the minimum is negative infinity, the maximum is NaN.
    #[test]
    fn test_min_max_dictionary_float_with_nans() -> Result<()> {
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Float32));
        let batches = [
            float_dictionary_batch(&[0.0, f32::NAN], &[Some(0), Some(1)]),
            float_dictionary_batch(&[f32::NEG_INFINITY], &[Some(0)]),
        ];

        let min_acc = MinAccumulator::try_new(&dict_type)?;
        let min_result = evaluate_dictionary_accumulator(min_acc, &batches)?;
        assert_eq!(
            min_result,
            dict_scalar(
                DataType::Int32,
                ScalarValue::Float32(Some(f32::NEG_INFINITY)),
            )
        );

        let max_acc = MaxAccumulator::try_new(&dict_type)?;
        let max_result = evaluate_dictionary_accumulator(max_acc, &batches)?;
        assert_eq!(
            max_result,
            dict_scalar(DataType::Int32, ScalarValue::Float32(Some(f32::NAN)))
        );

        Ok(())
    }
}