use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::storage::traits::{EventStream, PostProcessor};
use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
/// Returns the index of the fixed-width time bin that contains `ts`.
///
/// Bins are `interval_secs` seconds wide and are counted from the Unix epoch,
/// so the result is the whole number of seconds since the epoch divided by
/// `interval_secs`.
///
/// Edge cases:
/// * A timestamp earlier than the Unix epoch is treated as zero seconds and
///   therefore falls into bin 0.
/// * An `interval_secs` of zero yields bin 0 instead of panicking on a
///   division by zero. This agrees with [`bin_start`], which maps every bin
///   of a zero-width interval back to the epoch.
pub fn bin_of(ts: SystemTime, interval_secs: u64) -> u64 {
    let secs = ts.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
    // `checked_div` returns `None` only for a zero divisor.
    secs.checked_div(interval_secs).unwrap_or(0)
}
/// Returns the timestamp at which bin number `bin` begins.
///
/// The offset from the Unix epoch is `bin * interval_secs` seconds; the
/// multiplication saturates at `u64::MAX` rather than wrapping.
///
/// # Panics
///
/// Adding a `Duration` to a `SystemTime` may panic when the result cannot be
/// represented by the platform's clock type, which can happen for extremely
/// large offsets.
pub fn bin_start(bin: u64, interval_secs: u64) -> SystemTime {
    let offset_secs = bin.saturating_mul(interval_secs);
    let offset = Duration::from_secs(offset_secs);
    UNIX_EPOCH + offset
}
/// Post-processor that replaces the samples of each time bin with their mean.
pub struct MeanDecimation {
    /// Width of one averaging bin, in seconds.
    interval_secs: u64,
}

impl MeanDecimation {
    /// Creates a mean decimator whose bins are `interval_secs` seconds wide.
    pub fn new(interval_secs: u64) -> Self {
        MeanDecimation { interval_secs }
    }
}
impl PostProcessor for MeanDecimation {
    /// Identifier of this post-processor: `"mean"`.
    fn name(&self) -> &str {
        "mean"
    }

    /// Bin width, in seconds, that this post-processor was created with.
    fn interval_secs(&self) -> u64 {
        self.interval_secs
    }

    /// Wraps `input` in a stream that emits one mean sample per time bin.
    ///
    /// The returned stream reports a copy of the input's description with
    /// `db_type` replaced by `ArchDbType::ScalarDouble`, since every emitted
    /// value is a `ScalarDouble`.
    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
        // The description must be copied before `input` is moved into the stream.
        let desc = {
            let mut copy = input.description().clone();
            copy.db_type = ArchDbType::ScalarDouble;
            copy
        };
        let stream = MeanDecimationStream {
            input,
            desc,
            interval_secs: self.interval_secs,
            buffer: Vec::new(),
            current_bin: None,
            finished: false,
        };
        Box::new(stream)
    }
}
/// Event stream that groups the samples of an input stream into fixed-width
/// time bins and emits one `ScalarDouble` mean per bin.
struct MeanDecimationStream {
/// Upstream stream that raw samples are pulled from.
input: Box<dyn EventStream>,
/// Description handed out by `description()`; `MeanDecimation::process`
/// builds it from the input's description with `db_type` set to `ScalarDouble`.
desc: EventStreamDesc,
/// Bin width in seconds, passed to `bin_of` and `bin_start`.
interval_secs: u64,
/// Numeric values (those for which `as_f64()` returned `Some`) collected for
/// the bin currently being accumulated; cleared whenever a mean is emitted.
buffer: Vec<f64>,
/// Bin index of the most recently read sample; `None` until a sample is read.
current_bin: Option<u64>,
/// Set once the input has returned `None`; later calls return `Ok(None)`
/// without polling the input again.
finished: bool,
}
impl MeanDecimationStream {
    /// Turns the buffered values into the mean sample for `bin` and empties
    /// the buffer. Returns `None` (leaving state untouched) when nothing is
    /// buffered.
    fn flush_bin(&mut self, bin: u64) -> Option<ArchiverSample> {
        if self.buffer.is_empty() {
            return None;
        }
        let count = self.buffer.len() as f64;
        let mean = self.buffer.iter().sum::<f64>() / count;
        self.buffer.clear();
        Some(ArchiverSample::new(
            bin_start(bin, self.interval_secs),
            ArchiverValue::ScalarDouble(mean),
        ))
    }
}

impl EventStream for MeanDecimationStream {
    /// Description of the decimated stream (the stored `desc`).
    fn description(&self) -> &EventStreamDesc {
        &self.desc
    }

    /// Returns the mean sample of the next completed bin.
    ///
    /// Input samples are read until one arrives whose bin differs from the
    /// bin being accumulated; at that point the accumulated values are
    /// averaged and emitted with the timestamp of the start of their bin,
    /// and the new sample starts the next bin. Samples whose value has no
    /// `f64` representation still advance the current bin but contribute
    /// nothing to the mean. When the input is exhausted, the last partially
    /// filled bin (if any) is emitted and every later call yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the input stream.
    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
        if self.finished {
            return Ok(None);
        }

        while let Some(sample) = self.input.next_event()? {
            let bin = bin_of(sample.timestamp, self.interval_secs);

            // A bin change closes the previous bin, provided it holds values.
            let completed = match self.current_bin {
                Some(prev_bin) if prev_bin != bin => self.flush_bin(prev_bin),
                _ => None,
            };

            // The sample just read always belongs to (and opens) `bin`.
            self.current_bin = Some(bin);
            if let Some(value) = sample.value.as_f64() {
                self.buffer.push(value);
            }

            if completed.is_some() {
                return Ok(completed);
            }
        }

        // Input exhausted: emit whatever is left of the final bin, once.
        self.finished = true;
        match self.current_bin {
            Some(last_bin) => Ok(self.flush_bin(last_bin)),
            None => Ok(None),
        }
    }
}
/// Post-processor that keeps only the first sample of each time bin.
pub struct FirstSampleDecimation {
    /// Width of one bin, in seconds.
    interval_secs: u64,
}

impl FirstSampleDecimation {
    /// Creates a first-sample decimator whose bins are `interval_secs` seconds wide.
    pub fn new(interval_secs: u64) -> Self {
        FirstSampleDecimation { interval_secs }
    }
}
impl PostProcessor for FirstSampleDecimation {
    /// Identifier of this post-processor: `"firstSample"`.
    fn name(&self) -> &str {
        "firstSample"
    }

    /// Bin width, in seconds, that this post-processor was created with.
    fn interval_secs(&self) -> u64 {
        self.interval_secs
    }

    /// Wraps `input` in a stream that forwards only the first sample of each
    /// time bin. The samples themselves are passed through unmodified.
    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream> {
        let stream = FirstSampleStream {
            input,
            interval_secs: self.interval_secs,
            current_bin: None,
        };
        Box::new(stream)
    }
}
/// Event stream that forwards a sample from its input only when that sample
/// falls into a different time bin than the previously forwarded one.
struct FirstSampleStream {
/// Upstream stream that samples are pulled from; also supplies the description.
input: Box<dyn EventStream>,
/// Bin width in seconds, passed to `bin_of`.
interval_secs: u64,
/// Bin index of the last forwarded sample; `None` until a sample is forwarded.
current_bin: Option<u64>,
}
impl EventStream for FirstSampleStream {
    /// Description of the stream; identical to the input's description.
    fn description(&self) -> &EventStreamDesc {
        self.input.description()
    }

    /// Returns the next input sample that opens a new time bin.
    ///
    /// Samples that fall into the same bin as the previously returned sample
    /// are skipped. Yields `Ok(None)` once the input is exhausted.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the input stream.
    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
        while let Some(sample) = self.input.next_event()? {
            let bin = Some(bin_of(sample.timestamp, self.interval_secs));
            if bin == self.current_bin {
                // Still inside the bin whose first sample was already returned.
                continue;
            }
            self.current_bin = bin;
            return Ok(Some(sample));
        }
        Ok(None)
    }
}