Skip to main content

archiver_core/retrieval/postprocessors/
statistics.rs

1use crate::storage::traits::{EventStream, PostProcessor};
2use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
3
/// Generate a `PostProcessor` impl backed by `StatStream` for one `StatOp`.
///
/// `stat_processor!(Type, "name", op)` defines a public struct `Type` that
/// stores a bin width (`interval_secs`), reports `"name"` from
/// `PostProcessor::name`, and wraps its input in a `StatStream` that
/// evaluates `op` once per bin.
macro_rules! stat_processor {
    ($Type:ident, $name:literal, $op:expr) => {
        #[doc = concat!(
            "The `",
            $name,
            "` post-processor: summarises each `interval_secs`-wide bin of the ",
            "input as a single `ScalarDouble` sample."
        )]
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        pub struct $Type {
            interval_secs: u64,
        }

        impl $Type {
            #[doc = concat!(
                "Creates a `",
                $name,
                "` post-processor whose bins are `interval_secs` seconds wide."
            )]
            pub fn new(interval_secs: u64) -> Self {
                Self { interval_secs }
            }
        }

        impl PostProcessor for $Type {
            fn name(&self) -> &str {
                $name
            }

            fn interval_secs(&self) -> u64 {
                self.interval_secs
            }

            fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
                Box::new(StatStream::new(input, self.interval_secs, $op))
            }
        }
    };
}
30
31stat_processor!(MaxPostProcessor, "max", StatOp::Max);
32stat_processor!(MinPostProcessor, "min", StatOp::Min);
33stat_processor!(StdPostProcessor, "std", StatOp::Std);
34stat_processor!(MedianPostProcessor, "median", StatOp::Median);
35stat_processor!(VariancePostProcessor, "variance", StatOp::Variance);
36stat_processor!(RmsPostProcessor, "rms", StatOp::Rms);
37
38// ── Shared implementation ──
39
/// Which statistic a `StatStream` reduces each bin to.
///
/// A field-less value type, so it is `Copy` and can be matched on freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatOp {
    /// Largest value in the bin.
    Max,
    /// Smallest value in the bin.
    Min,
    /// Sample (n - 1) standard deviation; 0.0 for fewer than two values.
    Std,
    /// Middle value, or the mean of the two middle values for an even count.
    Median,
    /// Sample (n - 1) variance; 0.0 for fewer than two values.
    Variance,
    /// Root mean square: `sqrt(sum(v^2) / n)`.
    Rms,
}
48
49struct StatStream {
50    input: Box<dyn EventStream>,
51    // Java parity (9a5f7a0): stat operators always emit ScalarDouble values;
52    // the descriptor must reflect this even if the input PV is an integer type.
53    desc: EventStreamDesc,
54    interval_secs: u64,
55    op: StatOp,
56    buffer: Vec<f64>,
57    /// Wall-clock-aligned bin (epoch_secs / interval_secs). Mirrors
58    /// Java's SummaryStatsPostProcessor so two PVs with the same
59    /// `interval_secs` emit on the same wall-clock instants.
60    current_bin: Option<u64>,
61    finished: bool,
62}
63
64impl StatStream {
65    fn new(input: Box<dyn EventStream>, interval_secs: u64, op: StatOp) -> Self {
66        let mut desc = input.description().clone();
67        desc.db_type = ArchDbType::ScalarDouble;
68        Self {
69            desc,
70            input,
71            interval_secs,
72            op,
73            buffer: Vec::new(),
74            current_bin: None,
75            finished: false,
76        }
77    }
78
79    fn compute(&self) -> f64 {
80        match self.op {
81            StatOp::Max => self
82                .buffer
83                .iter()
84                .cloned()
85                .fold(f64::NEG_INFINITY, f64::max),
86            StatOp::Min => self.buffer.iter().cloned().fold(f64::INFINITY, f64::min),
87            StatOp::Std => sample_std(&self.buffer),
88            StatOp::Variance => {
89                if self.buffer.len() < 2 {
90                    return 0.0;
91                }
92                let n = self.buffer.len() as f64;
93                let mean = self.buffer.iter().sum::<f64>() / n;
94                self.buffer.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1.0)
95            }
96            StatOp::Median => {
97                if self.buffer.is_empty() {
98                    return 0.0;
99                }
100                let mut sorted: Vec<f64> = self.buffer.clone();
101                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
102                let mid = sorted.len() / 2;
103                if sorted.len().is_multiple_of(2) {
104                    (sorted[mid - 1] + sorted[mid]) / 2.0
105                } else {
106                    sorted[mid]
107                }
108            }
109            StatOp::Rms => {
110                if self.buffer.is_empty() {
111                    return 0.0;
112                }
113                let n = self.buffer.len() as f64;
114                let sum_sq: f64 = self.buffer.iter().map(|v| v * v).sum();
115                (sum_sq / n).sqrt()
116            }
117        }
118    }
119}
120
/// Sample (Bessel-corrected, divide by n - 1) standard deviation of `values`.
///
/// Yields `0.0` when fewer than two values are supplied, since the sample
/// formula is undefined there.
fn sample_std(values: &[f64]) -> f64 {
    let count = values.len();
    if count < 2 {
        return 0.0;
    }

    let n = count as f64;
    let mean = values.iter().sum::<f64>() / n;
    let squared_deviations: f64 = values.iter().map(|v| (v - mean).powi(2)).sum();
    let sample_variance = squared_deviations / (n - 1.0);
    sample_variance.sqrt()
}
130
131impl EventStream for StatStream {
132    fn description(&self) -> &EventStreamDesc {
133        &self.desc
134    }
135
136    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
137        if self.finished {
138            return Ok(None);
139        }
140
141        loop {
142            match self.input.next_event()? {
143                Some(sample) => {
144                    let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
145
146                    if let Some(prev_bin) = self.current_bin
147                        && bin != prev_bin
148                        && !self.buffer.is_empty()
149                    {
150                        let result_val = self.compute();
151                        let result = ArchiverSample::new(
152                            crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
153                            ArchiverValue::ScalarDouble(result_val),
154                        );
155                        self.buffer.clear();
156                        self.current_bin = Some(bin);
157                        if let Some(v) = sample.value.as_f64() {
158                            self.buffer.push(v);
159                        }
160                        return Ok(Some(result));
161                    }
162
163                    self.current_bin = Some(bin);
164                    if let Some(v) = sample.value.as_f64() {
165                        self.buffer.push(v);
166                    }
167                }
168                None => {
169                    self.finished = true;
170                    if let Some(prev_bin) = self.current_bin
171                        && !self.buffer.is_empty()
172                    {
173                        let result_val = self.compute();
174                        let result = ArchiverSample::new(
175                            crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
176                            ArchiverValue::ScalarDouble(result_val),
177                        );
178                        self.buffer.clear();
179                        return Ok(Some(result));
180                    }
181                    return Ok(None);
182                }
183            }
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ArchDbType;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Epoch second at which every `VecStream` starts.
    const BASE_EPOCH_SECS: u64 = 1_700_000_000;

    /// In-memory `EventStream` that replays `(offset_secs, value)` pairs as
    /// `ScalarDouble` samples stamped `start + offset_secs`.
    struct VecStream {
        desc: EventStreamDesc,
        items: std::vec::IntoIter<(u64, f64)>,
        start: SystemTime,
    }

    impl VecStream {
        fn new(items: Vec<(u64, f64)>) -> Self {
            let desc = EventStreamDesc {
                pv_name: "TEST".to_string(),
                db_type: ArchDbType::ScalarDouble,
                year: 2024,
                element_count: Some(1),
                headers: Vec::new(),
            };
            Self {
                desc,
                items: items.into_iter(),
                start: UNIX_EPOCH + Duration::from_secs(BASE_EPOCH_SECS),
            }
        }
    }

    impl EventStream for VecStream {
        fn description(&self) -> &EventStreamDesc {
            &self.desc
        }

        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
            let Some((offset, value)) = self.items.next() else {
                return Ok(None);
            };
            let when = self.start + Duration::from_secs(offset);
            Ok(Some(ArchiverSample::new(when, ArchiverValue::ScalarDouble(value))))
        }
    }

    /// Whole seconds since the Unix epoch of `sample`'s timestamp.
    fn epoch_secs(sample: &ArchiverSample) -> u64 {
        sample.timestamp.duration_since(UNIX_EPOCH).unwrap().as_secs()
    }

    /// The `ScalarDouble` payload of `sample`; panics on any other variant.
    fn scalar(sample: &ArchiverSample) -> f64 {
        match &sample.value {
            ArchiverValue::ScalarDouble(v) => *v,
            _ => panic!("wrong value type"),
        }
    }

    /// Runs `items` through `pp` and collects every emitted `ScalarDouble`.
    fn drain(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<f64> {
        let mut stream = pp.process(Box::new(VecStream::new(items)));
        std::iter::from_fn(|| stream.next_event().unwrap())
            .filter_map(|sample| match sample.value {
                ArchiverValue::ScalarDouble(v) => Some(v),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn median_odd_and_even() {
        // Odd count in a single 10s bin: median is the middle value, 3.
        let odd: Box<dyn PostProcessor> = Box::new(MedianPostProcessor::new(10));
        assert_eq!(
            drain(odd, vec![(0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)]),
            vec![3.0]
        );

        // Even count: mean of the two middle values.
        let even: Box<dyn PostProcessor> = Box::new(MedianPostProcessor::new(10));
        assert_eq!(drain(even, vec![(0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0)]), vec![2.5]);
    }

    #[test]
    fn variance_sample_formula() {
        // 2,4,4,4,5,5,7,9: mean 5, squared deviations sum to 32,
        // so the sample variance is 32/7.
        let pp: Box<dyn PostProcessor> = Box::new(VariancePostProcessor::new(100));
        let items: Vec<(u64, f64)> = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
            .into_iter()
            .enumerate()
            .map(|(i, v)| (i as u64, v))
            .collect();
        let out = drain(pp, items);
        assert_eq!(out.len(), 1);
        assert!((out[0] - (32.0 / 7.0)).abs() < 1e-9);
    }

    #[test]
    fn rms_simple() {
        // sqrt((3^2 + 4^2) / 2) = sqrt(12.5)
        let pp: Box<dyn PostProcessor> = Box::new(RmsPostProcessor::new(100));
        let out = drain(pp, vec![(0, 3.0), (1, 4.0)]);
        assert_eq!(out.len(), 1);
        assert!((out[0] - 12.5_f64.sqrt()).abs() < 1e-9);
    }

    /// Wall-clock bin alignment (F-3). Bin start timestamps must be
    /// `(epoch_secs / interval_secs) * interval_secs` no matter where the
    /// first sample lands inside a bin, matching Java's
    /// `binNumber = epochSeconds / intervalSecs`. Otherwise the first emitted
    /// timestamp would be the first sample's own, and two PVs aggregating the
    /// same data would emit on different instants.
    #[test]
    fn bins_aligned_to_wall_clock() {
        // The stream starts at 1_700_000_000, which is 20 past a multiple of
        // 60, so 60s bins begin at offsets -20, 40, 100, ...:
        //   offset 17 -> ts 1_700_000_017 -> bin starting 1_699_999_980
        //   offset 45 -> ts 1_700_000_045 -> bin starting 1_700_000_040
        //   offset 75 -> ts 1_700_000_075 -> same bin as offset 45
        let pp: Box<dyn PostProcessor> = Box::new(MaxPostProcessor::new(60));
        let mut stream = pp.process(Box::new(VecStream::new(vec![
            (17, 5.0),
            (45, 10.0),
            (75, 7.0),
        ])));

        let first = stream.next_event().unwrap().unwrap();
        assert_eq!(
            epoch_secs(&first),
            1_699_999_980,
            "first bin must start at floor(ts/60)*60"
        );
        assert_eq!(scalar(&first), 5.0);

        let second = stream.next_event().unwrap().unwrap();
        assert_eq!(epoch_secs(&second), 1_700_000_040);
        assert_eq!(scalar(&second), 10.0); // max of {10.0, 7.0}
    }

    #[test]
    fn max_min_std_remain_correct() {
        let samples = || vec![(0, 1.0), (1, 5.0), (2, 3.0)];

        let max: Box<dyn PostProcessor> = Box::new(MaxPostProcessor::new(100));
        assert_eq!(drain(max, samples()), vec![5.0]);

        let min: Box<dyn PostProcessor> = Box::new(MinPostProcessor::new(100));
        assert_eq!(drain(min, samples()), vec![1.0]);

        // Sample standard deviation of 1, 2, 3 is exactly 1.
        let std: Box<dyn PostProcessor> = Box::new(StdPostProcessor::new(100));
        let out = drain(std, vec![(0, 1.0), (1, 2.0), (2, 3.0)]);
        assert_eq!(out.len(), 1);
        assert!((out[0] - 1.0).abs() < 1e-9);
    }
}