1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Running state for one aggregate over one group (or the global group).
///
/// A single accumulator carries enough state to answer COUNT / SUM / AVG /
/// MIN / MAX / FIRST; `finalize` selects which piece to report for a given
/// aggregate function.
#[derive(Debug, Clone, Default)]
struct Accumulator {
    // Number of non-null values seen (incremented directly for COUNT(*)).
    count: i64,
    // Running numeric sum; only values convertible by `value_to_f64` contribute.
    sum: f64,
    // Smallest value seen so far, per `compare_for_min` ordering.
    min: Option<Value>,
    // Largest value seen so far, per `compare_for_max` ordering.
    max: Option<Value>,
    // First non-null value observed.
    first: Option<Value>,
}
26
27impl Accumulator {
28 fn new() -> Self {
29 Self {
30 count: 0,
31 sum: 0.0,
32 min: None,
33 max: None,
34 first: None,
35 }
36 }
37
38 fn add(&mut self, value: &Value) {
39 if matches!(value, Value::Null) {
41 return;
42 }
43
44 self.count += 1;
45
46 if let Some(n) = value_to_f64(value) {
48 self.sum += n;
49 }
50
51 if self.min.is_none() || compare_for_min(&self.min, value) {
53 self.min = Some(value.clone());
54 }
55
56 if self.max.is_none() || compare_for_max(&self.max, value) {
58 self.max = Some(value.clone());
59 }
60
61 if self.first.is_none() {
63 self.first = Some(value.clone());
64 }
65 }
66
67 fn finalize(&mut self, func: AggregateFunction) -> Value {
68 match func {
69 AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70 AggregateFunction::Sum => {
71 if self.count == 0 {
72 Value::Null
73 } else {
74 Value::Float64(self.sum)
75 }
76 }
77 AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78 AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79 AggregateFunction::Avg => {
80 if self.count == 0 {
81 Value::Null
82 } else {
83 Value::Float64(self.sum / self.count as f64)
84 }
85 }
86 AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87 AggregateFunction::Last
90 | AggregateFunction::Collect
91 | AggregateFunction::StdDev
92 | AggregateFunction::StdDevPop
93 | AggregateFunction::Variance
94 | AggregateFunction::VariancePop
95 | AggregateFunction::PercentileDisc
96 | AggregateFunction::PercentileCont
97 | AggregateFunction::GroupConcat
98 | AggregateFunction::Sample
99 | AggregateFunction::CovarSamp
100 | AggregateFunction::CovarPop
101 | AggregateFunction::Corr
102 | AggregateFunction::RegrSlope
103 | AggregateFunction::RegrIntercept
104 | AggregateFunction::RegrR2
105 | AggregateFunction::RegrCount
106 | AggregateFunction::RegrSxx
107 | AggregateFunction::RegrSyy
108 | AggregateFunction::RegrSxy
109 | AggregateFunction::RegrAvgx
110 | AggregateFunction::RegrAvgy => Value::Null,
111 }
112 }
113}
114
115use crate::execution::operators::value_utils::{
116 is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
119#[derive(Debug, Clone, PartialEq, Eq, Hash)]
121struct GroupKey(Vec<u64>);
122
123impl GroupKey {
124 fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
125 let hashes: Vec<u64> = group_by
126 .iter()
127 .map(|&col| {
128 chunk
129 .column(col)
130 .and_then(|c| c.get_value(row))
131 .map_or(0, |v| hash_value(&v))
132 })
133 .collect();
134 Self(hashes)
135 }
136}
137
138fn hash_value(value: &Value) -> u64 {
139 use std::collections::hash_map::DefaultHasher;
140 use std::hash::{Hash, Hasher};
141
142 let mut hasher = DefaultHasher::new();
143 match value {
144 Value::Null => 0u8.hash(&mut hasher),
145 Value::Bool(b) => b.hash(&mut hasher),
146 Value::Int64(i) => i.hash(&mut hasher),
147 Value::Float64(f) => f.to_bits().hash(&mut hasher),
148 Value::String(s) => s.hash(&mut hasher),
149 Value::List(list) => {
150 list.len().hash(&mut hasher);
151 for elem in list.iter() {
152 hash_value(elem).hash(&mut hasher);
153 }
154 }
155 _ => 0u8.hash(&mut hasher),
156 }
157 hasher.finish()
158}
159
/// Per-group aggregation state: the materialized key values (needed to emit
/// the key columns in the output) plus one accumulator per aggregate.
#[derive(Clone)]
struct GroupState {
    // Actual grouping values for this group, emitted as the leading columns.
    key_values: Vec<Value>,
    // Parallel to the operator's aggregate expression list.
    accumulators: Vec<Accumulator>,
}
166
/// Push-based hash aggregation operator.
///
/// Consumes chunks row by row, maintaining either a single global
/// accumulator set (no GROUP BY) or a hash table of per-group states, and
/// emits one result chunk from `finalize`.
pub struct AggregatePushOperator {
    // Input column indices to group on; empty means global aggregation.
    group_by: Vec<usize>,
    // Aggregate expressions evaluated per group.
    aggregates: Vec<AggregateExpr>,
    // Per-group state, keyed by hashed group values.
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for the global (no GROUP BY) case; None when grouping.
    global_state: Option<Vec<Accumulator>>,
}
181
182impl AggregatePushOperator {
183 pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
185 let global_state = if group_by.is_empty() {
186 Some(aggregates.iter().map(|_| Accumulator::new()).collect())
187 } else {
188 None
189 };
190
191 Self {
192 group_by,
193 aggregates,
194 groups: HashMap::new(),
195 global_state,
196 }
197 }
198
199 pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
201 Self::new(Vec::new(), aggregates)
202 }
203}
204
205impl PushOperator for AggregatePushOperator {
206 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
207 if chunk.is_empty() {
208 return Ok(true);
209 }
210
211 for row in chunk.selected_indices() {
212 if self.group_by.is_empty() {
213 if let Some(ref mut accumulators) = self.global_state {
215 for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
216 if let Some(col) = expr.column {
217 if let Some(c) = chunk.column(col)
218 && let Some(val) = c.get_value(row)
219 {
220 acc.add(&val);
221 }
222 } else {
223 acc.count += 1;
225 }
226 }
227 }
228 } else {
229 let key = GroupKey::from_row(&chunk, row, &self.group_by);
231
232 let state = self.groups.entry(key).or_insert_with(|| {
233 let key_values: Vec<Value> = self
234 .group_by
235 .iter()
236 .map(|&col| {
237 chunk
238 .column(col)
239 .and_then(|c| c.get_value(row))
240 .unwrap_or(Value::Null)
241 })
242 .collect();
243
244 GroupState {
245 key_values,
246 accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
247 }
248 });
249
250 for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
251 if let Some(col) = expr.column {
252 if let Some(c) = chunk.column(col)
253 && let Some(val) = c.get_value(row)
254 {
255 acc.add(&val);
256 }
257 } else {
258 acc.count += 1;
260 }
261 }
262 }
263 }
264
265 Ok(true)
266 }
267
268 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
269 let num_output_cols = self.group_by.len() + self.aggregates.len();
270 let mut columns: Vec<ValueVector> =
271 (0..num_output_cols).map(|_| ValueVector::new()).collect();
272
273 if self.group_by.is_empty() {
274 if let Some(ref mut accumulators) = self.global_state {
276 for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
277 columns[i].push(acc.finalize(expr.function));
278 }
279 }
280 } else {
281 for state in self.groups.values_mut() {
283 for (i, val) in state.key_values.iter().enumerate() {
285 columns[i].push(val.clone());
286 }
287
288 for (i, (acc, expr)) in state
290 .accumulators
291 .iter_mut()
292 .zip(&self.aggregates)
293 .enumerate()
294 {
295 columns[self.group_by.len() + i].push(acc.finalize(expr.function));
296 }
297 }
298 }
299
300 if !columns.is_empty() && !columns[0].is_empty() {
301 let chunk = DataChunk::new(columns);
302 sink.consume(chunk)?;
303 }
304
305 Ok(())
306 }
307
308 fn preferred_chunk_size(&self) -> ChunkSizeHint {
309 ChunkSizeHint::Default
310 }
311
312 fn name(&self) -> &'static str {
313 "AggregatePush"
314 }
315}
316
/// Default number of in-memory groups the spillable aggregate tolerates
/// before migrating/partitioning its state for spilling to disk.
#[cfg(feature = "spill")]
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;
320
/// Writes a `GroupState` to `w` in the fixed little-endian layout expected
/// by `deserialize_group_state`: key count, keys, accumulator count, then
/// per accumulator: count, sum bits, and three presence-prefixed options
/// (min, max, first).
#[cfg(feature = "spill")]
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    // Writes a presence byte (1/0) followed by the value when present.
    fn write_opt_value(v: &Option<Value>, w: &mut dyn Write) -> std::io::Result<()> {
        w.write_all(&[u8::from(v.is_some())])?;
        if let Some(inner) = v {
            crate::execution::spill::serialize_value(inner, w)?;
        }
        Ok(())
    }

    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for key in &state.key_values {
        serialize_value(key, w)?;
    }

    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;
        write_opt_value(&acc.min, w)?;
        write_opt_value(&acc.max, w)?;
        write_opt_value(&acc.first, w)?;
    }

    Ok(())
}
362
/// Reads a `GroupState` from `r`, mirroring the byte layout produced by
/// `serialize_group_state`.
#[cfg(feature = "spill")]
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    // Reads one little-endian u64 length/word.
    fn read_u64(r: &mut dyn Read) -> std::io::Result<u64> {
        let mut buf = [0u8; 8];
        r.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    // Reads a presence byte, then the value if the byte was non-zero.
    fn read_opt_value(r: &mut dyn Read) -> std::io::Result<Option<Value>> {
        let mut flag = [0u8; 1];
        r.read_exact(&mut flag)?;
        if flag[0] != 0 {
            crate::execution::spill::deserialize_value(r).map(Some)
        } else {
            Ok(None)
        }
    }

    let num_keys = read_u64(r)? as usize;
    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    let num_accumulators = read_u64(r)? as usize;
    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        let mut word = [0u8; 8];

        r.read_exact(&mut word)?;
        let count = i64::from_le_bytes(word);

        r.read_exact(&mut word)?;
        let sum = f64::from_bits(u64::from_le_bytes(word));

        let min = read_opt_value(r)?;
        let max = read_opt_value(r)?;
        let first = read_opt_value(r)?;

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
430
/// Aggregation operator whose grouped state can spill to disk.
///
/// Global (no GROUP BY) aggregation never spills: its state is a
/// fixed-size accumulator set. Grouped state lives either in an in-memory
/// hash table or, once migrated, in partitioned spillable storage.
#[cfg(feature = "spill")]
pub struct SpillableAggregatePushOperator {
    // Input column indices to group on; empty means global aggregation.
    group_by: Vec<usize>,
    // Aggregate expressions evaluated per group.
    aggregates: Vec<AggregateExpr>,
    // Backing spill manager; None means spilling is unavailable.
    spill_manager: Option<Arc<SpillManager>>,
    // Partitioned (spillable) group state, keyed by materialized key values.
    partitioned_groups: Option<PartitionedState<GroupState>>,
    // In-memory group state used until migration to partitioned storage.
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for the global (no GROUP BY) case; None when grouping.
    global_state: Option<Vec<Accumulator>>,
    // Group-count threshold that triggers spilling/migration.
    spill_threshold: usize,
    // True once grouped state is routed through `partitioned_groups`.
    using_partitioned: bool,
}
454
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Number of hash partitions used for spillable group state.
    /// (Previously the literal 256 was duplicated in two places.)
    const SPILL_PARTITIONS: usize = 256;

    /// One fresh accumulator per aggregate when there are no grouping
    /// columns; `None` when grouping (per-group accumulators are used).
    /// (Previously duplicated in `new` and `with_spilling`.)
    fn make_global_state(
        group_by: &[usize],
        aggregates: &[AggregateExpr],
    ) -> Option<Vec<Accumulator>> {
        group_by
            .is_empty()
            .then(|| aggregates.iter().map(|_| Accumulator::new()).collect())
    }

    /// Creates an operator without a spill manager: grouped state stays in
    /// the in-memory hash table and `maybe_spill` can never migrate it.
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        let global_state = Self::make_global_state(&group_by, &aggregates);

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Creates an operator that routes grouped state through partitioned,
    /// spillable storage from the very first row.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let global_state = Self::make_global_state(&group_by, &aggregates);

        let partitioned = PartitionedState::new(
            Arc::clone(&manager),
            Self::SPILL_PARTITIONS,
            serialize_group_state,
            deserialize_group_state,
        );

        Self {
            group_by,
            aggregates,
            spill_manager: Some(manager),
            partitioned_groups: Some(partitioned),
            groups: HashMap::new(),
            global_state,
            spill_threshold: threshold,
            using_partitioned: true,
        }
    }

    /// Convenience constructor for a global (no GROUP BY) aggregate, which
    /// never spills.
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Builder-style override for the spill threshold.
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Relieves memory pressure when the group count crosses the threshold:
    /// - already partitioned: spill the largest partition to disk;
    /// - in-memory map over threshold with a manager available: migrate all
    ///   groups into newly created partitioned state;
    /// - no manager: nothing to do, state keeps growing in memory.
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        // Global aggregation state is a fixed-size set and never spills.
        if self.global_state.is_some() {
            return Ok(());
        }

        if let Some(ref mut partitioned) = self.partitioned_groups {
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = PartitionedState::new(
                    Arc::clone(manager),
                    Self::SPILL_PARTITIONS,
                    serialize_group_state,
                    deserialize_group_state,
                );

                // Re-key by materialized values; partitioned state does not
                // use the hash-based GroupKey.
                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
560
561#[cfg(feature = "spill")]
562impl PushOperator for SpillableAggregatePushOperator {
563 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
564 if chunk.is_empty() {
565 return Ok(true);
566 }
567
568 for row in chunk.selected_indices() {
569 if self.group_by.is_empty() {
570 if let Some(ref mut accumulators) = self.global_state {
572 for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
573 if let Some(col) = expr.column {
574 if let Some(c) = chunk.column(col)
575 && let Some(val) = c.get_value(row)
576 {
577 acc.add(&val);
578 }
579 } else {
580 acc.count += 1;
581 }
582 }
583 }
584 } else if self.using_partitioned {
585 if let Some(ref mut partitioned) = self.partitioned_groups {
587 let key_values: Vec<Value> = self
588 .group_by
589 .iter()
590 .map(|&col| {
591 chunk
592 .column(col)
593 .and_then(|c| c.get_value(row))
594 .unwrap_or(Value::Null)
595 })
596 .collect();
597
598 let aggregates = &self.aggregates;
599 let state = partitioned
600 .get_or_insert_with(key_values.clone(), || GroupState {
601 key_values: key_values.clone(),
602 accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
603 })
604 .map_err(|e| OperatorError::Execution(e.to_string()))?;
605
606 for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
607 if let Some(col) = expr.column {
608 if let Some(c) = chunk.column(col)
609 && let Some(val) = c.get_value(row)
610 {
611 acc.add(&val);
612 }
613 } else {
614 acc.count += 1;
615 }
616 }
617 }
618 } else {
619 let key = GroupKey::from_row(&chunk, row, &self.group_by);
621
622 let state = self.groups.entry(key).or_insert_with(|| {
623 let key_values: Vec<Value> = self
624 .group_by
625 .iter()
626 .map(|&col| {
627 chunk
628 .column(col)
629 .and_then(|c| c.get_value(row))
630 .unwrap_or(Value::Null)
631 })
632 .collect();
633
634 GroupState {
635 key_values,
636 accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
637 }
638 });
639
640 for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
641 if let Some(col) = expr.column {
642 if let Some(c) = chunk.column(col)
643 && let Some(val) = c.get_value(row)
644 {
645 acc.add(&val);
646 }
647 } else {
648 acc.count += 1;
649 }
650 }
651 }
652 }
653
654 self.maybe_spill()?;
656
657 Ok(true)
658 }
659
660 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
661 let num_output_cols = self.group_by.len() + self.aggregates.len();
662 let mut columns: Vec<ValueVector> =
663 (0..num_output_cols).map(|_| ValueVector::new()).collect();
664
665 if self.group_by.is_empty() {
666 if let Some(ref mut accumulators) = self.global_state {
668 for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
669 columns[i].push(acc.finalize(expr.function));
670 }
671 }
672 } else if self.using_partitioned {
673 if let Some(ref mut partitioned) = self.partitioned_groups {
675 let groups = partitioned
676 .drain_all()
677 .map_err(|e| OperatorError::Execution(e.to_string()))?;
678
679 for (_key, mut state) in groups {
680 for (i, val) in state.key_values.iter().enumerate() {
682 columns[i].push(val.clone());
683 }
684
685 for (i, (acc, expr)) in state
687 .accumulators
688 .iter_mut()
689 .zip(&self.aggregates)
690 .enumerate()
691 {
692 columns[self.group_by.len() + i].push(acc.finalize(expr.function));
693 }
694 }
695 }
696 } else {
697 for state in self.groups.values_mut() {
699 for (i, val) in state.key_values.iter().enumerate() {
701 columns[i].push(val.clone());
702 }
703
704 for (i, (acc, expr)) in state
706 .accumulators
707 .iter_mut()
708 .zip(&self.aggregates)
709 .enumerate()
710 {
711 columns[self.group_by.len() + i].push(acc.finalize(expr.function));
712 }
713 }
714 }
715
716 if !columns.is_empty() && !columns[0].is_empty() {
717 let chunk = DataChunk::new(columns);
718 sink.consume(chunk)?;
719 }
720
721 Ok(())
722 }
723
724 fn preferred_chunk_size(&self) -> ChunkSizeHint {
725 ChunkSizeHint::Default
726 }
727
728 fn name(&self) -> &'static str {
729 "SpillableAggregatePush"
730 }
731}
732
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// One Int64 column built from `values`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let col: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&col)])
    }

    /// Two Int64 columns built from `col1` / `col2`.
    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
        let a: Vec<Value> = col1.iter().copied().map(Value::Int64).collect();
        let b: Vec<Value> = col2.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![
            ValueVector::from_values(&a),
            ValueVector::from_values(&b),
        ])
    }

    #[test]
    fn test_global_count() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        // A global aggregate emits exactly one row.
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    fn test_global_sum() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        // SUM is reported as Float64 even over integer input.
        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Float64(15.0))
        );
    }

    #[test]
    fn test_global_min_max() {
        let mut agg =
            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(1))
        );
        assert_eq!(
            chunks[0].column(1).unwrap().get_value(0),
            Some(Value::Int64(9))
        );
    }

    #[test]
    fn test_group_by_sum() {
        let mut agg = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
        let mut sink = CollectorSink::new();

        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        // Two distinct keys -> two output rows.
        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2);
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_no_spill() {
        // Threshold far above the group count: everything stays in memory.
        let mut agg = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
            .with_threshold(100);
        let mut sink = CollectorSink::new();

        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2);
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_with_spilling() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        // A tiny threshold forces spilling while 10 groups accumulate.
        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::sum(1)],
            manager,
            3,
        );
        let mut sink = CollectorSink::new();

        for i in 0..10 {
            agg.push(create_two_column_chunk(&[i], &[i * 10]), &mut sink)
                .unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 10);

        // Output order is unspecified; compare the sorted per-group sums.
        let mut sums: Vec<f64> = Vec::new();
        for i in 0..chunks[0].len() {
            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
                sums.push(sum);
            }
        }
        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            sums,
            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_global() {
        let mut agg = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_many_groups() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::count_star()],
            manager,
            10,
        );
        let mut sink = CollectorSink::new();

        // 100 distinct keys, well past the threshold of 10.
        for i in 0..100 {
            agg.push(create_test_chunk(&[i]), &mut sink).unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 100);
        // Every group saw exactly one row.
        for i in 0..100 {
            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
                assert_eq!(count, 1);
            }
        }
    }
}