use std::collections::VecDeque;
use super::window::{WindowInstanceTrait, WindowIv};
use super::Value;
use crate::storage::window::{PercentileWindow, WindowGeneric};
use crate::storage::window_aggregations::PercentileIv;
use crate::Time;
#[derive(Debug)]
pub(crate) struct DiscreteWindowInstance<IV: WindowIv> {
buckets: VecDeque<IV>,
next_bucket: usize,
wait: bool,
active: bool,
received_vals: usize,
}
impl<IV: WindowIv> DiscreteWindowInstance<IV> {
pub(crate) fn new(size: usize, wait: bool, ts: Time, active: bool) -> DiscreteWindowInstance<IV> {
let buckets = VecDeque::from(vec![IV::default(ts); size]);
DiscreteWindowInstance {
buckets,
next_bucket: 0,
wait,
active,
received_vals: 0,
}
}
}
impl<IV: WindowIv> WindowInstanceTrait for DiscreteWindowInstance<IV> {
fn get_value(&self, ts: Time) -> Value {
if !self.active {
return IV::default(ts).into();
}
let size = self.buckets.len();
if self.wait && self.received_vals < size {
return Value::None;
}
self.buckets
.iter()
.cycle()
.skip(self.next_bucket)
.take(size)
.fold(IV::default(ts), |acc, e| acc + e.clone())
.into()
}
fn accept_value(&mut self, v: Value, ts: Time) {
assert!(self.active);
let b = self.buckets.get_mut(self.next_bucket).expect("Bug!");
*b = (v, ts).into(); self.next_bucket = (self.next_bucket + 1) % self.buckets.len();
if self.received_vals < self.buckets.len() {
self.received_vals += 1;
}
}
fn update_buckets(&mut self, _ts: Time) {}
fn deactivate(&mut self) {
self.next_bucket = 0;
self.active = false;
}
fn is_active(&self) -> bool {
self.active
}
fn activate(&mut self, ts: Time) {
self.buckets = VecDeque::from(vec![IV::default(ts); self.buckets.len()]);
self.active = true;
}
}
impl<G: WindowGeneric> WindowInstanceTrait for PercentileWindow<DiscreteWindowInstance<PercentileIv<G>>> {
fn get_value(&self, ts: Time) -> Value {
let size = self.inner.buckets.len();
self.inner
.buckets
.iter()
.cycle()
.skip(self.inner.next_bucket)
.take(size)
.fold(PercentileIv::default(ts), |acc, e| acc + e.clone())
.percentile_get_value(self.percentile)
}
fn accept_value(&mut self, v: Value, ts: Time) {
self.inner.accept_value(v, ts)
}
fn update_buckets(&mut self, ts: Time) {
self.inner.update_buckets(ts)
}
fn deactivate(&mut self) {
self.inner.deactivate()
}
fn is_active(&self) -> bool {
self.inner.is_active()
}
fn activate(&mut self, ts: Time) {
self.inner.activate(ts)
}
}