Skip to main content

grafeo_core/execution/operators/push/
aggregate.rs

1//! Push-based aggregate operator (pipeline breaker).
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Accumulator for aggregate state.
///
/// A single accumulator tracks every statistic needed by the simple
/// aggregate functions (COUNT/SUM/AVG/MIN/MAX/FIRST), so one pass over
/// the input can serve any of them.
#[derive(Debug, Clone, Default)]
struct Accumulator {
    // Number of non-null values added via `add`. For COUNT(*) the
    // operators bump this field directly, once per row.
    count: i64,
    // Running sum of values convertible to f64; also feeds AVG.
    sum: f64,
    // Smallest value seen so far, if any.
    min: Option<Value>,
    // Largest value seen so far, if any.
    max: Option<Value>,
    // First non-null value seen, for FIRST.
    first: Option<Value>,
}
26
27impl Accumulator {
28    fn new() -> Self {
29        Self {
30            count: 0,
31            sum: 0.0,
32            min: None,
33            max: None,
34            first: None,
35        }
36    }
37
38    fn add(&mut self, value: &Value) {
39        // Skip nulls for aggregates
40        if matches!(value, Value::Null) {
41            return;
42        }
43
44        self.count += 1;
45
46        // Sum (for numeric types)
47        if let Some(n) = value_to_f64(value) {
48            self.sum += n;
49        }
50
51        // Min
52        if self.min.is_none() || compare_for_min(&self.min, value) {
53            self.min = Some(value.clone());
54        }
55
56        // Max
57        if self.max.is_none() || compare_for_max(&self.max, value) {
58            self.max = Some(value.clone());
59        }
60
61        // First
62        if self.first.is_none() {
63            self.first = Some(value.clone());
64        }
65    }
66
67    fn finalize(&mut self, func: AggregateFunction) -> Value {
68        match func {
69            AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70            AggregateFunction::Sum => {
71                if self.count == 0 {
72                    Value::Null
73                } else {
74                    Value::Float64(self.sum)
75                }
76            }
77            AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78            AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79            AggregateFunction::Avg => {
80                if self.count == 0 {
81                    Value::Null
82                } else {
83                    Value::Float64(self.sum / self.count as f64)
84                }
85            }
86            AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87            // Advanced functions not supported in push-based accumulator;
88            // these are handled by the pull-based AggregateState instead.
89            AggregateFunction::Last
90            | AggregateFunction::Collect
91            | AggregateFunction::StdDev
92            | AggregateFunction::StdDevPop
93            | AggregateFunction::Variance
94            | AggregateFunction::VariancePop
95            | AggregateFunction::PercentileDisc
96            | AggregateFunction::PercentileCont
97            | AggregateFunction::GroupConcat
98            | AggregateFunction::Sample
99            | AggregateFunction::CovarSamp
100            | AggregateFunction::CovarPop
101            | AggregateFunction::Corr
102            | AggregateFunction::RegrSlope
103            | AggregateFunction::RegrIntercept
104            | AggregateFunction::RegrR2
105            | AggregateFunction::RegrCount
106            | AggregateFunction::RegrSxx
107            | AggregateFunction::RegrSyy
108            | AggregateFunction::RegrSxy
109            | AggregateFunction::RegrAvgx
110            | AggregateFunction::RegrAvgy => Value::Null,
111        }
112    }
113}
114
115use crate::execution::operators::value_utils::{
116    is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
/// Hash key for grouping.
///
/// Holds one 64-bit hash per group-by column rather than the values
/// themselves. NOTE(review): equality is on hashes only, so two rows
/// with different values that happen to hash identically would be
/// merged into a single group — confirm this collision tradeoff is
/// intentional.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GroupKey(Vec<u64>);
122
123impl GroupKey {
124    fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
125        let hashes: Vec<u64> = group_by
126            .iter()
127            .map(|&col| {
128                chunk
129                    .column(col)
130                    .and_then(|c| c.get_value(row))
131                    .map_or(0, |v| hash_value(&v))
132            })
133            .collect();
134        Self(hashes)
135    }
136}
137
138fn hash_value(value: &Value) -> u64 {
139    use std::collections::hash_map::DefaultHasher;
140    use std::hash::{Hash, Hasher};
141
142    let mut hasher = DefaultHasher::new();
143    match value {
144        Value::Null => 0u8.hash(&mut hasher),
145        Value::Bool(b) => b.hash(&mut hasher),
146        Value::Int64(i) => i.hash(&mut hasher),
147        Value::Float64(f) => f.to_bits().hash(&mut hasher),
148        Value::String(s) => s.hash(&mut hasher),
149        _ => 0u8.hash(&mut hasher),
150    }
151    hasher.finish()
152}
153
/// Group state with key values and accumulators.
#[derive(Clone)]
struct GroupState {
    // Original (un-hashed) group-by values, emitted as the key columns
    // of the output chunk during finalize.
    key_values: Vec<Value>,
    // One accumulator per aggregate expression, in expression order.
    accumulators: Vec<Accumulator>,
}
160
/// Push-based aggregate operator.
///
/// This is a pipeline breaker that accumulates all input, groups by key,
/// and produces aggregated output in the finalize phase.
pub struct AggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Group states by hash key (unused when `group_by` is empty).
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulator (for no GROUP BY). `Some` exactly when
    /// `group_by` is empty; one accumulator per aggregate expression.
    global_state: Option<Vec<Accumulator>>,
}
175
176impl AggregatePushOperator {
177    /// Create a new aggregate operator.
178    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
179        let global_state = if group_by.is_empty() {
180            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
181        } else {
182            None
183        };
184
185        Self {
186            group_by,
187            aggregates,
188            groups: HashMap::new(),
189            global_state,
190        }
191    }
192
193    /// Create a simple global aggregate (no GROUP BY).
194    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
195        Self::new(Vec::new(), aggregates)
196    }
197}
198
impl PushOperator for AggregatePushOperator {
    /// Consumes one chunk, folding every selected row into the group
    /// (or global) accumulators. Nothing is emitted to `sink` here;
    /// output is produced only in `finalize`, since aggregation is a
    /// pipeline breaker.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): no input column, count the row itself.
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Group by aggregation
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                // The first row of a new group captures the raw key values
                // so they can be emitted as output columns in finalize.
                let state = self.groups.entry(key).or_insert_with(|| {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        // COUNT(*): count the row regardless of column values.
                        acc.count += 1;
                    }
                }
            }
        }

        Ok(true)
    }

    /// Emits the aggregated result as a single chunk: group key columns
    /// first, then one column per aggregate expression.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregation - single row output
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else {
            // Group by - one row per group. Iteration order is the hash
            // map's, so output row order is unspecified.
            for state in self.groups.values_mut() {
                // Output group key columns
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                // Output aggregate results
                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Nothing to emit (e.g. a grouped aggregate that saw no rows):
        // skip the sink entirely rather than pushing an empty chunk.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "AggregatePush"
    }
}
310
/// Default spill threshold for aggregates (number of groups).
///
/// When the number of in-memory groups reaches this count, the
/// spillable operator starts moving state to disk (see `maybe_spill`).
#[cfg(feature = "spill")]
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;
314
/// Serializes a GroupState to bytes.
///
/// Wire layout (all integers little-endian):
/// - `u64` key count, then each key value via `serialize_value`
/// - `u64` accumulator count, then per accumulator:
///   `i64` count, `f64` sum as raw bits, and the optional
///   min/max/first values, each prefixed by a one-byte presence flag.
///
/// Must stay in lockstep with `deserialize_group_state`.
#[cfg(feature = "spill")]
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    /// Writes a presence byte, then the value itself when present.
    fn write_opt(opt: &Option<Value>, w: &mut dyn Write) -> std::io::Result<()> {
        use crate::execution::spill::serialize_value;

        w.write_all(&[opt.is_some() as u8])?;
        if let Some(v) = opt {
            serialize_value(v, w)?;
        }
        Ok(())
    }

    // Write key values
    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for val in &state.key_values {
        serialize_value(val, w)?;
    }

    // Write accumulators
    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;
        write_opt(&acc.min, w)?;
        write_opt(&acc.max, w)?;
        write_opt(&acc.first, w)?;
    }

    Ok(())
}
356
/// Deserializes a GroupState from bytes.
///
/// Reads the exact wire layout produced by `serialize_group_state`;
/// the two functions must be kept in lockstep.
#[cfg(feature = "spill")]
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    /// Reads one little-endian u64 (used for lengths and raw f64 bits).
    fn read_u64(r: &mut dyn Read) -> std::io::Result<u64> {
        let mut buf = [0u8; 8];
        r.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    /// Reads a presence byte and, when set, the value that follows.
    fn read_opt(r: &mut dyn Read) -> std::io::Result<Option<Value>> {
        use crate::execution::spill::deserialize_value;

        let mut flag = [0u8; 1];
        r.read_exact(&mut flag)?;
        if flag[0] != 0 {
            Ok(Some(deserialize_value(r)?))
        } else {
            Ok(None)
        }
    }

    // Read key values
    let num_keys = read_u64(r)? as usize;
    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    // Read accumulators
    let num_accumulators = read_u64(r)? as usize;
    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        // `u64 as i64` is a bit-for-bit reinterpretation, identical to
        // i64::from_le_bytes on the same bytes.
        let count = read_u64(r)? as i64;
        let sum = f64::from_bits(read_u64(r)?);
        let min = read_opt(r)?;
        let max = read_opt(r)?;
        let first = read_opt(r)?;

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
424
/// Push-based aggregate operator with spilling support.
///
/// Uses partitioned hash table that can spill cold partitions to disk
/// when memory pressure is high.
#[cfg(feature = "spill")]
pub struct SpillableAggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Spill manager (None = no spilling).
    spill_manager: Option<Arc<SpillManager>>,
    /// Partitioned groups (used when spilling is enabled). Keyed by the
    /// raw group-by values rather than by `GroupKey` hashes.
    partitioned_groups: Option<PartitionedState<GroupState>>,
    /// Non-partitioned groups (used when spilling is disabled).
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulator (for no GROUP BY). `Some` exactly when
    /// `group_by` is empty.
    global_state: Option<Vec<Accumulator>>,
    /// Spill threshold (number of groups).
    spill_threshold: usize,
    /// Whether we've switched to partitioned mode. Once true it is
    /// never reset; `push`/`finalize` route through `partitioned_groups`.
    using_partitioned: bool,
}
448
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Create a new spillable aggregate operator.
    ///
    /// Starts in plain hash-map mode with no spill manager; spilling
    /// can only happen if one is configured (see `with_spilling`).
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        let global_state = if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        };

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Create a spillable aggregate operator with spilling enabled.
    ///
    /// Starts directly in partitioned mode so state can be spilled as
    /// soon as `threshold` groups accumulate.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let global_state = if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        };

        let partitioned = PartitionedState::new(
            Arc::clone(&manager),
            256, // Number of partitions
            serialize_group_state,
            deserialize_group_state,
        );

        Self {
            group_by,
            aggregates,
            spill_manager: Some(manager),
            partitioned_groups: Some(partitioned),
            groups: HashMap::new(),
            global_state,
            spill_threshold: threshold,
            using_partitioned: true,
        }
    }

    /// Create a simple global aggregate (no GROUP BY).
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Sets the spill threshold.
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Switches to partitioned mode if needed.
    ///
    /// Called after every `push`. Global aggregation never spills (its
    /// state is a fixed, small accumulator set).
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        if self.global_state.is_some() {
            // Global aggregation doesn't need spilling
            return Ok(());
        }

        // If using partitioned state, check if we need to spill
        if let Some(ref mut partitioned) = self.partitioned_groups {
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            // Not using partitioned state yet, but reached threshold
            // If spilling is configured, switch to partitioned mode
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = PartitionedState::new(
                    Arc::clone(manager),
                    256,
                    serialize_group_state,
                    deserialize_group_state,
                );

                // Move existing groups to partitioned state. Partitioned
                // entries are keyed by the raw group-by values, not the
                // GroupKey hash, matching the partitioned push path.
                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
554
#[cfg(feature = "spill")]
impl PushOperator for SpillableAggregatePushOperator {
    /// Consumes one chunk, routing each row to one of three paths:
    /// global accumulators (no GROUP BY), the partitioned state (after
    /// the switch to spilling), or the plain in-memory hash map.
    /// Nothing is emitted here; output happens in `finalize`.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation - same as non-spillable
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): count the row directly.
                            acc.count += 1;
                        }
                    }
                }
            } else if self.using_partitioned {
                // Use partitioned state, keyed by the raw group-by values.
                if let Some(ref mut partitioned) = self.partitioned_groups {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    // Borrow `aggregates` separately so the closure does
                    // not capture `self` while `partitioned` is borrowed.
                    let aggregates = &self.aggregates;
                    let state = partitioned
                        .get_or_insert_with(key_values.clone(), || GroupState {
                            key_values: key_values.clone(),
                            accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
                        })
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;

                    for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Use regular hash map
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                // First row of a new group captures the raw key values.
                let state = self.groups.entry(key).or_insert_with(|| {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        acc.count += 1;
                    }
                }
            }
        }

        // Check if we need to spill
        self.maybe_spill()?;

        Ok(true)
    }

    /// Emits the aggregated result: group key columns first, then one
    /// column per aggregate. Partitioned state is fully drained (spilled
    /// partitions are read back) before producing the single output chunk.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregation - single row output
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else if self.using_partitioned {
            // Drain partitioned state
            if let Some(ref mut partitioned) = self.partitioned_groups {
                let groups = partitioned
                    .drain_all()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;

                for (_key, mut state) in groups {
                    // Output group key columns
                    for (i, val) in state.key_values.iter().enumerate() {
                        columns[i].push(val.clone());
                    }

                    // Output aggregate results
                    for (i, (acc, expr)) in state
                        .accumulators
                        .iter_mut()
                        .zip(&self.aggregates)
                        .enumerate()
                    {
                        columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                    }
                }
            }
        } else {
            // Group by using regular hash map - one row per group
            for state in self.groups.values_mut() {
                // Output group key columns
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                // Output aggregate results
                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Nothing to emit (e.g. grouped aggregate with no input rows):
        // skip the sink rather than pushing an empty chunk.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "SpillableAggregatePush"
    }
}
726
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column Int64 chunk from `values`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    /// Builds a two-column Int64 chunk (e.g. group key + value).
    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
        let v1: Vec<Value> = col1.iter().map(|&i| Value::Int64(i)).collect();
        let v2: Vec<Value> = col2.iter().map(|&i| Value::Int64(i)).collect();
        DataChunk::new(vec![
            ValueVector::from_values(&v1),
            ValueVector::from_values(&v2),
        ])
    }

    #[test]
    fn test_global_count() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    fn test_global_sum() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        // SUM is always produced as Float64, even over integer input.
        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Float64(15.0))
        );
    }

    #[test]
    fn test_global_min_max() {
        let mut agg =
            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        // MIN/MAX preserve the input type (Int64 here), unlike SUM/AVG.
        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(1))
        );
        assert_eq!(
            chunks[0].column(1).unwrap().get_value(0),
            Some(Value::Int64(9))
        );
    }

    #[test]
    fn test_group_by_sum() {
        // Group by column 0, sum column 1
        let mut agg = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
        let mut sink = CollectorSink::new();

        // Group 1: 10, 20 (sum=30), Group 2: 30, 40 (sum=70)
        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2); // 2 groups
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_no_spill() {
        // When threshold is not reached, should work like normal aggregate
        let mut agg = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
            .with_threshold(100);
        let mut sink = CollectorSink::new();

        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2); // 2 groups
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_with_spilling() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        // Set very low threshold to force spilling
        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::sum(1)],
            manager,
            3, // Spill after 3 groups
        );
        let mut sink = CollectorSink::new();

        // Create 10 different groups
        for i in 0..10 {
            let chunk = create_two_column_chunk(&[i], &[i * 10]);
            agg.push(chunk, &mut sink).unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 10); // 10 groups

        // Verify sums are correct. Output order is unspecified, so sort
        // the sums before comparing.
        let mut sums: Vec<f64> = Vec::new();
        for i in 0..chunks[0].len() {
            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
                sums.push(sum);
            }
        }
        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            sums,
            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_global() {
        // Global aggregation shouldn't be affected by spilling
        let mut agg = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_many_groups() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::count_star()],
            manager,
            10, // Very low threshold
        );
        let mut sink = CollectorSink::new();

        // Create 100 different groups
        for i in 0..100 {
            let chunk = create_test_chunk(&[i]);
            agg.push(chunk, &mut sink).unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 100); // 100 groups

        // Each group should have count = 1
        for i in 0..100 {
            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
                assert_eq!(count, 1);
            }
        }
    }
}