Skip to main content

archiver_core/etl/
decimation.rs

1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use crate::storage::traits::{EventStream, PostProcessor};
4use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
5
/// Compute the bin number that a timestamp falls into for a given interval.
///
/// Mirrors Java's `epochSeconds / intervalSecs`. Two PVs aggregating with
/// the same `interval_secs` produce bins anchored on the same wall-clock
/// instants regardless of when their first sample arrived.
///
/// Timestamps before the Unix epoch fall into bin 0. An `interval_secs` of 0
/// is treated as a 1-second interval instead of panicking on division by zero.
pub fn bin_of(ts: SystemTime, interval_secs: u64) -> u64 {
    let secs = ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
    secs / effective_interval(interval_secs)
}

/// Convert a bin number back to the wall-clock instant of its leading edge.
///
/// Inverse of [`bin_of`] for the same `interval_secs`: `bin_start(bin_of(t, i), i)`
/// is `t` rounded down to a whole multiple of the interval. Uses the same
/// zero-interval handling as [`bin_of`] so the two stay consistent.
pub fn bin_start(bin: u64, interval_secs: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(bin.saturating_mul(effective_interval(interval_secs)))
}

/// Bin width actually used for arithmetic.
///
/// A zero interval would divide by zero in [`bin_of`], so it is promoted to
/// one second (every whole second becomes its own bin).
fn effective_interval(interval_secs: u64) -> u64 {
    interval_secs.max(1)
}
19
/// Post-processor that collapses every fixed-width time bin into a single
/// sample holding the arithmetic mean of that bin's numeric values.
pub struct MeanDecimation {
    /// Width of each averaging bin, in seconds.
    interval_secs: u64,
}

impl MeanDecimation {
    /// Build a mean decimator whose bins are `interval_secs` seconds wide.
    #[must_use]
    pub fn new(interval_secs: u64) -> Self {
        MeanDecimation { interval_secs }
    }
}
30
31impl PostProcessor for MeanDecimation {
32    fn name(&self) -> &str {
33        "mean"
34    }
35
36    fn interval_secs(&self) -> u64 {
37        self.interval_secs
38    }
39
40    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
41        let mut desc = input.description().clone();
42        // Java parity (9a5f7a0): mean always emits ScalarDouble; the
43        // descriptor must say so even when the input PV is an integer type.
44        desc.db_type = ArchDbType::ScalarDouble;
45        Box::new(MeanDecimationStream {
46            desc,
47            input,
48            interval_secs: self.interval_secs,
49            buffer: Vec::new(),
50            current_bin: None,
51            finished: false,
52        })
53    }
54}
55
56struct MeanDecimationStream {
57    input: Box<dyn EventStream>,
58    desc: EventStreamDesc,
59    interval_secs: u64,
60    buffer: Vec<f64>,
61    current_bin: Option<u64>,
62    finished: bool,
63}
64
65impl EventStream for MeanDecimationStream {
66    fn description(&self) -> &EventStreamDesc {
67        &self.desc
68    }
69
70    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
71        if self.finished {
72            return Ok(None);
73        }
74
75        loop {
76            match self.input.next_event()? {
77                Some(sample) => {
78                    let bin = bin_of(sample.timestamp, self.interval_secs);
79
80                    // Bin transition: emit the prior bin's mean before
81                    // starting the new one.
82                    if let Some(prev_bin) = self.current_bin
83                        && bin != prev_bin
84                        && !self.buffer.is_empty()
85                    {
86                        let mean = self.buffer.iter().sum::<f64>() / self.buffer.len() as f64;
87                        let result = ArchiverSample::new(
88                            bin_start(prev_bin, self.interval_secs),
89                            ArchiverValue::ScalarDouble(mean),
90                        );
91                        self.buffer.clear();
92                        self.current_bin = Some(bin);
93                        if let Some(v) = sample.value.as_f64() {
94                            self.buffer.push(v);
95                        }
96                        return Ok(Some(result));
97                    }
98
99                    self.current_bin = Some(bin);
100                    if let Some(v) = sample.value.as_f64() {
101                        self.buffer.push(v);
102                    }
103                }
104                None => {
105                    self.finished = true;
106                    if let Some(prev_bin) = self.current_bin
107                        && !self.buffer.is_empty()
108                    {
109                        let mean = self.buffer.iter().sum::<f64>() / self.buffer.len() as f64;
110                        let result = ArchiverSample::new(
111                            bin_start(prev_bin, self.interval_secs),
112                            ArchiverValue::ScalarDouble(mean),
113                        );
114                        self.buffer.clear();
115                        return Ok(Some(result));
116                    }
117                    return Ok(None);
118                }
119            }
120        }
121    }
122}
123
/// Post-processor that forwards the first sample seen in each fixed-width
/// time bin and drops the samples that follow it in the same bin.
pub struct FirstSampleDecimation {
    /// Width of each bin, in seconds.
    interval_secs: u64,
}

impl FirstSampleDecimation {
    /// Build a first-sample decimator whose bins are `interval_secs` seconds wide.
    #[must_use]
    pub fn new(interval_secs: u64) -> Self {
        FirstSampleDecimation { interval_secs }
    }
}
134
135impl PostProcessor for FirstSampleDecimation {
136    fn name(&self) -> &str {
137        "firstSample"
138    }
139
140    fn interval_secs(&self) -> u64 {
141        self.interval_secs
142    }
143
144    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
145        Box::new(FirstSampleStream {
146            input,
147            interval_secs: self.interval_secs,
148            current_bin: None,
149        })
150    }
151}
152
153struct FirstSampleStream {
154    input: Box<dyn EventStream>,
155    interval_secs: u64,
156    /// The bin we've already emitted a sample for, so subsequent samples
157    /// in the same bin are skipped. `None` means "haven't emitted yet".
158    current_bin: Option<u64>,
159}
160
161impl EventStream for FirstSampleStream {
162    fn description(&self) -> &EventStreamDesc {
163        self.input.description()
164    }
165
166    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
167        loop {
168            match self.input.next_event()? {
169                Some(sample) => {
170                    let bin = bin_of(sample.timestamp, self.interval_secs);
171                    if self.current_bin != Some(bin) {
172                        self.current_bin = Some(bin);
173                        return Ok(Some(sample));
174                    }
175                }
176                None => return Ok(None),
177            }
178        }
179    }
180}