Skip to main content

archiver_core/retrieval/postprocessors/
last_sample.rs

1//! `lastSample_N` — emits the last sample of each N-second bin.
2//!
3//! Buffers one event per bin and yields it when the bin closes (next event
4//! crosses the boundary, or the input ends).
5
6use crate::storage::traits::{EventStream, PostProcessor};
7use crate::types::{ArchiverSample, EventStreamDesc};
8
9pub struct LastSamplePostProcessor {
10    interval_secs: u64,
11}
12
13impl LastSamplePostProcessor {
14    pub fn new(interval_secs: u64) -> Self {
15        Self { interval_secs }
16    }
17}
18
19impl PostProcessor for LastSamplePostProcessor {
20    fn name(&self) -> &str {
21        "lastSample"
22    }
23    fn interval_secs(&self) -> u64 {
24        self.interval_secs
25    }
26    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
27        Box::new(LastSampleStream {
28            input,
29            interval_secs: self.interval_secs,
30            current_bin: None,
31            pending: None,
32        })
33    }
34}
35
36struct LastSampleStream {
37    input: Box<dyn EventStream>,
38    interval_secs: u64,
39    /// Wall-clock-aligned bin (epoch_secs / interval_secs) of `pending`.
40    current_bin: Option<u64>,
41    /// Last sample seen in the current bin, awaiting bin close.
42    pending: Option<ArchiverSample>,
43}
44
45impl EventStream for LastSampleStream {
46    fn description(&self) -> &EventStreamDesc {
47        self.input.description()
48    }
49
50    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
51        loop {
52            match self.input.next_event()? {
53                Some(sample) => {
54                    let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
55                    if self.current_bin != Some(bin) {
56                        // Crossed the boundary: emit the pending sample and
57                        // start a fresh bin with the current one as the new
58                        // pending.
59                        let emit = self.pending.take();
60                        self.current_bin = Some(bin);
61                        self.pending = Some(sample);
62                        if let Some(out) = emit {
63                            return Ok(Some(out));
64                        }
65                        // No pending in the prior bin (first ever sample);
66                        // continue to wait for next.
67                        continue;
68                    }
69                    // Still in the current bin: replace pending.
70                    self.pending = Some(sample);
71                }
72                None => {
73                    // Drain the final bin.
74                    return Ok(self.pending.take());
75                }
76            }
77        }
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84    use crate::types::{ArchDbType, ArchiverValue};
85    use std::time::{Duration, SystemTime, UNIX_EPOCH};
86
87    struct VecStream {
88        desc: EventStreamDesc,
89        items: std::vec::IntoIter<(u64, f64)>,
90        start: SystemTime,
91    }
92
93    impl VecStream {
94        fn new(items: Vec<(u64, f64)>) -> Self {
95            Self {
96                desc: EventStreamDesc {
97                    pv_name: "TEST".to_string(),
98                    db_type: ArchDbType::ScalarDouble,
99                    year: 2024,
100                    element_count: Some(1),
101                    headers: Vec::new(),
102                },
103                items: items.into_iter(),
104                start: UNIX_EPOCH + Duration::from_secs(1_700_000_000),
105            }
106        }
107    }
108
109    impl EventStream for VecStream {
110        fn description(&self) -> &EventStreamDesc {
111            &self.desc
112        }
113        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
114            Ok(self.items.next().map(|(offset, v)| {
115                ArchiverSample::new(
116                    self.start + Duration::from_secs(offset),
117                    ArchiverValue::ScalarDouble(v),
118                )
119            }))
120        }
121    }
122
123    #[test]
124    fn last_sample_per_bin() {
125        // 10s bins. Bin [0,10): values 1,2,3 → emit 3. Bin [10,20): values 4,5 → emit 5.
126        let pp = LastSamplePostProcessor::new(10);
127        let mut s = pp.process(Box::new(VecStream::new(vec![
128            (0, 1.0),
129            (3, 2.0),
130            (9, 3.0),
131            (10, 4.0),
132            (15, 5.0),
133        ])));
134        let v1 = match s.next_event().unwrap().unwrap().value {
135            ArchiverValue::ScalarDouble(v) => v,
136            _ => panic!("wrong type"),
137        };
138        let v2 = match s.next_event().unwrap().unwrap().value {
139            ArchiverValue::ScalarDouble(v) => v,
140            _ => panic!("wrong type"),
141        };
142        assert_eq!(v1, 3.0);
143        assert_eq!(v2, 5.0);
144        assert!(s.next_event().unwrap().is_none());
145    }
146}