1use arrow::array::{
21 ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22 Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
23 DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
24 DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
25 Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
26 IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
27 LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
28 Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
29 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{
35 DataFusionError, Result, ScalarValue, downcast_value, internal_err,
36};
37use datafusion_expr_common::accumulator::Accumulator;
38use std::{cmp::Ordering, mem::size_of_val};
39
// Combines two optional `Copy` scalars of the same variant into their min/max.
//
// * `$VALUE`, `$DELTA`: expressions that match against `Some(..)` / `None`
//   and whose payload can be dereferenced (the call sites pass `&Option<T>`).
// * `$SCALAR`: the `ScalarValue` variant to build.
// * `$OP`: the method applied to the two payloads (`min` or `max`).
// * `$EXTRA_ARGS`: identifiers cloned and appended as trailing variant fields
//   (e.g. precision/scale or a timezone).
//
// A missing side never wins: if only one side is `Some`, that side is returned.
macro_rules! typed_min_max {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let merged = match ($VALUE, $DELTA) {
            (Some(current), Some(incoming)) => Some((*current).$OP(*incoming)),
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(merged, $($EXTRA_ARGS.clone()),*)
    }};
}
54
// Combines two optional floating point scalars into their min/max using
// `total_cmp`, so the result is defined for every bit pattern (NaN included)
// rather than relying on `PartialOrd`.
//
// `choose_min_max!($OP)` names the ordering of `current` relative to
// `incoming` that makes `incoming` the winner; in every other case
// (including equality) `current` is kept.
macro_rules! typed_min_max_float {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let picked = match ($VALUE, $DELTA) {
            (Some(current), Some(incoming)) => {
                if current.total_cmp(incoming) == choose_min_max!($OP) {
                    Some(*incoming)
                } else {
                    Some(*current)
                }
            }
            (Some(only), None) | (None, Some(only)) => Some(*only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(picked)
    }};
}
68
// Combines two optional owned scalars (strings / byte buffers) into their
// min/max.
//
// The comparison is done on borrowed payloads and only the winning side is
// cloned into the resulting `ScalarValue::$SCALAR`.
macro_rules! typed_min_max_string {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{
        let winner = match ($VALUE, $DELTA) {
            (Some(current), Some(incoming)) => Some(current.$OP(incoming)),
            (Some(only), None) | (None, Some(only)) => Some(only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(winner.cloned())
    }};
}
80
// Same as `typed_min_max_string!`, for variants that carry one leading
// argument before the optional payload (e.g. the byte width of
// `FixedSizeBinary`). `$ARG` is evaluated before the payloads are compared.
macro_rules! typed_min_max_string_arg {
    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $ARG:expr) => {{
        let leading = $ARG;
        let winner = match ($VALUE, $DELTA) {
            (Some(current), Some(incoming)) => Some(current.$OP(incoming)),
            (Some(only), None) | (None, Some(only)) => Some(only),
            (None, None) => None,
        };
        ScalarValue::$SCALAR(leading, winner.cloned())
    }};
}
95
// Maps an operation name to the `Ordering` of `current.cmp(candidate)` that
// means the candidate should replace the current value:
// for `min` the current value must be Greater, for `max` it must be Less.
// Usable in both pattern and expression position.
macro_rules! choose_min_max {
    (min) => { ::std::cmp::Ordering::Greater };
    (max) => { ::std::cmp::Ordering::Less };
}
104
// Picks the min/max of two values via `partial_cmp` and clones the winner.
//
// If the two values are not comparable (`partial_cmp` yields `None`) this
// `return`s an internal error from the *enclosing function*, so it may only be
// used inside functions returning a compatible `Result`.
macro_rules! interval_min_max {
    ($OP:tt, $LHS:expr, $RHS:expr) => {{
        match $LHS.partial_cmp(&$RHS) {
            None => {
                return internal_err!(
                    "Comparison error while computing interval min/max"
                );
            }
            // The right-hand side wins only when the left-hand side compares
            // in the direction named by `choose_min_max!`.
            Some(ordering) if ordering == choose_min_max!($OP) => $RHS.clone(),
            Some(_) => $LHS.clone(),
        }
    }};
}
118
// Min/max of two scalars that are compared as a whole through `partial_cmp`
// (used by the nested-type arms of `min_max!`).
//
// The incoming `$DELTA` replaces `$VALUE` when `$VALUE` is null, or when both
// are non-null and `$VALUE.partial_cmp(&$DELTA)` is the ordering named by
// `choose_min_max!($OP)`. In every other case (including an incomparable
// pair) `$VALUE` is kept.
//
// Whenever `$DELTA` is the result, the clone is passed through `compact()`
// before being returned; a kept `$VALUE` is cloned as-is.
macro_rules! min_max_generic {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        let take_delta = $VALUE.is_null()
            || (!$DELTA.is_null()
                && $VALUE.partial_cmp(&$DELTA) == Some(choose_min_max!($OP)));
        if take_delta {
            let mut compacted = $DELTA.clone();
            compacted.compact();
            compacted
        } else {
            $VALUE.clone()
        }
    }};
}
143
// Min/max of two `ScalarValue`s of the same variant.
//
// * `$VALUE`, `$DELTA`: the two scalars to combine (the accumulators in this
//   file pass references).
// * `$OP`: `min` or `max`.
//
// Evaluates to `Ok(ScalarValue)`. When the pair cannot be combined (different
// variants, mismatched decimal precision/scale, mismatched fixed binary size)
// the macro `return`s an internal error from the enclosing function, so it
// must be expanded inside a function returning a compatible `Result`.
macro_rules! min_max {
    ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
        Ok(match ($VALUE, $DELTA) {
            (ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
            // Decimals: the payloads are only comparable when precision and
            // scale agree on both sides.
            (
                lhs @ ScalarValue::Decimal32(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal32(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal32, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                        "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                        (lhs, rhs)
                    );
                }
            }
            (
                lhs @ ScalarValue::Decimal64(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal64(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal64, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                        "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                        (lhs, rhs)
                    );
                }
            }
            (
                lhs @ ScalarValue::Decimal128(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal128(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal128, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                        "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                        (lhs, rhs)
                    );
                }
            }
            (
                lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss),
                rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss)
            ) => {
                if lhsp.eq(rhsp) && lhss.eq(rhss) {
                    typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss)
                } else {
                    return internal_err!(
                        "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                        (lhs, rhs)
                    );
                }
            }
            (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
                typed_min_max!(lhs, rhs, Boolean, $OP)
            }
            // Floats go through `total_cmp` (see `typed_min_max_float!`).
            (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float64, $OP)
            }
            (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float32, $OP)
            }
            (ScalarValue::Float16(lhs), ScalarValue::Float16(rhs)) => {
                typed_min_max_float!(lhs, rhs, Float16, $OP)
            }
            (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => {
                typed_min_max!(lhs, rhs, UInt64, $OP)
            }
            (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => {
                typed_min_max!(lhs, rhs, UInt32, $OP)
            }
            (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => {
                typed_min_max!(lhs, rhs, UInt16, $OP)
            }
            (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => {
                typed_min_max!(lhs, rhs, UInt8, $OP)
            }
            (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => {
                typed_min_max!(lhs, rhs, Int64, $OP)
            }
            (ScalarValue::Int32(lhs), ScalarValue::Int32(rhs)) => {
                typed_min_max!(lhs, rhs, Int32, $OP)
            }
            (ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
                typed_min_max!(lhs, rhs, Int16, $OP)
            }
            (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                typed_min_max!(lhs, rhs, Int8, $OP)
            }
            // Strings and binary payloads are owned, so the winner is cloned.
            (ScalarValue::Utf8(lhs), ScalarValue::Utf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8, $OP)
            }
            (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
            }
            (ScalarValue::Utf8View(lhs), ScalarValue::Utf8View(rhs)) => {
                typed_min_max_string!(lhs, rhs, Utf8View, $OP)
            }
            (ScalarValue::Binary(lhs), ScalarValue::Binary(rhs)) => {
                typed_min_max_string!(lhs, rhs, Binary, $OP)
            }
            (ScalarValue::LargeBinary(lhs), ScalarValue::LargeBinary(rhs)) => {
                typed_min_max_string!(lhs, rhs, LargeBinary, $OP)
            }
            // Fixed-size binary values are only combined when both sides
            // declare the same size; the left-hand size is reused.
            (ScalarValue::FixedSizeBinary(lsize, lhs), ScalarValue::FixedSizeBinary(rsize, rhs)) => {
                if lsize == rsize {
                    typed_min_max_string_arg!(lhs, rhs, FixedSizeBinary, $OP, *lsize)
                }
                else {
                    return internal_err!(
                        "MIN/MAX is not expected to receive FixedSizeBinary of incompatible sizes {:?}",
                        (lsize, rsize))
                }
            }
            (ScalarValue::BinaryView(lhs), ScalarValue::BinaryView(rhs)) => {
                typed_min_max_string!(lhs, rhs, BinaryView, $OP)
            }
            // Timestamps: only the raw values are compared. The right-hand
            // timezone is ignored and the result carries the left-hand one.
            (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
                typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMillisecond(lhs, l_tz),
                ScalarValue::TimestampMillisecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampMicrosecond(lhs, l_tz),
                ScalarValue::TimestampMicrosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz)
            }
            (
                ScalarValue::TimestampNanosecond(lhs, l_tz),
                ScalarValue::TimestampNanosecond(rhs, _),
            ) => {
                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz)
            }
            (
                ScalarValue::Date32(lhs),
                ScalarValue::Date32(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Date32, $OP)
            }
            (
                ScalarValue::Date64(lhs),
                ScalarValue::Date64(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Date64, $OP)
            }
            (
                ScalarValue::Time32Second(lhs),
                ScalarValue::Time32Second(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time32Second, $OP)
            }
            (
                ScalarValue::Time32Millisecond(lhs),
                ScalarValue::Time32Millisecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time32Millisecond, $OP)
            }
            (
                ScalarValue::Time64Microsecond(lhs),
                ScalarValue::Time64Microsecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time64Microsecond, $OP)
            }
            (
                ScalarValue::Time64Nanosecond(lhs),
                ScalarValue::Time64Nanosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, Time64Nanosecond, $OP)
            }
            // Intervals of the same unit compare payload to payload.
            (
                ScalarValue::IntervalYearMonth(lhs),
                ScalarValue::IntervalYearMonth(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalYearMonth, $OP)
            }
            (
                ScalarValue::IntervalMonthDayNano(lhs),
                ScalarValue::IntervalMonthDayNano(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalMonthDayNano, $OP)
            }
            (
                ScalarValue::IntervalDayTime(lhs),
                ScalarValue::IntervalDayTime(rhs),
            ) => {
                typed_min_max!(lhs, rhs, IntervalDayTime, $OP)
            }
            // Intervals of different units: compare the whole scalars with
            // `partial_cmp`; `interval_min_max!` returns an internal error if
            // that comparison yields `None`.
            (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalMonthDayNano(_),
            ) | (
                ScalarValue::IntervalYearMonth(_),
                ScalarValue::IntervalDayTime(_),
            ) | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalDayTime(_),
            ) | (
                ScalarValue::IntervalMonthDayNano(_),
                ScalarValue::IntervalYearMonth(_),
            ) | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalYearMonth(_),
            ) | (
                ScalarValue::IntervalDayTime(_),
                ScalarValue::IntervalMonthDayNano(_),
            ) => {
                interval_min_max!($OP, $VALUE, $DELTA)
            }
            (
                ScalarValue::DurationSecond(lhs),
                ScalarValue::DurationSecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationSecond, $OP)
            }
            (
                ScalarValue::DurationMillisecond(lhs),
                ScalarValue::DurationMillisecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMillisecond, $OP)
            }
            (
                ScalarValue::DurationMicrosecond(lhs),
                ScalarValue::DurationMicrosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationMicrosecond, $OP)
            }
            (
                ScalarValue::DurationNanosecond(lhs),
                ScalarValue::DurationNanosecond(rhs),
            ) => {
                typed_min_max!(lhs, rhs, DurationNanosecond, $OP)
            }

            // Nested types are compared as whole scalars; see
            // `min_max_generic!` for the null handling and compaction.
            (
                lhs @ ScalarValue::Struct(_),
                rhs @ ScalarValue::Struct(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }

            (
                lhs @ ScalarValue::List(_),
                rhs @ ScalarValue::List(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }


            (
                lhs @ ScalarValue::LargeList(_),
                rhs @ ScalarValue::LargeList(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }


            (
                lhs @ ScalarValue::FixedSizeList(_),
                rhs @ ScalarValue::FixedSizeList(_),
            ) => {
                min_max_generic!(lhs, rhs, $OP)
            }

            // Any other pairing (different variants, or a variant not listed
            // above) is reported as an internal error.
            e => {
                return internal_err!(
                    "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
                    e
                )
            }
        })
    }};
}
425
426#[derive(Debug, Clone)]
428pub struct MaxAccumulator {
429 max: ScalarValue,
430}
431
432impl MaxAccumulator {
433 pub fn try_new(datatype: &DataType) -> Result<Self> {
435 Ok(Self {
436 max: ScalarValue::try_from(datatype)?,
437 })
438 }
439}
440
441impl Accumulator for MaxAccumulator {
442 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
443 let values = &values[0];
444 let delta = &max_batch(values)?;
445 let new_max: Result<ScalarValue, DataFusionError> =
446 min_max!(&self.max, delta, max);
447 self.max = new_max?;
448 Ok(())
449 }
450
451 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
452 self.update_batch(states)
453 }
454
455 fn state(&mut self) -> Result<Vec<ScalarValue>> {
456 Ok(vec![self.evaluate()?])
457 }
458 fn evaluate(&mut self) -> Result<ScalarValue> {
459 Ok(self.max.clone())
460 }
461
462 fn size(&self) -> usize {
463 size_of_val(self) - size_of_val(&self.max) + self.max.size()
464 }
465}
466
467#[derive(Debug, Clone)]
469pub struct MinAccumulator {
470 min: ScalarValue,
471}
472
473impl MinAccumulator {
474 pub fn try_new(datatype: &DataType) -> Result<Self> {
476 Ok(Self {
477 min: ScalarValue::try_from(datatype)?,
478 })
479 }
480}
481
482impl Accumulator for MinAccumulator {
483 fn state(&mut self) -> Result<Vec<ScalarValue>> {
484 Ok(vec![self.evaluate()?])
485 }
486
487 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
488 let values = &values[0];
489 let delta = &min_batch(values)?;
490 let new_min: Result<ScalarValue, DataFusionError> =
491 min_max!(&self.min, delta, min);
492 self.min = new_min?;
493 Ok(())
494 }
495
496 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
497 self.update_batch(states)
498 }
499
500 fn evaluate(&mut self) -> Result<ScalarValue> {
501 Ok(self.min.clone())
502 }
503
504 fn size(&self) -> usize {
505 size_of_val(self) - size_of_val(&self.min) + self.min.size()
506 }
507}
508
// Min/max of a string array, returned as an owned string scalar.
//
// * `$VALUES`: the array to reduce; downcast to `$ARRAYTYPE` with
//   `downcast_value!` (which propagates a failure with `?`).
// * `$SCALAR`: the `ScalarValue` variant to build.
// * `$OP`: name of the function in `compute` that performs the reduction and
//   returns an optional borrowed string.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: the conversion to an
        // owned string cannot fail.
        let value = compute::$OP(array).map(|s| s.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
518
// Min/max of a binary array, returned as an owned byte-vector scalar.
//
// * `$VALUES`: the array to reduce; downcast to `$ARRAYTYPE` with
//   `downcast_value!` (which propagates a failure with `?`).
// * `$SCALAR`: the `ScalarValue` variant to build.
// * `$OP`: name of the function in `compute` that performs the reduction and
//   returns an optional borrowed byte slice.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        // `map` rather than `and_then(|e| Some(..))`: copying the bytes into
        // a `Vec` cannot fail.
        let value = compute::$OP(array).map(|bytes| bytes.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
528
// Min/max of an array whose reduction result can be stored in the scalar
// as-is (no conversion to an owned type needed).
//
// * `$VALUES`: the array to reduce; downcast to `$ARRAYTYPE` with
//   `downcast_value!` (which propagates a failure with `?`).
// * `$SCALAR`: the `ScalarValue` variant to build.
// * `$OP`: name of the reducing function in `compute`.
// * `$EXTRA_ARGS`: identifiers cloned and appended as trailing variant fields.
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed), $($EXTRA_ARGS.clone()),*)
    }};
}
537
// Reduces an array to its min/max scalar for the data types whose reduction
// is the same call for both operations.
//
// * `$VALUES`: the array to reduce; dispatch is on `$VALUES.data_type()`.
// * `$OP`: name of the reducing function in `compute` (the callers in this
//   file pass `min` or `max`).
//
// Each arm downcasts to the concrete array type and wraps the result in the
// matching `ScalarValue` variant, carrying over type parameters such as
// decimal precision/scale or the timestamp timezone.
//
// Data types without an arm make the macro `return` an internal error from
// the enclosing function.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,
            // Decimals keep the precision and scale of the input type.
            DataType::Decimal32(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal32Array,
                    Decimal32,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal64(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal64Array,
                    Decimal64,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal128(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal128Array,
                    Decimal128,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Decimal256(precision, scale) => {
                typed_min_max_batch!(
                    $VALUES,
                    Decimal256Array,
                    Decimal256,
                    $OP,
                    precision,
                    scale
                )
            }
            DataType::Float64 => {
                typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
            }
            DataType::Float32 => {
                typed_min_max_batch!($VALUES, Float32Array, Float32, $OP)
            }
            DataType::Float16 => {
                typed_min_max_batch!($VALUES, Float16Array, Float16, $OP)
            }
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            // Timestamps keep the timezone of the input type.
            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampSecondArray,
                    TimestampSecond,
                    $OP,
                    tz_opt
                )
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMillisecondArray,
                TimestampMillisecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampMicrosecondArray,
                TimestampMicrosecond,
                $OP,
                tz_opt
            ),
            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
                $VALUES,
                TimestampNanosecondArray,
                TimestampNanosecond,
                $OP,
                tz_opt
            ),
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time32MillisecondArray,
                    Time32Millisecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64MicrosecondArray,
                    Time64Microsecond,
                    $OP
                )
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    Time64NanosecondArray,
                    Time64Nanosecond,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalYearMonthArray,
                    IntervalYearMonth,
                    $OP
                )
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMillisecondArray,
                    DurationMillisecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationMicrosecondArray,
                    DurationMicrosecond,
                    $OP
                )
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!(
                    $VALUES,
                    DurationNanosecondArray,
                    DurationNanosecond,
                    $OP
                )
            }
            // Anything not handled above (and not intercepted by the caller
            // before delegating here) is unsupported.
            other => {
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {}",
                    other
                );
            }
        }
    }};
}
717
718pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
720 Ok(match values.data_type() {
721 DataType::Utf8 => {
722 typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
723 }
724 DataType::LargeUtf8 => {
725 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
726 }
727 DataType::Utf8View => {
728 typed_min_max_batch_string!(
729 values,
730 StringViewArray,
731 Utf8View,
732 min_string_view
733 )
734 }
735 DataType::Boolean => {
736 typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
737 }
738 DataType::Binary => {
739 typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
740 }
741 DataType::LargeBinary => {
742 typed_min_max_batch_binary!(
743 &values,
744 LargeBinaryArray,
745 LargeBinary,
746 min_binary
747 )
748 }
749 DataType::FixedSizeBinary(size) => {
750 let array = downcast_value!(&values, FixedSizeBinaryArray);
751 let value = compute::min_fixed_size_binary(array);
752 let value = value.map(|e| e.to_vec());
753 ScalarValue::FixedSizeBinary(*size, value)
754 }
755 DataType::BinaryView => {
756 typed_min_max_batch_binary!(
757 &values,
758 BinaryViewArray,
759 BinaryView,
760 min_binary_view
761 )
762 }
763 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
764 DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
765 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
766 DataType::FixedSizeList(_, _) => {
767 min_max_batch_generic(values, Ordering::Greater)?
768 }
769 DataType::Dictionary(_, _) => {
770 let values = values.as_any_dictionary().values();
771 min_batch(values)?
772 }
773 _ => min_max_batch!(values, min),
774 })
775}
776
777fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
779 if array.len() == array.null_count() {
780 return ScalarValue::try_from(array.data_type());
781 }
782 let mut extreme = ScalarValue::try_from_array(array, 0)?;
783 for i in 1..array.len() {
784 let current = ScalarValue::try_from_array(array, i)?;
785 if current.is_null() {
786 continue;
787 }
788 if extreme.is_null() {
789 extreme = current;
790 continue;
791 }
792 let cmp = extreme.try_cmp(¤t)?;
793 if cmp == ordering {
794 extreme = current;
795 }
796 }
797
798 Ok(extreme)
799}
800
801pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
803 Ok(match values.data_type() {
804 DataType::Utf8 => {
805 typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
806 }
807 DataType::LargeUtf8 => {
808 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
809 }
810 DataType::Utf8View => {
811 typed_min_max_batch_string!(
812 values,
813 StringViewArray,
814 Utf8View,
815 max_string_view
816 )
817 }
818 DataType::Boolean => {
819 typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
820 }
821 DataType::Binary => {
822 typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
823 }
824 DataType::BinaryView => {
825 typed_min_max_batch_binary!(
826 &values,
827 BinaryViewArray,
828 BinaryView,
829 max_binary_view
830 )
831 }
832 DataType::LargeBinary => {
833 typed_min_max_batch_binary!(
834 &values,
835 LargeBinaryArray,
836 LargeBinary,
837 max_binary
838 )
839 }
840 DataType::FixedSizeBinary(size) => {
841 let array = downcast_value!(&values, FixedSizeBinaryArray);
842 let value = compute::max_fixed_size_binary(array);
843 let value = value.map(|e| e.to_vec());
844 ScalarValue::FixedSizeBinary(*size, value)
845 }
846 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
847 DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
848 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
849 DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
850 DataType::Dictionary(_, _) => {
851 let values = values.as_any_dictionary().values();
852 max_batch(values)?
853 }
854 _ => min_max_batch!(values, max),
855 })
856}