Skip to main content

grafeo_core/execution/operators/push/
aggregate.rs

1//! Push-based aggregate operator (pipeline breaker).
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Accumulator for aggregate state.
///
/// One instance tracks every statistic needed by the simple aggregate
/// functions (COUNT / SUM / MIN / MAX / AVG / FIRST); `finalize` selects the
/// relevant piece for a given function.
#[derive(Debug, Clone, Default)]
struct Accumulator {
    /// Number of non-null values seen (or raw row count for COUNT(*), which
    /// bumps this field directly without calling `add`).
    count: i64,
    /// Running sum of the values that were convertible to f64.
    sum: f64,
    /// Smallest value seen so far, if any.
    min: Option<Value>,
    /// Largest value seen so far, if any.
    max: Option<Value>,
    /// First non-null value seen, used by FIRST.
    first: Option<Value>,
}
26
27impl Accumulator {
28    fn new() -> Self {
29        Self {
30            count: 0,
31            sum: 0.0,
32            min: None,
33            max: None,
34            first: None,
35        }
36    }
37
38    fn add(&mut self, value: &Value) {
39        // Skip nulls for aggregates
40        if matches!(value, Value::Null) {
41            return;
42        }
43
44        self.count += 1;
45
46        // Sum (for numeric types)
47        if let Some(n) = value_to_f64(value) {
48            self.sum += n;
49        }
50
51        // Min
52        if self.min.is_none() || compare_for_min(&self.min, value) {
53            self.min = Some(value.clone());
54        }
55
56        // Max
57        if self.max.is_none() || compare_for_max(&self.max, value) {
58            self.max = Some(value.clone());
59        }
60
61        // First
62        if self.first.is_none() {
63            self.first = Some(value.clone());
64        }
65    }
66
67    fn finalize(&mut self, func: AggregateFunction) -> Value {
68        match func {
69            AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70            AggregateFunction::Sum => {
71                if self.count == 0 {
72                    Value::Null
73                } else {
74                    Value::Float64(self.sum)
75                }
76            }
77            AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78            AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79            AggregateFunction::Avg => {
80                if self.count == 0 {
81                    Value::Null
82                } else {
83                    Value::Float64(self.sum / self.count as f64)
84                }
85            }
86            AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87            // Advanced functions not supported in push-based accumulator;
88            // these are handled by the pull-based AggregateState instead.
89            AggregateFunction::Last
90            | AggregateFunction::Collect
91            | AggregateFunction::StdDev
92            | AggregateFunction::StdDevPop
93            | AggregateFunction::Variance
94            | AggregateFunction::VariancePop
95            | AggregateFunction::PercentileDisc
96            | AggregateFunction::PercentileCont
97            | AggregateFunction::GroupConcat
98            | AggregateFunction::Sample
99            | AggregateFunction::CovarSamp
100            | AggregateFunction::CovarPop
101            | AggregateFunction::Corr
102            | AggregateFunction::RegrSlope
103            | AggregateFunction::RegrIntercept
104            | AggregateFunction::RegrR2
105            | AggregateFunction::RegrCount
106            | AggregateFunction::RegrSxx
107            | AggregateFunction::RegrSyy
108            | AggregateFunction::RegrSxy
109            | AggregateFunction::RegrAvgx
110            | AggregateFunction::RegrAvgy => Value::Null,
111        }
112    }
113}
114
115use crate::execution::operators::value_utils::{
116    is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
/// Hash key for grouping.
///
/// Stores one hash per group-by column rather than the values themselves.
/// NOTE(review): equality is on hashes only, so two distinct key values with
/// colliding hashes would be merged into a single group — confirm this is
/// acceptable for the expected key domains.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GroupKey(Vec<u64>);
122
123impl GroupKey {
124    fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
125        let hashes: Vec<u64> = group_by
126            .iter()
127            .map(|&col| {
128                chunk
129                    .column(col)
130                    .and_then(|c| c.get_value(row))
131                    .map_or(0, |v| hash_value(&v))
132            })
133            .collect();
134        Self(hashes)
135    }
136}
137
138fn hash_value(value: &Value) -> u64 {
139    use std::collections::hash_map::DefaultHasher;
140    use std::hash::{Hash, Hasher};
141
142    let mut hasher = DefaultHasher::new();
143    match value {
144        Value::Null => 0u8.hash(&mut hasher),
145        Value::Bool(b) => b.hash(&mut hasher),
146        Value::Int64(i) => i.hash(&mut hasher),
147        Value::Float64(f) => f.to_bits().hash(&mut hasher),
148        Value::String(s) => s.hash(&mut hasher),
149        Value::List(list) => {
150            list.len().hash(&mut hasher);
151            for elem in list.iter() {
152                hash_value(elem).hash(&mut hasher);
153            }
154        }
155        _ => 0u8.hash(&mut hasher),
156    }
157    hasher.finish()
158}
159
/// Group state with key values and accumulators.
#[derive(Clone)]
struct GroupState {
    /// Materialized group-by column values, emitted as the output key columns.
    key_values: Vec<Value>,
    /// One accumulator per aggregate expression, in expression order.
    accumulators: Vec<Accumulator>,
}
166
/// Push-based aggregate operator.
///
/// This is a pipeline breaker that accumulates all input, groups by key,
/// and produces aggregated output in the finalize phase.
pub struct AggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Group states, keyed by the hash of the group-by values.
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulators, present only when `group_by` is empty.
    global_state: Option<Vec<Accumulator>>,
}
181
182impl AggregatePushOperator {
183    /// Create a new aggregate operator.
184    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
185        let global_state = if group_by.is_empty() {
186            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
187        } else {
188            None
189        };
190
191        Self {
192            group_by,
193            aggregates,
194            groups: HashMap::new(),
195            global_state,
196        }
197    }
198
199    /// Create a simple global aggregate (no GROUP BY).
200    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
201        Self::new(Vec::new(), aggregates)
202    }
203}
204
205impl PushOperator for AggregatePushOperator {
206    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
207        if chunk.is_empty() {
208            return Ok(true);
209        }
210
211        for row in chunk.selected_indices() {
212            if self.group_by.is_empty() {
213                // Global aggregation
214                if let Some(ref mut accumulators) = self.global_state {
215                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
216                        if let Some(col) = expr.column {
217                            if let Some(c) = chunk.column(col)
218                                && let Some(val) = c.get_value(row)
219                            {
220                                acc.add(&val);
221                            }
222                        } else {
223                            // COUNT(*)
224                            acc.count += 1;
225                        }
226                    }
227                }
228            } else {
229                // Group by aggregation
230                let key = GroupKey::from_row(&chunk, row, &self.group_by);
231
232                let state = self.groups.entry(key).or_insert_with(|| {
233                    let key_values: Vec<Value> = self
234                        .group_by
235                        .iter()
236                        .map(|&col| {
237                            chunk
238                                .column(col)
239                                .and_then(|c| c.get_value(row))
240                                .unwrap_or(Value::Null)
241                        })
242                        .collect();
243
244                    GroupState {
245                        key_values,
246                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
247                    }
248                });
249
250                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
251                    if let Some(col) = expr.column {
252                        if let Some(c) = chunk.column(col)
253                            && let Some(val) = c.get_value(row)
254                        {
255                            acc.add(&val);
256                        }
257                    } else {
258                        // COUNT(*)
259                        acc.count += 1;
260                    }
261                }
262            }
263        }
264
265        Ok(true)
266    }
267
268    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
269        let num_output_cols = self.group_by.len() + self.aggregates.len();
270        let mut columns: Vec<ValueVector> =
271            (0..num_output_cols).map(|_| ValueVector::new()).collect();
272
273        if self.group_by.is_empty() {
274            // Global aggregation - single row output
275            if let Some(ref mut accumulators) = self.global_state {
276                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
277                    columns[i].push(acc.finalize(expr.function));
278                }
279            }
280        } else {
281            // Group by - one row per group
282            for state in self.groups.values_mut() {
283                // Output group key columns
284                for (i, val) in state.key_values.iter().enumerate() {
285                    columns[i].push(val.clone());
286                }
287
288                // Output aggregate results
289                for (i, (acc, expr)) in state
290                    .accumulators
291                    .iter_mut()
292                    .zip(&self.aggregates)
293                    .enumerate()
294                {
295                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
296                }
297            }
298        }
299
300        if !columns.is_empty() && !columns[0].is_empty() {
301            let chunk = DataChunk::new(columns);
302            sink.consume(chunk)?;
303        }
304
305        Ok(())
306    }
307
308    fn preferred_chunk_size(&self) -> ChunkSizeHint {
309        ChunkSizeHint::Default
310    }
311
312    fn name(&self) -> &'static str {
313        "AggregatePush"
314    }
315}
316
/// Default spill threshold for aggregates (number of in-memory groups before
/// spilling kicks in).
#[cfg(feature = "spill")]
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;
320
/// Serializes a GroupState to bytes.
///
/// Wire format: `key_values.len()` (u64 LE) followed by each value, then
/// `accumulators.len()` (u64 LE) followed by, for each accumulator:
/// `count` (i64 LE), `sum` (f64 bit pattern as u64 LE), and the three
/// optional values (min, max, first) each prefixed by a 1-byte presence flag.
/// Must stay in sync with `deserialize_group_state`.
#[cfg(feature = "spill")]
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    /// Writes a 1-byte presence flag, then the value if present.
    fn write_opt_value(v: &Option<Value>, w: &mut dyn Write) -> std::io::Result<()> {
        use crate::execution::spill::serialize_value;

        w.write_all(&[v.is_some() as u8])?;
        if let Some(ref val) = *v {
            serialize_value(val, w)?;
        }
        Ok(())
    }

    // Write key values.
    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for val in &state.key_values {
        serialize_value(val, w)?;
    }

    // Write accumulators.
    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        // Store the exact bit pattern so NaN / -0.0 round-trip losslessly.
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;
        write_opt_value(&acc.min, w)?;
        write_opt_value(&acc.max, w)?;
        write_opt_value(&acc.first, w)?;
    }

    Ok(())
}
362
/// Deserializes a GroupState from bytes.
///
/// Inverse of `serialize_group_state`; see that function for the wire format.
#[cfg(feature = "spill")]
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    /// Reads a little-endian u64.
    fn read_u64(r: &mut dyn Read) -> std::io::Result<u64> {
        let mut buf = [0u8; 8];
        r.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    /// Reads a 1-byte presence flag, then the value if present.
    fn read_opt_value(r: &mut dyn Read) -> std::io::Result<Option<Value>> {
        use crate::execution::spill::deserialize_value;

        let mut flag = [0u8; 1];
        r.read_exact(&mut flag)?;
        if flag[0] != 0 {
            Ok(Some(deserialize_value(r)?))
        } else {
            Ok(None)
        }
    }

    // Read key values.
    let num_keys = read_u64(r)? as usize;
    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    // Read accumulators.
    let num_accumulators = read_u64(r)? as usize;
    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        // Same-width reinterpret of the LE bytes as i64 (no truncation).
        let count = read_u64(r)? as i64;
        let sum = f64::from_bits(read_u64(r)?);
        let min = read_opt_value(r)?;
        let max = read_opt_value(r)?;
        let first = read_opt_value(r)?;

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
430
/// Push-based aggregate operator with spilling support.
///
/// Uses a partitioned hash table that can spill cold partitions to disk
/// when memory pressure is high.
#[cfg(feature = "spill")]
pub struct SpillableAggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Spill manager (None = spilling never activates, even past threshold).
    spill_manager: Option<Arc<SpillManager>>,
    /// Partitioned groups, keyed by materialized key values (used once
    /// `using_partitioned` is set).
    partitioned_groups: Option<PartitionedState<GroupState>>,
    /// Non-partitioned groups, keyed by hash (used before any spill switch).
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulators, present only when `group_by` is empty.
    global_state: Option<Vec<Accumulator>>,
    /// Spill threshold (number of groups held in memory).
    spill_threshold: usize,
    /// Whether we've switched to partitioned mode.
    using_partitioned: bool,
}
454
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Number of hash partitions used by the spillable state.
    const NUM_PARTITIONS: usize = 256;

    /// Builds the global accumulator set, present only when there is no
    /// GROUP BY. Shared by `new` and `with_spilling`.
    fn make_global_state(
        group_by: &[usize],
        aggregates: &[AggregateExpr],
    ) -> Option<Vec<Accumulator>> {
        group_by
            .is_empty()
            .then(|| aggregates.iter().map(|_| Accumulator::new()).collect())
    }

    /// Builds a fresh partitioned state bound to `manager`. Shared by
    /// `with_spilling` and the lazy switch in `maybe_spill`.
    fn make_partitioned(manager: &Arc<SpillManager>) -> PartitionedState<GroupState> {
        PartitionedState::new(
            Arc::clone(manager),
            Self::NUM_PARTITIONS,
            serialize_group_state,
            deserialize_group_state,
        )
    }

    /// Creates a spillable aggregate operator with spilling disabled
    /// (behaves like `AggregatePushOperator` until a manager is supplied).
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        let global_state = Self::make_global_state(&group_by, &aggregates);

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Creates a spillable aggregate operator with spilling enabled from the
    /// start: state is partitioned immediately rather than on first overflow.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let global_state = Self::make_global_state(&group_by, &aggregates);
        let partitioned = Self::make_partitioned(&manager);

        Self {
            group_by,
            aggregates,
            spill_manager: Some(manager),
            partitioned_groups: Some(partitioned),
            groups: HashMap::new(),
            global_state,
            spill_threshold: threshold,
            using_partitioned: true,
        }
    }

    /// Creates a simple global aggregate (no GROUP BY).
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Sets the spill threshold (number of groups held in memory).
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Spills, or switches to partitioned mode, once the in-memory group
    /// count reaches the threshold. Global aggregation never spills (its
    /// state is a single fixed-size accumulator row).
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        if self.global_state.is_some() {
            return Ok(());
        }

        if let Some(ref mut partitioned) = self.partitioned_groups {
            // Already partitioned: push the largest partition to disk.
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            // In-memory table reached the threshold: migrate to partitioned
            // state if a spill manager was configured; otherwise keep growing.
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = Self::make_partitioned(manager);

                // Re-key existing groups by their materialized key values.
                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
560
#[cfg(feature = "spill")]
impl PushOperator for SpillableAggregatePushOperator {
    /// Consumes a chunk, updating accumulator state, then checks whether the
    /// group table has grown past the spill threshold.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation - same as non-spillable.
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): no input column, count the row.
                            acc.count += 1;
                        }
                    }
                }
            } else if self.using_partitioned {
                // Use partitioned state, keyed by the materialized key values
                // (not by hash like the in-memory map below).
                if let Some(ref mut partitioned) = self.partitioned_groups {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    // Borrow `aggregates` as a separate binding so the closure
                    // does not capture `self` while `partitioned` holds a
                    // mutable borrow of `self.partitioned_groups`.
                    let aggregates = &self.aggregates;
                    let state = partitioned
                        .get_or_insert_with(key_values.clone(), || GroupState {
                            key_values: key_values.clone(),
                            accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
                        })
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;

                    for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*)
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Use regular hash map (spilling not yet triggered).
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                let state = self.groups.entry(key).or_insert_with(|| {
                    // First row of a new group: capture key values for output.
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        // COUNT(*)
                        acc.count += 1;
                    }
                }
            }
        }

        // Check if we need to spill.
        self.maybe_spill()?;

        Ok(true)
    }

    /// Emits one output chunk: a single row for global aggregation, or one
    /// row per group (key columns first, then aggregate results). Spilled
    /// partitions are drained back from disk when partitioned mode is on.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregation - single row output.
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else if self.using_partitioned {
            // Drain partitioned state (including anything spilled to disk).
            if let Some(ref mut partitioned) = self.partitioned_groups {
                let groups = partitioned
                    .drain_all()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;

                for (_key, mut state) in groups {
                    // Output group key columns.
                    for (i, val) in state.key_values.iter().enumerate() {
                        columns[i].push(val.clone());
                    }

                    // Output aggregate results after the key columns.
                    for (i, (acc, expr)) in state
                        .accumulators
                        .iter_mut()
                        .zip(&self.aggregates)
                        .enumerate()
                    {
                        columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                    }
                }
            }
        } else {
            // Group by using regular hash map - one row per group.
            for state in self.groups.values_mut() {
                // Output group key columns.
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                // Output aggregate results after the key columns.
                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Don't emit an empty chunk when nothing was aggregated.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "SpillableAggregatePush"
    }
}
732
733#[cfg(test)]
734mod tests {
735    use super::*;
736    use crate::execution::sink::CollectorSink;
737
738    fn create_test_chunk(values: &[i64]) -> DataChunk {
739        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
740        let vector = ValueVector::from_values(&v);
741        DataChunk::new(vec![vector])
742    }
743
744    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
745        let v1: Vec<Value> = col1.iter().map(|&i| Value::Int64(i)).collect();
746        let v2: Vec<Value> = col2.iter().map(|&i| Value::Int64(i)).collect();
747        DataChunk::new(vec![
748            ValueVector::from_values(&v1),
749            ValueVector::from_values(&v2),
750        ])
751    }
752
753    #[test]
754    fn test_global_count() {
755        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
756        let mut sink = CollectorSink::new();
757
758        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
759            .unwrap();
760        agg.finalize(&mut sink).unwrap();
761
762        let chunks = sink.into_chunks();
763        assert_eq!(chunks.len(), 1);
764        assert_eq!(
765            chunks[0].column(0).unwrap().get_value(0),
766            Some(Value::Int64(5))
767        );
768    }
769
770    #[test]
771    fn test_global_sum() {
772        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
773        let mut sink = CollectorSink::new();
774
775        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
776            .unwrap();
777        agg.finalize(&mut sink).unwrap();
778
779        let chunks = sink.into_chunks();
780        assert_eq!(
781            chunks[0].column(0).unwrap().get_value(0),
782            Some(Value::Float64(15.0))
783        );
784    }
785
786    #[test]
787    fn test_global_min_max() {
788        let mut agg =
789            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
790        let mut sink = CollectorSink::new();
791
792        agg.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
793            .unwrap();
794        agg.finalize(&mut sink).unwrap();
795
796        let chunks = sink.into_chunks();
797        assert_eq!(
798            chunks[0].column(0).unwrap().get_value(0),
799            Some(Value::Int64(1))
800        );
801        assert_eq!(
802            chunks[0].column(1).unwrap().get_value(0),
803            Some(Value::Int64(9))
804        );
805    }
806
807    #[test]
808    fn test_group_by_sum() {
809        // Group by column 0, sum column 1
810        let mut agg = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
811        let mut sink = CollectorSink::new();
812
813        // Group 1: 10, 20 (sum=30), Group 2: 30, 40 (sum=70)
814        agg.push(
815            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
816            &mut sink,
817        )
818        .unwrap();
819        agg.finalize(&mut sink).unwrap();
820
821        let chunks = sink.into_chunks();
822        assert_eq!(chunks[0].len(), 2); // 2 groups
823    }
824
825    #[test]
826    #[cfg(feature = "spill")]
827    fn test_spillable_aggregate_no_spill() {
828        // When threshold is not reached, should work like normal aggregate
829        let mut agg = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
830            .with_threshold(100);
831        let mut sink = CollectorSink::new();
832
833        agg.push(
834            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
835            &mut sink,
836        )
837        .unwrap();
838        agg.finalize(&mut sink).unwrap();
839
840        let chunks = sink.into_chunks();
841        assert_eq!(chunks[0].len(), 2); // 2 groups
842    }
843
844    #[test]
845    #[cfg(feature = "spill")]
846    fn test_spillable_aggregate_with_spilling() {
847        use tempfile::TempDir;
848
849        let temp_dir = TempDir::new().unwrap();
850        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());
851
852        // Set very low threshold to force spilling
853        let mut agg = SpillableAggregatePushOperator::with_spilling(
854            vec![0],
855            vec![AggregateExpr::sum(1)],
856            manager,
857            3, // Spill after 3 groups
858        );
859        let mut sink = CollectorSink::new();
860
861        // Create 10 different groups
862        for i in 0..10 {
863            let chunk = create_two_column_chunk(&[i], &[i * 10]);
864            agg.push(chunk, &mut sink).unwrap();
865        }
866        agg.finalize(&mut sink).unwrap();
867
868        let chunks = sink.into_chunks();
869        assert_eq!(chunks.len(), 1);
870        assert_eq!(chunks[0].len(), 10); // 10 groups
871
872        // Verify sums are correct
873        let mut sums: Vec<f64> = Vec::new();
874        for i in 0..chunks[0].len() {
875            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
876                sums.push(sum);
877            }
878        }
879        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
880        assert_eq!(
881            sums,
882            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
883        );
884    }
885
886    #[test]
887    #[cfg(feature = "spill")]
888    fn test_spillable_aggregate_global() {
889        // Global aggregation shouldn't be affected by spilling
890        let mut agg = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
891        let mut sink = CollectorSink::new();
892
893        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
894            .unwrap();
895        agg.finalize(&mut sink).unwrap();
896
897        let chunks = sink.into_chunks();
898        assert_eq!(chunks.len(), 1);
899        assert_eq!(
900            chunks[0].column(0).unwrap().get_value(0),
901            Some(Value::Int64(5))
902        );
903    }
904
905    #[test]
906    #[cfg(feature = "spill")]
907    fn test_spillable_aggregate_many_groups() {
908        use tempfile::TempDir;
909
910        let temp_dir = TempDir::new().unwrap();
911        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());
912
913        let mut agg = SpillableAggregatePushOperator::with_spilling(
914            vec![0],
915            vec![AggregateExpr::count_star()],
916            manager,
917            10, // Very low threshold
918        );
919        let mut sink = CollectorSink::new();
920
921        // Create 100 different groups
922        for i in 0..100 {
923            let chunk = create_test_chunk(&[i]);
924            agg.push(chunk, &mut sink).unwrap();
925        }
926        agg.finalize(&mut sink).unwrap();
927
928        let chunks = sink.into_chunks();
929        assert_eq!(chunks.len(), 1);
930        assert_eq!(chunks[0].len(), 100); // 100 groups
931
932        // Each group should have count = 1
933        for i in 0..100 {
934            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
935                assert_eq!(count, 1);
936            }
937        }
938    }
939}