1use std::hash::{Hash, Hasher};
81
82use indexmap::IndexMap;
83use varpulis_core::Value;
84
85use crate::columnar::ColumnarBuffer;
86use crate::event::{Event, SharedEvent};
87
88pub type AggResult = IndexMap<String, Value>;
92
93pub trait AggregateFunc: Send + Sync {
112 fn name(&self) -> &str;
114
115 fn apply(&self, events: &[Event], field: Option<&str>) -> Value;
126
127 fn apply_shared(&self, events: &[SharedEvent], field: Option<&str>) -> Value {
131 let refs: Vec<&Event> = events.iter().map(|e| e.as_ref()).collect();
135 self.apply_refs(&refs, field)
136 }
137
138 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
140 let owned: Vec<Event> = events.iter().map(|e| (*e).clone()).collect();
143 self.apply(&owned, field)
144 }
145
146 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
151 self.apply_shared(buffer.events(), field)
153 }
154}
155
156#[derive(Debug)]
158pub struct Count;
159
160impl AggregateFunc for Count {
161 fn name(&self) -> &'static str {
162 "count"
163 }
164
165 fn apply(&self, events: &[Event], _field: Option<&str>) -> Value {
166 Value::Int(events.len() as i64)
167 }
168
169 fn apply_refs(&self, events: &[&Event], _field: Option<&str>) -> Value {
170 Value::Int(events.len() as i64)
171 }
172
173 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, _field: Option<&str>) -> Value {
174 Value::Int(buffer.len() as i64)
175 }
176}
177
178#[derive(Debug)]
180pub struct Sum;
181
182impl AggregateFunc for Sum {
183 fn name(&self) -> &'static str {
184 "sum"
185 }
186
187 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
188 let field = field.unwrap_or("value");
189 Value::Float(crate::simd::simd_sum(events, field))
190 }
191
192 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
193 let field = field.unwrap_or("value");
194 let values: Vec<f64> = events
196 .iter()
197 .filter_map(|e| e.get_float(field))
198 .filter(|v| !v.is_nan())
199 .collect();
200 Value::Float(crate::simd::sum_f64(&values))
201 }
202
203 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
204 let field = field.unwrap_or("value");
205 let col = buffer.ensure_float_column(field);
206 let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
208 Value::Float(crate::simd::sum_f64(&valid))
209 }
210}
211
212#[derive(Debug)]
214pub struct Avg;
215
216impl AggregateFunc for Avg {
217 fn name(&self) -> &'static str {
218 "avg"
219 }
220
221 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
222 let field = field.unwrap_or("value");
223 match crate::simd::simd_avg(events, field) {
224 Some(avg) => Value::Float(avg),
225 None => Value::Null,
226 }
227 }
228
229 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
230 let field = field.unwrap_or("value");
231 let values: Vec<f64> = events
233 .iter()
234 .filter_map(|e| e.get_float(field))
235 .filter(|v| !v.is_nan())
236 .collect();
237
238 if values.is_empty() {
239 Value::Null
240 } else {
241 Value::Float(crate::simd::sum_f64(&values) / values.len() as f64)
242 }
243 }
244
245 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
246 let field = field.unwrap_or("value");
247 let col = buffer.ensure_float_column(field);
248 let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
249 if valid.is_empty() {
250 Value::Null
251 } else {
252 Value::Float(crate::simd::sum_f64(&valid) / valid.len() as f64)
253 }
254 }
255}
256
257#[derive(Debug)]
259pub struct Min;
260
261impl AggregateFunc for Min {
262 fn name(&self) -> &'static str {
263 "min"
264 }
265
266 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
267 let field = field.unwrap_or("value");
268 match crate::simd::simd_min(events, field) {
269 Some(min) => Value::Float(min),
270 None => Value::Null,
271 }
272 }
273
274 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
275 let field = field.unwrap_or("value");
276 let values: Vec<f64> = events
277 .iter()
278 .filter_map(|e| e.get_float(field))
279 .filter(|v| !v.is_nan())
280 .collect();
281 match crate::simd::min_f64(&values) {
282 Some(min) => Value::Float(min),
283 None => Value::Null,
284 }
285 }
286
287 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
288 let field = field.unwrap_or("value");
289 let col = buffer.ensure_float_column(field);
290 let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
291 match crate::simd::min_f64(&valid) {
292 Some(min) => Value::Float(min),
293 None => Value::Null,
294 }
295 }
296}
297
298#[derive(Debug)]
300pub struct Max;
301
302impl AggregateFunc for Max {
303 fn name(&self) -> &'static str {
304 "max"
305 }
306
307 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
308 let field = field.unwrap_or("value");
309 match crate::simd::simd_max(events, field) {
310 Some(max) => Value::Float(max),
311 None => Value::Null,
312 }
313 }
314
315 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
316 let field = field.unwrap_or("value");
317 let values: Vec<f64> = events
318 .iter()
319 .filter_map(|e| e.get_float(field))
320 .filter(|v| !v.is_nan())
321 .collect();
322 match crate::simd::max_f64(&values) {
323 Some(max) => Value::Float(max),
324 None => Value::Null,
325 }
326 }
327
328 fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
329 let field = field.unwrap_or("value");
330 let col = buffer.ensure_float_column(field);
331 let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
332 match crate::simd::max_f64(&valid) {
333 Some(max) => Value::Float(max),
334 None => Value::Null,
335 }
336 }
337}
338
339#[derive(Debug)]
341pub struct StdDev;
342
343impl AggregateFunc for StdDev {
344 fn name(&self) -> &'static str {
345 "stddev"
346 }
347
348 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
349 let field = field.unwrap_or("value");
350
351 let mut count = 0usize;
354 let mut mean = 0.0;
355 let mut m2 = 0.0; for event in events {
358 if let Some(x) = event.get_float(field) {
359 count += 1;
360 let delta = x - mean;
361 mean += delta / count as f64;
362 let delta2 = x - mean;
363 m2 += delta * delta2;
364 }
365 }
366
367 if count < 2 {
368 return Value::Null;
369 }
370
371 let variance = m2 / (count - 1) as f64;
373 Value::Float(variance.sqrt())
374 }
375
376 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
377 let field = field.unwrap_or("value");
378
379 let mut count = 0usize;
380 let mut mean = 0.0;
381 let mut m2 = 0.0;
382
383 for event in events {
384 if let Some(x) = event.get_float(field) {
385 count += 1;
386 let delta = x - mean;
387 mean += delta / count as f64;
388 let delta2 = x - mean;
389 m2 += delta * delta2;
390 }
391 }
392
393 if count < 2 {
394 return Value::Null;
395 }
396
397 let variance = m2 / (count - 1) as f64;
398 Value::Float(variance.sqrt())
399 }
400}
401
402#[derive(Debug)]
404pub struct First;
405
406impl AggregateFunc for First {
407 fn name(&self) -> &'static str {
408 "first"
409 }
410
411 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
412 let field = field.unwrap_or("value");
413 events
414 .first()
415 .and_then(|e| e.get(field))
416 .cloned()
417 .unwrap_or(Value::Null)
418 }
419
420 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
421 let field = field.unwrap_or("value");
422 events
423 .first()
424 .and_then(|e| e.get(field))
425 .cloned()
426 .unwrap_or(Value::Null)
427 }
428}
429
430#[derive(Debug)]
432pub struct Last;
433
434impl AggregateFunc for Last {
435 fn name(&self) -> &'static str {
436 "last"
437 }
438
439 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
440 let field = field.unwrap_or("value");
441 events
442 .last()
443 .and_then(|e| e.get(field))
444 .cloned()
445 .unwrap_or(Value::Null)
446 }
447
448 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
449 let field = field.unwrap_or("value");
450 events
451 .last()
452 .and_then(|e| e.get(field))
453 .cloned()
454 .unwrap_or(Value::Null)
455 }
456}
457
458#[derive(Debug)]
461pub struct CountDistinct;
462
463impl AggregateFunc for CountDistinct {
464 fn name(&self) -> &'static str {
465 "count_distinct"
466 }
467
468 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
469 let field = field.unwrap_or("value");
470 let mut seen_hashes = std::collections::HashSet::new();
471
472 for event in events {
473 if let Some(value) = event.get(field) {
474 let mut hasher = std::collections::hash_map::DefaultHasher::new();
477 value.hash(&mut hasher);
478 seen_hashes.insert(hasher.finish());
479 }
480 }
481
482 Value::Int(seen_hashes.len() as i64)
483 }
484
485 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
486 let field = field.unwrap_or("value");
487 let mut seen_hashes = std::collections::HashSet::new();
488
489 for event in events {
490 if let Some(value) = event.get(field) {
491 let mut hasher = std::collections::hash_map::DefaultHasher::new();
492 value.hash(&mut hasher);
493 seen_hashes.insert(hasher.finish());
494 }
495 }
496
497 Value::Int(seen_hashes.len() as i64)
498 }
499}
500
/// Exponential moving average aggregate.
///
/// The smoothing factor is derived from `period` as `2 / (period + 1)`.
#[derive(Debug)]
pub struct Ema {
    /// Look-back period that sets the smoothing factor. `Ema::new` raises a
    /// period of 0 to 1; building the struct directly bypasses that guard.
    pub period: usize,
}
508
/// Arithmetic operator that `ExprAggregate` uses to combine the results of
/// its two operand aggregates.
#[derive(Debug, Clone, Copy)]
pub enum AggBinOp {
    /// `left + right`
    Add,
    /// `left - right`
    Sub,
    /// `left * right`
    Mul,
    /// `left / right`
    Div,
}
517
/// Binary arithmetic expression over two aggregates, e.g. `sum(a) / count()`.
///
/// Both operands are evaluated over the same events, each with its own field,
/// and their results are combined with `op`.
pub struct ExprAggregate {
    /// Left-hand operand aggregate.
    pub left: Box<dyn AggregateFunc>,
    /// Field handed to the left operand (`None` lets the operand pick its default).
    pub left_field: Option<String>,
    /// Operator applied to the two operand results.
    pub op: AggBinOp,
    /// Right-hand operand aggregate.
    pub right: Box<dyn AggregateFunc>,
    /// Field handed to the right operand (`None` lets the operand pick its default).
    pub right_field: Option<String>,
}
526
527impl std::fmt::Debug for ExprAggregate {
528 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529 f.debug_struct("ExprAggregate")
530 .field("left", &self.left.name())
531 .field("left_field", &self.left_field)
532 .field("op", &self.op)
533 .field("right", &self.right.name())
534 .field("right_field", &self.right_field)
535 .finish()
536 }
537}
538
539impl ExprAggregate {
540 pub fn new(
541 left: Box<dyn AggregateFunc>,
542 left_field: Option<String>,
543 op: AggBinOp,
544 right: Box<dyn AggregateFunc>,
545 right_field: Option<String>,
546 ) -> Self {
547 Self {
548 left,
549 left_field,
550 op,
551 right,
552 right_field,
553 }
554 }
555}
556
557impl AggregateFunc for ExprAggregate {
558 fn name(&self) -> &'static str {
559 "expr"
560 }
561
562 fn apply(&self, events: &[Event], _field: Option<&str>) -> Value {
563 let left_val = self.left.apply(events, self.left_field.as_deref());
564 let right_val = self.right.apply(events, self.right_field.as_deref());
565
566 match (left_val, right_val) {
567 (Value::Float(l), Value::Float(r)) => {
568 let result = match self.op {
569 AggBinOp::Add => l + r,
570 AggBinOp::Sub => l - r,
571 AggBinOp::Mul => l * r,
572 AggBinOp::Div => {
573 if r != 0.0 {
574 l / r
575 } else {
576 f64::NAN
577 }
578 }
579 };
580 Value::Float(result)
581 }
582 (Value::Int(l), Value::Int(r)) => {
583 let result = match self.op {
584 AggBinOp::Add => l + r,
585 AggBinOp::Sub => l - r,
586 AggBinOp::Mul => l * r,
587 AggBinOp::Div => {
588 if r != 0 {
589 l / r
590 } else {
591 0
592 }
593 }
594 };
595 Value::Int(result)
596 }
597 (Value::Int(l), Value::Float(r)) => {
598 let l = l as f64;
599 let result = match self.op {
600 AggBinOp::Add => l + r,
601 AggBinOp::Sub => l - r,
602 AggBinOp::Mul => l * r,
603 AggBinOp::Div => {
604 if r != 0.0 {
605 l / r
606 } else {
607 f64::NAN
608 }
609 }
610 };
611 Value::Float(result)
612 }
613 (Value::Float(l), Value::Int(r)) => {
614 let r = r as f64;
615 let result = match self.op {
616 AggBinOp::Add => l + r,
617 AggBinOp::Sub => l - r,
618 AggBinOp::Mul => l * r,
619 AggBinOp::Div => {
620 if r != 0.0 {
621 l / r
622 } else {
623 f64::NAN
624 }
625 }
626 };
627 Value::Float(result)
628 }
629 _ => Value::Null,
630 }
631 }
632
633 fn apply_refs(&self, events: &[&Event], _field: Option<&str>) -> Value {
634 let left_val = self.left.apply_refs(events, self.left_field.as_deref());
635 let right_val = self.right.apply_refs(events, self.right_field.as_deref());
636
637 match (left_val, right_val) {
638 (Value::Float(l), Value::Float(r)) => {
639 let result = match self.op {
640 AggBinOp::Add => l + r,
641 AggBinOp::Sub => l - r,
642 AggBinOp::Mul => l * r,
643 AggBinOp::Div => {
644 if r != 0.0 {
645 l / r
646 } else {
647 f64::NAN
648 }
649 }
650 };
651 Value::Float(result)
652 }
653 (Value::Int(l), Value::Int(r)) => {
654 let result = match self.op {
655 AggBinOp::Add => l + r,
656 AggBinOp::Sub => l - r,
657 AggBinOp::Mul => l * r,
658 AggBinOp::Div => {
659 if r != 0 {
660 l / r
661 } else {
662 0
663 }
664 }
665 };
666 Value::Int(result)
667 }
668 (Value::Int(l), Value::Float(r)) => {
669 let l = l as f64;
670 let result = match self.op {
671 AggBinOp::Add => l + r,
672 AggBinOp::Sub => l - r,
673 AggBinOp::Mul => l * r,
674 AggBinOp::Div => {
675 if r != 0.0 {
676 l / r
677 } else {
678 f64::NAN
679 }
680 }
681 };
682 Value::Float(result)
683 }
684 (Value::Float(l), Value::Int(r)) => {
685 let r = r as f64;
686 let result = match self.op {
687 AggBinOp::Add => l + r,
688 AggBinOp::Sub => l - r,
689 AggBinOp::Mul => l * r,
690 AggBinOp::Div => {
691 if r != 0.0 {
692 l / r
693 } else {
694 f64::NAN
695 }
696 }
697 };
698 Value::Float(result)
699 }
700 _ => Value::Null,
701 }
702 }
703}
704
705impl Ema {
706 pub fn new(period: usize) -> Self {
707 Self {
708 period: period.max(1),
709 }
710 }
711}
712
713impl AggregateFunc for Ema {
714 fn name(&self) -> &'static str {
715 "ema"
716 }
717
718 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
719 let field = field.unwrap_or("value");
720 let k = 2.0 / (self.period as f64 + 1.0);
721
722 let mut ema: Option<f64> = None;
724 for event in events {
725 if let Some(value) = event.get_float(field) {
726 ema = Some(match ema {
727 Some(prev) => value.mul_add(k, prev * (1.0 - k)),
728 None => value,
729 });
730 }
731 }
732
733 ema.map_or(Value::Null, Value::Float)
734 }
735
736 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
737 let field = field.unwrap_or("value");
738 let k = 2.0 / (self.period as f64 + 1.0);
739
740 let mut ema: Option<f64> = None;
741 for event in events {
742 if let Some(value) = event.get_float(field) {
743 ema = Some(match ema {
744 Some(prev) => value.mul_add(k, prev * (1.0 - k)),
745 None => value,
746 });
747 }
748 }
749
750 ema.map_or(Value::Null, Value::Float)
751 }
752}
753
/// Aggregate that computes a quantile of a numeric field using linear
/// interpolation between the two nearest ranks.
#[derive(Debug)]
pub struct Percentile {
    /// Target quantile, kept within `[0.0, 1.0]` by [`Percentile::new`].
    pub quantile: f64,
    /// Pre-rendered name of the form `percentile(<quantile>)`.
    label: String,
}

impl Percentile {
    /// Creates a percentile aggregate, clamping `quantile` into `[0.0, 1.0]`.
    pub fn new(quantile: f64) -> Self {
        let clamped = quantile.clamp(0.0, 1.0);
        Self {
            quantile: clamped,
            label: format!("percentile({clamped})"),
        }
    }

    /// Linearly interpolated quantile of an ascending, non-empty slice.
    ///
    /// The fractional rank is `quantile * (len - 1)`; when it falls between
    /// two indices the neighbours are blended by the fractional part.
    fn interpolate(sorted: &[f64], quantile: f64) -> f64 {
        debug_assert!(!sorted.is_empty());
        if let [only] = sorted {
            return *only;
        }

        let rank = quantile * (sorted.len() - 1) as f64;
        let below = rank.floor() as usize;
        let above = rank.ceil() as usize;
        if below != above {
            let weight = rank - below as f64;
            sorted[below].mul_add(1.0 - weight, sorted[above] * weight)
        } else {
            // The rank lands exactly on an element.
            sorted[below]
        }
    }
}
793
794impl AggregateFunc for Percentile {
795 fn name(&self) -> &str {
796 &self.label
797 }
798
799 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
800 let field = field.unwrap_or("value");
801 let mut values: Vec<f64> = events
802 .iter()
803 .filter_map(|e| e.get_float(field))
804 .filter(|v| !v.is_nan())
805 .collect();
806 if values.is_empty() {
807 return Value::Null;
808 }
809 values.sort_by(|a, b| a.partial_cmp(b).unwrap());
810 Value::Float(Self::interpolate(&values, self.quantile))
811 }
812
813 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
814 let field = field.unwrap_or("value");
815 let mut values: Vec<f64> = events
816 .iter()
817 .filter_map(|e| e.get_float(field))
818 .filter(|v| !v.is_nan())
819 .collect();
820 if values.is_empty() {
821 return Value::Null;
822 }
823 values.sort_by(|a, b| a.partial_cmp(b).unwrap());
824 Value::Float(Self::interpolate(&values, self.quantile))
825 }
826}
827
828#[derive(Debug)]
830pub struct Median;
831
832impl AggregateFunc for Median {
833 fn name(&self) -> &'static str {
834 "median"
835 }
836
837 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
838 Percentile::new(0.5).apply(events, field)
839 }
840
841 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
842 Percentile::new(0.5).apply_refs(events, field)
843 }
844}
845
846#[derive(Debug)]
848pub struct P50;
849
850impl AggregateFunc for P50 {
851 fn name(&self) -> &'static str {
852 "p50"
853 }
854
855 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
856 Percentile::new(0.5).apply(events, field)
857 }
858
859 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
860 Percentile::new(0.5).apply_refs(events, field)
861 }
862}
863
864#[derive(Debug)]
866pub struct P95;
867
868impl AggregateFunc for P95 {
869 fn name(&self) -> &'static str {
870 "p95"
871 }
872
873 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
874 Percentile::new(0.95).apply(events, field)
875 }
876
877 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
878 Percentile::new(0.95).apply_refs(events, field)
879 }
880}
881
882#[derive(Debug)]
884pub struct P99;
885
886impl AggregateFunc for P99 {
887 fn name(&self) -> &'static str {
888 "p99"
889 }
890
891 fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
892 Percentile::new(0.99).apply(events, field)
893 }
894
895 fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
896 Percentile::new(0.99).apply_refs(events, field)
897 }
898}
899
/// Ordered collection of named aggregations that are evaluated together over
/// the same batch of events.
pub struct Aggregator {
    /// One entry per aggregation: `(output alias, function, optional field)`.
    aggregations: Vec<(String, Box<dyn AggregateFunc>, Option<String>)>,
}
904
905impl std::fmt::Debug for Aggregator {
906 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
907 f.debug_struct("Aggregator")
908 .field(
909 "aggregations",
910 &self
911 .aggregations
912 .iter()
913 .map(|(alias, func, field)| (alias, func.name(), field))
914 .collect::<Vec<_>>(),
915 )
916 .finish()
917 }
918}
919
920impl Aggregator {
921 pub fn new() -> Self {
922 Self {
923 aggregations: Vec::new(),
924 }
925 }
926
927 pub fn add(
928 mut self,
929 alias: impl Into<String>,
930 func: Box<dyn AggregateFunc>,
931 field: Option<String>,
932 ) -> Self {
933 self.aggregations.push((alias.into(), func, field));
934 self
935 }
936
937 pub fn apply(&self, events: &[Event]) -> AggResult {
938 let mut result = IndexMap::new();
939 for (alias, func, field) in &self.aggregations {
940 let value = func.apply(events, field.as_deref());
941 result.insert(alias.clone(), value);
942 }
943 result
944 }
945
946 pub fn apply_shared(&self, events: &[SharedEvent]) -> AggResult {
948 let mut result = IndexMap::new();
949 for (alias, func, field) in &self.aggregations {
950 let value = func.apply_shared(events, field.as_deref());
951 result.insert(alias.clone(), value);
952 }
953 result
954 }
955
956 pub fn apply_columnar(&self, buffer: &mut ColumnarBuffer) -> AggResult {
975 let mut result = IndexMap::new();
976 for (alias, func, field) in &self.aggregations {
977 let value = func.apply_columnar(buffer, field.as_deref());
978 result.insert(alias.clone(), value);
979 }
980 result
981 }
982}
983
984impl Default for Aggregator {
985 fn default() -> Self {
986 Self::new()
987 }
988}
989
990#[cfg(test)]
991mod tests {
992 use super::*;
993
994 fn make_events() -> Vec<Event> {
995 vec![
996 Event::new("Test").with_field("value", 10.0),
997 Event::new("Test").with_field("value", 20.0),
998 Event::new("Test").with_field("value", 30.0),
999 ]
1000 }
1001
1002 #[test]
1003 fn test_count() {
1004 let events = make_events();
1005 let result = Count.apply(&events, None);
1006 assert_eq!(result, Value::Int(3));
1007 }
1008
1009 #[test]
1010 fn test_sum() {
1011 let events = make_events();
1012 let result = Sum.apply(&events, Some("value"));
1013 assert_eq!(result, Value::Float(60.0));
1014 }
1015
1016 #[test]
1017 fn test_avg() {
1018 let events = make_events();
1019 let result = Avg.apply(&events, Some("value"));
1020 assert_eq!(result, Value::Float(20.0));
1021 }
1022
1023 #[test]
1024 fn test_min_max() {
1025 let events = make_events();
1026 assert_eq!(Min.apply(&events, Some("value")), Value::Float(10.0));
1027 assert_eq!(Max.apply(&events, Some("value")), Value::Float(30.0));
1028 }
1029
1030 #[test]
1031 fn test_aggregator() {
1032 let events = make_events();
1033 let aggregator = Aggregator::new()
1034 .add("count", Box::new(Count), None)
1035 .add("sum", Box::new(Sum), Some("value".to_string()))
1036 .add("avg", Box::new(Avg), Some("value".to_string()));
1037
1038 let result = aggregator.apply(&events);
1039 assert_eq!(result.get("count"), Some(&Value::Int(3)));
1040 assert_eq!(result.get("sum"), Some(&Value::Float(60.0)));
1041 assert_eq!(result.get("avg"), Some(&Value::Float(20.0)));
1042 }
1043
1044 #[test]
1045 fn test_first_last() {
1046 let events = make_events();
1047 assert_eq!(First.apply(&events, Some("value")), Value::Float(10.0));
1048 assert_eq!(Last.apply(&events, Some("value")), Value::Float(30.0));
1049 }
1050
1051 #[test]
1052 fn test_stddev() {
1053 let events = make_events();
1054 let result = StdDev.apply(&events, Some("value"));
1055 if let Value::Float(v) = result {
1056 assert!((v - 10.0).abs() < 0.01); } else {
1058 panic!("Expected float");
1059 }
1060 }
1061
1062 #[test]
1063 fn test_ema() {
1064 let events = vec![
1065 Event::new("Test").with_field("value", 100.0),
1066 Event::new("Test").with_field("value", 110.0),
1067 Event::new("Test").with_field("value", 120.0),
1068 Event::new("Test").with_field("value", 130.0),
1069 Event::new("Test").with_field("value", 140.0),
1070 ];
1071 let ema = Ema::new(3);
1072 let result = ema.apply(&events, Some("value"));
1073 if let Value::Float(v) = result {
1074 assert!(v > 120.0 && v < 140.0);
1076 } else {
1077 panic!("Expected float");
1078 }
1079 }
1080
1081 #[test]
1086 fn test_count_empty() {
1087 let events: Vec<Event> = vec![];
1088 assert_eq!(Count.apply(&events, None), Value::Int(0));
1089 }
1090
1091 #[test]
1092 fn test_sum_empty() {
1093 let events: Vec<Event> = vec![];
1094 assert_eq!(Sum.apply(&events, Some("value")), Value::Float(0.0));
1095 }
1096
1097 #[test]
1098 fn test_avg_empty() {
1099 let events: Vec<Event> = vec![];
1100 assert_eq!(Avg.apply(&events, Some("value")), Value::Null);
1101 }
1102
1103 #[test]
1104 fn test_min_empty() {
1105 let events: Vec<Event> = vec![];
1106 assert_eq!(Min.apply(&events, Some("value")), Value::Null);
1107 }
1108
1109 #[test]
1110 fn test_max_empty() {
1111 let events: Vec<Event> = vec![];
1112 assert_eq!(Max.apply(&events, Some("value")), Value::Null);
1113 }
1114
1115 #[test]
1116 fn test_first_empty() {
1117 let events: Vec<Event> = vec![];
1118 assert_eq!(First.apply(&events, Some("value")), Value::Null);
1119 }
1120
1121 #[test]
1122 fn test_last_empty() {
1123 let events: Vec<Event> = vec![];
1124 assert_eq!(Last.apply(&events, Some("value")), Value::Null);
1125 }
1126
1127 #[test]
1128 fn test_stddev_single_value() {
1129 let events = vec![Event::new("Test").with_field("value", 42.0)];
1130 assert_eq!(StdDev.apply(&events, Some("value")), Value::Null);
1131 }
1132
1133 #[test]
1134 fn test_ema_empty() {
1135 let events: Vec<Event> = vec![];
1136 assert_eq!(Ema::new(3).apply(&events, Some("value")), Value::Null);
1137 }
1138
1139 #[test]
1140 fn test_ema_single_value() {
1141 let events = vec![Event::new("Test").with_field("value", 100.0)];
1142 assert_eq!(
1143 Ema::new(3).apply(&events, Some("value")),
1144 Value::Float(100.0)
1145 );
1146 }
1147
1148 #[test]
1149 fn test_ema_period_zero_becomes_one() {
1150 let ema = Ema::new(0);
1151 assert_eq!(ema.period, 1);
1152 }
1153
1154 #[test]
1155 fn test_missing_field() {
1156 let events = vec![Event::new("Test").with_field("other", 10.0)];
1157 assert_eq!(Sum.apply(&events, Some("value")), Value::Float(0.0));
1158 assert_eq!(Avg.apply(&events, Some("value")), Value::Null);
1159 }
1160
1161 #[test]
1162 fn test_default_field() {
1163 let events = vec![Event::new("Test").with_field("value", 25.0)];
1164 assert_eq!(Sum.apply(&events, None), Value::Float(25.0));
1166 assert_eq!(Avg.apply(&events, None), Value::Float(25.0));
1167 }
1168
1169 #[test]
1174 fn test_expr_aggregate_add() {
1175 let events = make_events(); let expr = ExprAggregate::new(
1177 Box::new(Sum),
1178 Some("value".to_string()),
1179 AggBinOp::Add,
1180 Box::new(Count),
1181 None,
1182 );
1183 assert_eq!(expr.apply(&events, None), Value::Float(63.0));
1185 }
1186
1187 #[test]
1188 fn test_expr_aggregate_sub() {
1189 let events = make_events();
1190 let expr = ExprAggregate::new(
1191 Box::new(Max),
1192 Some("value".to_string()),
1193 AggBinOp::Sub,
1194 Box::new(Min),
1195 Some("value".to_string()),
1196 );
1197 assert_eq!(expr.apply(&events, None), Value::Float(20.0));
1199 }
1200
1201 #[test]
1202 fn test_expr_aggregate_mul() {
1203 let events = vec![
1204 Event::new("Test").with_field("value", 5.0),
1205 Event::new("Test").with_field("value", 5.0),
1206 ];
1207 let expr = ExprAggregate::new(
1208 Box::new(Avg),
1209 Some("value".to_string()),
1210 AggBinOp::Mul,
1211 Box::new(Count),
1212 None,
1213 );
1214 assert_eq!(expr.apply(&events, None), Value::Float(10.0));
1216 }
1217
1218 #[test]
1219 fn test_expr_aggregate_div() {
1220 let events = make_events();
1221 let expr = ExprAggregate::new(
1222 Box::new(Sum),
1223 Some("value".to_string()),
1224 AggBinOp::Div,
1225 Box::new(Count),
1226 None,
1227 );
1228 assert_eq!(expr.apply(&events, None), Value::Float(20.0));
1230 }
1231
1232 #[test]
1233 fn test_expr_aggregate_div_by_zero_float() {
1234 let events: Vec<Event> = vec![];
1235 let expr = ExprAggregate::new(
1236 Box::new(Sum),
1237 Some("value".to_string()),
1238 AggBinOp::Div,
1239 Box::new(Sum),
1240 Some("value".to_string()),
1241 );
1242 if let Value::Float(v) = expr.apply(&events, None) {
1244 assert!(v.is_nan());
1245 } else {
1246 panic!("Expected float NaN");
1247 }
1248 }
1249
1250 #[test]
1251 fn test_expr_aggregate_int_operations() {
1252 let events = vec![
1253 Event::new("Test").with_field("count", 10i64),
1254 Event::new("Test").with_field("count", 20i64),
1255 ];
1256 let expr = ExprAggregate::new(Box::new(Count), None, AggBinOp::Mul, Box::new(Count), None);
1258 assert_eq!(expr.apply(&events, None), Value::Int(4));
1260 }
1261
1262 #[test]
1267 fn test_aggregator_empty() {
1268 let events = make_events();
1269 let aggregator = Aggregator::new();
1270 let result = aggregator.apply(&events);
1271 assert!(result.is_empty());
1272 }
1273
1274 #[test]
1275 fn test_aggregator_default() {
1276 let aggregator = Aggregator::default();
1277 assert!(aggregator.aggregations.is_empty());
1278 }
1279
1280 #[test]
1281 fn test_aggregator_chain() {
1282 let events = make_events();
1283 let aggregator = Aggregator::new()
1284 .add("min_val", Box::new(Min), Some("value".to_string()))
1285 .add("max_val", Box::new(Max), Some("value".to_string()))
1286 .add(
1287 "range",
1288 Box::new(ExprAggregate::new(
1289 Box::new(Max),
1290 Some("value".to_string()),
1291 AggBinOp::Sub,
1292 Box::new(Min),
1293 Some("value".to_string()),
1294 )),
1295 None,
1296 );
1297
1298 let result = aggregator.apply(&events);
1299 assert_eq!(result.get("min_val"), Some(&Value::Float(10.0)));
1300 assert_eq!(result.get("max_val"), Some(&Value::Float(30.0)));
1301 assert_eq!(result.get("range"), Some(&Value::Float(20.0)));
1302 }
1303
1304 #[test]
1309 fn test_aggregate_names() {
1310 assert_eq!(Count.name(), "count");
1311 assert_eq!(Sum.name(), "sum");
1312 assert_eq!(Avg.name(), "avg");
1313 assert_eq!(Min.name(), "min");
1314 assert_eq!(Max.name(), "max");
1315 assert_eq!(StdDev.name(), "stddev");
1316 assert_eq!(First.name(), "first");
1317 assert_eq!(Last.name(), "last");
1318 assert_eq!(Ema::new(5).name(), "ema");
1319 }
1320
1321 #[test]
1322 fn test_expr_aggregate_name() {
1323 let expr = ExprAggregate::new(Box::new(Sum), None, AggBinOp::Add, Box::new(Count), None);
1324 assert_eq!(expr.name(), "expr");
1325 }
1326
1327 #[test]
1332 fn test_min_with_nan_values_no_panic() {
1333 let events = vec![
1334 Event::new("Test").with_field("value", f64::NAN),
1335 Event::new("Test").with_field("value", 20.0),
1336 Event::new("Test").with_field("value", f64::NAN),
1337 Event::new("Test").with_field("value", 10.0),
1338 ];
1339 let result = Min.apply(&events, Some("value"));
1340 assert_eq!(result, Value::Float(10.0));
1342 }
1343
1344 #[test]
1345 fn test_max_with_nan_values_no_panic() {
1346 let events = vec![
1347 Event::new("Test").with_field("value", f64::NAN),
1348 Event::new("Test").with_field("value", 20.0),
1349 Event::new("Test").with_field("value", f64::NAN),
1350 Event::new("Test").with_field("value", 30.0),
1351 ];
1352 let result = Max.apply(&events, Some("value"));
1353 assert_eq!(result, Value::Float(30.0));
1355 }
1356
1357 #[test]
1358 fn test_min_all_nan_returns_null() {
1359 let events = vec![
1360 Event::new("Test").with_field("value", f64::NAN),
1361 Event::new("Test").with_field("value", f64::NAN),
1362 ];
1363 let result = Min.apply(&events, Some("value"));
1364 assert_eq!(result, Value::Null);
1366 }
1367
1368 #[test]
1369 fn test_max_all_nan_returns_null() {
1370 let events = vec![
1371 Event::new("Test").with_field("value", f64::NAN),
1372 Event::new("Test").with_field("value", f64::NAN),
1373 ];
1374 let result = Max.apply(&events, Some("value"));
1375 assert_eq!(result, Value::Null);
1377 }
1378
1379 #[test]
1380 fn test_count_distinct_basic() {
1381 let events = vec![
1382 Event::new("Test").with_field("category", "A"),
1383 Event::new("Test").with_field("category", "B"),
1384 Event::new("Test").with_field("category", "A"),
1385 Event::new("Test").with_field("category", "C"),
1386 Event::new("Test").with_field("category", "B"),
1387 ];
1388 let result = CountDistinct.apply(&events, Some("category"));
1389 assert_eq!(result, Value::Int(3));
1391 }
1392
1393 use std::sync::Arc;
1398
1399 use crate::columnar::ColumnarBuffer;
1400
1401 fn make_columnar_buffer() -> ColumnarBuffer {
1402 let events = vec![
1403 Arc::new(Event::new("Test").with_field("value", 10.0)),
1404 Arc::new(Event::new("Test").with_field("value", 20.0)),
1405 Arc::new(Event::new("Test").with_field("value", 30.0)),
1406 ];
1407 ColumnarBuffer::from_events(events)
1408 }
1409
1410 #[test]
1411 fn test_columnar_count() {
1412 let mut buffer = make_columnar_buffer();
1413 let result = Count.apply_columnar(&mut buffer, None);
1414 assert_eq!(result, Value::Int(3));
1415 }
1416
1417 #[test]
1418 fn test_columnar_sum() {
1419 let mut buffer = make_columnar_buffer();
1420 let result = Sum.apply_columnar(&mut buffer, Some("value"));
1421 assert_eq!(result, Value::Float(60.0));
1422 }
1423
1424 #[test]
1425 fn test_columnar_avg() {
1426 let mut buffer = make_columnar_buffer();
1427 let result = Avg.apply_columnar(&mut buffer, Some("value"));
1428 assert_eq!(result, Value::Float(20.0));
1429 }
1430
1431 #[test]
1432 fn test_columnar_min() {
1433 let mut buffer = make_columnar_buffer();
1434 let result = Min.apply_columnar(&mut buffer, Some("value"));
1435 assert_eq!(result, Value::Float(10.0));
1436 }
1437
1438 #[test]
1439 fn test_columnar_max() {
1440 let mut buffer = make_columnar_buffer();
1441 let result = Max.apply_columnar(&mut buffer, Some("value"));
1442 assert_eq!(result, Value::Float(30.0));
1443 }
1444
/// Columnar aggregation over a buffer in which the middle event has no
/// `value` field. The assertions expect that event to be left out of every
/// aggregate: sum 40.0, average 20.0 (two contributing samples), min 10.0
/// and max 30.0.
#[test]
fn test_columnar_with_nan() {
    let with_ten = Arc::new(Event::new("Test").with_field("value", 10.0));
    // Deliberately built without a `value` field.
    let without_value = Arc::new(Event::new("Test"));
    let with_thirty = Arc::new(Event::new("Test").with_field("value", 30.0));
    let mut columnar = ColumnarBuffer::from_events(vec![with_ten, without_value, with_thirty]);
    let field = Some("value");

    // Same call order as before: sum, avg, min, max on one shared buffer.
    let total = Sum.apply_columnar(&mut columnar, field);
    assert_eq!(total, Value::Float(40.0));

    let mean = Avg.apply_columnar(&mut columnar, field);
    assert_eq!(mean, Value::Float(20.0));

    let smallest = Min.apply_columnar(&mut columnar, field);
    assert_eq!(smallest, Value::Float(10.0));

    let largest = Max.apply_columnar(&mut columnar, field);
    assert_eq!(largest, Value::Float(30.0));
}
1468
/// Runs five aggregations (count, sum, avg, min, max) through a single
/// `Aggregator` against the columnar fixture and checks each named entry of
/// the returned result.
#[test]
fn test_columnar_aggregator() {
    let mut columnar = make_columnar_buffer();
    // Every aggregation except `count` targets the `value` field.
    let on_value = || Some(String::from("value"));
    let aggregator = Aggregator::new()
        .add("count", Box::new(Count), None)
        .add("sum", Box::new(Sum), on_value())
        .add("avg", Box::new(Avg), on_value())
        .add("min", Box::new(Min), on_value())
        .add("max", Box::new(Max), on_value());

    let result = aggregator.apply_columnar(&mut columnar);

    // Checked in the same order the aggregations were registered.
    let expected = [
        ("count", Value::Int(3)),
        ("sum", Value::Float(60.0)),
        ("avg", Value::Float(20.0)),
        ("min", Value::Float(10.0)),
        ("max", Value::Float(30.0)),
    ];
    for (key, want) in &expected {
        assert_eq!(result.get(*key), Some(want));
    }
}
1486
/// Checks the column bookkeeping of `ColumnarBuffer` across two aggregations
/// on the same field: the `value` column is absent before any aggregation,
/// present after the first one, and still present after the second.
///
/// The aggregation results are asserted as well, so that a second pass that
/// reuses the already-present column but returns a wrong value is caught
/// rather than silently discarded.
#[test]
fn test_columnar_column_caching() {
    let mut buffer = make_columnar_buffer();

    // Nothing has been aggregated yet, so the column must not exist.
    assert!(!buffer.has_column("value"));

    // First aggregation on `value`: the column is expected to appear.
    let sum = Sum.apply_columnar(&mut buffer, Some("value"));
    assert!(buffer.has_column("value"));
    assert_eq!(sum, Value::Float(60.0));

    // Second aggregation on the same field: the column must still be there
    // and the result must be correct.
    let avg = Avg.apply_columnar(&mut buffer, Some("value"));
    assert!(buffer.has_column("value"));
    assert_eq!(avg, Value::Float(20.0));
}
1500
/// Aggregating a freshly created, empty `ColumnarBuffer`: `Count` is
/// expected to give `Int(0)`, `Sum` `Float(0.0)`, and `Avg`/`Min`/`Max`
/// `Value::Null`.
#[test]
fn test_columnar_empty_buffer() {
    let mut empty = ColumnarBuffer::new();
    let field = Some("value");

    let count = Count.apply_columnar(&mut empty, None);
    assert_eq!(count, Value::Int(0));

    let sum = Sum.apply_columnar(&mut empty, field);
    assert_eq!(sum, Value::Float(0.0));

    let avg = Avg.apply_columnar(&mut empty, field);
    assert_eq!(avg, Value::Null);

    let min = Min.apply_columnar(&mut empty, field);
    assert_eq!(min, Value::Null);

    let max = Max.apply_columnar(&mut empty, field);
    assert_eq!(max, Value::Null);
}
1514
/// `Median` over an odd number of samples (10, 20, 30) is expected to be
/// the middle sample, 20.0.
#[test]
fn test_percentile_median_odd() {
    let samples: Vec<Event> = [10.0_f64, 20.0, 30.0]
        .iter()
        .map(|&v| Event::new("Test").with_field("value", v))
        .collect();
    assert_eq!(Median.apply(&samples, Some("value")), Value::Float(20.0));
}
1529
/// `Median` over an even number of samples (10, 20, 30, 40) is expected to
/// be 25.0, midway between the two central samples.
#[test]
fn test_percentile_median_even() {
    let samples: Vec<Event> = [10.0_f64, 20.0, 30.0, 40.0]
        .iter()
        .map(|&v| Event::new("Test").with_field("value", v))
        .collect();
    assert_eq!(Median.apply(&samples, Some("value")), Value::Float(25.0));
}
1541
/// `Median` and `P50` must agree when applied to the same events
/// (the `make_events()` fixture).
#[test]
fn test_percentile_p50_equals_median() {
    let events = make_events();
    assert_eq!(
        Median.apply(&events, Some("value")),
        P50.apply(&events, Some("value"))
    );
}
1549
/// `P95` over the values 1.0..=20.0 must be a float strictly between 18.0
/// and 20.0 (the failure message quotes an expected value of about 19.05).
#[test]
fn test_percentile_p95() {
    let mut events = Vec::with_capacity(20);
    for i in 1..=20 {
        events.push(Event::new("Test").with_field("value", i as f64));
    }

    match P95.apply(&events, Some("value")) {
        Value::Float(v) => assert!(v > 18.0 && v < 20.0, "p95 = {v}, expected ~19.05"),
        _ => panic!("Expected float"),
    }
}
1564
/// `P99` over the values 1.0..=100.0 must be a float strictly between 98.0
/// and 100.0 (the failure message quotes an expected value of about 99.01).
#[test]
fn test_percentile_p99() {
    let events: Vec<Event> = (1..=100)
        .map(|i| Event::new("Test").with_field("value", i as f64))
        .collect();

    // Unwrap the float first, then range-check it.
    let v = match P99.apply(&events, Some("value")) {
        Value::Float(v) => v,
        _ => panic!("Expected float"),
    };
    assert!(v > 98.0 && v < 100.0, "p99 = {v}, expected ~99.01");
}
1579
/// A `Percentile` built with quantile 0.75 over the values 1.0..=100.0 must
/// yield a float strictly between 74.0 and 76.0 (the failure message quotes
/// an expected value of about 75.25).
#[test]
fn test_percentile_custom_quantile() {
    let mut events = Vec::with_capacity(100);
    for i in 1..=100 {
        events.push(Event::new("Test").with_field("value", i as f64));
    }

    let p75 = Percentile::new(0.75);
    match p75.apply(&events, Some("value")) {
        Value::Float(v) => assert!(v > 74.0 && v < 76.0, "p75 = {v}, expected ~75.25"),
        _ => panic!("Expected float"),
    }
}
1592
/// Every percentile-style aggregate (`Median`, `P50`, `P95`, `P99` and a
/// custom `Percentile`) is expected to return `Value::Null` for an empty
/// slice of events.
#[test]
fn test_percentile_empty() {
    let no_events: Vec<Event> = Vec::new();
    let field = Some("value");

    assert_eq!(Median.apply(&no_events, field), Value::Null);
    assert_eq!(P50.apply(&no_events, field), Value::Null);
    assert_eq!(P95.apply(&no_events, field), Value::Null);
    assert_eq!(P99.apply(&no_events, field), Value::Null);

    let custom = Percentile::new(0.5);
    assert_eq!(custom.apply(&no_events, field), Value::Null);
}
1605
/// With exactly one sample (42.0), both `Median` and `P99` are expected to
/// return that sample unchanged.
#[test]
fn test_percentile_single_value() {
    let lone = [Event::new("Test").with_field("value", 42.0)];
    let field = Some("value");

    let median = Median.apply(&lone, field);
    assert_eq!(median, Value::Float(42.0));

    let p99 = P99.apply(&lone, field);
    assert_eq!(p99, Value::Float(42.0));
}
1612
/// NaN samples interleaved with 10, 20 and 30 must not influence the
/// median: the expected result is 20.0, the median of the non-NaN samples.
#[test]
fn test_percentile_nan_values_filtered() {
    let raw = [f64::NAN, 10.0, 20.0, f64::NAN, 30.0];
    let events: Vec<Event> = raw
        .iter()
        .map(|&v| Event::new("Test").with_field("value", v))
        .collect();

    assert_eq!(Median.apply(&events, Some("value")), Value::Float(20.0));
}
1625
/// The extreme quantiles 0.0 and 1.0 are expected to return 10.0 and 30.0
/// respectively for the `make_events()` fixture (defined elsewhere in this
/// module).
#[test]
fn test_percentile_p0_and_p100() {
    let events = make_events();
    let field = Some("value");

    let lowest = Percentile::new(0.0).apply(&events, field);
    assert_eq!(lowest, Value::Float(10.0));

    let highest = Percentile::new(1.0).apply(&events, field);
    assert_eq!(highest, Value::Float(30.0));
}
1638
/// `Percentile::new` is expected to store an out-of-range quantile as the
/// nearest bound: 1.5 becomes 1.0 and -0.5 becomes 0.0 (compared within
/// `f64::EPSILON`).
#[test]
fn test_percentile_clamps_quantile() {
    // Above the upper bound.
    {
        let p = Percentile::new(1.5);
        assert!((p.quantile - 1.0).abs() < f64::EPSILON);
    }

    // Below the lower bound.
    {
        let p = Percentile::new(-0.5);
        assert!((p.quantile - 0.0).abs() < f64::EPSILON);
    }
}
1646
/// Exercises the borrowed-slice entry point `apply_refs` of `P95` on the
/// `make_events()` fixture (defined elsewhere in this module) and checks
/// that the result is a float in the half-open range (20.0, 30.0].
///
/// Both failure paths report the offending value, in line with the other
/// percentile range tests, so a regression can be diagnosed from the test
/// output alone.
#[test]
fn test_percentile_apply_refs() {
    let events = make_events();
    let refs: Vec<&Event> = events.iter().collect();

    match P95.apply_refs(&refs, Some("value")) {
        Value::Float(v) => assert!(
            v > 20.0 && v <= 30.0,
            "p95 = {v}, expected in (20.0, 30.0]"
        ),
        other => panic!("Expected float, got {other:?}"),
    }
}
1658
/// `Median` must not depend on input order: the samples 30, 10, 50, 20, 40
/// are supplied unsorted and the expected median is 30.0.
#[test]
fn test_percentile_unsorted_input() {
    let shuffled = [30.0_f64, 10.0, 50.0, 20.0, 40.0];
    let events: Vec<Event> = shuffled
        .iter()
        .map(|&v| Event::new("Test").with_field("value", v))
        .collect();

    assert_eq!(Median.apply(&events, Some("value")), Value::Float(30.0));
}
1671}