llkv_aggregate/
lib.rs

1// TODO: Can portions of this be offloaded to llkv-compute as vectorized ops?
2
3//! Runtime aggregation utilities used by the planner and executor.
4//!
5//! The crate evaluates logical aggregates described by [`llkv_plan::AggregateExpr`] against Arrow
6//! batches. It supports streaming accumulation with overflow checks and COUNT DISTINCT tracking for
7//! a subset of scalar types.
8use arrow::array::{
9    Array, ArrayRef, BooleanArray, Date32Array, Float64Array, Float64Builder, Int64Array,
10    Int64Builder, RecordBatch, StringArray,
11};
12use arrow::datatypes::{DataType, Field};
13use llkv_result::Error;
14use llkv_types::FieldId;
15use rustc_hash::FxHashSet;
16use std::sync::Arc;
17use std::{cmp::Ordering, convert::TryFrom};
18
19pub use llkv_plan::{AggregateExpr, AggregateFunction};
20
21/// Result type alias for aggregation routines.
22pub type AggregateResult<T> = Result<T, Error>;
23
24/// Specification for an aggregate operation.
25#[derive(Clone)]
26pub struct AggregateSpec {
27    pub alias: String,
28    pub kind: AggregateKind,
29}
30
31/// Type of aggregate operation.
32#[derive(Clone)]
33pub enum AggregateKind {
34    Count {
35        field_id: Option<FieldId>,
36        distinct: bool,
37    },
38    Sum {
39        field_id: FieldId,
40        data_type: DataType,
41        distinct: bool,
42    },
43    Total {
44        field_id: FieldId,
45        data_type: DataType,
46        distinct: bool,
47    },
48    Avg {
49        field_id: FieldId,
50        data_type: DataType,
51        distinct: bool,
52    },
53    Min {
54        field_id: FieldId,
55        data_type: DataType,
56    },
57    Max {
58        field_id: FieldId,
59        data_type: DataType,
60    },
61    CountNulls {
62        field_id: FieldId,
63    },
64    GroupConcat {
65        field_id: FieldId,
66        distinct: bool,
67        separator: String,
68    },
69}
70
71impl AggregateKind {
72    /// Returns the field ID referenced by this aggregate, if any.
73    pub fn field_id(&self) -> Option<FieldId> {
74        match self {
75            AggregateKind::Count { field_id, .. } => *field_id,
76            AggregateKind::Sum { field_id, .. }
77            | AggregateKind::Total { field_id, .. }
78            | AggregateKind::Avg { field_id, .. }
79            | AggregateKind::Min { field_id, .. }
80            | AggregateKind::Max { field_id, .. }
81            | AggregateKind::CountNulls { field_id }
82            | AggregateKind::GroupConcat { field_id, .. } => Some(*field_id),
83        }
84    }
85}
86
87/// Runtime state for an aggregate computation.
88pub struct AggregateState {
89    pub alias: String,
90    pub accumulator: AggregateAccumulator,
91    pub override_value: Option<i64>,
92}
93
94/// Accumulator for incremental aggregate computation.
95pub enum AggregateAccumulator {
96    CountStar {
97        value: i64,
98    },
99    CountColumn {
100        column_index: usize,
101        value: i64,
102    },
103    CountDistinctColumn {
104        column_index: usize,
105        seen: FxHashSet<DistinctKey>,
106    },
107    SumInt64 {
108        column_index: usize,
109        value: Option<i64>, // None = overflow, Some = current sum or initial state
110        has_values: bool,   // Track whether we've seen any values
111    },
112    SumDistinctInt64 {
113        column_index: usize,
114        sum: Option<i64>, // None = overflow
115        seen: FxHashSet<DistinctKey>,
116    },
117    SumFloat64 {
118        column_index: usize,
119        value: f64,
120        saw_value: bool,
121    },
122    SumDistinctFloat64 {
123        column_index: usize,
124        sum: f64,
125        seen: FxHashSet<DistinctKey>,
126    },
127    TotalInt64 {
128        column_index: usize,
129        value: f64, // TOTAL always returns float to avoid overflow
130    },
131    TotalDistinctInt64 {
132        column_index: usize,
133        sum: f64, // TOTAL always returns float to avoid overflow
134        seen: FxHashSet<DistinctKey>,
135    },
136    TotalFloat64 {
137        column_index: usize,
138        value: f64,
139    },
140    TotalDistinctFloat64 {
141        column_index: usize,
142        sum: f64,
143        seen: FxHashSet<DistinctKey>,
144    },
145    AvgInt64 {
146        column_index: usize,
147        sum: i64,
148        count: i64,
149    },
150    AvgDistinctInt64 {
151        column_index: usize,
152        sum: i64,
153        seen: FxHashSet<DistinctKey>,
154    },
155    AvgFloat64 {
156        column_index: usize,
157        sum: f64,
158        count: i64,
159    },
160    AvgDistinctFloat64 {
161        column_index: usize,
162        sum: f64,
163        seen: FxHashSet<DistinctKey>,
164    },
165    SumDecimal128 {
166        column_index: usize,
167        sum: i128,
168        precision: u8,
169        scale: i8,
170    },
171    SumDistinctDecimal128 {
172        column_index: usize,
173        sum: i128,
174        seen: FxHashSet<DistinctKey>,
175        precision: u8,
176        scale: i8,
177    },
178    TotalDecimal128 {
179        column_index: usize,
180        sum: i128,
181        precision: u8,
182        scale: i8,
183    },
184    TotalDistinctDecimal128 {
185        column_index: usize,
186        sum: i128,
187        seen: FxHashSet<DistinctKey>,
188        precision: u8,
189        scale: i8,
190    },
191    AvgDecimal128 {
192        column_index: usize,
193        sum: i128,
194        count: i64,
195        precision: u8,
196        scale: i8,
197    },
198    AvgDistinctDecimal128 {
199        column_index: usize,
200        sum: i128,
201        seen: FxHashSet<DistinctKey>,
202        precision: u8,
203        scale: i8,
204    },
205    MinInt64 {
206        column_index: usize,
207        value: Option<i64>,
208    },
209    MinFloat64 {
210        column_index: usize,
211        value: Option<f64>,
212    },
213    MinDecimal128 {
214        column_index: usize,
215        value: Option<i128>,
216        precision: u8,
217        scale: i8,
218    },
219    MaxInt64 {
220        column_index: usize,
221        value: Option<i64>,
222    },
223    MaxFloat64 {
224        column_index: usize,
225        value: Option<f64>,
226    },
227    MaxDecimal128 {
228        column_index: usize,
229        value: Option<i128>,
230        precision: u8,
231        scale: i8,
232    },
233    CountNulls {
234        column_index: usize,
235        non_null_rows: i64,
236        total_rows_seen: i64,
237    },
238    GroupConcat {
239        column_index: usize,
240        values: Vec<String>,
241        separator: String,
242    },
243    GroupConcatDistinct {
244        column_index: usize,
245        seen: FxHashSet<String>,
246        values: Vec<String>,
247        separator: String,
248    },
249}
250
/// Hashable identity of a single column value, used to de-duplicate rows for the
/// `DISTINCT` aggregates.
///
/// Each variant wraps one supported scalar type, so equal raw values of different types
/// (for example `Int(1)` and `Date(1)`) remain distinct keys.
#[derive(Hash, Eq, PartialEq, Clone)]
pub enum DistinctKey {
    /// 64-bit integer value.
    Int(i64),
    /// Bit pattern of an `f64`. Floats are compared by their bits, so `0.0` and `-0.0`
    /// are different keys.
    Float(u64),
    /// Owned string value.
    Str(String),
    /// Boolean value.
    Bool(bool),
    /// Raw `Date32` value.
    Date(i32),
    /// Raw (unscaled) decimal value, stored as-is for exact comparison.
    Decimal(i128),
}
260
261impl DistinctKey {
262    fn from_array(array: &ArrayRef, index: usize) -> AggregateResult<Self> {
263        match array.data_type() {
264            DataType::Int64 => {
265                let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
266                    Error::InvalidArgumentError(
267                        "COUNT(DISTINCT) expected an INT64 column in execution".into(),
268                    )
269                })?;
270                Ok(DistinctKey::Int(values.value(index)))
271            }
272            DataType::Float64 => {
273                let values = array
274                    .as_any()
275                    .downcast_ref::<Float64Array>()
276                    .ok_or_else(|| {
277                        Error::InvalidArgumentError(
278                            "COUNT(DISTINCT) expected a FLOAT64 column in execution".into(),
279                        )
280                    })?;
281                Ok(DistinctKey::Float(values.value(index).to_bits()))
282            }
283            DataType::Utf8 => {
284                let values = array
285                    .as_any()
286                    .downcast_ref::<StringArray>()
287                    .ok_or_else(|| {
288                        Error::InvalidArgumentError(
289                            "COUNT(DISTINCT) expected a UTF8 column in execution".into(),
290                        )
291                    })?;
292                Ok(DistinctKey::Str(values.value(index).to_owned()))
293            }
294            DataType::Boolean => {
295                let values = array
296                    .as_any()
297                    .downcast_ref::<BooleanArray>()
298                    .ok_or_else(|| {
299                        Error::InvalidArgumentError(
300                            "COUNT(DISTINCT) expected a BOOLEAN column in execution".into(),
301                        )
302                    })?;
303                Ok(DistinctKey::Bool(values.value(index)))
304            }
305            DataType::Date32 => {
306                let values = array
307                    .as_any()
308                    .downcast_ref::<Date32Array>()
309                    .ok_or_else(|| {
310                        Error::InvalidArgumentError(
311                            "COUNT(DISTINCT) expected a DATE32 column in execution".into(),
312                        )
313                    })?;
314                Ok(DistinctKey::Date(values.value(index)))
315            }
316            DataType::Decimal128(_, _) => {
317                let values = array
318                    .as_any()
319                    .downcast_ref::<arrow::array::Decimal128Array>()
320                    .ok_or_else(|| {
321                        Error::InvalidArgumentError(
322                            "COUNT(DISTINCT) expected a DECIMAL128 column in execution".into(),
323                        )
324                    })?;
325                Ok(DistinctKey::Decimal(values.value(index)))
326            }
327            // Null type can occur when all values are NULL - treat as Int for accumulator purposes
328            // The actual value will always be NULL and won't be counted as distinct
329            DataType::Null => Ok(DistinctKey::Int(0)),
330            other => Err(Error::InvalidArgumentError(format!(
331                "COUNT(DISTINCT) is not supported for column type {other:?}"
332            ))),
333        }
334    }
335}
336
337/// Helper function to convert an array value to a string representation.
338///
339/// # Arguments
340///
341/// - `array`: The array to extract the value from
342/// - `index`: The row index
343///
344/// # Errors
345///
346/// Returns an error if the type is unsupported or conversion fails.
347fn array_value_to_string(array: &ArrayRef, index: usize) -> AggregateResult<String> {
348    match array.data_type() {
349        DataType::Utf8 => {
350            let arr = array
351                .as_any()
352                .downcast_ref::<StringArray>()
353                .ok_or_else(|| Error::InvalidArgumentError("Expected String array".into()))?;
354            Ok(arr.value(index).to_string())
355        }
356        DataType::Int64 => {
357            let arr = array
358                .as_any()
359                .downcast_ref::<Int64Array>()
360                .ok_or_else(|| Error::InvalidArgumentError("Expected Int64 array".into()))?;
361            Ok(arr.value(index).to_string())
362        }
363        DataType::Float64 => {
364            let arr = array
365                .as_any()
366                .downcast_ref::<Float64Array>()
367                .ok_or_else(|| Error::InvalidArgumentError("Expected Float64 array".into()))?;
368            Ok(arr.value(index).to_string())
369        }
370        DataType::Boolean => {
371            let arr = array
372                .as_any()
373                .downcast_ref::<BooleanArray>()
374                .ok_or_else(|| Error::InvalidArgumentError("Expected Boolean array".into()))?;
375            Ok(if arr.value(index) {
376                "1".to_string()
377            } else {
378                "0".to_string()
379            })
380        }
381        other => Err(Error::InvalidArgumentError(format!(
382            "group_concat does not support column type {other:?}"
383        ))),
384    }
385}
386
387/// Helper function to extract a numeric f64 value from an array with SQLite-style type coercion.
388///
389/// SQLite behavior: String and BLOB values that do not look like numbers are interpreted as 0.
390/// This function implements that coercion for SUM and AVG operations on string columns.
391///
392/// # Arguments
393///
394/// - `array`: The array to extract the value from
395/// - `index`: The row index
396///
397/// # Returns
398///
399/// Returns the numeric value as f64. Non-numeric strings return 0.0.
400fn array_value_to_numeric(array: &ArrayRef, index: usize) -> AggregateResult<f64> {
401    match array.data_type() {
402        DataType::Int64 => {
403            let arr = array
404                .as_any()
405                .downcast_ref::<Int64Array>()
406                .ok_or_else(|| Error::InvalidArgumentError("Expected Int64 array".into()))?;
407            Ok(arr.value(index) as f64)
408        }
409        DataType::Float64 => {
410            let arr = array
411                .as_any()
412                .downcast_ref::<Float64Array>()
413                .ok_or_else(|| Error::InvalidArgumentError("Expected Float64 array".into()))?;
414            Ok(arr.value(index))
415        }
416        DataType::Decimal128(_, scale) => {
417            let arr = array
418                .as_any()
419                .downcast_ref::<arrow::array::Decimal128Array>()
420                .ok_or_else(|| Error::InvalidArgumentError("Expected Decimal128 array".into()))?;
421            let value_i128 = arr.value(index);
422            // Convert decimal to f64: value / 10^scale
423            let scale_factor = 10_f64.powi(*scale as i32);
424            Ok(value_i128 as f64 / scale_factor)
425        }
426        DataType::Utf8 => {
427            let arr = array
428                .as_any()
429                .downcast_ref::<StringArray>()
430                .ok_or_else(|| Error::InvalidArgumentError("Expected String array".into()))?;
431            let s = arr.value(index);
432            // SQLite behavior: try to parse as number, if it fails use 0.0
433            Ok(s.trim().parse::<f64>().unwrap_or(0.0))
434        }
435        DataType::Boolean => {
436            let arr = array
437                .as_any()
438                .downcast_ref::<BooleanArray>()
439                .ok_or_else(|| Error::InvalidArgumentError("Expected Boolean array".into()))?;
440            Ok(if arr.value(index) { 1.0 } else { 0.0 })
441        }
442        // Null type can occur when all values are NULL - return 0.0 as default
443        // (though these values won't actually be accumulated since they're NULL)
444        DataType::Null => Ok(0.0),
445        other => Err(Error::InvalidArgumentError(format!(
446            "Numeric coercion not supported for column type {other:?}"
447        ))),
448    }
449}
450
451impl AggregateAccumulator {
452    /// Creates an accumulator using the projection index from a batch.
453    ///
454    /// # Arguments
455    ///
456    /// - `spec`: Aggregate specification defining the operation type
457    /// - `projection_idx`: Column position in the batch; `None` for `CountStar`
458    /// - `_total_rows_hint`: Unused optimization hint for row count
459    ///
460    /// # Errors
461    ///
462    /// Returns an error if the aggregate kind requires a projection index but none is provided.
463    pub fn new_with_projection_index(
464        spec: &AggregateSpec,
465        projection_idx: Option<usize>,
466        _total_rows_hint: Option<i64>,
467    ) -> AggregateResult<Self> {
468        match &spec.kind {
469            AggregateKind::Count { field_id, distinct } => {
470                if field_id.is_none() {
471                    return Ok(AggregateAccumulator::CountStar { value: 0 });
472                }
473                let idx = projection_idx.ok_or_else(|| {
474                    Error::Internal("Count aggregate requires projection index".into())
475                })?;
476                if *distinct {
477                    Ok(AggregateAccumulator::CountDistinctColumn {
478                        column_index: idx,
479                        seen: FxHashSet::default(),
480                    })
481                } else {
482                    Ok(AggregateAccumulator::CountColumn {
483                        column_index: idx,
484                        value: 0,
485                    })
486                }
487            }
488            AggregateKind::Sum {
489                data_type,
490                distinct,
491                ..
492            } => {
493                let idx = projection_idx.ok_or_else(|| {
494                    Error::Internal("Sum aggregate requires projection index".into())
495                })?;
496                match (data_type, *distinct) {
497                    (&DataType::Int64, true) => Ok(AggregateAccumulator::SumDistinctInt64 {
498                        column_index: idx,
499                        sum: Some(0),
500                        seen: FxHashSet::default(),
501                    }),
502                    (&DataType::Int64, false) => Ok(AggregateAccumulator::SumInt64 {
503                        column_index: idx,
504                        value: Some(0),
505                        has_values: false,
506                    }),
507                    (&DataType::Decimal128(precision, scale), true) => {
508                        Ok(AggregateAccumulator::SumDistinctDecimal128 {
509                            column_index: idx,
510                            sum: 0,
511                            seen: FxHashSet::default(),
512                            precision,
513                            scale,
514                        })
515                    }
516                    (&DataType::Decimal128(precision, scale), false) => {
517                        Ok(AggregateAccumulator::SumDecimal128 {
518                            column_index: idx,
519                            sum: 0,
520                            precision,
521                            scale,
522                        })
523                    }
524                    // For Float64 and Utf8, use Float64 accumulator with numeric coercion
525                    (&DataType::Float64, true) | (&DataType::Utf8, true) => {
526                        Ok(AggregateAccumulator::SumDistinctFloat64 {
527                            column_index: idx,
528                            sum: 0.0,
529                            seen: FxHashSet::default(),
530                        })
531                    }
532                    (&DataType::Float64, false) | (&DataType::Utf8, false) => {
533                        Ok(AggregateAccumulator::SumFloat64 {
534                            column_index: idx,
535                            value: 0.0,
536                            saw_value: false,
537                        })
538                    }
539                    other => Err(Error::InvalidArgumentError(format!(
540                        "SUM aggregate not supported for column type {:?}",
541                        other.0
542                    ))),
543                }
544            }
545            AggregateKind::Total {
546                data_type,
547                distinct,
548                ..
549            } => {
550                let idx = projection_idx.ok_or_else(|| {
551                    Error::Internal("Total aggregate requires projection index".into())
552                })?;
553                match (data_type, *distinct) {
554                    (&DataType::Int64, true) => Ok(AggregateAccumulator::TotalDistinctInt64 {
555                        column_index: idx,
556                        sum: 0.0,
557                        seen: FxHashSet::default(),
558                    }),
559                    (&DataType::Int64, false) => Ok(AggregateAccumulator::TotalInt64 {
560                        column_index: idx,
561                        value: 0.0,
562                    }),
563                    (&DataType::Decimal128(precision, scale), true) => {
564                        Ok(AggregateAccumulator::TotalDistinctDecimal128 {
565                            column_index: idx,
566                            sum: 0,
567                            seen: FxHashSet::default(),
568                            precision,
569                            scale,
570                        })
571                    }
572                    (&DataType::Decimal128(precision, scale), false) => {
573                        Ok(AggregateAccumulator::TotalDecimal128 {
574                            column_index: idx,
575                            sum: 0,
576                            precision,
577                            scale,
578                        })
579                    }
580                    // For Float64 and Utf8, use Float64 accumulator with numeric coercion
581                    (&DataType::Float64, true) | (&DataType::Utf8, true) => {
582                        Ok(AggregateAccumulator::TotalDistinctFloat64 {
583                            column_index: idx,
584                            sum: 0.0,
585                            seen: FxHashSet::default(),
586                        })
587                    }
588                    (&DataType::Float64, false) | (&DataType::Utf8, false) => {
589                        Ok(AggregateAccumulator::TotalFloat64 {
590                            column_index: idx,
591                            value: 0.0,
592                        })
593                    }
594                    other => Err(Error::InvalidArgumentError(format!(
595                        "TOTAL aggregate not supported for column type {:?}",
596                        other.0
597                    ))),
598                }
599            }
600            AggregateKind::Avg {
601                data_type,
602                distinct,
603                ..
604            } => {
605                let idx = projection_idx.ok_or_else(|| {
606                    Error::Internal("Avg aggregate requires projection index".into())
607                })?;
608                match (data_type, *distinct) {
609                    (&DataType::Int64, true) => Ok(AggregateAccumulator::AvgDistinctInt64 {
610                        column_index: idx,
611                        sum: 0,
612                        seen: FxHashSet::default(),
613                    }),
614                    (&DataType::Int64, false) => Ok(AggregateAccumulator::AvgInt64 {
615                        column_index: idx,
616                        sum: 0,
617                        count: 0,
618                    }),
619                    (&DataType::Decimal128(precision, scale), true) => {
620                        Ok(AggregateAccumulator::AvgDistinctDecimal128 {
621                            column_index: idx,
622                            sum: 0,
623                            seen: FxHashSet::default(),
624                            precision,
625                            scale,
626                        })
627                    }
628                    (&DataType::Decimal128(precision, scale), false) => {
629                        Ok(AggregateAccumulator::AvgDecimal128 {
630                            column_index: idx,
631                            sum: 0,
632                            count: 0,
633                            precision,
634                            scale,
635                        })
636                    }
637                    // For Float64 and Utf8, use Float64 accumulator with numeric coercion
638                    (&DataType::Float64, true) | (&DataType::Utf8, true) => {
639                        Ok(AggregateAccumulator::AvgDistinctFloat64 {
640                            column_index: idx,
641                            sum: 0.0,
642                            seen: FxHashSet::default(),
643                        })
644                    }
645                    (&DataType::Float64, false) | (&DataType::Utf8, false) => {
646                        Ok(AggregateAccumulator::AvgFloat64 {
647                            column_index: idx,
648                            sum: 0.0,
649                            count: 0,
650                        })
651                    }
652                    other => Err(Error::InvalidArgumentError(format!(
653                        "AVG aggregate not supported for column type {:?}",
654                        other.0
655                    ))),
656                }
657            }
658            AggregateKind::Min { data_type, .. } => {
659                let idx = projection_idx.ok_or_else(|| {
660                    Error::Internal("Min aggregate requires projection index".into())
661                })?;
662                match data_type {
663                    &DataType::Int64 => Ok(AggregateAccumulator::MinInt64 {
664                        column_index: idx,
665                        value: None,
666                    }),
667                    &DataType::Decimal128(precision, scale) => {
668                        Ok(AggregateAccumulator::MinDecimal128 {
669                            column_index: idx,
670                            value: None,
671                            precision,
672                            scale,
673                        })
674                    }
675                    // For Float64 and Utf8, use Float64 accumulator with numeric coercion
676                    &DataType::Float64 | &DataType::Utf8 => Ok(AggregateAccumulator::MinFloat64 {
677                        column_index: idx,
678                        value: None,
679                    }),
680                    other => Err(Error::InvalidArgumentError(format!(
681                        "MIN aggregate not supported for column type {:?}",
682                        other
683                    ))),
684                }
685            }
686            AggregateKind::Max { data_type, .. } => {
687                let idx = projection_idx.ok_or_else(|| {
688                    Error::Internal("Max aggregate requires projection index".into())
689                })?;
690                match data_type {
691                    &DataType::Int64 => Ok(AggregateAccumulator::MaxInt64 {
692                        column_index: idx,
693                        value: None,
694                    }),
695                    &DataType::Decimal128(precision, scale) => {
696                        Ok(AggregateAccumulator::MaxDecimal128 {
697                            column_index: idx,
698                            value: None,
699                            precision,
700                            scale,
701                        })
702                    }
703                    // For Float64 and Utf8, use Float64 accumulator with numeric coercion
704                    &DataType::Float64 | &DataType::Utf8 => Ok(AggregateAccumulator::MaxFloat64 {
705                        column_index: idx,
706                        value: None,
707                    }),
708                    other => Err(Error::InvalidArgumentError(format!(
709                        "MAX aggregate not supported for column type {:?}",
710                        other
711                    ))),
712                }
713            }
714            AggregateKind::CountNulls { .. } => {
715                let idx = projection_idx.ok_or_else(|| {
716                    Error::Internal("CountNulls aggregate requires projection index".into())
717                })?;
718                Ok(AggregateAccumulator::CountNulls {
719                    column_index: idx,
720                    non_null_rows: 0,
721                    total_rows_seen: 0,
722                })
723            }
724            AggregateKind::GroupConcat {
725                distinct,
726                separator,
727                ..
728            } => {
729                let idx = projection_idx.ok_or_else(|| {
730                    Error::Internal("GroupConcat aggregate requires projection index".into())
731                })?;
732                if *distinct {
733                    Ok(AggregateAccumulator::GroupConcatDistinct {
734                        column_index: idx,
735                        seen: FxHashSet::default(),
736                        values: Vec::new(),
737                        separator: separator.clone(),
738                    })
739                } else {
740                    Ok(AggregateAccumulator::GroupConcat {
741                        column_index: idx,
742                        values: Vec::new(),
743                        separator: separator.clone(),
744                    })
745                }
746            }
747        }
748    }
749
750    /// Updates the accumulator with values from a new batch.
751    ///
752    /// # Arguments
753    ///
754    /// - `batch`: RecordBatch containing the column data to aggregate
755    ///
756    /// # Errors
757    ///
758    /// Returns an error if column types mismatch, overflow occurs, or downcast fails.
759    pub fn update(&mut self, batch: &RecordBatch) -> AggregateResult<()> {
760        match self {
761            AggregateAccumulator::CountStar { value } => {
762                let rows = i64::try_from(batch.num_rows()).map_err(|_| {
763                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
764                })?;
765                *value = value.checked_add(rows).ok_or_else(|| {
766                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
767                })?;
768            }
769            AggregateAccumulator::CountColumn {
770                column_index,
771                value,
772            } => {
773                let array = batch.column(*column_index);
774                // Skip accumulation for Null-type columns - all values are implicitly NULL
775                // COUNT(NULL column) should be 0, not the row count
776                if matches!(array.data_type(), DataType::Null) {
777                    return Ok(());
778                }
779                let non_null = (0..array.len()).filter(|idx| array.is_valid(*idx)).count();
780                let non_null = i64::try_from(non_null).map_err(|_| {
781                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
782                })?;
783                *value = value.checked_add(non_null).ok_or_else(|| {
784                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
785                })?;
786            }
787            AggregateAccumulator::CountDistinctColumn { column_index, seen } => {
788                let array = batch.column(*column_index);
789                // Skip accumulation for Null-type columns - all values are implicitly NULL
790                if matches!(array.data_type(), DataType::Null) {
791                    return Ok(());
792                }
793                for i in 0..array.len() {
794                    if !array.is_valid(i) {
795                        continue;
796                    }
797                    let value = DistinctKey::from_array(array, i)?;
798                    seen.insert(value);
799                }
800            }
801            AggregateAccumulator::SumInt64 {
802                column_index,
803                value,
804                has_values,
805            } => {
806                let array = batch.column(*column_index);
807                // Skip accumulation for Null-type columns - all values are implicitly NULL
808                if matches!(array.data_type(), DataType::Null) {
809                    return Ok(());
810                }
811                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
812                    Error::InvalidArgumentError(
813                        "SUM aggregate expected an INT column in execution".into(),
814                    )
815                })?;
816                for i in 0..array.len() {
817                    if array.is_valid(i) {
818                        *has_values = true;
819                        let v = array.value(i);
820                        *value = match *value {
821                            Some(current) => Some(current.checked_add(v).ok_or_else(|| {
822                                Error::InvalidArgumentError("integer overflow".into())
823                            })?),
824                            None => {
825                                return Err(Error::InvalidArgumentError("integer overflow".into()));
826                            }
827                        };
828                    }
829                }
830            }
831            AggregateAccumulator::SumDistinctInt64 {
832                column_index,
833                sum,
834                seen,
835            } => {
836                let array = batch.column(*column_index);
837                // Skip accumulation for Null-type columns - all values are implicitly NULL
838                if matches!(array.data_type(), DataType::Null) {
839                    return Ok(());
840                }
841                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
842                    Error::InvalidArgumentError(
843                        "SUM(DISTINCT) aggregate expected an INT column in execution".into(),
844                    )
845                })?;
846                for i in 0..array.len() {
847                    if array.is_valid(i) {
848                        let col_array = batch.column(*column_index);
849                        let key = DistinctKey::from_array(col_array, i)?;
850                        if seen.insert(key.clone()) {
851                            // Only add to sum if we haven't seen this value before
852                            if let DistinctKey::Int(v) = key {
853                                *sum = match *sum {
854                                    Some(current) => {
855                                        Some(current.checked_add(v).ok_or_else(|| {
856                                            Error::InvalidArgumentError("integer overflow".into())
857                                        })?)
858                                    }
859                                    None => {
860                                        return Err(Error::InvalidArgumentError(
861                                            "integer overflow".into(),
862                                        ));
863                                    }
864                                };
865                            }
866                        }
867                    }
868                }
869            }
870            AggregateAccumulator::SumFloat64 {
871                column_index,
872                value,
873                saw_value,
874            } => {
875                let column = batch.column(*column_index);
876                // Skip accumulation for Null-type columns - all values are implicitly NULL
877                if matches!(column.data_type(), DataType::Null) {
878                    return Ok(());
879                }
880                // Use generic numeric coercion to support Utf8 and other types
881                for i in 0..column.len() {
882                    if column.is_valid(i) {
883                        let v = array_value_to_numeric(column, i)?;
884                        *value += v;
885                        *saw_value = true;
886                    }
887                }
888            }
889            AggregateAccumulator::SumDistinctFloat64 {
890                column_index,
891                sum,
892                seen,
893            } => {
894                let column = batch.column(*column_index);
895                // Skip accumulation for Null-type columns - all values are implicitly NULL
896                if matches!(column.data_type(), DataType::Null) {
897                    return Ok(());
898                }
899                for i in 0..column.len() {
900                    if !column.is_valid(i) {
901                        continue;
902                    }
903                    // Track distinctness based on original value (string, number, etc.)
904                    let key = DistinctKey::from_array(column, i)?;
905                    if seen.insert(key.clone()) {
906                        // Convert to numeric using SQLite-style coercion
907                        let v = match key {
908                            DistinctKey::Float(bits) => f64::from_bits(bits),
909                            DistinctKey::Int(int_val) => int_val as f64,
910                            DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
911                            DistinctKey::Bool(b) => {
912                                if b {
913                                    1.0
914                                } else {
915                                    0.0
916                                }
917                            }
918                            DistinctKey::Date(d) => d as f64,
919                            DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
920                        };
921                        *sum += v;
922                    }
923                }
924            }
925            AggregateAccumulator::SumDecimal128 {
926                column_index, sum, ..
927            } => {
928                let column = batch.column(*column_index);
929                let arr = column
930                    .as_any()
931                    .downcast_ref::<arrow::array::Decimal128Array>()
932                    .ok_or_else(|| {
933                        Error::InvalidArgumentError("Expected Decimal128 array".into())
934                    })?;
935                for i in 0..arr.len() {
936                    if arr.is_valid(i) {
937                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
938                            Error::InvalidArgumentError("Decimal128 sum overflow".into())
939                        })?;
940                    }
941                }
942            }
943            AggregateAccumulator::SumDistinctDecimal128 {
944                column_index,
945                sum,
946                seen,
947                ..
948            } => {
949                let column = batch.column(*column_index);
950                let arr = column
951                    .as_any()
952                    .downcast_ref::<arrow::array::Decimal128Array>()
953                    .ok_or_else(|| {
954                        Error::InvalidArgumentError("Expected Decimal128 array".into())
955                    })?;
956                for i in 0..arr.len() {
957                    if !arr.is_valid(i) {
958                        continue;
959                    }
960                    let key = DistinctKey::from_array(column, i)?;
961                    if seen.insert(key) {
962                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
963                            Error::InvalidArgumentError("Decimal128 sum overflow".into())
964                        })?;
965                    }
966                }
967            }
968            AggregateAccumulator::TotalInt64 {
969                column_index,
970                value,
971            } => {
972                let array = batch.column(*column_index);
973                // Skip accumulation for Null-type columns - all values are implicitly NULL
974                if matches!(array.data_type(), DataType::Null) {
975                    return Ok(());
976                }
977                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
978                    Error::InvalidArgumentError(
979                        "TOTAL aggregate expected an INT column in execution".into(),
980                    )
981                })?;
982                for i in 0..array.len() {
983                    if array.is_valid(i) {
984                        let v = array.value(i);
985                        // TOTAL never overflows - accumulate as float
986                        *value += v as f64;
987                    }
988                }
989            }
990            AggregateAccumulator::TotalDistinctInt64 {
991                column_index,
992                sum,
993                seen,
994            } => {
995                let array = batch.column(*column_index);
996                // Skip accumulation for Null-type columns - all values are implicitly NULL
997                if matches!(array.data_type(), DataType::Null) {
998                    return Ok(());
999                }
1000                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1001                    Error::InvalidArgumentError(
1002                        "TOTAL(DISTINCT) aggregate expected an INT column in execution".into(),
1003                    )
1004                })?;
1005                for i in 0..array.len() {
1006                    if array.is_valid(i) {
1007                        let col_array = batch.column(*column_index);
1008                        let key = DistinctKey::from_array(col_array, i)?;
1009                        if seen.insert(key.clone())
1010                            && let DistinctKey::Int(v) = key
1011                        {
1012                            // TOTAL never overflows - accumulate as float
1013                            *sum += v as f64;
1014                        }
1015                    }
1016                }
1017            }
1018            AggregateAccumulator::TotalFloat64 {
1019                column_index,
1020                value,
1021            } => {
1022                let column = batch.column(*column_index);
1023                // Skip accumulation for Null-type columns - all values are implicitly NULL
1024                if matches!(column.data_type(), DataType::Null) {
1025                    return Ok(());
1026                }
1027                // Use generic numeric coercion to support Utf8 and other types
1028                for i in 0..column.len() {
1029                    if column.is_valid(i) {
1030                        let v = array_value_to_numeric(column, i)?;
1031                        *value += v;
1032                    }
1033                }
1034            }
1035            AggregateAccumulator::TotalDistinctFloat64 {
1036                column_index,
1037                sum,
1038                seen,
1039            } => {
1040                let column = batch.column(*column_index);
1041                // Skip accumulation for Null-type columns - all values are implicitly NULL
1042                if matches!(column.data_type(), DataType::Null) {
1043                    return Ok(());
1044                }
1045                for i in 0..column.len() {
1046                    if !column.is_valid(i) {
1047                        continue;
1048                    }
1049                    // Track distinctness based on original value (string, number, etc.)
1050                    let key = DistinctKey::from_array(column, i)?;
1051                    if seen.insert(key.clone()) {
1052                        // Convert to numeric using SQLite-style coercion
1053                        let v = match key {
1054                            DistinctKey::Float(bits) => f64::from_bits(bits),
1055                            DistinctKey::Int(int_val) => int_val as f64,
1056                            DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
1057                            DistinctKey::Bool(b) => {
1058                                if b {
1059                                    1.0
1060                                } else {
1061                                    0.0
1062                                }
1063                            }
1064                            DistinctKey::Date(d) => d as f64,
1065                            DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
1066                        };
1067                        *sum += v;
1068                    }
1069                }
1070            }
1071            AggregateAccumulator::TotalDecimal128 {
1072                column_index, sum, ..
1073            } => {
1074                let column = batch.column(*column_index);
1075                let arr = column
1076                    .as_any()
1077                    .downcast_ref::<arrow::array::Decimal128Array>()
1078                    .ok_or_else(|| {
1079                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1080                    })?;
1081                for i in 0..arr.len() {
1082                    if arr.is_valid(i) {
1083                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1084                            Error::InvalidArgumentError("Decimal128 total overflow".into())
1085                        })?;
1086                    }
1087                }
1088            }
1089            AggregateAccumulator::TotalDistinctDecimal128 {
1090                column_index,
1091                sum,
1092                seen,
1093                ..
1094            } => {
1095                let column = batch.column(*column_index);
1096                let arr = column
1097                    .as_any()
1098                    .downcast_ref::<arrow::array::Decimal128Array>()
1099                    .ok_or_else(|| {
1100                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1101                    })?;
1102                for i in 0..arr.len() {
1103                    if !arr.is_valid(i) {
1104                        continue;
1105                    }
1106                    let key = DistinctKey::from_array(column, i)?;
1107                    if seen.insert(key) {
1108                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1109                            Error::InvalidArgumentError("Decimal128 total overflow".into())
1110                        })?;
1111                    }
1112                }
1113            }
1114            AggregateAccumulator::AvgInt64 {
1115                column_index,
1116                sum,
1117                count,
1118            } => {
1119                let array = batch.column(*column_index);
1120                // Skip accumulation for Null-type columns - all values are implicitly NULL
1121                if matches!(array.data_type(), DataType::Null) {
1122                    return Ok(());
1123                }
1124                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1125                    Error::InvalidArgumentError(
1126                        "AVG aggregate expected an INT column in execution".into(),
1127                    )
1128                })?;
1129                for i in 0..array.len() {
1130                    if array.is_valid(i) {
1131                        let v = array.value(i);
1132                        *sum = sum.checked_add(v).ok_or_else(|| {
1133                            Error::InvalidArgumentError(
1134                                "AVG aggregate sum exceeds i64 range".into(),
1135                            )
1136                        })?;
1137                        *count = count.checked_add(1).ok_or_else(|| {
1138                            Error::InvalidArgumentError(
1139                                "AVG aggregate count exceeds i64 range".into(),
1140                            )
1141                        })?;
1142                    }
1143                }
1144            }
1145            AggregateAccumulator::AvgDistinctInt64 {
1146                column_index,
1147                sum,
1148                seen,
1149            } => {
1150                let array = batch.column(*column_index);
1151                // Skip accumulation for Null-type columns - all values are implicitly NULL
1152                if matches!(array.data_type(), DataType::Null) {
1153                    return Ok(());
1154                }
1155                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1156                    Error::InvalidArgumentError(
1157                        "AVG(DISTINCT) aggregate expected an INT column in execution".into(),
1158                    )
1159                })?;
1160                for i in 0..array.len() {
1161                    if array.is_valid(i) {
1162                        let col_array = batch.column(*column_index);
1163                        let key = DistinctKey::from_array(col_array, i)?;
1164                        if seen.insert(key.clone()) {
1165                            // Only add to sum if we haven't seen this value before
1166                            if let DistinctKey::Int(v) = key {
1167                                *sum = sum.checked_add(v).ok_or_else(|| {
1168                                    Error::InvalidArgumentError(
1169                                        "AVG(DISTINCT) aggregate sum exceeds i64 range".into(),
1170                                    )
1171                                })?;
1172                            }
1173                        }
1174                    }
1175                }
1176            }
1177            AggregateAccumulator::AvgFloat64 {
1178                column_index,
1179                sum,
1180                count,
1181            } => {
1182                let column = batch.column(*column_index);
1183                // Skip accumulation for Null-type columns - all values are implicitly NULL
1184                if matches!(column.data_type(), DataType::Null) {
1185                    return Ok(());
1186                }
1187                // Use generic numeric coercion to support Utf8 and other types
1188                for i in 0..column.len() {
1189                    if column.is_valid(i) {
1190                        let v = array_value_to_numeric(column, i)?;
1191                        *sum += v;
1192                        *count = count.checked_add(1).ok_or_else(|| {
1193                            Error::InvalidArgumentError(
1194                                "AVG aggregate count exceeds i64 range".into(),
1195                            )
1196                        })?;
1197                    }
1198                }
1199            }
1200            AggregateAccumulator::AvgDistinctFloat64 {
1201                column_index,
1202                sum,
1203                seen,
1204            } => {
1205                let column = batch.column(*column_index);
1206                // Skip accumulation for Null-type columns - all values are implicitly NULL
1207                if matches!(column.data_type(), DataType::Null) {
1208                    return Ok(());
1209                }
1210                for i in 0..column.len() {
1211                    if !column.is_valid(i) {
1212                        continue;
1213                    }
1214                    // Track distinctness based on original value (string, number, etc.)
1215                    let key = DistinctKey::from_array(column, i)?;
1216                    if seen.insert(key.clone()) {
1217                        // Convert to numeric using SQLite-style coercion
1218                        let v = match key {
1219                            DistinctKey::Float(bits) => f64::from_bits(bits),
1220                            DistinctKey::Int(int_val) => int_val as f64,
1221                            DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
1222                            DistinctKey::Bool(b) => {
1223                                if b {
1224                                    1.0
1225                                } else {
1226                                    0.0
1227                                }
1228                            }
1229                            DistinctKey::Date(d) => d as f64,
1230                            DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
1231                        };
1232                        *sum += v;
1233                    }
1234                }
1235            }
1236            AggregateAccumulator::AvgDecimal128 {
1237                column_index,
1238                sum,
1239                count,
1240                ..
1241            } => {
1242                let column = batch.column(*column_index);
1243                let arr = column
1244                    .as_any()
1245                    .downcast_ref::<arrow::array::Decimal128Array>()
1246                    .ok_or_else(|| {
1247                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1248                    })?;
1249                for i in 0..arr.len() {
1250                    if arr.is_valid(i) {
1251                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1252                            Error::InvalidArgumentError("Decimal128 sum overflow".into())
1253                        })?;
1254                        *count = count.checked_add(1).ok_or_else(|| {
1255                            Error::InvalidArgumentError("AVG count overflow".into())
1256                        })?;
1257                    }
1258                }
1259            }
1260            AggregateAccumulator::AvgDistinctDecimal128 {
1261                column_index,
1262                sum,
1263                seen,
1264                ..
1265            } => {
1266                let column = batch.column(*column_index);
1267                let arr = column
1268                    .as_any()
1269                    .downcast_ref::<arrow::array::Decimal128Array>()
1270                    .ok_or_else(|| {
1271                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1272                    })?;
1273                for i in 0..arr.len() {
1274                    if !arr.is_valid(i) {
1275                        continue;
1276                    }
1277                    let key = DistinctKey::from_array(column, i)?;
1278                    if seen.insert(key) {
1279                        *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1280                            Error::InvalidArgumentError("Decimal128 sum overflow".into())
1281                        })?;
1282                    }
1283                }
1284            }
1285            AggregateAccumulator::MinInt64 {
1286                column_index,
1287                value,
1288            } => {
1289                let array = batch.column(*column_index);
1290                // Skip accumulation for Null-type columns - all values are implicitly NULL
1291                if matches!(array.data_type(), DataType::Null) {
1292                    return Ok(());
1293                }
1294                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1295                    Error::InvalidArgumentError(
1296                        "MIN aggregate expected an INT column in execution".into(),
1297                    )
1298                })?;
1299                for i in 0..array.len() {
1300                    if array.is_valid(i) {
1301                        let v = array.value(i);
1302                        *value = Some(match *value {
1303                            Some(current) => current.min(v),
1304                            None => v,
1305                        });
1306                    }
1307                }
1308            }
1309            AggregateAccumulator::MinFloat64 {
1310                column_index,
1311                value,
1312            } => {
1313                let column = batch.column(*column_index);
1314                // Skip accumulation for Null-type columns - all values are implicitly NULL
1315                if matches!(column.data_type(), DataType::Null) {
1316                    return Ok(());
1317                }
1318                // Use generic numeric coercion to support Utf8 and other types
1319                for i in 0..column.len() {
1320                    if column.is_valid(i) {
1321                        let v = array_value_to_numeric(column, i)?;
1322                        *value = Some(match *value {
1323                            Some(current) => match v.partial_cmp(&current) {
1324                                Some(Ordering::Less) => v,
1325                                _ => current,
1326                            },
1327                            None => v,
1328                        });
1329                    }
1330                }
1331            }
1332            AggregateAccumulator::MinDecimal128 {
1333                column_index,
1334                value,
1335                ..
1336            } => {
1337                let column = batch.column(*column_index);
1338                let arr = column
1339                    .as_any()
1340                    .downcast_ref::<arrow::array::Decimal128Array>()
1341                    .ok_or_else(|| {
1342                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1343                    })?;
1344                for i in 0..arr.len() {
1345                    if arr.is_valid(i) {
1346                        let v = arr.value(i);
1347                        *value = Some(match *value {
1348                            Some(current) => current.min(v),
1349                            None => v,
1350                        });
1351                    }
1352                }
1353            }
1354            AggregateAccumulator::MaxInt64 {
1355                column_index,
1356                value,
1357            } => {
1358                let array = batch.column(*column_index);
1359                if matches!(array.data_type(), DataType::Null) {
1360                    return Ok(());
1361                }
1362                let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1363                    Error::InvalidArgumentError(
1364                        "MAX aggregate expected an INT column in execution".into(),
1365                    )
1366                })?;
1367                for i in 0..array.len() {
1368                    if array.is_valid(i) {
1369                        let v = array.value(i);
1370                        *value = Some(match *value {
1371                            Some(current) => current.max(v),
1372                            None => v,
1373                        });
1374                    }
1375                }
1376            }
1377            AggregateAccumulator::MaxFloat64 {
1378                column_index,
1379                value,
1380            } => {
1381                let column = batch.column(*column_index);
1382                // Skip accumulation for Null-type columns - all values are implicitly NULL
1383                if matches!(column.data_type(), DataType::Null) {
1384                    return Ok(());
1385                }
1386                // Use generic numeric coercion to support Utf8 and other types
1387                for i in 0..column.len() {
1388                    if column.is_valid(i) {
1389                        let v = array_value_to_numeric(column, i)?;
1390                        *value = Some(match *value {
1391                            Some(current) => match v.partial_cmp(&current) {
1392                                Some(Ordering::Greater) => v,
1393                                _ => current,
1394                            },
1395                            None => v,
1396                        });
1397                    }
1398                }
1399            }
1400            AggregateAccumulator::MaxDecimal128 {
1401                column_index,
1402                value,
1403                ..
1404            } => {
1405                let column = batch.column(*column_index);
1406                let arr = column
1407                    .as_any()
1408                    .downcast_ref::<arrow::array::Decimal128Array>()
1409                    .ok_or_else(|| {
1410                        Error::InvalidArgumentError("Expected Decimal128 array".into())
1411                    })?;
1412                for i in 0..arr.len() {
1413                    if arr.is_valid(i) {
1414                        let v = arr.value(i);
1415                        *value = Some(match *value {
1416                            Some(current) => current.max(v),
1417                            None => v,
1418                        });
1419                    }
1420                }
1421            }
1422            AggregateAccumulator::CountNulls {
1423                column_index,
1424                non_null_rows,
1425                total_rows_seen,
1426            } => {
1427                let rows = i64::try_from(batch.num_rows()).map_err(|_| {
1428                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1429                })?;
1430                *total_rows_seen = total_rows_seen.checked_add(rows).ok_or_else(|| {
1431                    Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1432                })?;
1433
1434                let array = batch.column(*column_index);
1435                // If column is Null type, non_null_rows doesn't increase
1436                if !matches!(array.data_type(), DataType::Null) {
1437                    let non_null = (0..array.len()).filter(|idx| array.is_valid(*idx)).count();
1438                    let non_null = i64::try_from(non_null).map_err(|_| {
1439                        Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1440                    })?;
1441                    *non_null_rows = non_null_rows.checked_add(non_null).ok_or_else(|| {
1442                        Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1443                    })?;
1444                }
1445            }
1446            AggregateAccumulator::GroupConcat {
1447                column_index,
1448                values,
1449                separator: _,
1450            } => {
1451                let array = batch.column(*column_index);
1452                for i in 0..array.len() {
1453                    if array.is_valid(i) {
1454                        let str_val = array_value_to_string(array, i)?;
1455                        values.push(str_val);
1456                    }
1457                }
1458            }
1459            AggregateAccumulator::GroupConcatDistinct {
1460                column_index,
1461                seen,
1462                values,
1463                separator: _,
1464            } => {
1465                let array = batch.column(*column_index);
1466                for i in 0..array.len() {
1467                    if array.is_valid(i) {
1468                        let str_val = array_value_to_string(array, i)?;
1469                        if seen.insert(str_val.clone()) {
1470                            values.push(str_val);
1471                        }
1472                    }
1473                }
1474            }
1475        }
1476        Ok(())
1477    }
1478
1479    /// Finalizes the accumulator and produces the resulting field and array.
1480    ///
1481    /// # Returns
1482    ///
1483    /// A tuple containing the output field schema and the computed aggregate value array.
1484    ///
1485    /// # Errors
1486    ///
1487    /// Returns an error if result conversion to i64 fails, an integer SUM overflow was recorded, or arithmetic underflow occurs.
1488    pub fn finalize(self) -> AggregateResult<(Field, ArrayRef)> {
1489        match self {
1490            AggregateAccumulator::CountStar { value } => {
1491                let mut builder = Int64Builder::with_capacity(1);
1492                builder.append_value(value);
1493                let array = Arc::new(builder.finish()) as ArrayRef;
1494                Ok((Field::new("count", DataType::Int64, false), array))
1495            }
1496            AggregateAccumulator::CountColumn { value, .. } => {
1497                let mut builder = Int64Builder::with_capacity(1);
1498                builder.append_value(value);
1499                let array = Arc::new(builder.finish()) as ArrayRef;
1500                Ok((Field::new("count", DataType::Int64, false), array))
1501            }
1502            AggregateAccumulator::CountDistinctColumn { seen, .. } => {
1503                let mut builder = Int64Builder::with_capacity(1);
1504                let count = i64::try_from(seen.len()).map_err(|_| {
1505                    Error::InvalidArgumentError("COUNT(DISTINCT) result exceeds i64 range".into())
1506                })?;
1507                builder.append_value(count);
1508                let array = Arc::new(builder.finish()) as ArrayRef;
1509                Ok((Field::new("count_distinct", DataType::Int64, false), array))
1510            }
1511            AggregateAccumulator::SumInt64 {
1512                value, has_values, ..
1513            } => {
1514                // If overflow occurred (value is None after seeing values), return error
1515                // to match SQLite behavior where integer overflow in SUM throws exception
1516                if has_values && value.is_none() {
1517                    return Err(Error::InvalidArgumentError("integer overflow".into()));
1518                }
1519
1520                let mut builder = Int64Builder::with_capacity(1);
1521                if !has_values {
1522                    builder.append_null(); // No values seen
1523                } else {
1524                    match value {
1525                        Some(v) => builder.append_value(v),
1526                        None => unreachable!(), // Already handled above
1527                    }
1528                }
1529                let array = Arc::new(builder.finish()) as ArrayRef;
1530                Ok((Field::new("sum", DataType::Int64, true), array))
1531            }
1532            AggregateAccumulator::SumDistinctInt64 { sum, seen, .. } => {
1533                let mut builder = Int64Builder::with_capacity(1);
1534                if seen.is_empty() {
1535                    builder.append_null();
1536                } else {
1537                    match sum {
1538                        Some(v) => builder.append_value(v),
1539                        None => builder.append_null(), // Overflow occurred
1540                    }
1541                }
1542                let array = Arc::new(builder.finish()) as ArrayRef;
1543                Ok((Field::new("sum_distinct", DataType::Int64, true), array))
1544            }
1545            AggregateAccumulator::SumFloat64 {
1546                value, saw_value, ..
1547            } => {
1548                let mut builder = Float64Builder::with_capacity(1);
1549                if saw_value {
1550                    builder.append_value(value);
1551                } else {
1552                    builder.append_null();
1553                }
1554                let array = Arc::new(builder.finish()) as ArrayRef;
1555                Ok((Field::new("sum", DataType::Float64, true), array))
1556            }
1557            AggregateAccumulator::SumDistinctFloat64 { sum, seen, .. } => {
1558                let mut builder = Float64Builder::with_capacity(1);
1559                if !seen.is_empty() {
1560                    builder.append_value(sum);
1561                } else {
1562                    builder.append_null();
1563                }
1564                let array = Arc::new(builder.finish()) as ArrayRef;
1565                Ok((Field::new("sum_distinct", DataType::Float64, true), array))
1566            }
1567            AggregateAccumulator::SumDecimal128 {
1568                sum,
1569                precision,
1570                scale,
1571                ..
1572            } => {
1573                let data_type = DataType::Decimal128(precision, scale);
1574                let array = Arc::new(
1575                    arrow::array::Decimal128Array::from(vec![sum])
1576                        .with_precision_and_scale(precision, scale)
1577                        .map_err(|e| {
1578                            Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1579                        })?,
1580                ) as ArrayRef;
1581                Ok((Field::new("sum", data_type, true), array))
1582            }
1583            AggregateAccumulator::SumDistinctDecimal128 {
1584                sum,
1585                seen,
1586                precision,
1587                scale,
1588                ..
1589            } => {
1590                let data_type = DataType::Decimal128(precision, scale);
1591                let array = if seen.is_empty() {
1592                    Arc::new(
1593                        arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1594                            .with_precision_and_scale(precision, scale)
1595                            .map_err(|e| {
1596                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1597                            })?,
1598                    ) as ArrayRef
1599                } else {
1600                    Arc::new(
1601                        arrow::array::Decimal128Array::from(vec![sum])
1602                            .with_precision_and_scale(precision, scale)
1603                            .map_err(|e| {
1604                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1605                            })?,
1606                    ) as ArrayRef
1607                };
1608                Ok((Field::new("sum_distinct", data_type, true), array))
1609            }
1610            AggregateAccumulator::TotalInt64 { value, .. } => {
1611                let mut builder = Float64Builder::with_capacity(1);
1612                builder.append_value(value);
1613                let array = Arc::new(builder.finish()) as ArrayRef;
1614                Ok((Field::new("total", DataType::Float64, false), array))
1615            }
1616            AggregateAccumulator::TotalDistinctInt64 { sum, .. } => {
1617                let mut builder = Float64Builder::with_capacity(1);
1618                builder.append_value(sum);
1619                let array = Arc::new(builder.finish()) as ArrayRef;
1620                Ok((
1621                    Field::new("total_distinct", DataType::Float64, false),
1622                    array,
1623                ))
1624            }
1625            AggregateAccumulator::TotalFloat64 { value, .. } => {
1626                let mut builder = Float64Builder::with_capacity(1);
1627                builder.append_value(value);
1628                let array = Arc::new(builder.finish()) as ArrayRef;
1629                Ok((Field::new("total", DataType::Float64, false), array))
1630            }
1631            AggregateAccumulator::TotalDistinctFloat64 { sum, .. } => {
1632                let mut builder = Float64Builder::with_capacity(1);
1633                builder.append_value(sum);
1634                let array = Arc::new(builder.finish()) as ArrayRef;
1635                Ok((
1636                    Field::new("total_distinct", DataType::Float64, false),
1637                    array,
1638                ))
1639            }
1640            AggregateAccumulator::TotalDecimal128 {
1641                sum,
1642                precision,
1643                scale,
1644                ..
1645            } => {
1646                let data_type = DataType::Decimal128(precision, scale);
1647                let array = Arc::new(
1648                    arrow::array::Decimal128Array::from(vec![sum])
1649                        .with_precision_and_scale(precision, scale)
1650                        .map_err(|e| {
1651                            Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1652                        })?,
1653                ) as ArrayRef;
1654                Ok((Field::new("total", data_type, false), array))
1655            }
1656            AggregateAccumulator::TotalDistinctDecimal128 {
1657                sum,
1658                precision,
1659                scale,
1660                ..
1661            } => {
1662                let data_type = DataType::Decimal128(precision, scale);
1663                let array = Arc::new(
1664                    arrow::array::Decimal128Array::from(vec![sum])
1665                        .with_precision_and_scale(precision, scale)
1666                        .map_err(|e| {
1667                            Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1668                        })?,
1669                ) as ArrayRef;
1670                Ok((Field::new("total_distinct", data_type, false), array))
1671            }
1672            AggregateAccumulator::AvgInt64 { sum, count, .. } => {
1673                let mut builder = Float64Builder::with_capacity(1);
1674                if count > 0 {
1675                    // Compute average as floating-point for SQL standard compatibility
1676                    let avg = (sum as f64) / (count as f64);
1677                    builder.append_value(avg);
1678                } else {
1679                    builder.append_null();
1680                }
1681                let array = Arc::new(builder.finish()) as ArrayRef;
1682                Ok((Field::new("avg", DataType::Float64, true), array))
1683            }
1684            AggregateAccumulator::AvgDistinctInt64 { sum, seen, .. } => {
1685                let mut builder = Float64Builder::with_capacity(1);
1686                let count = seen.len();
1687                if count > 0 {
1688                    // Compute average as floating-point for SQL standard compatibility
1689                    let avg = (sum as f64) / (count as f64);
1690                    builder.append_value(avg);
1691                } else {
1692                    builder.append_null();
1693                }
1694                let array = Arc::new(builder.finish()) as ArrayRef;
1695                Ok((Field::new("avg_distinct", DataType::Float64, true), array))
1696            }
1697            AggregateAccumulator::AvgFloat64 { sum, count, .. } => {
1698                let mut builder = Float64Builder::with_capacity(1);
1699                if count > 0 {
1700                    let avg = sum / (count as f64);
1701                    builder.append_value(avg);
1702                } else {
1703                    builder.append_null();
1704                }
1705                let array = Arc::new(builder.finish()) as ArrayRef;
1706                Ok((Field::new("avg", DataType::Float64, true), array))
1707            }
1708            AggregateAccumulator::AvgDistinctFloat64 { sum, seen, .. } => {
1709                let mut builder = Float64Builder::with_capacity(1);
1710                let count = seen.len();
1711                if count > 0 {
1712                    let avg = sum / (count as f64);
1713                    builder.append_value(avg);
1714                } else {
1715                    builder.append_null();
1716                }
1717                let array = Arc::new(builder.finish()) as ArrayRef;
1718                Ok((Field::new("avg_distinct", DataType::Float64, true), array))
1719            }
1720            AggregateAccumulator::AvgDecimal128 {
1721                sum,
1722                count,
1723                precision,
1724                scale,
1725                ..
1726            } => {
1727                let data_type = DataType::Decimal128(precision, scale);
1728                let array = if count > 0 {
1729                    // Compute average in decimal space: sum / count
1730                    // Use rounding division instead of truncating division
1731                    let count_i128 = count as i128;
1732                    let mut avg = sum / count_i128;
1733                    let rem = sum % count_i128;
1734
1735                    // Round half away from zero
1736                    if rem.abs() * 2 >= count_i128.abs() {
1737                        if sum.signum() == count_i128.signum() {
1738                            avg += 1;
1739                        } else {
1740                            avg -= 1;
1741                        }
1742                    }
1743
1744                    Arc::new(
1745                        arrow::array::Decimal128Array::from(vec![avg])
1746                            .with_precision_and_scale(precision, scale)
1747                            .map_err(|e| {
1748                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1749                            })?,
1750                    ) as ArrayRef
1751                } else {
1752                    Arc::new(
1753                        arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1754                            .with_precision_and_scale(precision, scale)
1755                            .map_err(|e| {
1756                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1757                            })?,
1758                    ) as ArrayRef
1759                };
1760                Ok((Field::new("avg", data_type, true), array))
1761            }
1762            AggregateAccumulator::AvgDistinctDecimal128 {
1763                sum,
1764                seen,
1765                precision,
1766                scale,
1767                ..
1768            } => {
1769                let data_type = DataType::Decimal128(precision, scale);
1770                let count = seen.len();
1771                let array = if count > 0 {
1772                    // Compute average in decimal space: sum / count
1773                    // Use rounding division instead of truncating division
1774                    let count_i128 = count as i128;
1775                    let mut avg = sum / count_i128;
1776                    let rem = sum % count_i128;
1777
1778                    // Round half away from zero
1779                    if rem.abs() * 2 >= count_i128.abs() {
1780                        if sum.signum() == count_i128.signum() {
1781                            avg += 1;
1782                        } else {
1783                            avg -= 1;
1784                        }
1785                    }
1786
1787                    Arc::new(
1788                        arrow::array::Decimal128Array::from(vec![avg])
1789                            .with_precision_and_scale(precision, scale)
1790                            .map_err(|e| {
1791                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1792                            })?,
1793                    ) as ArrayRef
1794                } else {
1795                    Arc::new(
1796                        arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1797                            .with_precision_and_scale(precision, scale)
1798                            .map_err(|e| {
1799                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1800                            })?,
1801                    ) as ArrayRef
1802                };
1803                Ok((Field::new("avg_distinct", data_type, true), array))
1804            }
1805            AggregateAccumulator::MinInt64 { value, .. } => {
1806                let mut builder = Int64Builder::with_capacity(1);
1807                if let Some(v) = value {
1808                    builder.append_value(v);
1809                } else {
1810                    builder.append_null();
1811                }
1812                let array = Arc::new(builder.finish()) as ArrayRef;
1813                Ok((Field::new("min", DataType::Int64, true), array))
1814            }
1815            AggregateAccumulator::MinFloat64 { value, .. } => {
1816                let mut builder = Float64Builder::with_capacity(1);
1817                if let Some(v) = value {
1818                    builder.append_value(v);
1819                } else {
1820                    builder.append_null();
1821                }
1822                let array = Arc::new(builder.finish()) as ArrayRef;
1823                Ok((Field::new("min", DataType::Float64, true), array))
1824            }
1825            AggregateAccumulator::MinDecimal128 {
1826                value,
1827                precision,
1828                scale,
1829                ..
1830            } => {
1831                let data_type = DataType::Decimal128(precision, scale);
1832                let array = match value {
1833                    Some(v) => Arc::new(
1834                        arrow::array::Decimal128Array::from(vec![v])
1835                            .with_precision_and_scale(precision, scale)
1836                            .map_err(|e| {
1837                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1838                            })?,
1839                    ) as ArrayRef,
1840                    None => Arc::new(
1841                        arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1842                            .with_precision_and_scale(precision, scale)
1843                            .map_err(|e| {
1844                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1845                            })?,
1846                    ) as ArrayRef,
1847                };
1848                Ok((Field::new("min", data_type, true), array))
1849            }
1850            AggregateAccumulator::MaxInt64 { value, .. } => {
1851                let mut builder = Int64Builder::with_capacity(1);
1852                if let Some(v) = value {
1853                    builder.append_value(v);
1854                } else {
1855                    builder.append_null();
1856                }
1857                let array = Arc::new(builder.finish()) as ArrayRef;
1858                Ok((Field::new("max", DataType::Int64, true), array))
1859            }
1860            AggregateAccumulator::MaxFloat64 { value, .. } => {
1861                let mut builder = Float64Builder::with_capacity(1);
1862                if let Some(v) = value {
1863                    builder.append_value(v);
1864                } else {
1865                    builder.append_null();
1866                }
1867                let array = Arc::new(builder.finish()) as ArrayRef;
1868                Ok((Field::new("max", DataType::Float64, true), array))
1869            }
1870            AggregateAccumulator::MaxDecimal128 {
1871                value,
1872                precision,
1873                scale,
1874                ..
1875            } => {
1876                let data_type = DataType::Decimal128(precision, scale);
1877                let array = match value {
1878                    Some(v) => Arc::new(
1879                        arrow::array::Decimal128Array::from(vec![v])
1880                            .with_precision_and_scale(precision, scale)
1881                            .map_err(|e| {
1882                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1883                            })?,
1884                    ) as ArrayRef,
1885                    None => Arc::new(
1886                        arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1887                            .with_precision_and_scale(precision, scale)
1888                            .map_err(|e| {
1889                                Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1890                            })?,
1891                    ) as ArrayRef,
1892                };
1893                Ok((Field::new("max", data_type, true), array))
1894            }
1895            AggregateAccumulator::CountNulls {
1896                non_null_rows,
1897                total_rows_seen,
1898                ..
1899            } => {
1900                let nulls = total_rows_seen.checked_sub(non_null_rows).ok_or_else(|| {
1901                    Error::InvalidArgumentError(
1902                        "NULL-count aggregate observed more non-null rows than total rows".into(),
1903                    )
1904                })?;
1905                let mut builder = Int64Builder::with_capacity(1);
1906                builder.append_value(nulls);
1907                let array = Arc::new(builder.finish()) as ArrayRef;
1908                Ok((Field::new("count_nulls", DataType::Int64, false), array))
1909            }
1910            AggregateAccumulator::GroupConcat {
1911                values, separator, ..
1912            } => {
1913                use arrow::array::StringBuilder;
1914                let mut builder = StringBuilder::with_capacity(1, 256);
1915                if values.is_empty() {
1916                    builder.append_null();
1917                } else {
1918                    let result = values.join(&separator);
1919                    builder.append_value(&result);
1920                }
1921                let array = Arc::new(builder.finish()) as ArrayRef;
1922                Ok((Field::new("group_concat", DataType::Utf8, true), array))
1923            }
1924            AggregateAccumulator::GroupConcatDistinct {
1925                values, separator, ..
1926            } => {
1927                use arrow::array::StringBuilder;
1928                let mut builder = StringBuilder::with_capacity(1, 256);
1929                if values.is_empty() {
1930                    builder.append_null();
1931                } else {
1932                    let result = values.join(&separator);
1933                    builder.append_value(&result);
1934                }
1935                let array = Arc::new(builder.finish()) as ArrayRef;
1936                Ok((Field::new("group_concat", DataType::Utf8, true), array))
1937            }
1938        }
1939    }
1940}
1941
1942impl AggregateState {
1943    /// Creates a new aggregate state with the given components.
1944    ///
1945    /// # Arguments
1946    ///
1947    /// - `alias`: Output column name for the aggregate result
1948    /// - `accumulator`: The aggregator instance performing the computation
1949    /// - `override_value`: Optional fixed value to replace the computed result
1950    pub fn new(
1951        alias: String,
1952        accumulator: AggregateAccumulator,
1953        override_value: Option<i64>,
1954    ) -> Self {
1955        Self {
1956            alias,
1957            accumulator,
1958            override_value,
1959        }
1960    }
1961
1962    /// Updates the state with values from a new batch.
1963    ///
1964    /// # Arguments
1965    ///
1966    /// - `batch`: RecordBatch containing the column data to aggregate
1967    ///
1968    /// # Errors
1969    ///
1970    /// Returns an error if the underlying accumulator update fails.
1971    pub fn update(&mut self, batch: &RecordBatch) -> AggregateResult<()> {
1972        self.accumulator.update(batch)
1973    }
1974
1975    /// Finalizes the state and produces the resulting field and array.
1976    ///
1977    /// Applies the alias and override value if present.
1978    ///
1979    /// # Returns
1980    ///
1981    /// A tuple containing the output field schema and the computed aggregate value array.
1982    ///
1983    /// # Errors
1984    ///
1985    /// Returns an error if the underlying accumulator finalization fails.
1986    pub fn finalize(self) -> AggregateResult<(Field, ArrayRef)> {
1987        let (mut field, array) = self.accumulator.finalize()?;
1988        field = field.with_name(self.alias);
1989        if let Some(value) = self.override_value {
1990            let mut builder = Int64Builder::with_capacity(1);
1991            builder.append_value(value);
1992            let array = Arc::new(builder.finish()) as ArrayRef;
1993            return Ok((field, array));
1994        }
1995        Ok((field, array))
1996    }
1997}