use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::ops::Add;
use std::time::Duration;
use ordered_float::NotNan;
use rtlola_frontend::mir::{Type, WindowOperation as WinOp, WindowOperation};
use super::discrete_window::DiscreteWindowInstance;
use super::window_aggregations::*;
use super::Value;
use crate::Time;
const SIZE: usize = 64;
pub(crate) trait WindowInstanceTrait: Debug {
fn get_value(&self, ts: Time) -> Value;
fn accept_value(&mut self, v: Value, ts: Time);
fn update_buckets(&mut self, ts: Time);
fn deactivate(&mut self);
fn is_active(&self) -> bool;
fn activate(&mut self, ts: Time);
}
#[derive(Debug)]
pub(crate) struct SlidingWindow {
inner: Box<dyn WindowInstanceTrait>,
}
macro_rules! create_window_instance {
($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident) => {
Self {
inner: Box::new(RealTimeWindowInstance::<$type>::new($dur, $wait, $ts, $active)),
}
};
}
macro_rules! create_percentile_instance {
($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident, $percentile: ident) => {
Self {
inner: Box::new(PercentileWindow {
inner: RealTimeWindowInstance::<$type>::new($dur, $wait, $ts, $active),
percentile: $percentile,
}),
}
};
}
macro_rules! create_discrete_window_instance {
($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident) => {
Self {
inner: Box::new(DiscreteWindowInstance::<$type>::new($dur, $wait, $ts, $active)),
}
};
}
macro_rules! create_discrete_percentile_instance {
($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident, $percentile: ident) => {
Self {
inner: Box::new(PercentileWindow {
inner: DiscreteWindowInstance::<$type>::new($dur, $wait, $ts, $active),
percentile: $percentile,
}),
}
};
}
impl SlidingWindow {
pub(crate) fn from_sliding(
dur: Duration,
wait: bool,
op: WinOp,
ts: Time,
ty: &Type,
active: bool,
) -> SlidingWindow {
match (op, ty) {
(WinOp::Count, _) => create_window_instance!(CountIv, dur, wait, ts, active),
(WinOp::Min, Type::UInt(_)) => create_window_instance!(MinIv<WindowUnsigned>, dur, wait, ts, active),
(WinOp::Min, Type::Int(_)) => create_window_instance!(MinIv<WindowSigned>, dur, wait, ts, active),
(WinOp::Min, Type::Float(_)) => create_window_instance!(MinIv<WindowFloat>, dur, wait, ts, active),
(WinOp::Max, Type::UInt(_)) => create_window_instance!(MaxIv<WindowUnsigned>, dur, wait, ts, active),
(WinOp::Max, Type::Int(_)) => create_window_instance!(MaxIv<WindowSigned>, dur, wait, ts, active),
(WinOp::Max, Type::Float(_)) => create_window_instance!(MaxIv<WindowFloat>, dur, wait, ts, active),
(WinOp::Sum, Type::UInt(_)) => create_window_instance!(SumIv<WindowUnsigned>, dur, wait, ts, active),
(WinOp::Sum, Type::Int(_)) => create_window_instance!(SumIv<WindowSigned>, dur, wait, ts, active),
(WinOp::Sum, Type::Float(_)) => create_window_instance!(SumIv<WindowFloat>, dur, wait, ts, active),
(WinOp::Sum, Type::Bool) => create_window_instance!(SumIv<WindowBool>, dur, wait, ts, active),
(WinOp::Average, Type::UInt(_)) => create_window_instance!(AvgIv<WindowUnsigned>, dur, wait, ts, active),
(WinOp::Average, Type::Int(_)) => create_window_instance!(AvgIv<WindowSigned>, dur, wait, ts, active),
(WinOp::Average, Type::Float(_)) => create_window_instance!(AvgIv<WindowFloat>, dur, wait, ts, active),
(WinOp::Integral, Type::Float(_)) | (WinOp::Integral, Type::Int(_)) | (WinOp::Integral, Type::UInt(_)) => {
create_window_instance!(IntegralIv, dur, wait, ts, active)
},
(WinOp::Conjunction, Type::Bool) => create_window_instance!(ConjIv, dur, wait, ts, active),
(WinOp::Disjunction, Type::Bool) => create_window_instance!(DisjIv, dur, wait, ts, active),
(_, Type::Option(t)) => Self::from_sliding(dur, wait, op, ts, t, active),
(WinOp::Conjunction, _) | (WinOp::Disjunction, _) => {
panic!("conjunction and disjunction only defined on bool")
},
(WinOp::Min, _) | (WinOp::Max, _) | (WinOp::Sum, _) | (WinOp::Average, _) | (WinOp::Integral, _) => {
panic!("arithmetic operation only defined on atomic numerics")
},
(WinOp::Last, Type::Int(_)) => create_window_instance!(LastIv<WindowSigned>, dur, wait, ts, active),
(WinOp::Last, Type::UInt(_)) => create_window_instance!(LastIv<WindowUnsigned>, dur, wait, ts, active),
(WinOp::Last, Type::Float(_)) => create_window_instance!(LastIv<WindowFloat>, dur, wait, ts, active),
(WinOp::NthPercentile(x), Type::Int(_)) => {
create_percentile_instance!(PercentileIv<WindowSigned>, dur, wait, ts, active, x)
},
(WinOp::NthPercentile(x), Type::UInt(_)) => {
create_percentile_instance!(PercentileIv<WindowUnsigned>, dur, wait, ts, active, x)
},
(WinOp::NthPercentile(x), Type::Float(_)) => {
create_percentile_instance!(PercentileIv<WindowFloat>, dur, wait, ts, active, x)
},
(WinOp::Variance, Type::Float(_)) => create_window_instance!(VarianceIv, dur, wait, ts, active),
(WinOp::StandardDeviation, Type::Float(_)) => create_window_instance!(SdIv, dur, wait, ts, active),
(WinOp::Covariance, Type::Float(_)) => create_window_instance!(CovIv, dur, wait, ts, active),
(WinOp::Product, _) => unimplemented!("product not implemented"),
(WindowOperation::Last, _) => unimplemented!(),
(WindowOperation::Variance, _) => unimplemented!(),
(WindowOperation::Covariance, _) => unimplemented!(),
(WindowOperation::StandardDeviation, _) => unimplemented!(),
(WindowOperation::NthPercentile(_), _) => unimplemented!(),
}
}
pub(crate) fn from_discrete(
size: usize,
wait: bool,
op: WinOp,
ts: Time,
ty: &Type,
active: bool,
) -> SlidingWindow {
match (op, ty) {
(WinOp::Count, _) => create_discrete_window_instance!(CountIv, size, wait, ts, active),
(WinOp::Min, Type::UInt(_)) => {
create_discrete_window_instance!(MinIv<WindowUnsigned>, size, wait, ts, active)
},
(WinOp::Min, Type::Int(_)) => create_discrete_window_instance!(MinIv<WindowSigned>, size, wait, ts, active),
(WinOp::Min, Type::Float(_)) => {
create_discrete_window_instance!(MinIv<WindowFloat>, size, wait, ts, active)
},
(WinOp::Max, Type::UInt(_)) => {
create_discrete_window_instance!(MaxIv<WindowUnsigned>, size, wait, ts, active)
},
(WinOp::Max, Type::Int(_)) => create_discrete_window_instance!(MaxIv<WindowSigned>, size, wait, ts, active),
(WinOp::Max, Type::Float(_)) => {
create_discrete_window_instance!(MaxIv<WindowFloat>, size, wait, ts, active)
},
(WinOp::Sum, Type::UInt(_)) => {
create_discrete_window_instance!(SumIv<WindowUnsigned>, size, wait, ts, active)
},
(WinOp::Sum, Type::Int(_)) => create_discrete_window_instance!(SumIv<WindowSigned>, size, wait, ts, active),
(WinOp::Sum, Type::Float(_)) => {
create_discrete_window_instance!(SumIv<WindowFloat>, size, wait, ts, active)
},
(WinOp::Sum, Type::Bool) => create_discrete_window_instance!(SumIv<WindowBool>, size, wait, ts, active),
(WinOp::Average, Type::UInt(_)) => {
create_discrete_window_instance!(AvgIv<WindowUnsigned>, size, wait, ts, active)
},
(WinOp::Average, Type::Int(_)) => {
create_discrete_window_instance!(AvgIv<WindowSigned>, size, wait, ts, active)
},
(WinOp::Average, Type::Float(_)) => {
create_discrete_window_instance!(AvgIv<WindowFloat>, size, wait, ts, active)
},
(WinOp::Integral, Type::Float(_)) | (WinOp::Integral, Type::Int(_)) | (WinOp::Integral, Type::UInt(_)) => {
create_discrete_window_instance!(IntegralIv, size, wait, ts, active)
},
(WinOp::Conjunction, Type::Bool) => create_discrete_window_instance!(ConjIv, size, wait, ts, active),
(WinOp::Disjunction, Type::Bool) => create_discrete_window_instance!(DisjIv, size, wait, ts, active),
(_, Type::Option(t)) => Self::from_discrete(size, wait, op, ts, t, active),
(WinOp::Conjunction, _) | (WinOp::Disjunction, _) => {
panic!("conjunction and disjunction only defined on bool")
},
(WinOp::Min, _) | (WinOp::Max, _) | (WinOp::Sum, _) | (WinOp::Average, _) | (WinOp::Integral, _) => {
panic!("arithmetic operation only defined on atomic numerics")
},
(WinOp::Last, Type::Int(_)) => {
create_discrete_window_instance!(LastIv<WindowSigned>, size, wait, ts, active)
},
(WinOp::Last, Type::UInt(_)) => {
create_discrete_window_instance!(LastIv<WindowUnsigned>, size, wait, ts, active)
},
(WinOp::Last, Type::Float(_)) => {
create_discrete_window_instance!(LastIv<WindowFloat>, size, wait, ts, active)
},
(WinOp::NthPercentile(x), Type::Int(_)) => {
create_discrete_percentile_instance!(PercentileIv<WindowSigned>, size, wait, ts, active, x)
},
(WinOp::NthPercentile(x), Type::UInt(_)) => {
create_discrete_percentile_instance!(PercentileIv<WindowUnsigned>, size, wait, ts, active, x)
},
(WinOp::NthPercentile(x), Type::Float(_)) => {
create_discrete_percentile_instance!(PercentileIv<WindowFloat>, size, wait, ts, active, x)
},
(WinOp::Variance, Type::Float(_)) => create_discrete_window_instance!(VarianceIv, size, wait, ts, active),
(WinOp::StandardDeviation, Type::Float(_)) => {
create_discrete_window_instance!(SdIv, size, wait, ts, active)
},
(WinOp::Covariance, Type::Float(_)) => create_discrete_window_instance!(CovIv, size, wait, ts, active),
(WinOp::Product, _) => unimplemented!("product not implemented"),
(WindowOperation::Last, _) => unimplemented!(),
(WindowOperation::Variance, _) => unimplemented!(),
(WindowOperation::Covariance, _) => unimplemented!(),
(WindowOperation::StandardDeviation, _) => unimplemented!(),
(WindowOperation::NthPercentile(_), _) => unimplemented!(),
}
}
pub(crate) fn update(&mut self, ts: Time) {
self.inner.update_buckets(ts);
}
pub(crate) fn get_value(&self, ts: Time) -> Value {
self.inner.get_value(ts)
}
pub(crate) fn accept_value(&mut self, v: Value, ts: Time) {
self.inner.accept_value(v, ts);
}
pub(crate) fn deactivate(&mut self) {
self.inner.deactivate();
}
pub(crate) fn is_active(&self) -> bool {
self.inner.is_active()
}
pub(crate) fn activate(&mut self, ts: Time) {
self.inner.activate(ts);
}
}
pub(crate) trait WindowIv:
Clone + Add<Output = Self> + From<(Value, Time)> + Sized + Debug + Into<Value>
{
fn default(ts: Time) -> Self;
}
#[derive(Debug)]
pub(crate) struct RealTimeWindowInstance<IV: WindowIv> {
buckets: VecDeque<IV>,
time_per_bucket: Duration,
start_time: Time,
last_bucket_ix: BIx,
wait: bool,
wait_duration: Duration,
active: bool,
}
#[derive(Clone, Copy, Debug)]
struct BIx {
period: usize,
ix: usize,
}
impl BIx {
fn new(period: usize, ix: usize) -> BIx {
BIx { period, ix }
}
fn buckets_since(self, other: BIx, num_buckets: usize) -> usize {
match self.period.cmp(&other.period) {
Ordering::Less => panic!("`other` bucket is more recent than `self`."),
Ordering::Greater => {
let period_diff = self.period - other.period;
match self.ix.cmp(&other.ix) {
Ordering::Equal => period_diff * num_buckets,
Ordering::Less => {
let actual_p_diff = period_diff - 1;
let ix_diff = num_buckets - other.ix + self.ix;
actual_p_diff * num_buckets + ix_diff
},
Ordering::Greater => period_diff * num_buckets + (self.ix - other.ix),
}
},
Ordering::Equal => {
match self.ix.cmp(&other.ix) {
Ordering::Equal => 0,
Ordering::Less => panic!("`other` bucket is more recent than `self`."),
Ordering::Greater => self.ix - other.ix,
}
},
}
}
}
impl<IV: WindowIv> WindowInstanceTrait for RealTimeWindowInstance<IV> {
fn deactivate(&mut self) {
self.last_bucket_ix = BIx::new(0, 0);
self.active = false;
}
fn is_active(&self) -> bool {
self.active
}
fn activate(&mut self, ts: Time) {
self.buckets = VecDeque::from(vec![IV::default(ts); SIZE]);
self.start_time = ts;
self.active = true;
}
fn get_value(&self, ts: Time) -> Value {
if !self.active {
return IV::default(ts).into();
}
if self.wait && ts < self.wait_duration {
return Value::None;
}
self.buckets
.iter()
.rev()
.fold(IV::default(ts), |acc, e| acc + e.clone())
.into()
}
fn accept_value(&mut self, v: Value, ts: Time) {
assert!(self.active);
self.update_buckets(ts);
let b = self.buckets.get_mut(0).expect("Bug!");
*b = b.clone() + (v, ts).into(); }
fn update_buckets(&mut self, ts: Time) {
assert!(self.active);
let curr = self.get_current_bucket(ts);
let last = self.last_bucket_ix;
let diff = curr.buckets_since(last, self.buckets.len());
self.invalidate_n(diff, ts);
self.last_bucket_ix = curr;
}
}
impl<IV: WindowIv> RealTimeWindowInstance<IV> {
fn new(dur: Duration, wait: bool, ts: Time, active: bool) -> Self {
let time_per_bucket = dur / (SIZE as u32);
let buckets = VecDeque::from(vec![IV::default(ts); SIZE]);
Self {
buckets,
time_per_bucket,
start_time: ts,
last_bucket_ix: BIx::new(0, 0),
wait,
wait_duration: dur,
active,
}
}
fn invalidate_n(&mut self, n: usize, ts: Time) {
assert!(self.active);
for _ in 0..n {
self.buckets.pop_back();
self.buckets.push_front(IV::default(ts));
}
}
fn get_current_bucket(&self, ts: Time) -> BIx {
assert!(self.active);
assert!(ts >= self.start_time, "Time does not behave monotonically!");
let overall_ix = Self::quickfix_duration_div(ts - self.start_time, self.time_per_bucket);
let overall_ix = overall_ix.floor() as usize;
let period = overall_ix / self.buckets.len();
let ix = overall_ix % self.buckets.len();
BIx { period, ix }
}
fn quickfix_duration_div(a: Duration, b: Duration) -> f64 {
let a_secs = a.as_secs();
let a_nanos = a.subsec_nanos();
let b_secs = b.as_secs();
let b_nanos = b.subsec_nanos();
let a = (a_secs as f64) + f64::from(a_nanos) / f64::from(1_000_000_000);
let b = (b_secs as f64) + f64::from(b_nanos) / f64::from(1_000_000_000);
a / b
}
}
#[derive(Debug)]
pub(crate) struct PercentileWindow<IC: WindowInstanceTrait> {
pub(crate) inner: IC,
pub(crate) percentile: u8,
}
impl<G: WindowGeneric> WindowInstanceTrait for PercentileWindow<RealTimeWindowInstance<PercentileIv<G>>> {
fn get_value(&self, ts: Time) -> Value {
if self.inner.wait && ts < self.inner.wait_duration {
return Value::None;
}
self.inner
.buckets
.iter()
.rev()
.fold(PercentileIv::default(ts), |acc, e| acc + e.clone())
.percentile_get_value(self.percentile)
}
fn accept_value(&mut self, v: Value, ts: Time) {
self.inner.accept_value(v, ts)
}
fn update_buckets(&mut self, ts: Time) {
self.inner.update_buckets(ts)
}
fn deactivate(&mut self) {
self.inner.deactivate()
}
fn is_active(&self) -> bool {
self.inner.is_active()
}
fn activate(&mut self, ts: Time) {
self.inner.activate(ts)
}
}
pub(crate) trait WindowGeneric: Debug + Clone {
fn from_value(v: Value) -> Value;
}
#[derive(Debug, Clone)]
pub(crate) struct WindowSigned {}
impl WindowGeneric for WindowSigned {
fn from_value(v: Value) -> Value {
match v {
Value::Signed(_) => v,
Value::Unsigned(u) => Value::Signed(u as i64),
_ => unreachable!("Type error."),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct WindowBool {}
impl WindowGeneric for WindowBool {
fn from_value(v: Value) -> Value {
match v {
Value::Bool(b) if b => Value::Unsigned(1),
Value::Bool(_) => Value::Unsigned(0),
_ => unreachable!("Type error."),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct WindowUnsigned {}
impl WindowGeneric for WindowUnsigned {
fn from_value(v: Value) -> Value {
match v {
Value::Unsigned(_) => v,
_ => unreachable!("Type error."),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct WindowFloat {}
impl WindowGeneric for WindowFloat {
fn from_value(v: Value) -> Value {
let f = match v {
Value::Signed(i) => i as f64,
Value::Unsigned(u) => u as f64,
Value::Float(f) => f.into(),
_ => unreachable!("Type error."),
};
Value::Float(NotNan::new(f).unwrap())
}
}