archiver_core/etl/
decimation.rs1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use crate::storage::traits::{EventStream, PostProcessor};
4use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
5
/// Returns the index of the fixed-width time bin that contains `ts`.
///
/// The index is the number of whole seconds between the Unix epoch and `ts`,
/// divided by `interval_secs`.
///
/// * Timestamps earlier than the epoch are treated as the epoch itself and
///   therefore land in bin `0`.
/// * An `interval_secs` of `0` yields bin `0` instead of panicking on a
///   division by zero.
pub fn bin_of(ts: SystemTime, interval_secs: u64) -> u64 {
    // `duration_since` fails for pre-epoch timestamps; fall back to a zero
    // duration so such samples collapse into the first bin.
    let secs = ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
    // `checked_div` is `None` only for a zero divisor.
    secs.checked_div(interval_secs).unwrap_or(0)
}
14
/// Returns the timestamp at which bin number `bin` begins.
///
/// The offset from the Unix epoch is `bin * interval_secs` seconds; the
/// multiplication saturates at `u64::MAX` rather than wrapping.
pub fn bin_start(bin: u64, interval_secs: u64) -> SystemTime {
    let offset_secs = bin.saturating_mul(interval_secs);
    let offset = Duration::from_secs(offset_secs);
    UNIX_EPOCH + offset
}
19
/// Post-processor that reduces an event stream to one averaged sample per
/// fixed-width time bin.
#[derive(Debug, Clone)]
pub struct MeanDecimation {
    /// Width of each averaging bin, in seconds.
    interval_secs: u64,
}

impl MeanDecimation {
    /// Creates a mean decimator whose bins are `interval_secs` seconds wide.
    pub fn new(interval_secs: u64) -> Self {
        Self { interval_secs }
    }
}
30
31impl PostProcessor for MeanDecimation {
32 fn name(&self) -> &str {
33 "mean"
34 }
35
36 fn interval_secs(&self) -> u64 {
37 self.interval_secs
38 }
39
40 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
41 let mut desc = input.description().clone();
42 desc.db_type = ArchDbType::ScalarDouble;
45 Box::new(MeanDecimationStream {
46 desc,
47 input,
48 interval_secs: self.interval_secs,
49 buffer: Vec::new(),
50 current_bin: None,
51 finished: false,
52 })
53 }
54}
55
56struct MeanDecimationStream {
57 input: Box<dyn EventStream>,
58 desc: EventStreamDesc,
59 interval_secs: u64,
60 buffer: Vec<f64>,
61 current_bin: Option<u64>,
62 finished: bool,
63}
64
65impl EventStream for MeanDecimationStream {
66 fn description(&self) -> &EventStreamDesc {
67 &self.desc
68 }
69
70 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
71 if self.finished {
72 return Ok(None);
73 }
74
75 loop {
76 match self.input.next_event()? {
77 Some(sample) => {
78 let bin = bin_of(sample.timestamp, self.interval_secs);
79
80 if let Some(prev_bin) = self.current_bin
83 && bin != prev_bin
84 && !self.buffer.is_empty()
85 {
86 let mean = self.buffer.iter().sum::<f64>() / self.buffer.len() as f64;
87 let result = ArchiverSample::new(
88 bin_start(prev_bin, self.interval_secs),
89 ArchiverValue::ScalarDouble(mean),
90 );
91 self.buffer.clear();
92 self.current_bin = Some(bin);
93 if let Some(v) = sample.value.as_f64() {
94 self.buffer.push(v);
95 }
96 return Ok(Some(result));
97 }
98
99 self.current_bin = Some(bin);
100 if let Some(v) = sample.value.as_f64() {
101 self.buffer.push(v);
102 }
103 }
104 None => {
105 self.finished = true;
106 if let Some(prev_bin) = self.current_bin
107 && !self.buffer.is_empty()
108 {
109 let mean = self.buffer.iter().sum::<f64>() / self.buffer.len() as f64;
110 let result = ArchiverSample::new(
111 bin_start(prev_bin, self.interval_secs),
112 ArchiverValue::ScalarDouble(mean),
113 );
114 self.buffer.clear();
115 return Ok(Some(result));
116 }
117 return Ok(None);
118 }
119 }
120 }
121 }
122}
123
/// Post-processor that thins an event stream by passing through only the
/// first sample of each fixed-width time bin.
#[derive(Debug, Clone)]
pub struct FirstSampleDecimation {
    /// Width of each bin, in seconds.
    interval_secs: u64,
}

impl FirstSampleDecimation {
    /// Creates a first-sample decimator whose bins are `interval_secs`
    /// seconds wide.
    pub fn new(interval_secs: u64) -> Self {
        Self { interval_secs }
    }
}
134
135impl PostProcessor for FirstSampleDecimation {
136 fn name(&self) -> &str {
137 "firstSample"
138 }
139
140 fn interval_secs(&self) -> u64 {
141 self.interval_secs
142 }
143
144 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
145 Box::new(FirstSampleStream {
146 input,
147 interval_secs: self.interval_secs,
148 current_bin: None,
149 })
150 }
151}
152
153struct FirstSampleStream {
154 input: Box<dyn EventStream>,
155 interval_secs: u64,
156 current_bin: Option<u64>,
159}
160
161impl EventStream for FirstSampleStream {
162 fn description(&self) -> &EventStreamDesc {
163 self.input.description()
164 }
165
166 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
167 loop {
168 match self.input.next_event()? {
169 Some(sample) => {
170 let bin = bin_of(sample.timestamp, self.interval_secs);
171 if self.current_bin != Some(bin) {
172 self.current_bin = Some(bin);
173 return Ok(Some(sample));
174 }
175 }
176 None => return Ok(None),
177 }
178 }
179 }
180}