1use std::cmp::Ordering;
21use std::collections::{HashSet, VecDeque};
22use std::mem::{size_of, size_of_val, take};
23use std::sync::Arc;
24
25use arrow::array::{
26 Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, StructArray,
27 UInt32Array, new_empty_array,
28};
29use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
30use arrow::compute::{SortOptions, filter};
31use arrow::datatypes::{DataType, Field, FieldRef, Fields};
32
33use datafusion_common::cast::as_list_array;
34use datafusion_common::utils::{
35 SingleRowListArrayBuilder, compare_rows, get_row_at_idx, take_function_args,
36};
37use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err};
38use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
39use datafusion_expr::utils::format_state_name;
40use datafusion_expr::{
41 Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, Signature,
42 Volatility,
43};
44use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls;
45use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
46use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
47use datafusion_functions_aggregate_common::utils::ordering_fields;
48use datafusion_macros::user_doc;
49use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
50
// Declares the public helpers for `ArrayAgg` through the project macro:
// the `array_agg` expression function and the `array_agg_udaf` accessor
// (the latter is called by `reverse_expr` in this file).
// NOTE(review): `make_udaf_expr_and_func!` is defined outside this file;
// confirm the exact items it generates there.
make_udaf_expr_and_func!(
    ArrayAgg,
    array_agg,
    expression,
    "input values, including nulls, concatenated into an array",
    array_agg_udaf
);
58
59#[user_doc(
60 doc_section(label = "General Functions"),
61 description = r#"Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order.
62This aggregation function can only mix DISTINCT and ORDER BY if the ordering expression is exactly the same as the argument expression."#,
63 syntax_example = "array_agg(expression [ORDER BY expression])",
64 sql_example = r#"
65```sql
66> SELECT array_agg(column_name ORDER BY other_column) FROM table_name;
67+-----------------------------------------------+
68| array_agg(column_name ORDER BY other_column) |
69+-----------------------------------------------+
70| [element1, element2, element3] |
71+-----------------------------------------------+
72> SELECT array_agg(DISTINCT column_name ORDER BY column_name) FROM table_name;
73+--------------------------------------------------------+
74| array_agg(DISTINCT column_name ORDER BY column_name) |
75+--------------------------------------------------------+
76| [element1, element2, element3] |
77+--------------------------------------------------------+
78```
79"#,
80 standard_argument(name = "expression",)
81)]
#[derive(Debug, PartialEq, Eq, Hash)]
/// `array_agg` aggregate function: gathers the input values of a group into
/// a single list value.
pub struct ArrayAgg {
    /// Accepted argument signature (one argument of any type, see `Default`).
    signature: Signature,
    /// Whether the input is known to already satisfy the requested ordering.
    /// Set through `with_beneficial_ordering` and forwarded to the
    /// order-sensitive accumulator so it can skip its own sort.
    is_input_pre_ordered: bool,
}
88
89impl Default for ArrayAgg {
90 fn default() -> Self {
91 Self {
92 signature: Signature::any(1, Volatility::Immutable),
93 is_input_pre_ordered: false,
94 }
95 }
96}
97
98impl AggregateUDFImpl for ArrayAgg {
99 fn name(&self) -> &str {
100 "array_agg"
101 }
102
103 fn signature(&self) -> &Signature {
104 &self.signature
105 }
106
107 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
108 Ok(DataType::List(Arc::new(Field::new_list_field(
109 arg_types[0].clone(),
110 true,
111 ))))
112 }
113
114 fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
115 if args.is_distinct {
116 return Ok(vec![
117 Field::new_list(
118 format_state_name(args.name, "distinct_array_agg"),
119 Field::new_list_field(args.input_fields[0].data_type().clone(), true),
121 true,
122 )
123 .into(),
124 ]);
125 }
126
127 let mut fields = vec![
128 Field::new_list(
129 format_state_name(args.name, "array_agg"),
130 Field::new_list_field(args.input_fields[0].data_type().clone(), true),
132 true,
133 )
134 .into(),
135 ];
136
137 if args.ordering_fields.is_empty() {
138 return Ok(fields);
139 }
140
141 let orderings = args.ordering_fields.to_vec();
142 fields.push(
143 Field::new_list(
144 format_state_name(args.name, "array_agg_orderings"),
145 Field::new_list_field(DataType::Struct(Fields::from(orderings)), true),
146 false,
147 )
148 .into(),
149 );
150
151 Ok(fields)
152 }
153
154 fn order_sensitivity(&self) -> AggregateOrderSensitivity {
155 AggregateOrderSensitivity::SoftRequirement
156 }
157
158 fn with_beneficial_ordering(
159 self: Arc<Self>,
160 beneficial_ordering: bool,
161 ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
162 Ok(Some(Arc::new(Self {
163 signature: self.signature.clone(),
164 is_input_pre_ordered: beneficial_ordering,
165 })))
166 }
167
168 fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
169 let field = &acc_args.expr_fields[0];
170 let data_type = field.data_type();
171 let ignore_nulls = acc_args.ignore_nulls && field.is_nullable();
172
173 if acc_args.is_distinct {
174 let sort_option = match acc_args.order_bys {
189 [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options),
190 [] => None,
191 _ => {
192 return exec_err!(
193 "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list"
194 );
195 }
196 };
197 return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
198 data_type,
199 sort_option,
200 ignore_nulls,
201 )?));
202 }
203
204 let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
205 return Ok(Box::new(ArrayAggAccumulator::try_new(
206 data_type,
207 ignore_nulls,
208 )?));
209 };
210
211 let ordering_dtypes = ordering
212 .iter()
213 .map(|e| e.expr.data_type(acc_args.schema))
214 .collect::<Result<Vec<_>>>()?;
215
216 OrderSensitiveArrayAggAccumulator::try_new(
217 data_type,
218 &ordering_dtypes,
219 ordering,
220 self.is_input_pre_ordered,
221 acc_args.is_reversed,
222 ignore_nulls,
223 )
224 .map(|acc| Box::new(acc) as _)
225 }
226
227 fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
228 datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
229 }
230
231 fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
232 !args.is_distinct && args.order_bys.is_empty()
233 }
234
235 fn create_groups_accumulator(
236 &self,
237 args: AccumulatorArgs,
238 ) -> Result<Box<dyn GroupsAccumulator>> {
239 let field = &args.expr_fields[0];
240 let data_type = field.data_type().clone();
241 let ignore_nulls = args.ignore_nulls && field.is_nullable();
242 Ok(Box::new(ArrayAggGroupsAccumulator::new(
243 data_type,
244 ignore_nulls,
245 )))
246 }
247
248 fn supports_null_handling_clause(&self) -> bool {
249 true
250 }
251
252 fn documentation(&self) -> Option<&Documentation> {
253 self.doc()
254 }
255}
256
#[derive(Debug)]
/// Accumulator for `array_agg` without DISTINCT or ORDER BY.
///
/// Input arrays are stored as they arrive and only concatenated when the
/// result is evaluated.
pub struct ArrayAggAccumulator {
    /// Stored element arrays, oldest first. New arrays are pushed to the back;
    /// retraction consumes rows from the front.
    values: VecDeque<ArrayRef>,
    /// Element data type, used to build the null list returned when nothing
    /// has been accumulated.
    datatype: DataType,
    /// When `true`, null input rows are filtered out before being stored.
    ignore_nulls: bool,
    /// Number of leading rows of the front array in `values` that have
    /// already been retracted and must be skipped on evaluation.
    front_offset: usize,
}
266
267impl ArrayAggAccumulator {
268 pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result<Self> {
270 Ok(Self {
271 values: VecDeque::new(),
272 datatype: datatype.clone(),
273 ignore_nulls,
274 front_offset: 0,
275 })
276 }
277
278 fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> {
281 let offsets = list_array.value_offsets();
282 let initial_offset = offsets[0];
284 let null_count = list_array.null_count();
285
286 if null_count == 0 {
289 let list_values = list_array.values().slice(
291 initial_offset as usize,
292 (offsets[offsets.len() - 1] - initial_offset) as usize,
293 );
294 return Some(list_values);
295 }
296
297 if list_array.null_count() == list_array.len() {
299 return Some(list_array.values().slice(0, 0));
300 }
301
302 let nulls = list_array.nulls().unwrap();
307
308 let mut valid_slices_iter = nulls.valid_slices();
309
310 let (start, end) = valid_slices_iter.next().unwrap();
312
313 let start_offset = offsets[start];
314
315 let mut end_offset_of_last_valid_value = offsets[end];
318
319 for (start, end) in valid_slices_iter {
320 if offsets[start] != end_offset_of_last_valid_value {
323 return None;
324 }
325
326 end_offset_of_last_valid_value = offsets[end];
329 }
330
331 let consecutive_valid_values = list_array.values().slice(
332 start_offset as usize,
333 (end_offset_of_last_valid_value - start_offset) as usize,
334 );
335
336 Some(consecutive_valid_values)
337 }
338}
339
340impl Accumulator for ArrayAggAccumulator {
341 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
342 if values.is_empty() {
344 return Ok(());
345 }
346
347 assert_eq_or_internal_err!(values.len(), 1, "expects single batch");
348
349 let val = &values[0];
350 let nulls = if self.ignore_nulls {
351 val.logical_nulls()
352 } else {
353 None
354 };
355
356 let val = match nulls {
357 Some(nulls) if nulls.null_count() >= val.len() => return Ok(()),
358 Some(nulls) => filter(val, &BooleanArray::new(nulls.inner().clone(), None))?,
359 None => Arc::clone(val),
360 };
361
362 if !val.is_empty() {
363 self.values.push_back(val)
364 }
365
366 Ok(())
367 }
368
369 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
370 if states.is_empty() {
372 return Ok(());
373 }
374
375 assert_eq_or_internal_err!(states.len(), 1, "expects single state");
376
377 let list_arr = as_list_array(&states[0])?;
378
379 match Self::get_optional_values_to_merge_as_is(list_arr) {
380 Some(values) => {
381 if !values.is_empty() {
383 self.values.push_back(values);
384 }
385 }
386 None => {
387 for arr in list_arr.iter().flatten() {
388 self.values.push_back(arr);
389 }
390 }
391 }
392
393 Ok(())
394 }
395
396 fn state(&mut self) -> Result<Vec<ScalarValue>> {
397 Ok(vec![self.evaluate()?])
398 }
399
400 fn evaluate(&mut self) -> Result<ScalarValue> {
401 if self.values.is_empty() {
402 return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
403 }
404
405 let element_arrays: Vec<ArrayRef> = self
406 .values
407 .iter()
408 .enumerate()
409 .map(|(i, a)| {
410 if i == 0 && self.front_offset > 0 {
411 a.slice(self.front_offset, a.len() - self.front_offset)
412 } else {
413 Arc::clone(a)
414 }
415 })
416 .collect();
417
418 let element_refs: Vec<&dyn Array> =
419 element_arrays.iter().map(|a| a.as_ref()).collect();
420
421 if element_refs.iter().all(|a| a.is_empty()) {
422 return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
423 }
424
425 let concated_array = arrow::compute::concat(&element_refs)?;
426
427 Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar())
428 }
429
430 fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
431 if values.is_empty() {
432 return Ok(());
433 }
434
435 assert_eq_or_internal_err!(values.len(), 1, "expects single batch");
436
437 let val = &values[0];
438 let mut to_retract = if self.ignore_nulls {
439 val.len() - val.logical_null_count()
440 } else {
441 val.len()
442 };
443
444 while to_retract > 0 {
445 let Some(front) = self.values.front() else {
446 break;
447 };
448 let available = front.len() - self.front_offset;
449 if to_retract >= available {
450 self.values.pop_front();
451 to_retract -= available;
452 self.front_offset = 0;
453 } else {
454 self.front_offset += to_retract;
455 to_retract = 0;
456 }
457 }
458
459 Ok(())
460 }
461
462 fn supports_retract_batch(&self) -> bool {
463 true
464 }
465
466 fn size(&self) -> usize {
467 size_of_val(self)
468 + (size_of::<ArrayRef>() * self.values.capacity())
469 + self
470 .values
471 .iter()
472 .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
484 .sum::<usize>()
485 + self.datatype.size()
486 - size_of_val(&self.datatype)
487 }
488}
489
#[derive(Debug)]
/// Groups accumulator for `array_agg` without DISTINCT or ORDER BY.
///
/// Input arrays are kept by reference together with the (group, row) pairs
/// that select their rows; values are only gathered per group on evaluation.
struct ArrayAggGroupsAccumulator {
    /// Element data type of the produced lists.
    datatype: DataType,
    /// When `true`, null input rows are not recorded.
    ignore_nulls: bool,
    /// Source arrays whose rows are referenced by `batch_entries`.
    batches: Vec<ArrayRef>,
    /// For each array in `batches` (same index), the recorded
    /// `(group index, row index within that array)` pairs.
    batch_entries: Vec<Vec<(u32, u32)>>,
    /// Largest total number of groups seen so far, reduced when groups are
    /// emitted.
    num_groups: usize,
}
502
503impl ArrayAggGroupsAccumulator {
504 fn new(datatype: DataType, ignore_nulls: bool) -> Self {
505 Self {
506 datatype,
507 ignore_nulls,
508 batches: Vec::new(),
509 batch_entries: Vec::new(),
510 num_groups: 0,
511 }
512 }
513
514 fn clear_state(&mut self) {
515 self.batches = Vec::new();
518 self.batch_entries = Vec::new();
519 self.num_groups = 0;
520 }
521
522 fn compact_retained_state(&mut self, emit_groups: usize) -> Result<()> {
523 let emit_groups = emit_groups as u32;
532 let old_batches = take(&mut self.batches);
533 let old_batch_entries = take(&mut self.batch_entries);
534
535 let mut batches = Vec::new();
536 let mut batch_entries = Vec::new();
537
538 for (batch, entries) in old_batches.into_iter().zip(old_batch_entries) {
539 let retained_len = entries.iter().filter(|(g, _)| *g >= emit_groups).count();
540
541 if retained_len == 0 {
542 continue;
543 }
544
545 if retained_len == entries.len() {
546 let mut retained_entries = entries;
550 for (g, _) in &mut retained_entries {
551 *g -= emit_groups;
552 }
553 retained_entries.shrink_to_fit();
554 batches.push(batch);
555 batch_entries.push(retained_entries);
556 continue;
557 }
558
559 let mut retained_entries = Vec::with_capacity(retained_len);
560 let mut retained_rows = Vec::with_capacity(retained_len);
561
562 for (g, r) in entries {
563 if g >= emit_groups {
564 retained_entries.push((g - emit_groups, retained_rows.len() as u32));
569 retained_rows.push(r);
570 }
571 }
572
573 debug_assert_eq!(retained_entries.len(), retained_len);
574 debug_assert_eq!(retained_rows.len(), retained_len);
575
576 let batch = if retained_len == batch.len() {
577 batch
578 } else {
579 let retained_rows = UInt32Array::from(retained_rows);
582 arrow::compute::take(batch.as_ref(), &retained_rows, None)?
583 };
584
585 batches.push(batch);
586 batch_entries.push(retained_entries);
587 }
588
589 self.batches = batches;
590 self.batch_entries = batch_entries;
591 self.num_groups -= emit_groups as usize;
592
593 Ok(())
594 }
595}
596
impl GroupsAccumulator for ArrayAggGroupsAccumulator {
    /// Records, for every accepted input row, which group it belongs to.
    ///
    /// No values are copied: the input array is stored by reference and the
    /// `(group, row)` pairs are remembered next to it. Rows rejected by
    /// `opt_filter` (null or false) and, with `ignore_nulls`, null rows are
    /// skipped.
    ///
    /// # Panics
    /// Panics if `values` does not contain exactly one array.
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        assert_eq!(values.len(), 1, "single argument to update_batch");
        let input = &values[0];

        // Remember the group count even if no row of this batch is recorded.
        self.num_groups = self.num_groups.max(total_num_groups);

        let nulls = if self.ignore_nulls {
            input.logical_nulls()
        } else {
            None
        };

        let mut entries = Vec::new();

        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
            // A null filter value is treated like `false`.
            if let Some(filter) = opt_filter
                && (filter.is_null(row_idx) || !filter.value(row_idx))
            {
                continue;
            }

            if let Some(ref nulls) = nulls
                && nulls.is_null(row_idx)
            {
                continue;
            }

            entries.push((group_idx as u32, row_idx as u32));
        }

        // Only keep the batch alive when at least one of its rows is used.
        if !entries.is_empty() {
            self.batches.push(Arc::clone(input));
            self.batch_entries.push(entries);
        }

        Ok(())
    }

    /// Builds one list per emitted group by gathering the recorded rows from
    /// the stored batches, then drops (or compacts away) the emitted state.
    ///
    /// Groups without any recorded row produce a null list.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        let emit_groups = match emit_to {
            EmitTo::All => self.num_groups,
            EmitTo::First(n) => n,
        };

        // Pass 1: number of elements per emitted group.
        let mut counts = vec![0u32; emit_groups];
        for entries in &self.batch_entries {
            for &(g, _) in entries {
                let g = g as usize;
                if g < emit_groups {
                    counts[g] += 1;
                }
            }
        }

        // Turn the counts into list offsets, validity, and the position at
        // which each group's next element has to be written.
        let mut offsets = Vec::<i32>::with_capacity(emit_groups + 1);
        offsets.push(0);
        let mut nulls_builder = NullBufferBuilder::new(emit_groups);
        let mut write_positions = Vec::with_capacity(emit_groups);
        let mut cur_offset = 0u32;
        for &count in &counts {
            if count == 0 {
                nulls_builder.append_null();
            } else {
                nulls_builder.append_non_null();
            }
            write_positions.push(cur_offset);
            cur_offset += count;
            offsets.push(cur_offset as i32);
        }
        let total_rows = cur_offset as usize;

        let flat_values = if total_rows == 0 {
            new_empty_array(&self.datatype)
        } else {
            // Pass 2: place every (batch, row) reference at its group's next
            // write position. Batches are visited in insertion order, so the
            // elements of a group keep the order in which they were recorded.
            let mut interleave_indices = vec![(0usize, 0usize); total_rows];
            for (batch_idx, entries) in self.batch_entries.iter().enumerate() {
                for &(g, r) in entries {
                    let g = g as usize;
                    if g < emit_groups {
                        let wp = write_positions[g] as usize;
                        interleave_indices[wp] = (batch_idx, r as usize);
                        write_positions[g] += 1;
                    }
                }
            }

            let sources: Vec<&dyn Array> =
                self.batches.iter().map(|b| b.as_ref()).collect();
            arrow::compute::interleave(&sources, &interleave_indices)?
        };

        // The values are gathered; release what was emitted.
        match emit_to {
            EmitTo::All => self.clear_state(),
            EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
        }

        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
        let result = ListArray::new(field, offsets, flat_values, nulls_builder.finish());

        Ok(Arc::new(result))
    }

    /// The intermediate state is the same list array as the final result.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        Ok(vec![self.evaluate(emit_to)?])
    }

    /// Merges a list-typed state column: every element of every non-null list
    /// is recorded as belonging to the group of its row.
    ///
    /// The child values array of the list is stored by reference and the
    /// recorded row indices are positions within that child array.
    ///
    /// # Panics
    /// Panics if `values` does not contain exactly one array.
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        _opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        assert_eq!(values.len(), 1, "one argument to merge_batch");
        let input_list = values[0].as_list::<i32>();

        self.num_groups = self.num_groups.max(total_num_groups);

        let list_values = input_list.values();
        let list_offsets = input_list.offsets();

        let mut entries = Vec::new();

        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
            // A null list contributes no elements.
            if input_list.is_null(row_idx) {
                continue;
            }
            let start = list_offsets[row_idx] as u32;
            let end = list_offsets[row_idx + 1] as u32;
            for pos in start..end {
                entries.push((group_idx as u32, pos));
            }
        }

        if !entries.is_empty() {
            self.batches.push(Arc::clone(list_values));
            self.batch_entries.push(entries);
        }

        Ok(())
    }

    /// Converts raw input directly into state form without grouping: each
    /// input row becomes its own one-element list.
    ///
    /// Rows rejected by `opt_filter` and, with `ignore_nulls`, null rows are
    /// marked as null lists (via the combined null buffer), so `merge_batch`
    /// skips them.
    ///
    /// # Panics
    /// Panics if `values` does not contain exactly one array.
    fn convert_to_state(
        &self,
        values: &[ArrayRef],
        opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<ArrayRef>> {
        assert_eq!(values.len(), 1, "one argument to convert_to_state");

        let input = &values[0];

        // One list of length 1 per input row.
        let offsets = OffsetBuffer::from_repeated_length(1, input.len());

        let filter_nulls = opt_filter.map(filter_to_nulls);

        let nulls = if self.ignore_nulls {
            let logical = input.logical_nulls();
            NullBuffer::union(filter_nulls.as_ref(), logical.as_ref())
        } else {
            filter_nulls
        };

        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
        let list_array = ListArray::new(field, offsets, Arc::clone(input), nulls);

        Ok(vec![Arc::new(list_array)])
    }

    /// `convert_to_state` is implemented.
    fn supports_convert_to_state(&self) -> bool {
        true
    }

    /// Memory estimate: stored array data, the `batches` vector slots, the
    /// entry vectors' capacity, and the `batch_entries` vector slots.
    fn size(&self) -> usize {
        self.batches
            .iter()
            .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
            .sum::<usize>()
            + self.batches.capacity() * size_of::<ArrayRef>()
            + self
                .batch_entries
                .iter()
                .map(|e| e.capacity() * size_of::<(u32, u32)>())
                .sum::<usize>()
            + self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
    }
}
814
#[derive(Debug)]
/// Accumulator for `array_agg(DISTINCT ...)`: keeps each distinct input value
/// once and optionally sorts the result.
pub struct DistinctArrayAggAccumulator {
    /// Distinct values seen so far.
    values: HashSet<ScalarValue>,
    /// Element data type of the produced list.
    datatype: DataType,
    /// Sort applied to the result on evaluation; `None` leaves the values in
    /// the set's iteration order.
    sort_options: Option<SortOptions>,
    /// When `true`, null input rows are not added to the set.
    ignore_nulls: bool,
}
822
823impl DistinctArrayAggAccumulator {
824 pub fn try_new(
825 datatype: &DataType,
826 sort_options: Option<SortOptions>,
827 ignore_nulls: bool,
828 ) -> Result<Self> {
829 Ok(Self {
830 values: HashSet::new(),
831 datatype: datatype.clone(),
832 sort_options,
833 ignore_nulls,
834 })
835 }
836}
837
838impl Accumulator for DistinctArrayAggAccumulator {
839 fn state(&mut self) -> Result<Vec<ScalarValue>> {
840 Ok(vec![self.evaluate()?])
841 }
842
843 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
844 if values.is_empty() {
845 return Ok(());
846 }
847
848 let val = &values[0];
849 let nulls = if self.ignore_nulls {
850 val.logical_nulls()
851 } else {
852 None
853 };
854
855 let nulls = nulls.as_ref();
856 if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
857 for i in 0..val.len() {
858 if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
859 self.values
860 .insert(ScalarValue::try_from_array(val, i)?.compacted());
861 }
862 }
863 }
864
865 Ok(())
866 }
867
868 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
869 if states.is_empty() {
870 return Ok(());
871 }
872
873 assert_eq_or_internal_err!(states.len(), 1, "expects single state");
874
875 states[0]
876 .as_list::<i32>()
877 .iter()
878 .flatten()
879 .try_for_each(|val| self.update_batch(&[val]))
880 }
881
882 fn evaluate(&mut self) -> Result<ScalarValue> {
883 let mut values: Vec<ScalarValue> = self.values.iter().cloned().collect();
884 if values.is_empty() {
885 return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
886 }
887
888 if let Some(opts) = self.sort_options {
889 let mut delayed_cmp_err = Ok(());
890 values.sort_by(|a, b| {
891 if a.is_null() {
892 return match opts.nulls_first {
893 true => Ordering::Less,
894 false => Ordering::Greater,
895 };
896 }
897 if b.is_null() {
898 return match opts.nulls_first {
899 true => Ordering::Greater,
900 false => Ordering::Less,
901 };
902 }
903 match opts.descending {
904 true => b.try_cmp(a),
905 false => a.try_cmp(b),
906 }
907 .unwrap_or_else(|err| {
908 delayed_cmp_err = Err(err);
909 Ordering::Equal
910 })
911 });
912 delayed_cmp_err?;
913 };
914
915 let arr = ScalarValue::new_list(&values, &self.datatype, true);
916 Ok(ScalarValue::List(arr))
917 }
918
919 fn size(&self) -> usize {
920 size_of_val(self) + ScalarValue::size_of_hashset(&self.values)
921 - size_of_val(&self.values)
922 + self.datatype.size()
923 - size_of_val(&self.datatype)
924 - size_of_val(&self.sort_options)
925 + size_of::<Option<SortOptions>>()
926 }
927}
928
#[derive(Debug)]
/// Accumulator for `array_agg(... ORDER BY ...)`: stores every value together
/// with its ordering values so the result can be produced in the requested
/// order.
pub(crate) struct OrderSensitiveArrayAggAccumulator {
    /// Collected values, one per accepted input row.
    values: Vec<ScalarValue>,
    /// Ordering values for each entry of `values` (same index); each inner
    /// vector holds one value per ORDER BY expression.
    ordering_values: Vec<Vec<ScalarValue>>,
    /// Data types: index 0 is the aggregated value type, the rest are the
    /// types of the ORDER BY expressions.
    datatypes: Vec<DataType>,
    /// The ordering the result has to satisfy.
    ordering_req: LexOrdering,
    /// When `true`, the input is assumed to arrive already ordered and the
    /// accumulator does not sort.
    is_input_pre_ordered: bool,
    /// When `true`, the values are emitted in reverse on evaluation.
    reverse: bool,
    /// When `true`, rows whose value is null are not collected.
    ignore_nulls: bool,
}
953
954impl OrderSensitiveArrayAggAccumulator {
955 pub fn try_new(
958 datatype: &DataType,
959 ordering_dtypes: &[DataType],
960 ordering_req: LexOrdering,
961 is_input_pre_ordered: bool,
962 reverse: bool,
963 ignore_nulls: bool,
964 ) -> Result<Self> {
965 let mut datatypes = vec![datatype.clone()];
966 datatypes.extend(ordering_dtypes.iter().cloned());
967 Ok(Self {
968 values: vec![],
969 ordering_values: vec![],
970 datatypes,
971 ordering_req,
972 is_input_pre_ordered,
973 reverse,
974 ignore_nulls,
975 })
976 }
977
978 fn sort(&mut self) {
979 let sort_options = self
980 .ordering_req
981 .iter()
982 .map(|sort_expr| sort_expr.options)
983 .collect::<Vec<_>>();
984 let mut values = take(&mut self.values)
985 .into_iter()
986 .zip(take(&mut self.ordering_values))
987 .collect::<Vec<_>>();
988 let mut delayed_cmp_err = Ok(());
989 values.sort_by(|(_, left_ordering), (_, right_ordering)| {
990 compare_rows(left_ordering, right_ordering, &sort_options).unwrap_or_else(
991 |err| {
992 delayed_cmp_err = Err(err);
993 Ordering::Equal
994 },
995 )
996 });
997 (self.values, self.ordering_values) = values.into_iter().unzip();
998 }
999
1000 fn evaluate_orderings(&self) -> Result<ScalarValue> {
1001 let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
1002
1003 let column_wise_ordering_values = if self.ordering_values.is_empty() {
1004 fields
1005 .iter()
1006 .map(|f| new_empty_array(f.data_type()))
1007 .collect::<Vec<_>>()
1008 } else {
1009 (0..fields.len())
1010 .map(|i| {
1011 let column_values = self.ordering_values.iter().map(|x| x[i].clone());
1012 ScalarValue::iter_to_array(column_values)
1013 })
1014 .collect::<Result<_>>()?
1015 };
1016
1017 let ordering_array = StructArray::try_new(
1018 Fields::from(fields),
1019 column_wise_ordering_values,
1020 None,
1021 )?;
1022 Ok(SingleRowListArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar())
1023 }
1024}
1025
1026impl Accumulator for OrderSensitiveArrayAggAccumulator {
1027 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1028 if values.is_empty() {
1029 return Ok(());
1030 }
1031
1032 let val = &values[0];
1033 let ord = &values[1..];
1034 let nulls = if self.ignore_nulls {
1035 val.logical_nulls()
1036 } else {
1037 None
1038 };
1039
1040 let nulls = nulls.as_ref();
1041 if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
1042 for i in 0..val.len() {
1043 if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
1044 self.values
1045 .push(ScalarValue::try_from_array(val, i)?.compacted());
1046 self.ordering_values.push(
1047 get_row_at_idx(ord, i)?
1048 .into_iter()
1049 .map(|v| v.compacted())
1050 .collect(),
1051 )
1052 }
1053 }
1054 }
1055
1056 Ok(())
1057 }
1058
1059 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1060 if states.is_empty() {
1061 return Ok(());
1062 }
1063
1064 let [array_agg_values, agg_orderings] =
1071 take_function_args("OrderSensitiveArrayAggAccumulator::merge_batch", states)?;
1072 let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
1073 return exec_err!("Expects to receive a list array");
1074 };
1075
1076 let mut partition_values = vec![];
1078 let mut partition_ordering_values = vec![];
1080
1081 if !self.is_input_pre_ordered {
1083 self.sort();
1084 }
1085 partition_values.push(take(&mut self.values).into());
1086 partition_ordering_values.push(take(&mut self.ordering_values).into());
1087
1088 let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
1090 for maybe_v in array_agg_res.into_iter() {
1091 if let Some(v) = maybe_v {
1092 partition_values.push(v.into());
1093 } else {
1094 partition_values.push(vec![].into());
1095 }
1096 }
1097
1098 let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
1099 for partition_ordering_rows in orderings.into_iter().flatten() {
1100 let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
1102 if let ScalarValue::Struct(s) = ordering_row {
1103 let mut ordering_columns_per_row = vec![];
1104
1105 for column in s.columns() {
1106 let sv = ScalarValue::try_from_array(column, 0)?;
1107 ordering_columns_per_row.push(sv);
1108 }
1109
1110 Ok(ordering_columns_per_row)
1111 } else {
1112 exec_err!(
1113 "Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
1114 ordering_row.data_type()
1115 )
1116 }
1117 }).collect::<Result<VecDeque<_>>>()?;
1118
1119 partition_ordering_values.push(ordering_value);
1120 }
1121
1122 let sort_options = self
1123 .ordering_req
1124 .iter()
1125 .map(|sort_expr| sort_expr.options)
1126 .collect::<Vec<_>>();
1127
1128 (self.values, self.ordering_values) = merge_ordered_arrays(
1129 &mut partition_values,
1130 &mut partition_ordering_values,
1131 &sort_options,
1132 )?;
1133
1134 Ok(())
1135 }
1136
1137 fn state(&mut self) -> Result<Vec<ScalarValue>> {
1138 if !self.is_input_pre_ordered {
1139 self.sort();
1140 }
1141
1142 let mut result = vec![self.evaluate()?];
1143 result.push(self.evaluate_orderings()?);
1144
1145 Ok(result)
1146 }
1147
1148 fn evaluate(&mut self) -> Result<ScalarValue> {
1149 if !self.is_input_pre_ordered {
1150 self.sort();
1151 }
1152
1153 if self.values.is_empty() {
1154 return Ok(ScalarValue::new_null_list(
1155 self.datatypes[0].clone(),
1156 true,
1157 1,
1158 ));
1159 }
1160
1161 let values = self.values.clone();
1162 let array = if self.reverse {
1163 ScalarValue::new_list_from_iter(
1164 values.into_iter().rev(),
1165 &self.datatypes[0],
1166 true,
1167 )
1168 } else {
1169 ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true)
1170 };
1171 Ok(ScalarValue::List(array))
1172 }
1173
1174 fn size(&self) -> usize {
1175 let mut total = size_of_val(self) + ScalarValue::size_of_vec(&self.values)
1176 - size_of_val(&self.values);
1177
1178 total += size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
1180 for row in &self.ordering_values {
1181 total += ScalarValue::size_of_vec(row) - size_of_val(row);
1182 }
1183
1184 total += size_of::<DataType>() * self.datatypes.capacity();
1186 for dtype in &self.datatypes {
1187 total += dtype.size() - size_of_val(dtype);
1188 }
1189
1190 total += size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
1192 total
1194 }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use super::*;
1200 use arrow::array::{ListBuilder, StringBuilder};
1201 use arrow::datatypes::Schema;
1202 use datafusion_common::cast::as_generic_string_array;
1203 use datafusion_common::internal_err;
1204 use datafusion_physical_expr::PhysicalExpr;
1205 use datafusion_physical_expr::expressions::Column;
1206
1207 #[test]
1208 fn no_duplicates_no_distinct() -> Result<()> {
1209 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
1210
1211 acc1.update_batch(&[data(["a", "b", "c"])])?;
1212 acc2.update_batch(&[data(["d", "e", "f"])])?;
1213 acc1 = merge(acc1, acc2)?;
1214
1215 let result = print_nulls(str_arr(acc1.evaluate()?)?);
1216
1217 assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1218
1219 Ok(())
1220 }
1221
1222 #[test]
1223 fn no_duplicates_distinct() -> Result<()> {
1224 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1225 .distinct()
1226 .build_two()?;
1227
1228 acc1.update_batch(&[data(["a", "b", "c"])])?;
1229 acc2.update_batch(&[data(["d", "e", "f"])])?;
1230 acc1 = merge(acc1, acc2)?;
1231
1232 let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1233 result.sort();
1234
1235 assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1236
1237 Ok(())
1238 }
1239
1240 #[test]
1241 fn duplicates_no_distinct() -> Result<()> {
1242 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
1243
1244 acc1.update_batch(&[data(["a", "b", "c"])])?;
1245 acc2.update_batch(&[data(["a", "b", "c"])])?;
1246 acc1 = merge(acc1, acc2)?;
1247
1248 let result = print_nulls(str_arr(acc1.evaluate()?)?);
1249
1250 assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
1251
1252 Ok(())
1253 }
1254
1255 #[test]
1256 fn duplicates_distinct() -> Result<()> {
1257 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1258 .distinct()
1259 .build_two()?;
1260
1261 acc1.update_batch(&[data(["a", "b", "c"])])?;
1262 acc2.update_batch(&[data(["a", "b", "c"])])?;
1263 acc1 = merge(acc1, acc2)?;
1264
1265 let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1266 result.sort();
1267
1268 assert_eq!(result, vec!["a", "b", "c"]);
1269
1270 Ok(())
1271 }
1272
1273 #[test]
1274 fn duplicates_on_second_batch_distinct() -> Result<()> {
1275 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1276 .distinct()
1277 .build_two()?;
1278
1279 acc1.update_batch(&[data(["a", "c"])])?;
1280 acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
1281 acc1 = merge(acc1, acc2)?;
1282
1283 let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1284 result.sort();
1285
1286 assert_eq!(result, vec!["a", "b", "c", "d"]);
1287
1288 Ok(())
1289 }
1290
1291 #[test]
1292 fn no_duplicates_distinct_sort_asc() -> Result<()> {
1293 let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1294 .distinct()
1295 .order_by_col("col", SortOptions::new(false, false))
1296 .build_two()?;
1297
1298 acc1.update_batch(&[data(["e", "b", "d"])])?;
1299 acc2.update_batch(&[data(["f", "a", "c"])])?;
1300 acc1 = merge(acc1, acc2)?;
1301
1302 let result = print_nulls(str_arr(acc1.evaluate()?)?);
1303
1304 assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1305
1306 Ok(())
1307 }
1308
/// DISTINCT + descending ORDER BY over disjoint inputs yields a reverse-sorted merge.
#[test]
fn no_duplicates_distinct_sort_desc() -> Result<()> {
    let descending = SortOptions {
        descending: true,
        nulls_first: false,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", descending)
        .build_two()?;

    left.update_batch(&[data(["e", "b", "d"])])?;
    right.update_batch(&[data(["f", "a", "c"])])?;
    let mut merged = merge(left, right)?;

    // The result is compared as-is: ordering is part of the contract here.
    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["f", "e", "d", "c", "b", "a"]);

    Ok(())
}
1326
/// DISTINCT + ascending ORDER BY where both inputs hold the same values in different orders.
#[test]
fn duplicates_distinct_sort_asc() -> Result<()> {
    let ascending = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", ascending)
        .build_two()?;

    left.update_batch(&[data(["a", "c", "b"])])?;
    right.update_batch(&[data(["b", "c", "a"])])?;
    let mut merged = merge(left, right)?;

    // Duplicates collapse and the survivors come out sorted.
    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["a", "b", "c"]);

    Ok(())
}
1344
/// DISTINCT + descending ORDER BY where both inputs hold the same values in different orders.
#[test]
fn duplicates_distinct_sort_desc() -> Result<()> {
    let descending = SortOptions {
        descending: true,
        nulls_first: false,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", descending)
        .build_two()?;

    left.update_batch(&[data(["a", "c", "b"])])?;
    right.update_batch(&[data(["b", "c", "a"])])?;
    let mut merged = merge(left, right)?;

    // Duplicates collapse and the survivors come out reverse-sorted.
    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["c", "b", "a"]);

    Ok(())
}
1362
/// DISTINCT + ascending ORDER BY with nulls first: the single NULL precedes all values.
#[test]
fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
    let ascending_nulls_first = SortOptions {
        descending: false,
        nulls_first: true,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", ascending_nulls_first)
        .build_two()?;

    // Each side contributes one null; DISTINCT keeps only one of them.
    left.update_batch(&[data([Some("e"), Some("b"), None])])?;
    right.update_batch(&[data([Some("f"), Some("a"), None])])?;
    let mut merged = merge(left, right)?;

    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["NULL", "a", "b", "e", "f"]);

    Ok(())
}
1380
/// DISTINCT + ascending ORDER BY with nulls last: the single NULL follows all values.
#[test]
fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
    let ascending_nulls_last = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", ascending_nulls_last)
        .build_two()?;

    // Each side contributes one null; DISTINCT keeps only one of them.
    left.update_batch(&[data([Some("e"), Some("b"), None])])?;
    right.update_batch(&[data([Some("f"), Some("a"), None])])?;
    let mut merged = merge(left, right)?;

    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["a", "b", "e", "f", "NULL"]);

    Ok(())
}
1398
/// DISTINCT + descending ORDER BY with nulls first: NULL leads, then values in reverse order.
#[test]
fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
    let descending_nulls_first = SortOptions {
        descending: true,
        nulls_first: true,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", descending_nulls_first)
        .build_two()?;

    // Each side contributes one null; DISTINCT keeps only one of them.
    left.update_batch(&[data([Some("e"), Some("b"), None])])?;
    right.update_batch(&[data([Some("f"), Some("a"), None])])?;
    let mut merged = merge(left, right)?;

    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["NULL", "f", "e", "b", "a"]);

    Ok(())
}
1416
/// DISTINCT + descending ORDER BY with nulls last: values in reverse order, then NULL.
#[test]
fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
    let descending_nulls_last = SortOptions {
        descending: true,
        nulls_first: false,
    };
    let (mut left, mut right) = ArrayAggAccumulatorBuilder::string()
        .distinct()
        .order_by_col("col", descending_nulls_last)
        .build_two()?;

    // Each side contributes one null; DISTINCT keeps only one of them.
    left.update_batch(&[data([Some("e"), Some("b"), None])])?;
    right.update_batch(&[data([Some("f"), Some("a"), None])])?;
    let mut merged = merge(left, right)?;

    let ordered = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(ordered, vec!["f", "e", "b", "a", "NULL"]);

    Ok(())
}
1434
/// DISTINCT merge where the first accumulator only ever saw nulls.
#[test]
fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
    let builder = ArrayAggAccumulatorBuilder::string().distinct();
    let (mut left, mut right) = builder.build_two()?;

    // The element type cannot be inferred from an all-`None` array, hence the turbofish.
    left.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
    right.update_batch(&[data([Some("a"), None, None, None])])?;
    let mut merged = merge(left, right)?;

    // All nulls collapse into a single NULL entry next to the one real value.
    let mut distinct_values = print_nulls(str_arr(merged.evaluate()?)?);
    distinct_values.sort();
    assert_eq!(distinct_values, vec!["NULL", "a"]);
    Ok(())
}
1450
/// DISTINCT merge where both accumulators only ever saw nulls: exactly one NULL remains.
#[test]
fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
    let builder = ArrayAggAccumulatorBuilder::string().distinct();
    let (mut left, mut right) = builder.build_two()?;

    // The element type cannot be inferred from an all-`None` array, hence the turbofish.
    left.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
    right.update_batch(&[data::<Option<&str>, 4>([None, None, None, None])])?;
    let mut merged = merge(left, right)?;

    let distinct_values = print_nulls(str_arr(merged.evaluate()?)?);
    assert_eq!(distinct_values, vec!["NULL"]);
    Ok(())
}
1465
/// Pins the size reported by a plain (non-distinct, unordered) accumulator after a merge.
#[test]
fn does_not_over_account_memory() -> Result<()> {
    let builder = ArrayAggAccumulatorBuilder::string();
    let (mut left, mut right) = builder.build_two()?;

    left.update_batch(&[data(["a", "c", "b"])])?;
    right.update_batch(&[data(["b", "c", "a"])])?;
    let merged = merge(left, right)?;

    // Regression guard: the expected figure is an exact value, not a bound.
    assert_eq!(merged.size(), 282);

    Ok(())
}
/// Pins the size reported by a DISTINCT accumulator after merging list-typed inputs.
#[test]
fn does_not_over_account_memory_distinct() -> Result<()> {
    let builder = ArrayAggAccumulatorBuilder::string().distinct();
    let (mut left, mut right) = builder.build_two()?;

    let left_rows = string_list_data([vec!["a", "b", "c"], vec!["d", "e", "f"]]);
    left.update_batch(&[left_rows])?;
    let right_rows = string_list_data([vec!["e", "f", "g"]]);
    right.update_batch(&[right_rows])?;
    let merged = merge(left, right)?;

    // Regression guard: the expected figure is an exact value, not a bound.
    assert_eq!(merged.size(), 1660);

    Ok(())
}
1496
/// Pins the size reported by an ORDER BY accumulator after a single list-typed batch.
#[test]
fn does_not_over_account_memory_ordered() -> Result<()> {
    let ascending = SortOptions {
        descending: false,
        nulls_first: false,
    };
    let mut ordered_acc = ArrayAggAccumulatorBuilder::string()
        .order_by_col("col", ascending)
        .build()?;

    let rows = string_list_data([
        vec!["a", "b", "c"],
        vec!["c", "d", "e"],
        vec!["b", "c", "d"],
    ]);
    ordered_acc.update_batch(&[rows])?;

    // Regression guard: the expected figure is an exact value, not a bound.
    assert_eq!(ordered_acc.size(), 2224);

    Ok(())
}
1514
/// Test-only builder that gathers the arguments needed to create an `ArrayAgg` accumulator.
struct ArrayAggAccumulatorBuilder {
    /// Field handed to the accumulator as its `return_field`.
    return_field: FieldRef,
    /// Whether the accumulator is created with `is_distinct` set.
    distinct: bool,
    /// ORDER BY expressions forwarded to the accumulator, in the order they were added.
    order_bys: Vec<PhysicalSortExpr>,
    /// Schema used to resolve columns by name and to derive the input expression's field.
    schema: Schema,
}
1521
1522 impl ArrayAggAccumulatorBuilder {
1523 fn string() -> Self {
1524 Self::new(DataType::Utf8)
1525 }
1526
1527 fn new(data_type: DataType) -> Self {
1528 Self {
1529 return_field: Field::new("f", data_type.clone(), true).into(),
1530 distinct: false,
1531 order_bys: vec![],
1532 schema: Schema {
1533 fields: Fields::from(vec![Field::new(
1534 "col",
1535 DataType::new_list(data_type, true),
1536 true,
1537 )]),
1538 metadata: Default::default(),
1539 },
1540 }
1541 }
1542
1543 fn distinct(mut self) -> Self {
1544 self.distinct = true;
1545 self
1546 }
1547
1548 fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> Self {
1549 let new_order = PhysicalSortExpr::new(
1550 Arc::new(
1551 Column::new_with_schema(col, &self.schema)
1552 .expect("column not available in schema"),
1553 ),
1554 sort_options,
1555 );
1556 self.order_bys.push(new_order);
1557 self
1558 }
1559
1560 fn build(&self) -> Result<Box<dyn Accumulator>> {
1561 let expr = Arc::new(Column::new("col", 0));
1562 let expr_field = expr.return_field(&self.schema)?;
1563 ArrayAgg::default().accumulator(AccumulatorArgs {
1564 return_field: Arc::clone(&self.return_field),
1565 schema: &self.schema,
1566 expr_fields: &[expr_field],
1567 ignore_nulls: false,
1568 order_bys: &self.order_bys,
1569 is_reversed: false,
1570 name: "",
1571 is_distinct: self.distinct,
1572 exprs: &[expr],
1573 })
1574 }
1575
1576 fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn Accumulator>)> {
1577 Ok((self.build()?, self.build()?))
1578 }
1579 }
1580
1581 fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> {
1582 let ScalarValue::List(list) = value else {
1583 return internal_err!("ScalarValue was not a List");
1584 };
1585 Ok(as_generic_string_array::<i32>(list.values())?
1586 .iter()
1587 .map(|v| v.map(|v| v.to_string()))
1588 .collect())
1589 }
1590
/// Renders optional strings for comparison, substituting the text `"NULL"` for `None`.
fn print_nulls(sort: Vec<Option<String>>) -> Vec<String> {
    let mut rendered = Vec::with_capacity(sort.len());
    for entry in sort {
        rendered.push(match entry {
            Some(text) => text,
            None => String::from("NULL"),
        });
    }
    rendered
}
1596
1597 fn string_list_data<'a>(data: impl IntoIterator<Item = Vec<&'a str>>) -> ArrayRef {
1598 let mut builder = ListBuilder::new(StringBuilder::new());
1599 for string_list in data.into_iter() {
1600 builder.append_value(string_list.iter().map(Some).collect::<Vec<_>>());
1601 }
1602
1603 Arc::new(builder.finish())
1604 }
1605
1606 fn data<T, const N: usize>(list: [T; N]) -> ArrayRef
1607 where
1608 ScalarValue: From<T>,
1609 {
1610 let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect();
1611 ScalarValue::iter_to_array(values).expect("Cannot convert to array")
1612 }
1613
1614 fn merge(
1615 mut acc1: Box<dyn Accumulator>,
1616 mut acc2: Box<dyn Accumulator>,
1617 ) -> Result<Box<dyn Accumulator>> {
1618 let intermediate_state = acc2.state().and_then(|e| {
1619 e.iter()
1620 .map(|v| v.to_array())
1621 .collect::<Result<Vec<ArrayRef>>>()
1622 })?;
1623 acc1.merge_batch(&intermediate_state)?;
1624 Ok(acc1)
1625 }
1626
1627 use arrow::array::Int32Array;
1630
1631 fn list_array_to_i32_vecs(list: &ListArray) -> Vec<Option<Vec<Option<i32>>>> {
1632 (0..list.len())
1633 .map(|i| {
1634 if list.is_null(i) {
1635 None
1636 } else {
1637 let arr = list.value(i);
1638 let vals: Vec<Option<i32>> = arr
1639 .as_any()
1640 .downcast_ref::<Int32Array>()
1641 .unwrap()
1642 .iter()
1643 .collect();
1644 Some(vals)
1645 }
1646 })
1647 .collect()
1648 }
1649
1650 fn eval_i32_lists(
1651 acc: &mut ArrayAggGroupsAccumulator,
1652 emit_to: EmitTo,
1653 ) -> Result<Vec<Option<Vec<Option<i32>>>>> {
1654 let result = acc.evaluate(emit_to)?;
1655 Ok(list_array_to_i32_vecs(result.as_list::<i32>()))
1656 }
1657
/// Values from successive batches are appended to their group in arrival order.
#[test]
fn groups_accumulator_multiple_batches() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // First batch: 1 and 3 go to group 0, 2 goes to group 1.
    let first = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    groups_acc.update_batch(&[first], &[0, 1, 0], None, 2)?;

    // Second batch: 4 goes to group 1, 5 goes to group 0.
    let second = Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef;
    groups_acc.update_batch(&[second], &[1, 0], None, 2)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1), Some(3), Some(5)]));
    assert_eq!(lists[1], Some(vec![Some(2), Some(4)]));

    Ok(())
}
1676
/// `EmitTo::First(n)` emits the first `n` groups; a later `EmitTo::All` emits the rest.
#[test]
fn groups_accumulator_emit_first() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // One value per group: 10 -> group 0, 20 -> group 1, 30 -> group 2.
    let batch = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef;
    groups_acc.update_batch(&[batch], &[0, 1, 2], None, 3)?;

    let first_two = eval_i32_lists(&mut groups_acc, EmitTo::First(2))?;
    assert_eq!(first_two.len(), 2);
    assert_eq!(first_two[0], Some(vec![Some(10)]));
    assert_eq!(first_two[1], Some(vec![Some(20)]));

    // Only the former group 2 is left, now emitted at index 0.
    let remainder = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(remainder.len(), 1);
    assert_eq!(remainder[0], Some(vec![Some(30)]));

    Ok(())
}
1697
/// After `EmitTo::First`, batches whose rows were all emitted are dropped and the
/// remaining batch keeps only the rows of groups that are still pending.
#[test]
fn groups_accumulator_emit_first_frees_batches() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // First batch belongs entirely to group 0.
    let only_group_zero = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
    groups_acc.update_batch(&[only_group_zero], &[0, 0], None, 2)?;

    // Second batch is split between group 0 (30) and group 1 (40).
    let mixed = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef;
    groups_acc.update_batch(&[mixed], &[0, 1], None, 2)?;

    assert_eq!(groups_acc.batches.len(), 2);
    assert!(!groups_acc.batches[0].is_empty());
    assert!(!groups_acc.batches[1].is_empty());

    let emitted = eval_i32_lists(&mut groups_acc, EmitTo::First(1))?;
    assert_eq!(emitted[0], Some(vec![Some(10), Some(20), Some(30)]));

    // Only the value of the former group 1 is retained, re-indexed as group 0.
    assert_eq!(groups_acc.batches.len(), 1);
    let kept = groups_acc.batches[0]
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(kept.values(), &[40]);
    assert_eq!(groups_acc.batch_entries, vec![vec![(0, 0)]]);

    let emitted = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(emitted[0], Some(vec![Some(40)]));

    // Emitting everything leaves no stored batches and a reported size of zero.
    assert!(groups_acc.batches.is_empty());
    assert_eq!(groups_acc.size(), 0);

    Ok(())
}
1739
/// After `EmitTo::First`, a batch that mixes emitted and pending groups is compacted
/// down to the pending rows and the reported size shrinks.
#[test]
fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // Rows alternate between group 0 (10, 30) and group 1 (20, 40).
    let interleaved = Arc::new(Int32Array::from(vec![10, 20, 30, 40])) as ArrayRef;
    groups_acc.update_batch(&[interleaved], &[0, 1, 0, 1], None, 2)?;

    let size_before_emit = groups_acc.size();
    let emitted = eval_i32_lists(&mut groups_acc, EmitTo::First(1))?;
    assert_eq!(emitted[0], Some(vec![Some(10), Some(30)]));

    // The former group 1 is the only group left and is now addressed as group 0.
    assert_eq!(groups_acc.num_groups, 1);
    assert_eq!(groups_acc.batches.len(), 1);
    let kept = groups_acc.batches[0]
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(kept.values(), &[20, 40]);
    assert_eq!(groups_acc.batch_entries, vec![vec![(0, 0), (0, 1)]]);
    assert!(groups_acc.size() < size_before_emit);

    let emitted = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(emitted[0], Some(vec![Some(20), Some(40)]));
    assert_eq!(groups_acc.size(), 0);

    Ok(())
}
1767
/// `EmitTo::All` leaves the accumulator with zero reported size and no retained capacity.
#[test]
fn groups_accumulator_emit_all_releases_capacity() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // 64 values spread round-robin over 4 groups.
    let batch = Arc::new(Int32Array::from_iter_values(0..64)) as ArrayRef;
    let group_indices: Vec<usize> = (0..64).map(|row| row % 4).collect();
    groups_acc.update_batch(&[batch], &group_indices, None, 4)?;

    assert!(groups_acc.size() > 0);
    let _ = eval_i32_lists(&mut groups_acc, EmitTo::All)?;

    assert_eq!(groups_acc.size(), 0);
    assert_eq!(groups_acc.batches.capacity(), 0);
    assert_eq!(groups_acc.batch_entries.capacity(), 0);

    Ok(())
}
1789
/// Groups that never receive a value evaluate to a null list.
#[test]
fn groups_accumulator_null_groups() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    // Three groups are declared, but only group 0 gets a row.
    let batch = Arc::new(Int32Array::from(vec![1])) as ArrayRef;
    groups_acc.update_batch(&[batch], &[0], None, 3)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists, vec![Some(vec![Some(1)]), None, None]);

    Ok(())
}
1804
/// With the second constructor argument set to `true`, null inputs are left out
/// of every group's list.
#[test]
fn groups_accumulator_ignore_nulls() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);

    // Each group receives one real value followed by one null.
    let batch =
        Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None])) as ArrayRef;
    groups_acc.update_batch(&[batch], &[0, 0, 1, 1], None, 2)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1)]));
    assert_eq!(lists[1], Some(vec![Some(3)]));

    Ok(())
}
1821
/// Rows whose filter entry is `false` or null are not aggregated.
#[test]
fn groups_accumulator_opt_filter() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let batch = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef;
    // Row 1 has a null filter entry and row 3 is filtered out explicitly.
    let mask = BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]);
    groups_acc.update_batch(&[batch], &[0, 0, 1, 1], Some(&mask), 2)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1)]));
    assert_eq!(lists[1], Some(vec![Some(3)]));

    Ok(())
}
1838
/// State emitted by one accumulator can be merged into another, and the receiver
/// keeps accepting regular updates afterwards; per-group order follows call order.
#[test]
fn groups_accumulator_state_merge_roundtrip() -> Result<()> {
    let mut receiver = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let initial = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    receiver.update_batch(&[initial], &[0, 1], None, 2)?;

    let mut donor = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    let donated = Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef;
    donor.update_batch(&[donated], &[0, 1], None, 2)?;

    // Move the donor's complete state into the receiver, group for group.
    let donor_state = donor.state(EmitTo::All)?;
    receiver.merge_batch(&donor_state, &[0, 1], None, 2)?;

    // A plain update after the merge must still be accepted.
    let trailing = Arc::new(Int32Array::from(vec![5, 6])) as ArrayRef;
    receiver.update_batch(&[trailing], &[0, 1], None, 2)?;

    let lists = eval_i32_lists(&mut receiver, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1), Some(3), Some(5)]));
    assert_eq!(lists[1], Some(vec![Some(2), Some(4), Some(6)]));

    Ok(())
}
1869
/// `convert_to_state` wraps every input row in its own single-element list; a null
/// input row becomes a list holding one null element.
#[test]
fn groups_accumulator_convert_to_state() -> Result<()> {
    let groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let batch = Arc::new(Int32Array::from(vec![Some(10), None, Some(30)])) as ArrayRef;
    let state = groups_acc.convert_to_state(&[batch], None)?;

    assert_eq!(state.len(), 1);
    let rows = list_array_to_i32_vecs(state[0].as_list::<i32>());
    let expected = vec![
        Some(vec![Some(10)]),
        Some(vec![None]),
        Some(vec![Some(30)]),
    ];
    assert_eq!(rows, expected);

    Ok(())
}
1890
/// `convert_to_state` with a filter turns filtered-out rows into null list entries.
#[test]
fn groups_accumulator_convert_to_state_with_filter() -> Result<()> {
    let groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let batch = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef;
    // The middle row is filtered out.
    let mask = BooleanArray::from(vec![true, false, true]);
    let state = groups_acc.convert_to_state(&[batch], Some(&mask))?;

    let rows = list_array_to_i32_vecs(state[0].as_list::<i32>());
    let expected = vec![Some(vec![Some(10)]), None, Some(vec![Some(30)])];
    assert_eq!(rows, expected);

    Ok(())
}
1911
/// Null elements survive a `convert_to_state` -> `merge_batch` round trip when
/// nulls are not ignored.
#[test]
fn groups_accumulator_convert_to_state_merge_preserves_nulls() -> Result<()> {
    let converter = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let batch = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
    let state = converter.convert_to_state(&[batch], None)?;

    // Rows 0 and 1 (value 1 and the null) go to group 0, row 2 goes to group 1.
    let mut receiver = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
    receiver.merge_batch(&state, &[0, 0, 1], None, 2)?;

    let lists = eval_i32_lists(&mut receiver, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1), None]));
    assert_eq!(lists[1], Some(vec![Some(3)]));

    Ok(())
}
1933
/// With nulls ignored, `convert_to_state` emits null list rows for null inputs and
/// merging that state does not add them to any group.
#[test]
fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> {
    let converter = ArrayAggGroupsAccumulator::new(DataType::Int32, true);

    let batch =
        Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None])) as ArrayRef;
    let state = converter.convert_to_state(&[batch], None)?;

    // Rows 1 and 3 held null inputs, so their state rows must be null lists.
    let state_lists = state[0].as_list::<i32>();
    let null_mask: Vec<bool> = (0..4).map(|row| state_lists.is_null(row)).collect();
    assert_eq!(null_mask, vec![false, true, false, true]);

    let mut receiver = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
    receiver.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;

    let lists = eval_i32_lists(&mut receiver, EmitTo::All)?;
    assert_eq!(lists[0], Some(vec![Some(1)]));
    assert_eq!(lists[1], Some(vec![Some(3)]));

    Ok(())
}
1963
/// When the filter rejects every row, all groups evaluate to a null list.
#[test]
fn groups_accumulator_all_groups_empty() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);

    let batch = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    let reject_all = BooleanArray::from(vec![false, false]);
    groups_acc.update_batch(&[batch], &[0, 1], Some(&reject_all), 2)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists, vec![None, None]);

    Ok(())
}
1978
/// With nulls ignored, a group that only received nulls evaluates to a null list.
#[test]
fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> {
    let mut groups_acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);

    // Group 0 gets two nulls; group 1 gets the single real value.
    let batch = Arc::new(Int32Array::from(vec![None, Some(1), None])) as ArrayRef;
    groups_acc.update_batch(&[batch], &[0, 1, 0], None, 2)?;

    let lists = eval_i32_lists(&mut groups_acc, EmitTo::All)?;
    assert_eq!(lists[0], None);
    assert_eq!(lists[1], Some(vec![Some(1)]));

    Ok(())
}
1994
/// Simulates a sliding window: values are added at the back and retracted from the front.
#[test]
fn retract_basic_sliding_window() -> Result<()> {
    /// Evaluates the accumulator and renders its current contents.
    fn contents(acc: &mut dyn Accumulator) -> Result<Vec<String>> {
        Ok(print_nulls(str_arr(acc.evaluate()?)?))
    }

    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    window.update_batch(&[data(["A"])])?;
    assert_eq!(contents(&mut window)?, vec!["A"]);

    window.update_batch(&[data(["B"])])?;
    assert_eq!(contents(&mut window)?, vec!["A", "B"]);

    // Slide forward: "C" enters, "A" leaves.
    window.update_batch(&[data(["C"])])?;
    window.retract_batch(&[data(["A"])])?;
    assert_eq!(contents(&mut window)?, vec!["B", "C"]);

    // Slide forward again: "D" enters, "B" leaves.
    window.update_batch(&[data(["D"])])?;
    window.retract_batch(&[data(["B"])])?;
    assert_eq!(contents(&mut window)?, vec!["C", "D"]);

    Ok(())
}
2022
/// A retraction may span several previously added batches; retracting everything
/// leaves a null list.
#[test]
fn retract_multi_element_across_arrays() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    window.update_batch(&[data(["A", "B", "C"])])?;
    window.update_batch(&[data(["D"])])?;

    let everything = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(everything, vec!["A", "B", "C", "D"]);

    // Remove only the first element of the first batch.
    window.retract_batch(&[data(["A"])])?;
    let after_first_retract = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(after_first_retract, vec!["B", "C", "D"]);

    // Remove the rest, which crosses the boundary between the two batches.
    window.retract_batch(&[data(["B", "C", "D"])])?;
    let result = window.evaluate()?;
    assert!(
        matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
        "expected null list after full retract, got {result:?}"
    );

    Ok(())
}
2051
/// When nulls are kept, a retracted null removes one stored element like any other value.
#[test]
fn retract_with_nulls_preserved() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    window.update_batch(&[data([Some("A"), None, Some("C")])])?;
    let before = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(before, vec!["A", "NULL", "C"]);

    // Retracting two rows (one of them null) leaves only the last element.
    window.retract_batch(&[data([Some("A"), None])])?;
    let after = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(after, vec!["C"]);

    Ok(())
}
2069
/// When nulls are ignored, nulls in a retracted batch do not consume stored elements.
#[test]
fn retract_with_ignore_nulls() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;

    // The null is never stored.
    window.update_batch(&[data([Some("A"), None, Some("C")])])?;
    let stored = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(stored, vec!["A", "C"]);

    // Only "A" counts towards the retraction; the null is skipped.
    window.retract_batch(&[data([Some("A"), None])])?;
    let remaining = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(remaining, vec!["C"]);

    // Again only the non-null entry counts, which empties the accumulator.
    window.retract_batch(&[data([None, Some("C")])])?;
    let result = window.evaluate()?;
    assert!(
        matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
        "expected null list after full retract, got {result:?}"
    );

    Ok(())
}
2095
/// When nulls are ignored, retracting a batch made only of nulls changes nothing.
#[test]
fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;

    window.update_batch(&[data([Some("A"), Some("B")])])?;
    let before = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(before, vec!["A", "B"]);

    // The element type cannot be inferred from an all-`None` array, hence the turbofish.
    window.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
    let after = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(after, vec!["A", "B"]);

    Ok(())
}
2110
/// Retracting from an accumulator that never received data succeeds and the
/// accumulator still evaluates to a null list.
#[test]
fn retract_empty_accumulator() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    // Nothing was added beforehand; this must not fail.
    window.retract_batch(&[data(["A"])])?;

    let result = window.evaluate()?;
    assert!(
        matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
        "expected null list for empty accumulator, got {result:?}"
    );

    Ok(())
}
2125
/// A batch can be consumed partially by one retraction and finished off by a later
/// retraction that also spans the following batch.
#[test]
fn retract_front_offset_partial_consume() -> Result<()> {
    /// Evaluates the accumulator and renders its current contents.
    fn contents(acc: &mut dyn Accumulator) -> Result<Vec<String>> {
        Ok(print_nulls(str_arr(acc.evaluate()?)?))
    }

    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    window.update_batch(&[data(["A", "B", "C"])])?;
    assert_eq!(contents(&mut window)?, vec!["A", "B", "C"]);

    window.update_batch(&[data(["D"])])?;
    assert_eq!(contents(&mut window)?, vec!["A", "B", "C", "D"]);

    // Consume just the first element of the three-element batch.
    window.retract_batch(&[data(["A"])])?;
    assert_eq!(contents(&mut window)?, vec!["B", "C", "D"]);

    // Add a new tail, then retract the rest of the first batch plus the second one.
    window.update_batch(&[data(["E"])])?;
    window.retract_batch(&[data(["B", "C", "D"])])?;
    assert_eq!(contents(&mut window)?, vec!["E"]);

    Ok(())
}
2161
/// An accumulator that was drained completely keeps working for later updates
/// and retractions.
#[test]
fn retract_update_after_full_drain() -> Result<()> {
    let mut window = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;

    // Fill and immediately drain.
    window.update_batch(&[data(["A", "B"])])?;
    window.retract_batch(&[data(["A", "B"])])?;

    let result = window.evaluate()?;
    assert!(
        matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
        "expected null list, got {result:?}"
    );

    // Fresh data after the drain is visible as usual.
    window.update_batch(&[data(["X", "Y"])])?;
    let refilled = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(refilled, vec!["X", "Y"]);

    window.retract_batch(&[data(["X"])])?;
    let trimmed = print_nulls(str_arr(window.evaluate()?)?);
    assert_eq!(trimmed, vec!["Y"]);

    Ok(())
}
2186
/// The accumulator advertises retraction support for both values of the
/// `try_new` flag.
#[test]
fn retract_supports_retract_batch() -> Result<()> {
    let keeps_nulls = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
    let skips_nulls = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;

    assert!(keeps_nulls.supports_retract_batch());
    assert!(skips_nulls.supports_retract_batch());

    Ok(())
}
2197
/// Dictionary arrays can carry nulls only in their values (logical nulls) while the
/// keys have none (no physical nulls). With nulls ignored, update and retract must
/// agree on how many elements such an array contributes.
#[test]
fn retract_ignore_nulls_logical_vs_physical() -> Result<()> {
    use arrow::array::{DictionaryArray, Int32Array, StringArray};

    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let mut window = ArrayAggAccumulator::try_new(&dict_type, true)?;

    // Key 1 points at the null dictionary value and is used twice.
    let dictionary = StringArray::from(vec![Some("hello"), None, Some("world")]);
    let keys = Int32Array::from(vec![0, 1, 2, 1]);
    let dict_array: ArrayRef =
        Arc::new(DictionaryArray::new(keys, Arc::new(dictionary)));

    assert_eq!(
        dict_array.null_count(),
        0,
        "physical nulls: none in keys bitmap"
    );
    assert_eq!(
        dict_array.logical_null_count(),
        2,
        "logical nulls: keys pointing to null values"
    );

    window.update_batch(std::slice::from_ref(&dict_array))?;

    // Only the two non-null entries are stored.
    let result = window.evaluate()?;
    match &result {
        ScalarValue::List(arr) => {
            let stored = arr.value(0);
            assert_eq!(stored.len(), 2);
        }
        other => panic!("expected List, got {other:?}"),
    }

    // Retracting the same array must remove exactly those two entries.
    window.retract_batch(&[dict_array])?;
    let result = window.evaluate()?;
    assert!(
        matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
        "expected null list after full retract, got {result:?}"
    );

    Ok(())
}
2251
/// With nulls ignored, retracting a dictionary array whose nulls are only logical
/// removes one stored element per non-null entry and nothing for the null ones.
#[test]
fn retract_ignore_nulls_dict_partial() -> Result<()> {
    use arrow::array::{DictionaryArray, Int32Array, StringArray};

    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let mut window = ArrayAggAccumulator::try_new(&dict_type, true)?;

    // Store three non-null entries.
    let stored_dictionary = StringArray::from(vec!["A", "B", "C"]);
    let stored_keys = Int32Array::from(vec![0, 1, 2]);
    let update_array: ArrayRef = Arc::new(DictionaryArray::new(
        stored_keys,
        Arc::new(stored_dictionary),
    ));
    window.update_batch(&[update_array])?;

    // Retraction input: one real entry plus two keys pointing at a null value.
    let retract_dictionary = StringArray::from(vec![Some("A"), None]);
    let retract_keys = Int32Array::from(vec![0, 1, 1]);
    let retract_array: ArrayRef = Arc::new(DictionaryArray::new(
        retract_keys,
        Arc::new(retract_dictionary),
    ));

    assert_eq!(
        retract_array.null_count(),
        0,
        "physical nulls: none in keys bitmap"
    );
    assert_eq!(
        retract_array.logical_null_count(),
        2,
        "logical nulls: keys pointing to null values"
    );

    window.retract_batch(&[retract_array])?;

    // Only one element was removed, so two of the three remain.
    let result = window.evaluate()?;
    match &result {
        ScalarValue::List(arr) => {
            let remaining = arr.value(0);
            assert_eq!(remaining.len(), 2);
        }
        other => panic!("expected List with 2 elements, got {other:?}"),
    }

    Ok(())
}
2303}