use std::time::SystemTime;
use crate::storage::traits::{EventStream, PostProcessor};
use crate::types::{ArchiverSample, ArchiverValue, EventStreamDesc};
pub struct CountPostProcessor {
interval_secs: u64,
}
impl CountPostProcessor {
pub fn new(interval_secs: u64) -> Self {
Self { interval_secs }
}
}
impl PostProcessor for CountPostProcessor {
fn name(&self) -> &str {
"count"
}
fn interval_secs(&self) -> u64 {
self.interval_secs
}
fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
Box::new(CountStream {
input,
interval_secs: self.interval_secs,
count: 0,
current_bin: None,
finished: false,
})
}
}
struct CountStream {
input: Box<dyn EventStream>,
interval_secs: u64,
count: u64,
current_bin: Option<u64>,
finished: bool,
}
impl EventStream for CountStream {
fn description(&self) -> &EventStreamDesc {
self.input.description()
}
fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
if self.finished {
return Ok(None);
}
loop {
match self.input.next_event()? {
Some(sample) => {
let bin = crate::etl::decimation::bin_of(sample.timestamp, self.interval_secs);
if let Some(prev_bin) = self.current_bin
&& bin != prev_bin
&& self.count > 0
{
let result = ArchiverSample::new(
crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
ArchiverValue::ScalarDouble(self.count as f64),
);
self.count = 1;
self.current_bin = Some(bin);
return Ok(Some(result));
}
self.current_bin = Some(bin);
self.count += 1;
}
None => {
self.finished = true;
if let Some(prev_bin) = self.current_bin
&& self.count > 0
{
let result = ArchiverSample::new(
crate::etl::decimation::bin_start(prev_bin, self.interval_secs),
ArchiverValue::ScalarDouble(self.count as f64),
);
self.count = 0;
return Ok(Some(result));
}
return Ok(None);
}
}
}
}
}
pub struct NCountPostProcessor;
impl NCountPostProcessor {
pub fn new(_interval_secs: u64) -> Self {
Self
}
}
impl PostProcessor for NCountPostProcessor {
fn name(&self) -> &str {
"ncount"
}
fn interval_secs(&self) -> u64 {
0
}
fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
Box::new(NCountStream {
input,
count: 0,
first_ts: None,
done: false,
})
}
}
struct NCountStream {
input: Box<dyn EventStream>,
count: u64,
first_ts: Option<SystemTime>,
done: bool,
}
impl EventStream for NCountStream {
fn description(&self) -> &EventStreamDesc {
self.input.description()
}
fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
if self.done {
return Ok(None);
}
while let Some(sample) = self.input.next_event()? {
if self.first_ts.is_none() {
self.first_ts = Some(sample.timestamp);
}
self.count += 1;
}
self.done = true;
match self.first_ts {
Some(ts) => Ok(Some(ArchiverSample::new(
ts,
ArchiverValue::ScalarDouble(self.count as f64),
))),
None => Ok(None),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::ArchDbType;
use std::time::{Duration, UNIX_EPOCH};
struct VecStream {
desc: EventStreamDesc,
items: std::vec::IntoIter<(u64, f64)>,
start: SystemTime,
}
impl VecStream {
fn new(items: Vec<(u64, f64)>) -> Self {
Self {
desc: EventStreamDesc {
pv_name: "TEST".to_string(),
db_type: ArchDbType::ScalarDouble,
year: 2024,
element_count: Some(1),
headers: Vec::new(),
},
items: items.into_iter(),
start: UNIX_EPOCH + Duration::from_secs(1_700_000_000),
}
}
}
impl EventStream for VecStream {
fn description(&self) -> &EventStreamDesc {
&self.desc
}
fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
Ok(self.items.next().map(|(offset, v)| {
ArchiverSample::new(
self.start + Duration::from_secs(offset),
ArchiverValue::ScalarDouble(v),
)
}))
}
}
fn drain(pp: Box<dyn PostProcessor>, items: Vec<(u64, f64)>) -> Vec<f64> {
let stream = pp.process(Box::new(VecStream::new(items)));
let mut out = Vec::new();
let mut s = stream;
while let Some(sample) = s.next_event().unwrap() {
if let ArchiverValue::ScalarDouble(v) = sample.value {
out.push(v);
}
}
out
}
#[test]
fn count_per_bin() {
let pp: Box<dyn PostProcessor> = Box::new(CountPostProcessor::new(10));
let items = vec![(0, 1.0), (3, 1.0), (6, 1.0), (9, 1.0), (10, 1.0), (12, 1.0)];
assert_eq!(drain(pp, items), vec![4.0, 2.0]);
}
#[test]
fn ncount_total() {
let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
let items = vec![(0, 1.0), (1, 2.0), (2, 3.0), (10, 4.0), (12, 5.0)];
assert_eq!(drain(pp, items), vec![5.0]);
}
#[test]
fn ncount_empty() {
let pp: Box<dyn PostProcessor> = Box::new(NCountPostProcessor::new(0));
assert!(drain(pp, vec![]).is_empty());
}
}