Skip to main content

archiver_core/retrieval/
query.rs

1use std::time::{Duration, SystemTime};
2
3use crate::retrieval::merge::MergedEventStream;
4use crate::storage::traits::{EventStream, PostProcessor, StoragePlugin};
5use crate::types::{ArchiverSample, EventStreamDesc};
6
7/// Execute a data query across a storage plugin, optionally applying a post-processor.
8///
9/// Prepends the last sample whose timestamp is strictly before `start` so
10/// plots stay continuous when the requested window starts in a gap. Java's
11/// `getLastEventOfPreviousPartitionBeforeTimeAsStream` (5666a8b5) does the
12/// same: a slow PV last sampled 10 days ago, queried for the last 5
13/// minutes, returns that 10-day-old sample as the leading point instead
14/// of an empty stream.
15pub async fn query_data(
16    storage: &dyn StoragePlugin,
17    pv: &str,
18    start: SystemTime,
19    end: SystemTime,
20    post_processor: Option<Box<dyn PostProcessor>>,
21) -> anyhow::Result<Box<dyn EventStream>> {
22    let streams = storage.get_data(pv, start, end).await?;
23    let prefix = storage
24        .get_last_event_before(pv, start)
25        .await
26        .unwrap_or(None);
27
28    if streams.is_empty() && prefix.is_none() {
29        let desc = EventStreamDesc {
30            pv_name: pv.to_string(),
31            db_type: crate::types::ArchDbType::ScalarDouble,
32            year: chrono::Utc::now().year(),
33            element_count: None,
34            headers: Vec::new(),
35        };
36        return Ok(Box::new(EmptyStream { desc }));
37    }
38
39    // Pick a description: prefer the first in-range stream's description
40    // (it carries the right year/dbr_type for the PV); fall back to a
41    // synthetic one if only the prefix sample exists.
42    let desc = if let Some(s) = streams.first() {
43        s.description().clone()
44    } else {
45        EventStreamDesc {
46            pv_name: pv.to_string(),
47            db_type: crate::types::ArchDbType::ScalarDouble,
48            year: chrono::Utc::now().year(),
49            element_count: None,
50            headers: Vec::new(),
51        }
52    };
53
54    let mut all_streams = Vec::with_capacity(streams.len() + 1);
55    if let Some(sample) = prefix {
56        all_streams.push(Box::new(SingleSampleStream {
57            sample: Some(sample),
58            desc: desc.clone(),
59        }) as Box<dyn EventStream>);
60    }
61    all_streams.extend(streams);
62
63    let merged: Box<dyn EventStream> = Box::new(MergedEventStream::new(desc, all_streams));
64
65    match post_processor {
66        Some(pp) => Ok(pp.process(merged)),
67        None => Ok(merged),
68    }
69}
70
71/// One-shot stream that yields a single pre-built sample. Used to prepend
72/// the prior-partition continuity sample to a query's results, and (via
73/// the crate-public re-export) by the storage layer's stale-file
74/// short-circuit (Java parity 88c7601).
75pub(crate) struct SingleSampleStream {
76    pub(crate) sample: Option<ArchiverSample>,
77    pub(crate) desc: EventStreamDesc,
78}
79
80impl EventStream for SingleSampleStream {
81    fn description(&self) -> &EventStreamDesc {
82        &self.desc
83    }
84
85    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
86        Ok(self.sample.take())
87    }
88}
89
90/// Parse a post-processor spec like "mean_600" or "firstSample_3600".
91///
92/// Java's `SummaryStatsPostProcessor` enforces `intervalSecs >= 1`
93/// (CSSTUDIO-2134) — without that floor, `mean_0` makes every sample its
94/// own bin (`elapsed >= 0` always true) which both misuses the operator
95/// and risks divide-by-zero in any future `epoch_secs / interval_secs`
96/// path. Reject `_0` here so the API surface is consistent.
97pub fn parse_post_processor(spec: &str) -> Option<Box<dyn PostProcessor>> {
98    let parts: Vec<&str> = spec.splitn(2, '_').collect();
99    if parts.len() != 2 {
100        return None;
101    }
102    let interval: u64 = parts[1].parse().ok()?;
103    if interval == 0 {
104        return None;
105    }
106    use crate::retrieval::postprocessors::{counts, last_sample, statistics};
107    match parts[0] {
108        "mean" => Some(Box::new(crate::etl::decimation::MeanDecimation::new(
109            interval,
110        ))),
111        "firstSample" => Some(Box::new(
112            crate::etl::decimation::FirstSampleDecimation::new(interval),
113        )),
114        "max" => Some(Box::new(statistics::MaxPostProcessor::new(interval))),
115        "min" => Some(Box::new(statistics::MinPostProcessor::new(interval))),
116        "std" => Some(Box::new(statistics::StdPostProcessor::new(interval))),
117        "median" => Some(Box::new(statistics::MedianPostProcessor::new(interval))),
118        "variance" => Some(Box::new(statistics::VariancePostProcessor::new(interval))),
119        "rms" => Some(Box::new(statistics::RmsPostProcessor::new(interval))),
120        "count" => Some(Box::new(counts::CountPostProcessor::new(interval))),
121        "ncount" => Some(Box::new(counts::NCountPostProcessor::new(interval))),
122        "lastSample" => Some(Box::new(last_sample::LastSamplePostProcessor::new(
123            interval,
124        ))),
125        _ => None,
126    }
127}
128
129use chrono::Datelike;
130
// ── TwoWeekRaw post-processor ──
// Compatible with Java EPICS Archiver Appliance's TwoWeekRaw policy:
// - Data within the last 2 weeks: pass through raw
// - Data older than 2 weeks: firstSample binning (one per 15-minute bin)

/// Age threshold for raw pass-through: 14 days, in seconds.
const TWO_WEEKS_SECS: u64 = 14 * 24 * 60 * 60;
/// Bin width for thinning older data: 15 minutes, in seconds
/// (Java's `PostProcessors.DEFAULT_SUMMARIZING_INTERVAL`).
const SPARSIFY_INTERVAL_SECS: u64 = 15 * 60;
138
139pub struct TwoWeekRawProcessor;
140
141impl PostProcessor for TwoWeekRawProcessor {
142    fn name(&self) -> &str {
143        "twoweek"
144    }
145
146    fn interval_secs(&self) -> u64 {
147        SPARSIFY_INTERVAL_SECS
148    }
149
150    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
151        let two_weeks_ago = SystemTime::now() - Duration::from_secs(TWO_WEEKS_SECS);
152        Box::new(TwoWeekRawStream {
153            input,
154            two_weeks_ago,
155            interval_secs: SPARSIFY_INTERVAL_SECS,
156            last_bin: None,
157        })
158    }
159}
160
161struct TwoWeekRawStream {
162    input: Box<dyn EventStream>,
163    two_weeks_ago: SystemTime,
164    interval_secs: u64,
165    last_bin: Option<u64>,
166}
167
168impl EventStream for TwoWeekRawStream {
169    fn description(&self) -> &EventStreamDesc {
170        self.input.description()
171    }
172
173    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
174        loop {
175            match self.input.next_event()? {
176                Some(sample) => {
177                    if sample.timestamp >= self.two_weeks_ago {
178                        // Recent data: pass through raw
179                        return Ok(Some(sample));
180                    }
181                    // Old data: firstSample binning
182                    let epoch_secs = sample
183                        .timestamp
184                        .duration_since(SystemTime::UNIX_EPOCH)
185                        .unwrap_or_default()
186                        .as_secs();
187                    let bin = epoch_secs / self.interval_secs;
188                    if self.last_bin != Some(bin) {
189                        self.last_bin = Some(bin);
190                        return Ok(Some(sample));
191                    }
192                    // Skip: same bin as previous sample
193                }
194                None => return Ok(None),
195            }
196        }
197    }
198}
199
200struct EmptyStream {
201    desc: EventStreamDesc,
202}
203
204impl EventStream for EmptyStream {
205    fn description(&self) -> &EventStreamDesc {
206        &self.desc
207    }
208
209    fn next_event(&mut self) -> anyhow::Result<Option<crate::types::ArchiverSample>> {
210        Ok(None)
211    }
212}