archiver_core/retrieval/postprocessors/
counts.rs1use std::time::SystemTime;
10
11use crate::storage::traits::{EventStream, PostProcessor};
12use crate::types::{ArchiverSample, ArchiverValue, EventStreamDesc};
13
14pub struct CountPostProcessor {
15 interval_secs: u64,
16}
17
18impl CountPostProcessor {
19 pub fn new(interval_secs: u64) -> Self {
20 Self { interval_secs }
21 }
22}
23
24impl PostProcessor for CountPostProcessor {
25 fn name(&self) -> &str {
26 "count"
27 }
28 fn interval_secs(&self) -> u64 {
29 self.interval_secs
30 }
31 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
32 Box::new(CountStream {
33 input,
34 interval_secs: self.interval_secs,
35 count: 0,
36 current_bin: None,
37 finished: false,
38 })
39 }
40}
41
42struct CountStream {
43 input: Box<dyn EventStream>,
44 interval_secs: u64,
45 count: u64,
46 current_bin: Option<u64>,
49 finished: bool,
50}
51
52impl EventStream for CountStream {
53 fn description(&self) -> &EventStreamDesc {
54 self.input.description()
55 }
56
57 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
58 if self.finished {
59 return Ok(None);
60 }
61 loop {
62 match self.input.next_event()? {
63 Some(sample) => {
64 let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
65 if let Some(prev_bin) = self.current_bin
66 && bin != prev_bin
67 && self.count > 0
68 {
69 let result = ArchiverSample::new(
70 crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
71 ArchiverValue::ScalarDouble(self.count as f64),
72 );
73 self.count = 1;
74 self.current_bin = Some(bin);
75 return Ok(Some(result));
76 }
77 self.current_bin = Some(bin);
78 self.count += 1;
79 }
80 None => {
81 self.finished = true;
82 if let Some(prev_bin) = self.current_bin
83 && self.count > 0
84 {
85 let result = ArchiverSample::new(
86 crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
87 ArchiverValue::ScalarDouble(self.count as f64),
88 );
89 self.count = 0;
90 return Ok(Some(result));
91 }
92 return Ok(None);
93 }
94 }
95 }
96 }
97}
98
99pub struct NCountPostProcessor;
103
104impl NCountPostProcessor {
105 pub fn new(_interval_secs: u64) -> Self {
106 Self
107 }
108}
109
110impl PostProcessor for NCountPostProcessor {
111 fn name(&self) -> &str {
112 "ncount"
113 }
114 fn interval_secs(&self) -> u64 {
115 0
116 }
117 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
118 Box::new(NCountStream {
119 input,
120 count: 0,
121 first_ts: None,
122 done: false,
123 })
124 }
125}
126
127struct NCountStream {
128 input: Box<dyn EventStream>,
129 count: u64,
130 first_ts: Option<SystemTime>,
131 done: bool,
132}
133
134impl EventStream for NCountStream {
135 fn description(&self) -> &EventStreamDesc {
136 self.input.description()
137 }
138
139 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
140 if self.done {
141 return Ok(None);
142 }
143 while let Some(sample) = self.input.next_event()? {
144 if self.first_ts.is_none() {
145 self.first_ts = Some(sample.timestamp);
146 }
147 self.count += 1;
148 }
149 self.done = true;
150 match self.first_ts {
151 Some(ts) => Ok(Some(ArchiverSample::new(
152 ts,
153 ArchiverValue::ScalarDouble(self.count as f64),
154 ))),
155 None => Ok(None),
156 }
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use crate::types::ArchDbType;
164 use std::time::{Duration, UNIX_EPOCH};
165
166 struct VecStream {
167 desc: EventStreamDesc,
168 items: std::vec::IntoIter<(u64, f64)>,
169 start: SystemTime,
170 }
171
172 impl VecStream {
173 fn new(items: Vec<(u64, f64)>) -> Self {
174 Self {
175 desc: EventStreamDesc {
176 pv_name: "TEST".to_string(),
177 db_type: ArchDbType::ScalarDouble,
178 year: 2024,
179 element_count: Some(1),
180 headers: Vec::new(),
181 },
182 items: items.into_iter(),
183 start: UNIX_EPOCH + Duration::from_secs(1_700_000_000),
184 }
185 }
186 }
187
188 impl EventStream for VecStream {
189 fn description(&self) -> &EventStreamDesc {
190 &self.desc
191 }
192 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
193 Ok(self.items.next().map(|(offset, v)| {
194 ArchiverSample::new(
195 self.start + Duration::from_secs(offset),
196 ArchiverValue::ScalarDouble(v),
197 )
198 }))
199 }
200 }
201
202 fn drain(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<f64> {
203 let stream = pp.process(Box::new(VecStream::new(items)));
204 let mut out = Vec::new();
205 let mut s = stream;
206 while let Some(sample) = s.next_event().unwrap() {
207 if let ArchiverValue::ScalarDouble(v) = sample.value {
208 out.push(v);
209 }
210 }
211 out
212 }
213
214 #[test]
215 fn count_per_bin() {
216 let pp: Box<dyn PostProcessor> = Box::new(CountPostProcessor::new(10));
217 let items = vec![(0, 1.0), (3, 1.0), (6, 1.0), (9, 1.0), (10, 1.0), (12, 1.0)];
219 assert_eq!(drain(pp, items), vec![4.0, 2.0]);
220 }
221
222 #[test]
223 fn ncount_total() {
224 let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
225 let items = vec![(0, 1.0), (1, 2.0), (2, 3.0), (10, 4.0), (12, 5.0)];
226 assert_eq!(drain(pp, items), vec![5.0]);
227 }
228
229 #[test]
230 fn ncount_empty() {
231 let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
232 assert!(drain(pp, vec![]).is_empty());
233 }
234}