Skip to main content

datafusion_functions_aggregate/
array_agg.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`]
19
20use std::cmp::Ordering;
21use std::collections::{HashSet, VecDeque};
22use std::mem::{size_of, size_of_val, take};
23use std::sync::Arc;
24
25use arrow::array::{
26    Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, StructArray,
27    UInt32Array, new_empty_array,
28};
29use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
30use arrow::compute::{SortOptions, filter};
31use arrow::datatypes::{DataType, Field, FieldRef, Fields};
32
33use datafusion_common::cast::as_list_array;
34use datafusion_common::utils::{
35    SingleRowListArrayBuilder, compare_rows, get_row_at_idx, take_function_args,
36};
37use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err};
38use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
39use datafusion_expr::utils::format_state_name;
40use datafusion_expr::{
41    Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, Signature,
42    Volatility,
43};
44use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls;
45use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
46use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
47use datafusion_functions_aggregate_common::utils::ordering_fields;
48use datafusion_macros::user_doc;
49use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
50
// Registers the `ArrayAgg` UDAF via `make_udaf_expr_and_func!`. Among the
// generated items is the `array_agg_udaf()` accessor that `reverse_expr` calls;
// the `array_agg` / `expression` arguments presumably name the generated
// expression helper and its parameter (see the macro definition to confirm).
make_udaf_expr_and_func!(
    ArrayAgg,
    array_agg,
    expression,
    "input values, including nulls, concatenated into an array",
    array_agg_udaf
);
58
#[user_doc(
    doc_section(label = "General Functions"),
    description = r#"Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order.
This aggregation function can only mix DISTINCT and ORDER BY if the ordering expression is exactly the same as the argument expression."#,
    syntax_example = "array_agg(expression [ORDER BY expression])",
    sql_example = r#"
```sql
> SELECT array_agg(column_name ORDER BY other_column) FROM table_name;
+-----------------------------------------------+
| array_agg(column_name ORDER BY other_column)  |
+-----------------------------------------------+
| [element1, element2, element3]                |
+-----------------------------------------------+
> SELECT array_agg(DISTINCT column_name ORDER BY column_name) FROM table_name;
+--------------------------------------------------------+
| array_agg(DISTINCT column_name ORDER BY column_name)  |
+--------------------------------------------------------+
| [element1, element2, element3]                         |
+--------------------------------------------------------+
```
"#,
    standard_argument(name = "expression",)
)]
#[derive(Debug, PartialEq, Eq, Hash)]
/// ARRAY_AGG aggregate expression
///
/// Collects its single argument into a list. Depending on DISTINCT / ORDER BY
/// it builds a distinct, plain, or order-sensitive accumulator (see
/// `AggregateUDFImpl::accumulator` for this type).
pub struct ArrayAgg {
    /// Function signature; `Default` sets it to
    /// `Signature::any(1, Volatility::Immutable)`.
    signature: Signature,
    /// Set by `with_beneficial_ordering`. Forwarded to
    /// `OrderSensitiveArrayAggAccumulator::try_new` when an ORDER BY is present.
    is_input_pre_ordered: bool,
}
88
89impl Default for ArrayAgg {
90    fn default() -> Self {
91        Self {
92            signature: Signature::any(1, Volatility::Immutable),
93            is_input_pre_ordered: false,
94        }
95    }
96}
97
98impl AggregateUDFImpl for ArrayAgg {
99    fn name(&self) -> &str {
100        "array_agg"
101    }
102
103    fn signature(&self) -> &Signature {
104        &self.signature
105    }
106
107    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
108        Ok(DataType::List(Arc::new(Field::new_list_field(
109            arg_types[0].clone(),
110            true,
111        ))))
112    }
113
114    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
115        if args.is_distinct {
116            return Ok(vec![
117                Field::new_list(
118                    format_state_name(args.name, "distinct_array_agg"),
119                    // See COMMENTS.md to understand why nullable is set to true
120                    Field::new_list_field(args.input_fields[0].data_type().clone(), true),
121                    true,
122                )
123                .into(),
124            ]);
125        }
126
127        let mut fields = vec![
128            Field::new_list(
129                format_state_name(args.name, "array_agg"),
130                // See COMMENTS.md to understand why nullable is set to true
131                Field::new_list_field(args.input_fields[0].data_type().clone(), true),
132                true,
133            )
134            .into(),
135        ];
136
137        if args.ordering_fields.is_empty() {
138            return Ok(fields);
139        }
140
141        let orderings = args.ordering_fields.to_vec();
142        fields.push(
143            Field::new_list(
144                format_state_name(args.name, "array_agg_orderings"),
145                Field::new_list_field(DataType::Struct(Fields::from(orderings)), true),
146                false,
147            )
148            .into(),
149        );
150
151        Ok(fields)
152    }
153
154    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
155        AggregateOrderSensitivity::SoftRequirement
156    }
157
158    fn with_beneficial_ordering(
159        self: Arc<Self>,
160        beneficial_ordering: bool,
161    ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
162        Ok(Some(Arc::new(Self {
163            signature: self.signature.clone(),
164            is_input_pre_ordered: beneficial_ordering,
165        })))
166    }
167
168    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
169        let field = &acc_args.expr_fields[0];
170        let data_type = field.data_type();
171        let ignore_nulls = acc_args.ignore_nulls && field.is_nullable();
172
173        if acc_args.is_distinct {
174            // Limitation similar to Postgres. The aggregation function can only mix
175            // DISTINCT and ORDER BY if all the expressions in the ORDER BY appear
176            // also in the arguments of the function. This implies that if the
177            // aggregation function only accepts one argument, only one argument
178            // can be used in the ORDER BY, For example:
179            //
180            // ARRAY_AGG(DISTINCT col)
181            //
182            // can only be mixed with an ORDER BY if the order expression is "col".
183            //
184            // ARRAY_AGG(DISTINCT col ORDER BY col)                         <- Valid
185            // ARRAY_AGG(DISTINCT concat(col, '') ORDER BY concat(col, '')) <- Valid
186            // ARRAY_AGG(DISTINCT col ORDER BY other_col)                   <- Invalid
187            // ARRAY_AGG(DISTINCT col ORDER BY concat(col, ''))             <- Invalid
188            let sort_option = match acc_args.order_bys {
189                [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options),
190                [] => None,
191                _ => {
192                    return exec_err!(
193                        "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list"
194                    );
195                }
196            };
197            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
198                data_type,
199                sort_option,
200                ignore_nulls,
201            )?));
202        }
203
204        let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else {
205            return Ok(Box::new(ArrayAggAccumulator::try_new(
206                data_type,
207                ignore_nulls,
208            )?));
209        };
210
211        let ordering_dtypes = ordering
212            .iter()
213            .map(|e| e.expr.data_type(acc_args.schema))
214            .collect::<Result<Vec<_>>>()?;
215
216        OrderSensitiveArrayAggAccumulator::try_new(
217            data_type,
218            &ordering_dtypes,
219            ordering,
220            self.is_input_pre_ordered,
221            acc_args.is_reversed,
222            ignore_nulls,
223        )
224        .map(|acc| Box::new(acc) as _)
225    }
226
227    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
228        datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
229    }
230
231    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
232        !args.is_distinct && args.order_bys.is_empty()
233    }
234
235    fn create_groups_accumulator(
236        &self,
237        args: AccumulatorArgs,
238    ) -> Result<Box<dyn GroupsAccumulator>> {
239        let field = &args.expr_fields[0];
240        let data_type = field.data_type().clone();
241        let ignore_nulls = args.ignore_nulls && field.is_nullable();
242        Ok(Box::new(ArrayAggGroupsAccumulator::new(
243            data_type,
244            ignore_nulls,
245        )))
246    }
247
248    fn supports_null_handling_clause(&self) -> bool {
249        true
250    }
251
252    fn documentation(&self) -> Option<&Documentation> {
253        self.doc()
254    }
255}
256
/// `ARRAY_AGG` accumulator that `ArrayAgg::accumulator` uses when the
/// aggregate has neither DISTINCT nor ORDER BY. It buffers input arrays and
/// concatenates them on `evaluate`, and supports retraction for sliding window
/// frames.
#[derive(Debug)]
pub struct ArrayAggAccumulator {
    /// Buffered, non-empty input chunks in arrival order.
    values: VecDeque<ArrayRef>,
    /// Element type of the produced list.
    datatype: DataType,
    /// When true, null input values are dropped before buffering.
    ignore_nulls: bool,
    /// Number of elements already consumed (retracted) from the front array.
    /// Used by sliding window frames to avoid copying on partial retract.
    front_offset: usize,
}
266
267impl ArrayAggAccumulator {
268    /// new array_agg accumulator based on given item data type
269    pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result<Self> {
270        Ok(Self {
271            values: VecDeque::new(),
272            datatype: datatype.clone(),
273            ignore_nulls,
274            front_offset: 0,
275        })
276    }
277
278    /// This function will return the underlying list array values if all valid values are consecutive without gaps (i.e. no null value point to a non-empty list)
279    /// If there are gaps but only in the end of the list array, the function will return the values without the null values in the end
280    fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> {
281        let offsets = list_array.value_offsets();
282        // Offsets always have at least 1 value
283        let initial_offset = offsets[0];
284        let null_count = list_array.null_count();
285
286        // If no nulls than just use the fast path
287        // This is ok as the state is a ListArray rather than a ListViewArray so all the values are consecutive
288        if null_count == 0 {
289            // According to Arrow specification, the first offset can be non-zero
290            let list_values = list_array.values().slice(
291                initial_offset as usize,
292                (offsets[offsets.len() - 1] - initial_offset) as usize,
293            );
294            return Some(list_values);
295        }
296
297        // If all the values are null than just return an empty values array
298        if list_array.null_count() == list_array.len() {
299            return Some(list_array.values().slice(0, 0));
300        }
301
302        // According to the Arrow spec, null values can point to non-empty lists
303        // So this will check if all null values starting from the first valid value to the last one point to a 0 length list so we can just slice the underlying value
304
305        // Unwrapping is safe as we just checked if there is a null value
306        let nulls = list_array.nulls().unwrap();
307
308        let mut valid_slices_iter = nulls.valid_slices();
309
310        // This is safe as we validated that there is at least 1 valid value in the array
311        let (start, end) = valid_slices_iter.next().unwrap();
312
313        let start_offset = offsets[start];
314
315        // End is exclusive, so it already point to the last offset value
316        // This is valid as the length of the array is always 1 less than the length of the offsets
317        let mut end_offset_of_last_valid_value = offsets[end];
318
319        for (start, end) in valid_slices_iter {
320            // If there is a null value that point to a non-empty list than the start offset of the valid value
321            // will be different that the end offset of the last valid value
322            if offsets[start] != end_offset_of_last_valid_value {
323                return None;
324            }
325
326            // End is exclusive, so it already point to the last offset value
327            // This is valid as the length of the array is always 1 less than the length of the offsets
328            end_offset_of_last_valid_value = offsets[end];
329        }
330
331        let consecutive_valid_values = list_array.values().slice(
332            start_offset as usize,
333            (end_offset_of_last_valid_value - start_offset) as usize,
334        );
335
336        Some(consecutive_valid_values)
337    }
338}
339
340impl Accumulator for ArrayAggAccumulator {
341    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
342        // Append value like Int64Array(1,2,3)
343        if values.is_empty() {
344            return Ok(());
345        }
346
347        assert_eq_or_internal_err!(values.len(), 1, "expects single batch");
348
349        let val = &values[0];
350        let nulls = if self.ignore_nulls {
351            val.logical_nulls()
352        } else {
353            None
354        };
355
356        let val = match nulls {
357            Some(nulls) if nulls.null_count() >= val.len() => return Ok(()),
358            Some(nulls) => filter(val, &BooleanArray::new(nulls.inner().clone(), None))?,
359            None => Arc::clone(val),
360        };
361
362        if !val.is_empty() {
363            self.values.push_back(val)
364        }
365
366        Ok(())
367    }
368
369    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
370        // Append value like ListArray(Int64Array(1,2,3), Int64Array(4,5,6))
371        if states.is_empty() {
372            return Ok(());
373        }
374
375        assert_eq_or_internal_err!(states.len(), 1, "expects single state");
376
377        let list_arr = as_list_array(&states[0])?;
378
379        match Self::get_optional_values_to_merge_as_is(list_arr) {
380            Some(values) => {
381                // Make sure we don't insert empty lists
382                if !values.is_empty() {
383                    self.values.push_back(values);
384                }
385            }
386            None => {
387                for arr in list_arr.iter().flatten() {
388                    self.values.push_back(arr);
389                }
390            }
391        }
392
393        Ok(())
394    }
395
396    fn state(&mut self) -> Result<Vec<ScalarValue>> {
397        Ok(vec![self.evaluate()?])
398    }
399
400    fn evaluate(&mut self) -> Result<ScalarValue> {
401        if self.values.is_empty() {
402            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
403        }
404
405        let element_arrays: Vec<ArrayRef> = self
406            .values
407            .iter()
408            .enumerate()
409            .map(|(i, a)| {
410                if i == 0 && self.front_offset > 0 {
411                    a.slice(self.front_offset, a.len() - self.front_offset)
412                } else {
413                    Arc::clone(a)
414                }
415            })
416            .collect();
417
418        let element_refs: Vec<&dyn Array> =
419            element_arrays.iter().map(|a| a.as_ref()).collect();
420
421        if element_refs.iter().all(|a| a.is_empty()) {
422            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
423        }
424
425        let concated_array = arrow::compute::concat(&element_refs)?;
426
427        Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar())
428    }
429
430    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
431        if values.is_empty() {
432            return Ok(());
433        }
434
435        assert_eq_or_internal_err!(values.len(), 1, "expects single batch");
436
437        let val = &values[0];
438        let mut to_retract = if self.ignore_nulls {
439            val.len() - val.logical_null_count()
440        } else {
441            val.len()
442        };
443
444        while to_retract > 0 {
445            let Some(front) = self.values.front() else {
446                break;
447            };
448            let available = front.len() - self.front_offset;
449            if to_retract >= available {
450                self.values.pop_front();
451                to_retract -= available;
452                self.front_offset = 0;
453            } else {
454                self.front_offset += to_retract;
455                to_retract = 0;
456            }
457        }
458
459        Ok(())
460    }
461
462    fn supports_retract_batch(&self) -> bool {
463        true
464    }
465
466    fn size(&self) -> usize {
467        size_of_val(self)
468            + (size_of::<ArrayRef>() * self.values.capacity())
469            + self
470                .values
471                .iter()
472                // Each ArrayRef might be just a reference to a bigger array, and many
473                // ArrayRefs here might be referencing exactly the same array, so if we
474                // were to call `arr.get_array_memory_size()`, we would be double-counting
475                // the same underlying data many times.
476                //
477                // Instead, we do an approximation by estimating how much memory each
478                // ArrayRef would occupy if its underlying data was fully owned by this
479                // accumulator.
480                //
481                // Note that this is just an estimation, but the reality is that this
482                // accumulator might not own any data.
483                .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
484                .sum::<usize>()
485            + self.datatype.size()
486            - size_of_val(&self.datatype)
487    }
488}
489
/// Grouped `ARRAY_AGG` accumulator, used only without DISTINCT and ORDER BY
/// (see `groups_accumulator_supported`). Input arrays are kept by reference
/// together with `(group, row)` pairs. Values are gathered into per-group
/// lists only when `evaluate` is called.
#[derive(Debug)]
struct ArrayAggGroupsAccumulator {
    /// Element type of the produced lists.
    datatype: DataType,
    /// When true, null input values are not recorded.
    ignore_nulls: bool,
    /// Source arrays — input arrays (from update_batch) or list backing
    /// arrays (from merge_batch).
    batches: Vec<ArrayRef>,
    /// Per-batch list of (group_idx, row_idx) pairs.
    batch_entries: Vec<Vec<(u32, u32)>>,
    /// Total number of groups tracked.
    num_groups: usize,
}
502
503impl ArrayAggGroupsAccumulator {
504    fn new(datatype: DataType, ignore_nulls: bool) -> Self {
505        Self {
506            datatype,
507            ignore_nulls,
508            batches: Vec::new(),
509            batch_entries: Vec::new(),
510            num_groups: 0,
511        }
512    }
513
514    fn clear_state(&mut self) {
515        // `size()` measures Vec capacity rather than len, so allocate new
516        // buffers instead of using `clear()`.
517        self.batches = Vec::new();
518        self.batch_entries = Vec::new();
519        self.num_groups = 0;
520    }
521
522    fn compact_retained_state(&mut self, emit_groups: usize) -> Result<()> {
523        // EmitTo::First is used to recover from memory pressure. Simply
524        // removing emitted entries in place is not enough because mixed batches
525        // would continue to pin their original Array arrays, even if only a few
526        // retained rows remain.
527        //
528        // Rebuild the retained state from scratch so fully emitted batches are
529        // dropped, mixed batches are compacted to arrays containing only the
530        // surviving rows, and retained metadata is right-sized.
531        let emit_groups = emit_groups as u32;
532        let old_batches = take(&mut self.batches);
533        let old_batch_entries = take(&mut self.batch_entries);
534
535        let mut batches = Vec::new();
536        let mut batch_entries = Vec::new();
537
538        for (batch, entries) in old_batches.into_iter().zip(old_batch_entries) {
539            let retained_len = entries.iter().filter(|(g, _)| *g >= emit_groups).count();
540
541            if retained_len == 0 {
542                continue;
543            }
544
545            if retained_len == entries.len() {
546                // Nothing was emitted from this batch, so we keep the existing
547                // array and only renumber the remaining group IDs so that they
548                // start from 0.
549                let mut retained_entries = entries;
550                for (g, _) in &mut retained_entries {
551                    *g -= emit_groups;
552                }
553                retained_entries.shrink_to_fit();
554                batches.push(batch);
555                batch_entries.push(retained_entries);
556                continue;
557            }
558
559            let mut retained_entries = Vec::with_capacity(retained_len);
560            let mut retained_rows = Vec::with_capacity(retained_len);
561
562            for (g, r) in entries {
563                if g >= emit_groups {
564                    // Compute the new `(group_idx, row_idx)` pair for a
565                    // retained row. `group_idx` is renumbered to start from
566                    // 0, and `row_idx` points into the new dense batch we are
567                    // building.
568                    retained_entries.push((g - emit_groups, retained_rows.len() as u32));
569                    retained_rows.push(r);
570                }
571            }
572
573            debug_assert_eq!(retained_entries.len(), retained_len);
574            debug_assert_eq!(retained_rows.len(), retained_len);
575
576            let batch = if retained_len == batch.len() {
577                batch
578            } else {
579                // Compact mixed batches so retained rows no longer pin the
580                // original array.
581                let retained_rows = UInt32Array::from(retained_rows);
582                arrow::compute::take(batch.as_ref(), &retained_rows, None)?
583            };
584
585            batches.push(batch);
586            batch_entries.push(retained_entries);
587        }
588
589        self.batches = batches;
590        self.batch_entries = batch_entries;
591        self.num_groups -= emit_groups as usize;
592
593        Ok(())
594    }
595}
596
597impl GroupsAccumulator for ArrayAggGroupsAccumulator {
598    /// Store a reference to the input batch, plus a `(group_idx, row_idx)` pair
599    /// for every row.
600    fn update_batch(
601        &mut self,
602        values: &[ArrayRef],
603        group_indices: &[usize],
604        opt_filter: Option<&BooleanArray>,
605        total_num_groups: usize,
606    ) -> Result<()> {
607        assert_eq!(values.len(), 1, "single argument to update_batch");
608        let input = &values[0];
609
610        self.num_groups = self.num_groups.max(total_num_groups);
611
612        let nulls = if self.ignore_nulls {
613            input.logical_nulls()
614        } else {
615            None
616        };
617
618        let mut entries = Vec::new();
619
620        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
621            // Skip filtered rows
622            if let Some(filter) = opt_filter
623                && (filter.is_null(row_idx) || !filter.value(row_idx))
624            {
625                continue;
626            }
627
628            // Skip null values when ignore_nulls is set
629            if let Some(ref nulls) = nulls
630                && nulls.is_null(row_idx)
631            {
632                continue;
633            }
634
635            entries.push((group_idx as u32, row_idx as u32));
636        }
637
638        // We only need to record the batch if it was non-empty.
639        if !entries.is_empty() {
640            self.batches.push(Arc::clone(input));
641            self.batch_entries.push(entries);
642        }
643
644        Ok(())
645    }
646
647    /// Produce a `ListArray` ordered by group index: the list at
648    /// position N contains the aggregated values for group N.
649    ///
650    /// Uses a counting sort to rearrange the stored `(group, row)`
651    /// entries into group order, then calls `interleave` to gather
652    /// the values into a flat array that backs the output `ListArray`.
653    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
654        let emit_groups = match emit_to {
655            EmitTo::All => self.num_groups,
656            EmitTo::First(n) => n,
657        };
658
659        // Step 1: Count entries per group. For EmitTo::First(n), only groups
660        // 0..n are counted; the rest are retained to be emitted in the future.
661        let mut counts = vec![0u32; emit_groups];
662        for entries in &self.batch_entries {
663            for &(g, _) in entries {
664                let g = g as usize;
665                if g < emit_groups {
666                    counts[g] += 1;
667                }
668            }
669        }
670
671        // Step 2: Do a prefix sum over the counts and use it to build ListArray
672        // offsets, null buffer, and write positions for the counting sort.
673        let mut offsets = Vec::<i32>::with_capacity(emit_groups + 1);
674        offsets.push(0);
675        let mut nulls_builder = NullBufferBuilder::new(emit_groups);
676        let mut write_positions = Vec::with_capacity(emit_groups);
677        let mut cur_offset = 0u32;
678        for &count in &counts {
679            if count == 0 {
680                nulls_builder.append_null();
681            } else {
682                nulls_builder.append_non_null();
683            }
684            write_positions.push(cur_offset);
685            cur_offset += count;
686            offsets.push(cur_offset as i32);
687        }
688        let total_rows = cur_offset as usize;
689
690        // Step 3: Scatter entries into group order using the counting sort. The
691        // batch index is implicit from the outer loop position.
692        let flat_values = if total_rows == 0 {
693            new_empty_array(&self.datatype)
694        } else {
695            let mut interleave_indices = vec![(0usize, 0usize); total_rows];
696            for (batch_idx, entries) in self.batch_entries.iter().enumerate() {
697                for &(g, r) in entries {
698                    let g = g as usize;
699                    if g < emit_groups {
700                        let wp = write_positions[g] as usize;
701                        interleave_indices[wp] = (batch_idx, r as usize);
702                        write_positions[g] += 1;
703                    }
704                }
705            }
706
707            let sources: Vec<&dyn Array> =
708                self.batches.iter().map(|b| b.as_ref()).collect();
709            arrow::compute::interleave(&sources, &interleave_indices)?
710        };
711
712        // Step 4: Release state for emitted groups.
713        match emit_to {
714            EmitTo::All => self.clear_state(),
715            EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
716        }
717
718        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
719        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
720        let result = ListArray::new(field, offsets, flat_values, nulls_builder.finish());
721
722        Ok(Arc::new(result))
723    }
724
725    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
726        Ok(vec![self.evaluate(emit_to)?])
727    }
728
729    fn merge_batch(
730        &mut self,
731        values: &[ArrayRef],
732        group_indices: &[usize],
733        _opt_filter: Option<&BooleanArray>,
734        total_num_groups: usize,
735    ) -> Result<()> {
736        assert_eq!(values.len(), 1, "one argument to merge_batch");
737        let input_list = values[0].as_list::<i32>();
738
739        self.num_groups = self.num_groups.max(total_num_groups);
740
741        // Push the ListArray's backing values array as a single batch.
742        let list_values = input_list.values();
743        let list_offsets = input_list.offsets();
744
745        let mut entries = Vec::new();
746
747        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
748            if input_list.is_null(row_idx) {
749                continue;
750            }
751            let start = list_offsets[row_idx] as u32;
752            let end = list_offsets[row_idx + 1] as u32;
753            for pos in start..end {
754                entries.push((group_idx as u32, pos));
755            }
756        }
757
758        if !entries.is_empty() {
759            self.batches.push(Arc::clone(list_values));
760            self.batch_entries.push(entries);
761        }
762
763        Ok(())
764    }
765
766    fn convert_to_state(
767        &self,
768        values: &[ArrayRef],
769        opt_filter: Option<&BooleanArray>,
770    ) -> Result<Vec<ArrayRef>> {
771        assert_eq!(values.len(), 1, "one argument to convert_to_state");
772
773        let input = &values[0];
774
775        // Each row becomes a 1-element list: offsets are [0, 1, 2, ..., n].
776        let offsets = OffsetBuffer::from_repeated_length(1, input.len());
777
778        // Filtered rows become null list entries, which merge_batch will skip.
779        let filter_nulls = opt_filter.map(filter_to_nulls);
780
781        // With ignore_nulls, null values also become null list entries. Without
782        // ignore_nulls, null values stay as [NULL] so merge_batch retains them.
783        let nulls = if self.ignore_nulls {
784            let logical = input.logical_nulls();
785            NullBuffer::union(filter_nulls.as_ref(), logical.as_ref())
786        } else {
787            filter_nulls
788        };
789
790        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
791        let list_array = ListArray::new(field, offsets, Arc::clone(input), nulls);
792
793        Ok(vec![Arc::new(list_array)])
794    }
795
796    fn supports_convert_to_state(&self) -> bool {
797        true
798    }
799
800    fn size(&self) -> usize {
801        self.batches
802            .iter()
803            .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
804            .sum::<usize>()
805            + self.batches.capacity() * size_of::<ArrayRef>()
806            + self
807                .batch_entries
808                .iter()
809                .map(|e| e.capacity() * size_of::<(u32, u32)>())
810                .sum::<usize>()
811            + self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
812    }
813}
814
/// `ARRAY_AGG(DISTINCT ...)` accumulator. It keeps the distinct values in a
/// hash set. `ArrayAgg::accumulator` only sets `sort_options` when the
/// ORDER BY expression is the argument itself.
#[derive(Debug)]
pub struct DistinctArrayAggAccumulator {
    /// Distinct (compacted) values seen so far.
    values: HashSet<ScalarValue>,
    /// Element type of the produced list.
    datatype: DataType,
    /// Sort options from `ORDER BY <argument>`, or `None` when unordered.
    sort_options: Option<SortOptions>,
    /// When true, null inputs are not inserted.
    ignore_nulls: bool,
}
822
823impl DistinctArrayAggAccumulator {
824    pub fn try_new(
825        datatype: &DataType,
826        sort_options: Option<SortOptions>,
827        ignore_nulls: bool,
828    ) -> Result<Self> {
829        Ok(Self {
830            values: HashSet::new(),
831            datatype: datatype.clone(),
832            sort_options,
833            ignore_nulls,
834        })
835    }
836}
837
838impl Accumulator for DistinctArrayAggAccumulator {
839    fn state(&mut self) -> Result<Vec<ScalarValue>> {
840        Ok(vec![self.evaluate()?])
841    }
842
843    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
844        if values.is_empty() {
845            return Ok(());
846        }
847
848        let val = &values[0];
849        let nulls = if self.ignore_nulls {
850            val.logical_nulls()
851        } else {
852            None
853        };
854
855        let nulls = nulls.as_ref();
856        if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
857            for i in 0..val.len() {
858                if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
859                    self.values
860                        .insert(ScalarValue::try_from_array(val, i)?.compacted());
861                }
862            }
863        }
864
865        Ok(())
866    }
867
868    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
869        if states.is_empty() {
870            return Ok(());
871        }
872
873        assert_eq_or_internal_err!(states.len(), 1, "expects single state");
874
875        states[0]
876            .as_list::<i32>()
877            .iter()
878            .flatten()
879            .try_for_each(|val| self.update_batch(&[val]))
880    }
881
882    fn evaluate(&mut self) -> Result<ScalarValue> {
883        let mut values: Vec<ScalarValue> = self.values.iter().cloned().collect();
884        if values.is_empty() {
885            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
886        }
887
888        if let Some(opts) = self.sort_options {
889            let mut delayed_cmp_err = Ok(());
890            values.sort_by(|a, b| {
891                if a.is_null() {
892                    return match opts.nulls_first {
893                        true => Ordering::Less,
894                        false => Ordering::Greater,
895                    };
896                }
897                if b.is_null() {
898                    return match opts.nulls_first {
899                        true => Ordering::Greater,
900                        false => Ordering::Less,
901                    };
902                }
903                match opts.descending {
904                    true => b.try_cmp(a),
905                    false => a.try_cmp(b),
906                }
907                .unwrap_or_else(|err| {
908                    delayed_cmp_err = Err(err);
909                    Ordering::Equal
910                })
911            });
912            delayed_cmp_err?;
913        };
914
915        let arr = ScalarValue::new_list(&values, &self.datatype, true);
916        Ok(ScalarValue::List(arr))
917    }
918
919    fn size(&self) -> usize {
920        size_of_val(self) + ScalarValue::size_of_hashset(&self.values)
921            - size_of_val(&self.values)
922            + self.datatype.size()
923            - size_of_val(&self.datatype)
924            - size_of_val(&self.sort_options)
925            + size_of::<Option<SortOptions>>()
926    }
927}
928
929/// Accumulator for a `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
930/// partition setting, partial aggregations are computed for every partition,
931/// and then their results are merged.
932#[derive(Debug)]
933pub(crate) struct OrderSensitiveArrayAggAccumulator {
934    /// Stores entries in the `ARRAY_AGG` result.
935    values: Vec<ScalarValue>,
936    /// Stores values of ordering requirement expressions corresponding to each
937    /// entry in `values`. This information is used when merging results from
938    /// different partitions. For detailed information how merging is done, see
939    /// [`merge_ordered_arrays`].
940    ordering_values: Vec<Vec<ScalarValue>>,
941    /// Stores datatypes of expressions inside values and ordering requirement
942    /// expressions.
943    datatypes: Vec<DataType>,
944    /// Stores the ordering requirement of the `Accumulator`.
945    ordering_req: LexOrdering,
946    /// Whether the input is known to be pre-ordered
947    is_input_pre_ordered: bool,
948    /// Whether the aggregation is running in reverse.
949    reverse: bool,
950    /// Whether the aggregation should ignore null values.
951    ignore_nulls: bool,
952}
953
954impl OrderSensitiveArrayAggAccumulator {
955    /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
956    /// item data type.
957    pub fn try_new(
958        datatype: &DataType,
959        ordering_dtypes: &[DataType],
960        ordering_req: LexOrdering,
961        is_input_pre_ordered: bool,
962        reverse: bool,
963        ignore_nulls: bool,
964    ) -> Result<Self> {
965        let mut datatypes = vec![datatype.clone()];
966        datatypes.extend(ordering_dtypes.iter().cloned());
967        Ok(Self {
968            values: vec![],
969            ordering_values: vec![],
970            datatypes,
971            ordering_req,
972            is_input_pre_ordered,
973            reverse,
974            ignore_nulls,
975        })
976    }
977
978    fn sort(&mut self) {
979        let sort_options = self
980            .ordering_req
981            .iter()
982            .map(|sort_expr| sort_expr.options)
983            .collect::<Vec<_>>();
984        let mut values = take(&mut self.values)
985            .into_iter()
986            .zip(take(&mut self.ordering_values))
987            .collect::<Vec<_>>();
988        let mut delayed_cmp_err = Ok(());
989        values.sort_by(|(_, left_ordering), (_, right_ordering)| {
990            compare_rows(left_ordering, right_ordering, &sort_options).unwrap_or_else(
991                |err| {
992                    delayed_cmp_err = Err(err);
993                    Ordering::Equal
994                },
995            )
996        });
997        (self.values, self.ordering_values) = values.into_iter().unzip();
998    }
999
1000    fn evaluate_orderings(&self) -> Result<ScalarValue> {
1001        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
1002
1003        let column_wise_ordering_values = if self.ordering_values.is_empty() {
1004            fields
1005                .iter()
1006                .map(|f| new_empty_array(f.data_type()))
1007                .collect::<Vec<_>>()
1008        } else {
1009            (0..fields.len())
1010                .map(|i| {
1011                    let column_values = self.ordering_values.iter().map(|x| x[i].clone());
1012                    ScalarValue::iter_to_array(column_values)
1013                })
1014                .collect::<Result<_>>()?
1015        };
1016
1017        let ordering_array = StructArray::try_new(
1018            Fields::from(fields),
1019            column_wise_ordering_values,
1020            None,
1021        )?;
1022        Ok(SingleRowListArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar())
1023    }
1024}
1025
1026impl Accumulator for OrderSensitiveArrayAggAccumulator {
1027    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
1028        if values.is_empty() {
1029            return Ok(());
1030        }
1031
1032        let val = &values[0];
1033        let ord = &values[1..];
1034        let nulls = if self.ignore_nulls {
1035            val.logical_nulls()
1036        } else {
1037            None
1038        };
1039
1040        let nulls = nulls.as_ref();
1041        if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
1042            for i in 0..val.len() {
1043                if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
1044                    self.values
1045                        .push(ScalarValue::try_from_array(val, i)?.compacted());
1046                    self.ordering_values.push(
1047                        get_row_at_idx(ord, i)?
1048                            .into_iter()
1049                            .map(|v| v.compacted())
1050                            .collect(),
1051                    )
1052                }
1053            }
1054        }
1055
1056        Ok(())
1057    }
1058
1059    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
1060        if states.is_empty() {
1061            return Ok(());
1062        }
1063
1064        // First entry in the state is the aggregation result. Second entry
1065        // stores values received for ordering requirement columns for each
1066        // aggregation value inside `ARRAY_AGG` list. For each `StructArray`
1067        // inside `ARRAY_AGG` list, we will receive an `Array` that stores values
1068        // received from its ordering requirement expression. (This information
1069        // is necessary for during merging).
1070        let [array_agg_values, agg_orderings] =
1071            take_function_args("OrderSensitiveArrayAggAccumulator::merge_batch", states)?;
1072        let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
1073            return exec_err!("Expects to receive a list array");
1074        };
1075
1076        // Stores ARRAY_AGG results coming from each partition
1077        let mut partition_values = vec![];
1078        // Stores ordering requirement expression results coming from each partition
1079        let mut partition_ordering_values = vec![];
1080
1081        // Existing values should be merged also.
1082        if !self.is_input_pre_ordered {
1083            self.sort();
1084        }
1085        partition_values.push(take(&mut self.values).into());
1086        partition_ordering_values.push(take(&mut self.ordering_values).into());
1087
1088        // Convert array to Scalars to sort them easily. Convert back to array at evaluation.
1089        let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
1090        for maybe_v in array_agg_res.into_iter() {
1091            if let Some(v) = maybe_v {
1092                partition_values.push(v.into());
1093            } else {
1094                partition_values.push(vec![].into());
1095            }
1096        }
1097
1098        let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
1099        for partition_ordering_rows in orderings.into_iter().flatten() {
1100            // Extract value from struct to ordering_rows for each group/partition
1101            let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
1102                    if let ScalarValue::Struct(s) = ordering_row {
1103                        let mut ordering_columns_per_row = vec![];
1104
1105                        for column in s.columns() {
1106                            let sv = ScalarValue::try_from_array(column, 0)?;
1107                            ordering_columns_per_row.push(sv);
1108                        }
1109
1110                        Ok(ordering_columns_per_row)
1111                    } else {
1112                        exec_err!(
1113                            "Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
1114                            ordering_row.data_type()
1115                        )
1116                    }
1117                }).collect::<Result<VecDeque<_>>>()?;
1118
1119            partition_ordering_values.push(ordering_value);
1120        }
1121
1122        let sort_options = self
1123            .ordering_req
1124            .iter()
1125            .map(|sort_expr| sort_expr.options)
1126            .collect::<Vec<_>>();
1127
1128        (self.values, self.ordering_values) = merge_ordered_arrays(
1129            &mut partition_values,
1130            &mut partition_ordering_values,
1131            &sort_options,
1132        )?;
1133
1134        Ok(())
1135    }
1136
1137    fn state(&mut self) -> Result<Vec<ScalarValue>> {
1138        if !self.is_input_pre_ordered {
1139            self.sort();
1140        }
1141
1142        let mut result = vec![self.evaluate()?];
1143        result.push(self.evaluate_orderings()?);
1144
1145        Ok(result)
1146    }
1147
1148    fn evaluate(&mut self) -> Result<ScalarValue> {
1149        if !self.is_input_pre_ordered {
1150            self.sort();
1151        }
1152
1153        if self.values.is_empty() {
1154            return Ok(ScalarValue::new_null_list(
1155                self.datatypes[0].clone(),
1156                true,
1157                1,
1158            ));
1159        }
1160
1161        let values = self.values.clone();
1162        let array = if self.reverse {
1163            ScalarValue::new_list_from_iter(
1164                values.into_iter().rev(),
1165                &self.datatypes[0],
1166                true,
1167            )
1168        } else {
1169            ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true)
1170        };
1171        Ok(ScalarValue::List(array))
1172    }
1173
1174    fn size(&self) -> usize {
1175        let mut total = size_of_val(self) + ScalarValue::size_of_vec(&self.values)
1176            - size_of_val(&self.values);
1177
1178        // Add size of the `self.ordering_values`
1179        total += size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
1180        for row in &self.ordering_values {
1181            total += ScalarValue::size_of_vec(row) - size_of_val(row);
1182        }
1183
1184        // Add size of the `self.datatypes`
1185        total += size_of::<DataType>() * self.datatypes.capacity();
1186        for dtype in &self.datatypes {
1187            total += dtype.size() - size_of_val(dtype);
1188        }
1189
1190        // Add size of the `self.ordering_req`
1191        total += size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
1192        // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
1193        total
1194    }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use super::*;
1200    use arrow::array::{ListBuilder, StringBuilder};
1201    use arrow::datatypes::Schema;
1202    use datafusion_common::cast::as_generic_string_array;
1203    use datafusion_common::internal_err;
1204    use datafusion_physical_expr::PhysicalExpr;
1205    use datafusion_physical_expr::expressions::Column;
1206
1207    #[test]
1208    fn no_duplicates_no_distinct() -> Result<()> {
1209        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
1210
1211        acc1.update_batch(&[data(["a", "b", "c"])])?;
1212        acc2.update_batch(&[data(["d", "e", "f"])])?;
1213        acc1 = merge(acc1, acc2)?;
1214
1215        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1216
1217        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1218
1219        Ok(())
1220    }
1221
1222    #[test]
1223    fn no_duplicates_distinct() -> Result<()> {
1224        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1225            .distinct()
1226            .build_two()?;
1227
1228        acc1.update_batch(&[data(["a", "b", "c"])])?;
1229        acc2.update_batch(&[data(["d", "e", "f"])])?;
1230        acc1 = merge(acc1, acc2)?;
1231
1232        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1233        result.sort();
1234
1235        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1236
1237        Ok(())
1238    }
1239
1240    #[test]
1241    fn duplicates_no_distinct() -> Result<()> {
1242        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
1243
1244        acc1.update_batch(&[data(["a", "b", "c"])])?;
1245        acc2.update_batch(&[data(["a", "b", "c"])])?;
1246        acc1 = merge(acc1, acc2)?;
1247
1248        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1249
1250        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
1251
1252        Ok(())
1253    }
1254
1255    #[test]
1256    fn duplicates_distinct() -> Result<()> {
1257        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1258            .distinct()
1259            .build_two()?;
1260
1261        acc1.update_batch(&[data(["a", "b", "c"])])?;
1262        acc2.update_batch(&[data(["a", "b", "c"])])?;
1263        acc1 = merge(acc1, acc2)?;
1264
1265        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1266        result.sort();
1267
1268        assert_eq!(result, vec!["a", "b", "c"]);
1269
1270        Ok(())
1271    }
1272
1273    #[test]
1274    fn duplicates_on_second_batch_distinct() -> Result<()> {
1275        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1276            .distinct()
1277            .build_two()?;
1278
1279        acc1.update_batch(&[data(["a", "c"])])?;
1280        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
1281        acc1 = merge(acc1, acc2)?;
1282
1283        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1284        result.sort();
1285
1286        assert_eq!(result, vec!["a", "b", "c", "d"]);
1287
1288        Ok(())
1289    }
1290
1291    #[test]
1292    fn no_duplicates_distinct_sort_asc() -> Result<()> {
1293        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1294            .distinct()
1295            .order_by_col("col", SortOptions::new(false, false))
1296            .build_two()?;
1297
1298        acc1.update_batch(&[data(["e", "b", "d"])])?;
1299        acc2.update_batch(&[data(["f", "a", "c"])])?;
1300        acc1 = merge(acc1, acc2)?;
1301
1302        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1303
1304        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
1305
1306        Ok(())
1307    }
1308
1309    #[test]
1310    fn no_duplicates_distinct_sort_desc() -> Result<()> {
1311        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1312            .distinct()
1313            .order_by_col("col", SortOptions::new(true, false))
1314            .build_two()?;
1315
1316        acc1.update_batch(&[data(["e", "b", "d"])])?;
1317        acc2.update_batch(&[data(["f", "a", "c"])])?;
1318        acc1 = merge(acc1, acc2)?;
1319
1320        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1321
1322        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
1323
1324        Ok(())
1325    }
1326
1327    #[test]
1328    fn duplicates_distinct_sort_asc() -> Result<()> {
1329        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1330            .distinct()
1331            .order_by_col("col", SortOptions::new(false, false))
1332            .build_two()?;
1333
1334        acc1.update_batch(&[data(["a", "c", "b"])])?;
1335        acc2.update_batch(&[data(["b", "c", "a"])])?;
1336        acc1 = merge(acc1, acc2)?;
1337
1338        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1339
1340        assert_eq!(result, vec!["a", "b", "c"]);
1341
1342        Ok(())
1343    }
1344
1345    #[test]
1346    fn duplicates_distinct_sort_desc() -> Result<()> {
1347        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1348            .distinct()
1349            .order_by_col("col", SortOptions::new(true, false))
1350            .build_two()?;
1351
1352        acc1.update_batch(&[data(["a", "c", "b"])])?;
1353        acc2.update_batch(&[data(["b", "c", "a"])])?;
1354        acc1 = merge(acc1, acc2)?;
1355
1356        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1357
1358        assert_eq!(result, vec!["c", "b", "a"]);
1359
1360        Ok(())
1361    }
1362
1363    #[test]
1364    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
1365        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1366            .distinct()
1367            .order_by_col("col", SortOptions::new(false, true))
1368            .build_two()?;
1369
1370        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
1371        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
1372        acc1 = merge(acc1, acc2)?;
1373
1374        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1375
1376        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
1377
1378        Ok(())
1379    }
1380
1381    #[test]
1382    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
1383        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1384            .distinct()
1385            .order_by_col("col", SortOptions::new(false, false))
1386            .build_two()?;
1387
1388        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
1389        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
1390        acc1 = merge(acc1, acc2)?;
1391
1392        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1393
1394        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
1395
1396        Ok(())
1397    }
1398
1399    #[test]
1400    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
1401        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1402            .distinct()
1403            .order_by_col("col", SortOptions::new(true, true))
1404            .build_two()?;
1405
1406        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
1407        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
1408        acc1 = merge(acc1, acc2)?;
1409
1410        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1411
1412        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
1413
1414        Ok(())
1415    }
1416
1417    #[test]
1418    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
1419        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1420            .distinct()
1421            .order_by_col("col", SortOptions::new(true, false))
1422            .build_two()?;
1423
1424        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
1425        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
1426        acc1 = merge(acc1, acc2)?;
1427
1428        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1429
1430        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
1431
1432        Ok(())
1433    }
1434
1435    #[test]
1436    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
1437        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1438            .distinct()
1439            .build_two()?;
1440
1441        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
1442        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
1443        acc1 = merge(acc1, acc2)?;
1444
1445        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
1446        result.sort();
1447        assert_eq!(result, vec!["NULL", "a"]);
1448        Ok(())
1449    }
1450
1451    #[test]
1452    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
1453        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1454            .distinct()
1455            .build_two()?;
1456
1457        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
1458        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, None])])?;
1459        acc1 = merge(acc1, acc2)?;
1460
1461        let result = print_nulls(str_arr(acc1.evaluate()?)?);
1462        assert_eq!(result, vec!["NULL"]);
1463        Ok(())
1464    }
1465
1466    #[test]
1467    fn does_not_over_account_memory() -> Result<()> {
1468        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
1469
1470        acc1.update_batch(&[data(["a", "c", "b"])])?;
1471        acc2.update_batch(&[data(["b", "c", "a"])])?;
1472        acc1 = merge(acc1, acc2)?;
1473
1474        assert_eq!(acc1.size(), 282);
1475
1476        Ok(())
1477    }
1478    #[test]
1479    fn does_not_over_account_memory_distinct() -> Result<()> {
1480        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
1481            .distinct()
1482            .build_two()?;
1483
1484        acc1.update_batch(&[string_list_data([
1485            vec!["a", "b", "c"],
1486            vec!["d", "e", "f"],
1487        ])])?;
1488        acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?;
1489        acc1 = merge(acc1, acc2)?;
1490
1491        // without compaction, the size is 16660
1492        assert_eq!(acc1.size(), 1660);
1493
1494        Ok(())
1495    }
1496
1497    #[test]
1498    fn does_not_over_account_memory_ordered() -> Result<()> {
1499        let mut acc = ArrayAggAccumulatorBuilder::string()
1500            .order_by_col("col", SortOptions::new(false, false))
1501            .build()?;
1502
1503        acc.update_batch(&[string_list_data([
1504            vec!["a", "b", "c"],
1505            vec!["c", "d", "e"],
1506            vec!["b", "c", "d"],
1507        ])])?;
1508
1509        // without compaction, the size is 17112
1510        assert_eq!(acc.size(), 2224);
1511
1512        Ok(())
1513    }
1514
1515    struct ArrayAggAccumulatorBuilder {
1516        return_field: FieldRef,
1517        distinct: bool,
1518        order_bys: Vec<PhysicalSortExpr>,
1519        schema: Schema,
1520    }
1521
1522    impl ArrayAggAccumulatorBuilder {
1523        fn string() -> Self {
1524            Self::new(DataType::Utf8)
1525        }
1526
1527        fn new(data_type: DataType) -> Self {
1528            Self {
1529                return_field: Field::new("f", data_type.clone(), true).into(),
1530                distinct: false,
1531                order_bys: vec![],
1532                schema: Schema {
1533                    fields: Fields::from(vec![Field::new(
1534                        "col",
1535                        DataType::new_list(data_type, true),
1536                        true,
1537                    )]),
1538                    metadata: Default::default(),
1539                },
1540            }
1541        }
1542
1543        fn distinct(mut self) -> Self {
1544            self.distinct = true;
1545            self
1546        }
1547
1548        fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> Self {
1549            let new_order = PhysicalSortExpr::new(
1550                Arc::new(
1551                    Column::new_with_schema(col, &self.schema)
1552                        .expect("column not available in schema"),
1553                ),
1554                sort_options,
1555            );
1556            self.order_bys.push(new_order);
1557            self
1558        }
1559
1560        fn build(&self) -> Result<Box<dyn Accumulator>> {
1561            let expr = Arc::new(Column::new("col", 0));
1562            let expr_field = expr.return_field(&self.schema)?;
1563            ArrayAgg::default().accumulator(AccumulatorArgs {
1564                return_field: Arc::clone(&self.return_field),
1565                schema: &self.schema,
1566                expr_fields: &[expr_field],
1567                ignore_nulls: false,
1568                order_bys: &self.order_bys,
1569                is_reversed: false,
1570                name: "",
1571                is_distinct: self.distinct,
1572                exprs: &[expr],
1573            })
1574        }
1575
1576        fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn Accumulator>)> {
1577            Ok((self.build()?, self.build()?))
1578        }
1579    }
1580
1581    fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> {
1582        let ScalarValue::List(list) = value else {
1583            return internal_err!("ScalarValue was not a List");
1584        };
1585        Ok(as_generic_string_array::<i32>(list.values())?
1586            .iter()
1587            .map(|v| v.map(|v| v.to_string()))
1588            .collect())
1589    }
1590
1591    fn print_nulls(sort: Vec<Option<String>>) -> Vec<String> {
1592        sort.into_iter()
1593            .map(|v| v.unwrap_or_else(|| "NULL".to_string()))
1594            .collect()
1595    }
1596
1597    fn string_list_data<'a>(data: impl IntoIterator<Item = Vec<&'a str>>) -> ArrayRef {
1598        let mut builder = ListBuilder::new(StringBuilder::new());
1599        for string_list in data.into_iter() {
1600            builder.append_value(string_list.iter().map(Some).collect::<Vec<_>>());
1601        }
1602
1603        Arc::new(builder.finish())
1604    }
1605
1606    fn data<T, const N: usize>(list: [T; N]) -> ArrayRef
1607    where
1608        ScalarValue: From<T>,
1609    {
1610        let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect();
1611        ScalarValue::iter_to_array(values).expect("Cannot convert to array")
1612    }
1613
1614    fn merge(
1615        mut acc1: Box<dyn Accumulator>,
1616        mut acc2: Box<dyn Accumulator>,
1617    ) -> Result<Box<dyn Accumulator>> {
1618        let intermediate_state = acc2.state().and_then(|e| {
1619            e.iter()
1620                .map(|v| v.to_array())
1621                .collect::<Result<Vec<ArrayRef>>>()
1622        })?;
1623        acc1.merge_batch(&intermediate_state)?;
1624        Ok(acc1)
1625    }
1626
1627    // ---- GroupsAccumulator tests ----
1628
1629    use arrow::array::Int32Array;
1630
1631    fn list_array_to_i32_vecs(list: &ListArray) -> Vec<Option<Vec<Option<i32>>>> {
1632        (0..list.len())
1633            .map(|i| {
1634                if list.is_null(i) {
1635                    None
1636                } else {
1637                    let arr = list.value(i);
1638                    let vals: Vec<Option<i32>> = arr
1639                        .as_any()
1640                        .downcast_ref::<Int32Array>()
1641                        .unwrap()
1642                        .iter()
1643                        .collect();
1644                    Some(vals)
1645                }
1646            })
1647            .collect()
1648    }
1649
1650    fn eval_i32_lists(
1651        acc: &mut ArrayAggGroupsAccumulator,
1652        emit_to: EmitTo,
1653    ) -> Result<Vec<Option<Vec<Option<i32>>>>> {
1654        let result = acc.evaluate(emit_to)?;
1655        Ok(list_array_to_i32_vecs(result.as_list::<i32>()))
1656    }
1657
1658    #[test]
1659    fn groups_accumulator_multiple_batches() -> Result<()> {
1660        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1661
1662        // First batch
1663        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
1664        acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
1665
1666        // Second batch
1667        let values: ArrayRef = Arc::new(Int32Array::from(vec![4, 5]));
1668        acc.update_batch(&[values], &[1, 0], None, 2)?;
1669
1670        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1671        assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
1672        assert_eq!(vals[1], Some(vec![Some(2), Some(4)]));
1673
1674        Ok(())
1675    }
1676
1677    #[test]
1678    fn groups_accumulator_emit_first() -> Result<()> {
1679        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1680
1681        let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
1682        acc.update_batch(&[values], &[0, 1, 2], None, 3)?;
1683
1684        // Emit first 2 groups
1685        let vals = eval_i32_lists(&mut acc, EmitTo::First(2))?;
1686        assert_eq!(vals.len(), 2);
1687        assert_eq!(vals[0], Some(vec![Some(10)]));
1688        assert_eq!(vals[1], Some(vec![Some(20)]));
1689
1690        // Remaining group (was index 2, now shifted to 0)
1691        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1692        assert_eq!(vals.len(), 1);
1693        assert_eq!(vals[0], Some(vec![Some(30)]));
1694
1695        Ok(())
1696    }
1697
1698    #[test]
1699    fn groups_accumulator_emit_first_frees_batches() -> Result<()> {
1700        // Batch 0 has rows only for group 0; batch 1 has rows for
1701        // both groups. After emitting group 0, batch 0 should be
1702        // dropped entirely and batch 1 should be compacted to the
1703        // retained row(s).
1704        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1705
1706        let batch0: ArrayRef = Arc::new(Int32Array::from(vec![10, 20]));
1707        acc.update_batch(&[batch0], &[0, 0], None, 2)?;
1708
1709        let batch1: ArrayRef = Arc::new(Int32Array::from(vec![30, 40]));
1710        acc.update_batch(&[batch1], &[0, 1], None, 2)?;
1711
1712        assert_eq!(acc.batches.len(), 2);
1713        assert!(!acc.batches[0].is_empty());
1714        assert!(!acc.batches[1].is_empty());
1715
1716        // Emit group 0. Batch 0 is only referenced by group 0, so it
1717        // should be removed. Batch 1 is mixed, so it should be compacted
1718        // to contain only the retained row for group 1.
1719        let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
1720        assert_eq!(vals[0], Some(vec![Some(10), Some(20), Some(30)]));
1721
1722        assert_eq!(acc.batches.len(), 1);
1723        let retained = acc.batches[0]
1724            .as_any()
1725            .downcast_ref::<Int32Array>()
1726            .unwrap();
1727        assert_eq!(retained.values(), &[40]);
1728        assert_eq!(acc.batch_entries, vec![vec![(0, 0)]]);
1729
1730        // Emit remaining group 1
1731        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1732        assert_eq!(vals[0], Some(vec![Some(40)]));
1733
1734        assert!(acc.batches.is_empty());
1735        assert_eq!(acc.size(), 0);
1736
1737        Ok(())
1738    }
1739
1740    #[test]
1741    fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> {
1742        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1743
1744        let batch: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
1745        acc.update_batch(&[batch], &[0, 1, 0, 1], None, 2)?;
1746
1747        let size_before = acc.size();
1748        let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
1749        assert_eq!(vals[0], Some(vec![Some(10), Some(30)]));
1750
1751        assert_eq!(acc.num_groups, 1);
1752        assert_eq!(acc.batches.len(), 1);
1753        let retained = acc.batches[0]
1754            .as_any()
1755            .downcast_ref::<Int32Array>()
1756            .unwrap();
1757        assert_eq!(retained.values(), &[20, 40]);
1758        assert_eq!(acc.batch_entries, vec![vec![(0, 0), (0, 1)]]);
1759        assert!(acc.size() < size_before);
1760
1761        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1762        assert_eq!(vals[0], Some(vec![Some(20), Some(40)]));
1763        assert_eq!(acc.size(), 0);
1764
1765        Ok(())
1766    }
1767
1768    #[test]
1769    fn groups_accumulator_emit_all_releases_capacity() -> Result<()> {
1770        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1771
1772        let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
1773        acc.update_batch(
1774            &[batch],
1775            &(0..64).map(|i| i % 4).collect::<Vec<_>>(),
1776            None,
1777            4,
1778        )?;
1779
1780        assert!(acc.size() > 0);
1781        let _ = eval_i32_lists(&mut acc, EmitTo::All)?;
1782
1783        assert_eq!(acc.size(), 0);
1784        assert_eq!(acc.batches.capacity(), 0);
1785        assert_eq!(acc.batch_entries.capacity(), 0);
1786
1787        Ok(())
1788    }
1789
1790    #[test]
1791    fn groups_accumulator_null_groups() -> Result<()> {
1792        // Groups that never receive values should produce null
1793        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1794
1795        let values: ArrayRef = Arc::new(Int32Array::from(vec![1]));
1796        // Only group 0 gets a value, groups 1 and 2 are empty
1797        acc.update_batch(&[values], &[0], None, 3)?;
1798
1799        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1800        assert_eq!(vals, vec![Some(vec![Some(1)]), None, None]);
1801
1802        Ok(())
1803    }
1804
1805    #[test]
1806    fn groups_accumulator_ignore_nulls() -> Result<()> {
1807        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
1808
1809        let values: ArrayRef =
1810            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
1811        acc.update_batch(&[values], &[0, 0, 1, 1], None, 2)?;
1812
1813        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1814        // Group 0: only non-null value is 1
1815        assert_eq!(vals[0], Some(vec![Some(1)]));
1816        // Group 1: only non-null value is 3
1817        assert_eq!(vals[1], Some(vec![Some(3)]));
1818
1819        Ok(())
1820    }
1821
1822    #[test]
1823    fn groups_accumulator_opt_filter() -> Result<()> {
1824        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1825
1826        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1827        // Use a mix of false and null to filter out rows — both should
1828        // be skipped.
1829        let filter = BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]);
1830        acc.update_batch(&[values], &[0, 0, 1, 1], Some(&filter), 2)?;
1831
1832        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1833        assert_eq!(vals[0], Some(vec![Some(1)])); // row 1 filtered (null)
1834        assert_eq!(vals[1], Some(vec![Some(3)])); // row 3 filtered (false)
1835
1836        Ok(())
1837    }
1838
1839    #[test]
1840    fn groups_accumulator_state_merge_roundtrip() -> Result<()> {
1841        // Accumulator 1: update_batch, then merge, then update_batch again.
1842        // Verifies that values appear in chronological insertion order.
1843        let mut acc1 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1844        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
1845        acc1.update_batch(&[values], &[0, 1], None, 2)?;
1846
1847        // Accumulator 2
1848        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1849        let values: ArrayRef = Arc::new(Int32Array::from(vec![3, 4]));
1850        acc2.update_batch(&[values], &[0, 1], None, 2)?;
1851
1852        // Merge acc2's state into acc1
1853        let state = acc2.state(EmitTo::All)?;
1854        acc1.merge_batch(&state, &[0, 1], None, 2)?;
1855
1856        // Another update_batch on acc1 after the merge
1857        let values: ArrayRef = Arc::new(Int32Array::from(vec![5, 6]));
1858        acc1.update_batch(&[values], &[0, 1], None, 2)?;
1859
1860        // Each group's values in insertion order:
1861        // group 0: update(1), merge(3), update(5) → [1, 3, 5]
1862        // group 1: update(2), merge(4), update(6) → [2, 4, 6]
1863        let vals = eval_i32_lists(&mut acc1, EmitTo::All)?;
1864        assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
1865        assert_eq!(vals[1], Some(vec![Some(2), Some(4), Some(6)]));
1866
1867        Ok(())
1868    }
1869
1870    #[test]
1871    fn groups_accumulator_convert_to_state() -> Result<()> {
1872        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1873
1874        let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None, Some(30)]));
1875        let state = acc.convert_to_state(&[values], None)?;
1876
1877        assert_eq!(state.len(), 1);
1878        let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
1879        assert_eq!(
1880            vals,
1881            vec![
1882                Some(vec![Some(10)]),
1883                Some(vec![None]), // null preserved inside list, not promoted
1884                Some(vec![Some(30)]),
1885            ]
1886        );
1887
1888        Ok(())
1889    }
1890
1891    #[test]
1892    fn groups_accumulator_convert_to_state_with_filter() -> Result<()> {
1893        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1894
1895        let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
1896        let filter = BooleanArray::from(vec![true, false, true]);
1897        let state = acc.convert_to_state(&[values], Some(&filter))?;
1898
1899        let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
1900        assert_eq!(
1901            vals,
1902            vec![
1903                Some(vec![Some(10)]),
1904                None, // filtered
1905                Some(vec![Some(30)]),
1906            ]
1907        );
1908
1909        Ok(())
1910    }
1911
1912    #[test]
1913    fn groups_accumulator_convert_to_state_merge_preserves_nulls() -> Result<()> {
1914        // Verifies that null values survive the convert_to_state -> merge_batch
1915        // round-trip when ignore_nulls is false (default null handling).
1916        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1917
1918        let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
1919        let state = acc.convert_to_state(&[values], None)?;
1920
1921        // Feed state into a new accumulator via merge_batch
1922        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1923        acc2.merge_batch(&state, &[0, 0, 1], None, 2)?;
1924
1925        // Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL]
1926        let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
1927        assert_eq!(vals[0], Some(vec![Some(1), None]));
1928        // Group 1 received row 2 ([3]) → [3]
1929        assert_eq!(vals[1], Some(vec![Some(3)]));
1930
1931        Ok(())
1932    }
1933
1934    #[test]
1935    fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> {
1936        // Verifies that null values are dropped in the convert_to_state ->
1937        // merge_batch round-trip when ignore_nulls is true.
1938        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
1939
1940        let values: ArrayRef =
1941            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
1942        let state = acc.convert_to_state(&[values], None)?;
1943
1944        let list = state[0].as_list::<i32>();
1945        // Rows 0 and 2 are valid lists; rows 1 and 3 are null list entries
1946        assert!(!list.is_null(0));
1947        assert!(list.is_null(1));
1948        assert!(!list.is_null(2));
1949        assert!(list.is_null(3));
1950
1951        // Feed state into a new accumulator via merge_batch
1952        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
1953        acc2.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;
1954
1955        // Group 0: received [1] and null (skipped) → [1]
1956        let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
1957        assert_eq!(vals[0], Some(vec![Some(1)]));
1958        // Group 1: received [3] and null (skipped) → [3]
1959        assert_eq!(vals[1], Some(vec![Some(3)]));
1960
1961        Ok(())
1962    }
1963
1964    #[test]
1965    fn groups_accumulator_all_groups_empty() -> Result<()> {
1966        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
1967
1968        // Create groups but don't add any values (all filtered out)
1969        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
1970        let filter = BooleanArray::from(vec![false, false]);
1971        acc.update_batch(&[values], &[0, 1], Some(&filter), 2)?;
1972
1973        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1974        assert_eq!(vals, vec![None, None]);
1975
1976        Ok(())
1977    }
1978
1979    #[test]
1980    fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> {
1981        // When ignore_nulls is true and a group receives only nulls,
1982        // it should produce a null output
1983        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
1984
1985        let values: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1), None]));
1986        acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
1987
1988        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
1989        assert_eq!(vals[0], None); // group 0 got only nulls, all filtered
1990        assert_eq!(vals[1], Some(vec![Some(1)])); // group 1 got value 1
1991
1992        Ok(())
1993    }
1994
1995    // ---- retract_batch tests ----
1996
1997    #[test]
1998    fn retract_basic_sliding_window() -> Result<()> {
1999        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2000
2001        // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D]
2002        // Row 1: frame = [A]
2003        acc.update_batch(&[data(["A"])])?;
2004        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]);
2005
2006        // Row 2: frame = [A, B]
2007        acc.update_batch(&[data(["B"])])?;
2008        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
2009
2010        // Row 3: frame = [B, C] — A leaves
2011        acc.update_batch(&[data(["C"])])?;
2012        acc.retract_batch(&[data(["A"])])?;
2013        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]);
2014
2015        // Row 4: frame = [C, D] — B leaves
2016        acc.update_batch(&[data(["D"])])?;
2017        acc.retract_batch(&[data(["B"])])?;
2018        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]);
2019
2020        Ok(())
2021    }
2022
2023    #[test]
2024    fn retract_multi_element_across_arrays() -> Result<()> {
2025        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2026
2027        // First batch: 3 elements
2028        acc.update_batch(&[data(["A", "B", "C"])])?;
2029        // Second batch: 1 element
2030        acc.update_batch(&[data(["D"])])?;
2031
2032        assert_eq!(
2033            print_nulls(str_arr(acc.evaluate()?)?),
2034            vec!["A", "B", "C", "D"]
2035        );
2036
2037        // Partial retract from front array: A leaves
2038        acc.retract_batch(&[data(["A"])])?;
2039        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]);
2040
2041        // Retract spanning two arrays: B, C (rest of first array) + D (second array)
2042        acc.retract_batch(&[data(["B", "C", "D"])])?;
2043        let result = acc.evaluate()?;
2044        assert!(
2045            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
2046            "expected null list after full retract, got {result:?}"
2047        );
2048
2049        Ok(())
2050    }
2051
2052    #[test]
2053    fn retract_with_nulls_preserved() -> Result<()> {
2054        // ignore_nulls = false: NULLs are stored and counted for retract
2055        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2056
2057        acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
2058        assert_eq!(
2059            print_nulls(str_arr(acc.evaluate()?)?),
2060            vec!["A", "NULL", "C"]
2061        );
2062
2063        // Retract 2 elements: A and NULL both leave
2064        acc.retract_batch(&[data([Some("A"), None])])?;
2065        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
2066
2067        Ok(())
2068    }
2069
2070    #[test]
2071    fn retract_with_ignore_nulls() -> Result<()> {
2072        // ignore_nulls = true: NULLs are NOT stored by update_batch,
2073        // so retract must only count non-null values
2074        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
2075
2076        // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered)
2077        acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
2078        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]);
2079
2080        // retract_batch receives the original values including NULL: [A, NULL]
2081        // But only 1 non-null value (A) should be retracted
2082        acc.retract_batch(&[data([Some("A"), None])])?;
2083        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
2084
2085        // retract_batch with [NULL, C] — only C (1 non-null) retracted
2086        acc.retract_batch(&[data([None, Some("C")])])?;
2087        let result = acc.evaluate()?;
2088        assert!(
2089            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
2090            "expected null list after full retract, got {result:?}"
2091        );
2092
2093        Ok(())
2094    }
2095
2096    #[test]
2097    fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
2098        // When ignore_nulls = true and retract batch is all NULLs, nothing is retracted
2099        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
2100
2101        acc.update_batch(&[data([Some("A"), Some("B")])])?;
2102        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
2103
2104        // Retract batch of all NULLs: to_retract = 0, nothing changes
2105        acc.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
2106        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
2107
2108        Ok(())
2109    }
2110
2111    #[test]
2112    fn retract_empty_accumulator() -> Result<()> {
2113        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2114
2115        // Retract on empty accumulator should be a no-op
2116        acc.retract_batch(&[data(["A"])])?;
2117        let result = acc.evaluate()?;
2118        assert!(
2119            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
2120            "expected null list for empty accumulator, got {result:?}"
2121        );
2122
2123        Ok(())
2124    }
2125
2126    #[test]
2127    fn retract_front_offset_partial_consume() -> Result<()> {
2128        // Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario:
2129        //   ts: 1, 2, 3, 4, 100
2130        //
2131        // Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3])
2132        // Row 2 (ts=2): update [D]     (ts=4 enters)
2133        // Row 3 (ts=3): no change      (same frame [0..4))
2134        // Row 4 (ts=4): retract [A]    (ts=1 leaves, partial consume)
2135        // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays)
2136        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2137
2138        // Row 1: update_batch(["A","B","C"])
2139        acc.update_batch(&[data(["A", "B", "C"])])?;
2140        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B", "C"]);
2141
2142        // Row 2: update_batch(["D"])
2143        acc.update_batch(&[data(["D"])])?;
2144        assert_eq!(
2145            print_nulls(str_arr(acc.evaluate()?)?),
2146            vec!["A", "B", "C", "D"]
2147        );
2148
2149        // Row 4: retract_batch(["A"]) — partial consume, front_offset = 1
2150        acc.retract_batch(&[data(["A"])])?;
2151        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]);
2152
2153        // Row 5: update_batch(["E"]), then retract_batch(["B","C","D"])
2154        // retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1 element)
2155        acc.update_batch(&[data(["E"])])?;
2156        acc.retract_batch(&[data(["B", "C", "D"])])?;
2157        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]);
2158
2159        Ok(())
2160    }
2161
2162    #[test]
2163    fn retract_update_after_full_drain() -> Result<()> {
2164        // Verify accumulator works correctly after being fully drained
2165        let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2166
2167        acc.update_batch(&[data(["A", "B"])])?;
2168        acc.retract_batch(&[data(["A", "B"])])?;
2169
2170        // Accumulator is empty now
2171        let result = acc.evaluate()?;
2172        assert!(
2173            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
2174            "expected null list, got {result:?}"
2175        );
2176
2177        // New values should work normally after drain
2178        acc.update_batch(&[data(["X", "Y"])])?;
2179        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]);
2180
2181        acc.retract_batch(&[data(["X"])])?;
2182        assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]);
2183
2184        Ok(())
2185    }
2186
2187    #[test]
2188    fn retract_supports_retract_batch() -> Result<()> {
2189        let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
2190        assert!(acc.supports_retract_batch());
2191
2192        let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
2193        assert!(acc_ignore.supports_retract_batch());
2194
2195        Ok(())
2196    }
2197
2198    #[test]
2199    fn retract_ignore_nulls_logical_vs_physical() -> Result<()> {
2200        // Regression test: DictionaryArray where logical nulls differ from physical nulls.
2201        // Manually construct a DictionaryArray where all indices are valid
2202        // (physical null_count = 0) but some point to null dictionary values
2203        // (logical_null_count > 0).
2204        use arrow::array::{DictionaryArray, Int32Array, StringArray};
2205
2206        let dict_type =
2207            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
2208        let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
2209
2210        // Dictionary values: ["hello", NULL, "world"]
2211        // Keys: [0, 1, 2, 1] — all valid, but keys 1 and 3 point to null value
2212        let values = StringArray::from(vec![Some("hello"), None, Some("world")]);
2213        let keys = Int32Array::from(vec![0, 1, 2, 1]);
2214        let dict_array: ArrayRef = Arc::new(DictionaryArray::new(keys, Arc::new(values)));
2215
2216        // Confirm the divergence this test exists to exercise
2217        assert_eq!(
2218            dict_array.null_count(),
2219            0,
2220            "physical nulls: none in keys bitmap"
2221        );
2222        assert_eq!(
2223            dict_array.logical_null_count(),
2224            2,
2225            "logical nulls: keys pointing to null values"
2226        );
2227
2228        // update_batch uses logical_nulls() → stores only ["hello", "world"]
2229        acc.update_batch(std::slice::from_ref(&dict_array))?;
2230
2231        // Verify 2 elements stored
2232        let result = acc.evaluate()?;
2233        match &result {
2234            ScalarValue::List(arr) => {
2235                let values = arr.value(0);
2236                assert_eq!(values.len(), 2);
2237            }
2238            other => panic!("expected List, got {other:?}"),
2239        }
2240
2241        // retract_batch with same array: should retract 2 (logical non-nulls), not 4 (len) or 0 (physical non-nulls would be len-0=4)
2242        acc.retract_batch(&[dict_array])?;
2243        let result = acc.evaluate()?;
2244        assert!(
2245            matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
2246            "expected null list after full retract, got {result:?}"
2247        );
2248
2249        Ok(())
2250    }
2251
2252    #[test]
2253    fn retract_ignore_nulls_dict_partial() -> Result<()> {
2254        // Partial retraction with DictionaryArray where logical != physical nulls.
2255        // Manually construct so keys are all valid but some point to null values.
2256        use arrow::array::{DictionaryArray, Int32Array, StringArray};
2257
2258        let dict_type =
2259            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
2260        let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
2261
2262        // update with ["A", "B", "C"] (no nulls)
2263        let values = StringArray::from(vec!["A", "B", "C"]);
2264        let keys = Int32Array::from(vec![0, 1, 2]);
2265        let update_array: ArrayRef =
2266            Arc::new(DictionaryArray::new(keys, Arc::new(values)));
2267        acc.update_batch(&[update_array])?;
2268
2269        // retract with dict ["A", NULL, NULL]:
2270        //   keys [0, 1, 1] all valid → physical null_count = 0
2271        //   keys 1,2 point to null value → logical_null_count = 2
2272        //   non-null count = 3 - 2 = 1 → retract 1 element
2273        let values = StringArray::from(vec![Some("A"), None]);
2274        let keys = Int32Array::from(vec![0, 1, 1]);
2275        let retract_array: ArrayRef =
2276            Arc::new(DictionaryArray::new(keys, Arc::new(values)));
2277
2278        assert_eq!(
2279            retract_array.null_count(),
2280            0,
2281            "physical nulls: none in keys bitmap"
2282        );
2283        assert_eq!(
2284            retract_array.logical_null_count(),
2285            2,
2286            "logical nulls: keys pointing to null values"
2287        );
2288
2289        acc.retract_batch(&[retract_array])?;
2290
2291        // Should have retracted only 1 element, leaving ["B", "C"]
2292        let result = acc.evaluate()?;
2293        match &result {
2294            ScalarValue::List(arr) => {
2295                let values = arr.value(0);
2296                assert_eq!(values.len(), 2);
2297            }
2298            other => panic!("expected List with 2 elements, got {other:?}"),
2299        }
2300
2301        Ok(())
2302    }
2303}