archiver_core/retrieval/
query.rs1use std::time::{Duration, SystemTime};
2
3use crate::retrieval::merge::MergedEventStream;
4use crate::storage::traits::{EventStream, PostProcessor, StoragePlugin};
5use crate::types::{ArchiverSample, EventStreamDesc};
6
7pub async fn query_data(
16 storage: &dyn StoragePlugin,
17 pv: &str,
18 start: SystemTime,
19 end: SystemTime,
20 post_processor: Option<Box<dyn PostProcessor>>,
21) -> anyhow::Result<Box<dyn EventStream>> {
22 let streams = storage.get_data(pv, start, end).await?;
23 let prefix = storage
24 .get_last_event_before(pv, start)
25 .await
26 .unwrap_or(None);
27
28 if streams.is_empty() && prefix.is_none() {
29 let desc = EventStreamDesc {
30 pv_name: pv.to_string(),
31 db_type: crate::types::ArchDbType::ScalarDouble,
32 year: chrono::Utc::now().year(),
33 element_count: None,
34 headers: Vec::new(),
35 };
36 return Ok(Box::new(EmptyStream { desc }));
37 }
38
39 let desc = if let Some(s) = streams.first() {
43 s.description().clone()
44 } else {
45 EventStreamDesc {
46 pv_name: pv.to_string(),
47 db_type: crate::types::ArchDbType::ScalarDouble,
48 year: chrono::Utc::now().year(),
49 element_count: None,
50 headers: Vec::new(),
51 }
52 };
53
54 let mut all_streams = Vec::with_capacity(streams.len() + 1);
55 if let Some(sample) = prefix {
56 all_streams.push(Box::new(SingleSampleStream {
57 sample: Some(sample),
58 desc: desc.clone(),
59 }) as Box<dyn EventStream>);
60 }
61 all_streams.extend(streams);
62
63 let merged: Box<dyn EventStream> = Box::new(MergedEventStream::new(desc, all_streams));
64
65 match post_processor {
66 Some(pp) => Ok(pp.process(merged)),
67 None => Ok(merged),
68 }
69}
70
71pub(crate) struct SingleSampleStream {
76 pub(crate) sample: Option<ArchiverSample>,
77 pub(crate) desc: EventStreamDesc,
78}
79
80impl EventStream for SingleSampleStream {
81 fn description(&self) -> &EventStreamDesc {
82 &self.desc
83 }
84
85 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
86 Ok(self.sample.take())
87 }
88}
89
90pub fn parse_post_processor(spec: &str) -> Option<Box<dyn PostProcessor>> {
98 let parts: Vec<&str> = spec.splitn(2, '_').collect();
99 if parts.len() != 2 {
100 return None;
101 }
102 let interval: u64 = parts[1].parse().ok()?;
103 if interval == 0 {
104 return None;
105 }
106 use crate::retrieval::postprocessors::{counts, last_sample, statistics};
107 match parts[0] {
108 "mean" => Some(Box::new(crate::etl::decimation::MeanDecimation::new(
109 interval,
110 ))),
111 "firstSample" => Some(Box::new(
112 crate::etl::decimation::FirstSampleDecimation::new(interval),
113 )),
114 "max" => Some(Box::new(statistics::MaxPostProcessor::new(interval))),
115 "min" => Some(Box::new(statistics::MinPostProcessor::new(interval))),
116 "std" => Some(Box::new(statistics::StdPostProcessor::new(interval))),
117 "median" => Some(Box::new(statistics::MedianPostProcessor::new(interval))),
118 "variance" => Some(Box::new(statistics::VariancePostProcessor::new(interval))),
119 "rms" => Some(Box::new(statistics::RmsPostProcessor::new(interval))),
120 "count" => Some(Box::new(counts::CountPostProcessor::new(interval))),
121 "ncount" => Some(Box::new(counts::NCountPostProcessor::new(interval))),
122 "lastSample" => Some(Box::new(last_sample::LastSamplePostProcessor::new(
123 interval,
124 ))),
125 _ => None,
126 }
127}
128
129use chrono::Datelike;
130
131const TWO_WEEKS_SECS: u64 = 2 * 7 * 86400;
137const SPARSIFY_INTERVAL_SECS: u64 = 900; pub struct TwoWeekRawProcessor;
140
141impl PostProcessor for TwoWeekRawProcessor {
142 fn name(&self) -> &str {
143 "twoweek"
144 }
145
146 fn interval_secs(&self) -> u64 {
147 SPARSIFY_INTERVAL_SECS
148 }
149
150 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
151 let two_weeks_ago = SystemTime::now() - Duration::from_secs(TWO_WEEKS_SECS);
152 Box::new(TwoWeekRawStream {
153 input,
154 two_weeks_ago,
155 interval_secs: SPARSIFY_INTERVAL_SECS,
156 last_bin: None,
157 })
158 }
159}
160
161struct TwoWeekRawStream {
162 input: Box<dyn EventStream>,
163 two_weeks_ago: SystemTime,
164 interval_secs: u64,
165 last_bin: Option<u64>,
166}
167
168impl EventStream for TwoWeekRawStream {
169 fn description(&self) -> &EventStreamDesc {
170 self.input.description()
171 }
172
173 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
174 loop {
175 match self.input.next_event()? {
176 Some(sample) => {
177 if sample.timestamp >= self.two_weeks_ago {
178 return Ok(Some(sample));
180 }
181 let epoch_secs = sample
183 .timestamp
184 .duration_since(SystemTime::UNIX_EPOCH)
185 .unwrap_or_default()
186 .as_secs();
187 let bin = epoch_secs / self.interval_secs;
188 if self.last_bin != Some(bin) {
189 self.last_bin = Some(bin);
190 return Ok(Some(sample));
191 }
192 }
194 None => return Ok(None),
195 }
196 }
197 }
198}
199
200struct EmptyStream {
201 desc: EventStreamDesc,
202}
203
204impl EventStream for EmptyStream {
205 fn description(&self) -> &EventStreamDesc {
206 &self.desc
207 }
208
209 fn next_event(&mut self) -> anyhow::Result<Option<crate::types::ArchiverSample>> {
210 Ok(None)
211 }
212}