pub mod read;
pub mod wheel_ext;
pub mod write;
#[cfg(feature = "profiler")]
mod stats;
#[allow(dead_code)]
mod timer;
use crate::{Entry, aggregator::Aggregator, duration::Duration, window::WindowAggregate};
use core::{fmt::Debug, num::NonZeroUsize};
use write::DEFAULT_WRITE_AHEAD_SLOTS;
pub use read::{DAYS, HOURS, MINUTES, SECONDS, WEEKS, YEARS};
pub use wheel_ext::WheelExt;
pub use write::WriterWheel;
use self::read::{ReaderWheel, hierarchical::HawConf};
use crate::window::Window;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[cfg(feature = "profiler")]
use uwheel_stats::profile_scope;
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct RwWheel<A>
where
A: Aggregator,
{
writer: WriterWheel<A>,
reader: ReaderWheel<A>,
#[cfg(feature = "profiler")]
stats: stats::Stats,
}
impl<A: Aggregator> Default for RwWheel<A> {
fn default() -> Self {
Self::with_conf(Default::default())
}
}
impl<A> RwWheel<A>
where
A: Aggregator,
{
pub fn new(time: u64) -> Self {
let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(time));
Self::with_conf(conf)
}
pub fn with_conf(conf: Conf) -> Self {
Self {
writer: WriterWheel::with_capacity_and_watermark(
conf.writer_conf.write_ahead_capacity,
conf.reader_conf.haw_conf.watermark,
),
reader: ReaderWheel::with_conf(conf.reader_conf.haw_conf),
#[cfg(feature = "profiler")]
stats: stats::Stats::default(),
}
}
pub fn window(&mut self, window: impl Into<Window>) {
self.reader.window(window.into());
}
#[inline]
pub fn insert(&mut self, e: impl Into<Entry<A::Input>>) {
#[cfg(feature = "profiler")]
profile_scope!(&self.stats.insert);
self.writer.insert(e);
}
pub fn write(&self) -> &WriterWheel<A> {
&self.writer
}
pub fn read(&self) -> &ReaderWheel<A> {
&self.reader
}
pub fn merge_read_wheel(&self, other: &ReaderWheel<A>) {
self.read().merge(other);
}
pub fn watermark(&self) -> u64 {
self.writer.watermark()
}
#[inline]
pub fn advance(&mut self, duration: Duration) -> Vec<WindowAggregate<A::PartialAggregate>> {
let to = self.watermark() + duration.whole_milliseconds() as u64;
self.advance_to(to)
}
#[inline]
pub fn advance_to(&mut self, watermark: u64) -> Vec<WindowAggregate<A::PartialAggregate>> {
#[cfg(feature = "profiler")]
profile_scope!(&self.stats.advance);
self.reader.advance_to(watermark, &mut self.writer)
}
pub fn size_bytes(&self) -> usize {
let read = self.reader.as_ref().size_bytes();
let write = self.writer.size_bytes().unwrap();
read + write
}
#[cfg(feature = "profiler")]
pub fn print_stats(&self) {
use prettytable::{Table, row};
use uwheel_stats::Sketch;
let mut table = Table::new();
table.add_row(row![
"name", "count", "min", "p50", "p99", "p99.9", "p99.99", "p99.999", "max",
]);
let percentile_fmt = |p: f64| -> String { format!("{:.2}ns", p) };
let add_row = |id: &str, table: &mut Table, sketch: &Sketch| {
let percentiles = sketch.percentiles();
table.add_row(row![
id,
percentiles.count,
percentiles.min,
percentile_fmt(percentiles.p50),
percentile_fmt(percentiles.p99),
percentile_fmt(percentiles.p99_9),
percentile_fmt(percentiles.p99_99),
percentile_fmt(percentiles.p99_999),
percentiles.min,
]);
};
add_row("insert", &mut table, &self.stats.insert);
add_row("advance", &mut table, &self.stats.advance);
let read = self.reader.as_ref();
add_row("tick", &mut table, &read.stats().tick);
add_row("interval", &mut table, &read.stats().interval);
add_row("landmark", &mut table, &read.stats().landmark);
add_row("combine range", &mut table, &read.stats().combine_range);
add_row(
"combine range plan",
&mut table,
&read.stats().combine_range_plan,
);
add_row(
"combined aggregation",
&mut table,
&read.stats().combined_aggregation,
);
add_row("execution plan", &mut table, &read.stats().exec_plan);
add_row(
"combined aggregation plan",
&mut table,
&read.stats().combined_aggregation_plan,
);
add_row(
"wheel aggregation",
&mut table,
&read.stats().wheel_aggregation,
);
println!("====RwWheel Profiler Dump====");
table.printstd();
}
}
impl<A> Drop for RwWheel<A>
where
A: Aggregator,
{
fn drop(&mut self) {
#[cfg(feature = "profiler")]
self.print_stats();
}
}
#[derive(Debug, Copy, Clone)]
pub struct WriterConf {
write_ahead_capacity: NonZeroUsize,
}
impl Default for WriterConf {
fn default() -> Self {
Self {
write_ahead_capacity: NonZeroUsize::new(DEFAULT_WRITE_AHEAD_SLOTS).unwrap(),
}
}
}
#[derive(Debug, Default, Copy, Clone)]
pub struct ReaderConf {
haw_conf: HawConf,
}
#[derive(Debug, Default, Copy, Clone)]
pub struct Conf {
writer_conf: WriterConf,
reader_conf: ReaderConf,
}
impl Conf {
pub fn with_write_ahead(mut self, capacity: NonZeroUsize) -> Self {
self.writer_conf.write_ahead_capacity = capacity;
self
}
pub fn with_haw_conf(mut self, conf: HawConf) -> Self {
self.reader_conf.haw_conf = conf;
self
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "timer")]
use core::cell::RefCell;
#[cfg(feature = "timer")]
use std::rc::Rc;
use super::*;
use crate::{aggregator::sum::U32SumAggregator, duration::*, *};
use proptest::prelude::*;
#[test]
fn delta_generate_test() {
let haw_conf = HawConf::default().with_deltas();
let conf = Conf::default().with_haw_conf(haw_conf);
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
rw_wheel.insert(Entry::new(250, 1000));
rw_wheel.insert(Entry::new(250, 2000));
rw_wheel.insert(Entry::new(250, 3000));
rw_wheel.insert(Entry::new(250, 4000));
rw_wheel.advance_to(5000);
let delta_state = rw_wheel.read().delta_state();
assert_eq!(delta_state.oldest_ts, 0);
assert_eq!(
delta_state.deltas,
vec![None, Some(250), Some(250), Some(250), Some(250)]
);
assert_eq!(rw_wheel.read().interval(4.seconds()), Some(1000));
let read: ReaderWheel<U32SumAggregator> = ReaderWheel::from_delta_state(delta_state);
assert_eq!(read.watermark(), 5000);
assert_eq!(read.interval(4.seconds()), Some(1000));
}
#[test]
fn insert_test() {
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
rw_wheel.insert(Entry::new(250, 1000));
rw_wheel.insert(Entry::new(250, 2000));
rw_wheel.insert(Entry::new(250, 3000));
rw_wheel.insert(Entry::new(250, 4000));
rw_wheel.insert(Entry::new(250, 150000));
rw_wheel.advance_to(5000);
rw_wheel.advance_to(86000);
}
#[cfg(feature = "serde")]
#[test]
fn rw_wheel_serde_test() {
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
rw_wheel.insert(Entry::new(250, 1000));
rw_wheel.insert(Entry::new(250, 2000));
rw_wheel.insert(Entry::new(250, 3000));
rw_wheel.insert(Entry::new(250, 4000));
rw_wheel.insert(Entry::new(250, 100000));
let serialized = bincode::serialize(&rw_wheel).unwrap();
let mut deserialized_wheel =
bincode::deserialize::<RwWheel<U32SumAggregator>>(&serialized).unwrap();
assert_eq!(deserialized_wheel.watermark(), 0);
deserialized_wheel.advance_to(5000);
assert_eq!(deserialized_wheel.read().interval(4.seconds()), Some(1000));
deserialized_wheel.advance_to(101000);
assert_eq!(deserialized_wheel.read().interval(1.seconds()), Some(250));
}
#[cfg(feature = "timer")]
#[test]
fn timer_once_test() {
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
let gate = Rc::new(RefCell::new(false));
let inner_gate = gate.clone();
let _ = rw_wheel.read().schedule_once(5000, move |read| {
if let Some(last_five) = read.interval(5.seconds()) {
*inner_gate.borrow_mut() = true;
assert_eq!(last_five, 1000);
}
});
rw_wheel.insert(Entry::new(250, 1000));
rw_wheel.insert(Entry::new(250, 2000));
rw_wheel.insert(Entry::new(250, 3000));
rw_wheel.insert(Entry::new(250, 4000));
rw_wheel.advance_to(5000);
assert!(*gate.borrow());
}
#[cfg(feature = "timer")]
#[test]
fn timer_repeat_test() {
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
let sum = Rc::new(RefCell::new(0));
let inner_sum = sum.clone();
let _ = rw_wheel
.read()
.schedule_repeat(5000, 5.seconds(), move |read| {
if let Some(last_five) = read.interval(5.seconds()) {
*inner_sum.borrow_mut() += last_five;
}
});
rw_wheel.insert(Entry::new(250, 1000));
rw_wheel.insert(Entry::new(250, 2000));
rw_wheel.insert(Entry::new(250, 3000));
rw_wheel.insert(Entry::new(250, 4000));
rw_wheel.advance_to(5000);
assert_eq!(*sum.borrow(), 1000);
rw_wheel.insert(Entry::new(250, 5000));
rw_wheel.insert(Entry::new(250, 6000));
rw_wheel.insert(Entry::new(250, 7000));
rw_wheel.advance_to(10000);
assert_eq!(*sum.borrow(), 1750);
}
#[cfg(feature = "sync")]
#[test]
fn read_wheel_move_thread_test() {
let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
rw_wheel.insert(Entry::new(1, 999));
rw_wheel.advance(1.seconds());
let read = rw_wheel.read().clone();
let handle = std::thread::spawn(move || {
assert_eq!(read.interval(1.seconds()), Some(1));
});
handle.join().expect("Failed to join the thread.");
}
#[test]
fn interval_test() {
let mut time = 0;
let mut wheel = RwWheel::<U32SumAggregator>::new(time);
wheel.advance(1.seconds());
wheel.insert(Entry::new(1u32, 1000));
wheel.insert(Entry::new(5u32, 5000));
wheel.insert(Entry::new(11u32, 11000));
wheel.advance(5.seconds());
assert_eq!(wheel.watermark(), 6000);
assert_eq!(
wheel.read().as_ref().seconds_unchecked().interval(5).0,
Some(6u32)
);
assert_eq!(
wheel.read().as_ref().seconds_unchecked().interval(1).0,
Some(5u32)
);
time = 12000;
wheel.advance_to(time);
wheel.insert(Entry::new(100u32, 61000));
wheel.insert(Entry::new(100u32, 63000));
wheel.insert(Entry::new(100u32, 67000));
time = 65000;
wheel.advance_to(time);
}
#[test]
fn mixed_timestamp_insertions_test() {
let mut time = 1000;
let mut wheel = RwWheel::<U32SumAggregator>::new(time);
wheel.advance_to(time);
wheel.insert(Entry::new(1u32, 1000));
wheel.insert(Entry::new(5u32, 5000));
wheel.insert(Entry::new(11u32, 11000));
time = 6000; wheel.advance_to(time);
assert_eq!(
wheel.read().as_ref().seconds_unchecked().total(),
Some(6u32)
);
assert_eq!(
wheel
.read()
.as_ref()
.seconds_unchecked()
.combine_range_and_lower(0..5),
Some(6u32)
);
}
#[test]
fn merge_test() {
let time = 0;
let mut wheel = RwWheel::<U32SumAggregator>::new(time);
let entry = Entry::new(1u32, 5000);
wheel.insert(entry);
wheel.advance(60.minutes());
let fresh_wheel_time = 0;
let fresh_wheel = RwWheel::<U32SumAggregator>::new(fresh_wheel_time);
fresh_wheel.read().merge(wheel.read());
assert_eq!(fresh_wheel.read().watermark(), wheel.read().watermark());
assert_eq!(fresh_wheel.read().landmark(), wheel.read().landmark());
assert_eq!(
fresh_wheel.read().remaining_ticks(),
wheel.read().remaining_ticks()
);
}
#[test]
fn merge_test_low_to_high() {
let time = 0;
let mut wheel = RwWheel::<U32SumAggregator>::new(time);
let entry = Entry::new(1u32, 5000);
wheel.insert(entry);
wheel.advance(10.seconds());
let mut fresh_wheel = RwWheel::<U32SumAggregator>::new(time);
fresh_wheel.insert(Entry::new(5u32, 8000));
fresh_wheel.advance(9.seconds());
wheel.read().merge(fresh_wheel.read());
assert_eq!(wheel.read().landmark(), Some(6));
assert_eq!(wheel.read().interval(10.seconds()), Some(6));
}
fn create_and_advance_wheel(start: u64, end: u64) -> u64 {
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(start);
wheel.advance_to(end);
wheel.watermark()
}
proptest! {
#[test]
fn advance_to(start in 0u64..u64::MAX, watermark in 0u64..u64::MAX) {
let advanced_time = create_and_advance_wheel(start, watermark);
prop_assert!(advanced_time >= start);
}
}
}