1mod min_max_bytes;
22mod min_max_struct;
23
24use arrow::array::ArrayRef;
25use arrow::datatypes::{
26 DataType, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
27 DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
28 DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
29 Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
30};
31use datafusion_common::stats::Precision;
32use datafusion_common::{ColumnStatistics, Result, exec_err, internal_err};
33use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
34use datafusion_physical_expr::expressions;
35use std::cmp::Ordering;
36use std::fmt::Debug;
37
38use arrow::datatypes::i256;
39use arrow::datatypes::{
40 Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
41 Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
42 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
43};
44
45use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
46use crate::min_max::min_max_struct::MinMaxStructAccumulator;
47use datafusion_common::ScalarValue;
48use datafusion_expr::{
49 Accumulator, AggregateUDFImpl, Documentation, SetMonotonicity, Signature, Volatility,
50 function::AccumulatorArgs,
51};
52use datafusion_expr::{GroupsAccumulator, StatisticsArgs};
53use datafusion_macros::user_doc;
54use half::f16;
55use std::mem::size_of_val;
56use std::ops::Deref;
57
58fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
59 if input_types.len() != 1 {
61 return exec_err!(
62 "min/max was called with {} arguments. It requires only 1.",
63 input_types.len()
64 );
65 }
66 match &input_types[0] {
69 DataType::Dictionary(_, dict_value_type) => {
70 Ok(vec![dict_value_type.deref().clone()])
72 }
73 _ => Ok(input_types.to_vec()),
76 }
77}
78
79#[user_doc(
80 doc_section(label = "General Functions"),
81 description = "Returns the maximum value in the specified column.",
82 syntax_example = "max(expression)",
83 sql_example = r#"```sql
84> SELECT max(column_name) FROM table_name;
85+----------------------+
86| max(column_name) |
87+----------------------+
88| 150 |
89+----------------------+
90```"#,
91 standard_argument(name = "expression",)
92)]
93#[derive(Debug, PartialEq, Eq, Hash)]
95pub struct Max {
96 signature: Signature,
97}
98
99impl Max {
100 pub fn new() -> Self {
101 Self {
102 signature: Signature::user_defined(Volatility::Immutable),
103 }
104 }
105}
106
107impl Default for Max {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112macro_rules! primitive_max_accumulator {
117 ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
118 Ok(Box::new(
119 PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
120 match (new).partial_cmp(cur) {
121 Some(Ordering::Greater) | None => {
122 *cur = new
124 }
125 _ => {}
126 }
127 })
128 .with_starting_value($NATIVE::MIN),
130 ))
131 }};
132}
133
134macro_rules! primitive_min_accumulator {
140 ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
141 Ok(Box::new(
142 PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
143 match (new).partial_cmp(cur) {
144 Some(Ordering::Less) | None => {
145 *cur = new
147 }
148 _ => {}
149 }
150 })
151 .with_starting_value($NATIVE::MAX),
153 ))
154 }};
155}
156
157trait FromColumnStatistics {
158 fn value_from_column_statistics(
159 &self,
160 stats: &ColumnStatistics,
161 ) -> Option<ScalarValue>;
162
163 fn value_from_statistics(
164 &self,
165 statistics_args: &StatisticsArgs,
166 ) -> Option<ScalarValue> {
167 if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows {
168 match *num_rows {
169 0 => return ScalarValue::try_from(statistics_args.return_type).ok(),
170 value if value > 0 => {
171 let col_stats = &statistics_args.statistics.column_statistics;
172 if statistics_args.exprs.len() == 1 {
173 if let Some(col_expr) =
175 statistics_args.exprs[0].downcast_ref::<expressions::Column>()
176 {
177 return self.value_from_column_statistics(
178 &col_stats[col_expr.index()],
179 );
180 }
181 }
182 }
183 _ => {}
184 }
185 }
186 None
187 }
188}
189
190impl FromColumnStatistics for Max {
191 fn value_from_column_statistics(
192 &self,
193 col_stats: &ColumnStatistics,
194 ) -> Option<ScalarValue> {
195 if let Precision::Exact(ref val) = col_stats.max_value
196 && !val.is_null()
197 {
198 return Some(val.clone());
199 }
200 None
201 }
202}
203
204impl AggregateUDFImpl for Max {
205 fn name(&self) -> &str {
206 "max"
207 }
208
209 fn signature(&self) -> &Signature {
210 &self.signature
211 }
212
213 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
214 Ok(arg_types[0].to_owned())
215 }
216
217 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
218 Ok(Box::new(MaxAccumulator::try_new(
219 acc_args.return_field.data_type(),
220 )?))
221 }
222
223 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
224 use DataType::*;
225 matches!(
226 args.return_field.data_type(),
227 Int8 | Int16
228 | Int32
229 | Int64
230 | UInt8
231 | UInt16
232 | UInt32
233 | UInt64
234 | Float16
235 | Float32
236 | Float64
237 | Decimal32(_, _)
238 | Decimal64(_, _)
239 | Decimal128(_, _)
240 | Decimal256(_, _)
241 | Date32
242 | Date64
243 | Time32(_)
244 | Time64(_)
245 | Timestamp(_, _)
246 | Utf8
247 | LargeUtf8
248 | Utf8View
249 | Binary
250 | LargeBinary
251 | BinaryView
252 | Duration(_)
253 | Struct(_)
254 )
255 }
256
257 fn create_groups_accumulator(
258 &self,
259 args: AccumulatorArgs,
260 ) -> Result<Box<dyn GroupsAccumulator>> {
261 use DataType::*;
262 use TimeUnit::*;
263 let data_type = args.return_field.data_type();
264 match data_type {
265 Int8 => primitive_max_accumulator!(data_type, i8, Int8Type),
266 Int16 => primitive_max_accumulator!(data_type, i16, Int16Type),
267 Int32 => primitive_max_accumulator!(data_type, i32, Int32Type),
268 Int64 => primitive_max_accumulator!(data_type, i64, Int64Type),
269 UInt8 => primitive_max_accumulator!(data_type, u8, UInt8Type),
270 UInt16 => primitive_max_accumulator!(data_type, u16, UInt16Type),
271 UInt32 => primitive_max_accumulator!(data_type, u32, UInt32Type),
272 UInt64 => primitive_max_accumulator!(data_type, u64, UInt64Type),
273 Float16 => {
274 primitive_max_accumulator!(data_type, f16, Float16Type)
275 }
276 Float32 => {
277 primitive_max_accumulator!(data_type, f32, Float32Type)
278 }
279 Float64 => {
280 primitive_max_accumulator!(data_type, f64, Float64Type)
281 }
282 Date32 => primitive_max_accumulator!(data_type, i32, Date32Type),
283 Date64 => primitive_max_accumulator!(data_type, i64, Date64Type),
284 Time32(Second) => {
285 primitive_max_accumulator!(data_type, i32, Time32SecondType)
286 }
287 Time32(Millisecond) => {
288 primitive_max_accumulator!(data_type, i32, Time32MillisecondType)
289 }
290 Time64(Microsecond) => {
291 primitive_max_accumulator!(data_type, i64, Time64MicrosecondType)
292 }
293 Time64(Nanosecond) => {
294 primitive_max_accumulator!(data_type, i64, Time64NanosecondType)
295 }
296 Timestamp(Second, _) => {
297 primitive_max_accumulator!(data_type, i64, TimestampSecondType)
298 }
299 Timestamp(Millisecond, _) => {
300 primitive_max_accumulator!(data_type, i64, TimestampMillisecondType)
301 }
302 Timestamp(Microsecond, _) => {
303 primitive_max_accumulator!(data_type, i64, TimestampMicrosecondType)
304 }
305 Timestamp(Nanosecond, _) => {
306 primitive_max_accumulator!(data_type, i64, TimestampNanosecondType)
307 }
308 Duration(Second) => {
309 primitive_max_accumulator!(data_type, i64, DurationSecondType)
310 }
311 Duration(Millisecond) => {
312 primitive_max_accumulator!(data_type, i64, DurationMillisecondType)
313 }
314 Duration(Microsecond) => {
315 primitive_max_accumulator!(data_type, i64, DurationMicrosecondType)
316 }
317 Duration(Nanosecond) => {
318 primitive_max_accumulator!(data_type, i64, DurationNanosecondType)
319 }
320 Decimal32(_, _) => {
321 primitive_max_accumulator!(data_type, i32, Decimal32Type)
322 }
323 Decimal64(_, _) => {
324 primitive_max_accumulator!(data_type, i64, Decimal64Type)
325 }
326 Decimal128(_, _) => {
327 primitive_max_accumulator!(data_type, i128, Decimal128Type)
328 }
329 Decimal256(_, _) => {
330 primitive_max_accumulator!(data_type, i256, Decimal256Type)
331 }
332 Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
333 Ok(Box::new(MinMaxBytesAccumulator::new_max(data_type.clone())))
334 }
335 Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_max(
336 data_type.clone(),
337 ))),
338 _ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
340 }
341 }
342
343 fn create_sliding_accumulator(
344 &self,
345 args: AccumulatorArgs,
346 ) -> Result<Box<dyn Accumulator>> {
347 Ok(Box::new(SlidingMaxAccumulator::try_new(
348 args.return_field.data_type(),
349 )?))
350 }
351
352 fn is_descending(&self) -> Option<bool> {
353 Some(true)
354 }
355
356 fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
357 datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
358 }
359
360 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
361 get_min_max_result_type(arg_types)
362 }
363 fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
364 datafusion_expr::ReversedUDAF::Identical
365 }
366 fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
367 self.value_from_statistics(statistics_args)
368 }
369
370 fn documentation(&self) -> Option<&Documentation> {
371 self.doc()
372 }
373
374 fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
375 SetMonotonicity::Increasing
378 }
379}
380
381#[derive(Debug)]
382pub struct SlidingMaxAccumulator {
383 max: ScalarValue,
384 moving_max: MovingMax<ScalarValue>,
385}
386
387impl SlidingMaxAccumulator {
388 pub fn try_new(datatype: &DataType) -> Result<Self> {
390 Ok(Self {
391 max: ScalarValue::try_from(datatype)?,
392 moving_max: MovingMax::<ScalarValue>::new(),
393 })
394 }
395}
396
397impl Accumulator for SlidingMaxAccumulator {
398 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
399 for idx in 0..values[0].len() {
400 let val = ScalarValue::try_from_array(&values[0], idx)?;
401 self.moving_max.push(val);
402 }
403 if let Some(res) = self.moving_max.max() {
404 self.max = res.clone();
405 }
406 Ok(())
407 }
408
409 fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
410 for _idx in 0..values[0].len() {
411 (self.moving_max).pop();
412 }
413 if let Some(res) = self.moving_max.max() {
414 self.max = res.clone();
415 }
416 Ok(())
417 }
418
419 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
420 self.update_batch(states)
421 }
422
423 fn state(&mut self) -> Result<Vec<ScalarValue>> {
424 Ok(vec![self.max.clone()])
425 }
426
427 fn evaluate(&mut self) -> Result<ScalarValue> {
428 Ok(self.max.clone())
429 }
430
431 fn supports_retract_batch(&self) -> bool {
432 true
433 }
434
435 fn size(&self) -> usize {
436 size_of_val(self) - size_of_val(&self.max) + self.max.size()
437 }
438}
439
440#[user_doc(
441 doc_section(label = "General Functions"),
442 description = "Returns the minimum value in the specified column.",
443 syntax_example = "min(expression)",
444 sql_example = r#"```sql
445> SELECT min(column_name) FROM table_name;
446+----------------------+
447| min(column_name) |
448+----------------------+
449| 12 |
450+----------------------+
451```"#,
452 standard_argument(name = "expression",)
453)]
454#[derive(Debug, PartialEq, Eq, Hash)]
455pub struct Min {
456 signature: Signature,
457}
458
459impl Min {
460 pub fn new() -> Self {
461 Self {
462 signature: Signature::user_defined(Volatility::Immutable),
463 }
464 }
465}
466
467impl Default for Min {
468 fn default() -> Self {
469 Self::new()
470 }
471}
472
473impl FromColumnStatistics for Min {
474 fn value_from_column_statistics(
475 &self,
476 col_stats: &ColumnStatistics,
477 ) -> Option<ScalarValue> {
478 if let Precision::Exact(ref val) = col_stats.min_value
479 && !val.is_null()
480 {
481 return Some(val.clone());
482 }
483 None
484 }
485}
486
487impl AggregateUDFImpl for Min {
488 fn name(&self) -> &str {
489 "min"
490 }
491
492 fn signature(&self) -> &Signature {
493 &self.signature
494 }
495
496 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
497 Ok(arg_types[0].to_owned())
498 }
499
500 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
501 Ok(Box::new(MinAccumulator::try_new(
502 acc_args.return_field.data_type(),
503 )?))
504 }
505
506 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
507 use DataType::*;
508 matches!(
509 args.return_field.data_type(),
510 Int8 | Int16
511 | Int32
512 | Int64
513 | UInt8
514 | UInt16
515 | UInt32
516 | UInt64
517 | Float16
518 | Float32
519 | Float64
520 | Decimal32(_, _)
521 | Decimal64(_, _)
522 | Decimal128(_, _)
523 | Decimal256(_, _)
524 | Date32
525 | Date64
526 | Time32(_)
527 | Time64(_)
528 | Timestamp(_, _)
529 | Utf8
530 | LargeUtf8
531 | Utf8View
532 | Binary
533 | LargeBinary
534 | BinaryView
535 | Duration(_)
536 | Struct(_)
537 )
538 }
539
540 fn create_groups_accumulator(
541 &self,
542 args: AccumulatorArgs,
543 ) -> Result<Box<dyn GroupsAccumulator>> {
544 use DataType::*;
545 use TimeUnit::*;
546 let data_type = args.return_field.data_type();
547 match data_type {
548 Int8 => primitive_min_accumulator!(data_type, i8, Int8Type),
549 Int16 => primitive_min_accumulator!(data_type, i16, Int16Type),
550 Int32 => primitive_min_accumulator!(data_type, i32, Int32Type),
551 Int64 => primitive_min_accumulator!(data_type, i64, Int64Type),
552 UInt8 => primitive_min_accumulator!(data_type, u8, UInt8Type),
553 UInt16 => primitive_min_accumulator!(data_type, u16, UInt16Type),
554 UInt32 => primitive_min_accumulator!(data_type, u32, UInt32Type),
555 UInt64 => primitive_min_accumulator!(data_type, u64, UInt64Type),
556 Float16 => {
557 primitive_min_accumulator!(data_type, f16, Float16Type)
558 }
559 Float32 => {
560 primitive_min_accumulator!(data_type, f32, Float32Type)
561 }
562 Float64 => {
563 primitive_min_accumulator!(data_type, f64, Float64Type)
564 }
565 Date32 => primitive_min_accumulator!(data_type, i32, Date32Type),
566 Date64 => primitive_min_accumulator!(data_type, i64, Date64Type),
567 Time32(Second) => {
568 primitive_min_accumulator!(data_type, i32, Time32SecondType)
569 }
570 Time32(Millisecond) => {
571 primitive_min_accumulator!(data_type, i32, Time32MillisecondType)
572 }
573 Time64(Microsecond) => {
574 primitive_min_accumulator!(data_type, i64, Time64MicrosecondType)
575 }
576 Time64(Nanosecond) => {
577 primitive_min_accumulator!(data_type, i64, Time64NanosecondType)
578 }
579 Timestamp(Second, _) => {
580 primitive_min_accumulator!(data_type, i64, TimestampSecondType)
581 }
582 Timestamp(Millisecond, _) => {
583 primitive_min_accumulator!(data_type, i64, TimestampMillisecondType)
584 }
585 Timestamp(Microsecond, _) => {
586 primitive_min_accumulator!(data_type, i64, TimestampMicrosecondType)
587 }
588 Timestamp(Nanosecond, _) => {
589 primitive_min_accumulator!(data_type, i64, TimestampNanosecondType)
590 }
591 Duration(Second) => {
592 primitive_min_accumulator!(data_type, i64, DurationSecondType)
593 }
594 Duration(Millisecond) => {
595 primitive_min_accumulator!(data_type, i64, DurationMillisecondType)
596 }
597 Duration(Microsecond) => {
598 primitive_min_accumulator!(data_type, i64, DurationMicrosecondType)
599 }
600 Duration(Nanosecond) => {
601 primitive_min_accumulator!(data_type, i64, DurationNanosecondType)
602 }
603 Decimal32(_, _) => {
604 primitive_min_accumulator!(data_type, i32, Decimal32Type)
605 }
606 Decimal64(_, _) => {
607 primitive_min_accumulator!(data_type, i64, Decimal64Type)
608 }
609 Decimal128(_, _) => {
610 primitive_min_accumulator!(data_type, i128, Decimal128Type)
611 }
612 Decimal256(_, _) => {
613 primitive_min_accumulator!(data_type, i256, Decimal256Type)
614 }
615 Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
616 Ok(Box::new(MinMaxBytesAccumulator::new_min(data_type.clone())))
617 }
618 Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_min(
619 data_type.clone(),
620 ))),
621 _ => internal_err!("GroupsAccumulator not supported for min({})", data_type),
623 }
624 }
625
626 fn create_sliding_accumulator(
627 &self,
628 args: AccumulatorArgs,
629 ) -> Result<Box<dyn Accumulator>> {
630 Ok(Box::new(SlidingMinAccumulator::try_new(
631 args.return_field.data_type(),
632 )?))
633 }
634
635 fn is_descending(&self) -> Option<bool> {
636 Some(false)
637 }
638
639 fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
640 self.value_from_statistics(statistics_args)
641 }
642 fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
643 datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
644 }
645
646 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
647 get_min_max_result_type(arg_types)
648 }
649
650 fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
651 datafusion_expr::ReversedUDAF::Identical
652 }
653
654 fn documentation(&self) -> Option<&Documentation> {
655 self.doc()
656 }
657
658 fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
659 SetMonotonicity::Decreasing
662 }
663}
664
665#[derive(Debug)]
666pub struct SlidingMinAccumulator {
667 min: ScalarValue,
668 moving_min: MovingMin<ScalarValue>,
669}
670
671impl SlidingMinAccumulator {
672 pub fn try_new(datatype: &DataType) -> Result<Self> {
673 Ok(Self {
674 min: ScalarValue::try_from(datatype)?,
675 moving_min: MovingMin::<ScalarValue>::new(),
676 })
677 }
678}
679
680impl Accumulator for SlidingMinAccumulator {
681 fn state(&mut self) -> Result<Vec<ScalarValue>> {
682 Ok(vec![self.min.clone()])
683 }
684
685 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
686 for idx in 0..values[0].len() {
687 let val = ScalarValue::try_from_array(&values[0], idx)?;
688 if !val.is_null() {
689 self.moving_min.push(val);
690 }
691 }
692 if let Some(res) = self.moving_min.min() {
693 self.min = res.clone();
694 }
695 Ok(())
696 }
697
698 fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
699 for idx in 0..values[0].len() {
700 let val = ScalarValue::try_from_array(&values[0], idx)?;
701 if !val.is_null() {
702 (self.moving_min).pop();
703 }
704 }
705 if let Some(res) = self.moving_min.min() {
706 self.min = res.clone();
707 }
708 Ok(())
709 }
710
711 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
712 self.update_batch(states)
713 }
714
715 fn evaluate(&mut self) -> Result<ScalarValue> {
716 Ok(self.min.clone())
717 }
718
719 fn supports_retract_batch(&self) -> bool {
720 true
721 }
722
723 fn size(&self) -> usize {
724 size_of_val(self) - size_of_val(&self.min) + self.min.size()
725 }
726}
727
728#[derive(Debug)]
766pub struct MovingMin<T> {
767 push_stack: Vec<(T, T)>,
768 pop_stack: Vec<(T, T)>,
769}
770
771impl<T: Clone + PartialOrd> Default for MovingMin<T> {
772 fn default() -> Self {
773 Self {
774 push_stack: Vec::new(),
775 pop_stack: Vec::new(),
776 }
777 }
778}
779
780impl<T: Clone + PartialOrd> MovingMin<T> {
781 #[inline]
784 pub fn new() -> Self {
785 Self::default()
786 }
787
788 #[inline]
791 pub fn with_capacity(capacity: usize) -> Self {
792 Self {
793 push_stack: Vec::with_capacity(capacity),
794 pop_stack: Vec::with_capacity(capacity),
795 }
796 }
797
798 #[inline]
801 pub fn min(&self) -> Option<&T> {
802 match (self.push_stack.last(), self.pop_stack.last()) {
803 (None, None) => None,
804 (Some((_, min)), None) => Some(min),
805 (None, Some((_, min))) => Some(min),
806 (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }),
807 }
808 }
809
810 #[inline]
812 pub fn push(&mut self, val: T) {
813 self.push_stack.push(match self.push_stack.last() {
814 Some((_, min)) => {
815 if val > *min {
816 (val, min.clone())
817 } else {
818 (val.clone(), val)
819 }
820 }
821 None => (val.clone(), val),
822 });
823 }
824
825 #[inline]
827 pub fn pop(&mut self) -> Option<T> {
828 if self.pop_stack.is_empty() {
829 match self.push_stack.pop() {
830 Some((val, _)) => {
831 let mut last = (val.clone(), val);
832 self.pop_stack.push(last.clone());
833 while let Some((val, _)) = self.push_stack.pop() {
834 let min = if last.1 < val {
835 last.1.clone()
836 } else {
837 val.clone()
838 };
839 last = (val.clone(), min);
840 self.pop_stack.push(last.clone());
841 }
842 }
843 None => return None,
844 }
845 }
846 self.pop_stack.pop().map(|(val, _)| val)
847 }
848
849 #[inline]
851 pub fn len(&self) -> usize {
852 self.push_stack.len() + self.pop_stack.len()
853 }
854
855 #[inline]
857 pub fn is_empty(&self) -> bool {
858 self.len() == 0
859 }
860}
861
862#[derive(Debug)]
886pub struct MovingMax<T> {
887 push_stack: Vec<(T, T)>,
888 pop_stack: Vec<(T, T)>,
889}
890
891impl<T: Clone + PartialOrd> Default for MovingMax<T> {
892 fn default() -> Self {
893 Self {
894 push_stack: Vec::new(),
895 pop_stack: Vec::new(),
896 }
897 }
898}
899
900impl<T: Clone + PartialOrd> MovingMax<T> {
901 #[inline]
903 pub fn new() -> Self {
904 Self::default()
905 }
906
907 #[inline]
910 pub fn with_capacity(capacity: usize) -> Self {
911 Self {
912 push_stack: Vec::with_capacity(capacity),
913 pop_stack: Vec::with_capacity(capacity),
914 }
915 }
916
917 #[inline]
919 pub fn max(&self) -> Option<&T> {
920 match (self.push_stack.last(), self.pop_stack.last()) {
921 (None, None) => None,
922 (Some((_, max)), None) => Some(max),
923 (None, Some((_, max))) => Some(max),
924 (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }),
925 }
926 }
927
928 #[inline]
930 pub fn push(&mut self, val: T) {
931 self.push_stack.push(match self.push_stack.last() {
932 Some((_, max)) => {
933 if val < *max {
934 (val, max.clone())
935 } else {
936 (val.clone(), val)
937 }
938 }
939 None => (val.clone(), val),
940 });
941 }
942
943 #[inline]
945 pub fn pop(&mut self) -> Option<T> {
946 if self.pop_stack.is_empty() {
947 match self.push_stack.pop() {
948 Some((val, _)) => {
949 let mut last = (val.clone(), val);
950 self.pop_stack.push(last.clone());
951 while let Some((val, _)) = self.push_stack.pop() {
952 let max = if last.1 > val {
953 last.1.clone()
954 } else {
955 val.clone()
956 };
957 last = (val.clone(), max);
958 self.pop_stack.push(last.clone());
959 }
960 }
961 None => return None,
962 }
963 }
964 self.pop_stack.pop().map(|(val, _)| val)
965 }
966
967 #[inline]
969 pub fn len(&self) -> usize {
970 self.push_stack.len() + self.pop_stack.len()
971 }
972
973 #[inline]
975 pub fn is_empty(&self) -> bool {
976 self.len() == 0
977 }
978}
979
980make_udaf_expr_and_func!(
981 Max,
982 max,
983 expression,
984 "Returns the maximum of a group of values.",
985 max_udaf
986);
987
988make_udaf_expr_and_func!(
989 Min,
990 min,
991 expression,
992 "Returns the minimum of a group of values.",
993 min_udaf
994);
995
996pub use datafusion_functions_aggregate_common::min_max::{
998 MaxAccumulator, MinAccumulator,
999};
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use arrow::{
1005 array::{
1006 Array, DictionaryArray, Float32Array, Int8Array, Int32Array,
1007 IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
1008 PrimitiveArray, StringArray,
1009 },
1010 datatypes::{
1011 ArrowDictionaryKeyType, IntervalDayTimeType, IntervalMonthDayNanoType,
1012 IntervalUnit, IntervalYearMonthType,
1013 },
1014 };
1015 use std::sync::Arc;
1016
1017 #[test]
1018 fn interval_min_max() {
1019 let b = IntervalYearMonthArray::from(vec![
1021 IntervalYearMonthType::make_value(0, 1),
1022 IntervalYearMonthType::make_value(5, 34),
1023 IntervalYearMonthType::make_value(-2, 4),
1024 IntervalYearMonthType::make_value(7, -4),
1025 IntervalYearMonthType::make_value(0, 1),
1026 ]);
1027 let b: ArrayRef = Arc::new(b);
1028
1029 let mut min =
1030 MinAccumulator::try_new(&DataType::Interval(IntervalUnit::YearMonth))
1031 .unwrap();
1032 min.update_batch(&[Arc::clone(&b)]).unwrap();
1033 let min_res = min.evaluate().unwrap();
1034 assert_eq!(
1035 min_res,
1036 ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
1037 -2, 4,
1038 )))
1039 );
1040
1041 let mut max =
1042 MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::YearMonth))
1043 .unwrap();
1044 max.update_batch(&[Arc::clone(&b)]).unwrap();
1045 let max_res = max.evaluate().unwrap();
1046 assert_eq!(
1047 max_res,
1048 ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
1049 5, 34,
1050 )))
1051 );
1052
1053 let b = IntervalDayTimeArray::from(vec![
1055 IntervalDayTimeType::make_value(0, 0),
1056 IntervalDayTimeType::make_value(5, 454000),
1057 IntervalDayTimeType::make_value(-34, 0),
1058 IntervalDayTimeType::make_value(7, -4000),
1059 IntervalDayTimeType::make_value(1, 0),
1060 ]);
1061 let b: ArrayRef = Arc::new(b);
1062
1063 let mut min =
1064 MinAccumulator::try_new(&DataType::Interval(IntervalUnit::DayTime)).unwrap();
1065 min.update_batch(&[Arc::clone(&b)]).unwrap();
1066 let min_res = min.evaluate().unwrap();
1067 assert_eq!(
1068 min_res,
1069 ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(-34, 0)))
1070 );
1071
1072 let mut max =
1073 MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::DayTime)).unwrap();
1074 max.update_batch(&[Arc::clone(&b)]).unwrap();
1075 let max_res = max.evaluate().unwrap();
1076 assert_eq!(
1077 max_res,
1078 ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(7, -4000)))
1079 );
1080
1081 let b = IntervalMonthDayNanoArray::from(vec![
1083 IntervalMonthDayNanoType::make_value(1, 0, 0),
1084 IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000),
1085 IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000),
1086 IntervalMonthDayNanoType::make_value(5, 2, 493_000_000_000),
1087 IntervalMonthDayNanoType::make_value(1, 0, 0),
1088 ]);
1089 let b: ArrayRef = Arc::new(b);
1090
1091 let mut min =
1092 MinAccumulator::try_new(&DataType::Interval(IntervalUnit::MonthDayNano))
1093 .unwrap();
1094 min.update_batch(&[Arc::clone(&b)]).unwrap();
1095 let min_res = min.evaluate().unwrap();
1096 assert_eq!(
1097 min_res,
1098 ScalarValue::IntervalMonthDayNano(Some(
1099 IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000)
1100 ))
1101 );
1102
1103 let mut max =
1104 MaxAccumulator::try_new(&DataType::Interval(IntervalUnit::MonthDayNano))
1105 .unwrap();
1106 max.update_batch(&[Arc::clone(&b)]).unwrap();
1107 let max_res = max.evaluate().unwrap();
1108 assert_eq!(
1109 max_res,
1110 ScalarValue::IntervalMonthDayNano(Some(
1111 IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000)
1112 ))
1113 );
1114 }
1115
1116 #[test]
1117 fn float_min_max_with_nans() {
1118 let pos_nan = f32::NAN;
1119 let zero = 0_f32;
1120 let neg_inf = f32::NEG_INFINITY;
1121
1122 let check = |acc: &mut dyn Accumulator, values: &[&[f32]], expected: f32| {
1123 for batch in values.iter() {
1124 let batch =
1125 Arc::new(Float32Array::from_iter_values(batch.iter().copied()));
1126 acc.update_batch(&[batch]).unwrap();
1127 }
1128 let result = acc.evaluate().unwrap();
1129 assert_eq!(result, ScalarValue::Float32(Some(expected)));
1130 };
1131
1132 let min = || MinAccumulator::try_new(&DataType::Float32).unwrap();
1137 let max = || MaxAccumulator::try_new(&DataType::Float32).unwrap();
1138
1139 check(&mut min(), &[&[zero], &[pos_nan]], zero);
1140 check(&mut min(), &[&[zero, pos_nan]], zero);
1141 check(&mut min(), &[&[zero], &[neg_inf]], neg_inf);
1142 check(&mut min(), &[&[zero, neg_inf]], neg_inf);
1143 check(&mut max(), &[&[zero], &[pos_nan]], pos_nan);
1144 check(&mut max(), &[&[zero, pos_nan]], pos_nan);
1145 check(&mut max(), &[&[zero], &[neg_inf]], zero);
1146 check(&mut max(), &[&[zero, neg_inf]], zero);
1147 }
1148
1149 use rand::Rng;
1150
1151 fn get_random_vec_i32(len: usize) -> Vec<i32> {
1152 let mut rng = rand::rng();
1153 let mut input = Vec::with_capacity(len);
1154 for _i in 0..len {
1155 input.push(rng.random_range(0..100));
1156 }
1157 input
1158 }
1159
1160 fn moving_min_i32(len: usize, n_sliding_window: usize) -> Result<()> {
1161 let data = get_random_vec_i32(len);
1162 let mut expected = Vec::with_capacity(len);
1163 let mut moving_min = MovingMin::<i32>::new();
1164 let mut res = Vec::with_capacity(len);
1165 for i in 0..len {
1166 let start = i.saturating_sub(n_sliding_window);
1167 expected.push(*data[start..i + 1].iter().min().unwrap());
1168
1169 moving_min.push(data[i]);
1170 if i > n_sliding_window {
1171 moving_min.pop();
1172 }
1173 res.push(*moving_min.min().unwrap());
1174 }
1175 assert_eq!(res, expected);
1176 Ok(())
1177 }
1178
1179 fn moving_max_i32(len: usize, n_sliding_window: usize) -> Result<()> {
1180 let data = get_random_vec_i32(len);
1181 let mut expected = Vec::with_capacity(len);
1182 let mut moving_max = MovingMax::<i32>::new();
1183 let mut res = Vec::with_capacity(len);
1184 for i in 0..len {
1185 let start = i.saturating_sub(n_sliding_window);
1186 expected.push(*data[start..i + 1].iter().max().unwrap());
1187
1188 moving_max.push(data[i]);
1189 if i > n_sliding_window {
1190 moving_max.pop();
1191 }
1192 res.push(*moving_max.max().unwrap());
1193 }
1194 assert_eq!(res, expected);
1195 Ok(())
1196 }
1197
1198 #[test]
1199 fn moving_min_tests() -> Result<()> {
1200 moving_min_i32(100, 10)?;
1201 moving_min_i32(100, 20)?;
1202 moving_min_i32(100, 50)?;
1203 moving_min_i32(100, 100)?;
1204 Ok(())
1205 }
1206
1207 #[test]
1208 fn moving_max_tests() -> Result<()> {
1209 moving_max_i32(100, 10)?;
1210 moving_max_i32(100, 20)?;
1211 moving_max_i32(100, 50)?;
1212 moving_max_i32(100, 100)?;
1213 Ok(())
1214 }
1215
1216 #[test]
1217 fn test_min_max_coerce_types() {
1218 let funs: Vec<Box<dyn AggregateUDFImpl>> =
1220 vec![Box::new(Min::new()), Box::new(Max::new())];
1221 let input_types = vec![
1222 vec![DataType::Int32],
1223 vec![DataType::Decimal128(10, 2)],
1224 vec![DataType::Decimal256(1, 1)],
1225 vec![DataType::Utf8],
1226 ];
1227 for fun in funs {
1228 for input_type in &input_types {
1229 let result = fun.coerce_types(input_type);
1230 assert_eq!(*input_type, result.unwrap());
1231 }
1232 }
1233 }
1234
1235 #[test]
1236 fn test_get_min_max_return_type_coerce_dictionary() -> Result<()> {
1237 let data_type =
1238 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1239 let result = get_min_max_result_type(&[data_type])?;
1240 assert_eq!(result, vec![DataType::Utf8]);
1241 Ok(())
1242 }
1243
1244 #[test]
1245 fn test_min_max_dictionary() -> Result<()> {
1246 let values = StringArray::from(vec!["b", "c", "a", "🦀", "d"]);
1247 let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(4)]);
1248 let dict_array =
1249 DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap();
1250 let dict_array_ref = Arc::new(dict_array) as ArrayRef;
1251 let rt_type =
1252 get_min_max_result_type(&[dict_array_ref.data_type().clone()])?[0].clone();
1253
1254 let mut min_acc = MinAccumulator::try_new(&rt_type)?;
1255 min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?;
1256 let min_result = min_acc.evaluate()?;
1257 assert_eq!(min_result, ScalarValue::Utf8(Some("a".to_string())));
1258
1259 let mut max_acc = MaxAccumulator::try_new(&rt_type)?;
1260 max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?;
1261 let max_result = max_acc.evaluate()?;
1262 assert_eq!(max_result, ScalarValue::Utf8(Some("d".to_string())));
1263 Ok(())
1264 }
1265
1266 fn dict_scalar(key_type: DataType, inner: ScalarValue) -> ScalarValue {
1267 ScalarValue::Dictionary(Box::new(key_type), Box::new(inner))
1268 }
1269
1270 fn utf8_dict_scalar(key_type: DataType, value: &str) -> ScalarValue {
1271 dict_scalar(key_type, ScalarValue::Utf8(Some(value.to_string())))
1272 }
1273
1274 fn string_dictionary_batch(values: &[&str], keys: &[Option<i32>]) -> ArrayRef {
1275 string_dictionary_batch_with_keys(Int32Array::from(keys.to_vec()), values)
1276 }
1277
1278 fn string_dictionary_batch_with_keys<K>(
1279 keys: PrimitiveArray<K>,
1280 values: &[&str],
1281 ) -> ArrayRef
1282 where
1283 K: ArrowDictionaryKeyType,
1284 {
1285 let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef;
1286 Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef
1287 }
1288
1289 fn optional_string_dictionary_batch(
1290 values: &[Option<&str>],
1291 keys: &[Option<i32>],
1292 ) -> ArrayRef {
1293 let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef;
1294 Arc::new(
1295 DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(),
1296 ) as ArrayRef
1297 }
1298
1299 fn float_dictionary_batch(values: &[f32], keys: &[Option<i32>]) -> ArrayRef {
1300 let values = Arc::new(Float32Array::from(values.to_vec())) as ArrayRef;
1301 Arc::new(
1302 DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(),
1303 ) as ArrayRef
1304 }
1305
1306 fn evaluate_dictionary_accumulator(
1307 mut acc: impl Accumulator,
1308 batches: &[ArrayRef],
1309 ) -> Result<ScalarValue> {
1310 for batch in batches {
1311 acc.update_batch(&[Arc::clone(batch)])?;
1312 }
1313 acc.evaluate()
1314 }
1315
1316 fn assert_dictionary_min_max(
1317 dict_type: &DataType,
1318 batches: &[ArrayRef],
1319 expected_min: &str,
1320 expected_max: &str,
1321 ) -> Result<()> {
1322 let key_type = match dict_type {
1323 DataType::Dictionary(key_type, _) => key_type.as_ref().clone(),
1324 other => panic!("expected dictionary type, got {other:?}"),
1325 };
1326
1327 let min_result = evaluate_dictionary_accumulator(
1328 MinAccumulator::try_new(dict_type)?,
1329 batches,
1330 )?;
1331 assert_eq!(min_result, utf8_dict_scalar(key_type.clone(), expected_min));
1332
1333 let max_result = evaluate_dictionary_accumulator(
1334 MaxAccumulator::try_new(dict_type)?,
1335 batches,
1336 )?;
1337 assert_eq!(max_result, utf8_dict_scalar(key_type, expected_max));
1338
1339 Ok(())
1340 }
1341
1342 #[test]
1343 fn test_min_max_dictionary_without_coercion() -> Result<()> {
1344 let dict_array_ref = string_dictionary_batch(
1345 &["b", "c", "a", "d"],
1346 &[Some(0), Some(1), Some(2), Some(3)],
1347 );
1348 let dict_type = dict_array_ref.data_type().clone();
1349
1350 assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d")
1351 }
1352
1353 #[test]
1354 fn test_min_max_dictionary_with_nulls() -> Result<()> {
1355 let dict_array_ref = string_dictionary_batch(
1356 &["b", "c", "a"],
1357 &[None, Some(0), None, Some(1), Some(2)],
1358 );
1359 let dict_type = dict_array_ref.data_type().clone();
1360
1361 assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "c")
1362 }
1363
1364 #[test]
1365 fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> {
1366 let dict_array_ref =
1367 string_dictionary_batch(&["a", "z", "zz_unused"], &[Some(1), Some(1), None]);
1368 let dict_type = dict_array_ref.data_type().clone();
1369
1370 assert_dictionary_min_max(&dict_type, &[dict_array_ref], "z", "z")
1371 }
1372
1373 #[test]
1374 fn test_min_max_dictionary_ignores_referenced_null_values() -> Result<()> {
1375 let dict_array_ref = optional_string_dictionary_batch(
1376 &[Some("b"), None, Some("a"), Some("d")],
1377 &[Some(0), Some(1), Some(2), Some(3)],
1378 );
1379 let dict_type = dict_array_ref.data_type().clone();
1380
1381 assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d")
1382 }
1383
1384 #[test]
1385 fn test_min_max_dictionary_multi_batch() -> Result<()> {
1386 let dict_type =
1387 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1388 let batch1 = string_dictionary_batch(&["b", "c"], &[Some(0), Some(1)]);
1389 let batch2 = string_dictionary_batch(&["a", "d"], &[Some(0), Some(1)]);
1390
1391 assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d")
1392 }
1393
1394 #[test]
1395 fn test_min_max_dictionary_int8_keys() -> Result<()> {
1396 let dict_type =
1397 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
1398 let dict_array_ref = string_dictionary_batch_with_keys(
1399 Int8Array::from(vec![Some(0), Some(1), Some(2), Some(3)]),
1400 &["b", "c", "a", "d"],
1401 );
1402
1403 assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d")
1404 }
1405
1406 #[test]
1407 fn test_min_max_dictionary_float_with_nans() -> Result<()> {
1408 let dict_type =
1409 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Float32));
1410 let batch1 = float_dictionary_batch(&[0.0, f32::NAN], &[Some(0), Some(1)]);
1411 let batch2 = float_dictionary_batch(&[f32::NEG_INFINITY], &[Some(0)]);
1412
1413 let min_result = evaluate_dictionary_accumulator(
1414 MinAccumulator::try_new(&dict_type)?,
1415 &[Arc::clone(&batch1), Arc::clone(&batch2)],
1416 )?;
1417 assert_eq!(
1418 min_result,
1419 dict_scalar(
1420 DataType::Int32,
1421 ScalarValue::Float32(Some(f32::NEG_INFINITY)),
1422 )
1423 );
1424
1425 let max_result = evaluate_dictionary_accumulator(
1426 MaxAccumulator::try_new(&dict_type)?,
1427 &[batch1, batch2],
1428 )?;
1429 assert_eq!(
1430 max_result,
1431 dict_scalar(DataType::Int32, ScalarValue::Float32(Some(f32::NAN)))
1432 );
1433
1434 Ok(())
1435 }
1436}