use filter;
use metric;
use std::sync::atomic::{AtomicUsize, Ordering};
use time;
/// Number of `Telemetry` events dropped by `DelayFilter::process` because
/// their timestamp was not within the configured tolerance of the current time.
pub static DELAY_TELEM_REJECT: AtomicUsize = AtomicUsize::new(0);
/// Number of `Telemetry` events that `DelayFilter::process` found within
/// tolerance and forwarded.
pub static DELAY_TELEM_ACCEPT: AtomicUsize = AtomicUsize::new(0);
/// Number of `Log` events dropped by `DelayFilter::process` because their
/// time was not within the configured tolerance of the current time.
pub static DELAY_LOG_REJECT: AtomicUsize = AtomicUsize::new(0);
/// Number of `Log` events that `DelayFilter::process` found within tolerance
/// and forwarded.
pub static DELAY_LOG_ACCEPT: AtomicUsize = AtomicUsize::new(0);
/// Filter that drops `Telemetry` and `Log` events whose timestamp is too far
/// from the current time, and passes every other event through unchanged.
pub struct DelayFilter {
// Maximum allowed absolute difference between an event's timestamp and
// `time::now()`; the comparison is strict (`<`).
// NOTE(review): units presumably match whatever `time::now()` returns — confirm.
tolerance: i64,
}
/// Configuration from which a `DelayFilter` is constructed.
#[derive(Clone, Debug)]
pub struct DelayFilterConfig {
// Not read anywhere in this file.
// NOTE(review): presumably identifies this filter within the configuration — confirm.
pub config_path: Option<String>,
// Not read anywhere in this file.
// NOTE(review): presumably names the downstream targets for forwarded events — confirm.
pub forwards: Vec<String>,
// Copied verbatim into `DelayFilter::tolerance` by `DelayFilter::new`.
pub tolerance: i64,
}
impl DelayFilter {
    /// Creates a `DelayFilter` whose tolerance is taken from `config`.
    ///
    /// Only `config.tolerance` is consulted; the remaining configuration
    /// fields are not used here.
    pub fn new(config: &DelayFilterConfig) -> DelayFilter {
        let tolerance = config.tolerance;
        DelayFilter { tolerance }
    }
}
impl filter::Filter for DelayFilter {
    /// Forwards `event` into `res` unless its timestamp is too far from now.
    ///
    /// `Telemetry` events are checked via their `timestamp` and `Log` events
    /// via their `time`. An event is kept only when the absolute difference
    /// between that value and `time::now()` is strictly less than the
    /// filter's tolerance. Each decision increments the matching
    /// `DELAY_*_ACCEPT` or `DELAY_*_REJECT` counter. Every other event
    /// variant is pushed to `res` unchanged and is not counted.
    ///
    /// Rejected events are dropped silently; this method always returns
    /// `Ok(())`.
    fn process(
        &mut self,
        event: metric::Event,
        res: &mut Vec<metric::Event>,
    ) -> Result<(), filter::FilterError> {
        let tolerance = self.tolerance;
        // Checked arithmetic: an extreme timestamp could overflow the
        // subtraction, and `abs()` of `i64::MIN` wraps back to a negative
        // value that would compare as "within tolerance". Any overflow is
        // therefore treated as out of tolerance.
        let within_tolerance = |timestamp: i64| {
            timestamp
                .checked_sub(time::now())
                .and_then(i64::checked_abs)
                .map_or(false, |delta| delta < tolerance)
        };
        match event {
            metric::Event::Telemetry(telem) => {
                if within_tolerance(telem.timestamp) {
                    DELAY_TELEM_ACCEPT.fetch_add(1, Ordering::Relaxed);
                    res.push(metric::Event::Telemetry(telem));
                } else {
                    DELAY_TELEM_REJECT.fetch_add(1, Ordering::Relaxed);
                }
            }
            metric::Event::Log(log) => {
                if within_tolerance(log.time) {
                    DELAY_LOG_ACCEPT.fetch_add(1, Ordering::Relaxed);
                    res.push(metric::Event::Log(log));
                } else {
                    DELAY_LOG_REJECT.fetch_add(1, Ordering::Relaxed);
                }
            }
            // Events without a timestamp to check are passed through as-is.
            ev => {
                res.push(ev);
            }
        }
        Ok(())
    }
}