Skip to main content

archiver_core/retrieval/postprocessors/
counts.rs

//! `count_N` and `ncount_N` post-processors.
//!
//! - `count`: number of samples in each N-second bin; bins that receive no
//!   samples produce no output.
//! - `ncount`: total number of samples across the whole query, emitted as a
//!   single sample timestamped at the first input sample (nothing is emitted
//!   for an empty input). This mirrors the Java `NCount.java` semantics and is
//!   useful as a quick sanity check from archiver clients.
8
9use std::time::SystemTime;
10
11use crate::storage::traits::{EventStream, PostProcessor};
12use crate::types::{ArchiverSample, ArchiverValue, EventStreamDesc};
13
/// `count_N` post-processor: reports how many samples fall into each
/// `interval_secs`-wide bin of the input.
pub struct CountPostProcessor {
    /// Bin width in seconds (the `N` in `count_N`).
    interval_secs: u64,
}

impl CountPostProcessor {
    /// Creates a `count` post-processor whose bins are `interval_secs` wide.
    ///
    /// NOTE(review): `0` is stored unchanged; whether
    /// `crate::etl::decimation::bin_of` tolerates a zero width is not visible
    /// from this file — confirm that callers reject `count_0`.
    pub fn new(interval_secs: u64) -> Self {
        CountPostProcessor { interval_secs }
    }
}
23
24impl PostProcessor for CountPostProcessor {
25    fn name(&self) -> &str {
26        "count"
27    }
28    fn interval_secs(&self) -> u64 {
29        self.interval_secs
30    }
31    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
32        Box::new(CountStream {
33            input,
34            interval_secs: self.interval_secs,
35            count: 0,
36            current_bin: None,
37            finished: false,
38        })
39    }
40}
41
42struct CountStream {
43    input: Box<dyn EventStream>,
44    interval_secs: u64,
45    count: u64,
46    /// Wall-clock-aligned bin (epoch_secs / interval_secs) for the
47    /// in-progress count.
48    current_bin: Option<u64>,
49    finished: bool,
50}
51
52impl EventStream for CountStream {
53    fn description(&self) -> &EventStreamDesc {
54        self.input.description()
55    }
56
57    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
58        if self.finished {
59            return Ok(None);
60        }
61        loop {
62            match self.input.next_event()? {
63                Some(sample) => {
64                    let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
65                    if let Some(prev_bin) = self.current_bin
66                        && bin != prev_bin
67                        && self.count > 0
68                    {
69                        let result = ArchiverSample::new(
70                            crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
71                            ArchiverValue::ScalarDouble(self.count as f64),
72                        );
73                        self.count = 1;
74                        self.current_bin = Some(bin);
75                        return Ok(Some(result));
76                    }
77                    self.current_bin = Some(bin);
78                    self.count += 1;
79                }
80                None => {
81                    self.finished = true;
82                    if let Some(prev_bin) = self.current_bin
83                        && self.count > 0
84                    {
85                        let result = ArchiverSample::new(
86                            crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
87                            ArchiverValue::ScalarDouble(self.count as f64),
88                        );
89                        self.count = 0;
90                        return Ok(Some(result));
91                    }
92                    return Ok(None);
93                }
94            }
95        }
96    }
97}
98
/// `ncount_N` — emits a single sample whose value is the total number of
/// samples in the input, stamped with the first input sample's timestamp
/// (nothing is emitted for an empty input). The interval parameter is ignored,
/// mirroring the Java `NCount` behaviour.
pub struct NCountPostProcessor;

impl NCountPostProcessor {
    /// Creates the post-processor.
    ///
    /// `_interval_secs` is accepted only so the signature matches
    /// `CountPostProcessor::new`; its value is discarded.
    pub fn new(_interval_secs: u64) -> Self {
        NCountPostProcessor
    }
}
109
110impl PostProcessor for NCountPostProcessor {
111    fn name(&self) -> &str {
112        "ncount"
113    }
114    fn interval_secs(&self) -> u64 {
115        0
116    }
117    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
118        Box::new(NCountStream {
119            input,
120            count: 0,
121            first_ts: None,
122            done: false,
123        })
124    }
125}
126
127struct NCountStream {
128    input: Box<dyn EventStream>,
129    count: u64,
130    first_ts: Option<SystemTime>,
131    done: bool,
132}
133
134impl EventStream for NCountStream {
135    fn description(&self) -> &EventStreamDesc {
136        self.input.description()
137    }
138
139    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
140        if self.done {
141            return Ok(None);
142        }
143        while let Some(sample) = self.input.next_event()? {
144            if self.first_ts.is_none() {
145                self.first_ts = Some(sample.timestamp);
146            }
147            self.count += 1;
148        }
149        self.done = true;
150        match self.first_ts {
151            Some(ts) => Ok(Some(ArchiverSample::new(
152                ts,
153                ArchiverValue::ScalarDouble(self.count as f64),
154            ))),
155            None => Ok(None),
156        }
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ArchDbType;
    use std::time::{Duration, UNIX_EPOCH};

    /// Epoch seconds of offset 0 in a [`VecStream`]. It is a multiple of 10,
    /// so offsets 0..10 share one 10-second bin and offset 10 starts the next.
    const BASE_SECS: u64 = 1_700_000_000;

    /// Absolute timestamp of a `VecStream` sample at `offset_secs`.
    fn at(offset_secs: u64) -> SystemTime {
        UNIX_EPOCH + Duration::from_secs(BASE_SECS + offset_secs)
    }

    /// In-memory [`EventStream`] that yields `(offset_secs, value)` pairs as
    /// `ScalarDouble` samples at `BASE_SECS + offset_secs`.
    struct VecStream {
        desc: EventStreamDesc,
        items: std::vec::IntoIter<(u64, f64)>,
        start: SystemTime,
    }

    impl VecStream {
        fn new(items: Vec<(u64, f64)>) -> Self {
            Self {
                desc: EventStreamDesc {
                    pv_name: "TEST".to_string(),
                    db_type: ArchDbType::ScalarDouble,
                    year: 2024,
                    element_count: Some(1),
                    headers: Vec::new(),
                },
                items: items.into_iter(),
                start: at(0),
            }
        }
    }

    impl EventStream for VecStream {
        fn description(&self) -> &EventStreamDesc {
            &self.desc
        }
        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
            Ok(self.items.next().map(|(offset, v)| {
                ArchiverSample::new(
                    self.start + Duration::from_secs(offset),
                    ArchiverValue::ScalarDouble(v),
                )
            }))
        }
    }

    /// Runs `items` through `pp` and returns every emitted `(timestamp, value)`.
    ///
    /// Panics on a non-`ScalarDouble` output, so an unexpected value type fails
    /// the test instead of being silently dropped.
    fn drain_samples(
        pp: Box<dyn PostProcessor>,
        items: Vec<(u64, f64)>,
    ) -> Vec<(SystemTime, f64)> {
        let mut stream = pp.process(Box::new(VecStream::new(items)));
        let mut out = Vec::new();
        while let Some(sample) = stream.next_event().unwrap() {
            let ArchiverValue::ScalarDouble(v) = sample.value else {
                panic!("post-processor emitted a non-ScalarDouble value");
            };
            out.push((sample.timestamp, v));
        }
        out
    }

    /// Like [`drain_samples`], but keeps only the values.
    fn drain(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<f64> {
        drain_samples(pp, items).into_iter().map(|(_, v)| v).collect()
    }

    /// Asserts that once `pp`'s stream has returned `None`, it keeps doing so.
    fn assert_fused(pp: Box<dyn PostProcessor>) {
        let mut stream = pp.process(Box::new(VecStream::new(vec![(0, 1.0), (15, 1.0)])));
        while stream.next_event().unwrap().is_some() {}
        assert!(stream.next_event().unwrap().is_none());
        assert!(stream.next_event().unwrap().is_none());
    }

    #[test]
    fn count_per_bin() {
        let pp: Box<dyn PostProcessor> = Box::new(CountPostProcessor::new(10));
        // 4 samples in [0..10), 2 in [10..20).
        let items = vec![(0, 1.0), (3, 1.0), (6, 1.0), (9, 1.0), (10, 1.0), (12, 1.0)];
        assert_eq!(drain(pp, items), vec![4.0, 2.0]);
    }

    #[test]
    fn count_empty() {
        let pp: Box<dyn PostProcessor> = Box::new(CountPostProcessor::new(10));
        assert!(drain(pp, vec![]).is_empty());
    }

    #[test]
    fn count_skips_empty_bins() {
        let pp: Box<dyn PostProcessor> = Box::new(CountPostProcessor::new(10));
        // Two samples in [0..10), nothing in [10..30), one in [30..40).
        let items = vec![(0, 1.0), (1, 1.0), (35, 1.0)];
        assert_eq!(drain(pp, items), vec![2.0, 1.0]);
    }

    #[test]
    fn count_is_fused() {
        assert_fused(Box::new(CountPostProcessor::new(10)));
    }

    #[test]
    fn ncount_total() {
        let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
        let items = vec![(0, 1.0), (1, 2.0), (2, 3.0), (10, 4.0), (12, 5.0)];
        assert_eq!(drain(pp, items), vec![5.0]);
    }

    #[test]
    fn ncount_timestamped_at_first_sample() {
        let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
        let items = vec![(5, 1.0), (7, 2.0), (30, 3.0)];
        assert_eq!(drain_samples(pp, items), vec![(at(5), 3.0)]);
    }

    #[test]
    fn ncount_empty() {
        let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
        assert!(drain(pp, vec![]).is_empty());
    }

    #[test]
    fn ncount_is_fused() {
        assert_fused(Box::new(NCountPostProcessor::new(0)));
    }

    #[test]
    fn names_and_intervals() {
        assert_eq!(CountPostProcessor::new(60).name(), "count");
        assert_eq!(CountPostProcessor::new(60).interval_secs(), 60);
        assert_eq!(NCountPostProcessor::new(60).name(), "ncount");
        assert_eq!(NCountPostProcessor::new(60).interval_secs(), 0);
    }
}