1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::operators::accumulator::{AggregateExpr, AggregateFunction};
6use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
7#[cfg(feature = "spill")]
8use crate::execution::spill::{PartitionedState, SpillManager};
9use crate::execution::vector::ValueVector;
10use grafeo_common::types::Value;
11use std::collections::HashMap;
12#[cfg(feature = "spill")]
13use std::io::{Read, Write};
14#[cfg(feature = "spill")]
15use std::sync::Arc;
16
/// Running aggregate state for one group (or for the single global row).
///
/// Tracks just enough to answer COUNT/SUM/MIN/MAX/AVG/FIRST; all other
/// aggregate functions finalize to `Value::Null` in this operator.
#[derive(Debug, Clone, Default)]
struct Accumulator {
    // Number of non-null values folded in via `add`. COUNT(*)-style
    // aggregates (no input column) bump this field directly instead.
    count: i64,
    // Running numeric sum; only values convertible by `value_to_f64`
    // contribute.
    sum: f64,
    // Smallest non-null value seen so far (per `is_less_than` ordering).
    min: Option<Value>,
    // Largest non-null value seen so far (per `is_greater_than` ordering).
    max: Option<Value>,
    // First non-null value seen, for FIRST.
    first: Option<Value>,
}
26
27impl Accumulator {
28 fn new() -> Self {
29 Self {
30 count: 0,
31 sum: 0.0,
32 min: None,
33 max: None,
34 first: None,
35 }
36 }
37
38 fn add(&mut self, value: &Value) {
39 if matches!(value, Value::Null) {
41 return;
42 }
43
44 self.count += 1;
45
46 if let Some(n) = value_to_f64(value) {
48 self.sum += n;
49 }
50
51 if self.min.is_none() || compare_for_min(&self.min, value) {
53 self.min = Some(value.clone());
54 }
55
56 if self.max.is_none() || compare_for_max(&self.max, value) {
58 self.max = Some(value.clone());
59 }
60
61 if self.first.is_none() {
63 self.first = Some(value.clone());
64 }
65 }
66
67 fn finalize(&mut self, func: AggregateFunction) -> Value {
68 match func {
69 AggregateFunction::Count | AggregateFunction::CountNonNull => Value::Int64(self.count),
70 AggregateFunction::Sum => {
71 if self.count == 0 {
72 Value::Null
73 } else {
74 Value::Float64(self.sum)
75 }
76 }
77 AggregateFunction::Min => self.min.take().unwrap_or(Value::Null),
78 AggregateFunction::Max => self.max.take().unwrap_or(Value::Null),
79 AggregateFunction::Avg => {
80 if self.count == 0 {
81 Value::Null
82 } else {
83 Value::Float64(self.sum / self.count as f64)
84 }
85 }
86 AggregateFunction::First => self.first.take().unwrap_or(Value::Null),
87 AggregateFunction::Last
90 | AggregateFunction::Collect
91 | AggregateFunction::StdDev
92 | AggregateFunction::StdDevPop
93 | AggregateFunction::Variance
94 | AggregateFunction::VariancePop
95 | AggregateFunction::PercentileDisc
96 | AggregateFunction::PercentileCont
97 | AggregateFunction::GroupConcat
98 | AggregateFunction::Sample
99 | AggregateFunction::CovarSamp
100 | AggregateFunction::CovarPop
101 | AggregateFunction::Corr
102 | AggregateFunction::RegrSlope
103 | AggregateFunction::RegrIntercept
104 | AggregateFunction::RegrR2
105 | AggregateFunction::RegrCount
106 | AggregateFunction::RegrSxx
107 | AggregateFunction::RegrSyy
108 | AggregateFunction::RegrSxy
109 | AggregateFunction::RegrAvgx
110 | AggregateFunction::RegrAvgy => Value::Null,
111 }
112 }
113}
114
115use crate::execution::operators::value_utils::{
116 is_greater_than as compare_for_max, is_less_than as compare_for_min, value_to_f64,
117};
118
/// Hash-based grouping key: one 64-bit hash per GROUP BY column.
///
/// NOTE(review): only hashes are kept, so two distinct key tuples that
/// collide on every column hash would be merged into one group — the
/// materialized `GroupState::key_values` of whichever row arrived first
/// would be emitted. Accepted tradeoff for a hash aggregate, but worth
/// confirming against correctness requirements.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GroupKey(Vec<u64>);
122
123impl GroupKey {
124 fn from_row(chunk: &DataChunk, row: usize, group_by: &[usize]) -> Self {
125 let hashes: Vec<u64> = group_by
126 .iter()
127 .map(|&col| {
128 chunk
129 .column(col)
130 .and_then(|c| c.get_value(row))
131 .map_or(0, |v| hash_value(&v))
132 })
133 .collect();
134 Self(hashes)
135 }
136}
137
/// Hashes a `Value` for grouping purposes.
///
/// Every variant writes a distinct discriminant byte first, so equal
/// payloads of different types (e.g. `Int64(1)` vs `Bool(true)`) hash
/// apart. Floats are hashed by their raw bit pattern (`to_bits`), which
/// makes the hash total (NaN included) but distinguishes -0.0 from 0.0.
/// CRDT counter maps are sorted by key before hashing so the result does
/// not depend on `HashMap` iteration order.
fn hash_value(value: &Value) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    match value {
        Value::Null => 0u8.hash(&mut hasher),
        Value::Bool(b) => {
            1u8.hash(&mut hasher);
            b.hash(&mut hasher);
        }
        Value::Int64(i) => {
            2u8.hash(&mut hasher);
            i.hash(&mut hasher);
        }
        Value::Float64(f) => {
            3u8.hash(&mut hasher);
            f.to_bits().hash(&mut hasher);
        }
        Value::String(s) => {
            4u8.hash(&mut hasher);
            s.hash(&mut hasher);
        }
        Value::Bytes(b) => {
            5u8.hash(&mut hasher);
            b.hash(&mut hasher);
        }
        Value::Timestamp(t) => {
            6u8.hash(&mut hasher);
            t.hash(&mut hasher);
        }
        Value::Date(d) => {
            7u8.hash(&mut hasher);
            d.hash(&mut hasher);
        }
        Value::Time(t) => {
            8u8.hash(&mut hasher);
            t.hash(&mut hasher);
        }
        Value::Duration(d) => {
            9u8.hash(&mut hasher);
            d.hash(&mut hasher);
        }
        Value::ZonedDatetime(zdt) => {
            10u8.hash(&mut hasher);
            zdt.hash(&mut hasher);
        }
        // Containers hash their length first, then recurse element-wise.
        Value::List(list) => {
            11u8.hash(&mut hasher);
            list.len().hash(&mut hasher);
            for elem in list.iter() {
                hash_value(elem).hash(&mut hasher);
            }
        }
        Value::Map(map) => {
            12u8.hash(&mut hasher);
            map.len().hash(&mut hasher);
            // Map iteration order is assumed stable here (keys appear to be
            // a BTreeMap per the tests) — no sorting needed before hashing.
            for (k, v) in map.as_ref() {
                k.as_str().hash(&mut hasher);
                hash_value(v).hash(&mut hasher);
            }
        }
        Value::Vector(vec) => {
            13u8.hash(&mut hasher);
            vec.len().hash(&mut hasher);
            for f in vec.iter() {
                f.to_bits().hash(&mut hasher);
            }
        }
        Value::Path { nodes, edges } => {
            14u8.hash(&mut hasher);
            nodes.len().hash(&mut hasher);
            for n in nodes.iter() {
                hash_value(n).hash(&mut hasher);
            }
            for e in edges.iter() {
                hash_value(e).hash(&mut hasher);
            }
        }
        // CRDT counters are backed by HashMaps: sort entries by replica key
        // so equal counters always hash identically.
        Value::GCounter(map) => {
            15u8.hash(&mut hasher);
            let mut entries: Vec<_> = map.iter().collect();
            entries.sort_by_key(|(k, _)| *k);
            for (k, v) in entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
        }
        Value::OnCounter { pos, neg } => {
            16u8.hash(&mut hasher);
            let mut pos_entries: Vec<_> = pos.iter().collect();
            pos_entries.sort_by_key(|(k, _)| *k);
            for (k, v) in pos_entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
            let mut neg_entries: Vec<_> = neg.iter().collect();
            neg_entries.sort_by_key(|(k, _)| *k);
            for (k, v) in neg_entries {
                k.hash(&mut hasher);
                v.hash(&mut hasher);
            }
        }
    }
    hasher.finish()
}
246
/// Per-group aggregation state.
#[derive(Clone)]
struct GroupState {
    // The materialized GROUP BY key values, kept so `finalize` can emit
    // them as the leading output columns.
    key_values: Vec<Value>,
    // One accumulator per aggregate expression, in expression order.
    accumulators: Vec<Accumulator>,
}
253
/// Push-based hash aggregation operator.
///
/// Buffers all input during `push` and emits a single output chunk in
/// `finalize`: GROUP BY key columns first, then one column per aggregate.
pub struct AggregatePushOperator {
    // Input column indices to group by; empty means global aggregation.
    group_by: Vec<usize>,
    // Aggregate expressions to evaluate per group.
    aggregates: Vec<AggregateExpr>,
    // Per-group state, keyed by the hashed group key (grouped mode only).
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for global (no GROUP BY) aggregation; `Some` iff
    // `group_by` is empty.
    global_state: Option<Vec<Accumulator>>,
}
268
269impl AggregatePushOperator {
270 pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
272 let global_state = if group_by.is_empty() {
273 Some(aggregates.iter().map(|_| Accumulator::new()).collect())
274 } else {
275 None
276 };
277
278 Self {
279 group_by,
280 aggregates,
281 groups: HashMap::new(),
282 global_state,
283 }
284 }
285
286 pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
288 Self::new(Vec::new(), aggregates)
289 }
290}
291
292impl PushOperator for AggregatePushOperator {
293 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
294 if chunk.is_empty() {
295 return Ok(true);
296 }
297
298 for row in chunk.selected_indices() {
299 if self.group_by.is_empty() {
300 if let Some(ref mut accumulators) = self.global_state {
302 for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
303 if let Some(col) = expr.column {
304 if let Some(c) = chunk.column(col)
305 && let Some(val) = c.get_value(row)
306 {
307 acc.add(&val);
308 }
309 } else {
310 acc.count += 1;
312 }
313 }
314 }
315 } else {
316 let key = GroupKey::from_row(&chunk, row, &self.group_by);
318
319 let state = self.groups.entry(key).or_insert_with(|| {
320 let key_values: Vec<Value> = self
321 .group_by
322 .iter()
323 .map(|&col| {
324 chunk
325 .column(col)
326 .and_then(|c| c.get_value(row))
327 .unwrap_or(Value::Null)
328 })
329 .collect();
330
331 GroupState {
332 key_values,
333 accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
334 }
335 });
336
337 for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
338 if let Some(col) = expr.column {
339 if let Some(c) = chunk.column(col)
340 && let Some(val) = c.get_value(row)
341 {
342 acc.add(&val);
343 }
344 } else {
345 acc.count += 1;
347 }
348 }
349 }
350 }
351
352 Ok(true)
353 }
354
355 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
356 let num_output_cols = self.group_by.len() + self.aggregates.len();
357 let mut columns: Vec<ValueVector> =
358 (0..num_output_cols).map(|_| ValueVector::new()).collect();
359
360 if self.group_by.is_empty() {
361 if let Some(ref mut accumulators) = self.global_state {
363 for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
364 columns[i].push(acc.finalize(expr.function));
365 }
366 }
367 } else {
368 for state in self.groups.values_mut() {
370 for (i, val) in state.key_values.iter().enumerate() {
372 columns[i].push(val.clone());
373 }
374
375 for (i, (acc, expr)) in state
377 .accumulators
378 .iter_mut()
379 .zip(&self.aggregates)
380 .enumerate()
381 {
382 columns[self.group_by.len() + i].push(acc.finalize(expr.function));
383 }
384 }
385 }
386
387 if !columns.is_empty() && !columns[0].is_empty() {
388 let chunk = DataChunk::new(columns);
389 sink.consume(chunk)?;
390 }
391
392 Ok(())
393 }
394
395 fn preferred_chunk_size(&self) -> ChunkSizeHint {
396 ChunkSizeHint::Default
397 }
398
399 fn name(&self) -> &'static str {
400 "AggregatePush"
401 }
402}
403
#[cfg(feature = "spill")]
/// Default number of buffered groups at which `SpillableAggregatePushOperator`
/// starts moving group state to disk (see `maybe_spill`).
pub const DEFAULT_AGGREGATE_SPILL_THRESHOLD: usize = 50_000;
407
#[cfg(feature = "spill")]
/// Writes a `GroupState` in the spill wire format.
///
/// Format (all integers little-endian; must stay in lockstep with
/// `deserialize_group_state`):
///   u64 key-value count, then each key via `serialize_value`;
///   u64 accumulator count, then per accumulator:
///     i64 count, f64 sum as raw bits,
///     and for each of min/max/first: one presence byte (0/1) followed by
///     the value via `serialize_value` when present.
fn serialize_group_state(state: &GroupState, w: &mut dyn Write) -> std::io::Result<()> {
    use crate::execution::spill::serialize_value;

    w.write_all(&(state.key_values.len() as u64).to_le_bytes())?;
    for val in &state.key_values {
        serialize_value(val, w)?;
    }

    w.write_all(&(state.accumulators.len() as u64).to_le_bytes())?;
    for acc in &state.accumulators {
        w.write_all(&acc.count.to_le_bytes())?;
        // f64 serialized via its bit pattern to round-trip exactly.
        w.write_all(&acc.sum.to_bits().to_le_bytes())?;

        let has_min = acc.min.is_some();
        w.write_all(&[has_min as u8])?;
        if let Some(ref v) = acc.min {
            serialize_value(v, w)?;
        }

        let has_max = acc.max.is_some();
        w.write_all(&[has_max as u8])?;
        if let Some(ref v) = acc.max {
            serialize_value(v, w)?;
        }

        let has_first = acc.first.is_some();
        w.write_all(&[has_first as u8])?;
        if let Some(ref v) = acc.first {
            serialize_value(v, w)?;
        }
    }

    Ok(())
}
449
#[cfg(feature = "spill")]
/// Reads a `GroupState` previously written by `serialize_group_state`.
///
/// Exact inverse of the serializer's wire format; see its doc comment for
/// the layout. Errors surface as `std::io::Error` from the underlying reads.
fn deserialize_group_state(r: &mut dyn Read) -> std::io::Result<GroupState> {
    use crate::execution::spill::deserialize_value;

    let mut len_buf = [0u8; 8];
    r.read_exact(&mut len_buf)?;
    let num_keys = u64::from_le_bytes(len_buf) as usize;

    let mut key_values = Vec::with_capacity(num_keys);
    for _ in 0..num_keys {
        key_values.push(deserialize_value(r)?);
    }

    r.read_exact(&mut len_buf)?;
    let num_accumulators = u64::from_le_bytes(len_buf) as usize;

    let mut accumulators = Vec::with_capacity(num_accumulators);
    for _ in 0..num_accumulators {
        // count_buf is reused for both the i64 count and the f64 sum bits;
        // both are 8 bytes on the wire.
        let mut count_buf = [0u8; 8];
        r.read_exact(&mut count_buf)?;
        let count = i64::from_le_bytes(count_buf);

        r.read_exact(&mut count_buf)?;
        let sum = f64::from_bits(u64::from_le_bytes(count_buf));

        // Each optional value is a presence byte followed by the value.
        let mut flag_buf = [0u8; 1];
        r.read_exact(&mut flag_buf)?;
        let min = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        r.read_exact(&mut flag_buf)?;
        let max = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        r.read_exact(&mut flag_buf)?;
        let first = if flag_buf[0] != 0 {
            Some(deserialize_value(r)?)
        } else {
            None
        };

        accumulators.push(Accumulator {
            count,
            sum,
            min,
            max,
            first,
        });
    }

    Ok(GroupState {
        key_values,
        accumulators,
    })
}
517
#[cfg(feature = "spill")]
/// Hash aggregation operator that can spill group state to disk when the
/// number of buffered groups exceeds a threshold.
pub struct SpillableAggregatePushOperator {
    // Input column indices to group by; empty means global aggregation.
    group_by: Vec<usize>,
    // Aggregate expressions to evaluate per group.
    aggregates: Vec<AggregateExpr>,
    // Spill backend; `None` disables the in-memory -> partitioned migration.
    spill_manager: Option<Arc<SpillManager>>,
    // Partitioned, spillable group store, used once `using_partitioned` is set.
    partitioned_groups: Option<PartitionedState<GroupState>>,
    // Plain in-memory group store, used before any spilling kicks in.
    groups: HashMap<GroupKey, GroupState>,
    // Accumulators for global (no GROUP BY) aggregation; never spilled.
    global_state: Option<Vec<Accumulator>>,
    // Group-count / size threshold that triggers spilling.
    spill_threshold: usize,
    // Whether group state lives in `partitioned_groups` instead of `groups`.
    using_partitioned: bool,
}
541
#[cfg(feature = "spill")]
impl SpillableAggregatePushOperator {
    /// Creates an in-memory aggregation. Spilling stays disabled (no
    /// `SpillManager`) until constructed via `with_spilling` instead.
    pub fn new(group_by: Vec<usize>, aggregates: Vec<AggregateExpr>) -> Self {
        // Global (no GROUP BY) aggregation keeps a single accumulator row
        // and never spills.
        let global_state = if group_by.is_empty() {
            Some(aggregates.iter().map(|_| Accumulator::new()).collect())
        } else {
            None
        };

        Self {
            group_by,
            aggregates,
            spill_manager: None,
            partitioned_groups: None,
            groups: HashMap::new(),
            global_state,
            spill_threshold: DEFAULT_AGGREGATE_SPILL_THRESHOLD,
            using_partitioned: false,
        }
    }

    /// Creates an aggregation that keeps group state in a 256-way
    /// partitioned store from the start, spilling the largest partition to
    /// disk whenever the buffered size reaches `threshold`.
    pub fn with_spilling(
        group_by: Vec<usize>,
        aggregates: Vec<AggregateExpr>,
        manager: Arc<SpillManager>,
        threshold: usize,
    ) -> Self {
        let partitioned = PartitionedState::new(
            Arc::clone(&manager),
            256,
            serialize_group_state,
            deserialize_group_state,
        );

        // Delegate the common setup (including global_state) to `new`
        // instead of duplicating it, then switch on the partitioned path.
        let mut op = Self::new(group_by, aggregates);
        op.spill_manager = Some(manager);
        op.partitioned_groups = Some(partitioned);
        op.spill_threshold = threshold;
        op.using_partitioned = true;
        op
    }

    /// Convenience constructor for a global aggregate (no GROUP BY).
    pub fn global(aggregates: Vec<AggregateExpr>) -> Self {
        Self::new(Vec::new(), aggregates)
    }

    /// Overrides the spill threshold (builder style).
    pub fn with_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Spills state to disk when the threshold is exceeded.
    ///
    /// Global aggregation never spills (its state is one accumulator per
    /// aggregate). When already partitioned, the largest partition is
    /// written out. Otherwise, once the in-memory map reaches the threshold
    /// and a manager is available, all existing groups migrate into a fresh
    /// partitioned store keyed by their materialized key values.
    fn maybe_spill(&mut self) -> Result<(), OperatorError> {
        if self.global_state.is_some() {
            return Ok(());
        }

        if let Some(ref mut partitioned) = self.partitioned_groups {
            if partitioned.total_size() >= self.spill_threshold {
                partitioned
                    .spill_largest()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;
            }
        } else if self.groups.len() >= self.spill_threshold {
            if let Some(ref manager) = self.spill_manager {
                let mut partitioned = PartitionedState::new(
                    Arc::clone(manager),
                    256,
                    serialize_group_state,
                    deserialize_group_state,
                );

                for (_key, state) in self.groups.drain() {
                    partitioned
                        .insert(state.key_values.clone(), state)
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;
                }

                self.partitioned_groups = Some(partitioned);
                self.using_partitioned = true;
            }
        }

        Ok(())
    }
}
647
#[cfg(feature = "spill")]
impl PushOperator for SpillableAggregatePushOperator {
    /// Folds every selected row into one of three stores — global
    /// accumulators, the partitioned (spillable) store, or the plain
    /// in-memory map — then gives `maybe_spill` a chance to flush.
    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            return Ok(true);
        }

        for row in chunk.selected_indices() {
            if self.group_by.is_empty() {
                // Global path: no keys, one accumulator per aggregate.
                if let Some(ref mut accumulators) = self.global_state {
                    for (acc, expr) in accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            // COUNT(*)-style aggregate: no input column.
                            acc.count += 1;
                        }
                    }
                }
            } else if self.using_partitioned {
                // Partitioned path: keyed by the materialized key values
                // (not by hash), since keys must survive serialization.
                if let Some(ref mut partitioned) = self.partitioned_groups {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    // Bind `aggregates` separately so the closure below does
                    // not capture `self` while `partitioned` is borrowed.
                    let aggregates = &self.aggregates;
                    let state = partitioned
                        .get_or_insert_with(key_values.clone(), || GroupState {
                            key_values: key_values.clone(),
                            accumulators: aggregates.iter().map(|_| Accumulator::new()).collect(),
                        })
                        .map_err(|e| OperatorError::Execution(e.to_string()))?;

                    for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                        if let Some(col) = expr.column {
                            if let Some(c) = chunk.column(col)
                                && let Some(val) = c.get_value(row)
                            {
                                acc.add(&val);
                            }
                        } else {
                            acc.count += 1;
                        }
                    }
                }
            } else {
                // In-memory path: identical to AggregatePushOperator::push.
                let key = GroupKey::from_row(&chunk, row, &self.group_by);

                let state = self.groups.entry(key).or_insert_with(|| {
                    let key_values: Vec<Value> = self
                        .group_by
                        .iter()
                        .map(|&col| {
                            chunk
                                .column(col)
                                .and_then(|c| c.get_value(row))
                                .unwrap_or(Value::Null)
                        })
                        .collect();

                    GroupState {
                        key_values,
                        accumulators: self.aggregates.iter().map(|_| Accumulator::new()).collect(),
                    }
                });

                for (acc, expr) in state.accumulators.iter_mut().zip(&self.aggregates) {
                    if let Some(col) = expr.column {
                        if let Some(c) = chunk.column(col)
                            && let Some(val) = c.get_value(row)
                        {
                            acc.add(&val);
                        }
                    } else {
                        acc.count += 1;
                    }
                }
            }
        }

        // Spill check once per chunk, not per row.
        self.maybe_spill()?;

        Ok(true)
    }

    /// Emits one output chunk: key columns then aggregate columns. The
    /// partitioned path drains all partitions (reloading any spilled ones)
    /// before finalizing.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
        let num_output_cols = self.group_by.len() + self.aggregates.len();
        let mut columns: Vec<ValueVector> =
            (0..num_output_cols).map(|_| ValueVector::new()).collect();

        if self.group_by.is_empty() {
            if let Some(ref mut accumulators) = self.global_state {
                for (i, (acc, expr)) in accumulators.iter_mut().zip(&self.aggregates).enumerate() {
                    columns[i].push(acc.finalize(expr.function));
                }
            }
        } else if self.using_partitioned {
            if let Some(ref mut partitioned) = self.partitioned_groups {
                let groups = partitioned
                    .drain_all()
                    .map_err(|e| OperatorError::Execution(e.to_string()))?;

                for (_key, mut state) in groups {
                    for (i, val) in state.key_values.iter().enumerate() {
                        columns[i].push(val.clone());
                    }

                    for (i, (acc, expr)) in state
                        .accumulators
                        .iter_mut()
                        .zip(&self.aggregates)
                        .enumerate()
                    {
                        columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                    }
                }
            }
        } else {
            for state in self.groups.values_mut() {
                for (i, val) in state.key_values.iter().enumerate() {
                    columns[i].push(val.clone());
                }

                for (i, (acc, expr)) in state
                    .accumulators
                    .iter_mut()
                    .zip(&self.aggregates)
                    .enumerate()
                {
                    columns[self.group_by.len() + i].push(acc.finalize(expr.function));
                }
            }
        }

        // No chunk is emitted when no rows were produced.
        if !columns.is_empty() && !columns[0].is_empty() {
            let chunk = DataChunk::new(columns);
            sink.consume(chunk)?;
        }

        Ok(())
    }

    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    fn name(&self) -> &'static str {
        "SpillableAggregatePush"
    }
}
819
//
// Tests: aggregation correctness for the plain and spillable operators,
// plus hash_value coverage for every Value variant and Accumulator edge
// cases (nulls, empty input, advanced functions).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column Int64 chunk from `values`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    /// Builds a two-column Int64 chunk (group key column, value column).
    fn create_two_column_chunk(col1: &[i64], col2: &[i64]) -> DataChunk {
        let v1: Vec<Value> = col1.iter().map(|&i| Value::Int64(i)).collect();
        let v2: Vec<Value> = col2.iter().map(|&i| Value::Int64(i)).collect();
        DataChunk::new(vec![
            ValueVector::from_values(&v1),
            ValueVector::from_values(&v2),
        ])
    }

    #[test]
    fn test_global_count() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    fn test_global_sum() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::sum(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Float64(15.0))
        );
    }

    #[test]
    fn test_global_min_max() {
        let mut agg =
            AggregatePushOperator::global(vec![AggregateExpr::min(0), AggregateExpr::max(0)]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(1))
        );
        assert_eq!(
            chunks[0].column(1).unwrap().get_value(0),
            Some(Value::Int64(9))
        );
    }

    #[test]
    fn test_group_by_sum() {
        let mut agg = AggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)]);
        let mut sink = CollectorSink::new();

        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2); // one output row per distinct key
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_no_spill() {
        let mut agg = SpillableAggregatePushOperator::new(vec![0], vec![AggregateExpr::sum(1)])
            .with_threshold(100);
        let mut sink = CollectorSink::new();

        agg.push(
            create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
            &mut sink,
        )
        .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 2); // threshold not reached; pure in-memory path
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_with_spilling() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::sum(1)],
            manager,
            3, // tiny threshold to force spilling
        );
        let mut sink = CollectorSink::new();

        for i in 0..10 {
            let chunk = create_two_column_chunk(&[i], &[i * 10]);
            agg.push(chunk, &mut sink).unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 10); // one row per distinct key
        // Output order is arbitrary; sort the sums before comparing.
        let mut sums: Vec<f64> = Vec::new();
        for i in 0..chunks[0].len() {
            if let Some(Value::Float64(sum)) = chunks[0].column(1).unwrap().get_value(i) {
                sums.push(sum);
            }
        }
        sums.sort_by(|a, b| a.partial_cmp(b).unwrap());
        assert_eq!(
            sums,
            vec![0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_global() {
        let mut agg = SpillableAggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();

        agg.push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(
            chunks[0].column(0).unwrap().get_value(0),
            Some(Value::Int64(5))
        );
    }

    #[test]
    #[cfg(feature = "spill")]
    fn test_spillable_aggregate_many_groups() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(SpillManager::new(temp_dir.path()).unwrap());

        let mut agg = SpillableAggregatePushOperator::with_spilling(
            vec![0],
            vec![AggregateExpr::count_star()],
            manager,
            10, // low threshold so spilling kicks in repeatedly
        );
        let mut sink = CollectorSink::new();

        for i in 0..100 {
            let chunk = create_test_chunk(&[i]);
            agg.push(chunk, &mut sink).unwrap();
        }
        agg.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks[0].len(), 100); // every key is distinct
        for i in 0..100 {
            if let Some(Value::Int64(count)) = chunks[0].column(1).unwrap().get_value(i) {
                assert_eq!(count, 1);
            }
        }
    }

    #[test]
    fn hash_value_null() {
        let h = hash_value(&Value::Null);
        assert_ne!(h, 0); // Null hashes its discriminant byte, not a 0 sentinel
    }

    #[test]
    fn hash_value_bool() {
        let t = hash_value(&Value::Bool(true));
        let f = hash_value(&Value::Bool(false));
        assert_ne!(t, f);
    }

    #[test]
    fn hash_value_int64() {
        let a = hash_value(&Value::Int64(42));
        let b = hash_value(&Value::Int64(43));
        assert_ne!(a, b);
    }

    #[test]
    fn hash_value_float64() {
        let a = hash_value(&Value::Float64(19.88));
        let b = hash_value(&Value::Float64(3.19));
        assert_ne!(a, b);
    }

    #[test]
    fn hash_value_string() {
        let a = hash_value(&Value::String("hello".into()));
        let b = hash_value(&Value::String("world".into()));
        assert_ne!(a, b);
    }

    #[test]
    fn hash_value_bytes() {
        let a = hash_value(&Value::Bytes(vec![1, 2, 3].into()));
        let b = hash_value(&Value::Bytes(vec![4, 5, 6].into()));
        assert_ne!(a, b);
    }

    #[test]
    fn hash_value_list() {
        let a = hash_value(&Value::List(vec![Value::Int64(1), Value::Int64(2)].into()));
        let b = hash_value(&Value::List(vec![Value::Int64(3)].into()));
        assert_ne!(a, b);
    }

    #[test]
    fn hash_value_map() {
        use grafeo_common::types::PropertyKey;
        use std::collections::BTreeMap;
        use std::sync::Arc;
        let mut map = BTreeMap::new();
        map.insert(PropertyKey::new("key"), Value::Int64(42));
        let h = hash_value(&Value::Map(Arc::new(map)));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_vector() {
        let h = hash_value(&Value::Vector(vec![1.0, 2.0, 3.0].into()));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_path() {
        let h = hash_value(&Value::Path {
            nodes: vec![Value::Int64(1), Value::Int64(2)].into(),
            edges: vec![Value::Int64(10)].into(),
        });
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_gcounter() {
        use std::sync::Arc;
        let mut map = std::collections::HashMap::new();
        map.insert("replica1".to_string(), 10u64);
        let h = hash_value(&Value::GCounter(Arc::new(map)));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_on_counter() {
        use std::sync::Arc;
        let mut pos = std::collections::HashMap::new();
        pos.insert("replica1".to_string(), 10u64);
        let neg = std::collections::HashMap::new();
        let h = hash_value(&Value::OnCounter {
            pos: Arc::new(pos),
            neg: Arc::new(neg),
        });
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_timestamp() {
        use grafeo_common::types::Timestamp;
        let h = hash_value(&Value::Timestamp(Timestamp::from_micros(1_700_000_000_000)));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_date() {
        use grafeo_common::types::Date;
        let h = hash_value(&Value::Date(Date::from_days(19000)));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_time() {
        use grafeo_common::types::Time;
        let h = hash_value(&Value::Time(Time::from_hms(12, 0, 0).unwrap()));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_duration() {
        use grafeo_common::types::Duration;
        let h = hash_value(&Value::Duration(Duration::from_days(1)));
        assert_ne!(h, 0);
    }

    #[test]
    fn hash_value_zoned_datetime() {
        use grafeo_common::types::{Timestamp, ZonedDatetime};
        let zdt =
            ZonedDatetime::from_timestamp_offset(Timestamp::from_micros(1_700_000_000_000), 3600);
        let h = hash_value(&Value::ZonedDatetime(zdt));
        assert_ne!(h, 0);
    }

    // Advanced aggregate functions are not implemented by this push-side
    // accumulator; finalize must yield Null for all of them.
    #[test]
    fn finalize_advanced_functions_return_null() {
        let advanced = [
            AggregateFunction::Last,
            AggregateFunction::Collect,
            AggregateFunction::StdDev,
            AggregateFunction::StdDevPop,
            AggregateFunction::Variance,
            AggregateFunction::VariancePop,
            AggregateFunction::PercentileDisc,
            AggregateFunction::PercentileCont,
            AggregateFunction::GroupConcat,
            AggregateFunction::Sample,
            AggregateFunction::CovarSamp,
            AggregateFunction::CovarPop,
            AggregateFunction::Corr,
            AggregateFunction::RegrSlope,
            AggregateFunction::RegrIntercept,
            AggregateFunction::RegrR2,
            AggregateFunction::RegrCount,
            AggregateFunction::RegrSxx,
            AggregateFunction::RegrSyy,
            AggregateFunction::RegrSxy,
            AggregateFunction::RegrAvgx,
            AggregateFunction::RegrAvgy,
        ];

        for func in advanced {
            let mut acc = Accumulator::new();
            acc.add(&Value::Int64(42));
            let result = acc.finalize(func);
            assert_eq!(
                result,
                Value::Null,
                "Advanced function {func:?} should return Null in push accumulator"
            );
        }
    }

    #[test]
    fn finalize_first_returns_first_value() {
        let mut acc = Accumulator::new();
        acc.add(&Value::Int64(10));
        acc.add(&Value::Int64(20));
        assert_eq!(acc.finalize(AggregateFunction::First), Value::Int64(10));
    }

    #[test]
    fn finalize_avg_empty_returns_null() {
        let mut acc = Accumulator::new();
        assert_eq!(acc.finalize(AggregateFunction::Avg), Value::Null);
    }

    #[test]
    fn finalize_sum_empty_returns_null() {
        let mut acc = Accumulator::new();
        assert_eq!(acc.finalize(AggregateFunction::Sum), Value::Null);
    }

    #[test]
    fn finalize_min_max_empty_returns_null() {
        let mut acc_min = Accumulator::new();
        let mut acc_max = Accumulator::new();
        assert_eq!(acc_min.finalize(AggregateFunction::Min), Value::Null);
        assert_eq!(acc_max.finalize(AggregateFunction::Max), Value::Null);
    }

    #[test]
    fn accumulator_skips_nulls() {
        let mut acc = Accumulator::new();
        acc.add(&Value::Null);
        acc.add(&Value::Int64(5));
        acc.add(&Value::Null);
        assert_eq!(acc.count, 1);
        assert_eq!(acc.finalize(AggregateFunction::Count), Value::Int64(1));
    }

    #[test]
    fn test_empty_chunk_returns_ok() {
        let mut agg = AggregatePushOperator::global(vec![AggregateExpr::count_star()]);
        let mut sink = CollectorSink::new();
        let empty = DataChunk::new(vec![ValueVector::new()]);
        let result = agg.push(empty, &mut sink).unwrap();
        assert!(result);
    }
}
1253}