use crate::{aggregator::Aggregator, duration::Duration};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::{
assert,
fmt::Debug,
mem,
num::NonZeroUsize,
ops::{Range, RangeBounds},
option::Option::{self, None, Some},
};
#[cfg(feature = "profiler")]
use uwheel_stats::profile_scope;
#[cfg(feature = "profiler")]
pub(crate) mod stats;
pub mod conf;
pub mod deque;
pub mod iter;
pub mod maybe;
mod data;
#[cfg(feature = "profiler")]
use stats::Stats;
use self::{
conf::{DataLayout, RetentionPolicy, WheelConf, WheelMode},
data::Data,
};
/// Folds `entry` into the optional partial aggregate `dest`.
///
/// When `dest` already holds a value it is replaced by
/// `A::combine(existing, entry)`; when it is `None`, `entry` is stored as-is.
#[inline]
pub fn combine_or_insert<A: Aggregator>(
    dest: &mut Option<A::PartialAggregate>,
    entry: A::PartialAggregate,
) {
    if let Some(slot) = dest.as_mut() {
        // Move the existing value out of the slot so it can be handed to
        // `combine` by value; the slot is overwritten right after.
        let previous = core::mem::take(slot);
        *slot = A::combine(previous, entry);
    } else {
        *dest = Some(entry);
    }
}
/// Converts any `RangeBounds<usize>` into a half-open `Range<usize>` that is
/// valid for a sequence of length `len`.
///
/// An unbounded start maps to `0` and an unbounded end maps to `len`.
///
/// # Panics
///
/// Panics with "lower bound was too large" if the resolved start exceeds the
/// resolved end (or an excluded start of `usize::MAX` cannot be advanced), and
/// with "upper bound was too large" if the resolved end exceeds `len` (or an
/// included end of `usize::MAX` cannot be advanced).
#[inline]
fn into_range(range: &impl RangeBounds<usize>, len: usize) -> Range<usize> {
    let start = match range.start_bound() {
        core::ops::Bound::Included(&n) => n,
        // `n + 1` would overflow for `usize::MAX` (silently wrapping to 0 in
        // release builds); such a bound can never be valid, so fail loudly.
        core::ops::Bound::Excluded(&n) => n.checked_add(1).expect("lower bound was too large"),
        core::ops::Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        // Same overflow hazard for `..=usize::MAX`.
        core::ops::Bound::Included(&n) => n.checked_add(1).expect("upper bound was too large"),
        core::ops::Bound::Excluded(&n) => n,
        core::ops::Bound::Unbounded => len,
    };
    assert!(start <= end, "lower bound was too large");
    assert!(end <= len, "upper bound was too large");
    start..end
}
/// A single wheel slot wrapping one partial aggregate.
///
/// Returned by `Wheel::tick` when a rotation completes and accepted by
/// `Wheel::insert_slot`.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Clone, Debug)]
pub struct WheelSlot<A: Aggregator> {
/// Partial aggregate carried by this slot.
pub total: A::PartialAggregate,
}
impl<A: Aggregator> WheelSlot<A> {
    /// Builds a slot from an optional partial aggregate.
    ///
    /// A missing value (`None`) is replaced by a clone of the aggregator's
    /// `IDENTITY`.
    pub fn new(total: Option<A::PartialAggregate>) -> Self {
        let total = match total {
            Some(partial) => partial,
            None => A::IDENTITY.clone(),
        };
        Self { total }
    }
    /// Test-only alias for [`WheelSlot::new`].
    #[cfg(test)]
    fn with_total(total: Option<A::PartialAggregate>) -> Self {
        Self::new(total)
    }
}
/// A single aggregate wheel: slot storage (`data`) plus the bookkeeping that
/// `tick` uses to track the running total and the position within the current
/// rotation.
#[repr(C)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
#[derive(Clone, Debug)]
pub struct Wheel<A: Aggregator> {
/// Number of ticks that make up one full rotation.
capacity: NonZeroUsize,
/// Slot count derived from `capacity` via `capacity_to_slots!`; used to size
/// the `DataLayout::Normal` deque on construction.
num_slots: usize,
/// Running combination of the head slots seen by `tick` during the current
/// rotation; taken (left as `None`) when a rotation completes.
total: Option<A::PartialAggregate>,
/// Low watermark; advanced by `tick_size_ms` on every tick and interpreted
/// as milliseconds by `now`.
watermark: u64,
/// Amount added to `watermark` on every tick.
tick_size_ms: u64,
/// Policy consulted by `clear_tail` to decide whether the back slot is evicted.
retention: RetentionPolicy,
/// Wheel mode; `insert_head` skips the contiguity step for `WheelMode::Index`.
mode: WheelMode,
/// Slot storage in the configured layout.
data: Data<A>,
/// Ticks performed since the last completed rotation (reset to 0 once it
/// reaches `capacity`).
rotation_count: usize,
/// Test-only counter of every tick ever performed (never reset by `tick`).
#[cfg(test)]
pub(crate) total_ticks: usize,
/// Profiling counters, present only with the `profiler` feature.
#[cfg(feature = "profiler")]
stats: Stats,
}
impl<A: Aggregator> Wheel<A> {
/// Creates an empty wheel from the given configuration.
///
/// The wheel starts with no running total and a rotation count of zero; the
/// watermark, tick size, retention policy and mode are copied from `conf`.
///
/// # Panics
///
/// Panics if `DataLayout::Prefix` is requested while `A::invertible()` is
/// false, or if `DataLayout::Compressed` is requested while
/// `A::compression_support()` is false.
pub fn new(conf: WheelConf) -> Self {
let capacity = conf.capacity;
// Slot count is derived from the capacity by the crate-level macro.
let num_slots = crate::capacity_to_slots!(capacity);
// Pick the storage backend, validating the aggregator's capabilities first.
let data = match conf.data_layout {
DataLayout::Normal => Data::create_deque_with_capacity(num_slots),
DataLayout::Prefix => {
assert!(
A::invertible(),
"Cannot configure prefix-sum without invertible agg function"
);
Data::create_prefix_deque()
}
DataLayout::Compressed(chunk_size) => {
assert!(
A::compression_support(),
"Compressed data layout requires the aggregator to implement compressor + decompressor"
);
Data::create_compressed_deque(chunk_size)
}
};
Self {
capacity,
num_slots,
data,
total: None,
watermark: conf.watermark,
tick_size_ms: conf.tick_size_ms,
retention: conf.retention,
mode: conf.mode,
rotation_count: 0,
#[cfg(test)]
total_ticks: 0,
#[cfg(feature = "profiler")]
stats: Stats::default(),
}
}
/// Returns the wheel's current low watermark (the raw `u64` value that
/// `tick` advances by `tick_size_ms`).
pub fn watermark(&self) -> u64 {
self.watermark
}
/// Returns the current watermark as a [`Duration`] in milliseconds.
///
/// NOTE(review): the `as i64` cast wraps for watermarks above `i64::MAX`;
/// presumably unreachable for real timestamps, confirm if that matters.
pub fn now(&self) -> Duration {
    let millis = self.watermark as i64;
    Duration::milliseconds(millis)
}
/// Combines the slots in `0..subtrahend`.
///
/// Returns the combined partial aggregate together with the number of slots
/// requested, or `(None, 0)` when `subtrahend` exceeds the number of stored
/// slots.
#[inline]
pub fn interval(&self, subtrahend: usize) -> (Option<A::PartialAggregate>, usize) {
    // More slots requested than are stored: nothing to combine.
    if subtrahend > self.data.len() {
        return (None, 0);
    }
    let combined = self.data.combine_range(0..subtrahend);
    (combined, subtrahend)
}
/// Like [`Wheel::interval`], but short-circuits to the running total.
///
/// When `subtrahend` equals the current rotation count the running total is
/// returned with a slot count of `0`; otherwise the call is forwarded to
/// `interval`.
#[inline]
pub fn interval_or_total(&self, subtrahend: usize) -> (Option<A::PartialAggregate>, usize) {
    if subtrahend != self.rotation_count {
        return self.interval(subtrahend);
    }
    (self.total(), 0)
}
/// Combines the slots in `0..subtrahend` and lowers the result to the final
/// aggregate type.
///
/// Returns `None` whenever [`Wheel::interval`] yields no partial aggregate.
#[inline]
pub fn lower_interval(&self, subtrahend: usize) -> Option<A::Aggregate> {
    let (partial, _slots) = self.interval(subtrahend);
    partial.map(A::lower)
}
/// Returns the partial aggregate stored at slot index `subtrahend`, if any.
///
/// Indices greater than `len()` yield `None` without touching the storage.
///
/// NOTE(review): `subtrahend == len()` passes this guard and is resolved by
/// `Data::get`; confirm that it returns `None` for that index.
#[inline]
pub fn at(&self, subtrahend: usize) -> Option<A::PartialAggregate> {
    if subtrahend <= self.len() {
        self.data.get(subtrahend)
    } else {
        None
    }
}
/// Returns the slot at index `subtrahend` lowered to the final aggregate
/// type, or `None` when [`Wheel::at`] finds nothing.
#[inline]
pub fn lower_at(&self, subtrahend: usize) -> Option<A::Aggregate> {
    let partial = self.at(subtrahend);
    partial.map(A::lower)
}
/// Returns `true` when the slot storage is the `Data::PrefixDeque` variant.
#[inline]
pub fn is_prefix(&self) -> bool {
matches!(self.data, Data::PrefixDeque(_))
}
/// Converts plain deque storage into prefix storage.
///
/// Storage that is not currently `Data::Deque` is left untouched.
///
/// # Panics
///
/// Panics if the aggregator is not invertible.
pub fn to_prefix(&mut self) {
    assert!(A::invertible());
    let Data::Deque(deque) = &self.data else {
        return;
    };
    // Build the converted storage first, then replace the old one with it.
    self.data = Data::deque_to_prefix(deque);
}
/// Converts prefix storage back into plain deque storage.
///
/// Storage that is not currently `Data::PrefixDeque` is left untouched.
///
/// # Panics
///
/// Panics if the aggregator is not invertible.
#[doc(hidden)]
pub fn to_deque(&mut self) {
    assert!(A::invertible());
    let Data::PrefixDeque(prefix_deque) = &self.data else {
        return;
    };
    // Build the converted storage first, then replace the old one with it.
    self.data = Data::prefix_to_deque(prefix_deque);
}
/// Asks the slot storage to make itself contiguous (delegates to
/// `Data::maybe_make_contigious`).
///
/// NOTE(review): presumably a prerequisite for SIMD-friendly access, as the
/// method name suggests — confirm against `Data`.
pub fn to_simd(&mut self) {
self.data.maybe_make_contigious();
}
/// Returns the number of slots currently held by the slot storage.
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
/// Returns `true` when the wheel holds no slots.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns the number of slots currently stored (same value as `len`).
#[inline]
pub fn total_slots(&self) -> usize {
self.data.len()
}
/// Returns the partial aggregates of the slots selected by `range`
/// (delegates to `Data::range`).
#[inline]
pub fn range<R>(&self, range: R) -> Vec<A::PartialAggregate>
where
R: RangeBounds<usize>,
{
self.data.range(range)
}
/// Combines the slots selected by `range` into a single partial aggregate
/// (delegates to `Data::combine_range`).
#[inline]
pub fn combine_range<R>(&self, range: R) -> Option<A::PartialAggregate>
where
R: RangeBounds<usize>,
{
self.data.combine_range(range)
}
/// Combines the slots selected by `range` and lowers the result to the final
/// aggregate type; `None` when `combine_range` yields nothing.
pub fn combine_range_and_lower<R>(&self, range: R) -> Option<A::Aggregate>
where
R: RangeBounds<usize>,
{
self.combine_range(range).map(A::lower)
}
#[inline]
fn clear_tail(&mut self) {
if !self.data.is_empty() && self.retention.should_drop() {
self.data.pop_back();
} else if let RetentionPolicy::KeepWithLimit(limit) = self.retention
&& self.data.len() > self.capacity.get() + limit
{
self.data.pop_back();
};
}
/// Returns the number of ticks performed since the last completed rotation.
pub fn rotation_count(&self) -> usize {
self.rotation_count
}
/// Returns how many more ticks are needed to complete the current rotation
/// (`capacity - rotation_count`).
///
/// `tick` resets `rotation_count` to 0 as soon as it reaches `capacity`, so
/// the subtraction does not underflow as long as only `tick` mutates the count.
#[inline]
pub fn ticks_remaining(&self) -> usize {
self.capacity.get() - self.rotation_count
}
/// Returns the size of the wheel in bytes: `size_of::<Self>()` plus whatever
/// the slot storage reports via `Data::size_bytes`. Always `Some`.
///
/// NOTE(review): the name looks like a typo of `size_bytes`; it is kept as-is
/// here so that any existing callers keep compiling.
fn size_bytesz(&self) -> Option<usize> {
    let own_size = mem::size_of::<Self>();
    Some(own_size + self.data.size_bytes())
}
/// Resets the wheel to a freshly constructed state.
///
/// The replacement wheel is built with the current capacity, watermark, data
/// layout, tick size, retention policy and mode; all stored slots, the running
/// total and the rotation count are discarded.
pub fn clear(&mut self) {
    let conf = WheelConf {
        capacity: self.capacity,
        watermark: self.watermark,
        data_layout: self.data.layout(),
        tick_size_ms: self.tick_size_ms,
        retention: self.retention,
        mode: self.mode,
    };
    *self = Self::new(conf);
}
/// Returns a clone of the running total for the current rotation, or `None`
/// if nothing has been folded in yet.
#[inline]
pub fn total(&self) -> Option<A::PartialAggregate> {
// With the `profiler` feature enabled, record that the total was read.
#[cfg(feature = "profiler")]
self.stats.bump_total();
self.total.clone()
}
/// Pushes `entry` onto the front of the slot storage.
///
/// When the aggregator reports SIMD support and the wheel is not in
/// `WheelMode::Index`, the storage is additionally asked to make itself
/// contiguous.
#[inline]
pub fn insert_head(&mut self, entry: A::PartialAggregate) {
// With the `profiler` feature enabled, profile this insert.
#[cfg(feature = "profiler")]
profile_scope!(&self.stats.insert);
self.data.push_front(entry);
if A::simd_support() && self.mode != WheelMode::Index {
self.data.maybe_make_contigious();
}
}
/// Inserts the partial aggregate carried by `slot` at the head of the wheel
/// (forwards `slot.total` to `insert_head`).
#[inline]
#[doc(hidden)]
pub fn insert_slot(&mut self, slot: WheelSlot<A>) {
self.insert_head(slot.total);
}
/// Merges `other` into this wheel.
///
/// `other`'s running total (if any) is folded into this wheel's total, then
/// the slot storage of `other` is merged into this wheel's storage.
///
/// NOTE(review): no conversion is visible in this body, so the
/// `useless_conversion` allow may be stale; kept unchanged.
#[allow(clippy::useless_conversion)]
#[inline]
pub fn merge(&mut self, other: &Self) {
    if let Some(other_total) = &other.total {
        combine_or_insert::<A>(&mut self.total, other_total.clone());
    }
    self.data.merge(&other.data);
}
/// Advances the wheel by one tick.
///
/// Bumps the watermark by `tick_size_ms`, folds the current head slot (if any)
/// into the running total, applies the retention policy to the tail when the
/// wheel is full, and increments the rotation count.
///
/// Returns `Some(WheelSlot)` carrying the rotation's total once the rotation
/// count reaches `capacity` (the count and the running total are then reset),
/// and `None` on every other tick.
#[inline]
pub fn tick(&mut self) -> Option<WheelSlot<A>> {
// Advance the low watermark.
self.watermark += self.tick_size_ms;
// Fold the head slot into this rotation's running total.
if let Some(curr) = self.data.head() {
combine_or_insert::<A>(&mut self.total, curr);
}
// Once the wheel is full, let the retention policy decide about the tail.
if self.is_full() {
self.clear_tail();
}
self.rotation_count += 1;
#[cfg(test)]
{
self.total_ticks += 1;
}
// A full rotation hands the accumulated total to the caller and starts over.
if self.rotation_count == self.capacity.get() {
let total = self.total.take();
self.rotation_count = 0;
Some(WheelSlot::new(total))
} else {
None
}
}
/// Returns `true` when the number of stored slots has reached (or exceeded)
/// the wheel's capacity.
#[inline]
pub fn is_full(&self) -> bool {
self.data.len() >= self.capacity.get()
}
/// Returns a reference to the wheel's profiling counters (only available with
/// the `profiler` feature).
#[cfg(feature = "profiler")]
pub fn stats(&self) -> &Stats {
&self.stats
}
}
#[cfg(test)]
mod tests {
use core::num::NonZeroUsize;
use super::{deque::MutablePartialDeque, *};
use crate::{
aggregator::{
Compression,
min::U64MinAggregator,
sum::{U32SumAggregator, U64SumAggregator},
},
wheels::read::hierarchical::HOUR_TICK_MS,
};
/// Test-only `u32` sum aggregator that additionally provides a `pco`-based
/// compressor/decompressor pair, so it can be used with the compressed layout.
#[derive(Clone, Debug, Default)]
pub struct PcoSumAggregator;
impl Aggregator for PcoSumAggregator {
// Zero is the identity of addition.
const IDENTITY: Self::PartialAggregate = 0;
type Input = u32;
type PartialAggregate = u32;
type MutablePartialAggregate = u32;
type Aggregate = u32;
// Input, mutable partial, partial and final aggregate are all plain `u32`,
// so lift/freeze/lower are identity conversions.
fn lift(input: Self::Input) -> Self::MutablePartialAggregate {
input
}
fn combine_mutable(mutable: &mut Self::MutablePartialAggregate, input: Self::Input) {
*mutable += input
}
fn freeze(a: Self::MutablePartialAggregate) -> Self::PartialAggregate {
a
}
fn combine(a: Self::PartialAggregate, b: Self::PartialAggregate) -> Self::PartialAggregate {
a + b
}
fn lower(a: Self::PartialAggregate) -> Self::Aggregate {
a
}
// Compression hooks backed by `pco`'s standalone auto (de)compression.
fn compression() -> Option<Compression<Self::PartialAggregate>> {
let compressor = |slice: &[u32]| {
pco::standalone::auto_compress(slice, pco::DEFAULT_COMPRESSION_LEVEL)
};
let decompressor = |slice: &[u8]| {
pco::standalone::auto_decompress(slice).expect("failed to decompress")
};
Some(Compression::new(compressor, decompressor))
}
}
// Constructing a prefix-layout wheel with `U64MinAggregator` is expected to
// panic (presumably because min is not invertible, which `Wheel::new` asserts).
#[test]
#[should_panic]
fn invalid_prefix_conf() {
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep)
.with_data_layout(DataLayout::Prefix);
let _wheel = Wheel::<U64MinAggregator>::new(conf);
}
// A prefix-layout wheel and a default-layout wheel fed the same 30 slots must
// agree on every range combination checked below.
#[test]
fn agg_wheel_prefix_test() {
let prefix_conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep)
.with_data_layout(DataLayout::Prefix);
let mut prefix_wheel = Wheel::<U64SumAggregator>::new(prefix_conf);
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep);
let mut wheel = Wheel::<U64SumAggregator>::new(conf);
// Feed identical data into both wheels, ticking after every insert.
for i in 0..30 {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
prefix_wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
prefix_wheel.tick();
}
let prefix_result = prefix_wheel.combine_range(0..4);
let result = wheel.combine_range(0..4);
assert_eq!(prefix_result, result);
// An empty range is expected to combine to the identity in both layouts.
assert_eq!(prefix_wheel.combine_range(10..10), Some(0));
assert_eq!(wheel.combine_range(10..10), Some(0));
// The unbounded range covers every stored slot.
let full_range_prefix = prefix_wheel.combine_range(..);
let full_range_regular = wheel.combine_range(..);
assert_eq!(full_range_prefix, full_range_regular);
// Every single-slot range must match between the two layouts.
for i in 0..30 {
let single_slot_prefix = prefix_wheel.combine_range(i..i + 1);
let single_slot_regular = wheel.combine_range(i..i + 1);
assert_eq!(
single_slot_prefix, single_slot_regular,
"Mismatch for single slot {}: prefix={:?}, regular={:?}",
i, single_slot_prefix, single_slot_regular
);
}
}
// For several chunk sizes, a compressed-layout wheel must return the same
// ranges and range combinations as a default-layout wheel fed the same data.
#[test]
fn compressed_deque_test() {
let chunk_sizes = vec![8, 24, 60, 120, 160];
for chunk_size in chunk_sizes {
let compressed_conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep)
.with_data_layout(DataLayout::Compressed(chunk_size));
let mut compressed_wheel = Wheel::<PcoSumAggregator>::new(compressed_conf);
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep);
let mut wheel = Wheel::<U32SumAggregator>::new(conf);
// Feed 150 identical slots into both wheels, ticking after every insert.
for i in 0..150 {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
compressed_wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
compressed_wheel.tick();
}
// Compare several ranges across the stored slots.
assert_eq!(compressed_wheel.range(0..4), wheel.range(0..4));
assert_eq!(
compressed_wheel.combine_range(0..4),
wheel.combine_range(0..4)
);
assert_eq!(compressed_wheel.range(55..75), wheel.range(55..75));
assert_eq!(
compressed_wheel.combine_range(55..75),
wheel.combine_range(55..75)
);
assert_eq!(
wheel.combine_range(60..120),
compressed_wheel.combine_range(60..120)
);
assert_eq!(wheel.range(60..120), compressed_wheel.range(60..120));
// Insert another half chunk of slots, then re-check the comparisons.
for i in 0u32..(chunk_size as u32 / 2) {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
compressed_wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
compressed_wheel.tick();
}
assert_eq!(compressed_wheel.range(55..75), wheel.range(55..75));
assert_eq!(
compressed_wheel.combine_range(55..75),
wheel.combine_range(55..75)
);
assert_eq!(wheel.range(60..85), compressed_wheel.range(60..85));
assert_eq!(
compressed_wheel.combine_range(60..85),
wheel.combine_range(60..85)
);
}
}
// Exercises `MutablePartialDeque` directly: front insertion order, range
// combination (with and without a filter), indexed access and `pop_back`.
#[test]
fn mutable_partial_deque_test() {
let mut deque = MutablePartialDeque::<U64SumAggregator>::default();
deque.push_front(10);
deque.push_front(20);
deque.push_front(30);
assert_eq!(deque.len(), 3);
assert_eq!(deque.combine_range(..), Some(60));
assert_eq!(deque.combine_range(0..2), Some(50));
// The filter keeps only values greater than 10 (20 + 30).
assert_eq!(deque.combine_range_with_filter(.., |v| *v > 10), Some(50));
// Index 0 is the most recently pushed value.
assert_eq!(deque.get(0), Some(&30));
assert_eq!(deque.get(1), Some(&20));
assert_eq!(deque.get(2), Some(&10));
// Popping the back removes the first value that was pushed (10).
deque.pop_back();
assert_eq!(deque.len(), 2);
assert_eq!(deque.combine_range(..), Some(50));
assert_eq!(deque.combine_range(0..2), Some(50));
}
// With `RetentionPolicy::Drop` and capacity 24, inserting and ticking 24 times
// is expected to leave 23 slots: the 24th tick sees a full wheel and evicts one.
#[test]
fn retention_drop_test() {
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Drop);
let mut wheel = Wheel::<U64SumAggregator>::new(conf);
for i in 0..24 {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
}
assert_eq!(wheel.total_slots(), 23);
// One more tick without an insert must not panic.
wheel.tick();
}
// With `RetentionPolicy::Keep`, all 60 inserted slots are expected to remain
// even though the capacity is 24.
#[test]
fn retention_keep_test() {
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::Keep);
let mut wheel = Wheel::<U64SumAggregator>::new(conf);
for i in 0..60 {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
}
assert_eq!(wheel.total_slots(), 60);
}
// With `RetentionPolicy::KeepWithLimit(10)` and capacity 24, the wheel is
// expected to settle at capacity + limit = 34 slots after 60 inserts.
#[test]
fn retention_keep_with_limit_test() {
let conf = WheelConf::new(HOUR_TICK_MS, NonZeroUsize::new(24).unwrap())
.with_retention_policy(RetentionPolicy::KeepWithLimit(10));
let mut wheel = Wheel::<U64SumAggregator>::new(conf);
for i in 0..60 {
wheel.insert_slot(WheelSlot::with_total(Some(i)));
wheel.tick();
}
assert_eq!(wheel.total_slots(), 24 + 10);
}
}