Skip to main content

varpulis_runtime/
aggregation.rs

1//! Aggregation functions for stream processing.
2//!
3//! This module provides aggregation functions used in windowed stream processing.
4//! Aggregations compute summary statistics over collections of events.
5//!
6//! # Available Aggregations
7//!
8//! | Function | Description | SIMD Optimized |
9//! |----------|-------------|----------------|
10//! | [`Count`] | Count events | No |
11//! | [`Sum`] | Sum of field values | Yes (AVX2) |
12//! | [`Avg`] | Average of field values | Yes (AVX2) |
13//! | [`Min`] | Minimum value | Yes (AVX2) |
14//! | [`Max`] | Maximum value | Yes (AVX2) |
15//! | [`StdDev`] | Standard deviation (Welford's) | No |
16//! | [`First`] | First value in window | No |
17//! | [`Last`] | Last value in window | No |
18//! | [`CountDistinct`] | Count unique values | No |
19//! | [`Ema`] | Exponential moving average | No |
20//! | [`Percentile`] | Configurable percentile (0.0–1.0) | No |
21//! | [`Median`] | Median (p50) | No |
22//! | [`P50`] | 50th percentile | No |
23//! | [`P95`] | 95th percentile | No |
24//! | [`P99`] | 99th percentile | No |
25//!
26//! # Performance
27//!
28//! Aggregations use SIMD (AVX2) vectorization when available on x86_64:
29//! - `Sum`, `Avg`, `Min`, `Max` achieve ~4x speedup with AVX2
30//! - Fallback to 4-way loop unrolling on other architectures
31//!
32//! # Example
33//!
34//! ```rust,no_run
35//! use varpulis_runtime::aggregation::{AggregateFunc, Sum, Avg, Count};
36//! use varpulis_runtime::Event;
37//!
38//! let events = vec![
39//!     Event::new("Reading").with_field("value", 10.0),
40//!     Event::new("Reading").with_field("value", 20.0),
41//!     Event::new("Reading").with_field("value", 30.0),
42//! ];
43//!
44//! let sum = Sum.apply(&events, Some("value"));  // 60.0
45//! let avg = Avg.apply(&events, Some("value"));  // 20.0
46//! let count = Count.apply(&events, None);       // 3
47//! ```
48//!
49//! # Custom Aggregations
50//!
51//! Implement the [`AggregateFunc`] trait for custom aggregations:
52//!
//! ```rust,no_run
//! use varpulis_runtime::aggregation::AggregateFunc;
//! use varpulis_runtime::Event;
//! use varpulis_core::Value;
//!
//! /// Upper median: for an even number of readings this returns the higher of
//! /// the two middle values (the built-in `Median` interpolates between them).
//! struct UpperMedian;
//!
//! impl AggregateFunc for UpperMedian {
//!     fn name(&self) -> &str { "upper_median" }
//!
//!     fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
//!         let field = field.unwrap_or("value");
//!         let mut values: Vec<f64> = events
//!             .iter()
//!             .filter_map(|e| e.get_float(field))
//!             .filter(|v| !v.is_nan())
//!             .collect();
//!         // `total_cmp` is a total order, so sorting cannot panic.
//!         values.sort_by(f64::total_cmp);
//!
//!         if values.is_empty() {
//!             Value::Null
//!         } else {
//!             Value::Float(values[values.len() / 2])
//!         }
//!     }
//! }
//! ```
79
80use std::hash::{Hash, Hasher};
81
82use indexmap::IndexMap;
83use varpulis_core::Value;
84
85use crate::columnar::ColumnarBuffer;
86use crate::event::{Event, SharedEvent};
87
88/// Result type for aggregation outputs.
89///
90/// Maps aggregation names to their computed values.
91pub type AggResult = IndexMap<String, Value>;
92
93/// Trait for implementing aggregation functions.
94///
95/// Aggregation functions compute summary values over collections of events.
96/// They are used in windowed stream processing to produce aggregate results.
97///
98/// # Required Methods
99///
100/// - [`name`](Self::name): Returns the aggregation function name
101/// - [`apply`](Self::apply): Computes the aggregation over owned events
102///
103/// # Optional Methods
104///
105/// - [`apply_shared`](Self::apply_shared): Optimized for `Arc<Event>` (avoids cloning)
106/// - [`apply_refs`](Self::apply_refs): For reference slices (internal use)
107///
108/// # Thread Safety
109///
110/// Implementations must be `Send + Sync` to work with parallel processing.
111pub trait AggregateFunc: Send + Sync {
112    /// Returns the name of this aggregation function (e.g., "sum", "avg").
113    fn name(&self) -> &str;
114
115    /// Apply the aggregation to a slice of events.
116    ///
117    /// # Arguments
118    ///
119    /// - `events`: Slice of events to aggregate
120    /// - `field`: Optional field name to aggregate (defaults to "value")
121    ///
122    /// # Returns
123    ///
124    /// The aggregated value, or `Value::Null` if no valid values found.
125    fn apply(&self, events: &[Event], field: Option<&str>) -> Value;
126
127    /// Apply aggregation to shared events (avoids cloning in hot paths).
128    ///
129    /// Override this for better performance when events are wrapped in `Arc`.
130    fn apply_shared(&self, events: &[SharedEvent], field: Option<&str>) -> Value {
131        // Default implementation: create temporary references
132        // Aggregations only read, so we can iterate and dereference
133        // Individual implementations can override for better performance
134        let refs: Vec<&Event> = events.iter().map(|e| e.as_ref()).collect();
135        self.apply_refs(&refs, field)
136    }
137
138    /// Apply aggregation to event references (internal helper)
139    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
140        // Default: clone to Vec<Event> - suboptimal but correct
141        // Most aggregations just iterate and read fields
142        let owned: Vec<Event> = events.iter().map(|e| (*e).clone()).collect();
143        self.apply(&owned, field)
144    }
145
146    /// Apply aggregation to columnar buffer (SIMD-optimized for numeric aggregations).
147    ///
148    /// Default implementation falls back to shared events. Override this for
149    /// aggregations that can benefit from columnar data layout (sum, avg, min, max).
150    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
151        // Default: fall back to shared event access
152        self.apply_shared(buffer.events(), field)
153    }
154}
155
156/// Count aggregation
157#[derive(Debug)]
158pub struct Count;
159
160impl AggregateFunc for Count {
161    fn name(&self) -> &'static str {
162        "count"
163    }
164
165    fn apply(&self, events: &[Event], _field: Option<&str>) -> Value {
166        Value::Int(events.len() as i64)
167    }
168
169    fn apply_refs(&self, events: &[&Event], _field: Option<&str>) -> Value {
170        Value::Int(events.len() as i64)
171    }
172
173    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, _field: Option<&str>) -> Value {
174        Value::Int(buffer.len() as i64)
175    }
176}
177
178/// Sum aggregation (SIMD-optimized)
179#[derive(Debug)]
180pub struct Sum;
181
182impl AggregateFunc for Sum {
183    fn name(&self) -> &'static str {
184        "sum"
185    }
186
187    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
188        let field = field.unwrap_or("value");
189        Value::Float(crate::simd::simd_sum(events, field))
190    }
191
192    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
193        let field = field.unwrap_or("value");
194        // Extract values to contiguous array for SIMD
195        let values: Vec<f64> = events
196            .iter()
197            .filter_map(|e| e.get_float(field))
198            .filter(|v| !v.is_nan())
199            .collect();
200        Value::Float(crate::simd::sum_f64(&values))
201    }
202
203    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
204        let field = field.unwrap_or("value");
205        let col = buffer.ensure_float_column(field);
206        // Filter NaN values for sum
207        let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
208        Value::Float(crate::simd::sum_f64(&valid))
209    }
210}
211
212/// Average aggregation (SIMD-optimized)
213#[derive(Debug)]
214pub struct Avg;
215
216impl AggregateFunc for Avg {
217    fn name(&self) -> &'static str {
218        "avg"
219    }
220
221    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
222        let field = field.unwrap_or("value");
223        match crate::simd::simd_avg(events, field) {
224            Some(avg) => Value::Float(avg),
225            None => Value::Null,
226        }
227    }
228
229    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
230        let field = field.unwrap_or("value");
231        // Extract values to contiguous array for SIMD
232        let values: Vec<f64> = events
233            .iter()
234            .filter_map(|e| e.get_float(field))
235            .filter(|v| !v.is_nan())
236            .collect();
237
238        if values.is_empty() {
239            Value::Null
240        } else {
241            Value::Float(crate::simd::sum_f64(&values) / values.len() as f64)
242        }
243    }
244
245    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
246        let field = field.unwrap_or("value");
247        let col = buffer.ensure_float_column(field);
248        let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
249        if valid.is_empty() {
250            Value::Null
251        } else {
252            Value::Float(crate::simd::sum_f64(&valid) / valid.len() as f64)
253        }
254    }
255}
256
257/// Min aggregation (SIMD-optimized)
258#[derive(Debug)]
259pub struct Min;
260
261impl AggregateFunc for Min {
262    fn name(&self) -> &'static str {
263        "min"
264    }
265
266    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
267        let field = field.unwrap_or("value");
268        match crate::simd::simd_min(events, field) {
269            Some(min) => Value::Float(min),
270            None => Value::Null,
271        }
272    }
273
274    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
275        let field = field.unwrap_or("value");
276        let values: Vec<f64> = events
277            .iter()
278            .filter_map(|e| e.get_float(field))
279            .filter(|v| !v.is_nan())
280            .collect();
281        match crate::simd::min_f64(&values) {
282            Some(min) => Value::Float(min),
283            None => Value::Null,
284        }
285    }
286
287    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
288        let field = field.unwrap_or("value");
289        let col = buffer.ensure_float_column(field);
290        let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
291        match crate::simd::min_f64(&valid) {
292            Some(min) => Value::Float(min),
293            None => Value::Null,
294        }
295    }
296}
297
298/// Max aggregation (SIMD-optimized)
299#[derive(Debug)]
300pub struct Max;
301
302impl AggregateFunc for Max {
303    fn name(&self) -> &'static str {
304        "max"
305    }
306
307    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
308        let field = field.unwrap_or("value");
309        match crate::simd::simd_max(events, field) {
310            Some(max) => Value::Float(max),
311            None => Value::Null,
312        }
313    }
314
315    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
316        let field = field.unwrap_or("value");
317        let values: Vec<f64> = events
318            .iter()
319            .filter_map(|e| e.get_float(field))
320            .filter(|v| !v.is_nan())
321            .collect();
322        match crate::simd::max_f64(&values) {
323            Some(max) => Value::Float(max),
324            None => Value::Null,
325        }
326    }
327
328    fn apply_columnar(&self, buffer: &mut ColumnarBuffer, field: Option<&str>) -> Value {
329        let field = field.unwrap_or("value");
330        let col = buffer.ensure_float_column(field);
331        let valid: Vec<f64> = col.iter().copied().filter(|v| !v.is_nan()).collect();
332        match crate::simd::max_f64(&valid) {
333            Some(max) => Value::Float(max),
334            None => Value::Null,
335        }
336    }
337}
338
339/// Standard deviation aggregation (Welford's online algorithm for single-pass)
340#[derive(Debug)]
341pub struct StdDev;
342
343impl AggregateFunc for StdDev {
344    fn name(&self) -> &'static str {
345        "stddev"
346    }
347
348    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
349        let field = field.unwrap_or("value");
350
351        // Welford's online algorithm for numerically stable variance
352        // Single pass: no intermediate Vec allocation
353        let mut count = 0usize;
354        let mut mean = 0.0;
355        let mut m2 = 0.0; // Sum of squared differences from mean
356
357        for event in events {
358            if let Some(x) = event.get_float(field) {
359                count += 1;
360                let delta = x - mean;
361                mean += delta / count as f64;
362                let delta2 = x - mean;
363                m2 += delta * delta2;
364            }
365        }
366
367        if count < 2 {
368            return Value::Null;
369        }
370
371        // Sample standard deviation (n-1 denominator)
372        let variance = m2 / (count - 1) as f64;
373        Value::Float(variance.sqrt())
374    }
375
376    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
377        let field = field.unwrap_or("value");
378
379        let mut count = 0usize;
380        let mut mean = 0.0;
381        let mut m2 = 0.0;
382
383        for event in events {
384            if let Some(x) = event.get_float(field) {
385                count += 1;
386                let delta = x - mean;
387                mean += delta / count as f64;
388                let delta2 = x - mean;
389                m2 += delta * delta2;
390            }
391        }
392
393        if count < 2 {
394            return Value::Null;
395        }
396
397        let variance = m2 / (count - 1) as f64;
398        Value::Float(variance.sqrt())
399    }
400}
401
402/// First value aggregation
403#[derive(Debug)]
404pub struct First;
405
406impl AggregateFunc for First {
407    fn name(&self) -> &'static str {
408        "first"
409    }
410
411    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
412        let field = field.unwrap_or("value");
413        events
414            .first()
415            .and_then(|e| e.get(field))
416            .cloned()
417            .unwrap_or(Value::Null)
418    }
419
420    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
421        let field = field.unwrap_or("value");
422        events
423            .first()
424            .and_then(|e| e.get(field))
425            .cloned()
426            .unwrap_or(Value::Null)
427    }
428}
429
430/// Last value aggregation
431#[derive(Debug)]
432pub struct Last;
433
434impl AggregateFunc for Last {
435    fn name(&self) -> &'static str {
436        "last"
437    }
438
439    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
440        let field = field.unwrap_or("value");
441        events
442            .last()
443            .and_then(|e| e.get(field))
444            .cloned()
445            .unwrap_or(Value::Null)
446    }
447
448    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
449        let field = field.unwrap_or("value");
450        events
451            .last()
452            .and_then(|e| e.get(field))
453            .cloned()
454            .unwrap_or(Value::Null)
455    }
456}
457
458/// Count distinct values aggregation
459/// Optimized to store only hashes instead of cloning full Values
460#[derive(Debug)]
461pub struct CountDistinct;
462
463impl AggregateFunc for CountDistinct {
464    fn name(&self) -> &'static str {
465        "count_distinct"
466    }
467
468    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
469        let field = field.unwrap_or("value");
470        let mut seen_hashes = std::collections::HashSet::new();
471
472        for event in events {
473            if let Some(value) = event.get(field) {
474                // Compute hash directly without cloning the value
475                // This avoids expensive deep clones for Array/Map values
476                let mut hasher = std::collections::hash_map::DefaultHasher::new();
477                value.hash(&mut hasher);
478                seen_hashes.insert(hasher.finish());
479            }
480        }
481
482        Value::Int(seen_hashes.len() as i64)
483    }
484
485    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
486        let field = field.unwrap_or("value");
487        let mut seen_hashes = std::collections::HashSet::new();
488
489        for event in events {
490            if let Some(value) = event.get(field) {
491                let mut hasher = std::collections::hash_map::DefaultHasher::new();
492                value.hash(&mut hasher);
493                seen_hashes.insert(hasher.finish());
494            }
495        }
496
497        Value::Int(seen_hashes.len() as i64)
498    }
499}
500
/// Exponential moving average (EMA) aggregation.
///
/// Each reading updates the average as
/// `ema = value * k + previous_ema * (1 - k)`, with smoothing factor
/// `k = 2 / (period + 1)`. The first reading seeds the average.
#[derive(Debug)]
pub struct Ema {
    /// Smoothing period `n`. A larger period gives each new reading less weight.
    pub period: usize,
}
508
/// Arithmetic operator used by [`ExprAggregate`] to combine two aggregate results.
///
/// Derives `PartialEq`/`Eq`/`Hash` so operators can be compared and used as
/// map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggBinOp {
    /// `left + right`
    Add,
    /// `left - right`
    Sub,
    /// `left * right`
    Mul,
    /// `left / right`
    Div,
}
517
518/// Expression-based aggregate that combines two aggregates with an operator
519pub struct ExprAggregate {
520    pub left: Box<dyn AggregateFunc>,
521    pub left_field: Option<String>,
522    pub op: AggBinOp,
523    pub right: Box<dyn AggregateFunc>,
524    pub right_field: Option<String>,
525}
526
527impl std::fmt::Debug for ExprAggregate {
528    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529        f.debug_struct("ExprAggregate")
530            .field("left", &self.left.name())
531            .field("left_field", &self.left_field)
532            .field("op", &self.op)
533            .field("right", &self.right.name())
534            .field("right_field", &self.right_field)
535            .finish()
536    }
537}
538
539impl ExprAggregate {
540    pub fn new(
541        left: Box<dyn AggregateFunc>,
542        left_field: Option<String>,
543        op: AggBinOp,
544        right: Box<dyn AggregateFunc>,
545        right_field: Option<String>,
546    ) -> Self {
547        Self {
548            left,
549            left_field,
550            op,
551            right,
552            right_field,
553        }
554    }
555}
556
557impl AggregateFunc for ExprAggregate {
558    fn name(&self) -> &'static str {
559        "expr"
560    }
561
562    fn apply(&self, events: &[Event], _field: Option<&str>) -> Value {
563        let left_val = self.left.apply(events, self.left_field.as_deref());
564        let right_val = self.right.apply(events, self.right_field.as_deref());
565
566        match (left_val, right_val) {
567            (Value::Float(l), Value::Float(r)) => {
568                let result = match self.op {
569                    AggBinOp::Add => l + r,
570                    AggBinOp::Sub => l - r,
571                    AggBinOp::Mul => l * r,
572                    AggBinOp::Div => {
573                        if r != 0.0 {
574                            l / r
575                        } else {
576                            f64::NAN
577                        }
578                    }
579                };
580                Value::Float(result)
581            }
582            (Value::Int(l), Value::Int(r)) => {
583                let result = match self.op {
584                    AggBinOp::Add => l + r,
585                    AggBinOp::Sub => l - r,
586                    AggBinOp::Mul => l * r,
587                    AggBinOp::Div => {
588                        if r != 0 {
589                            l / r
590                        } else {
591                            0
592                        }
593                    }
594                };
595                Value::Int(result)
596            }
597            (Value::Int(l), Value::Float(r)) => {
598                let l = l as f64;
599                let result = match self.op {
600                    AggBinOp::Add => l + r,
601                    AggBinOp::Sub => l - r,
602                    AggBinOp::Mul => l * r,
603                    AggBinOp::Div => {
604                        if r != 0.0 {
605                            l / r
606                        } else {
607                            f64::NAN
608                        }
609                    }
610                };
611                Value::Float(result)
612            }
613            (Value::Float(l), Value::Int(r)) => {
614                let r = r as f64;
615                let result = match self.op {
616                    AggBinOp::Add => l + r,
617                    AggBinOp::Sub => l - r,
618                    AggBinOp::Mul => l * r,
619                    AggBinOp::Div => {
620                        if r != 0.0 {
621                            l / r
622                        } else {
623                            f64::NAN
624                        }
625                    }
626                };
627                Value::Float(result)
628            }
629            _ => Value::Null,
630        }
631    }
632
633    fn apply_refs(&self, events: &[&Event], _field: Option<&str>) -> Value {
634        let left_val = self.left.apply_refs(events, self.left_field.as_deref());
635        let right_val = self.right.apply_refs(events, self.right_field.as_deref());
636
637        match (left_val, right_val) {
638            (Value::Float(l), Value::Float(r)) => {
639                let result = match self.op {
640                    AggBinOp::Add => l + r,
641                    AggBinOp::Sub => l - r,
642                    AggBinOp::Mul => l * r,
643                    AggBinOp::Div => {
644                        if r != 0.0 {
645                            l / r
646                        } else {
647                            f64::NAN
648                        }
649                    }
650                };
651                Value::Float(result)
652            }
653            (Value::Int(l), Value::Int(r)) => {
654                let result = match self.op {
655                    AggBinOp::Add => l + r,
656                    AggBinOp::Sub => l - r,
657                    AggBinOp::Mul => l * r,
658                    AggBinOp::Div => {
659                        if r != 0 {
660                            l / r
661                        } else {
662                            0
663                        }
664                    }
665                };
666                Value::Int(result)
667            }
668            (Value::Int(l), Value::Float(r)) => {
669                let l = l as f64;
670                let result = match self.op {
671                    AggBinOp::Add => l + r,
672                    AggBinOp::Sub => l - r,
673                    AggBinOp::Mul => l * r,
674                    AggBinOp::Div => {
675                        if r != 0.0 {
676                            l / r
677                        } else {
678                            f64::NAN
679                        }
680                    }
681                };
682                Value::Float(result)
683            }
684            (Value::Float(l), Value::Int(r)) => {
685                let r = r as f64;
686                let result = match self.op {
687                    AggBinOp::Add => l + r,
688                    AggBinOp::Sub => l - r,
689                    AggBinOp::Mul => l * r,
690                    AggBinOp::Div => {
691                        if r != 0.0 {
692                            l / r
693                        } else {
694                            f64::NAN
695                        }
696                    }
697                };
698                Value::Float(result)
699            }
700            _ => Value::Null,
701        }
702    }
703}
704
705impl Ema {
706    pub fn new(period: usize) -> Self {
707        Self {
708            period: period.max(1),
709        }
710    }
711}
712
713impl AggregateFunc for Ema {
714    fn name(&self) -> &'static str {
715        "ema"
716    }
717
718    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
719        let field = field.unwrap_or("value");
720        let k = 2.0 / (self.period as f64 + 1.0);
721
722        // Single-pass EMA calculation
723        let mut ema: Option<f64> = None;
724        for event in events {
725            if let Some(value) = event.get_float(field) {
726                ema = Some(match ema {
727                    Some(prev) => value.mul_add(k, prev * (1.0 - k)),
728                    None => value,
729                });
730            }
731        }
732
733        ema.map_or(Value::Null, Value::Float)
734    }
735
736    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
737        let field = field.unwrap_or("value");
738        let k = 2.0 / (self.period as f64 + 1.0);
739
740        let mut ema: Option<f64> = None;
741        for event in events {
742            if let Some(value) = event.get_float(field) {
743                ema = Some(match ema {
744                    Some(prev) => value.mul_add(k, prev * (1.0 - k)),
745                    None => value,
746                });
747            }
748        }
749
750        ema.map_or(Value::Null, Value::Float)
751    }
752}
753
/// Percentile aggregation (sort-based, exact).
///
/// Sorts the usable readings and linearly interpolates between the two
/// nearest ranks. Window sizes in CEP are bounded, so a full sort is
/// affordable.
///
/// # Examples
/// - `Percentile::new(0.5)`: median
/// - `Percentile::new(0.95)`: p95
/// - `Percentile::new(0.99)`: p99
#[derive(Debug)]
pub struct Percentile {
    /// Requested quantile, clamped into `[0.0, 1.0]` by [`Percentile::new`].
    pub quantile: f64,
    /// Name reported by `AggregateFunc::name`, e.g. `percentile(0.95)`.
    label: String,
}

impl Percentile {
    /// Creates a percentile aggregation. `quantile` is clamped into `[0.0, 1.0]`.
    pub fn new(quantile: f64) -> Self {
        let q = quantile.clamp(0.0, 1.0);
        Percentile {
            label: format!("percentile({q})"),
            quantile: q,
        }
    }

    /// Value at `quantile` of an ascending, non-empty slice.
    ///
    /// The target rank is `quantile * (len - 1)`. When it falls between two
    /// elements, the result blends them in proportion to the fractional part.
    fn interpolate(sorted: &[f64], quantile: f64) -> f64 {
        debug_assert!(!sorted.is_empty());
        if let [only] = sorted {
            return *only;
        }
        let rank = quantile * (sorted.len() - 1) as f64;
        let (lo, hi) = (rank.floor() as usize, rank.ceil() as usize);
        if lo != hi {
            let weight = rank - lo as f64;
            sorted[lo].mul_add(1.0 - weight, sorted[hi] * weight)
        } else {
            sorted[lo]
        }
    }
}
793
794impl AggregateFunc for Percentile {
795    fn name(&self) -> &str {
796        &self.label
797    }
798
799    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
800        let field = field.unwrap_or("value");
801        let mut values: Vec<f64> = events
802            .iter()
803            .filter_map(|e| e.get_float(field))
804            .filter(|v| !v.is_nan())
805            .collect();
806        if values.is_empty() {
807            return Value::Null;
808        }
809        values.sort_by(|a, b| a.partial_cmp(b).unwrap());
810        Value::Float(Self::interpolate(&values, self.quantile))
811    }
812
813    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
814        let field = field.unwrap_or("value");
815        let mut values: Vec<f64> = events
816            .iter()
817            .filter_map(|e| e.get_float(field))
818            .filter(|v| !v.is_nan())
819            .collect();
820        if values.is_empty() {
821            return Value::Null;
822        }
823        values.sort_by(|a, b| a.partial_cmp(b).unwrap());
824        Value::Float(Self::interpolate(&values, self.quantile))
825    }
826}
827
828/// Median aggregation (alias for Percentile(0.5))
829#[derive(Debug)]
830pub struct Median;
831
832impl AggregateFunc for Median {
833    fn name(&self) -> &'static str {
834        "median"
835    }
836
837    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
838        Percentile::new(0.5).apply(events, field)
839    }
840
841    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
842        Percentile::new(0.5).apply_refs(events, field)
843    }
844}
845
846/// P50 aggregation (alias for Percentile(0.5))
847#[derive(Debug)]
848pub struct P50;
849
850impl AggregateFunc for P50 {
851    fn name(&self) -> &'static str {
852        "p50"
853    }
854
855    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
856        Percentile::new(0.5).apply(events, field)
857    }
858
859    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
860        Percentile::new(0.5).apply_refs(events, field)
861    }
862}
863
864/// P95 aggregation (alias for Percentile(0.95))
865#[derive(Debug)]
866pub struct P95;
867
868impl AggregateFunc for P95 {
869    fn name(&self) -> &'static str {
870        "p95"
871    }
872
873    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
874        Percentile::new(0.95).apply(events, field)
875    }
876
877    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
878        Percentile::new(0.95).apply_refs(events, field)
879    }
880}
881
882/// P99 aggregation (alias for Percentile(0.99))
883#[derive(Debug)]
884pub struct P99;
885
886impl AggregateFunc for P99 {
887    fn name(&self) -> &'static str {
888        "p99"
889    }
890
891    fn apply(&self, events: &[Event], field: Option<&str>) -> Value {
892        Percentile::new(0.99).apply(events, field)
893    }
894
895    fn apply_refs(&self, events: &[&Event], field: Option<&str>) -> Value {
896        Percentile::new(0.99).apply_refs(events, field)
897    }
898}
899
900/// Aggregator that can apply multiple aggregations
901pub struct Aggregator {
902    aggregations: Vec<(String, Box<dyn AggregateFunc>, Option<String>)>,
903}
904
905impl std::fmt::Debug for Aggregator {
906    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
907        f.debug_struct("Aggregator")
908            .field(
909                "aggregations",
910                &self
911                    .aggregations
912                    .iter()
913                    .map(|(alias, func, field)| (alias, func.name(), field))
914                    .collect::<Vec<_>>(),
915            )
916            .finish()
917    }
918}
919
920impl Aggregator {
921    pub fn new() -> Self {
922        Self {
923            aggregations: Vec::new(),
924        }
925    }
926
927    pub fn add(
928        mut self,
929        alias: impl Into<String>,
930        func: Box<dyn AggregateFunc>,
931        field: Option<String>,
932    ) -> Self {
933        self.aggregations.push((alias.into(), func, field));
934        self
935    }
936
937    pub fn apply(&self, events: &[Event]) -> AggResult {
938        let mut result = IndexMap::new();
939        for (alias, func, field) in &self.aggregations {
940            let value = func.apply(events, field.as_deref());
941            result.insert(alias.clone(), value);
942        }
943        result
944    }
945
946    /// Apply aggregations to SharedEvent slice (avoids cloning in hot paths)
947    pub fn apply_shared(&self, events: &[SharedEvent]) -> AggResult {
948        let mut result = IndexMap::new();
949        for (alias, func, field) in &self.aggregations {
950            let value = func.apply_shared(events, field.as_deref());
951            result.insert(alias.clone(), value);
952        }
953        result
954    }
955
956    /// Apply aggregations to a columnar buffer (SIMD-optimized).
957    ///
958    /// This method uses lazy column extraction for SIMD operations on contiguous memory.
959    /// Each field is extracted once and cached for subsequent aggregations on the same field.
960    ///
961    /// # Example
962    ///
963    /// ```text
964    /// let aggregator = Aggregator::new()
965    ///     .add("total", Box::new(Sum), Some("price".to_string()))
966    ///     .add("avg_price", Box::new(Avg), Some("price".to_string()))
967    ///     .add("count", Box::new(Count), None);
968    ///
969    /// let mut window = TumblingWindow::new(Duration::seconds(60));
970    /// // ... add events ...
971    /// let mut buffer = window.flush_columnar();
972    /// let results = aggregator.apply_columnar(&mut buffer);
973    /// ```
974    pub fn apply_columnar(&self, buffer: &mut ColumnarBuffer) -> AggResult {
975        let mut result = IndexMap::new();
976        for (alias, func, field) in &self.aggregations {
977            let value = func.apply_columnar(buffer, field.as_deref());
978            result.insert(alias.clone(), value);
979        }
980        result
981    }
982}
983
984impl Default for Aggregator {
985    fn default() -> Self {
986        Self::new()
987    }
988}
989
#[cfg(test)]
mod tests {
    // Unit tests for the row-based, columnar and percentile aggregations and for
    // the `Aggregator` builder. Imports are grouped here rather than mid-module.
    use std::sync::Arc;

    use super::*;
    use crate::columnar::ColumnarBuffer;

    /// Three events with `value` = 10.0, 20.0, 30.0 (sum 60, avg 20).
    fn make_events() -> Vec<Event> {
        vec![
            Event::new("Test").with_field("value", 10.0),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", 30.0),
        ]
    }

    #[test]
    fn test_count() {
        let events = make_events();
        let result = Count.apply(&events, None);
        assert_eq!(result, Value::Int(3));
    }

    #[test]
    fn test_sum() {
        let events = make_events();
        let result = Sum.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(60.0));
    }

    #[test]
    fn test_avg() {
        let events = make_events();
        let result = Avg.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(20.0));
    }

    #[test]
    fn test_min_max() {
        let events = make_events();
        assert_eq!(Min.apply(&events, Some("value")), Value::Float(10.0));
        assert_eq!(Max.apply(&events, Some("value")), Value::Float(30.0));
    }

    #[test]
    fn test_aggregator() {
        let events = make_events();
        let aggregator = Aggregator::new()
            .add("count", Box::new(Count), None)
            .add("sum", Box::new(Sum), Some("value".to_string()))
            .add("avg", Box::new(Avg), Some("value".to_string()));

        let result = aggregator.apply(&events);
        assert_eq!(result.get("count"), Some(&Value::Int(3)));
        assert_eq!(result.get("sum"), Some(&Value::Float(60.0)));
        assert_eq!(result.get("avg"), Some(&Value::Float(20.0)));
    }

    #[test]
    fn test_first_last() {
        let events = make_events();
        assert_eq!(First.apply(&events, Some("value")), Value::Float(10.0));
        assert_eq!(Last.apply(&events, Some("value")), Value::Float(30.0));
    }

    #[test]
    fn test_stddev() {
        let events = make_events();
        let result = StdDev.apply(&events, Some("value"));
        if let Value::Float(v) = result {
            assert!((v - 10.0).abs() < 0.01); // sample stddev of [10, 20, 30] = 10
        } else {
            panic!("Expected float");
        }
    }

    #[test]
    fn test_ema() {
        let events = vec![
            Event::new("Test").with_field("value", 100.0),
            Event::new("Test").with_field("value", 110.0),
            Event::new("Test").with_field("value", 120.0),
            Event::new("Test").with_field("value", 130.0),
            Event::new("Test").with_field("value", 140.0),
        ];
        let ema = Ema::new(3);
        let result = ema.apply(&events, Some("value"));
        if let Value::Float(v) = result {
            // EMA(3) with k = 0.5: should be weighted towards recent values
            assert!(v > 120.0 && v < 140.0);
        } else {
            panic!("Expected float");
        }
    }

    // ==========================================================================
    // Edge Case Tests
    // ==========================================================================

    #[test]
    fn test_count_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Count.apply(&events, None), Value::Int(0));
    }

    #[test]
    fn test_sum_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Sum.apply(&events, Some("value")), Value::Float(0.0));
    }

    #[test]
    fn test_avg_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Avg.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_min_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Min.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_max_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Max.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_first_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(First.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_last_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Last.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_stddev_single_value() {
        let events = vec![Event::new("Test").with_field("value", 42.0)];
        assert_eq!(StdDev.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_ema_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Ema::new(3).apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_ema_single_value() {
        let events = vec![Event::new("Test").with_field("value", 100.0)];
        assert_eq!(
            Ema::new(3).apply(&events, Some("value")),
            Value::Float(100.0)
        );
    }

    #[test]
    fn test_ema_period_zero_becomes_one() {
        let ema = Ema::new(0);
        assert_eq!(ema.period, 1);
    }

    #[test]
    fn test_missing_field() {
        let events = vec![Event::new("Test").with_field("other", 10.0)];
        assert_eq!(Sum.apply(&events, Some("value")), Value::Float(0.0));
        assert_eq!(Avg.apply(&events, Some("value")), Value::Null);
    }

    #[test]
    fn test_default_field() {
        let events = vec![Event::new("Test").with_field("value", 25.0)];
        // When field is None, default is "value"
        assert_eq!(Sum.apply(&events, None), Value::Float(25.0));
        assert_eq!(Avg.apply(&events, None), Value::Float(25.0));
    }

    // ==========================================================================
    // ExprAggregate Tests
    // ==========================================================================

    #[test]
    fn test_expr_aggregate_add() {
        let events = make_events(); // sum = 60
        let expr = ExprAggregate::new(
            Box::new(Sum),
            Some("value".to_string()),
            AggBinOp::Add,
            Box::new(Count),
            None,
        );
        // 60.0 + 3 = 63.0
        assert_eq!(expr.apply(&events, None), Value::Float(63.0));
    }

    #[test]
    fn test_expr_aggregate_sub() {
        let events = make_events();
        let expr = ExprAggregate::new(
            Box::new(Max),
            Some("value".to_string()),
            AggBinOp::Sub,
            Box::new(Min),
            Some("value".to_string()),
        );
        // 30.0 - 10.0 = 20.0
        assert_eq!(expr.apply(&events, None), Value::Float(20.0));
    }

    #[test]
    fn test_expr_aggregate_mul() {
        let events = vec![
            Event::new("Test").with_field("value", 5.0),
            Event::new("Test").with_field("value", 5.0),
        ];
        let expr = ExprAggregate::new(
            Box::new(Avg),
            Some("value".to_string()),
            AggBinOp::Mul,
            Box::new(Count),
            None,
        );
        // 5.0 * 2 = 10.0
        assert_eq!(expr.apply(&events, None), Value::Float(10.0));
    }

    #[test]
    fn test_expr_aggregate_div() {
        let events = make_events();
        let expr = ExprAggregate::new(
            Box::new(Sum),
            Some("value".to_string()),
            AggBinOp::Div,
            Box::new(Count),
            None,
        );
        // 60.0 / 3 = 20.0
        assert_eq!(expr.apply(&events, None), Value::Float(20.0));
    }

    #[test]
    fn test_expr_aggregate_div_by_zero_float() {
        let events: Vec<Event> = vec![];
        let expr = ExprAggregate::new(
            Box::new(Sum),
            Some("value".to_string()),
            AggBinOp::Div,
            Box::new(Sum),
            Some("value".to_string()),
        );
        // 0.0 / 0.0 = NaN
        if let Value::Float(v) = expr.apply(&events, None) {
            assert!(v.is_nan());
        } else {
            panic!("Expected float NaN");
        }
    }

    #[test]
    fn test_expr_aggregate_int_operations() {
        let events = vec![
            Event::new("Test").with_field("count", 10i64),
            Event::new("Test").with_field("count", 20i64),
        ];
        // Both operands are Count (which yields Value::Int), so the product
        // stays an Int rather than being promoted to Float.
        let expr = ExprAggregate::new(Box::new(Count), None, AggBinOp::Mul, Box::new(Count), None);
        // 2 * 2 = 4
        assert_eq!(expr.apply(&events, None), Value::Int(4));
    }

    // ==========================================================================
    // Aggregator Builder Tests
    // ==========================================================================

    #[test]
    fn test_aggregator_empty() {
        let events = make_events();
        let aggregator = Aggregator::new();
        let result = aggregator.apply(&events);
        assert!(result.is_empty());
    }

    #[test]
    fn test_aggregator_default() {
        let aggregator = Aggregator::default();
        assert!(aggregator.aggregations.is_empty());
    }

    #[test]
    fn test_aggregator_chain() {
        let events = make_events();
        let aggregator = Aggregator::new()
            .add("min_val", Box::new(Min), Some("value".to_string()))
            .add("max_val", Box::new(Max), Some("value".to_string()))
            .add(
                "range",
                Box::new(ExprAggregate::new(
                    Box::new(Max),
                    Some("value".to_string()),
                    AggBinOp::Sub,
                    Box::new(Min),
                    Some("value".to_string()),
                )),
                None,
            );

        let result = aggregator.apply(&events);
        assert_eq!(result.get("min_val"), Some(&Value::Float(10.0)));
        assert_eq!(result.get("max_val"), Some(&Value::Float(30.0)));
        assert_eq!(result.get("range"), Some(&Value::Float(20.0)));
    }

    #[test]
    fn test_aggregator_debug_shows_aliases_and_fields() {
        let aggregator = Aggregator::new()
            .add("total", Box::new(Sum), Some("price".to_string()))
            .add("n", Box::new(Count), None);

        let debug = format!("{aggregator:?}");
        assert!(debug.starts_with("Aggregator"), "debug = {debug}");
        assert!(debug.contains("\"total\""), "debug = {debug}");
        assert!(debug.contains("\"sum\""), "debug = {debug}");
        assert!(debug.contains("Some(\"price\")"), "debug = {debug}");
        assert!(debug.contains("\"n\""), "debug = {debug}");
        assert!(debug.contains("None"), "debug = {debug}");
    }

    // ==========================================================================
    // Name Tests
    // ==========================================================================

    #[test]
    fn test_aggregate_names() {
        assert_eq!(Count.name(), "count");
        assert_eq!(Sum.name(), "sum");
        assert_eq!(Avg.name(), "avg");
        assert_eq!(Min.name(), "min");
        assert_eq!(Max.name(), "max");
        assert_eq!(StdDev.name(), "stddev");
        assert_eq!(First.name(), "first");
        assert_eq!(Last.name(), "last");
        assert_eq!(Ema::new(5).name(), "ema");
    }

    #[test]
    fn test_expr_aggregate_name() {
        let expr = ExprAggregate::new(Box::new(Sum), None, AggBinOp::Add, Box::new(Count), None);
        assert_eq!(expr.name(), "expr");
    }

    // ==========================================================================
    // NaN Handling Tests
    // ==========================================================================

    #[test]
    fn test_min_with_nan_values_no_panic() {
        let events = vec![
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 10.0),
        ];
        let result = Min.apply(&events, Some("value"));
        // Should return 10.0 (minimum of non-NaN values)
        assert_eq!(result, Value::Float(10.0));
    }

    #[test]
    fn test_max_with_nan_values_no_panic() {
        let events = vec![
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 30.0),
        ];
        let result = Max.apply(&events, Some("value"));
        // Should return 30.0 (maximum of non-NaN values)
        assert_eq!(result, Value::Float(30.0));
    }

    #[test]
    fn test_min_all_nan_returns_null() {
        let events = vec![
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", f64::NAN),
        ];
        let result = Min.apply(&events, Some("value"));
        // All NaN should return Null
        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_max_all_nan_returns_null() {
        let events = vec![
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", f64::NAN),
        ];
        let result = Max.apply(&events, Some("value"));
        // All NaN should return Null
        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_count_distinct_basic() {
        let events = vec![
            Event::new("Test").with_field("category", "A"),
            Event::new("Test").with_field("category", "B"),
            Event::new("Test").with_field("category", "A"),
            Event::new("Test").with_field("category", "C"),
            Event::new("Test").with_field("category", "B"),
        ];
        let result = CountDistinct.apply(&events, Some("category"));
        // Should count 3 distinct values: A, B, C
        assert_eq!(result, Value::Int(3));
    }

    // ==========================================================================
    // Columnar Aggregation Tests
    // ==========================================================================

    /// Columnar counterpart of `make_events`: `value` = 10.0, 20.0, 30.0.
    fn make_columnar_buffer() -> ColumnarBuffer {
        let events = vec![
            Arc::new(Event::new("Test").with_field("value", 10.0)),
            Arc::new(Event::new("Test").with_field("value", 20.0)),
            Arc::new(Event::new("Test").with_field("value", 30.0)),
        ];
        ColumnarBuffer::from_events(events)
    }

    #[test]
    fn test_columnar_count() {
        let mut buffer = make_columnar_buffer();
        let result = Count.apply_columnar(&mut buffer, None);
        assert_eq!(result, Value::Int(3));
    }

    #[test]
    fn test_columnar_sum() {
        let mut buffer = make_columnar_buffer();
        let result = Sum.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(result, Value::Float(60.0));
    }

    #[test]
    fn test_columnar_avg() {
        let mut buffer = make_columnar_buffer();
        let result = Avg.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(result, Value::Float(20.0));
    }

    #[test]
    fn test_columnar_min() {
        let mut buffer = make_columnar_buffer();
        let result = Min.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(result, Value::Float(10.0));
    }

    #[test]
    fn test_columnar_max() {
        let mut buffer = make_columnar_buffer();
        let result = Max.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(result, Value::Float(30.0));
    }

    #[test]
    fn test_columnar_with_nan() {
        let events = vec![
            Arc::new(Event::new("Test").with_field("value", 10.0)),
            Arc::new(Event::new("Test")), // Missing value -> NaN
            Arc::new(Event::new("Test").with_field("value", 30.0)),
        ];
        let mut buffer = ColumnarBuffer::from_events(events);

        // Sum should ignore NaN
        let sum_result = Sum.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(sum_result, Value::Float(40.0));

        // Avg should only count non-NaN values
        let avg_result = Avg.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(avg_result, Value::Float(20.0)); // 40/2

        // Min/Max should ignore NaN
        let min_result = Min.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(min_result, Value::Float(10.0));
        let max_result = Max.apply_columnar(&mut buffer, Some("value"));
        assert_eq!(max_result, Value::Float(30.0));
    }

    #[test]
    fn test_columnar_aggregator() {
        let mut buffer = make_columnar_buffer();
        let aggregator = Aggregator::new()
            .add("count", Box::new(Count), None)
            .add("sum", Box::new(Sum), Some("value".to_string()))
            .add("avg", Box::new(Avg), Some("value".to_string()))
            .add("min", Box::new(Min), Some("value".to_string()))
            .add("max", Box::new(Max), Some("value".to_string()));

        let result = aggregator.apply_columnar(&mut buffer);
        assert_eq!(result.get("count"), Some(&Value::Int(3)));
        assert_eq!(result.get("sum"), Some(&Value::Float(60.0)));
        assert_eq!(result.get("avg"), Some(&Value::Float(20.0)));
        assert_eq!(result.get("min"), Some(&Value::Float(10.0)));
        assert_eq!(result.get("max"), Some(&Value::Float(30.0)));
    }

    #[test]
    fn test_columnar_column_caching() {
        let mut buffer = make_columnar_buffer();

        // First access extracts column
        assert!(!buffer.has_column("value"));
        let _sum = Sum.apply_columnar(&mut buffer, Some("value"));
        assert!(buffer.has_column("value"));

        // Second access (a different aggregation on the same field) keeps the column cached
        let _avg = Avg.apply_columnar(&mut buffer, Some("value"));
        assert!(buffer.has_column("value"));
    }

    #[test]
    fn test_columnar_empty_buffer() {
        let mut buffer = ColumnarBuffer::new();

        assert_eq!(Count.apply_columnar(&mut buffer, None), Value::Int(0));
        assert_eq!(
            Sum.apply_columnar(&mut buffer, Some("value")),
            Value::Float(0.0)
        );
        assert_eq!(Avg.apply_columnar(&mut buffer, Some("value")), Value::Null);
        assert_eq!(Min.apply_columnar(&mut buffer, Some("value")), Value::Null);
        assert_eq!(Max.apply_columnar(&mut buffer, Some("value")), Value::Null);
    }

    // ==========================================================================
    // Percentile Aggregation Tests
    // ==========================================================================

    #[test]
    fn test_percentile_median_odd() {
        let events = vec![
            Event::new("Test").with_field("value", 10.0),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", 30.0),
        ];
        let result = Median.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(20.0));
    }

    #[test]
    fn test_percentile_median_even() {
        let events = vec![
            Event::new("Test").with_field("value", 10.0),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", 30.0),
            Event::new("Test").with_field("value", 40.0),
        ];
        let result = Median.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(25.0)); // interpolation between 20 and 30
    }

    #[test]
    fn test_percentile_p50_equals_median() {
        let events = make_events();
        let median_val = Median.apply(&events, Some("value"));
        let p50_val = P50.apply(&events, Some("value"));
        assert_eq!(median_val, p50_val);
    }

    #[test]
    fn test_percentile_p95() {
        // 20 events: 1.0, 2.0, ..., 20.0
        let events: Vec<Event> = (1..=20)
            .map(|i| Event::new("Test").with_field("value", i as f64))
            .collect();
        let result = P95.apply(&events, Some("value"));
        if let Value::Float(v) = result {
            // p95 of 1..20: position = 0.95 * 19 = 18.05, interpolate 19 and 20
            assert!(v > 18.0 && v < 20.0, "p95 = {v}, expected ~19.05");
        } else {
            panic!("Expected float");
        }
    }

    #[test]
    fn test_percentile_p99() {
        // 100 events: 1.0, 2.0, ..., 100.0
        let events: Vec<Event> = (1..=100)
            .map(|i| Event::new("Test").with_field("value", i as f64))
            .collect();
        let result = P99.apply(&events, Some("value"));
        if let Value::Float(v) = result {
            // p99 of 1..100: position = 0.99 * 99 = 98.01
            assert!(v > 98.0 && v < 100.0, "p99 = {v}, expected ~99.01");
        } else {
            panic!("Expected float");
        }
    }

    #[test]
    fn test_percentile_custom_quantile() {
        let events: Vec<Event> = (1..=100)
            .map(|i| Event::new("Test").with_field("value", i as f64))
            .collect();
        let result = Percentile::new(0.75).apply(&events, Some("value"));
        if let Value::Float(v) = result {
            assert!(v > 74.0 && v < 76.0, "p75 = {v}, expected ~75.25");
        } else {
            panic!("Expected float");
        }
    }

    #[test]
    fn test_percentile_empty() {
        let events: Vec<Event> = vec![];
        assert_eq!(Median.apply(&events, Some("value")), Value::Null);
        assert_eq!(P50.apply(&events, Some("value")), Value::Null);
        assert_eq!(P95.apply(&events, Some("value")), Value::Null);
        assert_eq!(P99.apply(&events, Some("value")), Value::Null);
        assert_eq!(
            Percentile::new(0.5).apply(&events, Some("value")),
            Value::Null
        );
    }

    #[test]
    fn test_percentile_single_value() {
        let events = vec![Event::new("Test").with_field("value", 42.0)];
        assert_eq!(Median.apply(&events, Some("value")), Value::Float(42.0));
        assert_eq!(P99.apply(&events, Some("value")), Value::Float(42.0));
    }

    #[test]
    fn test_percentile_nan_values_filtered() {
        let events = vec![
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 10.0),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", f64::NAN),
            Event::new("Test").with_field("value", 30.0),
        ];
        let result = Median.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(20.0));
    }

    #[test]
    fn test_percentile_p0_and_p100() {
        let events = make_events(); // 10, 20, 30
        assert_eq!(
            Percentile::new(0.0).apply(&events, Some("value")),
            Value::Float(10.0)
        );
        assert_eq!(
            Percentile::new(1.0).apply(&events, Some("value")),
            Value::Float(30.0)
        );
    }

    #[test]
    fn test_percentile_clamps_quantile() {
        let p = Percentile::new(1.5);
        assert!((p.quantile - 1.0).abs() < f64::EPSILON);
        let p = Percentile::new(-0.5);
        assert!((p.quantile - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_percentile_apply_refs() {
        let events = make_events();
        let refs: Vec<&Event> = events.iter().collect();
        let result = P95.apply_refs(&refs, Some("value"));
        if let Value::Float(v) = result {
            assert!(v > 20.0 && v <= 30.0);
        } else {
            panic!("Expected float");
        }
    }

    #[test]
    fn test_percentile_unsorted_input() {
        let events = vec![
            Event::new("Test").with_field("value", 30.0),
            Event::new("Test").with_field("value", 10.0),
            Event::new("Test").with_field("value", 50.0),
            Event::new("Test").with_field("value", 20.0),
            Event::new("Test").with_field("value", 40.0),
        ];
        let result = Median.apply(&events, Some("value"));
        assert_eq!(result, Value::Float(30.0));
    }
}