use core::{mem, num::NonZeroUsize, time::Duration as CoreDuration};
use crate::{Entry, aggregator::Aggregator, duration::Duration};
use super::{timer::RawTimerWheel, wheel_ext::WheelExt};
/// Write-ahead capacity used when a wheel is built through
/// `WriterWheel::with_watermark` (and therefore through `Default`).
pub const DEFAULT_WRITE_AHEAD_SLOTS: usize = 64;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, vec::Vec};
/// Write-side wheel: buffers one mutable partial aggregate per slot for
/// timestamps at or ahead of the watermark, and parks entries that fall
/// outside the write-ahead range in an overflow timer wheel.
#[repr(C)]
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct WriterWheel<A: Aggregator> {
/// Current watermark in milliseconds; `tick` advances it by one second.
watermark: u64,
/// Slot count derived from `capacity` by `capacity_to_slots!`; this is the
/// value reported through `WheelExt::num_slots`.
num_slots: usize,
/// Write-ahead capacity requested at construction time.
capacity: NonZeroUsize,
/// Timer wheel holding entries that could not be written ahead when they
/// were inserted; `tick` drains whatever it releases for the new watermark.
overflow: RawTimerWheel<Entry<A::Input>>,
/// Slot storage; `None` marks a slot that has not received any input.
slots: Box<[Option<A::MutablePartialAggregate>]>,
/// Index of the slot that the next `tick` takes and returns.
tail: usize,
/// Index that write-ahead offsets are resolved from; moved forward by one
/// on every `tick`, in step with `tail`.
head: usize,
}
impl<A: Aggregator> Default for WriterWheel<A> {
fn default() -> Self {
Self::with_watermark(0)
}
}
impl<A: Aggregator> WriterWheel<A> {
    /// Creates a wheel starting at `watermark` (milliseconds) with
    /// [`DEFAULT_WRITE_AHEAD_SLOTS`] write-ahead slots.
    pub fn with_watermark(watermark: u64) -> Self {
        Self::with_capacity_and_watermark(
            // The default slot count is a non-zero constant, so this cannot fail.
            NonZeroUsize::new(DEFAULT_WRITE_AHEAD_SLOTS).unwrap(),
            watermark,
        )
    }

    /// Creates a wheel starting at `watermark` (milliseconds) that can write
    /// ahead up to `capacity` slots.
    ///
    /// The backing storage holds `num_slots` entries, where `num_slots` is
    /// whatever `capacity_to_slots!` derives from `capacity`.
    pub fn with_capacity_and_watermark(capacity: NonZeroUsize, watermark: u64) -> Self {
        let num_slots = crate::capacity_to_slots!(capacity);
        // Slot indices come from the `WheelExt` wrapping helpers, which see
        // this wheel through `num_slots()` (presumably wrapping modulo that
        // value -- confirm in `wheel_ext`). The storage must therefore hold
        // `num_slots` entries; sizing it by `capacity` leaves indices out of
        // bounds whenever the two differ.
        let slots = (0..num_slots)
            .map(|_| None)
            .collect::<Vec<_>>()
            .into_boxed_slice();
        Self {
            watermark,
            num_slots,
            capacity,
            overflow: RawTimerWheel::new(watermark),
            slots,
            tail: 0,
            head: 0,
        }
    }

    /// Returns the current watermark in milliseconds.
    pub fn watermark(&self) -> u64 {
        self.watermark
    }

    /// Advances the wheel by one second.
    ///
    /// Bumps the watermark, moves head and tail forward by one slot,
    /// re-inserts every entry the overflow timer releases for the new
    /// watermark, and returns the aggregate taken out of the slot the tail
    /// just left (`None` if nothing was written to it).
    #[inline]
    pub fn tick(&mut self) -> Option<A::MutablePartialAggregate> {
        self.watermark += Duration::SECOND.whole_milliseconds() as u64;
        self.head = self.wrap_add(self.head, 1);

        // Remember the slot being rotated out before moving the tail.
        let tail = self.tail;
        self.tail = self.wrap_add(self.tail, 1);

        // Released entries go through the regular insert path, which decides
        // again between write-ahead and overflow against the new watermark.
        for entry in self.overflow.advance_to(self.watermark) {
            self.insert(entry);
        }

        self.slot(tail).take()
    }

    /// Returns `true` if an entry `addend` slots ahead of the head fits in
    /// the write-ahead range.
    #[inline]
    pub(crate) fn can_write_ahead(&self, addend: u64) -> bool {
        // Compare in `u64`: `addend as usize` would wrap on targets where
        // `usize` is narrower than 64 bits and could make a far-future offset
        // look writable. Widening `usize` to `u64` is lossless.
        addend < self.write_ahead_len() as u64
    }

    /// Number of write-ahead slots currently available: the capacity minus
    /// the length reported by `WheelExt::len`.
    #[inline]
    pub fn write_ahead_len(&self) -> usize {
        self.capacity.get() - self.len()
    }

    /// Returns the aggregate stored `subtrahend` slots forward from the tail,
    /// if that slot has been written to.
    #[doc(hidden)]
    pub fn at(&self, subtrahend: usize) -> Option<&A::MutablePartialAggregate> {
        let idx = self.wrap_add(self.tail(), subtrahend);
        self.slots[idx].as_ref()
    }

    /// Folds `data` into the slot `addend` positions ahead of the head.
    ///
    /// Callers must have checked `can_write_ahead(addend)` first; that check
    /// also guarantees the `usize` conversion below is lossless.
    #[inline(always)]
    fn write_ahead(&mut self, addend: u64, data: A::Input) {
        let slot_idx = self.slot_idx_forward_from_head(addend as usize);
        self.combine_or_lift(slot_idx, data);
    }

    /// Mutable access to the slot at `idx`.
    #[inline]
    fn slot(&mut self, idx: usize) -> &mut Option<A::MutablePartialAggregate> {
        &mut self.slots[idx]
    }

    /// Combines `entry` into the slot at `idx`, or lifts it into a fresh
    /// partial aggregate when the slot is still empty.
    #[inline(always)]
    fn combine_or_lift(&mut self, idx: usize, entry: A::Input) {
        let slot = self.slot(idx);
        match slot {
            Some(dst) => A::combine_mutable(dst, entry),
            None => *slot = Some(A::lift(entry)),
        }
    }

    /// Inserts an entry into the wheel.
    ///
    /// * Entries with a timestamp below the watermark are silently dropped.
    /// * Entries within the write-ahead range are aggregated into their slot
    ///   immediately.
    /// * Anything further ahead is handed to the overflow timer and comes
    ///   back through `tick`.
    ///
    /// # Panics
    ///
    /// Panics if the overflow timer refuses to schedule the entry (the
    /// `Result` of `schedule_at` is unwrapped).
    #[inline]
    pub fn insert(&mut self, e: impl Into<Entry<A::Input>>) {
        let entry = e.into();
        let watermark = self.watermark;

        // Late entry: nothing to do.
        if entry.timestamp < watermark {
            return;
        }

        let diff = entry.timestamp - watermark;
        let seconds = CoreDuration::from_millis(diff).as_secs();
        if self.can_write_ahead(seconds) {
            self.write_ahead(seconds, entry.data);
        } else {
            // Schedule on the whole-second boundary (relative to the current
            // watermark) that the entry belongs to, not its raw timestamp, so
            // the wake-up time coincides with a watermark value produced by
            // `tick` rather than falling between two of them.
            let schedule_ts = watermark + seconds * 1000;
            self.overflow.schedule_at(schedule_ts, entry).unwrap();
        }
    }
}
impl<A: Aggregator> WheelExt for WriterWheel<A> {
    /// Slot count derived from the capacity at construction time.
    fn num_slots(&self) -> usize {
        self.num_slots
    }

    /// Write-ahead capacity requested at construction time.
    fn capacity(&self) -> usize {
        self.capacity.get()
    }

    /// Current head index.
    fn head(&self) -> usize {
        self.head
    }

    /// Current tail index.
    fn tail(&self) -> usize {
        self.tail
    }

    /// Size of the wheel struct plus its slot storage, in bytes.
    ///
    /// Memory owned by the overflow timer or by the aggregates themselves is
    /// not included.
    fn size_bytes(&self) -> Option<usize> {
        // Measure the slice that is actually allocated instead of assuming a
        // slot count, so the figure stays correct whatever length it has.
        let inner_slots = mem::size_of_val(&*self.slots);
        Some(mem::size_of::<Self>() + inner_slots)
    }
}
#[cfg(test)]
mod tests {
    use crate::aggregator::sum::U64SumAggregator;

    use super::*;

    /// Wheel flavour exercised by every test in this module.
    type SumWheel = WriterWheel<U64SumAggregator>;

    /// Builds a sum wheel with `slots` write-ahead slots starting at `watermark`.
    fn sum_wheel(slots: usize, watermark: u64) -> SumWheel {
        WriterWheel::with_capacity_and_watermark(NonZeroUsize::new(slots).unwrap(), watermark)
    }

    /// Asserts that head and tail both point at `idx`.
    #[track_caller]
    fn assert_cursor(wheel: &SumWheel, idx: usize) {
        assert_eq!(wheel.head, idx);
        assert_eq!(wheel.tail, idx);
    }

    #[test]
    fn empty_wheel_test() {
        let mut wheel = sum_wheel(8, 0);
        assert_cursor(&wheel, 0);
        assert_eq!(wheel.tick(), None);
    }

    #[test]
    fn write_ahead_test() {
        let mut wheel = sum_wheel(16, 0);

        wheel.insert(Entry::new(1, 0));
        wheel.insert(Entry::new(10, 1000));
        wheel.insert(Entry::new(20, 2000));
        // Last slot that is still inside the write-ahead range.
        wheel.insert(Entry::new(10, 15000));

        assert_eq!(wheel.tick(), Some(1));
        assert_cursor(&wheel, 1);

        // Below the watermark now, so it must not change anything.
        wheel.insert(Entry::new(10, 0));
        assert_eq!(wheel.at(0), Some(&10));

        wheel.insert(Entry::new(5, 1000));
        assert_eq!(wheel.at(0), Some(&15));

        assert_eq!(wheel.tick(), Some(15));
        assert_cursor(&wheel, 2);

        assert_eq!(wheel.tick(), Some(20));
        assert_cursor(&wheel, 3);

        // Slots 3 through 14 were never written to.
        (0..12).for_each(|_| assert_eq!(wheel.tick(), None));

        wheel.insert(Entry::new(2, 16000));
        assert_eq!(wheel.tick(), Some(10));
        assert_cursor(&wheel, 0);
    }

    #[test]
    fn wrap_around_test() {
        let mut wheel = sum_wheel(4, 1000);

        wheel.insert(Entry::new(1, 1000));
        wheel.insert(Entry::new(2, 2000));
        wheel.insert(Entry::new(3, 3000));
        wheel.insert(Entry::new(4, 4000));

        assert_eq!(wheel.tick(), Some(1));
        assert_eq!(wheel.tick(), Some(2));
        assert_eq!(wheel.tick(), Some(3));
        assert_eq!(wheel.tick(), Some(4));

        // A full rotation later the cursor is back at the first slot.
        wheel.insert(Entry::new(5, 5000));
        assert_cursor(&wheel, 0);
        assert_eq!(wheel.tick(), Some(5));
    }

    #[test]
    fn late_event_handling_test() {
        let mut wheel = sum_wheel(8, 0);

        wheel.insert(Entry::new(10, 1000));
        wheel.insert(Entry::new(20, 2000));
        assert_eq!(wheel.tick(), None);

        // Timestamp 500 is behind the watermark after the tick: dropped.
        wheel.insert(Entry::new(5, 500));
        assert_eq!(wheel.at(0), Some(&10));
        assert_eq!(wheel.tick(), Some(10));
    }

    #[test]
    fn overflow_test() {
        const EVENTS: u64 = 1000;
        let mut wheel = sum_wheel(8, 0);

        // One event per second; everything past the write-ahead range has to
        // travel through the overflow timer.
        for second in 0..EVENTS {
            wheel.insert(Entry::new(1, second * 1000));
        }

        for _ in 0..EVENTS {
            assert_eq!(wheel.tick(), Some(1));
        }
    }
}