1use arrow::array::{
21 ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22 Date64Array, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array,
23 DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24 DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25 Int16Array, Int32Array, Int64Array, Int8Array, IntervalDayTimeArray,
26 IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27 LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28 Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35 downcast_value, internal_err, DataFusionError, Result, ScalarValue,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
/// Combines two optional `Copy` values with `$OP` (`min` or `max`) and wraps
/// the result in `ScalarValue::$SCALAR`.
///
/// * both present: `(*a).$OP(*b)`
/// * exactly one present: that value
/// * neither present: `None`
///
/// Any `$EXTRA_ARGS` (for example decimal precision/scale or a timestamp time
/// zone) are cloned and appended as trailing fields of the scalar variant.
macro_rules! typed_min_max {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let combined = match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => Some((*lhs).$OP(*rhs)),
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(combined $(, $EXTRA_ARGS.clone())*)
    }};
}
54
/// Combines two optional floating-point values into `ScalarValue::$SCALAR`,
/// ordering them with `total_cmp` instead of `PartialOrd`.
///
/// When both sides are present, the right-hand value is chosen only if
/// `lhs.total_cmp(rhs)` equals the ordering named by `choose_min_max!($OP)`;
/// in every other case (ties included) the left-hand value is kept. When only
/// one side is present that side is returned.
macro_rules! typed_min_max_float {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let picked = match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => {
                if lhs.total_cmp(rhs) == choose_min_max!($OP) {
                    Some(*rhs)
                } else {
                    Some(*lhs)
                }
            }
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(picked)
    }};
}
68
/// Combines two optional owned values (strings or byte buffers) with `$OP`
/// (`min` or `max`) and wraps a clone of the winner in `ScalarValue::$SCALAR`.
///
/// The operands are compared by reference and only the chosen one is cloned.
/// When a single side is present it is cloned as-is; when neither is present
/// the result holds `None`.
macro_rules! typed_min_max_string {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        ScalarValue::$SCALAR(match ($VALUE, $DELTA) {
            (Some(lhs), Some(rhs)) => Some(lhs.$OP(rhs).clone()),
            (Some(only), None) | (None, Some(only)) => Some(only.clone()),
            (None, None) => None,
        })
    }};
}
80
/// Same as `typed_min_max_string!`, for scalar variants whose first field is
/// an extra argument `$ARG` (e.g. the byte width of `FixedSizeBinary`).
///
/// `$ARG` is placed first, followed by a clone of the `$OP` winner (or of the
/// only present operand, or `None` when both are absent).
macro_rules! typed_min_max_string_arg {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $ARG:expr) => {{
        ScalarValue::$SCALAR(
            $ARG,
            match ($VALUE, $DELTA) {
                (Some(lhs), Some(rhs)) => Some(lhs.$OP(rhs).clone()),
                (Some(only), None) | (None, Some(only)) => Some(only.clone()),
                (None, None) => None,
            },
        )
    }};
}
95
/// Expands to the `Ordering` that comparing `lhs` against `rhs` must yield
/// for `rhs` to replace `lhs`: `Greater` when computing a minimum, `Less`
/// when computing a maximum.
///
/// The expansion is a plain path, so it works both as a pattern and as an
/// expression.
macro_rules! choose_min_max {
    (min) => { ::std::cmp::Ordering::Greater };
    (max) => { ::std::cmp::Ordering::Less };
}
104
/// Picks the `$OP` (`min` or `max`) of `$LHS` and `$RHS` using `partial_cmp`
/// and evaluates to a clone of the chosen operand.
///
/// `$RHS` is chosen only when the comparison equals `choose_min_max!($OP)`;
/// ties keep `$LHS`. If the operands are not comparable (`partial_cmp`
/// returns `None`) the *enclosing function* returns an internal error.
macro_rules! interval_min_max {
    ($OP:tt, $LHS:expr, $RHS:expr) => {{
        match $LHS.partial_cmp(&$RHS) {
            None => {
                return internal_err!("Comparison error while computing interval min/max")
            }
            Some(ordering) if ordering == choose_min_max!($OP) => $RHS.clone(),
            Some(_) => $LHS.clone(),
        }
    }};
}
116
/// MIN/MAX for scalars that are compared through `partial_cmp` (used for the
/// nested variants in `min_max!`).
///
/// `$DELTA` replaces `$VALUE` when `$VALUE` is null, or when both are non-null
/// and `$VALUE.partial_cmp(&$DELTA)` equals `choose_min_max!($OP)`. In every
/// other case (null `$DELTA`, ties, or operands that are not comparable) a
/// clone of `$VALUE` is returned.
///
/// Whenever `$DELTA` is chosen, the clone is passed through `compact()`
/// before being returned.
macro_rules! min_max_generic {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let take_delta = $VALUE.is_null()
            || (!$DELTA.is_null()
                && $VALUE.partial_cmp(&$DELTA) == Some(choose_min_max!($OP)));
        if take_delta {
            let mut compacted = $DELTA.clone();
            compacted.compact();
            compacted
        } else {
            $VALUE.clone()
        }
    }};
}
141
/// Computes the MIN or MAX (`$OP` is `min` or `max`) of two `ScalarValue`
/// references and evaluates to `Ok(ScalarValue)`.
///
/// Both operands must be the same variant. Decimals must additionally agree
/// on precision and scale, and `FixedSizeBinary` on size. For timestamps the
/// result carries the left-hand time zone; the right-hand time zone is not
/// inspected. Pairs of *different* interval units are delegated to
/// `interval_min_max!`, and nested values (struct / list variants) to
/// `min_max_generic!`.
///
/// Any pairing not listed makes the *enclosing function* return an internal
/// error, so this macro can only be used inside a function returning
/// `Result`.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        Ok(match ($VALUE, $DELTA) {
            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,

            // ---- boolean and integers ----
            (ScalarValue::Boolean(cur), ScalarValue::Boolean(new)) => {
                typed_min_max!(cur, new, Boolean, $OP)
            }
            (ScalarValue::Int8(cur), ScalarValue::Int8(new)) => {
                typed_min_max!(cur, new, Int8, $OP)
            }
            (ScalarValue::Int16(cur), ScalarValue::Int16(new)) => {
                typed_min_max!(cur, new, Int16, $OP)
            }
            (ScalarValue::Int32(cur), ScalarValue::Int32(new)) => {
                typed_min_max!(cur, new, Int32, $OP)
            }
            (ScalarValue::Int64(cur), ScalarValue::Int64(new)) => {
                typed_min_max!(cur, new, Int64, $OP)
            }
            (ScalarValue::UInt8(cur), ScalarValue::UInt8(new)) => {
                typed_min_max!(cur, new, UInt8, $OP)
            }
            (ScalarValue::UInt16(cur), ScalarValue::UInt16(new)) => {
                typed_min_max!(cur, new, UInt16, $OP)
            }
            (ScalarValue::UInt32(cur), ScalarValue::UInt32(new)) => {
                typed_min_max!(cur, new, UInt32, $OP)
            }
            (ScalarValue::UInt64(cur), ScalarValue::UInt64(new)) => {
                typed_min_max!(cur, new, UInt64, $OP)
            }

            // ---- floats (total ordering) ----
            (ScalarValue::Float16(cur), ScalarValue::Float16(new)) => {
                typed_min_max_float!(cur, new, Float16, $OP)
            }
            (ScalarValue::Float32(cur), ScalarValue::Float32(new)) => {
                typed_min_max_float!(cur, new, Float32, $OP)
            }
            (ScalarValue::Float64(cur), ScalarValue::Float64(new)) => {
                typed_min_max_float!(cur, new, Float64, $OP)
            }

            // ---- decimals ----
            // The guards require matching precision and scale. A mismatching
            // pair matches no other arm and therefore reaches the catch-all
            // at the bottom, which reports the incompatible-types error.
            (
                ScalarValue::Decimal32(cur, p, s),
                ScalarValue::Decimal32(new, new_p, new_s),
            ) if p == new_p && s == new_s => {
                typed_min_max!(cur, new, Decimal32, $OP, p, s)
            }
            (
                ScalarValue::Decimal64(cur, p, s),
                ScalarValue::Decimal64(new, new_p, new_s),
            ) if p == new_p && s == new_s => {
                typed_min_max!(cur, new, Decimal64, $OP, p, s)
            }
            (
                ScalarValue::Decimal128(cur, p, s),
                ScalarValue::Decimal128(new, new_p, new_s),
            ) if p == new_p && s == new_s => {
                typed_min_max!(cur, new, Decimal128, $OP, p, s)
            }
            (
                ScalarValue::Decimal256(cur, p, s),
                ScalarValue::Decimal256(new, new_p, new_s),
            ) if p == new_p && s == new_s => {
                typed_min_max!(cur, new, Decimal256, $OP, p, s)
            }

            // ---- strings and binary ----
            (ScalarValue::Utf8(cur), ScalarValue::Utf8(new)) => {
                typed_min_max_string!(cur, new, Utf8, $OP)
            }
            (ScalarValue::LargeUtf8(cur), ScalarValue::LargeUtf8(new)) => {
                typed_min_max_string!(cur, new, LargeUtf8, $OP)
            }
            (ScalarValue::Utf8View(cur), ScalarValue::Utf8View(new)) => {
                typed_min_max_string!(cur, new, Utf8View, $OP)
            }
            (ScalarValue::Binary(cur), ScalarValue::Binary(new)) => {
                typed_min_max_string!(cur, new, Binary, $OP)
            }
            (ScalarValue::LargeBinary(cur), ScalarValue::LargeBinary(new)) => {
                typed_min_max_string!(cur, new, LargeBinary, $OP)
            }
            (ScalarValue::BinaryView(cur), ScalarValue::BinaryView(new)) => {
                typed_min_max_string!(cur, new, BinaryView, $OP)
            }
            (
                ScalarValue::FixedSizeBinary(cur_size, cur),
                ScalarValue::FixedSizeBinary(new_size, new),
            ) => {
                if cur_size != new_size {
                    return internal_err!(
                        "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
                        (cur_size, new_size)
                    );
                }
                typed_min_max_string_arg!(cur, new, FixedSizeBinary, $OP, *cur_size)
            }

            // ---- timestamps (result keeps the left-hand time zone) ----
            (
                ScalarValue::TimestampSecond(cur, tz),
                ScalarValue::TimestampSecond(new, _),
            ) => {
                typed_min_max!(cur, new, TimestampSecond, $OP, tz)
            }
            (
                ScalarValue::TimestampMillisecond(cur, tz),
                ScalarValue::TimestampMillisecond(new, _),
            ) => {
                typed_min_max!(cur, new, TimestampMillisecond, $OP, tz)
            }
            (
                ScalarValue::TimestampMicrosecond(cur, tz),
                ScalarValue::TimestampMicrosecond(new, _),
            ) => {
                typed_min_max!(cur, new, TimestampMicrosecond, $OP, tz)
            }
            (
                ScalarValue::TimestampNanosecond(cur, tz),
                ScalarValue::TimestampNanosecond(new, _),
            ) => {
                typed_min_max!(cur, new, TimestampNanosecond, $OP, tz)
            }

            // ---- dates and times of day ----
            (ScalarValue::Date32(cur), ScalarValue::Date32(new)) => {
                typed_min_max!(cur, new, Date32, $OP)
            }
            (ScalarValue::Date64(cur), ScalarValue::Date64(new)) => {
                typed_min_max!(cur, new, Date64, $OP)
            }
            (ScalarValue::Time32Second(cur), ScalarValue::Time32Second(new)) => {
                typed_min_max!(cur, new, Time32Second, $OP)
            }
            (
                ScalarValue::Time32Millisecond(cur),
                ScalarValue::Time32Millisecond(new),
            ) => {
                typed_min_max!(cur, new, Time32Millisecond, $OP)
            }
            (
                ScalarValue::Time64Microsecond(cur),
                ScalarValue::Time64Microsecond(new),
            ) => {
                typed_min_max!(cur, new, Time64Microsecond, $OP)
            }
            (
                ScalarValue::Time64Nanosecond(cur),
                ScalarValue::Time64Nanosecond(new),
            ) => {
                typed_min_max!(cur, new, Time64Nanosecond, $OP)
            }

            // ---- intervals of the same unit ----
            (
                ScalarValue::IntervalYearMonth(cur),
                ScalarValue::IntervalYearMonth(new),
            ) => {
                typed_min_max!(cur, new, IntervalYearMonth, $OP)
            }
            (
                ScalarValue::IntervalMonthDayNano(cur),
                ScalarValue::IntervalMonthDayNano(new),
            ) => {
                typed_min_max!(cur, new, IntervalMonthDayNano, $OP)
            }
            (
                ScalarValue::IntervalDayTime(cur),
                ScalarValue::IntervalDayTime(new),
            ) => {
                typed_min_max!(cur, new, IntervalDayTime, $OP)
            }

            // ---- intervals of different units: compare the scalars themselves ----
            (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalMonthDayNano(_) | ScalarValue::IntervalDayTime(_),
            )
            | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalDayTime(_),
            )
            | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalYearMonth(_) | ScalarValue::IntervalMonthDayNano(_),
            ) => {
                interval_min_max!($OP, $VALUE, $DELTA)
            }

            // ---- durations ----
            (ScalarValue::DurationSecond(cur), ScalarValue::DurationSecond(new)) => {
                typed_min_max!(cur, new, DurationSecond, $OP)
            }
            (
                ScalarValue::DurationMillisecond(cur),
                ScalarValue::DurationMillisecond(new),
            ) => {
                typed_min_max!(cur, new, DurationMillisecond, $OP)
            }
            (
                ScalarValue::DurationMicrosecond(cur),
                ScalarValue::DurationMicrosecond(new),
            ) => {
                typed_min_max!(cur, new, DurationMicrosecond, $OP)
            }
            (
                ScalarValue::DurationNanosecond(cur),
                ScalarValue::DurationNanosecond(new),
            ) => {
                typed_min_max!(cur, new, DurationNanosecond, $OP)
            }

            // ---- nested values ----
            (cur @ ScalarValue::Struct(_), new @ ScalarValue::Struct(_))
            | (cur @ ScalarValue::List(_), new @ ScalarValue::List(_))
            | (cur @ ScalarValue::LargeList(_), new @ ScalarValue::LargeList(_))
            | (cur @ ScalarValue::FixedSizeList(_), new @ ScalarValue::FixedSizeList(_)) => {
                min_max_generic!(cur, new, $OP)
            }

            // ---- everything else, including decimals rejected by a guard ----
            mismatched => {
                return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    mismatched
                )
            }
        })
    }};
}
423
424#[derive(Debug, Clone)]
426pub struct MaxAccumulator {
427 max: ScalarValue,
428}
429
430impl MaxAccumulator {
431 pub fn try_new(datatype: &DataType) -> Result<Self> {
433 Ok(Self {
434 max: ScalarValue::try_from(datatype)?,
435 })
436 }
437}
438
439impl Accumulator for MaxAccumulator {
440 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
441 let values = &values[0];
442 let delta = &max_batch(values)?;
443 let new_max: Result<ScalarValue, DataFusionError> =
444 min_max!(&self.max, delta, max);
445 self.max = new_max?;
446 Ok(())
447 }
448
449 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
450 self.update_batch(states)
451 }
452
453 fn state(&mut self) -> Result<Vec<ScalarValue>> {
454 Ok(vec![self.evaluate()?])
455 }
456 fn evaluate(&mut self) -> Result<ScalarValue> {
457 Ok(self.max.clone())
458 }
459
460 fn size(&self) -> usize {
461 size_of_val(self) - size_of_val(&self.max) + self.max.size()
462 }
463}
464
465#[derive(Debug, Clone)]
467pub struct MinAccumulator {
468 min: ScalarValue,
469}
470
471impl MinAccumulator {
472 pub fn try_new(datatype: &DataType) -> Result<Self> {
474 Ok(Self {
475 min: ScalarValue::try_from(datatype)?,
476 })
477 }
478}
479
480impl Accumulator for MinAccumulator {
481 fn state(&mut self) -> Result<Vec<ScalarValue>> {
482 Ok(vec![self.evaluate()?])
483 }
484
485 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
486 let values = &values[0];
487 let delta = &min_batch(values)?;
488 let new_min: Result<ScalarValue, DataFusionError> =
489 min_max!(&self.min, delta, min);
490 self.min = new_min?;
491 Ok(())
492 }
493
494 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
495 self.update_batch(states)
496 }
497
498 fn evaluate(&mut self) -> Result<ScalarValue> {
499 Ok(self.min.clone())
500 }
501
502 fn size(&self) -> usize {
503 size_of_val(self) - size_of_val(&self.min) + self.min.size()
504 }
505}
506
/// Runs the string aggregate kernel `compute::$OP` over `$VALUES` (downcast
/// to `$ARRAYTYPE`) and wraps the result, converted to an owned `String`, in
/// `ScalarValue::$SCALAR`.
///
/// When the kernel yields `None` the scalar holds `None`.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: the conversion cannot
        // fail, and this matches the FixedSizeBinary branches of
        // `min_batch` / `max_batch`.
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
516
/// Runs the binary aggregate kernel `compute::$OP` over `$VALUES` (downcast
/// to `$ARRAYTYPE`) and wraps the result, copied into an owned `Vec<u8>`, in
/// `ScalarValue::$SCALAR`.
///
/// When the kernel yields `None` the scalar holds `None`.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: the copy cannot fail,
        // and this matches the FixedSizeBinary branches of
        // `min_batch` / `max_batch`.
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
526
/// Runs the aggregate kernel `compute::$OP` over `$VALUES` (downcast to
/// `$ARRAYTYPE`) and wraps its result directly in `ScalarValue::$SCALAR`.
///
/// Any `$EXTRA_ARGS` (decimal precision/scale, timestamp time zone) are
/// cloned and appended as trailing fields of the scalar variant.
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let extreme = compute::$OP(downcast_value!($VALUES, $ARRAYTYPE));
        ScalarValue::$SCALAR(extreme $(, $EXTRA_ARGS.clone())*)
    }};
}
535
/// Reduces an array to its MIN or MAX `ScalarValue` for the data types whose
/// kernel is the generic `compute::$OP` (`min` or `max`): null, decimals,
/// floats, integers, timestamps, dates, times, intervals and durations.
///
/// Dispatches on `$VALUES.data_type()` and forwards to
/// `typed_min_max_batch!` with the matching array type and scalar variant.
/// For any other data type the *enclosing function* returns an internal
/// error, so the macro can only be used inside a function returning `Result`.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,

            // ---- integers ----
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),

            // ---- floats ----
            DataType::Float16 => typed_min_max_batch!($VALUES, Float16Array, Float16, $OP),
            DataType::Float32 => typed_min_max_batch!($VALUES, Float32Array, Float32, $OP),
            DataType::Float64 => typed_min_max_batch!($VALUES, Float64Array, Float64, $OP),

            // ---- decimals: precision and scale are copied into the scalar ----
            DataType::Decimal32(p, s) => {
                typed_min_max_batch!($VALUES, Decimal32Array, Decimal32, $OP, p, s)
            }
            DataType::Decimal64(p, s) => {
                typed_min_max_batch!($VALUES, Decimal64Array, Decimal64, $OP, p, s)
            }
            DataType::Decimal128(p, s) => {
                typed_min_max_batch!($VALUES, Decimal128Array, Decimal128, $OP, p, s)
            }
            DataType::Decimal256(p, s) => {
                typed_min_max_batch!($VALUES, Decimal256Array, Decimal256, $OP, p, s)
            }

            // ---- timestamps: the time zone is copied into the scalar ----
            DataType::Timestamp(TimeUnit::Second, tz) => {
                typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP, tz)
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMillisecondArray,
                    TimestampMillisecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMicrosecondArray,
                    TimestampMicrosecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampNanosecondArray,
                    TimestampNanosecond,
                    $OP,
                    tz
                )
            }

            // ---- dates and times of day ----
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, Time32MillisecondArray, Time32Millisecond, $OP)
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, Time64MicrosecondArray, Time64Microsecond, $OP)
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64Nanosecond, $OP)
            }

            // ---- intervals ----
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!($VALUES, IntervalYearMonthArray, IntervalYearMonth, $OP)
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }

            // ---- durations ----
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, DurationMillisecondArray, DurationMillisecond, $OP)
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, DurationMicrosecondArray, DurationMicrosecond, $OP)
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, DurationNanosecondArray, DurationNanosecond, $OP)
            }

            // ---- anything else is not handled by this macro ----
            unsupported => {
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {}",
                    unsupported
                );
            }
        }
    }};
}
715
716pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
718 Ok(match values.data_type() {
719 DataType::Utf8 => {
720 typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
721 }
722 DataType::LargeUtf8 => {
723 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
724 }
725 DataType::Utf8View => {
726 typed_min_max_batch_string!(
727 values,
728 StringViewArray,
729 Utf8View,
730 min_string_view
731 )
732 }
733 DataType::Boolean => {
734 typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
735 }
736 DataType::Binary => {
737 typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
738 }
739 DataType::LargeBinary => {
740 typed_min_max_batch_binary!(
741 &values,
742 LargeBinaryArray,
743 LargeBinary,
744 min_binary
745 )
746 }
747 DataType::FixedSizeBinary(size) => {
748 let array = downcast_value!(&values, FixedSizeBinaryArray);
749 let value = compute::min_fixed_size_binary(array);
750 let value = value.map(|e| e.to_vec());
751 ScalarValue::FixedSizeBinary(*size, value)
752 }
753 DataType::BinaryView => {
754 typed_min_max_batch_binary!(
755 &values,
756 BinaryViewArray,
757 BinaryView,
758 min_binary_view
759 )
760 }
761 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
762 DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
763 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
764 DataType::FixedSizeList(_, _) => {
765 min_max_batch_generic(values, Ordering::Greater)?
766 }
767 DataType::Dictionary(_, _) => {
768 let values = values.as_any_dictionary().values();
769 min_batch(values)?
770 }
771 _ => min_max_batch!(values, min),
772 })
773}
774
775fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
777 if array.len() == array.null_count() {
778 return ScalarValue::try_from(array.data_type());
779 }
780 let mut extreme = ScalarValue::try_from_array(array, 0)?;
781 for i in 1..array.len() {
782 let current = ScalarValue::try_from_array(array, i)?;
783 if current.is_null() {
784 continue;
785 }
786 if extreme.is_null() {
787 extreme = current;
788 continue;
789 }
790 let cmp = extreme.try_cmp(¤t)?;
791 if cmp == ordering {
792 extreme = current;
793 }
794 }
795
796 Ok(extreme)
797}
798
799pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
801 Ok(match values.data_type() {
802 DataType::Utf8 => {
803 typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
804 }
805 DataType::LargeUtf8 => {
806 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
807 }
808 DataType::Utf8View => {
809 typed_min_max_batch_string!(
810 values,
811 StringViewArray,
812 Utf8View,
813 max_string_view
814 )
815 }
816 DataType::Boolean => {
817 typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
818 }
819 DataType::Binary => {
820 typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
821 }
822 DataType::BinaryView => {
823 typed_min_max_batch_binary!(
824 &values,
825 BinaryViewArray,
826 BinaryView,
827 max_binary_view
828 )
829 }
830 DataType::LargeBinary => {
831 typed_min_max_batch_binary!(
832 &values,
833 LargeBinaryArray,
834 LargeBinary,
835 max_binary
836 )
837 }
838 DataType::FixedSizeBinary(size) => {
839 let array = downcast_value!(&values, FixedSizeBinaryArray);
840 let value = compute::max_fixed_size_binary(array);
841 let value = value.map(|e| e.to_vec());
842 ScalarValue::FixedSizeBinary(*size, value)
843 }
844 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
845 DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
846 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
847 DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
848 DataType::Dictionary(_, _) => {
849 let values = values.as_any_dictionary().values();
850 max_batch(values)?
851 }
852 _ => min_max_batch!(values, max),
853 })
854}