archiver_core/retrieval/postprocessors/
statistics.rs1use crate::storage::traits::{EventStream, PostProcessor};
2use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
3
4macro_rules! stat_processor {
6 ($Type:ident, $name:literal, $op:expr) => {
7 pub struct $Type {
8 interval_secs: u64,
9 }
10
11 impl $Type {
12 pub fn new(interval_secs: u64) -> Self {
13 Self { interval_secs }
14 }
15 }
16
17 impl PostProcessor for $Type {
18 fn name(&self) -> &str {
19 $name
20 }
21 fn interval_secs(&self) -> u64 {
22 self.interval_secs
23 }
24 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
25 Box::new(StatStream::new(input, self.interval_secs, $op))
26 }
27 }
28 };
29}
30
31stat_processor!(MaxPostProcessor, "max", StatOp::Max);
32stat_processor!(MinPostProcessor, "min", StatOp::Min);
33stat_processor!(StdPostProcessor, "std", StatOp::Std);
34stat_processor!(MedianPostProcessor, "median", StatOp::Median);
35stat_processor!(VariancePostProcessor, "variance", StatOp::Variance);
36stat_processor!(RmsPostProcessor, "rms", StatOp::Rms);
37
/// The statistic a `StatStream` computes over the numeric values of each time bin.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatOp {
    /// Largest value in the bin.
    Max,
    /// Smallest value in the bin.
    Min,
    /// Sample standard deviation (n - 1 denominator).
    Std,
    /// Middle value, or the mean of the two middle values for an even count.
    Median,
    /// Sample variance (n - 1 denominator).
    Variance,
    /// Root mean square: `sqrt(sum(v^2) / n)`.
    Rms,
}
48
49struct StatStream {
50 input: Box<dyn EventStream>,
51 desc: EventStreamDesc,
54 interval_secs: u64,
55 op: StatOp,
56 buffer: Vec<f64>,
57 current_bin: Option<u64>,
61 finished: bool,
62}
63
64impl StatStream {
65 fn new(input: Box<dyn EventStream>, interval_secs: u64, op: StatOp) -> Self {
66 let mut desc = input.description().clone();
67 desc.db_type = ArchDbType::ScalarDouble;
68 Self {
69 desc,
70 input,
71 interval_secs,
72 op,
73 buffer: Vec::new(),
74 current_bin: None,
75 finished: false,
76 }
77 }
78
79 fn compute(&self) -> f64 {
80 match self.op {
81 StatOp::Max => self
82 .buffer
83 .iter()
84 .cloned()
85 .fold(f64::NEG_INFINITY, f64::max),
86 StatOp::Min => self.buffer.iter().cloned().fold(f64::INFINITY, f64::min),
87 StatOp::Std => sample_std(&self.buffer),
88 StatOp::Variance => {
89 if self.buffer.len() < 2 {
90 return 0.0;
91 }
92 let n = self.buffer.len() as f64;
93 let mean = self.buffer.iter().sum::<f64>() / n;
94 self.buffer.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1.0)
95 }
96 StatOp::Median => {
97 if self.buffer.is_empty() {
98 return 0.0;
99 }
100 let mut sorted: Vec<f64> = self.buffer.clone();
101 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
102 let mid = sorted.len() / 2;
103 if sorted.len().is_multiple_of(2) {
104 (sorted[mid - 1] + sorted[mid]) / 2.0
105 } else {
106 sorted[mid]
107 }
108 }
109 StatOp::Rms => {
110 if self.buffer.is_empty() {
111 return 0.0;
112 }
113 let n = self.buffer.len() as f64;
114 let sum_sq: f64 = self.buffer.iter().map(|v| v * v).sum();
115 (sum_sq / n).sqrt()
116 }
117 }
118 }
119}
120
121fn sample_std(values: &[f64]) -> f64 {
122 if values.len() < 2 {
123 return 0.0;
124 }
125 let n = values.len() as f64;
126 let mean = values.iter().sum::<f64>() / n;
127 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1.0);
128 variance.sqrt()
129}
130
131impl EventStream for StatStream {
132 fn description(&self) -> &EventStreamDesc {
133 &self.desc
134 }
135
136 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
137 if self.finished {
138 return Ok(None);
139 }
140
141 loop {
142 match self.input.next_event()? {
143 Some(sample) => {
144 let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
145
146 if let Some(prev_bin) = self.current_bin
147 && bin != prev_bin
148 && !self.buffer.is_empty()
149 {
150 let result_val = self.compute();
151 let result = ArchiverSample::new(
152 crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
153 ArchiverValue::ScalarDouble(result_val),
154 );
155 self.buffer.clear();
156 self.current_bin = Some(bin);
157 if let Some(v) = sample.value.as_f64() {
158 self.buffer.push(v);
159 }
160 return Ok(Some(result));
161 }
162
163 self.current_bin = Some(bin);
164 if let Some(v) = sample.value.as_f64() {
165 self.buffer.push(v);
166 }
167 }
168 None => {
169 self.finished = true;
170 if let Some(prev_bin) = self.current_bin
171 && !self.buffer.is_empty()
172 {
173 let result_val = self.compute();
174 let result = ArchiverSample::new(
175 crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
176 ArchiverValue::ScalarDouble(result_val),
177 );
178 self.buffer.clear();
179 return Ok(Some(result));
180 }
181 return Ok(None);
182 }
183 }
184 }
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ArchDbType;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// Epoch seconds that `VecStream` offsets are added to. It sits 20 s past a
    /// minute boundary, so 60 s bins do not start at offset 0.
    const BASE_SECS: u64 = 1_700_000_000;

    /// In-memory input yielding one `ScalarDouble` sample per `(offset_secs, value)`.
    struct VecStream {
        desc: EventStreamDesc,
        items: std::vec::IntoIter<(u64, f64)>,
        start: SystemTime,
    }

    impl VecStream {
        fn new(items: Vec<(u64, f64)>) -> Self {
            Self {
                desc: EventStreamDesc {
                    pv_name: "TEST".to_string(),
                    db_type: ArchDbType::ScalarDouble,
                    year: 2024,
                    element_count: Some(1),
                    headers: Vec::new(),
                },
                items: items.into_iter(),
                start: UNIX_EPOCH + Duration::from_secs(BASE_SECS),
            }
        }
    }

    impl EventStream for VecStream {
        fn description(&self) -> &EventStreamDesc {
            &self.desc
        }

        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
            Ok(self.items.next().map(|(offset, v)| {
                ArchiverSample::new(
                    self.start + Duration::from_secs(offset),
                    ArchiverValue::ScalarDouble(v),
                )
            }))
        }
    }

    /// Runs `pp` over `items` and collects every output as `(epoch_secs, value)`.
    ///
    /// Panics if the post-processor emits anything other than `ScalarDouble`.
    fn drain_samples(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<(u64, f64)> {
        let mut stream = pp.process(Box::new(VecStream::new(items)));
        let mut out = Vec::new();
        while let Some(sample) = stream.next_event().unwrap() {
            let secs = sample
                .timestamp
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();
            match sample.value {
                ArchiverValue::ScalarDouble(v) => out.push((secs, v)),
                _ => panic!("post-processor emitted a non-ScalarDouble value"),
            }
        }
        out
    }

    /// Like [`drain_samples`] but keeps only the emitted values.
    fn drain(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<f64> {
        drain_samples(pp, items).into_iter().map(|(_, v)| v).collect()
    }

    #[test]
    fn median_odd_and_even() {
        let pp: Box<dyn PostProcessor> = Box::new(MedianPostProcessor::new(10));
        let items = vec![(0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)];
        assert_eq!(drain(pp, items), vec![3.0]);

        let pp: Box<dyn PostProcessor> = Box::new(MedianPostProcessor::new(10));
        let items = vec![(0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0)];
        assert_eq!(drain(pp, items), vec![2.5]);
    }

    #[test]
    fn variance_sample_formula() {
        let pp: Box<dyn PostProcessor> = Box::new(VariancePostProcessor::new(100));
        let items: Vec<(u64, f64)> = (0u64..8)
            .zip([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
            .collect();
        let out = drain(pp, items);
        assert_eq!(out.len(), 1);
        assert!((out[0] - (32.0 / 7.0)).abs() < 1e-9);
    }

    #[test]
    fn rms_simple() {
        let pp: Box<dyn PostProcessor> = Box::new(RmsPostProcessor::new(100));
        let out = drain(pp, vec![(0, 3.0), (1, 4.0)]);
        assert_eq!(out.len(), 1);
        assert!((out[0] - 12.5_f64.sqrt()).abs() < 1e-9);
    }

    #[test]
    fn bins_aligned_to_wall_clock() {
        let pp: Box<dyn PostProcessor> = Box::new(MaxPostProcessor::new(60));
        // Absolute times 1_700_000_017 | 1_700_000_045, 1_700_000_075: the first sample
        // sits alone in [.._980, .._040); the other two share [.._040, .._100).
        let out = drain_samples(pp, vec![(17, 5.0), (45, 10.0), (75, 7.0)]);
        assert_eq!(
            out,
            vec![(1_699_999_980, 5.0), (1_700_000_040, 10.0)],
            "bins must start at floor(ts/60)*60"
        );
    }

    #[test]
    fn bins_without_samples_emit_nothing() {
        let pp: Box<dyn PostProcessor> = Box::new(MinPostProcessor::new(60));
        // Offsets 20 and 30 fall in [.._980, .._040); 180 falls in [.._160, .._220).
        // The bins in between receive no samples and must not produce output.
        let out = drain_samples(pp, vec![(20, 4.0), (30, 2.0), (180, 3.0)]);
        assert_eq!(out, vec![(1_699_999_980, 2.0), (1_700_000_160, 3.0)]);
    }

    #[test]
    fn max_min_std_remain_correct() {
        let pp: Box<dyn PostProcessor> = Box::new(MaxPostProcessor::new(100));
        assert_eq!(drain(pp, vec![(0, 1.0), (1, 5.0), (2, 3.0)]), vec![5.0]);

        let pp: Box<dyn PostProcessor> = Box::new(MinPostProcessor::new(100));
        assert_eq!(drain(pp, vec![(0, 1.0), (1, 5.0), (2, 3.0)]), vec![1.0]);

        let pp: Box<dyn PostProcessor> = Box::new(StdPostProcessor::new(100));
        let out = drain(pp, vec![(0, 1.0), (1, 2.0), (2, 3.0)]);
        assert_eq!(out.len(), 1);
        assert!((out[0] - 1.0).abs() < 1e-9);
    }

    #[test]
    fn empty_input_emits_nothing() {
        let processors: Vec<Box<dyn PostProcessor>> = vec![
            Box::new(MaxPostProcessor::new(60)),
            Box::new(MinPostProcessor::new(60)),
            Box::new(StdPostProcessor::new(60)),
            Box::new(MedianPostProcessor::new(60)),
            Box::new(VariancePostProcessor::new(60)),
            Box::new(RmsPostProcessor::new(60)),
        ];
        for pp in processors {
            let name = pp.name().to_string();
            assert!(
                drain(pp, Vec::new()).is_empty(),
                "{name} emitted output for empty input"
            );
        }
    }

    #[test]
    fn stream_stays_exhausted_after_final_bin() {
        let pp: Box<dyn PostProcessor> = Box::new(MaxPostProcessor::new(60));
        let mut stream = pp.process(Box::new(VecStream::new(vec![(0, 1.0)])));
        assert!(stream.next_event().unwrap().is_some());
        assert!(stream.next_event().unwrap().is_none());
        assert!(stream.next_event().unwrap().is_none());
    }
}