// grafeo_core/execution/operators/push/aggregate.rs
1//! Push-based aggregate operator (pipeline breaker).
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Accumulator for aggregate state.
///
/// A single accumulator tracks every simple aggregate at once (count, sum,
/// min, max, first); `finalize` selects the one actually requested.
#[derive(Debug, Clone, Default)]
struct Accumulator {
    // Number of non-null values fed through `add` (COUNT(*) is handled by
    // the operator bumping this field directly, bypassing `add`).
    count: i64,
    // Running sum of values convertible to f64; non-numeric values are
    // counted but contribute nothing here.
    sum: f64,
    // Smallest value seen so far, if any.
    min: Option<Value>,
    // Largest value seen so far, if any.
    max: Option<Value>,
    // First non-null value seen, used for FIRST().
    first: Option<Value>,
}
26
27impl Accumulator {
28    fn new() -> Self {
29        Self {
30            count: 0,
31            sum: 0.0,
32            min: None,
33            max: None,
34            first: None,
35        }
36    }
37
38    fn add(&mut self, value: &Value) {
39        // Skip nulls for aggregates
40        if matches!(value, Value::Null) {
41            return;
42        }
43
44        self.count += 1;
45
46        // Sum (for numeric types)
47        if let Some(n) = value_to_f64(value) {
48            self.sum += n;
49        }
50
51        // Min
52        if self.min.is_none() || compare_for_min(&self.min, value) {
53            self.min = Some(value.clone());
54        }
55
56        // Max
57        if self.max.is_none() || compare_for_max(&self.max, value) {
58            self.max = Some(value.clone());
59        }
60
61        // First
62        if self.first.is_none() {
63            self.first = Some(value.clone());
64        }
65    }
66
67    fn finalize(&mut self, func: AggregateFunction) -> Value {
68        match func {
69            AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70            AggregateFunction::Sum => {
71                if self.count == 0 {
72                    Value::Null
73                } else {
74                    Value::Float64(self.sum)
75                }
76            }
77            AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78            AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79            AggregateFunction::Avg => {
80                if self.count == 0 {
81                    Value::Null
82                } else {
83                    Value::Float64(self.sum / self.count as f64)
84                }
85            }
86            AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87            // Advanced functions not supported in push-based accumulator;
88            // these are handled by the pull-based AggregateState instead.
89            AggregateFunction::Last
90            | AggregateFunction::Collect
91            | AggregateFunction::StdDev
92            | AggregateFunction::StdDevPop
93            | AggregateFunction::Variance
94            | AggregateFunction::VariancePop
95            | AggregateFunction::PercentileDisc
96            | AggregateFunction::PercentileCont
97            | AggregateFunction::GroupConcat
98            | AggregateFunction::Sample
99            | AggregateFunction::CovarSamp
100            | AggregateFunction::CovarPop
101            | AggregateFunction::Corr
102            | AggregateFunction::RegrSlope
103            | AggregateFunction::RegrIntercept
104            | AggregateFunction::RegrR2
105            | AggregateFunction::RegrCount
106            | AggregateFunction::RegrSxx
107            | AggregateFunction::RegrSyy
108            | AggregateFunction::RegrSxy
109            | AggregateFunction::RegrAvgx
110            | AggregateFunction::RegrAvgy => Value::Null,
111        }
112    }
113}
114
115use crate::execution::operators::value_utils::{
116    is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
119/// Hash key for grouping.
120#[derive(Debug, Clone, PartialEq, Eq, Hash)]
121struct GroupKey(Vec<u64>);
122
123impl GroupKey {
124    fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
125        let hashes: Vec<u64> = group_by
126            .iter()
127            .map(|&col| {
128                chunk
129                    .column(col)
130                    .and_then(|c| c.get_value(row))
131                    .map_or(0, |v| hash_value(&v))
132            })
133            .collect();
134        Self(hashes)
135    }
136}
137
/// Computes a deterministic 64-bit hash of a `Value` for group-key purposes.
///
/// Each variant hashes a distinct discriminant byte first so values of
/// different types cannot collide trivially. Composite values hash their
/// length plus every element; unordered collections are brought into a
/// deterministic order (BTreeMap iteration, explicit sorting) before hashing.
fn hash_value(value: &Value) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    // Discriminant tag prevents cross-type collisions (e.g. Null vs unknown)
    match value {
        Value::Null => 0u8.hash(&mut hasher),
        Value::Bool(b) => {
            1u8.hash(&mut hasher);
            b.hash(&mut hasher);
        }
        Value::Int64(i) => {
            2u8.hash(&mut hasher);
            i.hash(&mut hasher);
        }
        Value::Float64(f) => {
            3u8.hash(&mut hasher);
            // Hash the raw bit pattern; f64 itself is not Hash. Note this
            // distinguishes 0.0 from -0.0 and gives each NaN bit pattern
            // its own hash.
            f.to_bits().hash(&mut hasher);
        }
        Value::String(s) => {
            4u8.hash(&mut hasher);
            s.hash(&mut hasher);
        }
        Value::Bytes(b) => {
            5u8.hash(&mut hasher);
            b.hash(&mut hasher);
        }
        Value::Timestamp(t) => {
            6u8.hash(&mut hasher);
            t.hash(&mut hasher);
        }
        Value::Date(d) => {
            7u8.hash(&mut hasher);
            d.hash(&mut hasher);
        }
        Value::Time(t) => {
            8u8.hash(&mut hasher);
            t.hash(&mut hasher);
        }
        Value::Duration(d) => {
            9u8.hash(&mut hasher);
            d.hash(&mut hasher);
        }
        Value::ZonedDatetime(zdt) => {
            10u8.hash(&mut hasher);
            zdt.hash(&mut hasher);
        }
        Value::List(list) => {
            11u8.hash(&mut hasher);
            // Length prefix plus recursive element hashes.
            list.len().hash(&mut hasher);
            for elem in list.iter() {
                hash_value(elem).hash(&mut hasher);
            }
        }
        Value::Map(map) => {
            12u8.hash(&mut hasher);
            map.len().hash(&mut hasher);
            // BTreeMap iterates in key order, so hashing is deterministic
            for (k, v) in map.as_ref() {
                k.as_str().hash(&mut hasher);
                hash_value(v).hash(&mut hasher);
            }
        }
        Value::Vector(vec) => {
            13u8.hash(&mut hasher);
            vec.len().hash(&mut hasher);
            for f in vec.iter() {
                f.to_bits().hash(&mut hasher);
            }
        }
        Value::Path { nodes, edges } => {
            14u8.hash(&mut hasher);
            nodes.len().hash(&mut hasher);
            for n in nodes.iter() {
                hash_value(n).hash(&mut hasher);
            }
            for e in edges.iter() {
                hash_value(e).hash(&mut hasher);
            }
        }
        Value::GCounter(map) => {
            15u8.hash(&mut hasher);
            // Sort entries so the hash is independent of map iteration order.
            let mut entries: Vec<_> = map.iter().collect();
            entries.sort_by_key(|(k, _)| *k);
            for (k, v) in entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
        }
        Value::OnCounter { pos, neg } => {
            16u8.hash(&mut hasher);
            // Same deterministic-order treatment for both counter halves.
            let mut pos_entries: Vec<_> = pos.iter().collect();
            pos_entries.sort_by_key(|(k, _)| *k);
            for (k, v) in pos_entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
            let mut neg_entries: Vec<_> = neg.iter().collect();
            neg_entries.sort_by_key(|(k, _)| *k);
            for (k, v) in neg_entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
        }
    }
    hasher.finish()
}
246
/// Group state with key values and accumulators.
#[derive(Clone)]
struct GroupState {
    // The actual (un-hashed) GROUP BY values, emitted as the key columns
    // of this group's output row.
    key_values: Vec<Value>,
    // One accumulator per aggregate expression, in expression order.
    accumulators: Vec<Accumulator>,
}
253
/// Push-based aggregate operator.
///
/// This is a pipeline breaker that accumulates all input, groups by key,
/// and produces aggregated output in the finalize phase.
pub struct AggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Group states by hash key (unused when `group_by` is empty).
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulators; `Some` only when there is no GROUP BY.
    global_state: Option<Vec<Accumulator>>,
}
268
269impl AggregatePushOperator {
270    /// Create a new aggregate operator.
271    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
272        let global_state = if group_by.is_empty() {
273            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
274        } else {
275            None
276        };
277
278        Self {
279            group_by,
280            aggregates,
281            groups: HashMap::new(),
282            global_state,
283        }
284    }
285
286    /// Create a simple global aggregate (no GROUP BY).
287    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
288        Self::new(Vec::new(), aggregates)
289    }
290}
291
292impl PushOperator for AggregatePushOperator {
293    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
294        if chunk.is_empty() {
295            return Ok(true);
296        }
297
298        for row in chunk.selected_indices() {
299            if self.group_by.is_empty() {
300                // Global aggregation
301                if let Some(ref mut accumulators) = self.global_state {
302                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
303                        if let Some(col) = expr.column {
304                            if let Some(c) = chunk.column(col)
305                                && let Some(val) = c.get_value(row)
306                            {
307                                acc.add(&val);
308                            }
309                        } else {
310                            // COUNT(*)
311                            acc.count += 1;
312                        }
313                    }
314                }
315            } else {
316                // Group by aggregation
317                let key = GroupKey::from_row(&chunk, row, &self.group_by);
318
319                let state = self.groups.entry(key).or_insert_with(|| {
320                    let key_values: Vec<Value> = self
321                        .group_by
322                        .iter()
323                        .map(|&col| {
324                            chunk
325                                .column(col)
326                                .and_then(|c| c.get_value(row))
327                                .unwrap_or(Value::Null)
328                        })
329                        .collect();
330
331                    GroupState {
332                        key_values,
333                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
334                    }
335                });
336
337                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
338                    if let Some(col) = expr.column {
339                        if let Some(c) = chunk.column(col)
340                            && let Some(val) = c.get_value(row)
341                        {
342                            acc.add(&val);
343                        }
344                    } else {
345                        // COUNT(*)
346                        acc.count += 1;
347                    }
348                }
349            }
350        }
351
352        Ok(true)
353    }
354
355    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
356        let num_output_cols = self.group_by.len() + self.aggregates.len();
357        let mut columns: Vec<ValueVector> =
358            (0..num_output_cols).map(|_| ValueVector::new()).collect();
359
360        if self.group_by.is_empty() {
361            // Global aggregation - single row output
362            if let Some(ref mut accumulators) = self.global_state {
363                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
364                    columns[i].push(acc.finalize(expr.function));
365                }
366            }
367        } else {
368            // Group by - one row per group
369            for state in self.groups.values_mut() {
370                // Output group key columns
371                for (i, val) in state.key_values.iter().enumerate() {
372                    columns[i].push(val.clone());
373                }
374
375                // Output aggregate results
376                for (i, (acc, expr)) in state
377                    .accumulators
378                    .iter_mut()
379                    .zip(&self.aggregates)
380                    .enumerate()
381                {
382                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
383                }
384            }
385        }
386
387        if !columns.is_empty() && !columns[0].is_empty() {
388            let chunk = DataChunk::new(columns);
389            sink.consume(chunk)?;
390        }
391
392        Ok(())
393    }
394
395    fn preferred_chunk_size(&self) -> ChunkSizeHint {
396        ChunkSizeHint::Default
397    }
398
399    fn name(&self) -> &'static str {
400        "AggregatePush"
401    }
402}
403
/// Default spill threshold for aggregates (number of groups).
#[cfg(feature = "spill")]
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;

/// Serializes a GroupState to bytes.
///
/// Layout (integers little-endian; must stay in exact sync with
/// `deserialize_group_state`):
///   u64 key count, then each key value via `serialize_value`;
///   u64 accumulator count, then per accumulator:
///     i64 count, f64 sum as raw bits,
///     then for each of min/max/first: u8 presence flag + optional value.
#[cfg(feature = "spill")]
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    // Write key values
    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for val in &state.key_values {
        serialize_value(val, w)?;
    }

    // Write accumulators
    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        // Sum round-trips through its bit pattern to stay lossless.
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;

        // Min
        let has_min = acc.min.is_some();
        w.write_all(&[has_min as u8])?;
        if let Some(ref v) = acc.min {
            serialize_value(v, w)?;
        }

        // Max
        let has_max = acc.max.is_some();
        w.write_all(&[has_max as u8])?;
        if let Some(ref v) = acc.max {
            serialize_value(v, w)?;
        }

        // First
        let has_first = acc.first.is_some();
        w.write_all(&[has_first as u8])?;
        if let Some(ref v) = acc.first {
            serialize_value(v, w)?;
        }
    }

    Ok(())
}
449
/// Deserializes a GroupState from bytes.
///
/// Reads exactly the layout produced by `serialize_group_state`; any change
/// to one side must be mirrored in the other or previously spilled state
/// becomes unreadable.
#[cfg(feature = "spill")]
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    // Read key values
    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let num_keys = u64::from_le_bytes(len_buf) as usize;

    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    // Read accumulators
    r.read_exact(&mut len_buf)?;
    let num_accumulators = u64::from_le_bytes(len_buf) as usize;

    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        let mut count_buf = [0u8; 8];
        r.read_exact(&mut count_buf)?;
        let count = i64::from_le_bytes(count_buf);

        // Sum was written as its raw f64 bit pattern.
        r.read_exact(&mut count_buf)?;
        let sum = f64::from_bits(u64::from_le_bytes(count_buf));

        // Min: one presence byte, then the value if present.
        let mut flag_buf = [0u8; 1];
        r.read_exact(&mut flag_buf)?;
        let min = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        // Max
        r.read_exact(&mut flag_buf)?;
        let max = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        // First
        r.read_exact(&mut flag_buf)?;
        let first = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
517
/// Push-based aggregate operator with spilling support.
///
/// Uses partitioned hash table that can spill cold partitions to disk
/// when memory pressure is high. Starts in plain in-memory mode unless
/// constructed via `with_spilling`; `maybe_spill` switches modes once the
/// group-count threshold is reached and a spill manager is available.
#[cfg(feature = "spill")]
pub struct SpillableAggregatePushOperator {
    /// Columns to group by.
    group_by: Vec<usize>,
    /// Aggregate expressions.
    aggregates: Vec<AggregateExpr>,
    /// Spill manager (None = no spilling).
    spill_manager: Option<Arc<SpillManager>>,
    /// Partitioned groups (used when spilling is enabled).
    partitioned_groups: Option<PartitionedState<GroupState>>,
    /// Non-partitioned groups (used when spilling is disabled).
    groups: HashMap<GroupKey, GroupState>,
    /// Global accumulator (for no GROUP BY).
    global_state: Option<Vec<Accumulator>>,
    /// Spill threshold (number of groups).
    spill_threshold: usize,
    /// Whether we've switched to partitioned mode.
    using_partitioned: bool,
}
541
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Creates a spillable aggregate operator with spilling disabled.
    ///
    /// No spill manager is attached, so the operator behaves like the plain
    /// in-memory aggregate until one is supplied via
    /// [`Self::with_spill_manager`].
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        let global_state = if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        };

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Creates a spillable aggregate operator that starts directly in
    /// partitioned mode with spilling enabled.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let global_state = if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        };

        let partitioned = PartitionedState::new(
            Arc::clone(&manager),
            256, // Number of partitions
            serialize_group_state,
            deserialize_group_state,
        );

        Self {
            group_by,
            aggregates,
            spill_manager: Some(manager),
            partitioned_groups: Some(partitioned),
            groups: HashMap::new(),
            global_state,
            spill_threshold: threshold,
            using_partitioned: true,
        }
    }

    /// Create a simple global aggregate (no GROUP BY).
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Sets the spill threshold (number of groups).
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Attaches a spill manager so `maybe_spill` can switch to partitioned
    /// mode once the group count reaches the threshold.
    ///
    /// Without this, an operator built via `new` can never spill (there was
    /// no way to set `spill_manager` after construction), which made
    /// `with_threshold` dead configuration on such operators.
    pub fn with_spill_manager(mut self, manager: Arc<SpillManager>) -> Self {
        self.spill_manager = Some(manager);
        self
    }

    /// Switches to partitioned mode and/or spills partitions if needed.
    ///
    /// Global aggregation never spills (its state is a fixed-size vector).
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        if self.global_state.is_some() {
            // Global aggregation doesn't need spilling
            return Ok(());
        }

        // If using partitioned state, check if we need to spill
        if let Some(ref mut partitioned) = self.partitioned_groups {
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            // Not using partitioned state yet, but reached threshold
            // If spilling is configured, switch to partitioned mode
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = PartitionedState::new(
                    Arc::clone(manager),
                    256,
                    serialize_group_state,
                    deserialize_group_state,
                );

                // Move existing groups to partitioned state
                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
647
#[cfg(feature = "spill")]
impl PushOperator for SpillableAggregatePushOperator {
    /// Consumes one chunk, routing each selected row to the global
    /// accumulators, the partitioned (spillable) state, or the in-memory
    /// hash map, depending on the operator's current mode. Checks the spill
    /// threshold once per chunk, after all rows are absorbed.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global aggregation - same as non-spillable
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*): no source column, just count the row.
                            acc.count += 1;
                        }
                    }
                }
            } else if self.using_partitioned {
                // Use partitioned state
                if let Some(ref mut partitioned) = self.partitioned_groups {
                    // Partitioned state is keyed by the actual key values
                    // (not hashes), so materialize them for every row.
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    // Re-borrow aggregates outside the closure so it doesn't
                    // capture all of `self`.
                    let aggregates = &self.aggregates;
                    let state = partitioned
                        .get_or_insert_with(key_values.clone(), || GroupState {
                            key_values: key_values.clone(),
                            accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
                        })
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;

                    for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // Use regular hash map (same path as the non-spillable
                // operator) until the threshold is reached.
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                let state = self.groups.entry(key).or_insert_with(|| {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        acc.count += 1;
                    }
                }
            }
        }

        // Check if we need to spill
        self.maybe_spill()?;

        Ok(true)
    }

    /// Emits the aggregated result: one row for global aggregation, or one
    /// row per group (key columns first). In partitioned mode this drains
    /// all partitions, reading spilled groups back from disk.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            // Global aggregation - single row output
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else if self.using_partitioned {
            // Drain partitioned state
            if let Some(ref mut partitioned) = self.partitioned_groups {
                let groups = partitioned
                    .drain_all()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;

                for (_key, mut state) in groups {
                    // Output group key columns
                    for (i, val) in state.key_values.iter().enumerate() {
                        columns[i].push(val.clone());
                    }

                    // Output aggregate results
                    for (i, (acc, expr)) in state
                        .accumulators
                        .iter_mut()
                        .zip(&self.aggregates)
                        .enumerate()
                    {
                        columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                    }
                }
            }
        } else {
            // Group by using regular hash map - one row per group
            for state in self.groups.values_mut() {
                // Output group key columns
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                // Output aggregate results
                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // Skip the sink entirely when there is no output row at all.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "SpillableAggregatePush"
    }
}
819
820#[cfg(test)]
821mod tests {
822    use super::*;
823    use crate::execution::sink::CollectorSink;
824
825    fn create_test_chunk(values: &[i64]) -> DataChunk {
826        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
827        let vector = ValueVector::from_values(&v);
828        DataChunk::new(vec![vector])
829    }
830
831    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
832        let v1: Vec<Value> = col1.iter().map(|&i| Value::Int64(i)).collect();
833        let v2: Vec<Value> = col2.iter().map(|&i| Value::Int64(i)).collect();
834        DataChunk::new(vec![
835            ValueVector::from_values(&v1),
836            ValueVector::from_values(&v2),
837        ])
838    }
839
840    #[test]
841    fn test_global_count() {
842        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
843        let mut sink = CollectorSink::new();
844
845        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
846            .unwrap();
847        agg.finalize(&mut sink).unwrap();
848
849        let chunks = sink.into_chunks();
850        assert_eq!(chunks.len(), 1);
851        assert_eq!(
852            chunks[0].column(0).unwrap().get_value(0),
853            Some(Value::Int64(5))
854        );
855    }
856
857    #[test]
858    fn test_global_sum() {
859        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
860        let mut sink = CollectorSink::new();
861
862        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
863            .unwrap();
864        agg.finalize(&mut sink).unwrap();
865
866        let chunks = sink.into_chunks();
867        assert_eq!(
868            chunks[0].column(0).unwrap().get_value(0),
869            Some(Value::Float64(15.0))
870        );
871    }
872
873    #[test]
874    fn test_global_min_max() {
875        let mut agg =
876            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
877        let mut sink = CollectorSink::new();
878
879        agg.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
880            .unwrap();
881        agg.finalize(&mut sink).unwrap();
882
883        let chunks = sink.into_chunks();
884        assert_eq!(
885            chunks[0].column(0).unwrap().get_value(0),
886            Some(Value::Int64(1))
887        );
888        assert_eq!(
889            chunks[0].column(1).unwrap().get_value(0),
890            Some(Value::Int64(9))
891        );
892    }
893
894    #[test]
895    fn test_group_by_sum() {
896        // Group by column 0, sum column 1
897        let mut agg = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
898        let mut sink = CollectorSink::new();
899
900        // Group 1: 10, 20 (sum=30), Group 2: 30, 40 (sum=70)
901        agg.push(
902            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
903            &mut sink,
904        )
905        .unwrap();
906        agg.finalize(&mut sink).unwrap();
907
908        let chunks = sink.into_chunks();
909        assert_eq!(chunks[0].len(), 2); // 2 groups
910    }
911
912    #[test]
913    #[cfg(feature = "spill")]
914    fn test_spillable_aggregate_no_spill() {
915        // When threshold is not reached, should work like normal aggregate
916        let mut agg = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
917            .with_threshold(100);
918        let mut sink = CollectorSink::new();
919
920        agg.push(
921            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
922            &mut sink,
923        )
924        .unwrap();
925        agg.finalize(&mut sink).unwrap();
926
927        let chunks = sink.into_chunks();
928        assert_eq!(chunks[0].len(), 2); // 2 groups
929    }
930
931    #[test]
932    #[cfg(feature = "spill")]
933    fn test_spillable_aggregate_with_spilling() {
934        use tempfile::TempDir;
935
936        let temp_dir = TempDir::new().unwrap();
937        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());
938
939        // Set very low threshold to force spilling
940        let mut agg = SpillableAggregatePushOperator::with_spilling(
941            vec![0],
942            vec![AggregateExpr::sum(1)],
943            manager,
944            3, // Spill after 3 groups
945        );
946        let mut sink = CollectorSink::new();
947
948        // Create 10 different groups
949        for i in 0..10 {
950            let chunk = create_two_column_chunk(&[i], &[i * 10]);
951            agg.push(chunk, &mut sink).unwrap();
952        }
953        agg.finalize(&mut sink).unwrap();
954
955        let chunks = sink.into_chunks();
956        assert_eq!(chunks.len(), 1);
957        assert_eq!(chunks[0].len(), 10); // 10 groups
958
959        // Verify sums are correct
960        let mut sums: Vec<f64> = Vec::new();
961        for i in 0..chunks[0].len() {
962            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
963                sums.push(sum);
964            }
965        }
966        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
967        assert_eq!(
968            sums,
969            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
970        );
971    }
972
973    #[test]
974    #[cfg(feature = "spill")]
975    fn test_spillable_aggregate_global() {
976        // Global aggregation shouldn't be affected by spilling
977        let mut agg = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
978        let mut sink = CollectorSink::new();
979
980        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
981            .unwrap();
982        agg.finalize(&mut sink).unwrap();
983
984        let chunks = sink.into_chunks();
985        assert_eq!(chunks.len(), 1);
986        assert_eq!(
987            chunks[0].column(0).unwrap().get_value(0),
988            Some(Value::Int64(5))
989        );
990    }
991
992    #[test]
993    #[cfg(feature = "spill")]
994    fn test_spillable_aggregate_many_groups() {
995        use tempfile::TempDir;
996
997        let temp_dir = TempDir::new().unwrap();
998        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());
999
1000        let mut agg = SpillableAggregatePushOperator::with_spilling(
1001            vec![0],
1002            vec![AggregateExpr::count_star()],
1003            manager,
1004            10, // Very low threshold
1005        );
1006        let mut sink = CollectorSink::new();
1007
1008        // Create 100 different groups
1009        for i in 0..100 {
1010            let chunk = create_test_chunk(&[i]);
1011            agg.push(chunk, &mut sink).unwrap();
1012        }
1013        agg.finalize(&mut sink).unwrap();
1014
1015        let chunks = sink.into_chunks();
1016        assert_eq!(chunks.len(), 1);
1017        assert_eq!(chunks[0].len(), 100); // 100 groups
1018
1019        // Each group should have count = 1
1020        for i in 0..100 {
1021            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
1022                assert_eq!(count, 1);
1023            }
1024        }
1025    }
1026
1027    // ---------------------------------------------------------------
1028    // hash_value coverage for all Value variants
1029    // ---------------------------------------------------------------
1030
1031    #[test]
1032    fn hash_value_null() {
1033        let h = hash_value(&Value::Null);
1034        assert_ne!(h, 0); // hasher produces non-zero for Null discriminant
1035    }
1036
1037    #[test]
1038    fn hash_value_bool() {
1039        let t = hash_value(&Value::Bool(true));
1040        let f = hash_value(&Value::Bool(false));
1041        assert_ne!(t, f);
1042    }
1043
1044    #[test]
1045    fn hash_value_int64() {
1046        let a = hash_value(&Value::Int64(42));
1047        let b = hash_value(&Value::Int64(43));
1048        assert_ne!(a, b);
1049    }
1050
1051    #[test]
1052    fn hash_value_float64() {
1053        let a = hash_value(&Value::Float64(19.88));
1054        let b = hash_value(&Value::Float64(3.19));
1055        assert_ne!(a, b);
1056    }
1057
1058    #[test]
1059    fn hash_value_string() {
1060        let a = hash_value(&Value::String("hello".into()));
1061        let b = hash_value(&Value::String("world".into()));
1062        assert_ne!(a, b);
1063    }
1064
1065    #[test]
1066    fn hash_value_bytes() {
1067        let a = hash_value(&Value::Bytes(vec![1, 2, 3].into()));
1068        let b = hash_value(&Value::Bytes(vec![4, 5, 6].into()));
1069        assert_ne!(a, b);
1070    }
1071
1072    #[test]
1073    fn hash_value_list() {
1074        let a = hash_value(&Value::List(vec![Value::Int64(1), Value::Int64(2)].into()));
1075        let b = hash_value(&Value::List(vec![Value::Int64(3)].into()));
1076        assert_ne!(a, b);
1077    }
1078
1079    #[test]
1080    fn hash_value_map() {
1081        use grafeo_common::types::PropertyKey;
1082        use std::collections::BTreeMap;
1083        use std::sync::Arc;
1084        let mut map = BTreeMap::new();
1085        map.insert(PropertyKey::new("key"), Value::Int64(42));
1086        let h = hash_value(&Value::Map(Arc::new(map)));
1087        assert_ne!(h, 0);
1088    }
1089
1090    #[test]
1091    fn hash_value_vector() {
1092        let h = hash_value(&Value::Vector(vec![1.0, 2.0, 3.0].into()));
1093        assert_ne!(h, 0);
1094    }
1095
1096    #[test]
1097    fn hash_value_path() {
1098        let h = hash_value(&Value::Path {
1099            nodes: vec![Value::Int64(1), Value::Int64(2)].into(),
1100            edges: vec![Value::Int64(10)].into(),
1101        });
1102        assert_ne!(h, 0);
1103    }
1104
1105    #[test]
1106    fn hash_value_gcounter() {
1107        use std::sync::Arc;
1108        let mut map = std::collections::HashMap::new();
1109        map.insert("replica1".to_string(), 10u64);
1110        let h = hash_value(&Value::GCounter(Arc::new(map)));
1111        assert_ne!(h, 0);
1112    }
1113
1114    #[test]
1115    fn hash_value_on_counter() {
1116        use std::sync::Arc;
1117        let mut pos = std::collections::HashMap::new();
1118        pos.insert("replica1".to_string(), 10u64);
1119        let neg = std::collections::HashMap::new();
1120        let h = hash_value(&Value::OnCounter {
1121            pos: Arc::new(pos),
1122            neg: Arc::new(neg),
1123        });
1124        assert_ne!(h, 0);
1125    }
1126
1127    #[test]
1128    fn hash_value_timestamp() {
1129        use grafeo_common::types::Timestamp;
1130        let h = hash_value(&Value::Timestamp(Timestamp::from_micros(1_700_000_000_000)));
1131        assert_ne!(h, 0);
1132    }
1133
1134    #[test]
1135    fn hash_value_date() {
1136        use grafeo_common::types::Date;
1137        let h = hash_value(&Value::Date(Date::from_days(19000)));
1138        assert_ne!(h, 0);
1139    }
1140
1141    #[test]
1142    fn hash_value_time() {
1143        use grafeo_common::types::Time;
1144        let h = hash_value(&Value::Time(Time::from_hms(12, 0, 0).unwrap()));
1145        assert_ne!(h, 0);
1146    }
1147
1148    #[test]
1149    fn hash_value_duration() {
1150        use grafeo_common::types::Duration;
1151        let h = hash_value(&Value::Duration(Duration::from_days(1)));
1152        assert_ne!(h, 0);
1153    }
1154
1155    #[test]
1156    fn hash_value_zoned_datetime() {
1157        use grafeo_common::types::{Timestamp, ZonedDatetime};
1158        let zdt =
1159            ZonedDatetime::from_timestamp_offset(Timestamp::from_micros(1_700_000_000_000), 3600);
1160        let h = hash_value(&Value::ZonedDatetime(zdt));
1161        assert_ne!(h, 0);
1162    }
1163
1164    // ---------------------------------------------------------------
1165    // Accumulator finalize for advanced functions (fallback to Null)
1166    // ---------------------------------------------------------------
1167
1168    #[test]
1169    fn finalize_advanced_functions_return_null() {
1170        let advanced = [
1171            AggregateFunction::Last,
1172            AggregateFunction::Collect,
1173            AggregateFunction::StdDev,
1174            AggregateFunction::StdDevPop,
1175            AggregateFunction::Variance,
1176            AggregateFunction::VariancePop,
1177            AggregateFunction::PercentileDisc,
1178            AggregateFunction::PercentileCont,
1179            AggregateFunction::GroupConcat,
1180            AggregateFunction::Sample,
1181            AggregateFunction::CovarSamp,
1182            AggregateFunction::CovarPop,
1183            AggregateFunction::Corr,
1184            AggregateFunction::RegrSlope,
1185            AggregateFunction::RegrIntercept,
1186            AggregateFunction::RegrR2,
1187            AggregateFunction::RegrCount,
1188            AggregateFunction::RegrSxx,
1189            AggregateFunction::RegrSyy,
1190            AggregateFunction::RegrSxy,
1191            AggregateFunction::RegrAvgx,
1192            AggregateFunction::RegrAvgy,
1193        ];
1194
1195        for func in advanced {
1196            let mut acc = Accumulator::new();
1197            acc.add(&Value::Int64(42));
1198            let result = acc.finalize(func);
1199            assert_eq!(
1200                result,
1201                Value::Null,
1202                "Advanced function {func:?} should return Null in push accumulator"
1203            );
1204        }
1205    }
1206
1207    #[test]
1208    fn finalize_first_returns_first_value() {
1209        let mut acc = Accumulator::new();
1210        acc.add(&Value::Int64(10));
1211        acc.add(&Value::Int64(20));
1212        assert_eq!(acc.finalize(AggregateFunction::First), Value::Int64(10));
1213    }
1214
1215    #[test]
1216    fn finalize_avg_empty_returns_null() {
1217        let mut acc = Accumulator::new();
1218        assert_eq!(acc.finalize(AggregateFunction::Avg), Value::Null);
1219    }
1220
1221    #[test]
1222    fn finalize_sum_empty_returns_null() {
1223        let mut acc = Accumulator::new();
1224        assert_eq!(acc.finalize(AggregateFunction::Sum), Value::Null);
1225    }
1226
1227    #[test]
1228    fn finalize_min_max_empty_returns_null() {
1229        let mut acc_min = Accumulator::new();
1230        let mut acc_max = Accumulator::new();
1231        assert_eq!(acc_min.finalize(AggregateFunction::Min), Value::Null);
1232        assert_eq!(acc_max.finalize(AggregateFunction::Max), Value::Null);
1233    }
1234
1235    #[test]
1236    fn accumulator_skips_nulls() {
1237        let mut acc = Accumulator::new();
1238        acc.add(&Value::Null);
1239        acc.add(&Value::Int64(5));
1240        acc.add(&Value::Null);
1241        assert_eq!(acc.count, 1);
1242        assert_eq!(acc.finalize(AggregateFunction::Count), Value::Int64(1));
1243    }
1244
1245    #[test]
1246    fn test_empty_chunk_returns_ok() {
1247        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
1248        let mut sink = CollectorSink::new();
1249        let empty = DataChunk::new(vec![ValueVector::new()]);
1250        let result = agg.push(empty, &mut sink).unwrap();
1251        assert!(result);
1252    }
1253}