use crate::ResamplingFunction;
use chrono::{DateTime, TimeDelta, Utc};
use itertools::Itertools;
use log::warn;
use num_traits::FromPrimitive;
use std::fmt::Debug;
use std::ops::Div;
/// Selects which edge of a resampling interval is used as the timestamp of the
/// sample emitted for that interval.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Label {
/// Stamp the emitted sample with the start of its interval (the default).
#[default]
Left,
/// Stamp the emitted sample with the end of its interval, i.e. the interval
/// start plus one resampling interval.
Right,
}
/// Selects which edge of a time window is inclusive when samples are assigned
/// to an interval and when old samples are discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Closed {
/// The older edge is inclusive and the newer edge is exclusive (the default):
/// a sample exactly at the interval end is not part of the interval, and a
/// sample exactly at the retention edge is kept.
#[default]
Left,
/// The newer edge is inclusive and the older edge is exclusive: a sample
/// exactly at the interval end is part of the interval, and a sample exactly
/// at the retention edge is dropped.
Right,
}
/// A timestamped data point that can be fed into and produced by the resampler.
///
/// Implementors must also be `Clone`, `Debug` and `Default`.
pub trait Sample: Clone + Debug + Default {
/// The type of the measurement carried by the sample.
type Value;
/// Builds a sample from a timestamp and an optional value.
fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
/// Returns the point in time the sample refers to.
fn timestamp(&self) -> DateTime<Utc>;
/// Returns the value of the sample, or `None` when the sample carries no value.
fn value(&self) -> Option<Self::Value>;
}
impl<T, S> ResamplingFunction<T, S>
where
    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
    S: Sample<Value = T>,
{
    /// Reduces `samples` to a single value according to the selected variant.
    ///
    /// Samples whose value is `None` are skipped by every variant except
    /// `First` and `Last`, which look only at the first/last sample of the
    /// slice, and `Custom`, which receives the slice untouched.
    ///
    /// Returns `None` when the variant has nothing to produce a value from
    /// (for example `Sum`, `Max`, `Min`, `Average` or `Coalesce` without any
    /// present value). `Count` always returns `Some`.
    pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
        /// Orders `a` against `b`. When the two cannot be ordered, `a` gets
        /// `when_a_is_ordered` if it can at least be ordered against the
        /// default value, and the reverse of that otherwise.
        fn order_with_fallback<V: PartialOrd + Default>(
            a: &V,
            b: &V,
            when_a_is_ordered: std::cmp::Ordering,
        ) -> std::cmp::Ordering {
            match a.partial_cmp(b) {
                Some(ordering) => ordering,
                None if a.partial_cmp(&V::default()).is_some() => when_a_is_ordered,
                None => when_a_is_ordered.reverse(),
            }
        }

        // Fresh iterator over the values that are actually present.
        let present = || samples.iter().filter_map(|sample| sample.value());

        match self {
            Self::Average => {
                let total = Self::Sum.apply(samples)?;
                let count = Self::Count.apply(samples)?;
                Some(total / count)
            }
            Self::Sum => present().sum1(),
            // An unorderable candidate must lose against an orderable one.
            Self::Max => {
                present().max_by(|a, b| order_with_fallback(a, b, std::cmp::Ordering::Greater))
            }
            Self::Min => {
                present().min_by(|a, b| order_with_fallback(a, b, std::cmp::Ordering::Less))
            }
            Self::First => samples.first()?.value(),
            Self::Last => samples.last()?.value(),
            Self::Coalesce => present().next(),
            // Falls back to the default value when the count is not
            // representable in `T`.
            Self::Count => Some(T::from_usize(present().count()).unwrap_or_default()),
            Self::Custom(callback) => callback.as_mut()(samples),
        }
    }
}
impl<T, S> Debug for ResamplingFunction<T, S>
where
    T: Div<Output = T> + std::iter::Sum + Default + Debug,
    S: Sample<Value = T>,
{
    /// Writes the bare variant name; the payload of `Custom` is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            Self::Average => "Average",
            Self::Sum => "Sum",
            Self::Max => "Max",
            Self::Min => "Min",
            Self::First => "First",
            Self::Last => "Last",
            Self::Coalesce => "Coalesce",
            Self::Count => "Count",
            Self::Custom(_) => "Custom",
        };
        f.write_str(variant_name)
    }
}
/// Buffers incoming samples and turns them into one sample per fixed-size
/// interval using a [`ResamplingFunction`].
#[derive(Debug, Default)]
pub struct Resampler<
T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
S: Sample<Value = T>,
> {
/// Length of each output interval.
interval: TimeDelta,
/// Function that reduces the samples of one interval to a single value.
resampling_function: ResamplingFunction<T, S>,
/// Samples received so far that have not been discarded by `resample` yet.
buffer: Vec<S>,
/// How many intervals back a sample is still taken into account, measured
/// in units of `input_interval` (or `interval` while that is unknown).
max_age_in_intervals: i32,
/// Start of the next interval to emit; set by `new` to the aligned start
/// time and advanced by `resample`.
start: DateTime<Utc>,
/// Timestamp of the first buffered sample as seen by the most recent
/// `resample` run; `None` if the buffer was empty.
input_start: Option<DateTime<Utc>>,
/// Estimate of the input sampling interval, set once by `resample` and
/// never smaller than `interval`; `None` until it has been estimated.
input_interval: Option<TimeDelta>,
/// Which edge of a window is inclusive.
closed: Closed,
/// Which edge of an interval is used as the timestamp of the emitted sample.
label: Label,
}
impl<T, S> Resampler<T, S>
where
    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
    S: Sample<Value = T>,
{
    /// Creates a resampler that emits one sample per `interval`.
    ///
    /// * `interval` - length of each output interval.
    /// * `resampling_function` - reduces the samples of one interval to a value.
    /// * `max_age_in_intervals` - how many (input or output, whichever is
    ///   longer) intervals back a sample is still taken into account.
    /// * `start` - first instant to resample from; it is aligned with
    ///   `epoch_align` before being stored.
    /// * `closed` - which edge of a window is inclusive.
    /// * `label` - which interval edge is used as the output timestamp.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is shorter than one millisecond, because the
    /// alignment divides by the interval length in milliseconds.
    pub fn new(
        interval: TimeDelta,
        resampling_function: ResamplingFunction<T, S>,
        max_age_in_intervals: i32,
        start: DateTime<Utc>,
        closed: Closed,
        label: Label,
    ) -> Self {
        let aligned_start = epoch_align(interval, start, None);
        Self {
            interval,
            resampling_function,
            max_age_in_intervals,
            start: aligned_start,
            closed,
            label,
            ..Default::default()
        }
    }

    /// Appends a sample to the buffer.
    ///
    /// `resample` walks the buffer front to back, so samples are expected to
    /// be pushed in chronological order.
    pub fn push(&mut self, sample: S) {
        self.buffer.push(sample);
    }

    /// Returns the samples currently held in the buffer.
    pub fn buffer(&self) -> &Vec<S> {
        &self.buffer
    }

    /// Emits one sample for every interval that starts before `end`, beginning
    /// at the stored start time, and advances the stored start time past the
    /// emitted intervals.
    ///
    /// For each interval the buffered samples up to the interval end that are
    /// not older than the retention window are passed to the resampling
    /// function. Afterwards, samples older than the retention window relative
    /// to `end` are removed from the buffer.
    ///
    /// Returns an empty vector (and logs a warning) when the stored start time
    /// is not before `end`, or when the configured interval is not positive.
    pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
        // A zero or negative interval would never move `start` towards `end`,
        // so the loop below would not terminate (e.g. for a default-constructed
        // resampler).
        if self.interval <= TimeDelta::zero() {
            warn!("resampling interval must be positive");
            return vec![];
        }
        if self.start >= end {
            warn!("start time is greater or equal to end time");
            return vec![];
        }

        let interval = self.interval;
        let closed = self.closed;
        let offset = match self.label {
            Label::Left => TimeDelta::zero(),
            Label::Right => interval,
        };

        let mut res = vec![];
        let mut interval_buffer = vec![];
        let mut buffer_iter = self.buffer.iter();
        let mut next_sample: Option<&S> = buffer_iter.next();
        self.input_start = next_sample.map(|s| s.timestamp());

        while self.start < end {
            let window_start = self.start;
            let window_end = window_start + interval;

            // Move every buffered sample that belongs to this interval (or an
            // earlier one) into the working set.
            while let Some(s) = next_sample
                .filter(|s| is_in_interval(&s.timestamp(), &window_start, &window_end, closed))
            {
                interval_buffer.push(s);
                next_sample = buffer_iter.next();
                // Estimate the input sampling interval once, from the gap
                // between the first buffered sample and the one following it.
                // Measuring the first sample against itself would always give
                // zero and pin the estimate to the resampling interval.
                if self.input_interval.is_none() {
                    if let (Some(input_start), Some(following)) = (self.input_start, next_sample) {
                        self.input_interval =
                            Some((following.timestamp() - input_start).max(interval));
                    }
                }
            }

            // Drop samples that are too old to be relevant for this interval.
            let input_interval = self.input_interval.unwrap_or(interval);
            let drain_end_date = window_end - input_interval * self.max_age_in_intervals;
            interval_buffer
                .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, closed));

            res.push(S::new(
                window_start + offset,
                self.resampling_function.apply(interval_buffer.as_slice()),
            ));
            self.start += interval;
        }

        // Forget samples that can no longer contribute to any future interval.
        let input_interval = self.input_interval.unwrap_or(interval);
        let drain_end_date = end - input_interval * self.max_age_in_intervals;
        self.buffer
            .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, closed));
        res
    }

    /// Resamples up to the current wall-clock time; see [`Self::resample`].
    pub fn resample_now(&mut self) -> Vec<S> {
        self.resample(Utc::now())
    }
}
impl<T, S> Extend<S> for Resampler<T, S>
where
    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
    S: Sample<Value = T>,
{
    /// Appends every sample yielded by `iter` to the internal buffer, in
    /// iteration order.
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = S>,
    {
        let buffer = &mut self.buffer;
        buffer.extend(iter);
    }
}
/// Rounds `timestamp` down to the closest boundary of a grid whose cells are
/// `interval` long and which is anchored at `alignment_timestamp` (the UNIX
/// epoch when `None`).
///
/// The computation is done with millisecond precision, so any sub-millisecond
/// part of the inputs is ignored. The result is never later than `timestamp`,
/// including for timestamps before the anchor or before the epoch.
///
/// If the aligned instant cannot be computed or represented, `timestamp` is
/// returned unchanged.
///
/// # Panics
///
/// Panics if `interval` is shorter than one millisecond (including zero),
/// since the grid would have no usable cell size.
pub fn epoch_align(
    interval: TimeDelta,
    timestamp: DateTime<Utc>,
    alignment_timestamp: Option<DateTime<Utc>>,
) -> DateTime<Utc> {
    let anchor_ms = alignment_timestamp
        .unwrap_or(DateTime::UNIX_EPOCH)
        .timestamp_millis();
    let timestamp_ms = timestamp.timestamp_millis();
    let step_ms = interval.num_milliseconds();
    assert!(
        step_ms != 0,
        "epoch_align: interval must be at least one millisecond long"
    );

    // Distance from the anchor, reduced to the part that lies past the last
    // grid boundary. `rem_euclid` keeps that part non-negative, so timestamps
    // before the anchor are rounded down as well instead of towards the anchor.
    timestamp_ms
        .checked_sub(anchor_ms)
        .and_then(|since_anchor| since_anchor.checked_rem_euclid(step_ms))
        .and_then(|past_boundary| timestamp_ms.checked_sub(past_boundary))
        .and_then(DateTime::from_timestamp_millis)
        .unwrap_or(timestamp)
}
/// Tells whether `timestamp` lies before the end of an interval.
///
/// Only the upper bound is checked; `_start` is accepted but not consulted.
/// With `Closed::Right` a timestamp exactly equal to `end` counts as inside,
/// with `Closed::Left` it does not.
fn is_in_interval(
    timestamp: &DateTime<Utc>,
    _start: &DateTime<Utc>,
    end: &DateTime<Utc>,
    closed: Closed,
) -> bool {
    let end_is_inclusive = matches!(closed, Closed::Right);
    timestamp < end || (end_is_inclusive && timestamp == end)
}
/// Tells whether a sample at `timestamp` is recent enough to be kept, given
/// the retention edge `edge_timestamp`.
///
/// With `Closed::Left` a timestamp exactly equal to the edge is kept, with
/// `Closed::Right` it is not.
fn is_after_retention_edge(
    timestamp: &DateTime<Utc>,
    edge_timestamp: &DateTime<Utc>,
    closed: Closed,
) -> bool {
    let edge_is_inclusive = matches!(closed, Closed::Left);
    timestamp > edge_timestamp || (edge_is_inclusive && timestamp == edge_timestamp)
}