use crate::storage::traits::{EventStream, PostProcessor};
use crate::types::{ArchiverSample, EventStreamDesc};
pub struct LastSamplePostProcessor {
interval_secs: u64,
}
impl LastSamplePostProcessor {
pub fn new(interval_secs: u64) -> Self {
Self { interval_secs }
}
}
impl PostProcessor for LastSamplePostProcessor {
fn name(&self) -> &str {
"lastSample"
}
fn interval_secs(&self) -> u64 {
self.interval_secs
}
fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
Box::new(LastSampleStream {
input,
interval_secs: self.interval_secs,
current_bin: None,
pending: None,
})
}
}
struct LastSampleStream {
input: Box<dyn EventStream>,
interval_secs: u64,
current_bin: Option<u64>,
pending: Option<ArchiverSample>,
}
impl EventStream for LastSampleStream {
fn description(&self) -> &EventStreamDesc {
self.input.description()
}
fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
loop {
match self.input.next_event()? {
Some(sample) => {
let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
if self.current_bin != Some(bin) {
let emit = self.pending.take();
self.current_bin = Some(bin);
self.pending = Some(sample);
if let Some(out) = emit {
return Ok(Some(out));
}
continue;
}
self.pending = Some(sample);
}
None => {
return Ok(self.pending.take());
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{ArchDbType, ArchiverValue};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
struct VecStream {
desc: EventStreamDesc,
items: std::vec::IntoIter<(u64, f64)>,
start: SystemTime,
}
impl VecStream {
fn new(items: Vec<(u64, f64)>) -> Self {
Self {
desc: EventStreamDesc {
pv_name: "TEST".to_string(),
db_type: ArchDbType::ScalarDouble,
year: 2024,
element_count: Some(1),
headers: Vec::new(),
},
items: items.into_iter(),
start: UNIX_EPOCH + Duration::from_secs(1_700_000_000),
}
}
}
impl EventStream for VecStream {
fn description(&self) -> &EventStreamDesc {
&self.desc
}
fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
Ok(self.items.next().map(|(offset, v)| {
ArchiverSample::new(
self.start + Duration::from_secs(offset),
ArchiverValue::ScalarDouble(v),
)
}))
}
}
#[test]
fn last_sample_per_bin() {
let pp = LastSamplePostProcessor::new(10);
let mut s = pp.process(Box::new(VecStream::new(vec![
(0, 1.0),
(3, 2.0),
(9, 3.0),
(10, 4.0),
(15, 5.0),
])));
let v1 = match s.next_event().unwrap().unwrap().value {
ArchiverValue::ScalarDouble(v) => v,
_ => panic!("wrong type"),
};
let v2 = match s.next_event().unwrap().unwrap().value {
ArchiverValue::ScalarDouble(v) => v,
_ => panic!("wrong type"),
};
assert_eq!(v1, 3.0);
assert_eq!(v2, 5.0);
assert!(s.next_event().unwrap().is_none());
}
}