1use arrow::array::{
19 Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray,
20 LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
21};
22use arrow::datatypes::DataType;
23use datafusion_common::{internal_err, Result};
24use datafusion_expr::{EmitTo, GroupsAccumulator};
25use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
26use std::mem::size_of;
27use std::sync::Arc;
28
/// [`GroupsAccumulator`] computing the per-group MIN or MAX of a string or
/// binary column (`Utf8`, `LargeUtf8`, `Utf8View`, `Binary`, `LargeBinary`,
/// `BinaryView`).
///
/// All supported types are handled as raw bytes by the shared
/// `MinMaxBytesState`; `is_min` selects which direction of comparison wins.
#[derive(Debug)]
pub(crate) struct MinMaxBytesAccumulator {
    /// Per-group current min/max values, plus the input data type.
    inner: MinMaxBytesState,
    /// `true` when computing MIN, `false` when computing MAX.
    is_min: bool,
}
45
46impl MinMaxBytesAccumulator {
47 pub fn new_min(data_type: DataType) -> Self {
49 Self {
50 inner: MinMaxBytesState::new(data_type),
51 is_min: true,
52 }
53 }
54
55 pub fn new_max(data_type: DataType) -> Self {
57 Self {
58 inner: MinMaxBytesState::new(data_type),
59 is_min: false,
60 }
61 }
62}
63
64impl GroupsAccumulator for MinMaxBytesAccumulator {
65 fn update_batch(
66 &mut self,
67 values: &[ArrayRef],
68 group_indices: &[usize],
69 opt_filter: Option<&BooleanArray>,
70 total_num_groups: usize,
71 ) -> Result<()> {
72 let array = &values[0];
73 assert_eq!(array.len(), group_indices.len());
74 assert_eq!(array.data_type(), &self.inner.data_type);
75
76 let array = apply_filter_as_nulls(array, opt_filter)?;
78
79 fn string_min(a: &[u8], b: &[u8]) -> bool {
81 unsafe {
84 let a = std::str::from_utf8_unchecked(a);
85 let b = std::str::from_utf8_unchecked(b);
86 a < b
87 }
88 }
89 fn string_max(a: &[u8], b: &[u8]) -> bool {
90 unsafe {
93 let a = std::str::from_utf8_unchecked(a);
94 let b = std::str::from_utf8_unchecked(b);
95 a > b
96 }
97 }
98 fn binary_min(a: &[u8], b: &[u8]) -> bool {
99 a < b
100 }
101
102 fn binary_max(a: &[u8], b: &[u8]) -> bool {
103 a > b
104 }
105
106 fn str_to_bytes<'a>(
107 it: impl Iterator<Item = Option<&'a str>>,
108 ) -> impl Iterator<Item = Option<&'a [u8]>> {
109 it.map(|s| s.map(|s| s.as_bytes()))
110 }
111
112 match (self.is_min, &self.inner.data_type) {
113 (true, &DataType::Utf8) => self.inner.update_batch(
115 str_to_bytes(array.as_string::<i32>().iter()),
116 group_indices,
117 total_num_groups,
118 string_min,
119 ),
120 (true, &DataType::LargeUtf8) => self.inner.update_batch(
121 str_to_bytes(array.as_string::<i64>().iter()),
122 group_indices,
123 total_num_groups,
124 string_min,
125 ),
126 (true, &DataType::Utf8View) => self.inner.update_batch(
127 str_to_bytes(array.as_string_view().iter()),
128 group_indices,
129 total_num_groups,
130 string_min,
131 ),
132
133 (false, &DataType::Utf8) => self.inner.update_batch(
135 str_to_bytes(array.as_string::<i32>().iter()),
136 group_indices,
137 total_num_groups,
138 string_max,
139 ),
140 (false, &DataType::LargeUtf8) => self.inner.update_batch(
141 str_to_bytes(array.as_string::<i64>().iter()),
142 group_indices,
143 total_num_groups,
144 string_max,
145 ),
146 (false, &DataType::Utf8View) => self.inner.update_batch(
147 str_to_bytes(array.as_string_view().iter()),
148 group_indices,
149 total_num_groups,
150 string_max,
151 ),
152
153 (true, &DataType::Binary) => self.inner.update_batch(
155 array.as_binary::<i32>().iter(),
156 group_indices,
157 total_num_groups,
158 binary_min,
159 ),
160 (true, &DataType::LargeBinary) => self.inner.update_batch(
161 array.as_binary::<i64>().iter(),
162 group_indices,
163 total_num_groups,
164 binary_min,
165 ),
166 (true, &DataType::BinaryView) => self.inner.update_batch(
167 array.as_binary_view().iter(),
168 group_indices,
169 total_num_groups,
170 binary_min,
171 ),
172
173 (false, &DataType::Binary) => self.inner.update_batch(
175 array.as_binary::<i32>().iter(),
176 group_indices,
177 total_num_groups,
178 binary_max,
179 ),
180 (false, &DataType::LargeBinary) => self.inner.update_batch(
181 array.as_binary::<i64>().iter(),
182 group_indices,
183 total_num_groups,
184 binary_max,
185 ),
186 (false, &DataType::BinaryView) => self.inner.update_batch(
187 array.as_binary_view().iter(),
188 group_indices,
189 total_num_groups,
190 binary_max,
191 ),
192
193 _ => internal_err!(
194 "Unexpected combination for MinMaxBytesAccumulator: ({:?}, {:?})",
195 self.is_min,
196 self.inner.data_type
197 ),
198 }
199 }
200
201 fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
202 let (data_capacity, min_maxes) = self.inner.emit_to(emit_to);
203
204 fn bytes_to_str(
206 min_maxes: Vec<Option<Vec<u8>>>,
207 ) -> impl Iterator<Item = Option<String>> {
208 min_maxes.into_iter().map(|opt| {
209 opt.map(|bytes| {
210 unsafe { String::from_utf8_unchecked(bytes) }
213 })
214 })
215 }
216
217 let result: ArrayRef = match self.inner.data_type {
218 DataType::Utf8 => {
219 let mut builder =
220 StringBuilder::with_capacity(min_maxes.len(), data_capacity);
221 for opt in bytes_to_str(min_maxes) {
222 match opt {
223 None => builder.append_null(),
224 Some(s) => builder.append_value(s.as_str()),
225 }
226 }
227 Arc::new(builder.finish())
228 }
229 DataType::LargeUtf8 => {
230 let mut builder =
231 LargeStringBuilder::with_capacity(min_maxes.len(), data_capacity);
232 for opt in bytes_to_str(min_maxes) {
233 match opt {
234 None => builder.append_null(),
235 Some(s) => builder.append_value(s.as_str()),
236 }
237 }
238 Arc::new(builder.finish())
239 }
240 DataType::Utf8View => {
241 let block_size = capacity_to_view_block_size(data_capacity);
242
243 let mut builder = StringViewBuilder::with_capacity(min_maxes.len())
244 .with_fixed_block_size(block_size);
245 for opt in bytes_to_str(min_maxes) {
246 match opt {
247 None => builder.append_null(),
248 Some(s) => builder.append_value(s.as_str()),
249 }
250 }
251 Arc::new(builder.finish())
252 }
253 DataType::Binary => {
254 let mut builder =
255 BinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
256 for opt in min_maxes {
257 match opt {
258 None => builder.append_null(),
259 Some(s) => builder.append_value(s.as_ref() as &[u8]),
260 }
261 }
262 Arc::new(builder.finish())
263 }
264 DataType::LargeBinary => {
265 let mut builder =
266 LargeBinaryBuilder::with_capacity(min_maxes.len(), data_capacity);
267 for opt in min_maxes {
268 match opt {
269 None => builder.append_null(),
270 Some(s) => builder.append_value(s.as_ref() as &[u8]),
271 }
272 }
273 Arc::new(builder.finish())
274 }
275 DataType::BinaryView => {
276 let block_size = capacity_to_view_block_size(data_capacity);
277
278 let mut builder = BinaryViewBuilder::with_capacity(min_maxes.len())
279 .with_fixed_block_size(block_size);
280 for opt in min_maxes {
281 match opt {
282 None => builder.append_null(),
283 Some(s) => builder.append_value(s.as_ref() as &[u8]),
284 }
285 }
286 Arc::new(builder.finish())
287 }
288 _ => {
289 return internal_err!(
290 "Unexpected data type for MinMaxBytesAccumulator: {:?}",
291 self.inner.data_type
292 );
293 }
294 };
295
296 assert_eq!(&self.inner.data_type, result.data_type());
297 Ok(result)
298 }
299
300 fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
301 self.evaluate(emit_to).map(|arr| vec![arr])
303 }
304
305 fn merge_batch(
306 &mut self,
307 values: &[ArrayRef],
308 group_indices: &[usize],
309 opt_filter: Option<&BooleanArray>,
310 total_num_groups: usize,
311 ) -> Result<()> {
312 self.update_batch(values, group_indices, opt_filter, total_num_groups)
314 }
315
316 fn convert_to_state(
317 &self,
318 values: &[ArrayRef],
319 opt_filter: Option<&BooleanArray>,
320 ) -> Result<Vec<ArrayRef>> {
321 let output = apply_filter_as_nulls(&values[0], opt_filter)?;
324 Ok(vec![output])
325 }
326
327 fn supports_convert_to_state(&self) -> bool {
328 true
329 }
330
331 fn size(&self) -> usize {
332 self.inner.size()
333 }
334}
335
/// Chooses the fixed block size for a view-array builder that is expected to
/// hold about `data_capacity` bytes of value data.
///
/// The result is `data_capacity` clamped to the range `1..=2 MiB`. A zero
/// capacity maps to `1` so that a zero-sized block is never requested, and
/// capacities that do not fit in a `u32` map to the 2 MiB ceiling.
fn capacity_to_view_block_size(data_capacity: usize) -> u32 {
    const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

    match u32::try_from(data_capacity) {
        Ok(0) => 1,
        Ok(requested) => requested.min(MAX_BLOCK_SIZE),
        Err(_) => MAX_BLOCK_SIZE,
    }
}
352
/// Per-group min/max storage shared by the MIN and MAX byte accumulators.
///
/// Every supported input type (strings and binaries) is stored as owned
/// bytes; the comparison direction is supplied by the caller of
/// `update_batch`.
#[derive(Debug)]
struct MinMaxBytesState {
    /// Current min/max of each group, indexed by group index; `None` until
    /// the group has seen a non-null value.
    min_max: Vec<Option<Vec<u8>>>,
    /// Data type of the input (and output) values.
    data_type: DataType,
    /// Sum of the lengths of all `Some` values in `min_max`; used as the
    /// value-buffer capacity when building output arrays.
    total_data_bytes: usize,
}
393
/// Where the best-so-far value of a group lives while one batch is processed.
///
/// Tracking winners by reference lets `update_batch` copy each group's value
/// into owned storage at most once per batch.
#[derive(Debug, Clone, Copy)]
enum MinMaxLocation<'a> {
    /// The group's value stored in `MinMaxBytesState::min_max` is still the
    /// best (or the group has no value yet).
    ExistingMinMax,
    /// A value from the current input batch is the best seen so far.
    Input(&'a [u8]),
}
401
402impl MinMaxBytesState {
405 fn new(data_type: DataType) -> Self {
410 Self {
411 min_max: vec![],
412 data_type,
413 total_data_bytes: 0,
414 }
415 }
416
417 fn set_value(&mut self, group_index: usize, new_val: &[u8]) {
419 match self.min_max[group_index].as_mut() {
420 None => {
421 self.min_max[group_index] = Some(new_val.to_vec());
422 self.total_data_bytes += new_val.len();
423 }
424 Some(existing_val) => {
425 self.total_data_bytes -= existing_val.len();
427 self.total_data_bytes += new_val.len();
428 existing_val.clear();
429 existing_val.extend_from_slice(new_val);
430 }
431 }
432 }
433
434 fn update_batch<'a, F, I>(
439 &mut self,
440 iter: I,
441 group_indices: &[usize],
442 total_num_groups: usize,
443 mut cmp: F,
444 ) -> Result<()>
445 where
446 F: FnMut(&[u8], &[u8]) -> bool + Send + Sync,
447 I: IntoIterator<Item = Option<&'a [u8]>>,
448 {
449 self.min_max.resize(total_num_groups, None);
450 let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups];
454
455 for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
457 let group_index = *group_index;
458 let Some(new_val) = new_val else {
459 continue; };
461
462 let existing_val = match locations[group_index] {
463 MinMaxLocation::Input(existing_val) => existing_val,
465 MinMaxLocation::ExistingMinMax => {
466 let Some(existing_val) = self.min_max[group_index].as_ref() else {
467 locations[group_index] = MinMaxLocation::Input(new_val);
469 continue;
470 };
471 existing_val.as_ref()
472 }
473 };
474
475 if cmp(new_val, existing_val) {
477 locations[group_index] = MinMaxLocation::Input(new_val);
478 }
479 }
480
481 for (group_index, location) in locations.iter().enumerate() {
483 match location {
484 MinMaxLocation::ExistingMinMax => {}
485 MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val),
486 }
487 }
488 Ok(())
489 }
490
491 fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec<Option<Vec<u8>>>) {
498 match emit_to {
499 EmitTo::All => {
500 (
501 std::mem::take(&mut self.total_data_bytes), std::mem::take(&mut self.min_max),
503 )
504 }
505 EmitTo::First(n) => {
506 let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
507 let first_data_capacity: usize = first_min_maxes
508 .iter()
509 .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))
510 .sum();
511 self.total_data_bytes -= first_data_capacity;
512 (first_data_capacity, first_min_maxes)
513 }
514 }
515 }
516
517 fn size(&self) -> usize {
518 self.total_data_bytes + self.min_max.len() * size_of::<Option<Vec<u8>>>()
519 }
520}