archiver_core/retrieval/postprocessors/
last_sample.rs1use crate::storage::traits::{EventStream, PostProcessor};
7use crate::types::{ArchiverSample, EventStreamDesc};
8
9pub struct LastSamplePostProcessor {
10 interval_secs: u64,
11}
12
13impl LastSamplePostProcessor {
14 pub fn new(interval_secs: u64) -> Self {
15 Self { interval_secs }
16 }
17}
18
19impl PostProcessor for LastSamplePostProcessor {
20 fn name(&self) -> &str {
21 "lastSample"
22 }
23 fn interval_secs(&self) -> u64 {
24 self.interval_secs
25 }
26 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
27 Box::new(LastSampleStream {
28 input,
29 interval_secs: self.interval_secs,
30 current_bin: None,
31 pending: None,
32 })
33 }
34}
35
36struct LastSampleStream {
37 input: Box<dyn EventStream>,
38 interval_secs: u64,
39 current_bin: Option<u64>,
41 pending: Option<ArchiverSample>,
43}
44
45impl EventStream for LastSampleStream {
46 fn description(&self) -> &EventStreamDesc {
47 self.input.description()
48 }
49
50 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
51 loop {
52 match self.input.next_event()? {
53 Some(sample) => {
54 let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
55 if self.current_bin != Some(bin) {
56 let emit = self.pending.take();
60 self.current_bin = Some(bin);
61 self.pending = Some(sample);
62 if let Some(out) = emit {
63 return Ok(Some(out));
64 }
65 continue;
68 }
69 self.pending = Some(sample);
71 }
72 None => {
73 return Ok(self.pending.take());
75 }
76 }
77 }
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84 use crate::types::{ArchDbType, ArchiverValue};
85 use std::time::{Duration, SystemTime, UNIX_EPOCH};
86
87 struct VecStream {
88 desc: EventStreamDesc,
89 items: std::vec::IntoIter<(u64, f64)>,
90 start: SystemTime,
91 }
92
93 impl VecStream {
94 fn new(items: Vec<(u64, f64)>) -> Self {
95 Self {
96 desc: EventStreamDesc {
97 pv_name: "TEST".to_string(),
98 db_type: ArchDbType::ScalarDouble,
99 year: 2024,
100 element_count: Some(1),
101 headers: Vec::new(),
102 },
103 items: items.into_iter(),
104 start: UNIX_EPOCH + Duration::from_secs(1_700_000_000),
105 }
106 }
107 }
108
109 impl EventStream for VecStream {
110 fn description(&self) -> &EventStreamDesc {
111 &self.desc
112 }
113 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
114 Ok(self.items.next().map(|(offset, v)| {
115 ArchiverSample::new(
116 self.start + Duration::from_secs(offset),
117 ArchiverValue::ScalarDouble(v),
118 )
119 }))
120 }
121 }
122
123 #[test]
124 fn last_sample_per_bin() {
125 let pp = LastSamplePostProcessor::new(10);
127 let mut s = pp.process(Box::new(VecStream::new(vec![
128 (0, 1.0),
129 (3, 2.0),
130 (9, 3.0),
131 (10, 4.0),
132 (15, 5.0),
133 ])));
134 let v1 = match s.next_event().unwrap().unwrap().value {
135 ArchiverValue::ScalarDouble(v) => v,
136 _ => panic!("wrong type"),
137 };
138 let v2 = match s.next_event().unwrap().unwrap().value {
139 ArchiverValue::ScalarDouble(v) => v,
140 _ => panic!("wrong type"),
141 };
142 assert_eq!(v1, 3.0);
143 assert_eq!(v2, 5.0);
144 assert!(s.next_event().unwrap().is_none());
145 }
146}