1mod min_max_bytes;
22mod min_max_struct;
23
24use arrow::array::{
25 ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
26 Date64Array, Decimal128Array, Decimal256Array, DurationMicrosecondArray,
27 DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray, Float16Array,
28 Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
29 IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
30 LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
31 Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
32 Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
33 TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
34 UInt64Array, UInt8Array,
35};
36use arrow::compute;
37use arrow::datatypes::{
38 DataType, Decimal128Type, Decimal256Type, DurationMicrosecondType,
39 DurationMillisecondType, DurationNanosecondType, DurationSecondType, Float16Type,
40 Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit,
41 UInt16Type, UInt32Type, UInt64Type, UInt8Type,
42};
43use datafusion_common::stats::Precision;
44use datafusion_common::{
45 downcast_value, exec_err, internal_err, ColumnStatistics, DataFusionError, Result,
46};
47use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
48use datafusion_physical_expr::expressions;
49use std::cmp::Ordering;
50use std::fmt::Debug;
51
52use arrow::datatypes::i256;
53use arrow::datatypes::{
54 Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
55 Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
56 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
57};
58
59use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
60use crate::min_max::min_max_struct::MinMaxStructAccumulator;
61use datafusion_common::ScalarValue;
62use datafusion_expr::{
63 function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Documentation,
64 SetMonotonicity, Signature, Volatility,
65};
66use datafusion_expr::{GroupsAccumulator, StatisticsArgs};
67use datafusion_macros::user_doc;
68use half::f16;
69use std::mem::size_of_val;
70use std::ops::Deref;
71
72fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
73 if input_types.len() != 1 {
75 return exec_err!(
76 "min/max was called with {} arguments. It requires only 1.",
77 input_types.len()
78 );
79 }
80 match &input_types[0] {
83 DataType::Dictionary(_, dict_value_type) => {
84 Ok(vec![dict_value_type.deref().clone()])
86 }
87 _ => Ok(input_types.to_vec()),
90 }
91}
92
93#[user_doc(
94 doc_section(label = "General Functions"),
95 description = "Returns the maximum value in the specified column.",
96 syntax_example = "max(expression)",
97 sql_example = r#"```sql
98> SELECT max(column_name) FROM table_name;
99+----------------------+
100| max(column_name) |
101+----------------------+
102| 150 |
103+----------------------+
104```"#,
105 standard_argument(name = "expression",)
106)]
107#[derive(Debug)]
109pub struct Max {
110 signature: Signature,
111}
112
113impl Max {
114 pub fn new() -> Self {
115 Self {
116 signature: Signature::user_defined(Volatility::Immutable),
117 }
118 }
119}
120
121impl Default for Max {
122 fn default() -> Self {
123 Self::new()
124 }
125}
// Builds a boxed `PrimitiveGroupsAccumulator` that keeps a per-group maximum.
//
// Arguments: the identifier holding the data type, the native Rust type
// (supplies the `MIN` starting value) and the Arrow primitive type.
macro_rules! primitive_max_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        let accumulator =
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
                // Overwrite when `new` is greater, and also when the two values
                // are not comparable (`partial_cmp` returned `None`).
                if let Some(Ordering::Greater) | None = new.partial_cmp(cur) {
                    *cur = new
                }
            })
            // Every group starts from the smallest representable value.
            .with_starting_value($NATIVE::MIN);
        Ok(Box::new(accumulator))
    }};
}
147
// Builds a boxed `PrimitiveGroupsAccumulator` that keeps a per-group minimum.
//
// Arguments: the identifier holding the data type, the native Rust type
// (supplies the `MAX` starting value) and the Arrow primitive type.
macro_rules! primitive_min_accumulator {
    ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
        let accumulator =
            PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
                // Overwrite when `new` is smaller, and also when the two values
                // are not comparable (`partial_cmp` returned `None`).
                if let Some(Ordering::Less) | None = new.partial_cmp(cur) {
                    *cur = new
                }
            })
            // Every group starts from the largest representable value.
            .with_starting_value($NATIVE::MAX);
        Ok(Box::new(accumulator))
    }};
}
170
171trait FromColumnStatistics {
172 fn value_from_column_statistics(
173 &self,
174 stats: &ColumnStatistics,
175 ) -> Option<ScalarValue>;
176
177 fn value_from_statistics(
178 &self,
179 statistics_args: &StatisticsArgs,
180 ) -> Option<ScalarValue> {
181 if let Precision::Exact(num_rows) = &statistics_args.statistics.num_rows {
182 match *num_rows {
183 0 => return ScalarValue::try_from(statistics_args.return_type).ok(),
184 value if value > 0 => {
185 let col_stats = &statistics_args.statistics.column_statistics;
186 if statistics_args.exprs.len() == 1 {
187 if let Some(col_expr) = statistics_args.exprs[0]
189 .as_any()
190 .downcast_ref::<expressions::Column>()
191 {
192 return self.value_from_column_statistics(
193 &col_stats[col_expr.index()],
194 );
195 }
196 }
197 }
198 _ => {}
199 }
200 }
201 None
202 }
203}
204
205impl FromColumnStatistics for Max {
206 fn value_from_column_statistics(
207 &self,
208 col_stats: &ColumnStatistics,
209 ) -> Option<ScalarValue> {
210 if let Precision::Exact(ref val) = col_stats.max_value {
211 if !val.is_null() {
212 return Some(val.clone());
213 }
214 }
215 None
216 }
217}
218
219impl AggregateUDFImpl for Max {
220 fn as_any(&self) -> &dyn std::any::Any {
221 self
222 }
223
224 fn name(&self) -> &str {
225 "max"
226 }
227
228 fn signature(&self) -> &Signature {
229 &self.signature
230 }
231
232 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
233 Ok(arg_types[0].to_owned())
234 }
235
236 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
237 Ok(Box::new(MaxAccumulator::try_new(
238 acc_args.return_field.data_type(),
239 )?))
240 }
241
242 fn aliases(&self) -> &[String] {
243 &[]
244 }
245
246 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
247 use DataType::*;
248 matches!(
249 args.return_field.data_type(),
250 Int8 | Int16
251 | Int32
252 | Int64
253 | UInt8
254 | UInt16
255 | UInt32
256 | UInt64
257 | Float16
258 | Float32
259 | Float64
260 | Decimal128(_, _)
261 | Decimal256(_, _)
262 | Date32
263 | Date64
264 | Time32(_)
265 | Time64(_)
266 | Timestamp(_, _)
267 | Utf8
268 | LargeUtf8
269 | Utf8View
270 | Binary
271 | LargeBinary
272 | BinaryView
273 | Duration(_)
274 | Struct(_)
275 )
276 }
277
278 fn create_groups_accumulator(
279 &self,
280 args: AccumulatorArgs,
281 ) -> Result<Box<dyn GroupsAccumulator>> {
282 use DataType::*;
283 use TimeUnit::*;
284 let data_type = args.return_field.data_type();
285 match data_type {
286 Int8 => primitive_max_accumulator!(data_type, i8, Int8Type),
287 Int16 => primitive_max_accumulator!(data_type, i16, Int16Type),
288 Int32 => primitive_max_accumulator!(data_type, i32, Int32Type),
289 Int64 => primitive_max_accumulator!(data_type, i64, Int64Type),
290 UInt8 => primitive_max_accumulator!(data_type, u8, UInt8Type),
291 UInt16 => primitive_max_accumulator!(data_type, u16, UInt16Type),
292 UInt32 => primitive_max_accumulator!(data_type, u32, UInt32Type),
293 UInt64 => primitive_max_accumulator!(data_type, u64, UInt64Type),
294 Float16 => {
295 primitive_max_accumulator!(data_type, f16, Float16Type)
296 }
297 Float32 => {
298 primitive_max_accumulator!(data_type, f32, Float32Type)
299 }
300 Float64 => {
301 primitive_max_accumulator!(data_type, f64, Float64Type)
302 }
303 Date32 => primitive_max_accumulator!(data_type, i32, Date32Type),
304 Date64 => primitive_max_accumulator!(data_type, i64, Date64Type),
305 Time32(Second) => {
306 primitive_max_accumulator!(data_type, i32, Time32SecondType)
307 }
308 Time32(Millisecond) => {
309 primitive_max_accumulator!(data_type, i32, Time32MillisecondType)
310 }
311 Time64(Microsecond) => {
312 primitive_max_accumulator!(data_type, i64, Time64MicrosecondType)
313 }
314 Time64(Nanosecond) => {
315 primitive_max_accumulator!(data_type, i64, Time64NanosecondType)
316 }
317 Timestamp(Second, _) => {
318 primitive_max_accumulator!(data_type, i64, TimestampSecondType)
319 }
320 Timestamp(Millisecond, _) => {
321 primitive_max_accumulator!(data_type, i64, TimestampMillisecondType)
322 }
323 Timestamp(Microsecond, _) => {
324 primitive_max_accumulator!(data_type, i64, TimestampMicrosecondType)
325 }
326 Timestamp(Nanosecond, _) => {
327 primitive_max_accumulator!(data_type, i64, TimestampNanosecondType)
328 }
329 Duration(Second) => {
330 primitive_max_accumulator!(data_type, i64, DurationSecondType)
331 }
332 Duration(Millisecond) => {
333 primitive_max_accumulator!(data_type, i64, DurationMillisecondType)
334 }
335 Duration(Microsecond) => {
336 primitive_max_accumulator!(data_type, i64, DurationMicrosecondType)
337 }
338 Duration(Nanosecond) => {
339 primitive_max_accumulator!(data_type, i64, DurationNanosecondType)
340 }
341 Decimal128(_, _) => {
342 primitive_max_accumulator!(data_type, i128, Decimal128Type)
343 }
344 Decimal256(_, _) => {
345 primitive_max_accumulator!(data_type, i256, Decimal256Type)
346 }
347 Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
348 Ok(Box::new(MinMaxBytesAccumulator::new_max(data_type.clone())))
349 }
350 Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_max(
351 data_type.clone(),
352 ))),
353 _ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
355 }
356 }
357
358 fn create_sliding_accumulator(
359 &self,
360 args: AccumulatorArgs,
361 ) -> Result<Box<dyn Accumulator>> {
362 Ok(Box::new(SlidingMaxAccumulator::try_new(
363 args.return_field.data_type(),
364 )?))
365 }
366
367 fn is_descending(&self) -> Option<bool> {
368 Some(true)
369 }
370
371 fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
372 datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
373 }
374
375 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
376 get_min_max_result_type(arg_types)
377 }
378 fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
379 datafusion_expr::ReversedUDAF::Identical
380 }
381 fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
382 self.value_from_statistics(statistics_args)
383 }
384
385 fn documentation(&self) -> Option<&Documentation> {
386 self.doc()
387 }
388
389 fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
390 SetMonotonicity::Increasing
393 }
394}
395
// Statically-typed min/max over a string array.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the Arrow kernel `compute::$OP`
// and wraps the (owned) result in `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` replaces the former `and_then(|e| Some(..))` round trip.
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
// Statically-typed min/max over a binary array.
//
// Downcasts `$VALUES` to `$ARRAYTYPE`, runs the Arrow kernel `compute::$OP`
// and wraps the (owned) bytes in `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` replaces the former `and_then(|e| Some(..))` round trip.
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
414
// Statically-typed min/max over an array whose kernel result can be stored
// in the scalar as-is.
//
// Any `$EXTRA_ARGS` (e.g. precision/scale or a time zone) are cloned and
// passed as additional fields of `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed_array = downcast_value!($VALUES, $ARRAYTYPE);
        let extreme = compute::$OP(typed_array);
        ScalarValue::$SCALAR(extreme, $($EXTRA_ARGS.clone()),*)
    }};
}
423
// Dynamically-typed min/max over an array.
//
// Dispatches on `$VALUES.data_type()` and forwards to `typed_min_max_batch!`
// with the matching array type and `ScalarValue` variant; `$OP` names the
// `arrow::compute` kernel to run. `DataType::Null` yields `ScalarValue::Null`.
// For any data type without an arm the macro `return`s an internal error
// from the enclosing function.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,
            // Decimals carry their precision and scale into the scalar.
            DataType::Decimal128(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal128Array,
                    Decimal128,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal256(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal256Array,
                    Decimal256,
                    $OP,
                    precision,
                    scale
                )
            }
            // Floating point and integer types.
            DataType::Float64 => {
                typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
            }
            DataType::Float32 => {
                typed_min_max_batch!($VALUES, Float32Array, Float32, $OP)
            }
            DataType::Float16 => {
                typed_min_max_batch!($VALUES, Float16Array, Float16, $OP)
            }
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            // Timestamps carry their optional time zone into the scalar.
            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampSecondArray,
                    TimestampSecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMillisecondArray,
                TimestampMillisecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMicrosecondArray,
                TimestampMicrosecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampNanosecondArray,
                TimestampNanosecond,
                $OP,
                tz_opt
            ),
            // Dates and times of day.
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time32MillisecondArray,
                    Time32Millisecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64MicrosecondArray,
                    Time64Microsecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64NanosecondArray,
                    Time64Nanosecond,
                    $OP
                )
            }
            // Intervals.
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalYearMonthArray,
                    IntervalYearMonth,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }
            // Durations.
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMillisecondArray,
                    DurationMillisecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMicrosecondArray,
                    DurationMicrosecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationNanosecondArray,
                    DurationNanosecond,
                    $OP
                )
            }
            // No arm for this type: bail out of the enclosing function.
            other => {
                return internal_err!(
                    "Min/Max accumulator not implemented for type {:?}",
                    other
                );
            }
        }
    }};
}
583
584fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
586 Ok(match values.data_type() {
587 DataType::Utf8 => {
588 typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
589 }
590 DataType::LargeUtf8 => {
591 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
592 }
593 DataType::Utf8View => {
594 typed_min_max_batch_string!(
595 values,
596 StringViewArray,
597 Utf8View,
598 min_string_view
599 )
600 }
601 DataType::Boolean => {
602 typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
603 }
604 DataType::Binary => {
605 typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
606 }
607 DataType::LargeBinary => {
608 typed_min_max_batch_binary!(
609 &values,
610 LargeBinaryArray,
611 LargeBinary,
612 min_binary
613 )
614 }
615 DataType::BinaryView => {
616 typed_min_max_batch_binary!(
617 &values,
618 BinaryViewArray,
619 BinaryView,
620 min_binary_view
621 )
622 }
623 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
624 DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
625 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
626 DataType::FixedSizeList(_, _) => {
627 min_max_batch_generic(values, Ordering::Greater)?
628 }
629 DataType::Dictionary(_, _) => {
630 let values = values.as_any_dictionary().values();
631 min_batch(values)?
632 }
633 _ => min_max_batch!(values, min),
634 })
635}
636
637fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
638 if array.len() == array.null_count() {
639 return ScalarValue::try_from(array.data_type());
640 }
641 let mut extreme = ScalarValue::try_from_array(array, 0)?;
642 for i in 1..array.len() {
643 let current = ScalarValue::try_from_array(array, i)?;
644 if current.is_null() {
645 continue;
646 }
647 if extreme.is_null() {
648 extreme = current;
649 continue;
650 }
651 if let Some(cmp) = extreme.partial_cmp(¤t) {
652 if cmp == ordering {
653 extreme = current;
654 }
655 }
656 }
657
658 Ok(extreme)
659}
660
// Picks between two scalars `$VALUE` (current state) and `$DELTA` (candidate)
// for `min` or `max` using `partial_cmp`.
//
// A null `$VALUE` always yields to `$DELTA`; a null `$DELTA` never replaces a
// non-null `$VALUE`. When `$DELTA` is chosen its clone is `compact()`ed before
// being returned.
macro_rules! min_max_generic {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let delta_wins = if $VALUE.is_null() {
            true
        } else if $DELTA.is_null() {
            false
        } else {
            $VALUE.partial_cmp(&$DELTA) == Some(choose_min_max!($OP))
        };

        if delta_wins {
            let mut winner = $DELTA.clone();
            winner.compact();
            winner
        } else {
            $VALUE.clone()
        }
    }};
}
685
686pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
688 Ok(match values.data_type() {
689 DataType::Utf8 => {
690 typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
691 }
692 DataType::LargeUtf8 => {
693 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
694 }
695 DataType::Utf8View => {
696 typed_min_max_batch_string!(
697 values,
698 StringViewArray,
699 Utf8View,
700 max_string_view
701 )
702 }
703 DataType::Boolean => {
704 typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
705 }
706 DataType::Binary => {
707 typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
708 }
709 DataType::BinaryView => {
710 typed_min_max_batch_binary!(
711 &values,
712 BinaryViewArray,
713 BinaryView,
714 max_binary_view
715 )
716 }
717 DataType::LargeBinary => {
718 typed_min_max_batch_binary!(
719 &values,
720 LargeBinaryArray,
721 LargeBinary,
722 max_binary
723 )
724 }
725 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
726 DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
727 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
728 DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
729 DataType::Dictionary(_, _) => {
730 let values = values.as_any_dictionary().values();
731 max_batch(values)?
732 }
733 _ => min_max_batch!(values, max),
734 })
735}
736
// min/max of two optional, copyable values of the same scalar variant.
//
// When only one side is `Some` that side wins; when both are `Some` the
// method `$OP` (`min` or `max`) decides. `$EXTRA_ARGS` are cloned into the
// remaining fields of `ScalarValue::$SCALAR`.
macro_rules! typed_min_max {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let merged = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => Some((*a).$OP(*b)),
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(merged, $($EXTRA_ARGS.clone()),*)
    }};
}
// min/max of two optional floating point values, ordered with `total_cmp`.
//
// When both sides are `Some`, the right-hand value wins only if
// `a.total_cmp(b)` equals the ordering selected by `choose_min_max!`.
macro_rules! typed_min_max_float {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let merged = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => {
                let winner = if a.total_cmp(b) == choose_min_max!($OP) {
                    b
                } else {
                    a
                };
                Some(*winner)
            }
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(merged)
    }};
}
764
// min/max of two optional, clonable values (strings and byte vectors).
//
// When only one side is `Some` a clone of it is used; when both are `Some`
// the method `$OP` (`min` or `max`) picks the reference that gets cloned.
macro_rules! typed_min_max_string {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let merged = match ($VALUE, $DELTA) {
            (Some(a), Some(b)) => Some((a).$OP(b).clone()),
            (Some(only), None) | (None, Some(only)) => Some(only.clone()),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(merged)
    }};
}
776
// Maps the operation name to the ordering of `current.cmp(candidate)` for
// which the candidate replaces the current value: for `max` the current value
// must be `Less`, for `min` it must be `Greater`.
macro_rules! choose_min_max {
    (max) => {
        ::std::cmp::Ordering::Less
    };
    (min) => {
        ::std::cmp::Ordering::Greater
    };
}
785
// min/max of two interval scalars compared with `partial_cmp`.
//
// Evaluates to a clone of the winning side. If the two values cannot be
// compared, the macro `return`s an internal error from the enclosing function.
macro_rules! interval_min_max {
    ($OP:tt, $LHS:expr, $RHS:expr) => {{
        match $LHS.partial_cmp(&$RHS) {
            None => {
                return internal_err!("Comparison error while computing interval min/max")
            }
            Some(choose_min_max!($OP)) => $RHS.clone(),
            Some(_) => $LHS.clone(),
        }
    }};
}
797
// min/max of two scalar values of the same type.
//
// `$VALUE` is the current state, `$DELTA` the candidate and `$OP` is `min` or
// `max`. The macro evaluates to `Ok(scalar)`; for a pair of variants without
// an arm (or decimals whose precision/scale differ) it `return`s an internal
// error from the enclosing function.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        Ok(match ($VALUE, $DELTA) {
            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
            // Decimals are only comparable when precision and scale agree.
            (
                lhs @ ScalarValue::Decimal128(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal128(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal128, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    (lhs, rhs)
                );
                }
            }
            (
                lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    (lhs, rhs)
                );
                }
            }
            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
                typed_min_max!(lhs, rhs, Boolean, $OP)
            }
            // Floats go through `typed_min_max_float!`, which orders with
            // `total_cmp`.
            (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float64, $OP)
            }
            (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float32, $OP)
            }
            (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float16, $OP)
            }
            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
                typed_min_max!(lhs, rhs, UInt64, $OP)
            }
            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
                typed_min_max!(lhs, rhs, UInt32, $OP)
            }
            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
                typed_min_max!(lhs, rhs, UInt16, $OP)
            }
            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
                typed_min_max!(lhs, rhs, UInt8, $OP)
            }
            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
                typed_min_max!(lhs, rhs, Int64, $OP)
            }
            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
                typed_min_max!(lhs, rhs, Int32, $OP)
            }
            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
                typed_min_max!(lhs, rhs, Int16, $OP)
            }
            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                typed_min_max!(lhs, rhs, Int8, $OP)
            }
            // Strings and binaries are cloned rather than copied.
            (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8, $OP)
            }
            (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
            }
            (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8View, $OP)
            }
            (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
                typed_min_max_string!(lhs, rhs, Binary, $OP)
            }
            (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeBinary, $OP)
            }
            (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
                typed_min_max_string!(lhs, rhs, BinaryView, $OP)
            }
            // Timestamps: the result keeps the left-hand side's time zone;
            // the right-hand side's time zone is ignored.
            (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
                typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMillisecond(lhs, l_tz),
                ScalarValue::TimestampMillisecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMicrosecond(lhs, l_tz),
                ScalarValue::TimestampMicrosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampNanosecond(lhs, l_tz),
                ScalarValue::TimestampNanosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz)
            }
            (
                ScalarValue::Date32(lhs),
                ScalarValue::Date32(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Date32, $OP)
            }
            (
                ScalarValue::Date64(lhs),
                ScalarValue::Date64(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Date64, $OP)
            }
            (
                ScalarValue::Time32Second(lhs),
                ScalarValue::Time32Second(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time32Second, $OP)
            }
            (
                ScalarValue::Time32Millisecond(lhs),
                ScalarValue::Time32Millisecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time32Millisecond, $OP)
            }
            (
                ScalarValue::Time64Microsecond(lhs),
                ScalarValue::Time64Microsecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time64Microsecond, $OP)
            }
            (
                ScalarValue::Time64Nanosecond(lhs),
                ScalarValue::Time64Nanosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time64Nanosecond, $OP)
            }
            // Intervals of the same unit.
            (
                ScalarValue::IntervalYearMonth(lhs),
                ScalarValue::IntervalYearMonth(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalYearMonth, $OP)
            }
            (
                ScalarValue::IntervalMonthDayNano(lhs),
                ScalarValue::IntervalMonthDayNano(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalMonthDayNano, $OP)
            }
            (
                ScalarValue::IntervalDayTime(lhs),
                ScalarValue::IntervalDayTime(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalDayTime, $OP)
            }
            // Intervals of different units are compared as whole scalars via
            // `interval_min_max!`, which errors out when `partial_cmp` yields
            // `None`.
            (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalMonthDayNano(_),
            ) | (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalDayTime(_),
            ) | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalDayTime(_),
            ) | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalYearMonth(_),
            ) | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalYearMonth(_),
            ) | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalMonthDayNano(_),
            ) => {
                interval_min_max!($OP, $VALUE, $DELTA)
            }
            (
                ScalarValue::DurationSecond(lhs),
                ScalarValue::DurationSecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationSecond, $OP)
            }
            (
                ScalarValue::DurationMillisecond(lhs),
                ScalarValue::DurationMillisecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMillisecond, $OP)
            }
            (
                ScalarValue::DurationMicrosecond(lhs),
                ScalarValue::DurationMicrosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMicrosecond, $OP)
            }
            (
                ScalarValue::DurationNanosecond(lhs),
                ScalarValue::DurationNanosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationNanosecond, $OP)
            }

            // Nested types are compared as whole scalars via
            // `min_max_generic!`.
            (
                lhs @ ScalarValue::Struct(_),
                rhs @ ScalarValue::Struct(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }

            (
                lhs @ ScalarValue::List(_),
                rhs @ ScalarValue::List(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }


            (
                lhs @ ScalarValue::LargeList(_),
                rhs @ ScalarValue::LargeList(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }


            (
                lhs @ ScalarValue::FixedSizeList(_),
                rhs @ ScalarValue::FixedSizeList(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }

            // Any other combination of variants is a type mismatch.
            e => {
                return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    e
                )
            }
        })
    }};
}
1043
1044#[derive(Debug)]
1046pub struct MaxAccumulator {
1047 max: ScalarValue,
1048}
1049
1050impl MaxAccumulator {
1051 pub fn try_new(datatype: &DataType) -> Result<Self> {
1053 Ok(Self {
1054 max: ScalarValue::try_from(datatype)?,
1055 })
1056 }
1057}
1058
1059impl Accumulator for MaxAccumulator {
1060 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1061 let values = &values[0];
1062 let delta = &max_batch(values)?;
1063 let new_max: Result<ScalarValue, DataFusionError> =
1064 min_max!(&self.max, delta, max);
1065 self.max = new_max?;
1066 Ok(())
1067 }
1068
1069 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1070 self.update_batch(states)
1071 }
1072
1073 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1074 Ok(vec![self.evaluate()?])
1075 }
1076 fn evaluate(&mut self) -> Result<ScalarValue> {
1077 Ok(self.max.clone())
1078 }
1079
1080 fn size(&self) -> usize {
1081 size_of_val(self) - size_of_val(&self.max) + self.max.size()
1082 }
1083}
1084
1085#[derive(Debug)]
1086pub struct SlidingMaxAccumulator {
1087 max: ScalarValue,
1088 moving_max: MovingMax<ScalarValue>,
1089}
1090
1091impl SlidingMaxAccumulator {
1092 pub fn try_new(datatype: &DataType) -> Result<Self> {
1094 Ok(Self {
1095 max: ScalarValue::try_from(datatype)?,
1096 moving_max: MovingMax::<ScalarValue>::new(),
1097 })
1098 }
1099}
1100
1101impl Accumulator for SlidingMaxAccumulator {
1102 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1103 for idx in 0..values[0].len() {
1104 let val = ScalarValue::try_from_array(&values[0], idx)?;
1105 self.moving_max.push(val);
1106 }
1107 if let Some(res) = self.moving_max.max() {
1108 self.max = res.clone();
1109 }
1110 Ok(())
1111 }
1112
1113 fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1114 for _idx in 0..values[0].len() {
1115 (self.moving_max).pop();
1116 }
1117 if let Some(res) = self.moving_max.max() {
1118 self.max = res.clone();
1119 }
1120 Ok(())
1121 }
1122
1123 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1124 self.update_batch(states)
1125 }
1126
1127 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1128 Ok(vec![self.max.clone()])
1129 }
1130
1131 fn evaluate(&mut self) -> Result<ScalarValue> {
1132 Ok(self.max.clone())
1133 }
1134
1135 fn supports_retract_batch(&self) -> bool {
1136 true
1137 }
1138
1139 fn size(&self) -> usize {
1140 size_of_val(self) - size_of_val(&self.max) + self.max.size()
1141 }
1142}
1143
1144#[user_doc(
1145 doc_section(label = "General Functions"),
1146 description = "Returns the minimum value in the specified column.",
1147 syntax_example = "min(expression)",
1148 sql_example = r#"```sql
1149> SELECT min(column_name) FROM table_name;
1150+----------------------+
1151| min(column_name) |
1152+----------------------+
1153| 12 |
1154+----------------------+
1155```"#,
1156 standard_argument(name = "expression",)
1157)]
1158#[derive(Debug)]
1159pub struct Min {
1160 signature: Signature,
1161}
1162
1163impl Min {
1164 pub fn new() -> Self {
1165 Self {
1166 signature: Signature::user_defined(Volatility::Immutable),
1167 }
1168 }
1169}
1170
1171impl Default for Min {
1172 fn default() -> Self {
1173 Self::new()
1174 }
1175}
1176
1177impl FromColumnStatistics for Min {
1178 fn value_from_column_statistics(
1179 &self,
1180 col_stats: &ColumnStatistics,
1181 ) -> Option<ScalarValue> {
1182 if let Precision::Exact(ref val) = col_stats.min_value {
1183 if !val.is_null() {
1184 return Some(val.clone());
1185 }
1186 }
1187 None
1188 }
1189}
1190
1191impl AggregateUDFImpl for Min {
1192 fn as_any(&self) -> &dyn std::any::Any {
1193 self
1194 }
1195
1196 fn name(&self) -> &str {
1197 "min"
1198 }
1199
1200 fn signature(&self) -> &Signature {
1201 &self.signature
1202 }
1203
1204 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1205 Ok(arg_types[0].to_owned())
1206 }
1207
1208 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
1209 Ok(Box::new(MinAccumulator::try_new(
1210 acc_args.return_field.data_type(),
1211 )?))
1212 }
1213
1214 fn aliases(&self) -> &[String] {
1215 &[]
1216 }
1217
1218 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
1219 use DataType::*;
1220 matches!(
1221 args.return_field.data_type(),
1222 Int8 | Int16
1223 | Int32
1224 | Int64
1225 | UInt8
1226 | UInt16
1227 | UInt32
1228 | UInt64
1229 | Float16
1230 | Float32
1231 | Float64
1232 | Decimal128(_, _)
1233 | Decimal256(_, _)
1234 | Date32
1235 | Date64
1236 | Time32(_)
1237 | Time64(_)
1238 | Timestamp(_, _)
1239 | Utf8
1240 | LargeUtf8
1241 | Utf8View
1242 | Binary
1243 | LargeBinary
1244 | BinaryView
1245 | Duration(_)
1246 | Struct(_)
1247 )
1248 }
1249
1250 fn create_groups_accumulator(
1251 &self,
1252 args: AccumulatorArgs,
1253 ) -> Result<Box<dyn GroupsAccumulator>> {
1254 use DataType::*;
1255 use TimeUnit::*;
1256 let data_type = args.return_field.data_type();
1257 match data_type {
1258 Int8 => primitive_min_accumulator!(data_type, i8, Int8Type),
1259 Int16 => primitive_min_accumulator!(data_type, i16, Int16Type),
1260 Int32 => primitive_min_accumulator!(data_type, i32, Int32Type),
1261 Int64 => primitive_min_accumulator!(data_type, i64, Int64Type),
1262 UInt8 => primitive_min_accumulator!(data_type, u8, UInt8Type),
1263 UInt16 => primitive_min_accumulator!(data_type, u16, UInt16Type),
1264 UInt32 => primitive_min_accumulator!(data_type, u32, UInt32Type),
1265 UInt64 => primitive_min_accumulator!(data_type, u64, UInt64Type),
1266 Float16 => {
1267 primitive_min_accumulator!(data_type, f16, Float16Type)
1268 }
1269 Float32 => {
1270 primitive_min_accumulator!(data_type, f32, Float32Type)
1271 }
1272 Float64 => {
1273 primitive_min_accumulator!(data_type, f64, Float64Type)
1274 }
1275 Date32 => primitive_min_accumulator!(data_type, i32, Date32Type),
1276 Date64 => primitive_min_accumulator!(data_type, i64, Date64Type),
1277 Time32(Second) => {
1278 primitive_min_accumulator!(data_type, i32, Time32SecondType)
1279 }
1280 Time32(Millisecond) => {
1281 primitive_min_accumulator!(data_type, i32, Time32MillisecondType)
1282 }
1283 Time64(Microsecond) => {
1284 primitive_min_accumulator!(data_type, i64, Time64MicrosecondType)
1285 }
1286 Time64(Nanosecond) => {
1287 primitive_min_accumulator!(data_type, i64, Time64NanosecondType)
1288 }
1289 Timestamp(Second, _) => {
1290 primitive_min_accumulator!(data_type, i64, TimestampSecondType)
1291 }
1292 Timestamp(Millisecond, _) => {
1293 primitive_min_accumulator!(data_type, i64, TimestampMillisecondType)
1294 }
1295 Timestamp(Microsecond, _) => {
1296 primitive_min_accumulator!(data_type, i64, TimestampMicrosecondType)
1297 }
1298 Timestamp(Nanosecond, _) => {
1299 primitive_min_accumulator!(data_type, i64, TimestampNanosecondType)
1300 }
1301 Duration(Second) => {
1302 primitive_min_accumulator!(data_type, i64, DurationSecondType)
1303 }
1304 Duration(Millisecond) => {
1305 primitive_min_accumulator!(data_type, i64, DurationMillisecondType)
1306 }
1307 Duration(Microsecond) => {
1308 primitive_min_accumulator!(data_type, i64, DurationMicrosecondType)
1309 }
1310 Duration(Nanosecond) => {
1311 primitive_min_accumulator!(data_type, i64, DurationNanosecondType)
1312 }
1313 Decimal128(_, _) => {
1314 primitive_min_accumulator!(data_type, i128, Decimal128Type)
1315 }
1316 Decimal256(_, _) => {
1317 primitive_min_accumulator!(data_type, i256, Decimal256Type)
1318 }
1319 Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
1320 Ok(Box::new(MinMaxBytesAccumulator::new_min(data_type.clone())))
1321 }
1322 Struct(_) => Ok(Box::new(MinMaxStructAccumulator::new_min(
1323 data_type.clone(),
1324 ))),
1325 _ => internal_err!("GroupsAccumulator not supported for min({})", data_type),
1327 }
1328 }
1329
1330 fn create_sliding_accumulator(
1331 &self,
1332 args: AccumulatorArgs,
1333 ) -> Result<Box<dyn Accumulator>> {
1334 Ok(Box::new(SlidingMinAccumulator::try_new(
1335 args.return_field.data_type(),
1336 )?))
1337 }
1338
1339 fn is_descending(&self) -> Option<bool> {
1340 Some(false)
1341 }
1342
1343 fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
1344 self.value_from_statistics(statistics_args)
1345 }
1346 fn order_sensitivity(&self) -> datafusion_expr::utils::AggregateOrderSensitivity {
1347 datafusion_expr::utils::AggregateOrderSensitivity::Insensitive
1348 }
1349
1350 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
1351 get_min_max_result_type(arg_types)
1352 }
1353
1354 fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
1355 datafusion_expr::ReversedUDAF::Identical
1356 }
1357
1358 fn documentation(&self) -> Option<&Documentation> {
1359 self.doc()
1360 }
1361
1362 fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
1363 SetMonotonicity::Decreasing
1366 }
1367}
1368
/// Accumulator that computes the minimum of all values it has been fed.
#[derive(Debug)]
pub struct MinAccumulator {
    /// Smallest value seen so far; `try_new` initialises it with
    /// `ScalarValue::try_from(datatype)`.
    min: ScalarValue,
}
1374
1375impl MinAccumulator {
1376 pub fn try_new(datatype: &DataType) -> Result<Self> {
1378 Ok(Self {
1379 min: ScalarValue::try_from(datatype)?,
1380 })
1381 }
1382}
1383
1384impl Accumulator for MinAccumulator {
1385 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1386 Ok(vec![self.evaluate()?])
1387 }
1388
1389 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1390 let values = &values[0];
1391 let delta = &min_batch(values)?;
1392 let new_min: Result<ScalarValue, DataFusionError> =
1393 min_max!(&self.min, delta, min);
1394 self.min = new_min?;
1395 Ok(())
1396 }
1397
1398 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1399 self.update_batch(states)
1400 }
1401
1402 fn evaluate(&mut self) -> Result<ScalarValue> {
1403 Ok(self.min.clone())
1404 }
1405
1406 fn size(&self) -> usize {
1407 size_of_val(self) - size_of_val(&self.min) + self.min.size()
1408 }
1409}
1410
/// Minimum accumulator for sliding windows: besides adding values it also
/// supports retracting them (`supports_retract_batch` returns `true`).
#[derive(Debug)]
pub struct SlidingMinAccumulator {
    /// Value returned by `evaluate` and `state`.
    min: ScalarValue,
    /// Queue of the non-null values pushed by `update_batch` and not yet
    /// popped by `retract_batch`, with fast access to their minimum.
    moving_min: MovingMin<ScalarValue>,
}
1416
1417impl SlidingMinAccumulator {
1418 pub fn try_new(datatype: &DataType) -> Result<Self> {
1419 Ok(Self {
1420 min: ScalarValue::try_from(datatype)?,
1421 moving_min: MovingMin::<ScalarValue>::new(),
1422 })
1423 }
1424}
1425
1426impl Accumulator for SlidingMinAccumulator {
1427 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1428 Ok(vec![self.min.clone()])
1429 }
1430
1431 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1432 for idx in 0..values[0].len() {
1433 let val = ScalarValue::try_from_array(&values[0], idx)?;
1434 if !val.is_null() {
1435 self.moving_min.push(val);
1436 }
1437 }
1438 if let Some(res) = self.moving_min.min() {
1439 self.min = res.clone();
1440 }
1441 Ok(())
1442 }
1443
1444 fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1445 for idx in 0..values[0].len() {
1446 let val = ScalarValue::try_from_array(&values[0], idx)?;
1447 if !val.is_null() {
1448 (self.moving_min).pop();
1449 }
1450 }
1451 if let Some(res) = self.moving_min.min() {
1452 self.min = res.clone();
1453 }
1454 Ok(())
1455 }
1456
1457 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1458 self.update_batch(states)
1459 }
1460
1461 fn evaluate(&mut self) -> Result<ScalarValue> {
1462 Ok(self.min.clone())
1463 }
1464
1465 fn supports_retract_batch(&self) -> bool {
1466 true
1467 }
1468
1469 fn size(&self) -> usize {
1470 size_of_val(self) - size_of_val(&self.min) + self.min.size()
1471 }
1472}
1473
/// First-in, first-out queue that can report the minimum of the elements it
/// currently holds.
///
/// It is built from two stacks: `push` appends to `push_stack`, `pop` removes
/// from `pop_stack`, and when `pop_stack` is empty the whole `push_stack` is
/// moved over in reverse order. Every entry is a `(value, minimum)` pair
/// where `minimum` is the smallest value among that entry and the entries
/// below it on the same stack.
#[derive(Debug)]
pub struct MovingMin<T> {
    /// Newly pushed entries; the most recently pushed one is last.
    push_stack: Vec<(T, T)>,
    /// Entries waiting to be popped; the oldest one is last.
    pop_stack: Vec<(T, T)>,
}
1516
1517impl<T: Clone + PartialOrd> Default for MovingMin<T> {
1518 fn default() -> Self {
1519 Self {
1520 push_stack: Vec::new(),
1521 pop_stack: Vec::new(),
1522 }
1523 }
1524}
1525
1526impl<T: Clone + PartialOrd> MovingMin<T> {
1527 #[inline]
1530 pub fn new() -> Self {
1531 Self::default()
1532 }
1533
1534 #[inline]
1537 pub fn with_capacity(capacity: usize) -> Self {
1538 Self {
1539 push_stack: Vec::with_capacity(capacity),
1540 pop_stack: Vec::with_capacity(capacity),
1541 }
1542 }
1543
1544 #[inline]
1547 pub fn min(&self) -> Option<&T> {
1548 match (self.push_stack.last(), self.pop_stack.last()) {
1549 (None, None) => None,
1550 (Some((_, min)), None) => Some(min),
1551 (None, Some((_, min))) => Some(min),
1552 (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }),
1553 }
1554 }
1555
1556 #[inline]
1558 pub fn push(&mut self, val: T) {
1559 self.push_stack.push(match self.push_stack.last() {
1560 Some((_, min)) => {
1561 if val > *min {
1562 (val, min.clone())
1563 } else {
1564 (val.clone(), val)
1565 }
1566 }
1567 None => (val.clone(), val),
1568 });
1569 }
1570
1571 #[inline]
1573 pub fn pop(&mut self) -> Option<T> {
1574 if self.pop_stack.is_empty() {
1575 match self.push_stack.pop() {
1576 Some((val, _)) => {
1577 let mut last = (val.clone(), val);
1578 self.pop_stack.push(last.clone());
1579 while let Some((val, _)) = self.push_stack.pop() {
1580 let min = if last.1 < val {
1581 last.1.clone()
1582 } else {
1583 val.clone()
1584 };
1585 last = (val.clone(), min);
1586 self.pop_stack.push(last.clone());
1587 }
1588 }
1589 None => return None,
1590 }
1591 }
1592 self.pop_stack.pop().map(|(val, _)| val)
1593 }
1594
1595 #[inline]
1597 pub fn len(&self) -> usize {
1598 self.push_stack.len() + self.pop_stack.len()
1599 }
1600
1601 #[inline]
1603 pub fn is_empty(&self) -> bool {
1604 self.len() == 0
1605 }
1606}
1607
/// First-in, first-out queue that can report the maximum of the elements it
/// currently holds.
///
/// It is built from two stacks: `push` appends to `push_stack`, `pop` removes
/// from `pop_stack`, and when `pop_stack` is empty the whole `push_stack` is
/// moved over in reverse order. Every entry is a `(value, maximum)` pair
/// where `maximum` is the largest value among that entry and the entries
/// below it on the same stack.
#[derive(Debug)]
pub struct MovingMax<T> {
    /// Newly pushed entries; the most recently pushed one is last.
    push_stack: Vec<(T, T)>,
    /// Entries waiting to be popped; the oldest one is last.
    pop_stack: Vec<(T, T)>,
}
1636
1637impl<T: Clone + PartialOrd> Default for MovingMax<T> {
1638 fn default() -> Self {
1639 Self {
1640 push_stack: Vec::new(),
1641 pop_stack: Vec::new(),
1642 }
1643 }
1644}
1645
1646impl<T: Clone + PartialOrd> MovingMax<T> {
1647 #[inline]
1649 pub fn new() -> Self {
1650 Self::default()
1651 }
1652
1653 #[inline]
1656 pub fn with_capacity(capacity: usize) -> Self {
1657 Self {
1658 push_stack: Vec::with_capacity(capacity),
1659 pop_stack: Vec::with_capacity(capacity),
1660 }
1661 }
1662
1663 #[inline]
1665 pub fn max(&self) -> Option<&T> {
1666 match (self.push_stack.last(), self.pop_stack.last()) {
1667 (None, None) => None,
1668 (Some((_, max)), None) => Some(max),
1669 (None, Some((_, max))) => Some(max),
1670 (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }),
1671 }
1672 }
1673
1674 #[inline]
1676 pub fn push(&mut self, val: T) {
1677 self.push_stack.push(match self.push_stack.last() {
1678 Some((_, max)) => {
1679 if val < *max {
1680 (val, max.clone())
1681 } else {
1682 (val.clone(), val)
1683 }
1684 }
1685 None => (val.clone(), val),
1686 });
1687 }
1688
1689 #[inline]
1691 pub fn pop(&mut self) -> Option<T> {
1692 if self.pop_stack.is_empty() {
1693 match self.push_stack.pop() {
1694 Some((val, _)) => {
1695 let mut last = (val.clone(), val);
1696 self.pop_stack.push(last.clone());
1697 while let Some((val, _)) = self.push_stack.pop() {
1698 let max = if last.1 > val {
1699 last.1.clone()
1700 } else {
1701 val.clone()
1702 };
1703 last = (val.clone(), max);
1704 self.pop_stack.push(last.clone());
1705 }
1706 }
1707 None => return None,
1708 }
1709 }
1710 self.pop_stack.pop().map(|(val, _)| val)
1711 }
1712
1713 #[inline]
1715 pub fn len(&self) -> usize {
1716 self.push_stack.len() + self.pop_stack.len()
1717 }
1718
1719 #[inline]
1721 pub fn is_empty(&self) -> bool {
1722 self.len() == 0
1723 }
1724}
1725
// Expands `make_udaf_expr_and_func!` for `Max`. Judging by the arguments this
// presumably generates a `max(expression)` expression builder and a
// `max_udaf` accessor — confirm against the macro definition.
make_udaf_expr_and_func!(
    Max,
    max,
    expression,
    "Returns the maximum of a group of values.",
    max_udaf
);

// Same expansion for `Min`: presumably a `min(expression)` expression builder
// and a `min_udaf` accessor — confirm against the macro definition.
make_udaf_expr_and_func!(
    Min,
    min,
    expression,
    "Returns the minimum of a group of values.",
    min_udaf
);
1741
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::DictionaryArray,
        datatypes::{
            IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
        },
    };
    use datafusion_common::Result;
    use rand::Rng;
    use std::sync::Arc;

    /// Feeds `values` as one batch into a fresh `MinAccumulator` and a fresh
    /// `MaxAccumulator` of `data_type` and checks both results.
    fn assert_min_max(
        values: ArrayRef,
        data_type: &DataType,
        expected_min: ScalarValue,
        expected_max: ScalarValue,
    ) {
        let mut min_acc = MinAccumulator::try_new(data_type).unwrap();
        min_acc.update_batch(&[Arc::clone(&values)]).unwrap();
        assert_eq!(min_acc.evaluate().unwrap(), expected_min);

        let mut max_acc = MaxAccumulator::try_new(data_type).unwrap();
        max_acc.update_batch(&[Arc::clone(&values)]).unwrap();
        assert_eq!(max_acc.evaluate().unwrap(), expected_max);
    }

    /// Min and max over each of the three interval units.
    #[test]
    fn interval_min_max() {
        // Year-month intervals
        let year_month: ArrayRef = Arc::new(IntervalYearMonthArray::from(vec![
            IntervalYearMonthType::make_value(0, 1),
            IntervalYearMonthType::make_value(5, 34),
            IntervalYearMonthType::make_value(-2, 4),
            IntervalYearMonthType::make_value(7, -4),
            IntervalYearMonthType::make_value(0, 1),
        ]));
        assert_min_max(
            year_month,
            &DataType::Interval(IntervalUnit::YearMonth),
            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
                -2, 4,
            ))),
            ScalarValue::IntervalYearMonth(Some(IntervalYearMonthType::make_value(
                5, 34,
            ))),
        );

        // Day-time intervals
        let day_time: ArrayRef = Arc::new(IntervalDayTimeArray::from(vec![
            IntervalDayTimeType::make_value(0, 0),
            IntervalDayTimeType::make_value(5, 454000),
            IntervalDayTimeType::make_value(-34, 0),
            IntervalDayTimeType::make_value(7, -4000),
            IntervalDayTimeType::make_value(1, 0),
        ]));
        assert_min_max(
            day_time,
            &DataType::Interval(IntervalUnit::DayTime),
            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(-34, 0))),
            ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(7, -4000))),
        );

        // Month-day-nanosecond intervals
        let month_day_nano: ArrayRef = Arc::new(IntervalMonthDayNanoArray::from(vec![
            IntervalMonthDayNanoType::make_value(1, 0, 0),
            IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000),
            IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000),
            IntervalMonthDayNanoType::make_value(5, 2, 493_000_000_000),
            IntervalMonthDayNanoType::make_value(1, 0, 0),
        ]));
        assert_min_max(
            month_day_nano,
            &DataType::Interval(IntervalUnit::MonthDayNano),
            ScalarValue::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(-593, -33, 13_000_000_000),
            )),
            ScalarValue::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(344, 34, -43_000_000_000),
            )),
        );
    }

    /// NaN and infinities must give the same answer whether the values arrive
    /// in one batch or spread over several.
    #[test]
    fn float_min_max_with_nans() {
        let pos_nan = f32::NAN;
        let zero = 0_f32;
        let neg_inf = f32::NEG_INFINITY;

        let check = |acc: &mut dyn Accumulator, batches: &[&[f32]], expected: f32| {
            for raw in batches {
                let batch: ArrayRef =
                    Arc::new(Float32Array::from_iter_values(raw.iter().copied()));
                acc.update_batch(&[batch]).unwrap();
            }
            assert_eq!(acc.evaluate().unwrap(), ScalarValue::Float32(Some(expected)));
        };

        let new_min = || MinAccumulator::try_new(&DataType::Float32).unwrap();
        let new_max = || MaxAccumulator::try_new(&DataType::Float32).unwrap();

        // min: across batches, then within one batch
        check(&mut new_min(), &[&[zero], &[pos_nan]], zero);
        check(&mut new_min(), &[&[zero, pos_nan]], zero);
        check(&mut new_min(), &[&[zero], &[neg_inf]], neg_inf);
        check(&mut new_min(), &[&[zero, neg_inf]], neg_inf);
        // max: across batches, then within one batch
        check(&mut new_max(), &[&[zero], &[pos_nan]], pos_nan);
        check(&mut new_max(), &[&[zero, pos_nan]], pos_nan);
        check(&mut new_max(), &[&[zero], &[neg_inf]], zero);
        check(&mut new_max(), &[&[zero, neg_inf]], zero);
    }

    /// `len` random integers drawn from `0..100`.
    fn get_random_vec_i32(len: usize) -> Vec<i32> {
        let mut rng = rand::rng();
        (0..len).map(|_| rng.random_range(0..100)).collect()
    }

    /// Compares `MovingMin` against a brute-force minimum over the slice from
    /// `n_sliding_window` elements back up to and including the current one.
    fn moving_min_i32(len: usize, n_sliding_window: usize) -> Result<()> {
        let data = get_random_vec_i32(len);
        let expected: Vec<i32> = (0..len)
            .map(|i| {
                let start = i.saturating_sub(n_sliding_window);
                *data[start..=i].iter().min().unwrap()
            })
            .collect();

        let mut moving_min = MovingMin::<i32>::new();
        let mut res = Vec::with_capacity(len);
        for (i, &value) in data.iter().enumerate() {
            moving_min.push(value);
            if i > n_sliding_window {
                moving_min.pop();
            }
            res.push(*moving_min.min().unwrap());
        }
        assert_eq!(res, expected);
        Ok(())
    }

    /// Compares `MovingMax` against a brute-force maximum over the slice from
    /// `n_sliding_window` elements back up to and including the current one.
    fn moving_max_i32(len: usize, n_sliding_window: usize) -> Result<()> {
        let data = get_random_vec_i32(len);
        let expected: Vec<i32> = (0..len)
            .map(|i| {
                let start = i.saturating_sub(n_sliding_window);
                *data[start..=i].iter().max().unwrap()
            })
            .collect();

        let mut moving_max = MovingMax::<i32>::new();
        let mut res = Vec::with_capacity(len);
        for (i, &value) in data.iter().enumerate() {
            moving_max.push(value);
            if i > n_sliding_window {
                moving_max.pop();
            }
            res.push(*moving_max.max().unwrap());
        }
        assert_eq!(res, expected);
        Ok(())
    }

    #[test]
    fn moving_min_tests() -> Result<()> {
        for n_sliding_window in [10, 20, 50, 100] {
            moving_min_i32(100, n_sliding_window)?;
        }
        Ok(())
    }

    #[test]
    fn moving_max_tests() -> Result<()> {
        for n_sliding_window in [10, 20, 50, 100] {
            moving_max_i32(100, n_sliding_window)?;
        }
        Ok(())
    }

    /// These input types must come back from `coerce_types` unchanged.
    #[test]
    fn test_min_max_coerce_types() {
        let funs: Vec<Box<dyn AggregateUDFImpl>> =
            vec![Box::new(Min::new()), Box::new(Max::new())];
        let input_types = vec![
            vec![DataType::Int32],
            vec![DataType::Decimal128(10, 2)],
            vec![DataType::Decimal256(1, 1)],
            vec![DataType::Utf8],
        ];
        for fun in &funs {
            for input_type in &input_types {
                let coerced = fun.coerce_types(input_type);
                assert_eq!(*input_type, coerced.unwrap());
            }
        }
    }

    /// A dictionary input type is coerced to its value type.
    #[test]
    fn test_get_min_max_return_type_coerce_dictionary() -> Result<()> {
        let dictionary_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let coerced = get_min_max_result_type(&[dictionary_type])?;
        assert_eq!(coerced, vec![DataType::Utf8]);
        Ok(())
    }

    /// Min and max over a dictionary array. The key for "🦀" is NULL here,
    /// yet the expected maximum is still "🦀".
    #[test]
    fn test_min_max_dictionary() -> Result<()> {
        let values = StringArray::from(vec!["b", "c", "a", "🦀", "d"]);
        let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(4)]);
        let dictionary: ArrayRef = Arc::new(
            DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(),
        );
        let coerced = get_min_max_result_type(&[dictionary.data_type().clone()])?;
        let rt_type = coerced[0].clone();

        let mut min_acc = MinAccumulator::try_new(&rt_type)?;
        min_acc.update_batch(&[Arc::clone(&dictionary)])?;
        assert_eq!(min_acc.evaluate()?, ScalarValue::Utf8(Some("a".to_string())));

        let mut max_acc = MaxAccumulator::try_new(&rt_type)?;
        max_acc.update_batch(&[Arc::clone(&dictionary)])?;
        assert_eq!(max_acc.evaluate()?, ScalarValue::Utf8(Some("🦀".to_string())));
        Ok(())
    }
}