1use arrow::array::{
21 ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22 Date64Array, Decimal128Array, Decimal256Array, DurationMicrosecondArray,
23 DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray,
24 FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int16Array,
25 Int32Array, Int64Array, Int8Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
26 IntervalYearMonthArray, LargeBinaryArray, LargeStringArray, StringArray,
27 StringViewArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
28 Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
29 TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
30 UInt64Array, UInt8Array,
31};
32use arrow::compute;
33use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
34use datafusion_common::{downcast_value, Result, ScalarValue};
35use std::cmp::Ordering;
36
// Statically typed min/max over a string-like array.
//
// * `$VALUES`    - expression yielding the array to reduce
// * `$ARRAYTYPE` - concrete array type passed to `downcast_value!`
// * `$SCALAR`    - `ScalarValue` variant that wraps the `Option<String>` result
// * `$OP`        - name of the `arrow::compute` kernel to call
//
// The kernel result is a borrowed string (or `None`); it is copied into an
// owned `String` so the resulting scalar does not borrow from the array.
macro_rules! typed_min_max_batch_string {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        let value = compute::$OP(array).map(|e| e.to_string());
        ScalarValue::$SCALAR(value)
    }};
}
46
// Statically typed min/max over a binary-like array.
//
// * `$VALUES`    - expression yielding the array to reduce
// * `$ARRAYTYPE` - concrete array type passed to `downcast_value!`
// * `$SCALAR`    - `ScalarValue` variant that wraps the `Option<Vec<u8>>` result
// * `$OP`        - name of the `arrow::compute` kernel to call
//
// The kernel result is a borrowed byte slice (or `None`); it is copied into
// an owned `Vec<u8>` so the resulting scalar does not borrow from the array.
macro_rules! typed_min_max_batch_binary {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
        let array = downcast_value!($VALUES, $ARRAYTYPE);
        let value = compute::$OP(array).map(|e| e.to_vec());
        ScalarValue::$SCALAR(value)
    }};
}
56
// Statically typed min/max for arrays whose kernel result can be stored in a
// `ScalarValue` variant as-is.
//
// * `$VALUES`     - expression yielding the array to reduce
// * `$ARRAYTYPE`  - concrete array type passed to `downcast_value!`
// * `$SCALAR`     - `ScalarValue` variant to construct
// * `$OP`         - name of the `arrow::compute` kernel to call
// * `$EXTRA_ARGS` - zero or more identifiers whose clones are appended as
//                   additional variant fields after the kernel result
macro_rules! typed_min_max_batch {
    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{
        let typed = downcast_value!($VALUES, $ARRAYTYPE);
        ScalarValue::$SCALAR(compute::$OP(typed), $($EXTRA_ARGS.clone()),*)
    }};
}
65
// Dynamically typed min/max: matches on the array's `DataType` and forwards
// to `typed_min_max_batch!` with the matching array type and `ScalarValue`
// variant.
//
// * `$VALUES` - expression yielding the array to reduce
// * `$OP`     - name of the `arrow::compute` kernel (e.g. `min` or `max`)
//
// For any data type not listed here the macro executes `return` with an
// internal error, so it can only be expanded inside a function that returns
// a compatible `Result`.
macro_rules! min_max_batch {
    ($VALUES:expr, $OP:ident) => {{
        match $VALUES.data_type() {
            DataType::Null => ScalarValue::Null,

            // Signed integers.
            DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP),
            DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP),
            DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP),
            DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP),

            // Unsigned integers.
            DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
            DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
            DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
            DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP),

            // Floating point.
            DataType::Float16 => typed_min_max_batch!($VALUES, Float16Array, Float16, $OP),
            DataType::Float32 => typed_min_max_batch!($VALUES, Float32Array, Float32, $OP),
            DataType::Float64 => typed_min_max_batch!($VALUES, Float64Array, Float64, $OP),

            // Decimals: precision and scale are copied into the scalar.
            DataType::Decimal128(p, s) => {
                typed_min_max_batch!($VALUES, Decimal128Array, Decimal128, $OP, p, s)
            }
            DataType::Decimal256(p, s) => {
                typed_min_max_batch!($VALUES, Decimal256Array, Decimal256, $OP, p, s)
            }

            // Dates and times of day.
            DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
            DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
            DataType::Time32(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP)
            }
            DataType::Time32(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, Time32MillisecondArray, Time32Millisecond, $OP)
            }
            DataType::Time64(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, Time64MicrosecondArray, Time64Microsecond, $OP)
            }
            DataType::Time64(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64Nanosecond, $OP)
            }

            // Timestamps: the optional time zone is copied into the scalar.
            DataType::Timestamp(TimeUnit::Second, tz) => {
                typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP, tz)
            }
            DataType::Timestamp(TimeUnit::Millisecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMillisecondArray,
                    TimestampMillisecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampMicrosecondArray,
                    TimestampMicrosecond,
                    $OP,
                    tz
                )
            }
            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
                typed_min_max_batch!(
                    $VALUES,
                    TimestampNanosecondArray,
                    TimestampNanosecond,
                    $OP,
                    tz
                )
            }

            // Intervals.
            DataType::Interval(IntervalUnit::YearMonth) => {
                typed_min_max_batch!($VALUES, IntervalYearMonthArray, IntervalYearMonth, $OP)
            }
            DataType::Interval(IntervalUnit::DayTime) => {
                typed_min_max_batch!($VALUES, IntervalDayTimeArray, IntervalDayTime, $OP)
            }
            DataType::Interval(IntervalUnit::MonthDayNano) => {
                typed_min_max_batch!(
                    $VALUES,
                    IntervalMonthDayNanoArray,
                    IntervalMonthDayNano,
                    $OP
                )
            }

            // Durations.
            DataType::Duration(TimeUnit::Second) => {
                typed_min_max_batch!($VALUES, DurationSecondArray, DurationSecond, $OP)
            }
            DataType::Duration(TimeUnit::Millisecond) => {
                typed_min_max_batch!($VALUES, DurationMillisecondArray, DurationMillisecond, $OP)
            }
            DataType::Duration(TimeUnit::Microsecond) => {
                typed_min_max_batch!($VALUES, DurationMicrosecondArray, DurationMicrosecond, $OP)
            }
            DataType::Duration(TimeUnit::Nanosecond) => {
                typed_min_max_batch!($VALUES, DurationNanosecondArray, DurationNanosecond, $OP)
            }

            // Anything else is unsupported: bail out of the enclosing function.
            unsupported => {
                return datafusion_common::internal_err!(
                    "Min/Max accumulator not implemented for type {:?}",
                    unsupported
                );
            }
        }
    }};
}
225
226pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
228 Ok(match values.data_type() {
229 DataType::Utf8 => {
230 typed_min_max_batch_string!(values, StringArray, Utf8, min_string)
231 }
232 DataType::LargeUtf8 => {
233 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string)
234 }
235 DataType::Utf8View => {
236 typed_min_max_batch_string!(
237 values,
238 StringViewArray,
239 Utf8View,
240 min_string_view
241 )
242 }
243 DataType::Boolean => {
244 typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean)
245 }
246 DataType::Binary => {
247 typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary)
248 }
249 DataType::LargeBinary => {
250 typed_min_max_batch_binary!(
251 &values,
252 LargeBinaryArray,
253 LargeBinary,
254 min_binary
255 )
256 }
257 DataType::FixedSizeBinary(size) => {
258 let array = downcast_value!(&values, FixedSizeBinaryArray);
259 let value = compute::min_fixed_size_binary(array);
260 let value = value.map(|e| e.to_vec());
261 ScalarValue::FixedSizeBinary(*size, value)
262 }
263 DataType::BinaryView => {
264 typed_min_max_batch_binary!(
265 &values,
266 BinaryViewArray,
267 BinaryView,
268 min_binary_view
269 )
270 }
271 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
272 DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
273 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
274 DataType::FixedSizeList(_, _) => {
275 min_max_batch_generic(values, Ordering::Greater)?
276 }
277 DataType::Dictionary(_, _) => {
278 let values = values.as_any_dictionary().values();
279 min_batch(values)?
280 }
281 _ => min_max_batch!(values, min),
282 })
283}
284
285fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> {
287 if array.len() == array.null_count() {
288 return ScalarValue::try_from(array.data_type());
289 }
290 let mut extreme = ScalarValue::try_from_array(array, 0)?;
291 for i in 1..array.len() {
292 let current = ScalarValue::try_from_array(array, i)?;
293 if current.is_null() {
294 continue;
295 }
296 if extreme.is_null() {
297 extreme = current;
298 continue;
299 }
300 let cmp = extreme.try_cmp(¤t)?;
301 if cmp == ordering {
302 extreme = current;
303 }
304 }
305
306 Ok(extreme)
307}
308
309pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
311 Ok(match values.data_type() {
312 DataType::Utf8 => {
313 typed_min_max_batch_string!(values, StringArray, Utf8, max_string)
314 }
315 DataType::LargeUtf8 => {
316 typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string)
317 }
318 DataType::Utf8View => {
319 typed_min_max_batch_string!(
320 values,
321 StringViewArray,
322 Utf8View,
323 max_string_view
324 )
325 }
326 DataType::Boolean => {
327 typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean)
328 }
329 DataType::Binary => {
330 typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary)
331 }
332 DataType::BinaryView => {
333 typed_min_max_batch_binary!(
334 &values,
335 BinaryViewArray,
336 BinaryView,
337 max_binary_view
338 )
339 }
340 DataType::LargeBinary => {
341 typed_min_max_batch_binary!(
342 &values,
343 LargeBinaryArray,
344 LargeBinary,
345 max_binary
346 )
347 }
348 DataType::FixedSizeBinary(size) => {
349 let array = downcast_value!(&values, FixedSizeBinaryArray);
350 let value = compute::max_fixed_size_binary(array);
351 let value = value.map(|e| e.to_vec());
352 ScalarValue::FixedSizeBinary(*size, value)
353 }
354 DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
355 DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
356 DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
357 DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
358 DataType::Dictionary(_, _) => {
359 let values = values.as_any_dictionary().values();
360 max_batch(values)?
361 }
362 _ => min_max_batch!(values, max),
363 })
364}