1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Running state for one aggregate expression within one group.
///
/// A single instance carries enough information to finalize
/// COUNT/SUM/AVG/MIN/MAX/FIRST; all other aggregate functions finalize to
/// `Null` (see `Accumulator::finalize`).
#[derive(Debug, Clone, Default)]
struct Accumulator {
    // Number of non-null values observed (or rows, for COUNT(*)).
    count: i64,
    // Running sum of the numeric interpretation of observed values.
    sum: f64,
    // Smallest value seen so far, if any.
    min: Option<Value>,
    // Largest value seen so far, if any.
    max: Option<Value>,
    // First non-null value seen (FIRST aggregate).
    first: Option<Value>,
}
26
27impl Accumulator {
28 fn new() -> Self {
29 Self {
30 count: 0,
31 sum: 0.0,
32 min: None,
33 max: None,
34 first: None,
35 }
36 }
37
38 fn add(&mut self, value: &Value) {
39 if matches!(value, Value::Null) {
41 return;
42 }
43
44 self.count += 1;
45
46 if let Some(n) = value_to_f64(value) {
48 self.sum += n;
49 }
50
51 if self.min.is_none() || compare_for_min(&self.min, value) {
53 self.min = Some(value.clone());
54 }
55
56 if self.max.is_none() || compare_for_max(&self.max, value) {
58 self.max = Some(value.clone());
59 }
60
61 if self.first.is_none() {
63 self.first = Some(value.clone());
64 }
65 }
66
67 fn finalize(&mut self, func: AggregateFunction) -> Value {
68 match func {
69 AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70 AggregateFunction::Sum => {
71 if self.count == 0 {
72 Value::Null
73 } else {
74 Value::Float64(self.sum)
75 }
76 }
77 AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78 AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79 AggregateFunction::Avg => {
80 if self.count == 0 {
81 Value::Null
82 } else {
83 Value::Float64(self.sum / self.count as f64)
84 }
85 }
86 AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87 AggregateFunction::Last
90 | AggregateFunction::Collect
91 | AggregateFunction::StdDev
92 | AggregateFunction::StdDevPop
93 | AggregateFunction::Variance
94 | AggregateFunction::VariancePop
95 | AggregateFunction::PercentileDisc
96 | AggregateFunction::PercentileCont
97 | AggregateFunction::GroupConcat
98 | AggregateFunction::Sample
99 | AggregateFunction::CovarSamp
100 | AggregateFunction::CovarPop
101 | AggregateFunction::Corr
102 | AggregateFunction::RegrSlope
103 | AggregateFunction::RegrIntercept
104 | AggregateFunction::RegrR2
105 | AggregateFunction::RegrCount
106 | AggregateFunction::RegrSxx
107 | AggregateFunction::RegrSyy
108 | AggregateFunction::RegrSxy
109 | AggregateFunction::RegrAvgx
110 | AggregateFunction::RegrAvgy => Value::Null,
111 }
112 }
113}
114
115use crate::execution::operators::value_utils::{
116 is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
/// Grouping key: one hash per GROUP BY column.
///
/// NOTE(review): equality is on the *hashes* only, so two rows whose key
/// values differ but happen to hash identically are merged into one group.
/// Presumably this is accepted as negligible collision risk — confirm; the
/// spill path avoids this by keying on the actual values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GroupKey(Vec<u64>);
122
impl GroupKey {
    /// Builds the key for `row` by hashing each GROUP BY column's value.
    /// A missing column or missing value contributes hash 0.
    fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
        let hashes: Vec<u64> = group_by
            .iter()
            .map(|&col| {
                chunk
                    .column(col)
                    .and_then(|c| c.get_value(row))
                    .map_or(0, |v| hash_value(&v))
            })
            .collect();
        Self(hashes)
    }
}
137
138fn hash_value(value: &Value) -> u64 {
139 use std::collections::hash_map::DefaultHasher;
140 use std::hash::{Hash, Hasher};
141
142 let mut hasher = DefaultHasher::new();
143 match value {
144 Value::Null => 0u8.hash(&mut hasher),
145 Value::Bool(b) => b.hash(&mut hasher),
146 Value::Int64(i) => i.hash(&mut hasher),
147 Value::Float64(f) => f.to_bits().hash(&mut hasher),
148 Value::String(s) => s.hash(&mut hasher),
149 _ => 0u8.hash(&mut hasher),
150 }
151 hasher.finish()
152}
153
/// Per-group aggregation state: the materialized key values (used when
/// emitting output rows) plus one accumulator per aggregate expression.
#[derive(Clone)]
struct GroupState {
    // Actual GROUP BY values for this group, in `group_by` column order.
    key_values: Vec<Value>,
    // One accumulator per aggregate expression, in expression order.
    accumulators: Vec<Accumulator>,
}
160
/// Push-based hash aggregation operator.
///
/// Accumulates per-group (or global) state on every `push` and emits a
/// single result chunk in `finalize`.
pub struct AggregatePushOperator {
    // Column indices forming the GROUP BY key (empty = global aggregate).
    group_by: Vec<usize>,
    // Aggregate expressions to evaluate per group.
    aggregates: Vec<AggregateExpr>,
    // Per-group state, keyed by the hashed group key.
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for the global (no GROUP BY) case; `None` otherwise.
    global_state: Option<Vec<Accumulator>>,
}
175
176impl AggregatePushOperator {
177 pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
179 let global_state = if group_by.is_empty() {
180 Some(aggregates.iter().map(|_| Accumulator::new()).collect())
181 } else {
182 None
183 };
184
185 Self {
186 group_by,
187 aggregates,
188 groups: HashMap::new(),
189 global_state,
190 }
191 }
192
193 pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
195 Self::new(Vec::new(), aggregates)
196 }
197}
198
impl PushOperator for AggregatePushOperator {
    /// Accumulates one input chunk. Never emits output here — aggregation
    /// is blocking; results are produced in `finalize`.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation: fold the row into the single
                // accumulator set.
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): no input column, count the row.
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Grouped aggregation: locate (or lazily create) this
                // row's group via the hashed key.
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                let state = self.groups.entry(key).or_insert_with(|| {
                    // First row of this group: materialize the actual key
                    // values (missing columns/values become Null).
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        // COUNT(*): count the row regardless of values.
                        acc.count += 1;
                    }
                }
            }
        }

        Ok(true)
    }

    /// Emits the aggregation result as a single chunk: group-key columns
    /// first, then one column per aggregate expression.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregates always produce exactly one output row.
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else {
            // One output row per group; row order follows HashMap
            // iteration order and is unspecified.
            for state in self.groups.values_mut() {
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Suppress the output chunk entirely when no rows were produced
        // (e.g. grouped aggregation over empty input).
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "AggregatePush"
    }
}
310
#[cfg(feature = "spill")]
/// Default number of in-memory groups above which aggregation state is
/// spilled to disk (or migrated into partitioned storage); see
/// `SpillableAggregatePushOperator::maybe_spill`.
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;
314
/// Serializes a `GroupState` in the layout read back by
/// `deserialize_group_state`: a u64-length-prefixed list of key values,
/// then a u64-length-prefixed list of accumulators (each: i64 count, f64
/// sum as raw bits, then three flag-prefixed optional values).
#[cfg(feature = "spill")]
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    // Writes a 1-byte presence flag followed by the value when present.
    // The three optional accumulator fields previously repeated this
    // pattern inline.
    fn write_opt(v: &Option<Value>, w: &mut dyn Write) -> std::io::Result<()> {
        use crate::execution::spill::serialize_value;

        w.write_all(&[v.is_some() as u8])?;
        if let Some(val) = v {
            serialize_value(val, w)?;
        }
        Ok(())
    }

    // Group-key values, length-prefixed.
    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for val in &state.key_values {
        serialize_value(val, w)?;
    }

    // Accumulators, length-prefixed.
    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        // The f64 sum is written via its bit pattern to round-trip exactly.
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;

        write_opt(&acc.min, w)?;
        write_opt(&acc.max, w)?;
        write_opt(&acc.first, w)?;
    }

    Ok(())
}
356
/// Deserializes a `GroupState` written by `serialize_group_state`.
#[cfg(feature = "spill")]
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    // Reads a little-endian u64; used both for length prefixes and for
    // raw 8-byte fields (callers reinterpret the bits as needed).
    fn read_u64(r: &mut dyn Read) -> std::io::Result<u64> {
        let mut buf = [0u8; 8];
        r.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    // Reads the 1-byte presence flag, then the value when present —
    // mirrors `write_opt` in `serialize_group_state`.
    fn read_opt(r: &mut dyn Read) -> std::io::Result<Option<Value>> {
        use crate::execution::spill::deserialize_value;

        let mut flag = [0u8; 1];
        r.read_exact(&mut flag)?;
        if flag[0] != 0 {
            Ok(Some(deserialize_value(r)?))
        } else {
            Ok(None)
        }
    }

    let num_keys = read_u64(r)? as usize;
    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    let num_accumulators = read_u64(r)? as usize;
    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        // `u64 as i64` reinterprets the bits — identical to reading the
        // same 8 bytes with `i64::from_le_bytes`.
        let count = read_u64(r)? as i64;
        let sum = f64::from_bits(read_u64(r)?);

        let min = read_opt(r)?;
        let max = read_opt(r)?;
        let first = read_opt(r)?;

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
424
#[cfg(feature = "spill")]
/// Aggregation operator whose per-group state can spill to disk.
///
/// Starts either with an in-memory `HashMap` (via `new`) or directly in
/// partitioned mode (via `with_spilling`); `maybe_spill` migrates or
/// spills state once the threshold is exceeded.
pub struct SpillableAggregatePushOperator {
    // Column indices forming the GROUP BY key (empty = global aggregate).
    group_by: Vec<usize>,
    // Aggregate expressions to evaluate per group.
    aggregates: Vec<AggregateExpr>,
    // Disk-spill backend; `None` means state can never leave memory.
    spill_manager: Option<Arc<SpillManager>>,
    // Spill-capable partitioned group store (active when `using_partitioned`).
    partitioned_groups: Option<PartitionedState<GroupState>>,
    // In-memory group store used until/unless spilling is triggered.
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for the global (no GROUP BY) case; `None` otherwise.
    global_state: Option<Vec<Accumulator>>,
    // Group-count / size threshold that triggers spilling.
    spill_threshold: usize,
    // Whether group state lives in `partitioned_groups` (vs `groups`).
    using_partitioned: bool,
}
448
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Builds the global accumulator set for ungrouped aggregation, or
    /// `None` when GROUP BY columns are present (per-group state is used
    /// instead). Previously duplicated in `new` and `with_spilling`.
    fn init_global_state(
        group_by: &[usize],
        aggregates: &[AggregateExpr],
    ) -> Option<Vec<Accumulator>> {
        if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        }
    }

    /// Creates an operator without a spill manager; state stays in memory
    /// regardless of size (`maybe_spill` becomes a no-op).
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        let global_state = Self::init_global_state(&group_by, &aggregates);

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Creates an operator that uses `manager` for disk spilling, starting
    /// directly in partitioned mode with the given threshold.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let global_state = Self::init_global_state(&group_by, &aggregates);

        let partitioned = PartitionedState::new(
            Arc::clone(&manager),
            256, // number of hash partitions
            serialize_group_state,
            deserialize_group_state,
        );

        Self {
            group_by,
            aggregates,
            spill_manager: Some(manager),
            partitioned_groups: Some(partitioned),
            groups: HashMap::new(),
            global_state,
            spill_threshold: threshold,
            using_partitioned: true,
        }
    }

    /// Convenience constructor for a global (ungrouped) aggregation.
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Overrides the spill threshold (builder style).
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Spills or converts in-memory state once it exceeds the threshold.
    ///
    /// Global (ungrouped) state never spills. In partitioned mode the
    /// largest partition is written to disk; otherwise, if a spill manager
    /// is available, the whole in-memory hash map is migrated into a fresh
    /// partitioned store. Without a manager this is a no-op.
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        if self.global_state.is_some() {
            return Ok(());
        }

        if let Some(ref mut partitioned) = self.partitioned_groups {
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = PartitionedState::new(
                    Arc::clone(manager),
                    256,
                    serialize_group_state,
                    deserialize_group_state,
                );

                // Re-key by the materialized key values; the partitioned
                // store does not use the hashed `GroupKey`.
                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
554
#[cfg(feature = "spill")]
impl PushOperator for SpillableAggregatePushOperator {
    /// Accumulates one input chunk, then checks whether state must spill.
    /// Never emits output here; results are produced in `finalize`.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation: one accumulator set, never spills.
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): no input column, count the row.
                            acc.count += 1;
                        }
                    }
                }
            } else if self.using_partitioned {
                // Partitioned (spill-capable) mode: key by the actual
                // values, not the hashed GroupKey.
                if let Some(ref mut partitioned) = self.partitioned_groups {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    // Borrow `aggregates` separately so the closure does
                    // not capture `self` while `partitioned` is borrowed.
                    let aggregates = &self.aggregates;
                    let state = partitioned
                        .get_or_insert_with(key_values.clone(), || GroupState {
                            key_values: key_values.clone(),
                            accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
                        })
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;

                    for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): count the row regardless of values.
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Pure in-memory mode (spilling not yet triggered).
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                let state = self.groups.entry(key).or_insert_with(|| {
                    // First row of this group: materialize the actual key
                    // values (missing columns/values become Null).
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        // COUNT(*): count the row regardless of values.
                        acc.count += 1;
                    }
                }
            }
        }

        // After each chunk, spill (or migrate to partitioned mode) if the
        // state has grown past the threshold.
        self.maybe_spill()?;

        Ok(true)
    }

    /// Emits the aggregation result as a single chunk: group-key columns
    /// first, then one column per aggregate expression. In partitioned
    /// mode, spilled partitions are read back from disk via `drain_all`.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregates always produce exactly one output row.
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else if self.using_partitioned {
            if let Some(ref mut partitioned) = self.partitioned_groups {
                // Drains both in-memory and spilled-to-disk partitions.
                let groups = partitioned
                    .drain_all()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;

                for (_key, mut state) in groups {
                    for (i, val) in state.key_values.iter().enumerate() {
                        columns[i].push(val.clone());
                    }

                    for (i, (acc, expr)) in state
                        .accumulators
                        .iter_mut()
                        .zip(&self.aggregates)
                        .enumerate()
                    {
                        columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                    }
                }
            }
        } else {
            // In-memory path; row order follows HashMap iteration order
            // and is unspecified.
            for state in self.groups.values_mut() {
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Suppress the output chunk entirely when no rows were produced.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "SpillableAggregatePush"
    }
}
726
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a one-column chunk of Int64 values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let wrapped: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&wrapped)])
    }

    /// Builds a two-column chunk of Int64 values.
    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
        let left: Vec<Value> = col1.iter().copied().map(Value::Int64).collect();
        let right: Vec<Value> = col2.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![
            ValueVector::from_values(&left),
            ValueVector::from_values(&right),
        ])
    }

    #[test]
    fn test_global_count() {
        let mut op = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        // COUNT(*) over five rows.
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    fn test_global_sum() {
        let mut op = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
        let mut sink = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        op.finalize(&mut sink).unwrap();

        // SUM is emitted as Float64.
        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Float64(15.0))
        );
    }

    #[test]
    fn test_global_min_max() {
        let mut op =
            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
        let mut sink = CollectorSink::new();

        op.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        // Column 0 is MIN, column 1 is MAX.
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(1))
        );
        assert_eq!(
            chunks[0].column(1).unwrap().get_value(0),
            Some(Value::Int64(9))
        );
    }

    #[test]
    fn test_group_by_sum() {
        let mut op = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
        let mut sink = CollectorSink::new();

        op.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        op.finalize(&mut sink).unwrap();

        // Two distinct keys (1 and 2) -> two output rows.
        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2);
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_no_spill() {
        // No spill manager: state stays in memory even past the threshold.
        let mut op = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
            .with_threshold(100);
        let mut sink = CollectorSink::new();

        op.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2);
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_with_spilling() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        // Threshold of 3 forces spilling long before all 10 groups arrive.
        let mut op = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::sum(1)],
            manager,
            3,
        );
        let mut sink = CollectorSink::new();

        for i in 0..10 {
            op.push(create_two_column_chunk(&[i], &[i * 10]), &mut sink)
                .unwrap();
        }
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 10);

        // Row order is unspecified, so compare the sorted sums.
        let mut sums: Vec<f64> = Vec::new();
        for i in 0..chunks[0].len() {
            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
                sums.push(sum);
            }
        }
        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            sums,
            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_global() {
        let mut op = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_many_groups() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        // 100 distinct single-row groups against a threshold of 10.
        let mut op = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::count_star()],
            manager,
            10,
        );
        let mut sink = CollectorSink::new();

        for i in 0..100 {
            op.push(create_test_chunk(&[i]), &mut sink).unwrap();
        }
        op.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 100);

        // Every group saw exactly one row.
        for i in 0..100 {
            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
                assert_eq!(count, 1);
            }
        }
    }
}