1use arrow::array::{
9 Array, ArrayRef, BooleanArray, Date32Array, Float64Array, Float64Builder, Int64Array,
10 Int64Builder, RecordBatch, StringArray,
11};
12use arrow::datatypes::{DataType, Field};
13use llkv_result::Error;
14use llkv_types::FieldId;
15use rustc_hash::FxHashSet;
16use std::sync::Arc;
17use std::{cmp::Ordering, convert::TryFrom};
18
19pub use llkv_plan::{AggregateExpr, AggregateFunction};
20
21pub type AggregateResult<T> = Result<T, Error>;
23
24#[derive(Clone)]
26pub struct AggregateSpec {
27 pub alias: String,
28 pub kind: AggregateKind,
29}
30
31#[derive(Clone)]
33pub enum AggregateKind {
34 Count {
35 field_id: Option<FieldId>,
36 distinct: bool,
37 },
38 Sum {
39 field_id: FieldId,
40 data_type: DataType,
41 distinct: bool,
42 },
43 Total {
44 field_id: FieldId,
45 data_type: DataType,
46 distinct: bool,
47 },
48 Avg {
49 field_id: FieldId,
50 data_type: DataType,
51 distinct: bool,
52 },
53 Min {
54 field_id: FieldId,
55 data_type: DataType,
56 },
57 Max {
58 field_id: FieldId,
59 data_type: DataType,
60 },
61 CountNulls {
62 field_id: FieldId,
63 },
64 GroupConcat {
65 field_id: FieldId,
66 distinct: bool,
67 separator: String,
68 },
69}
70
71impl AggregateKind {
72 pub fn field_id(&self) -> Option<FieldId> {
74 match self {
75 AggregateKind::Count { field_id, .. } => *field_id,
76 AggregateKind::Sum { field_id, .. }
77 | AggregateKind::Total { field_id, .. }
78 | AggregateKind::Avg { field_id, .. }
79 | AggregateKind::Min { field_id, .. }
80 | AggregateKind::Max { field_id, .. }
81 | AggregateKind::CountNulls { field_id }
82 | AggregateKind::GroupConcat { field_id, .. } => Some(*field_id),
83 }
84 }
85}
86
87pub struct AggregateState {
89 pub alias: String,
90 pub accumulator: AggregateAccumulator,
91 pub override_value: Option<i64>,
92}
93
94pub enum AggregateAccumulator {
96 CountStar {
97 value: i64,
98 },
99 CountColumn {
100 column_index: usize,
101 value: i64,
102 },
103 CountDistinctColumn {
104 column_index: usize,
105 seen: FxHashSet<DistinctKey>,
106 },
107 SumInt64 {
108 column_index: usize,
109 value: Option<i64>, has_values: bool, },
112 SumDistinctInt64 {
113 column_index: usize,
114 sum: Option<i64>, seen: FxHashSet<DistinctKey>,
116 },
117 SumFloat64 {
118 column_index: usize,
119 value: f64,
120 saw_value: bool,
121 },
122 SumDistinctFloat64 {
123 column_index: usize,
124 sum: f64,
125 seen: FxHashSet<DistinctKey>,
126 },
127 TotalInt64 {
128 column_index: usize,
129 value: f64, },
131 TotalDistinctInt64 {
132 column_index: usize,
133 sum: f64, seen: FxHashSet<DistinctKey>,
135 },
136 TotalFloat64 {
137 column_index: usize,
138 value: f64,
139 },
140 TotalDistinctFloat64 {
141 column_index: usize,
142 sum: f64,
143 seen: FxHashSet<DistinctKey>,
144 },
145 AvgInt64 {
146 column_index: usize,
147 sum: i64,
148 count: i64,
149 },
150 AvgDistinctInt64 {
151 column_index: usize,
152 sum: i64,
153 seen: FxHashSet<DistinctKey>,
154 },
155 AvgFloat64 {
156 column_index: usize,
157 sum: f64,
158 count: i64,
159 },
160 AvgDistinctFloat64 {
161 column_index: usize,
162 sum: f64,
163 seen: FxHashSet<DistinctKey>,
164 },
165 SumDecimal128 {
166 column_index: usize,
167 sum: i128,
168 precision: u8,
169 scale: i8,
170 },
171 SumDistinctDecimal128 {
172 column_index: usize,
173 sum: i128,
174 seen: FxHashSet<DistinctKey>,
175 precision: u8,
176 scale: i8,
177 },
178 TotalDecimal128 {
179 column_index: usize,
180 sum: i128,
181 precision: u8,
182 scale: i8,
183 },
184 TotalDistinctDecimal128 {
185 column_index: usize,
186 sum: i128,
187 seen: FxHashSet<DistinctKey>,
188 precision: u8,
189 scale: i8,
190 },
191 AvgDecimal128 {
192 column_index: usize,
193 sum: i128,
194 count: i64,
195 precision: u8,
196 scale: i8,
197 },
198 AvgDistinctDecimal128 {
199 column_index: usize,
200 sum: i128,
201 seen: FxHashSet<DistinctKey>,
202 precision: u8,
203 scale: i8,
204 },
205 MinInt64 {
206 column_index: usize,
207 value: Option<i64>,
208 },
209 MinFloat64 {
210 column_index: usize,
211 value: Option<f64>,
212 },
213 MinDecimal128 {
214 column_index: usize,
215 value: Option<i128>,
216 precision: u8,
217 scale: i8,
218 },
219 MaxInt64 {
220 column_index: usize,
221 value: Option<i64>,
222 },
223 MaxFloat64 {
224 column_index: usize,
225 value: Option<f64>,
226 },
227 MaxDecimal128 {
228 column_index: usize,
229 value: Option<i128>,
230 precision: u8,
231 scale: i8,
232 },
233 CountNulls {
234 column_index: usize,
235 non_null_rows: i64,
236 total_rows_seen: i64,
237 },
238 GroupConcat {
239 column_index: usize,
240 values: Vec<String>,
241 separator: String,
242 },
243 GroupConcatDistinct {
244 column_index: usize,
245 seen: FxHashSet<String>,
246 values: Vec<String>,
247 separator: String,
248 },
249}
250
/// Hashable identity of a single non-null cell, used to de-duplicate values
/// for `DISTINCT` aggregates.
///
/// `Debug` is derived so keys can be inspected in diagnostics and compared in
/// tests.
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub enum DistinctKey {
    /// An `INT64` value.
    Int(i64),
    /// A `FLOAT64` value, stored as an `f64::to_bits` bit pattern so it can be
    /// hashed and compared with `Eq`.
    Float(u64),
    /// A `UTF8` value.
    Str(String),
    /// A `BOOLEAN` value.
    Bool(bool),
    /// The raw `Date32Array` value.
    Date(i32),
    /// The unscaled `Decimal128Array` value.
    Decimal(i128),
}
260
261impl DistinctKey {
262 fn from_array(array: &ArrayRef, index: usize) -> AggregateResult<Self> {
263 match array.data_type() {
264 DataType::Int64 => {
265 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
266 Error::InvalidArgumentError(
267 "COUNT(DISTINCT) expected an INT64 column in execution".into(),
268 )
269 })?;
270 Ok(DistinctKey::Int(values.value(index)))
271 }
272 DataType::Float64 => {
273 let values = array
274 .as_any()
275 .downcast_ref::<Float64Array>()
276 .ok_or_else(|| {
277 Error::InvalidArgumentError(
278 "COUNT(DISTINCT) expected a FLOAT64 column in execution".into(),
279 )
280 })?;
281 Ok(DistinctKey::Float(values.value(index).to_bits()))
282 }
283 DataType::Utf8 => {
284 let values = array
285 .as_any()
286 .downcast_ref::<StringArray>()
287 .ok_or_else(|| {
288 Error::InvalidArgumentError(
289 "COUNT(DISTINCT) expected a UTF8 column in execution".into(),
290 )
291 })?;
292 Ok(DistinctKey::Str(values.value(index).to_owned()))
293 }
294 DataType::Boolean => {
295 let values = array
296 .as_any()
297 .downcast_ref::<BooleanArray>()
298 .ok_or_else(|| {
299 Error::InvalidArgumentError(
300 "COUNT(DISTINCT) expected a BOOLEAN column in execution".into(),
301 )
302 })?;
303 Ok(DistinctKey::Bool(values.value(index)))
304 }
305 DataType::Date32 => {
306 let values = array
307 .as_any()
308 .downcast_ref::<Date32Array>()
309 .ok_or_else(|| {
310 Error::InvalidArgumentError(
311 "COUNT(DISTINCT) expected a DATE32 column in execution".into(),
312 )
313 })?;
314 Ok(DistinctKey::Date(values.value(index)))
315 }
316 DataType::Decimal128(_, _) => {
317 let values = array
318 .as_any()
319 .downcast_ref::<arrow::array::Decimal128Array>()
320 .ok_or_else(|| {
321 Error::InvalidArgumentError(
322 "COUNT(DISTINCT) expected a DECIMAL128 column in execution".into(),
323 )
324 })?;
325 Ok(DistinctKey::Decimal(values.value(index)))
326 }
327 DataType::Null => Ok(DistinctKey::Int(0)),
330 other => Err(Error::InvalidArgumentError(format!(
331 "COUNT(DISTINCT) is not supported for column type {other:?}"
332 ))),
333 }
334 }
335}
336
337fn array_value_to_string(array: &ArrayRef, index: usize) -> AggregateResult<String> {
348 match array.data_type() {
349 DataType::Utf8 => {
350 let arr = array
351 .as_any()
352 .downcast_ref::<StringArray>()
353 .ok_or_else(|| Error::InvalidArgumentError("Expected String array".into()))?;
354 Ok(arr.value(index).to_string())
355 }
356 DataType::Int64 => {
357 let arr = array
358 .as_any()
359 .downcast_ref::<Int64Array>()
360 .ok_or_else(|| Error::InvalidArgumentError("Expected Int64 array".into()))?;
361 Ok(arr.value(index).to_string())
362 }
363 DataType::Float64 => {
364 let arr = array
365 .as_any()
366 .downcast_ref::<Float64Array>()
367 .ok_or_else(|| Error::InvalidArgumentError("Expected Float64 array".into()))?;
368 Ok(arr.value(index).to_string())
369 }
370 DataType::Boolean => {
371 let arr = array
372 .as_any()
373 .downcast_ref::<BooleanArray>()
374 .ok_or_else(|| Error::InvalidArgumentError("Expected Boolean array".into()))?;
375 Ok(if arr.value(index) {
376 "1".to_string()
377 } else {
378 "0".to_string()
379 })
380 }
381 other => Err(Error::InvalidArgumentError(format!(
382 "group_concat does not support column type {other:?}"
383 ))),
384 }
385}
386
387fn array_value_to_numeric(array: &ArrayRef, index: usize) -> AggregateResult<f64> {
401 match array.data_type() {
402 DataType::Int64 => {
403 let arr = array
404 .as_any()
405 .downcast_ref::<Int64Array>()
406 .ok_or_else(|| Error::InvalidArgumentError("Expected Int64 array".into()))?;
407 Ok(arr.value(index) as f64)
408 }
409 DataType::Float64 => {
410 let arr = array
411 .as_any()
412 .downcast_ref::<Float64Array>()
413 .ok_or_else(|| Error::InvalidArgumentError("Expected Float64 array".into()))?;
414 Ok(arr.value(index))
415 }
416 DataType::Decimal128(_, scale) => {
417 let arr = array
418 .as_any()
419 .downcast_ref::<arrow::array::Decimal128Array>()
420 .ok_or_else(|| Error::InvalidArgumentError("Expected Decimal128 array".into()))?;
421 let value_i128 = arr.value(index);
422 let scale_factor = 10_f64.powi(*scale as i32);
424 Ok(value_i128 as f64 / scale_factor)
425 }
426 DataType::Utf8 => {
427 let arr = array
428 .as_any()
429 .downcast_ref::<StringArray>()
430 .ok_or_else(|| Error::InvalidArgumentError("Expected String array".into()))?;
431 let s = arr.value(index);
432 Ok(s.trim().parse::<f64>().unwrap_or(0.0))
434 }
435 DataType::Boolean => {
436 let arr = array
437 .as_any()
438 .downcast_ref::<BooleanArray>()
439 .ok_or_else(|| Error::InvalidArgumentError("Expected Boolean array".into()))?;
440 Ok(if arr.value(index) { 1.0 } else { 0.0 })
441 }
442 DataType::Null => Ok(0.0),
445 other => Err(Error::InvalidArgumentError(format!(
446 "Numeric coercion not supported for column type {other:?}"
447 ))),
448 }
449}
450
451impl AggregateAccumulator {
452 pub fn new_with_projection_index(
464 spec: &AggregateSpec,
465 projection_idx: Option<usize>,
466 _total_rows_hint: Option<i64>,
467 ) -> AggregateResult<Self> {
468 match &spec.kind {
469 AggregateKind::Count { field_id, distinct } => {
470 if field_id.is_none() {
471 return Ok(AggregateAccumulator::CountStar { value: 0 });
472 }
473 let idx = projection_idx.ok_or_else(|| {
474 Error::Internal("Count aggregate requires projection index".into())
475 })?;
476 if *distinct {
477 Ok(AggregateAccumulator::CountDistinctColumn {
478 column_index: idx,
479 seen: FxHashSet::default(),
480 })
481 } else {
482 Ok(AggregateAccumulator::CountColumn {
483 column_index: idx,
484 value: 0,
485 })
486 }
487 }
488 AggregateKind::Sum {
489 data_type,
490 distinct,
491 ..
492 } => {
493 let idx = projection_idx.ok_or_else(|| {
494 Error::Internal("Sum aggregate requires projection index".into())
495 })?;
496 match (data_type, *distinct) {
497 (&DataType::Int64, true) => Ok(AggregateAccumulator::SumDistinctInt64 {
498 column_index: idx,
499 sum: Some(0),
500 seen: FxHashSet::default(),
501 }),
502 (&DataType::Int64, false) => Ok(AggregateAccumulator::SumInt64 {
503 column_index: idx,
504 value: Some(0),
505 has_values: false,
506 }),
507 (&DataType::Decimal128(precision, scale), true) => {
508 Ok(AggregateAccumulator::SumDistinctDecimal128 {
509 column_index: idx,
510 sum: 0,
511 seen: FxHashSet::default(),
512 precision,
513 scale,
514 })
515 }
516 (&DataType::Decimal128(precision, scale), false) => {
517 Ok(AggregateAccumulator::SumDecimal128 {
518 column_index: idx,
519 sum: 0,
520 precision,
521 scale,
522 })
523 }
524 (&DataType::Float64, true) | (&DataType::Utf8, true) => {
526 Ok(AggregateAccumulator::SumDistinctFloat64 {
527 column_index: idx,
528 sum: 0.0,
529 seen: FxHashSet::default(),
530 })
531 }
532 (&DataType::Float64, false) | (&DataType::Utf8, false) => {
533 Ok(AggregateAccumulator::SumFloat64 {
534 column_index: idx,
535 value: 0.0,
536 saw_value: false,
537 })
538 }
539 other => Err(Error::InvalidArgumentError(format!(
540 "SUM aggregate not supported for column type {:?}",
541 other.0
542 ))),
543 }
544 }
545 AggregateKind::Total {
546 data_type,
547 distinct,
548 ..
549 } => {
550 let idx = projection_idx.ok_or_else(|| {
551 Error::Internal("Total aggregate requires projection index".into())
552 })?;
553 match (data_type, *distinct) {
554 (&DataType::Int64, true) => Ok(AggregateAccumulator::TotalDistinctInt64 {
555 column_index: idx,
556 sum: 0.0,
557 seen: FxHashSet::default(),
558 }),
559 (&DataType::Int64, false) => Ok(AggregateAccumulator::TotalInt64 {
560 column_index: idx,
561 value: 0.0,
562 }),
563 (&DataType::Decimal128(precision, scale), true) => {
564 Ok(AggregateAccumulator::TotalDistinctDecimal128 {
565 column_index: idx,
566 sum: 0,
567 seen: FxHashSet::default(),
568 precision,
569 scale,
570 })
571 }
572 (&DataType::Decimal128(precision, scale), false) => {
573 Ok(AggregateAccumulator::TotalDecimal128 {
574 column_index: idx,
575 sum: 0,
576 precision,
577 scale,
578 })
579 }
580 (&DataType::Float64, true) | (&DataType::Utf8, true) => {
582 Ok(AggregateAccumulator::TotalDistinctFloat64 {
583 column_index: idx,
584 sum: 0.0,
585 seen: FxHashSet::default(),
586 })
587 }
588 (&DataType::Float64, false) | (&DataType::Utf8, false) => {
589 Ok(AggregateAccumulator::TotalFloat64 {
590 column_index: idx,
591 value: 0.0,
592 })
593 }
594 other => Err(Error::InvalidArgumentError(format!(
595 "TOTAL aggregate not supported for column type {:?}",
596 other.0
597 ))),
598 }
599 }
600 AggregateKind::Avg {
601 data_type,
602 distinct,
603 ..
604 } => {
605 let idx = projection_idx.ok_or_else(|| {
606 Error::Internal("Avg aggregate requires projection index".into())
607 })?;
608 match (data_type, *distinct) {
609 (&DataType::Int64, true) => Ok(AggregateAccumulator::AvgDistinctInt64 {
610 column_index: idx,
611 sum: 0,
612 seen: FxHashSet::default(),
613 }),
614 (&DataType::Int64, false) => Ok(AggregateAccumulator::AvgInt64 {
615 column_index: idx,
616 sum: 0,
617 count: 0,
618 }),
619 (&DataType::Decimal128(precision, scale), true) => {
620 Ok(AggregateAccumulator::AvgDistinctDecimal128 {
621 column_index: idx,
622 sum: 0,
623 seen: FxHashSet::default(),
624 precision,
625 scale,
626 })
627 }
628 (&DataType::Decimal128(precision, scale), false) => {
629 Ok(AggregateAccumulator::AvgDecimal128 {
630 column_index: idx,
631 sum: 0,
632 count: 0,
633 precision,
634 scale,
635 })
636 }
637 (&DataType::Float64, true) | (&DataType::Utf8, true) => {
639 Ok(AggregateAccumulator::AvgDistinctFloat64 {
640 column_index: idx,
641 sum: 0.0,
642 seen: FxHashSet::default(),
643 })
644 }
645 (&DataType::Float64, false) | (&DataType::Utf8, false) => {
646 Ok(AggregateAccumulator::AvgFloat64 {
647 column_index: idx,
648 sum: 0.0,
649 count: 0,
650 })
651 }
652 other => Err(Error::InvalidArgumentError(format!(
653 "AVG aggregate not supported for column type {:?}",
654 other.0
655 ))),
656 }
657 }
658 AggregateKind::Min { data_type, .. } => {
659 let idx = projection_idx.ok_or_else(|| {
660 Error::Internal("Min aggregate requires projection index".into())
661 })?;
662 match data_type {
663 &DataType::Int64 => Ok(AggregateAccumulator::MinInt64 {
664 column_index: idx,
665 value: None,
666 }),
667 &DataType::Decimal128(precision, scale) => {
668 Ok(AggregateAccumulator::MinDecimal128 {
669 column_index: idx,
670 value: None,
671 precision,
672 scale,
673 })
674 }
675 &DataType::Float64 | &DataType::Utf8 => Ok(AggregateAccumulator::MinFloat64 {
677 column_index: idx,
678 value: None,
679 }),
680 other => Err(Error::InvalidArgumentError(format!(
681 "MIN aggregate not supported for column type {:?}",
682 other
683 ))),
684 }
685 }
686 AggregateKind::Max { data_type, .. } => {
687 let idx = projection_idx.ok_or_else(|| {
688 Error::Internal("Max aggregate requires projection index".into())
689 })?;
690 match data_type {
691 &DataType::Int64 => Ok(AggregateAccumulator::MaxInt64 {
692 column_index: idx,
693 value: None,
694 }),
695 &DataType::Decimal128(precision, scale) => {
696 Ok(AggregateAccumulator::MaxDecimal128 {
697 column_index: idx,
698 value: None,
699 precision,
700 scale,
701 })
702 }
703 &DataType::Float64 | &DataType::Utf8 => Ok(AggregateAccumulator::MaxFloat64 {
705 column_index: idx,
706 value: None,
707 }),
708 other => Err(Error::InvalidArgumentError(format!(
709 "MAX aggregate not supported for column type {:?}",
710 other
711 ))),
712 }
713 }
714 AggregateKind::CountNulls { .. } => {
715 let idx = projection_idx.ok_or_else(|| {
716 Error::Internal("CountNulls aggregate requires projection index".into())
717 })?;
718 Ok(AggregateAccumulator::CountNulls {
719 column_index: idx,
720 non_null_rows: 0,
721 total_rows_seen: 0,
722 })
723 }
724 AggregateKind::GroupConcat {
725 distinct,
726 separator,
727 ..
728 } => {
729 let idx = projection_idx.ok_or_else(|| {
730 Error::Internal("GroupConcat aggregate requires projection index".into())
731 })?;
732 if *distinct {
733 Ok(AggregateAccumulator::GroupConcatDistinct {
734 column_index: idx,
735 seen: FxHashSet::default(),
736 values: Vec::new(),
737 separator: separator.clone(),
738 })
739 } else {
740 Ok(AggregateAccumulator::GroupConcat {
741 column_index: idx,
742 values: Vec::new(),
743 separator: separator.clone(),
744 })
745 }
746 }
747 }
748 }
749
750 pub fn update(&mut self, batch: &RecordBatch) -> AggregateResult<()> {
760 match self {
761 AggregateAccumulator::CountStar { value } => {
762 let rows = i64::try_from(batch.num_rows()).map_err(|_| {
763 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
764 })?;
765 *value = value.checked_add(rows).ok_or_else(|| {
766 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
767 })?;
768 }
769 AggregateAccumulator::CountColumn {
770 column_index,
771 value,
772 } => {
773 let array = batch.column(*column_index);
774 if matches!(array.data_type(), DataType::Null) {
777 return Ok(());
778 }
779 let non_null = (0..array.len()).filter(|idx| array.is_valid(*idx)).count();
780 let non_null = i64::try_from(non_null).map_err(|_| {
781 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
782 })?;
783 *value = value.checked_add(non_null).ok_or_else(|| {
784 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
785 })?;
786 }
787 AggregateAccumulator::CountDistinctColumn { column_index, seen } => {
788 let array = batch.column(*column_index);
789 if matches!(array.data_type(), DataType::Null) {
791 return Ok(());
792 }
793 for i in 0..array.len() {
794 if !array.is_valid(i) {
795 continue;
796 }
797 let value = DistinctKey::from_array(array, i)?;
798 seen.insert(value);
799 }
800 }
801 AggregateAccumulator::SumInt64 {
802 column_index,
803 value,
804 has_values,
805 } => {
806 let array = batch.column(*column_index);
807 if matches!(array.data_type(), DataType::Null) {
809 return Ok(());
810 }
811 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
812 Error::InvalidArgumentError(
813 "SUM aggregate expected an INT column in execution".into(),
814 )
815 })?;
816 for i in 0..array.len() {
817 if array.is_valid(i) {
818 *has_values = true;
819 let v = array.value(i);
820 *value = match *value {
821 Some(current) => Some(current.checked_add(v).ok_or_else(|| {
822 Error::InvalidArgumentError("integer overflow".into())
823 })?),
824 None => {
825 return Err(Error::InvalidArgumentError("integer overflow".into()));
826 }
827 };
828 }
829 }
830 }
831 AggregateAccumulator::SumDistinctInt64 {
832 column_index,
833 sum,
834 seen,
835 } => {
836 let array = batch.column(*column_index);
837 if matches!(array.data_type(), DataType::Null) {
839 return Ok(());
840 }
841 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
842 Error::InvalidArgumentError(
843 "SUM(DISTINCT) aggregate expected an INT column in execution".into(),
844 )
845 })?;
846 for i in 0..array.len() {
847 if array.is_valid(i) {
848 let col_array = batch.column(*column_index);
849 let key = DistinctKey::from_array(col_array, i)?;
850 if seen.insert(key.clone()) {
851 if let DistinctKey::Int(v) = key {
853 *sum = match *sum {
854 Some(current) => {
855 Some(current.checked_add(v).ok_or_else(|| {
856 Error::InvalidArgumentError("integer overflow".into())
857 })?)
858 }
859 None => {
860 return Err(Error::InvalidArgumentError(
861 "integer overflow".into(),
862 ));
863 }
864 };
865 }
866 }
867 }
868 }
869 }
870 AggregateAccumulator::SumFloat64 {
871 column_index,
872 value,
873 saw_value,
874 } => {
875 let column = batch.column(*column_index);
876 if matches!(column.data_type(), DataType::Null) {
878 return Ok(());
879 }
880 for i in 0..column.len() {
882 if column.is_valid(i) {
883 let v = array_value_to_numeric(column, i)?;
884 *value += v;
885 *saw_value = true;
886 }
887 }
888 }
889 AggregateAccumulator::SumDistinctFloat64 {
890 column_index,
891 sum,
892 seen,
893 } => {
894 let column = batch.column(*column_index);
895 if matches!(column.data_type(), DataType::Null) {
897 return Ok(());
898 }
899 for i in 0..column.len() {
900 if !column.is_valid(i) {
901 continue;
902 }
903 let key = DistinctKey::from_array(column, i)?;
905 if seen.insert(key.clone()) {
906 let v = match key {
908 DistinctKey::Float(bits) => f64::from_bits(bits),
909 DistinctKey::Int(int_val) => int_val as f64,
910 DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
911 DistinctKey::Bool(b) => {
912 if b {
913 1.0
914 } else {
915 0.0
916 }
917 }
918 DistinctKey::Date(d) => d as f64,
919 DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
920 };
921 *sum += v;
922 }
923 }
924 }
925 AggregateAccumulator::SumDecimal128 {
926 column_index, sum, ..
927 } => {
928 let column = batch.column(*column_index);
929 let arr = column
930 .as_any()
931 .downcast_ref::<arrow::array::Decimal128Array>()
932 .ok_or_else(|| {
933 Error::InvalidArgumentError("Expected Decimal128 array".into())
934 })?;
935 for i in 0..arr.len() {
936 if arr.is_valid(i) {
937 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
938 Error::InvalidArgumentError("Decimal128 sum overflow".into())
939 })?;
940 }
941 }
942 }
943 AggregateAccumulator::SumDistinctDecimal128 {
944 column_index,
945 sum,
946 seen,
947 ..
948 } => {
949 let column = batch.column(*column_index);
950 let arr = column
951 .as_any()
952 .downcast_ref::<arrow::array::Decimal128Array>()
953 .ok_or_else(|| {
954 Error::InvalidArgumentError("Expected Decimal128 array".into())
955 })?;
956 for i in 0..arr.len() {
957 if !arr.is_valid(i) {
958 continue;
959 }
960 let key = DistinctKey::from_array(column, i)?;
961 if seen.insert(key) {
962 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
963 Error::InvalidArgumentError("Decimal128 sum overflow".into())
964 })?;
965 }
966 }
967 }
968 AggregateAccumulator::TotalInt64 {
969 column_index,
970 value,
971 } => {
972 let array = batch.column(*column_index);
973 if matches!(array.data_type(), DataType::Null) {
975 return Ok(());
976 }
977 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
978 Error::InvalidArgumentError(
979 "TOTAL aggregate expected an INT column in execution".into(),
980 )
981 })?;
982 for i in 0..array.len() {
983 if array.is_valid(i) {
984 let v = array.value(i);
985 *value += v as f64;
987 }
988 }
989 }
990 AggregateAccumulator::TotalDistinctInt64 {
991 column_index,
992 sum,
993 seen,
994 } => {
995 let array = batch.column(*column_index);
996 if matches!(array.data_type(), DataType::Null) {
998 return Ok(());
999 }
1000 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1001 Error::InvalidArgumentError(
1002 "TOTAL(DISTINCT) aggregate expected an INT column in execution".into(),
1003 )
1004 })?;
1005 for i in 0..array.len() {
1006 if array.is_valid(i) {
1007 let col_array = batch.column(*column_index);
1008 let key = DistinctKey::from_array(col_array, i)?;
1009 if seen.insert(key.clone())
1010 && let DistinctKey::Int(v) = key
1011 {
1012 *sum += v as f64;
1014 }
1015 }
1016 }
1017 }
1018 AggregateAccumulator::TotalFloat64 {
1019 column_index,
1020 value,
1021 } => {
1022 let column = batch.column(*column_index);
1023 if matches!(column.data_type(), DataType::Null) {
1025 return Ok(());
1026 }
1027 for i in 0..column.len() {
1029 if column.is_valid(i) {
1030 let v = array_value_to_numeric(column, i)?;
1031 *value += v;
1032 }
1033 }
1034 }
1035 AggregateAccumulator::TotalDistinctFloat64 {
1036 column_index,
1037 sum,
1038 seen,
1039 } => {
1040 let column = batch.column(*column_index);
1041 if matches!(column.data_type(), DataType::Null) {
1043 return Ok(());
1044 }
1045 for i in 0..column.len() {
1046 if !column.is_valid(i) {
1047 continue;
1048 }
1049 let key = DistinctKey::from_array(column, i)?;
1051 if seen.insert(key.clone()) {
1052 let v = match key {
1054 DistinctKey::Float(bits) => f64::from_bits(bits),
1055 DistinctKey::Int(int_val) => int_val as f64,
1056 DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
1057 DistinctKey::Bool(b) => {
1058 if b {
1059 1.0
1060 } else {
1061 0.0
1062 }
1063 }
1064 DistinctKey::Date(d) => d as f64,
1065 DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
1066 };
1067 *sum += v;
1068 }
1069 }
1070 }
1071 AggregateAccumulator::TotalDecimal128 {
1072 column_index, sum, ..
1073 } => {
1074 let column = batch.column(*column_index);
1075 let arr = column
1076 .as_any()
1077 .downcast_ref::<arrow::array::Decimal128Array>()
1078 .ok_or_else(|| {
1079 Error::InvalidArgumentError("Expected Decimal128 array".into())
1080 })?;
1081 for i in 0..arr.len() {
1082 if arr.is_valid(i) {
1083 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1084 Error::InvalidArgumentError("Decimal128 total overflow".into())
1085 })?;
1086 }
1087 }
1088 }
1089 AggregateAccumulator::TotalDistinctDecimal128 {
1090 column_index,
1091 sum,
1092 seen,
1093 ..
1094 } => {
1095 let column = batch.column(*column_index);
1096 let arr = column
1097 .as_any()
1098 .downcast_ref::<arrow::array::Decimal128Array>()
1099 .ok_or_else(|| {
1100 Error::InvalidArgumentError("Expected Decimal128 array".into())
1101 })?;
1102 for i in 0..arr.len() {
1103 if !arr.is_valid(i) {
1104 continue;
1105 }
1106 let key = DistinctKey::from_array(column, i)?;
1107 if seen.insert(key) {
1108 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1109 Error::InvalidArgumentError("Decimal128 total overflow".into())
1110 })?;
1111 }
1112 }
1113 }
1114 AggregateAccumulator::AvgInt64 {
1115 column_index,
1116 sum,
1117 count,
1118 } => {
1119 let array = batch.column(*column_index);
1120 if matches!(array.data_type(), DataType::Null) {
1122 return Ok(());
1123 }
1124 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1125 Error::InvalidArgumentError(
1126 "AVG aggregate expected an INT column in execution".into(),
1127 )
1128 })?;
1129 for i in 0..array.len() {
1130 if array.is_valid(i) {
1131 let v = array.value(i);
1132 *sum = sum.checked_add(v).ok_or_else(|| {
1133 Error::InvalidArgumentError(
1134 "AVG aggregate sum exceeds i64 range".into(),
1135 )
1136 })?;
1137 *count = count.checked_add(1).ok_or_else(|| {
1138 Error::InvalidArgumentError(
1139 "AVG aggregate count exceeds i64 range".into(),
1140 )
1141 })?;
1142 }
1143 }
1144 }
1145 AggregateAccumulator::AvgDistinctInt64 {
1146 column_index,
1147 sum,
1148 seen,
1149 } => {
1150 let array = batch.column(*column_index);
1151 if matches!(array.data_type(), DataType::Null) {
1153 return Ok(());
1154 }
1155 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1156 Error::InvalidArgumentError(
1157 "AVG(DISTINCT) aggregate expected an INT column in execution".into(),
1158 )
1159 })?;
1160 for i in 0..array.len() {
1161 if array.is_valid(i) {
1162 let col_array = batch.column(*column_index);
1163 let key = DistinctKey::from_array(col_array, i)?;
1164 if seen.insert(key.clone()) {
1165 if let DistinctKey::Int(v) = key {
1167 *sum = sum.checked_add(v).ok_or_else(|| {
1168 Error::InvalidArgumentError(
1169 "AVG(DISTINCT) aggregate sum exceeds i64 range".into(),
1170 )
1171 })?;
1172 }
1173 }
1174 }
1175 }
1176 }
1177 AggregateAccumulator::AvgFloat64 {
1178 column_index,
1179 sum,
1180 count,
1181 } => {
1182 let column = batch.column(*column_index);
1183 if matches!(column.data_type(), DataType::Null) {
1185 return Ok(());
1186 }
1187 for i in 0..column.len() {
1189 if column.is_valid(i) {
1190 let v = array_value_to_numeric(column, i)?;
1191 *sum += v;
1192 *count = count.checked_add(1).ok_or_else(|| {
1193 Error::InvalidArgumentError(
1194 "AVG aggregate count exceeds i64 range".into(),
1195 )
1196 })?;
1197 }
1198 }
1199 }
1200 AggregateAccumulator::AvgDistinctFloat64 {
1201 column_index,
1202 sum,
1203 seen,
1204 } => {
1205 let column = batch.column(*column_index);
1206 if matches!(column.data_type(), DataType::Null) {
1208 return Ok(());
1209 }
1210 for i in 0..column.len() {
1211 if !column.is_valid(i) {
1212 continue;
1213 }
1214 let key = DistinctKey::from_array(column, i)?;
1216 if seen.insert(key.clone()) {
1217 let v = match key {
1219 DistinctKey::Float(bits) => f64::from_bits(bits),
1220 DistinctKey::Int(int_val) => int_val as f64,
1221 DistinctKey::Str(_) => array_value_to_numeric(column, i)?,
1222 DistinctKey::Bool(b) => {
1223 if b {
1224 1.0
1225 } else {
1226 0.0
1227 }
1228 }
1229 DistinctKey::Date(d) => d as f64,
1230 DistinctKey::Decimal(_) => array_value_to_numeric(column, i)?,
1231 };
1232 *sum += v;
1233 }
1234 }
1235 }
1236 AggregateAccumulator::AvgDecimal128 {
1237 column_index,
1238 sum,
1239 count,
1240 ..
1241 } => {
1242 let column = batch.column(*column_index);
1243 let arr = column
1244 .as_any()
1245 .downcast_ref::<arrow::array::Decimal128Array>()
1246 .ok_or_else(|| {
1247 Error::InvalidArgumentError("Expected Decimal128 array".into())
1248 })?;
1249 for i in 0..arr.len() {
1250 if arr.is_valid(i) {
1251 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1252 Error::InvalidArgumentError("Decimal128 sum overflow".into())
1253 })?;
1254 *count = count.checked_add(1).ok_or_else(|| {
1255 Error::InvalidArgumentError("AVG count overflow".into())
1256 })?;
1257 }
1258 }
1259 }
1260 AggregateAccumulator::AvgDistinctDecimal128 {
1261 column_index,
1262 sum,
1263 seen,
1264 ..
1265 } => {
1266 let column = batch.column(*column_index);
1267 let arr = column
1268 .as_any()
1269 .downcast_ref::<arrow::array::Decimal128Array>()
1270 .ok_or_else(|| {
1271 Error::InvalidArgumentError("Expected Decimal128 array".into())
1272 })?;
1273 for i in 0..arr.len() {
1274 if !arr.is_valid(i) {
1275 continue;
1276 }
1277 let key = DistinctKey::from_array(column, i)?;
1278 if seen.insert(key) {
1279 *sum = sum.checked_add(arr.value(i)).ok_or_else(|| {
1280 Error::InvalidArgumentError("Decimal128 sum overflow".into())
1281 })?;
1282 }
1283 }
1284 }
1285 AggregateAccumulator::MinInt64 {
1286 column_index,
1287 value,
1288 } => {
1289 let array = batch.column(*column_index);
1290 if matches!(array.data_type(), DataType::Null) {
1292 return Ok(());
1293 }
1294 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1295 Error::InvalidArgumentError(
1296 "MIN aggregate expected an INT column in execution".into(),
1297 )
1298 })?;
1299 for i in 0..array.len() {
1300 if array.is_valid(i) {
1301 let v = array.value(i);
1302 *value = Some(match *value {
1303 Some(current) => current.min(v),
1304 None => v,
1305 });
1306 }
1307 }
1308 }
1309 AggregateAccumulator::MinFloat64 {
1310 column_index,
1311 value,
1312 } => {
1313 let column = batch.column(*column_index);
1314 if matches!(column.data_type(), DataType::Null) {
1316 return Ok(());
1317 }
1318 for i in 0..column.len() {
1320 if column.is_valid(i) {
1321 let v = array_value_to_numeric(column, i)?;
1322 *value = Some(match *value {
1323 Some(current) => match v.partial_cmp(¤t) {
1324 Some(Ordering::Less) => v,
1325 _ => current,
1326 },
1327 None => v,
1328 });
1329 }
1330 }
1331 }
1332 AggregateAccumulator::MinDecimal128 {
1333 column_index,
1334 value,
1335 ..
1336 } => {
1337 let column = batch.column(*column_index);
1338 let arr = column
1339 .as_any()
1340 .downcast_ref::<arrow::array::Decimal128Array>()
1341 .ok_or_else(|| {
1342 Error::InvalidArgumentError("Expected Decimal128 array".into())
1343 })?;
1344 for i in 0..arr.len() {
1345 if arr.is_valid(i) {
1346 let v = arr.value(i);
1347 *value = Some(match *value {
1348 Some(current) => current.min(v),
1349 None => v,
1350 });
1351 }
1352 }
1353 }
1354 AggregateAccumulator::MaxInt64 {
1355 column_index,
1356 value,
1357 } => {
1358 let array = batch.column(*column_index);
1359 if matches!(array.data_type(), DataType::Null) {
1360 return Ok(());
1361 }
1362 let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
1363 Error::InvalidArgumentError(
1364 "MAX aggregate expected an INT column in execution".into(),
1365 )
1366 })?;
1367 for i in 0..array.len() {
1368 if array.is_valid(i) {
1369 let v = array.value(i);
1370 *value = Some(match *value {
1371 Some(current) => current.max(v),
1372 None => v,
1373 });
1374 }
1375 }
1376 }
1377 AggregateAccumulator::MaxFloat64 {
1378 column_index,
1379 value,
1380 } => {
1381 let column = batch.column(*column_index);
1382 if matches!(column.data_type(), DataType::Null) {
1384 return Ok(());
1385 }
1386 for i in 0..column.len() {
1388 if column.is_valid(i) {
1389 let v = array_value_to_numeric(column, i)?;
1390 *value = Some(match *value {
1391 Some(current) => match v.partial_cmp(¤t) {
1392 Some(Ordering::Greater) => v,
1393 _ => current,
1394 },
1395 None => v,
1396 });
1397 }
1398 }
1399 }
1400 AggregateAccumulator::MaxDecimal128 {
1401 column_index,
1402 value,
1403 ..
1404 } => {
1405 let column = batch.column(*column_index);
1406 let arr = column
1407 .as_any()
1408 .downcast_ref::<arrow::array::Decimal128Array>()
1409 .ok_or_else(|| {
1410 Error::InvalidArgumentError("Expected Decimal128 array".into())
1411 })?;
1412 for i in 0..arr.len() {
1413 if arr.is_valid(i) {
1414 let v = arr.value(i);
1415 *value = Some(match *value {
1416 Some(current) => current.max(v),
1417 None => v,
1418 });
1419 }
1420 }
1421 }
1422 AggregateAccumulator::CountNulls {
1423 column_index,
1424 non_null_rows,
1425 total_rows_seen,
1426 } => {
1427 let rows = i64::try_from(batch.num_rows()).map_err(|_| {
1428 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1429 })?;
1430 *total_rows_seen = total_rows_seen.checked_add(rows).ok_or_else(|| {
1431 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1432 })?;
1433
1434 let array = batch.column(*column_index);
1435 if !matches!(array.data_type(), DataType::Null) {
1437 let non_null = (0..array.len()).filter(|idx| array.is_valid(*idx)).count();
1438 let non_null = i64::try_from(non_null).map_err(|_| {
1439 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1440 })?;
1441 *non_null_rows = non_null_rows.checked_add(non_null).ok_or_else(|| {
1442 Error::InvalidArgumentError("COUNT result exceeds i64 range".into())
1443 })?;
1444 }
1445 }
1446 AggregateAccumulator::GroupConcat {
1447 column_index,
1448 values,
1449 separator: _,
1450 } => {
1451 let array = batch.column(*column_index);
1452 for i in 0..array.len() {
1453 if array.is_valid(i) {
1454 let str_val = array_value_to_string(array, i)?;
1455 values.push(str_val);
1456 }
1457 }
1458 }
1459 AggregateAccumulator::GroupConcatDistinct {
1460 column_index,
1461 seen,
1462 values,
1463 separator: _,
1464 } => {
1465 let array = batch.column(*column_index);
1466 for i in 0..array.len() {
1467 if array.is_valid(i) {
1468 let str_val = array_value_to_string(array, i)?;
1469 if seen.insert(str_val.clone()) {
1470 values.push(str_val);
1471 }
1472 }
1473 }
1474 }
1475 }
1476 Ok(())
1477 }
1478
1479 pub fn finalize(self) -> AggregateResult<(Field, ArrayRef)> {
1489 match self {
1490 AggregateAccumulator::CountStar { value } => {
1491 let mut builder = Int64Builder::with_capacity(1);
1492 builder.append_value(value);
1493 let array = Arc::new(builder.finish()) as ArrayRef;
1494 Ok((Field::new("count", DataType::Int64, false), array))
1495 }
1496 AggregateAccumulator::CountColumn { value, .. } => {
1497 let mut builder = Int64Builder::with_capacity(1);
1498 builder.append_value(value);
1499 let array = Arc::new(builder.finish()) as ArrayRef;
1500 Ok((Field::new("count", DataType::Int64, false), array))
1501 }
1502 AggregateAccumulator::CountDistinctColumn { seen, .. } => {
1503 let mut builder = Int64Builder::with_capacity(1);
1504 let count = i64::try_from(seen.len()).map_err(|_| {
1505 Error::InvalidArgumentError("COUNT(DISTINCT) result exceeds i64 range".into())
1506 })?;
1507 builder.append_value(count);
1508 let array = Arc::new(builder.finish()) as ArrayRef;
1509 Ok((Field::new("count_distinct", DataType::Int64, false), array))
1510 }
1511 AggregateAccumulator::SumInt64 {
1512 value, has_values, ..
1513 } => {
1514 if has_values && value.is_none() {
1517 return Err(Error::InvalidArgumentError("integer overflow".into()));
1518 }
1519
1520 let mut builder = Int64Builder::with_capacity(1);
1521 if !has_values {
1522 builder.append_null(); } else {
1524 match value {
1525 Some(v) => builder.append_value(v),
1526 None => unreachable!(), }
1528 }
1529 let array = Arc::new(builder.finish()) as ArrayRef;
1530 Ok((Field::new("sum", DataType::Int64, true), array))
1531 }
1532 AggregateAccumulator::SumDistinctInt64 { sum, seen, .. } => {
1533 let mut builder = Int64Builder::with_capacity(1);
1534 if seen.is_empty() {
1535 builder.append_null();
1536 } else {
1537 match sum {
1538 Some(v) => builder.append_value(v),
1539 None => builder.append_null(), }
1541 }
1542 let array = Arc::new(builder.finish()) as ArrayRef;
1543 Ok((Field::new("sum_distinct", DataType::Int64, true), array))
1544 }
1545 AggregateAccumulator::SumFloat64 {
1546 value, saw_value, ..
1547 } => {
1548 let mut builder = Float64Builder::with_capacity(1);
1549 if saw_value {
1550 builder.append_value(value);
1551 } else {
1552 builder.append_null();
1553 }
1554 let array = Arc::new(builder.finish()) as ArrayRef;
1555 Ok((Field::new("sum", DataType::Float64, true), array))
1556 }
1557 AggregateAccumulator::SumDistinctFloat64 { sum, seen, .. } => {
1558 let mut builder = Float64Builder::with_capacity(1);
1559 if !seen.is_empty() {
1560 builder.append_value(sum);
1561 } else {
1562 builder.append_null();
1563 }
1564 let array = Arc::new(builder.finish()) as ArrayRef;
1565 Ok((Field::new("sum_distinct", DataType::Float64, true), array))
1566 }
1567 AggregateAccumulator::SumDecimal128 {
1568 sum,
1569 precision,
1570 scale,
1571 ..
1572 } => {
1573 let data_type = DataType::Decimal128(precision, scale);
1574 let array = Arc::new(
1575 arrow::array::Decimal128Array::from(vec![sum])
1576 .with_precision_and_scale(precision, scale)
1577 .map_err(|e| {
1578 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1579 })?,
1580 ) as ArrayRef;
1581 Ok((Field::new("sum", data_type, true), array))
1582 }
1583 AggregateAccumulator::SumDistinctDecimal128 {
1584 sum,
1585 seen,
1586 precision,
1587 scale,
1588 ..
1589 } => {
1590 let data_type = DataType::Decimal128(precision, scale);
1591 let array = if seen.is_empty() {
1592 Arc::new(
1593 arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1594 .with_precision_and_scale(precision, scale)
1595 .map_err(|e| {
1596 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1597 })?,
1598 ) as ArrayRef
1599 } else {
1600 Arc::new(
1601 arrow::array::Decimal128Array::from(vec![sum])
1602 .with_precision_and_scale(precision, scale)
1603 .map_err(|e| {
1604 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1605 })?,
1606 ) as ArrayRef
1607 };
1608 Ok((Field::new("sum_distinct", data_type, true), array))
1609 }
1610 AggregateAccumulator::TotalInt64 { value, .. } => {
1611 let mut builder = Float64Builder::with_capacity(1);
1612 builder.append_value(value);
1613 let array = Arc::new(builder.finish()) as ArrayRef;
1614 Ok((Field::new("total", DataType::Float64, false), array))
1615 }
1616 AggregateAccumulator::TotalDistinctInt64 { sum, .. } => {
1617 let mut builder = Float64Builder::with_capacity(1);
1618 builder.append_value(sum);
1619 let array = Arc::new(builder.finish()) as ArrayRef;
1620 Ok((
1621 Field::new("total_distinct", DataType::Float64, false),
1622 array,
1623 ))
1624 }
1625 AggregateAccumulator::TotalFloat64 { value, .. } => {
1626 let mut builder = Float64Builder::with_capacity(1);
1627 builder.append_value(value);
1628 let array = Arc::new(builder.finish()) as ArrayRef;
1629 Ok((Field::new("total", DataType::Float64, false), array))
1630 }
1631 AggregateAccumulator::TotalDistinctFloat64 { sum, .. } => {
1632 let mut builder = Float64Builder::with_capacity(1);
1633 builder.append_value(sum);
1634 let array = Arc::new(builder.finish()) as ArrayRef;
1635 Ok((
1636 Field::new("total_distinct", DataType::Float64, false),
1637 array,
1638 ))
1639 }
1640 AggregateAccumulator::TotalDecimal128 {
1641 sum,
1642 precision,
1643 scale,
1644 ..
1645 } => {
1646 let data_type = DataType::Decimal128(precision, scale);
1647 let array = Arc::new(
1648 arrow::array::Decimal128Array::from(vec![sum])
1649 .with_precision_and_scale(precision, scale)
1650 .map_err(|e| {
1651 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1652 })?,
1653 ) as ArrayRef;
1654 Ok((Field::new("total", data_type, false), array))
1655 }
1656 AggregateAccumulator::TotalDistinctDecimal128 {
1657 sum,
1658 precision,
1659 scale,
1660 ..
1661 } => {
1662 let data_type = DataType::Decimal128(precision, scale);
1663 let array = Arc::new(
1664 arrow::array::Decimal128Array::from(vec![sum])
1665 .with_precision_and_scale(precision, scale)
1666 .map_err(|e| {
1667 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1668 })?,
1669 ) as ArrayRef;
1670 Ok((Field::new("total_distinct", data_type, false), array))
1671 }
1672 AggregateAccumulator::AvgInt64 { sum, count, .. } => {
1673 let mut builder = Float64Builder::with_capacity(1);
1674 if count > 0 {
1675 let avg = (sum as f64) / (count as f64);
1677 builder.append_value(avg);
1678 } else {
1679 builder.append_null();
1680 }
1681 let array = Arc::new(builder.finish()) as ArrayRef;
1682 Ok((Field::new("avg", DataType::Float64, true), array))
1683 }
1684 AggregateAccumulator::AvgDistinctInt64 { sum, seen, .. } => {
1685 let mut builder = Float64Builder::with_capacity(1);
1686 let count = seen.len();
1687 if count > 0 {
1688 let avg = (sum as f64) / (count as f64);
1690 builder.append_value(avg);
1691 } else {
1692 builder.append_null();
1693 }
1694 let array = Arc::new(builder.finish()) as ArrayRef;
1695 Ok((Field::new("avg_distinct", DataType::Float64, true), array))
1696 }
1697 AggregateAccumulator::AvgFloat64 { sum, count, .. } => {
1698 let mut builder = Float64Builder::with_capacity(1);
1699 if count > 0 {
1700 let avg = sum / (count as f64);
1701 builder.append_value(avg);
1702 } else {
1703 builder.append_null();
1704 }
1705 let array = Arc::new(builder.finish()) as ArrayRef;
1706 Ok((Field::new("avg", DataType::Float64, true), array))
1707 }
1708 AggregateAccumulator::AvgDistinctFloat64 { sum, seen, .. } => {
1709 let mut builder = Float64Builder::with_capacity(1);
1710 let count = seen.len();
1711 if count > 0 {
1712 let avg = sum / (count as f64);
1713 builder.append_value(avg);
1714 } else {
1715 builder.append_null();
1716 }
1717 let array = Arc::new(builder.finish()) as ArrayRef;
1718 Ok((Field::new("avg_distinct", DataType::Float64, true), array))
1719 }
1720 AggregateAccumulator::AvgDecimal128 {
1721 sum,
1722 count,
1723 precision,
1724 scale,
1725 ..
1726 } => {
1727 let data_type = DataType::Decimal128(precision, scale);
1728 let array = if count > 0 {
1729 let count_i128 = count as i128;
1732 let mut avg = sum / count_i128;
1733 let rem = sum % count_i128;
1734
1735 if rem.abs() * 2 >= count_i128.abs() {
1737 if sum.signum() == count_i128.signum() {
1738 avg += 1;
1739 } else {
1740 avg -= 1;
1741 }
1742 }
1743
1744 Arc::new(
1745 arrow::array::Decimal128Array::from(vec![avg])
1746 .with_precision_and_scale(precision, scale)
1747 .map_err(|e| {
1748 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1749 })?,
1750 ) as ArrayRef
1751 } else {
1752 Arc::new(
1753 arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1754 .with_precision_and_scale(precision, scale)
1755 .map_err(|e| {
1756 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1757 })?,
1758 ) as ArrayRef
1759 };
1760 Ok((Field::new("avg", data_type, true), array))
1761 }
1762 AggregateAccumulator::AvgDistinctDecimal128 {
1763 sum,
1764 seen,
1765 precision,
1766 scale,
1767 ..
1768 } => {
1769 let data_type = DataType::Decimal128(precision, scale);
1770 let count = seen.len();
1771 let array = if count > 0 {
1772 let count_i128 = count as i128;
1775 let mut avg = sum / count_i128;
1776 let rem = sum % count_i128;
1777
1778 if rem.abs() * 2 >= count_i128.abs() {
1780 if sum.signum() == count_i128.signum() {
1781 avg += 1;
1782 } else {
1783 avg -= 1;
1784 }
1785 }
1786
1787 Arc::new(
1788 arrow::array::Decimal128Array::from(vec![avg])
1789 .with_precision_and_scale(precision, scale)
1790 .map_err(|e| {
1791 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1792 })?,
1793 ) as ArrayRef
1794 } else {
1795 Arc::new(
1796 arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1797 .with_precision_and_scale(precision, scale)
1798 .map_err(|e| {
1799 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1800 })?,
1801 ) as ArrayRef
1802 };
1803 Ok((Field::new("avg_distinct", data_type, true), array))
1804 }
1805 AggregateAccumulator::MinInt64 { value, .. } => {
1806 let mut builder = Int64Builder::with_capacity(1);
1807 if let Some(v) = value {
1808 builder.append_value(v);
1809 } else {
1810 builder.append_null();
1811 }
1812 let array = Arc::new(builder.finish()) as ArrayRef;
1813 Ok((Field::new("min", DataType::Int64, true), array))
1814 }
1815 AggregateAccumulator::MinFloat64 { value, .. } => {
1816 let mut builder = Float64Builder::with_capacity(1);
1817 if let Some(v) = value {
1818 builder.append_value(v);
1819 } else {
1820 builder.append_null();
1821 }
1822 let array = Arc::new(builder.finish()) as ArrayRef;
1823 Ok((Field::new("min", DataType::Float64, true), array))
1824 }
1825 AggregateAccumulator::MinDecimal128 {
1826 value,
1827 precision,
1828 scale,
1829 ..
1830 } => {
1831 let data_type = DataType::Decimal128(precision, scale);
1832 let array = match value {
1833 Some(v) => Arc::new(
1834 arrow::array::Decimal128Array::from(vec![v])
1835 .with_precision_and_scale(precision, scale)
1836 .map_err(|e| {
1837 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1838 })?,
1839 ) as ArrayRef,
1840 None => Arc::new(
1841 arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1842 .with_precision_and_scale(precision, scale)
1843 .map_err(|e| {
1844 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1845 })?,
1846 ) as ArrayRef,
1847 };
1848 Ok((Field::new("min", data_type, true), array))
1849 }
1850 AggregateAccumulator::MaxInt64 { value, .. } => {
1851 let mut builder = Int64Builder::with_capacity(1);
1852 if let Some(v) = value {
1853 builder.append_value(v);
1854 } else {
1855 builder.append_null();
1856 }
1857 let array = Arc::new(builder.finish()) as ArrayRef;
1858 Ok((Field::new("max", DataType::Int64, true), array))
1859 }
1860 AggregateAccumulator::MaxFloat64 { value, .. } => {
1861 let mut builder = Float64Builder::with_capacity(1);
1862 if let Some(v) = value {
1863 builder.append_value(v);
1864 } else {
1865 builder.append_null();
1866 }
1867 let array = Arc::new(builder.finish()) as ArrayRef;
1868 Ok((Field::new("max", DataType::Float64, true), array))
1869 }
1870 AggregateAccumulator::MaxDecimal128 {
1871 value,
1872 precision,
1873 scale,
1874 ..
1875 } => {
1876 let data_type = DataType::Decimal128(precision, scale);
1877 let array = match value {
1878 Some(v) => Arc::new(
1879 arrow::array::Decimal128Array::from(vec![v])
1880 .with_precision_and_scale(precision, scale)
1881 .map_err(|e| {
1882 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1883 })?,
1884 ) as ArrayRef,
1885 None => Arc::new(
1886 arrow::array::Decimal128Array::from(vec![Option::<i128>::None])
1887 .with_precision_and_scale(precision, scale)
1888 .map_err(|e| {
1889 Error::InvalidArgumentError(format!("Invalid decimal: {}", e))
1890 })?,
1891 ) as ArrayRef,
1892 };
1893 Ok((Field::new("max", data_type, true), array))
1894 }
1895 AggregateAccumulator::CountNulls {
1896 non_null_rows,
1897 total_rows_seen,
1898 ..
1899 } => {
1900 let nulls = total_rows_seen.checked_sub(non_null_rows).ok_or_else(|| {
1901 Error::InvalidArgumentError(
1902 "NULL-count aggregate observed more non-null rows than total rows".into(),
1903 )
1904 })?;
1905 let mut builder = Int64Builder::with_capacity(1);
1906 builder.append_value(nulls);
1907 let array = Arc::new(builder.finish()) as ArrayRef;
1908 Ok((Field::new("count_nulls", DataType::Int64, false), array))
1909 }
1910 AggregateAccumulator::GroupConcat {
1911 values, separator, ..
1912 } => {
1913 use arrow::array::StringBuilder;
1914 let mut builder = StringBuilder::with_capacity(1, 256);
1915 if values.is_empty() {
1916 builder.append_null();
1917 } else {
1918 let result = values.join(&separator);
1919 builder.append_value(&result);
1920 }
1921 let array = Arc::new(builder.finish()) as ArrayRef;
1922 Ok((Field::new("group_concat", DataType::Utf8, true), array))
1923 }
1924 AggregateAccumulator::GroupConcatDistinct {
1925 values, separator, ..
1926 } => {
1927 use arrow::array::StringBuilder;
1928 let mut builder = StringBuilder::with_capacity(1, 256);
1929 if values.is_empty() {
1930 builder.append_null();
1931 } else {
1932 let result = values.join(&separator);
1933 builder.append_value(&result);
1934 }
1935 let array = Arc::new(builder.finish()) as ArrayRef;
1936 Ok((Field::new("group_concat", DataType::Utf8, true), array))
1937 }
1938 }
1939 }
1940}
1941
1942impl AggregateState {
1943 pub fn new(
1951 alias: String,
1952 accumulator: AggregateAccumulator,
1953 override_value: Option<i64>,
1954 ) -> Self {
1955 Self {
1956 alias,
1957 accumulator,
1958 override_value,
1959 }
1960 }
1961
1962 pub fn update(&mut self, batch: &RecordBatch) -> AggregateResult<()> {
1972 self.accumulator.update(batch)
1973 }
1974
1975 pub fn finalize(self) -> AggregateResult<(Field, ArrayRef)> {
1987 let (mut field, array) = self.accumulator.finalize()?;
1988 field = field.with_name(self.alias);
1989 if let Some(value) = self.override_value {
1990 let mut builder = Int64Builder::with_capacity(1);
1991 builder.append_value(value);
1992 let array = Arc::new(builder.finish()) as ArrayRef;
1993 return Ok((field, array));
1994 }
1995 Ok((field, array))
1996 }
1997}