mod circular_queue;
mod hammer_slide;
pub mod state;
mod util;
use crate::{aggregator::Aggregator, duration::Duration};
use hammer_slide::HammerSlide;
use state::{SessionState, SlicingState};
#[cfg(not(feature = "std"))]
use alloc::collections::VecDeque;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[cfg(feature = "std")]
use std::collections::VecDeque;
use self::util::pairs_space;
/// Result emitted for one completed window.
///
/// Pairs the window's bounds, expressed in milliseconds, with the aggregate
/// value of type `T` that was computed for it.
#[derive(Clone, Debug, PartialEq)]
pub struct WindowAggregate<T> {
    /// Start of the window, in milliseconds.
    pub window_start_ms: u64,
    /// End of the window, in milliseconds.
    pub window_end_ms: u64,
    /// Aggregate value produced for the window.
    pub aggregate: T,
}
/// Specification of the window that a wheel should compute.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Clone, Copy, Debug)]
pub enum Window {
    /// Window of length `range`. `WindowManager::new` in this file sets it up
    /// as a sliding window whose slide equals its range.
    Tumbling {
        /// Length of the window.
        range: Duration,
    },
    /// Window of length `range` that is evaluated every `slide`.
    Sliding {
        /// Length of the window.
        range: Duration,
        /// Distance between the starts of two consecutive windows.
        slide: Duration,
    },
    /// Session window governed by a `timeout`.
    Session {
        /// Timeout handed to the session state.
        timeout: Duration,
    },
}

impl Window {
    /// Creates a [`Window::Tumbling`] specification with the given `range`.
    pub fn tumbling(range: Duration) -> Self {
        Window::Tumbling { range }
    }

    /// Creates a [`Window::Sliding`] specification.
    ///
    /// # Panics
    ///
    /// Panics if `range` is smaller than `slide`.
    pub fn sliding(range: Duration, slide: Duration) -> Self {
        assert!(
            range >= slide,
            "Window range must be larger or equal to slide"
        );
        Window::Sliding { range, slide }
    }

    /// Creates a [`Window::Session`] specification with the given `timeout`.
    pub fn session(timeout: Duration) -> Self {
        Window::Session { timeout }
    }
}
/// Bundles a [`Window`] specification with the aggregation state built for it.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct WindowManager<A: Aggregator> {
    /// State and aggregator matching the kind of `window`.
    pub(crate) aggregator: WindowAggregator<A>,
    /// The specification this manager was created from.
    pub(crate) window: Window,
}

impl<A: Aggregator> WindowManager<A> {
    /// Builds the aggregation state for `window`, starting at `watermark`.
    ///
    /// Tumbling and sliding windows both get a [`WindowAggregator::Slicing`]
    /// (a tumbling window uses its range as the slide as well); session
    /// windows get a [`WindowAggregator::Session`].
    pub fn new(watermark: u64, window: Window) -> Self {
        // Durations are converted to whole milliseconds with a plain `as`
        // cast, exactly as the slicing state and `pairs_space` expect `usize`.
        fn millis(d: Duration) -> usize {
            d.whole_milliseconds() as usize
        }

        // Shared construction path for both slicing window kinds. The
        // capacity is computed first, then the state, then the aggregator.
        let slicing = |range_ms: usize, slide_ms: usize| -> WindowAggregator<A> {
            let capacity = pairs_space(range_ms, slide_ms);
            WindowAggregator::Slicing {
                state: SlicingState::new(watermark, range_ms, slide_ms),
                aggregator: SlicingAggregator::with_capacity(capacity),
            }
        };

        let aggregator = match window {
            Window::Tumbling { range } => slicing(millis(range), millis(range)),
            Window::Sliding { range, slide } => slicing(millis(range), millis(slide)),
            Window::Session { timeout } => WindowAggregator::Session {
                state: SessionState::new(timeout),
                aggregator: SessionAggregator::default(),
            },
        };

        Self { aggregator, window }
    }
}
/// State plus aggregator for one of the two window families.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub enum WindowAggregator<A: Aggregator> {
    /// Used for tumbling and sliding windows.
    Slicing {
        /// Slicing bookkeeping.
        state: SlicingState,
        /// Aggregator over the window's slices.
        aggregator: SlicingAggregator<A>,
    },
    /// Used for session windows.
    Session {
        /// Session bookkeeping.
        state: SessionState,
        /// Running aggregate of the current session.
        aggregator: SessionAggregator<A>,
    },
}

impl<A: Aggregator> WindowAggregator<A> {
    /// Returns mutable access to the session state and session aggregator.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the [`WindowAggregator::Session`] variant.
    pub fn session_as_mut(&mut self) -> (&mut SessionState, &mut SessionAggregator<A>) {
        if let WindowAggregator::Session { state, aggregator } = self {
            return (state, aggregator);
        }
        panic!("Not a session window")
    }

    /// Returns mutable access to the slicing state and slicing aggregator.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the [`WindowAggregator::Slicing`] variant.
    pub fn slicing_as_mut(&mut self) -> (&mut SlicingState, &mut SlicingAggregator<A>) {
        if let WindowAggregator::Slicing { state, aggregator } = self {
            return (state, aggregator);
        }
        panic!("Not a slicing window")
    }
}
/// Running partial aggregate of the session that is currently open.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct SessionAggregator<A: Aggregator> {
    /// Everything combined into the session so far.
    current: A::PartialAggregate,
}

impl<A: Aggregator> Default for SessionAggregator<A> {
    /// Starts from the default value of the partial aggregate type.
    fn default() -> Self {
        let current = <A::PartialAggregate as Default>::default();
        Self { current }
    }
}

impl<A: Aggregator> SessionAggregator<A> {
    /// Folds `partial` into the running aggregate.
    ///
    /// The existing aggregate is passed as the left operand of
    /// `A::combine` and `partial` as the right operand.
    pub fn aggregate_session(&mut self, partial: A::PartialAggregate) {
        let so_far = self.get_and_reset();
        self.current = A::combine(so_far, partial);
    }

    /// Returns the running aggregate and leaves the default value behind.
    pub fn get_and_reset(&mut self) -> A::PartialAggregate {
        core::mem::take(&mut self.current)
    }
}
/// Backend used to aggregate the slices of a tumbling or sliding window.
///
/// Every operation is forwarded to whichever backend the value wraps.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub enum SlicingAggregator<A: Aggregator> {
    /// Subtract-on-evict backend; selected when `A::invertible()` is true.
    Soe(SubtractOnEvict<A>),
    /// Two-stacks backend; selected by `Default` for non-invertible aggregators.
    TwoStacks(TwoStacks<A>),
    /// HammerSlide backend; selected by `with_capacity` for non-invertible aggregators.
    HammerSlide(HammerSlide<A>),
}

impl<A: Aggregator> Default for SlicingAggregator<A> {
    /// Picks [`SubtractOnEvict`] for invertible aggregators and [`TwoStacks`]
    /// otherwise.
    ///
    /// Note that this differs from [`SlicingAggregator::with_capacity`], which
    /// picks [`HammerSlide`] for the non-invertible case.
    fn default() -> Self {
        if !A::invertible() {
            return SlicingAggregator::TwoStacks(TwoStacks::new());
        }
        SlicingAggregator::Soe(SubtractOnEvict::new())
    }
}

impl<A: Aggregator> SlicingAggregator<A> {
    /// Creates a backend with the given `capacity`: [`SubtractOnEvict`] for
    /// invertible aggregators, [`HammerSlide`] otherwise.
    pub fn with_capacity(capacity: usize) -> Self {
        if !A::invertible() {
            return SlicingAggregator::HammerSlide(HammerSlide::with_capacity(capacity));
        }
        SlicingAggregator::Soe(SubtractOnEvict::with_capacity(capacity))
    }

    /// Forwards `agg` to the wrapped backend's `push`.
    pub fn push(&mut self, agg: A::PartialAggregate) {
        match self {
            SlicingAggregator::Soe(inner) => inner.push(agg),
            SlicingAggregator::TwoStacks(inner) => inner.push(agg),
            SlicingAggregator::HammerSlide(inner) => inner.push(agg),
        }
    }

    /// Returns the result of the wrapped backend's `query`.
    pub fn query(&mut self) -> A::PartialAggregate {
        match self {
            SlicingAggregator::Soe(inner) => inner.query(),
            SlicingAggregator::TwoStacks(inner) => inner.query(),
            SlicingAggregator::HammerSlide(inner) => inner.query(),
        }
    }

    /// Forwards to the wrapped backend's `pop`.
    pub fn pop(&mut self) {
        match self {
            SlicingAggregator::Soe(inner) => inner.pop(),
            SlicingAggregator::TwoStacks(inner) => inner.pop(),
            SlicingAggregator::HammerSlide(inner) => inner.pop(),
        }
    }
}
/// Sliding aggregation for aggregators that provide an inverse combine.
///
/// Keeps the pushed partials in FIFO order together with one running
/// aggregate: pushing combines into the running aggregate, popping removes
/// the oldest partial from it via `A::combine_inverse()`.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct SubtractOnEvict<A: Aggregator> {
    /// Pushed partials, oldest at the front.
    stack: VecDeque<A::PartialAggregate>,
    /// Combination of every partial currently held in `stack`.
    agg: A::PartialAggregate,
}

impl<A: Aggregator> Default for SubtractOnEvict<A> {
    /// Same as [`SubtractOnEvict::new`].
    ///
    /// # Panics
    ///
    /// Panics if `A::combine_inverse()` returns `None`.
    fn default() -> Self {
        Self::new()
    }
}

impl<A: Aggregator> SubtractOnEvict<A> {
    /// Checks the precondition shared by every constructor.
    fn assert_invertible() {
        assert!(
            A::combine_inverse().is_some(),
            "SubtractOnEvict requires inverse_combine"
        );
    }

    /// Creates an empty instance whose aggregate is `A::IDENTITY`.
    ///
    /// # Panics
    ///
    /// Panics if `A::combine_inverse()` returns `None`.
    pub fn new() -> Self {
        Self::assert_invertible();
        Self {
            stack: VecDeque::new(),
            agg: A::IDENTITY.clone(),
        }
    }

    /// Creates an empty instance with room for `capacity` partials.
    ///
    /// # Panics
    ///
    /// Panics if `A::combine_inverse()` returns `None`.
    pub fn with_capacity(capacity: usize) -> Self {
        // The precondition is checked before any allocation takes place.
        Self::assert_invertible();
        Self {
            stack: VecDeque::with_capacity(capacity),
            agg: A::IDENTITY.clone(),
        }
    }

    /// Evicts the oldest partial, if any, and removes it from the aggregate.
    fn pop(&mut self) {
        if let Some(evicted) = self.stack.pop_front() {
            // Constructors guarantee the inverse exists.
            let subtract = A::combine_inverse().unwrap();
            self.agg = subtract(core::mem::take(&mut self.agg), evicted);
        }
    }

    /// Returns a copy of the running aggregate.
    fn query(&self) -> A::PartialAggregate {
        self.agg.clone()
    }

    /// Combines `agg` into the running aggregate and stores it for later eviction.
    fn push(&mut self, agg: A::PartialAggregate) {
        self.agg = A::combine(core::mem::take(&mut self.agg), agg.clone());
        self.stack.push_back(agg);
    }
}
/// Stack entry used by [`TwoStacks`]: a running aggregate paired with the
/// partial that was pushed.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
#[derive(Clone)]
pub struct Value<A: Aggregator> {
    /// Running aggregate recorded for this stack position.
    agg: A::PartialAggregate,
    /// The partial stored at this stack position.
    val: A::PartialAggregate,
}

impl<A: Aggregator> Value<A> {
    /// Builds an entry from a running aggregate `agg` and the partial `val`.
    #[inline]
    pub fn new(agg: A::PartialAggregate, val: A::PartialAggregate) -> Self {
        Value { agg, val }
    }
}
/// FIFO aggregation built from two stacks; needs only `A::combine`.
///
/// New partials are pushed onto `back`. Eviction pops from `front`; when
/// `front` is empty, `back` is first drained into it so that the oldest
/// partial ends up on top.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
#[derive(Default)]
pub struct TwoStacks<A: Aggregator> {
    /// Eviction side; the top entry holds the oldest partial.
    front: Vec<Value<A>>,
    /// Insertion side; the top entry holds the newest partial.
    back: Vec<Value<A>>,
}

impl<A: Aggregator> TwoStacks<A> {
    /// Creates an instance with both stacks empty.
    pub fn new() -> Self {
        Self::default()
    }

    /// Running aggregate stored on top of `stack`, or `A::IDENTITY` when the
    /// stack is empty.
    #[inline(always)]
    fn agg(stack: &[Value<A>]) -> A::PartialAggregate {
        stack
            .last()
            .map_or_else(|| A::IDENTITY.clone(), |top| top.agg.clone())
    }

    /// Evicts the oldest partial; does nothing when both stacks are empty.
    fn pop(&mut self) {
        if self.front.is_empty() {
            // Move everything over, newest first, recomputing the running
            // aggregate so each entry covers itself plus all newer entries.
            while let Some(Value { val, .. }) = self.back.pop() {
                let running = A::combine(val.clone(), Self::agg(&self.front));
                self.front.push(Value::new(running, val));
            }
        }
        self.front.pop();
    }

    /// Aggregate over every partial currently held, front side first.
    #[inline]
    fn query(&self) -> A::PartialAggregate {
        let older = Self::agg(&self.front);
        let newer = Self::agg(&self.back);
        A::combine(older, newer)
    }

    /// Appends `agg` as the newest partial.
    #[inline]
    fn push(&mut self, agg: A::PartialAggregate) {
        let running = A::combine(Self::agg(&self.back), agg.clone());
        self.back.push(Value::new(running, agg));
    }
}
#[cfg(test)]
mod tests {
use super::Window;
use crate::{
Duration,
Entry,
NumericalDuration,
RwWheel,
aggregator::{min::U64MinAggregator, sum::U64SumAggregator},
window::WindowAggregate,
};
#[test]
fn window_30_sec_tumbling_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::tumbling(Duration::seconds(30)));
window_30_sec_tumbling(wheel);
}
#[test]
fn window_30_sec_tumbling_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::tumbling(Duration::seconds(30)));
window_30_sec_tumbling_min_agg(wheel);
}
fn window_30_sec_tumbling(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(100, 1533081605000));
wheel.insert(Entry::new(200, 1533081615000));
wheel.insert(Entry::new(300, 1533081625000));
wheel.insert(Entry::new(400, 1533081635000));
wheel.insert(Entry::new(500, 1533081645000));
let results = wheel.advance_to(1533081660000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 600
}, WindowAggregate {
window_start_ms: 1533081630000,
window_end_ms: 1533081660000,
aggregate: 900
}, ]
);
}
fn window_30_sec_tumbling_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(100, 1533081605000));
wheel.insert(Entry::new(200, 1533081615000));
wheel.insert(Entry::new(300, 1533081625000));
wheel.insert(Entry::new(400, 1533081635000));
wheel.insert(Entry::new(500, 1533081645000));
let results = wheel.advance_to(1533081660000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 100
}, WindowAggregate {
window_start_ms: 1533081630000,
window_end_ms: 1533081660000,
aggregate: 400
}, ]
);
}
#[test]
fn window_1_min_tumbling_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(0);
wheel.window(Window::tumbling(Duration::minutes(1)));
window_1_min_tumbling(wheel);
}
#[test]
fn window_1_min_tumbling_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(0);
wheel.window(Window::tumbling(Duration::minutes(1)));
window_1_min_tumbling_min_agg(wheel);
}
fn window_1_min_tumbling(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(10, 15000));
wheel.insert(Entry::new(20, 45000));
wheel.insert(Entry::new(30, 75000));
wheel.insert(Entry::new(40, 105000));
let results = wheel.advance_to(120000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 60000,
aggregate: 30,
},
WindowAggregate {
window_start_ms: 60000,
window_end_ms: 120000,
aggregate: 70,
},
]
);
}
fn window_1_min_tumbling_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(10, 15000));
wheel.insert(Entry::new(20, 45000));
wheel.insert(Entry::new(30, 75000));
wheel.insert(Entry::new(40, 105000));
let results = wheel.advance_to(120000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 60000,
aggregate: 10,
},
WindowAggregate {
window_start_ms: 60000,
window_end_ms: 120000,
aggregate: 30,
},
]
);
}
#[test]
fn window_2_min_tumbling_with_gap_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(0);
wheel.window(Window::tumbling(Duration::minutes(2)));
window_2_min_tumbling_with_gap(wheel);
}
#[test]
fn window_2_min_tumbling_with_gap_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(0);
wheel.window(Window::tumbling(Duration::minutes(2)));
window_2_min_tumbling_with_gap_min_agg(wheel);
}
fn window_2_min_tumbling_with_gap(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(100, 30000));
wheel.insert(Entry::new(200, 90000));
wheel.insert(Entry::new(300, 150000));
wheel.insert(Entry::new(400, 270000));
wheel.insert(Entry::new(500, 330000));
let results = wheel.advance_to(360000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 120000,
aggregate: 300
}, WindowAggregate {
window_start_ms: 120000,
window_end_ms: 240000,
aggregate: 300
}, WindowAggregate {
window_start_ms: 240000,
window_end_ms: 360000,
aggregate: 900
}, ]
);
}
fn window_2_min_tumbling_with_gap_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(100, 30000));
wheel.insert(Entry::new(200, 90000));
wheel.insert(Entry::new(300, 150000));
wheel.insert(Entry::new(400, 270000));
wheel.insert(Entry::new(500, 330000));
let results = wheel.advance_to(360000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 120000,
aggregate: 100
}, WindowAggregate {
window_start_ms: 120000,
window_end_ms: 240000,
aggregate: 300
}, WindowAggregate {
window_start_ms: 240000,
window_end_ms: 360000,
aggregate: 400
}, ]
);
}
#[test]
fn window_30_sec_range_10_sec_slide_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
window_30_sec_range_10_sec_slide(wheel);
}
#[test]
fn window_30_sec_range_10_sec_slide_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
window_30_sec_range_10_sec_slide_min_agg(wheel);
}
fn window_30_sec_range_10_sec_slide(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(681, 1533081607321));
wheel.insert(Entry::new(625, 1533081619748));
wheel.insert(Entry::new(1319, 1533081621175));
wheel.insert(Entry::new(220, 1533081626470));
wheel.insert(Entry::new(398, 1533081630291));
wheel.insert(Entry::new(2839, 1533081662717));
wheel.insert(Entry::new(172, 1533081663534));
wheel.insert(Entry::new(1133, 1533081664024));
wheel.insert(Entry::new(1417, 1533081678095));
wheel.insert(Entry::new(195, 1533081679609));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 2845
}]
)
}
fn window_30_sec_range_10_sec_slide_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(681, 1533081607321));
wheel.insert(Entry::new(625, 1533081619748));
wheel.insert(Entry::new(1319, 1533081621175));
wheel.insert(Entry::new(220, 1533081626470));
wheel.insert(Entry::new(398, 1533081630291));
wheel.insert(Entry::new(2839, 1533081662717));
wheel.insert(Entry::new(172, 1533081663534));
wheel.insert(Entry::new(1133, 1533081664024));
wheel.insert(Entry::new(1417, 1533081678095));
wheel.insert(Entry::new(195, 1533081679609));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 220
}]
)
}
fn window_60_sec_range_10_sec_slide(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(1, 9000));
wheel.insert(Entry::new(1, 15000));
wheel.insert(Entry::new(1, 25000));
wheel.insert(Entry::new(1, 35000));
wheel.insert(Entry::new(1, 59000));
assert!(wheel.advance_to(59000).is_empty());
wheel.insert(Entry::new(3, 69000));
wheel.insert(Entry::new(5, 75000));
wheel.insert(Entry::new(10, 110000));
let results = wheel.advance_to(130000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 60000,
aggregate: 5
},
WindowAggregate {
window_start_ms: 10000,
window_end_ms: 70000,
aggregate: 7
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 80000,
aggregate: 11
},
WindowAggregate {
window_start_ms: 30000,
window_end_ms: 90000,
aggregate: 10
},
WindowAggregate {
window_start_ms: 40000,
window_end_ms: 100000,
aggregate: 9
},
WindowAggregate {
window_start_ms: 50000,
window_end_ms: 110000,
aggregate: 9
},
WindowAggregate {
window_start_ms: 60000,
window_end_ms: 120000,
aggregate: 18
},
WindowAggregate {
window_start_ms: 70000,
window_end_ms: 130000,
aggregate: 15
}
]
);
}
fn window_60_sec_range_10_sec_slide_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(1, 9000));
wheel.insert(Entry::new(1, 15000));
wheel.insert(Entry::new(1, 25000));
wheel.insert(Entry::new(1, 35000));
wheel.insert(Entry::new(1, 59000));
assert!(wheel.advance_to(59000).is_empty());
wheel.insert(Entry::new(3, 69000));
wheel.insert(Entry::new(5, 75000));
wheel.insert(Entry::new(10, 110000));
let results = wheel.advance_to(130000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 60000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 10000,
window_end_ms: 70000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 80000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 30000,
window_end_ms: 90000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 40000,
window_end_ms: 100000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 50000,
window_end_ms: 110000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 60000,
window_end_ms: 120000,
aggregate: 3
},
WindowAggregate {
window_start_ms: 70000,
window_end_ms: 130000,
aggregate: 5
}
]
);
}
#[test]
fn window_60_sec_range_10_sec_slide_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(
Duration::seconds(60),
Duration::seconds(10),
));
window_60_sec_range_10_sec_slide(wheel);
}
#[test]
fn window_60_sec_range_10_sec_slide_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(
Duration::seconds(60),
Duration::seconds(10),
));
window_60_sec_range_10_sec_slide_min_agg(wheel);
}
fn window_120_sec_range_10_sec_slide(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(1, 9000));
wheel.insert(Entry::new(1, 15000));
wheel.insert(Entry::new(1, 25000));
wheel.insert(Entry::new(1, 35000));
wheel.insert(Entry::new(1, 59000));
assert!(wheel.advance_to(60000).is_empty());
wheel.insert(Entry::new(3, 69000));
wheel.insert(Entry::new(5, 75000));
wheel.insert(Entry::new(10, 110000));
assert!(wheel.advance_to(100000).is_empty());
wheel.insert(Entry::new(3, 125000));
let results = wheel.advance_to(160000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 120000,
aggregate: 23,
},
WindowAggregate {
window_start_ms: 10000,
window_end_ms: 130000,
aggregate: 25,
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 140000,
aggregate: 24,
},
WindowAggregate {
window_start_ms: 30000,
window_end_ms: 150000,
aggregate: 23,
},
WindowAggregate {
window_start_ms: 40000,
window_end_ms: 160000,
aggregate: 22,
},
]
);
}
fn window_120_sec_range_10_sec_slide_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
wheel.insert(Entry::new(1, 9000));
wheel.insert(Entry::new(1, 15000));
wheel.insert(Entry::new(1, 25000));
wheel.insert(Entry::new(1, 35000));
wheel.insert(Entry::new(1, 59000));
assert!(wheel.advance_to(60000).is_empty());
wheel.insert(Entry::new(3, 69000));
wheel.insert(Entry::new(5, 75000));
wheel.insert(Entry::new(10, 110000));
assert!(wheel.advance_to(100000).is_empty());
wheel.insert(Entry::new(3, 125000));
let results = wheel.advance_to(160000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 120000,
aggregate: 1,
},
WindowAggregate {
window_start_ms: 10000,
window_end_ms: 130000,
aggregate: 1,
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 140000,
aggregate: 1,
},
WindowAggregate {
window_start_ms: 30000,
window_end_ms: 150000,
aggregate: 1,
},
WindowAggregate {
window_start_ms: 40000,
window_end_ms: 160000,
aggregate: 1,
},
]
);
}
#[test]
fn window_2_min_range_10_sec_slide_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(Duration::minutes(2), Duration::seconds(10)));
window_120_sec_range_10_sec_slide(wheel);
}
#[test]
fn window_2_min_range_10_sec_slide_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(Duration::minutes(2), Duration::seconds(10)));
window_120_sec_range_10_sec_slide_min_agg(wheel);
}
#[test]
fn window_10_sec_range_3_sec_slide_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(Duration::seconds(10), Duration::seconds(3)));
window_10_sec_range_3_sec_slide(wheel);
}
#[test]
fn window_10_sec_range_3_sec_slide_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(0);
wheel.window(Window::sliding(Duration::seconds(10), Duration::seconds(3)));
window_10_sec_range_3_sec_slide_min_agg(wheel);
}
fn window_10_sec_range_3_sec_slide(mut wheel: RwWheel<U64SumAggregator>) {
for i in 1..=22 {
wheel.insert(Entry::new(i, i * 1000 - 1));
}
let results = wheel.advance(22.seconds());
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 10000,
aggregate: 55
},
WindowAggregate {
window_start_ms: 3000,
window_end_ms: 13000,
aggregate: 85
},
WindowAggregate {
window_start_ms: 6000,
window_end_ms: 16000,
aggregate: 115
},
WindowAggregate {
window_start_ms: 9000,
window_end_ms: 19000,
aggregate: 145
},
WindowAggregate {
window_start_ms: 12000,
window_end_ms: 22000,
aggregate: 175
},
]
);
}
fn window_10_sec_range_3_sec_slide_min_agg(mut wheel: RwWheel<U64MinAggregator>) {
for i in 1..=22 {
wheel.insert(Entry::new(i, i * 1000 - 1));
}
let results = wheel.advance(22.seconds());
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 0,
window_end_ms: 10000,
aggregate: 1
},
WindowAggregate {
window_start_ms: 3000,
window_end_ms: 13000,
aggregate: 4
},
WindowAggregate {
window_start_ms: 6000,
window_end_ms: 16000,
aggregate: 7
},
WindowAggregate {
window_start_ms: 9000,
window_end_ms: 19000,
aggregate: 10
},
WindowAggregate {
window_start_ms: 12000,
window_end_ms: 22000,
aggregate: 13
},
]
);
}
#[test]
fn out_of_order_inserts_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
wheel.insert(Entry::new(300, 1533081625000));
wheel.insert(Entry::new(100, 1533081605000));
wheel.insert(Entry::new(200, 1533081615000));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 600
}]
);
}
#[test]
fn out_of_order_inserts_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
wheel.insert(Entry::new(300, 1533081625000));
wheel.insert(Entry::new(100, 1533081605000));
wheel.insert(Entry::new(200, 1533081615000));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 100
}]
);
}
#[test]
fn edge_case_window_boundaries_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(10),
Duration::seconds(10),
));
wheel.insert(Entry::new(100, 1533081609999)); wheel.insert(Entry::new(200, 1533081610000)); wheel.insert(Entry::new(300, 1533081610001)); let results = wheel.advance_to(1533081610000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081610000,
aggregate: 100
}]
);
}
#[test]
fn edge_case_window_boundaries_min_agg_test() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(10),
Duration::seconds(10),
));
wheel.insert(Entry::new(100, 1533081609999)); wheel.insert(Entry::new(200, 1533081610000)); wheel.insert(Entry::new(300, 1533081610001)); let results = wheel.advance_to(1533081610000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081610000,
aggregate: 100
}]
);
}
#[test]
fn empty_window_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 0
}]
);
}
#[test]
fn empty_window_min_agg_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(30),
Duration::seconds(10),
));
let results = wheel.advance_to(1533081630000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1533081600000,
window_end_ms: 1533081630000,
aggregate: 0
}]
);
}
#[test]
#[should_panic]
fn invalid_window_spec() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(10),
Duration::seconds(30),
));
}
#[test]
#[should_panic]
fn invalid_window_spec_min_agg() {
let mut wheel: RwWheel<U64MinAggregator> = RwWheel::new(1533081600000);
wheel.window(Window::sliding(
Duration::seconds(10),
Duration::seconds(30),
));
}
#[test]
fn window_session_10_sec_timeout_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1000);
wheel.window(Window::session(Duration::seconds(10)));
window_session_10_sec_timeout(wheel);
}
fn window_session_10_sec_timeout(mut wheel: RwWheel<U64SumAggregator>) {
wheel.insert(Entry::new(100, 1000));
wheel.insert(Entry::new(200, 5000));
wheel.insert(Entry::new(300, 8000));
wheel.insert(Entry::new(150, 20000));
wheel.insert(Entry::new(250, 25000));
wheel.insert(Entry::new(500, 40000));
let results = wheel.advance_to(51000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 1000,
window_end_ms: 18000,
aggregate: 600
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 35000,
aggregate: 400
},
WindowAggregate {
window_start_ms: 40000,
window_end_ms: 50000,
aggregate: 500
},
]
);
let no_results = wheel.advance_to(60000);
assert!(no_results.is_empty());
}
#[test]
fn window_session_multiple_sessions_same_time_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1000);
wheel.window(Window::session(Duration::seconds(5)));
wheel.insert(Entry::new(100, 1000));
wheel.insert(Entry::new(200, 3000));
wheel.insert(Entry::new(300, 10000));
wheel.insert(Entry::new(400, 12000));
wheel.insert(Entry::new(500, 20000));
let results = wheel.advance_to(26000);
assert_eq!(
results,
[
WindowAggregate {
window_start_ms: 1000,
window_end_ms: 8000,
aggregate: 300
},
WindowAggregate {
window_start_ms: 10000,
window_end_ms: 17000,
aggregate: 700,
},
WindowAggregate {
window_start_ms: 20000,
window_end_ms: 25000,
aggregate: 500
},
]
);
}
#[test]
fn window_session_single_entry_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1000);
wheel.window(Window::session(Duration::seconds(5)));
wheel.insert(Entry::new(100, 1000));
let results = wheel.advance_to(7000);
assert_eq!(
results,
[WindowAggregate {
window_start_ms: 1000,
window_end_ms: 6000,
aggregate: 100
}]
);
}
#[test]
fn window_session_empty_wheel_test() {
let mut wheel: RwWheel<U64SumAggregator> = RwWheel::new(1000);
wheel.window(Window::session(Duration::seconds(5)));
let results = wheel.advance_to(10000);
assert!(results.is_empty());
}
}