use core::{
cmp,
fmt::{self, Display},
num::NonZeroUsize,
};
use time::OffsetDateTime;
use super::{
super::write::WriterWheel,
aggregation::{
Wheel,
conf::{DataLayout, RetentionPolicy, WheelMode},
maybe::MaybeWheel,
},
plan::{ExecutionPlan, WheelAggregation, WheelRanges},
};
use crate::{
Duration,
Window,
aggregator::Aggregator,
delta::DeltaState,
wheels::read::{
aggregation::combine_or_insert,
plan::{CombinedAggregation, WheelAggregations},
},
window::{WindowAggregate, WindowManager},
};
#[cfg(test)]
use proptest::prelude::*;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[cfg(feature = "profiler")]
use super::stats::Stats;
#[cfg(feature = "profiler")]
use uwheel_stats::profile_scope;
crate::cfg_timer! {
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, rc::Rc};
use crate::wheels::timer::{RawTimerWheel, TimerWheel, TimerError, TimerAction};
}
use super::aggregation::conf::WheelConf;
/// Milliseconds per tick of the seconds wheel.
pub const SECOND_TICK_MS: u64 = time::Duration::SECOND.whole_milliseconds() as u64;
/// Milliseconds per tick of the minutes wheel.
pub const MINUTE_TICK_MS: u64 = time::Duration::MINUTE.whole_milliseconds() as u64;
/// Milliseconds per tick of the hours wheel.
pub const HOUR_TICK_MS: u64 = time::Duration::HOUR.whole_milliseconds() as u64;
/// Milliseconds per tick of the days wheel.
pub const DAY_TICK_MS: u64 = time::Duration::DAY.whole_milliseconds() as u64;
/// Milliseconds per tick of the weeks wheel.
pub const WEEK_TICK_MS: u64 = time::Duration::WEEK.whole_milliseconds() as u64;
/// Milliseconds per tick of the years wheel.
/// NOTE: a "year" is defined here as exactly 52 weeks (364 days).
pub const YEAR_TICK_MS: u64 = WEEK_TICK_MS * 52;
/// Configuration for a Hierarchical Aggregate Wheel ([`Haw`]).
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Clone, Copy, Debug)]
pub struct HawConf {
    /// Initial low watermark in unix milliseconds.
    pub watermark: u64,
    /// Configuration of the seconds wheel.
    pub seconds: WheelConf,
    /// Configuration of the minutes wheel.
    pub minutes: WheelConf,
    /// Configuration of the hours wheel.
    pub hours: WheelConf,
    /// Configuration of the days wheel.
    pub days: WheelConf,
    /// Configuration of the weeks wheel.
    pub weeks: WheelConf,
    /// Configuration of the years wheel.
    pub years: WheelConf,
    /// Query-planner settings used when executing range queries.
    pub optimizer: Optimizer,
    /// When `true`, each tick's delta is recorded in the wheel's [`DeltaState`].
    pub generate_deltas: bool,
}
impl Default for HawConf {
    /// Default configuration: watermark 0, standard wheel capacities
    /// (60s / 60m / 24h / 7d / 52w / 10y), no delta generation.
    fn default() -> Self {
        Self {
            watermark: 0,
            seconds: WheelConf::new(SECOND_TICK_MS, SECONDS),
            minutes: WheelConf::new(MINUTE_TICK_MS, MINUTES),
            hours: WheelConf::new(HOUR_TICK_MS, HOURS),
            days: WheelConf::new(DAY_TICK_MS, DAYS),
            weeks: WheelConf::new(WEEK_TICK_MS, WEEKS),
            years: WheelConf::new(YEAR_TICK_MS, YEARS),
            optimizer: Default::default(),
            generate_deltas: false,
        }
    }
}
impl HawConf {
    /// Sets the given watermark on the configuration and on every wheel.
    pub fn with_watermark(mut self, watermark: u64) -> Self {
        for conf in [
            &mut self.seconds,
            &mut self.minutes,
            &mut self.hours,
            &mut self.days,
            &mut self.weeks,
            &mut self.years,
        ] {
            conf.set_watermark(watermark);
        }
        self.watermark = watermark;
        self
    }
    /// Applies the given [`WheelMode`] to every wheel.
    pub fn with_mode(mut self, mode: WheelMode) -> Self {
        for conf in [
            &mut self.seconds,
            &mut self.minutes,
            &mut self.hours,
            &mut self.days,
            &mut self.weeks,
            &mut self.years,
        ] {
            conf.set_mode(mode);
        }
        self
    }
    /// Switches every wheel to the prefix-sum data layout.
    pub fn with_prefix_sum(mut self) -> Self {
        for conf in [
            &mut self.seconds,
            &mut self.minutes,
            &mut self.hours,
            &mut self.days,
            &mut self.weeks,
            &mut self.years,
        ] {
            conf.set_data_layout(DataLayout::Prefix);
        }
        self
    }
    /// Applies the given [`RetentionPolicy`] to every wheel.
    pub fn with_retention_policy(mut self, policy: RetentionPolicy) -> Self {
        for conf in [
            &mut self.seconds,
            &mut self.minutes,
            &mut self.hours,
            &mut self.days,
            &mut self.weeks,
            &mut self.years,
        ] {
            conf.set_retention_policy(policy);
        }
        self
    }
    /// Overrides the seconds wheel configuration.
    pub fn with_seconds(mut self, seconds: WheelConf) -> Self {
        self.seconds = seconds;
        self
    }
    /// Overrides the minutes wheel configuration.
    pub fn with_minutes(mut self, minutes: WheelConf) -> Self {
        self.minutes = minutes;
        self
    }
    /// Overrides the hours wheel configuration.
    pub fn with_hours(mut self, hours: WheelConf) -> Self {
        self.hours = hours;
        self
    }
    /// Overrides the days wheel configuration.
    pub fn with_days(mut self, days: WheelConf) -> Self {
        self.days = days;
        self
    }
    /// Overrides the weeks wheel configuration.
    pub fn with_weeks(mut self, weeks: WheelConf) -> Self {
        self.weeks = weeks;
        self
    }
    /// Overrides the years wheel configuration.
    pub fn with_years(mut self, years: WheelConf) -> Self {
        self.years = years;
        self
    }
    /// Enables delta generation on every tick.
    pub fn with_deltas(mut self) -> Self {
        self.generate_deltas = true;
        self
    }
}
/// Slots in the seconds wheel (one minute of seconds).
pub const SECONDS: NonZeroUsize = NonZeroUsize::new(60).unwrap();
/// Slots in the minutes wheel (one hour of minutes).
pub const MINUTES: NonZeroUsize = NonZeroUsize::new(60).unwrap();
/// Slots in the hours wheel (one day of hours).
pub const HOURS: NonZeroUsize = NonZeroUsize::new(24).unwrap();
/// Slots in the days wheel (one week of days).
pub const DAYS: NonZeroUsize = NonZeroUsize::new(7).unwrap();
/// Slots in the weeks wheel (one 52-week "year" of weeks).
pub const WEEKS: NonZeroUsize = NonZeroUsize::new(52).unwrap();
/// Slots in the years wheel (a decade of years).
pub const YEARS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
/// Errors raised when constructing a [`WheelRange`] from raw timestamps.
#[derive(Debug, Copy, Clone)]
pub enum RangeError {
    /// The start timestamp could not be converted to a valid date-time.
    InvalidStart { start_ms: u64 },
    /// The end timestamp could not be converted to a valid date-time.
    InvalidEnd { end_ms: u64 },
}

impl Display for RangeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both variants render identically; only the offending value differs.
        let ts = match self {
            RangeError::InvalidStart { start_ms } => start_ms,
            RangeError::InvalidEnd { end_ms } => end_ms,
        };
        write!(f, "{ts} is not a valid unix timestamp")
    }
}
/// A closed time interval `[start, end)` used to query a [`Haw`].
#[derive(Debug, Copy, Hash, PartialEq, Eq, Clone)]
pub struct WheelRange {
    // Inclusive start of the query interval.
    pub(crate) start: OffsetDateTime,
    // Exclusive end of the query interval.
    pub(crate) end: OffsetDateTime,
}
#[cfg(test)]
impl Arbitrary for WheelRange {
    type Parameters = ();
    type Strategy = BoxedStrategy<Self>;
    // Generates well-formed ranges: both endpoints are clamped to
    // [0, 1_600_000_000) unix seconds (~Sep 2020) and ordered so that
    // start <= end always holds.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        (any::<i64>(), any::<i64>())
            .prop_map(|(start, end)| {
                let start = OffsetDateTime::from_unix_timestamp(start.abs() % 1_600_000_000)
                    .unwrap_or(OffsetDateTime::UNIX_EPOCH);
                let end = OffsetDateTime::from_unix_timestamp(end.abs() % 1_600_000_000)
                    .unwrap_or(OffsetDateTime::UNIX_EPOCH);
                WheelRange {
                    start: start.min(end),
                    end: start.max(end),
                }
            })
            .boxed()
    }
}
impl WheelRange {
    /// Creates a `WheelRange` from unix-millisecond timestamps.
    ///
    /// Sub-second precision is discarded (`/ 1000` truncates to whole seconds).
    ///
    /// # Errors
    /// Returns [`RangeError`] if either timestamp cannot be represented as an
    /// `OffsetDateTime`.
    pub fn new(start_ms: u64, end_ms: u64) -> Result<Self, RangeError> {
        let start = OffsetDateTime::from_unix_timestamp(start_ms as i64 / 1000)
            .map_err(|_| RangeError::InvalidStart { start_ms })?;
        let end = OffsetDateTime::from_unix_timestamp(end_ms as i64 / 1000)
            .map_err(|_| RangeError::InvalidEnd { end_ms })?;
        Ok(Self { start, end })
    }
    /// Like [`WheelRange::new`] but panics on invalid timestamps.
    pub fn new_unchecked(start_ms: u64, end_ms: u64) -> Self {
        Self::new(start_ms, end_ms).unwrap()
    }
    /// Builds a range directly from two date-times without validation.
    #[doc(hidden)]
    pub fn from(start: OffsetDateTime, end: OffsetDateTime) -> Self {
        Self { start, end }
    }
}
impl WheelRange {
    /// Returns the finest time unit in which either endpoint has a non-zero
    /// component; queries at that granularity can represent the range exactly.
    pub(crate) fn lowest_granularity(&self) -> Granularity {
        let is_seconds = self.start.second() != 0 || self.end.second() != 0;
        let is_minutes = self.start.minute() != 0 || self.end.minute() != 0;
        let is_hours = self.start.hour() != 0 || self.end.hour() != 0;
        // NOTE(review): `OffsetDateTime::day()` is 1-based (1..=31), so
        // `is_days` is always true and the `unimplemented!` branch below is
        // unreachable — `Day` is the effective fallback. Confirm intended.
        let is_days = self.start.day() != 0 || self.end.day() != 0;
        if is_seconds {
            Granularity::Second
        } else if is_minutes {
            Granularity::Minute
        } else if is_hours {
            Granularity::Hour
        } else if is_days {
            Granularity::Day
        } else {
            unimplemented!("Weeks and years not supported");
        }
    }
    /// Estimated number of slots a scan over this range would touch,
    /// expressed in units of the range's lowest granularity.
    pub fn scan_estimation(&self) -> i64 {
        let dur = self.duration();
        match self.lowest_granularity() {
            Granularity::Second => dur.whole_seconds(),
            Granularity::Minute => dur.whole_minutes(),
            Granularity::Hour => dur.whole_hours(),
            Granularity::Day => dur.whole_days(),
        }
    }
    /// Length of the range at whole-second precision.
    #[inline]
    pub fn duration(&self) -> Duration {
        Duration::seconds((self.end - self.start).whole_seconds())
    }
}
/// Time unit a wheel aggregation operates at; one variant per queryable wheel.
#[derive(Debug, Copy, PartialEq, Eq, Clone)]
#[repr(usize)]
pub(crate) enum Granularity {
    Second,
    Minute,
    Hour,
    Day,
}
/// Default slot-count threshold above which a SIMD-capable single-wheel scan
/// is preferred over a combined plan (see [`Optimizer`]).
pub const DEFAULT_SIMD_THRESHOLD: usize = 15000;
/// Tunable thresholds consulted by the query planner.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Debug, Clone, Copy)]
pub struct Heuristics {
    // Minimum scan cost at which SIMD execution pays off.
    simd_threshold: usize,
}
impl Default for Heuristics {
    fn default() -> Self {
        Self {
            simd_threshold: DEFAULT_SIMD_THRESHOLD,
        }
    }
}
/// Query-planner settings for a [`Haw`].
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(Default, Debug, Clone, Copy)]
pub struct Optimizer {
    // When true, the planner may skip considering a combined plan if the
    // single-wheel cost is below the SIMD threshold.
    use_hints: bool,
    heuristics: Heuristics,
}
impl Optimizer {
    /// Enables or disables planner hints.
    pub fn use_hints(&mut self, use_hints: bool) {
        self.use_hints = use_hints;
    }
}
/// Hierarchical Aggregate Wheel: a hierarchy of event-time wheels
/// (seconds → minutes → hours → days → weeks → years) that maintains
/// pre-aggregated slots and answers time-range queries.
#[repr(C)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
pub struct Haw<A>
where
    A: Aggregator,
{
    // Current low watermark in unix milliseconds; advanced by `tick`.
    watermark: u64,
    // Lazily-initialized wheels, one per granularity.
    seconds_wheel: MaybeWheel<A>,
    minutes_wheel: MaybeWheel<A>,
    hours_wheel: MaybeWheel<A>,
    days_wheel: MaybeWheel<A>,
    weeks_wheel: MaybeWheel<A>,
    years_wheel: MaybeWheel<A>,
    // Present only after `window()` has installed a window definition.
    window_manager: Option<WindowManager<A>>,
    conf: HawConf,
    // Recorded per-tick deltas (populated only when `conf.generate_deltas`).
    delta: DeltaState<A::PartialAggregate>,
    #[cfg(feature = "timer")]
    #[cfg_attr(feature = "serde", serde(skip))]
    timer: TimerWheel<A>,
    #[cfg(feature = "profiler")]
    stats: Stats,
}
impl<A: Aggregator> Default for Haw<A> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<A> Haw<A>
where
A: Aggregator,
{
// Seconds contained in one rotation of each coarser wheel.
const MINUTES_AS_SECS: u64 = time::Duration::MINUTE.whole_seconds() as u64;
const HOURS_AS_SECS: u64 = time::Duration::HOUR.whole_seconds() as u64;
const DAYS_AS_SECS: u64 = time::Duration::DAY.whole_seconds() as u64;
const WEEK_AS_SECS: u64 = time::Duration::WEEK.whole_seconds() as u64;
// A "year" is 52 whole weeks (364 days), matching `YEAR_TICK_MS`.
const YEAR_AS_SECS: u64 = Self::WEEK_AS_SECS * WEEKS.get() as u64;
const CYCLE_LENGTH_SECS: u64 = Self::CYCLE_LENGTH.whole_seconds() as u64;
const SECOND_AS_MS: u64 = time::Duration::SECOND.whole_milliseconds() as u64;
// Total wall-clock span covered by all wheels combined.
const TOTAL_SECS_IN_WHEEL: u64 = Self::YEAR_AS_SECS * YEARS.get() as u64;
/// Duration of one full cycle through the hierarchy (YEARS + 1 "years").
pub const CYCLE_LENGTH: time::Duration =
    time::Duration::seconds((Self::YEAR_AS_SECS * (YEARS.get() as u64 + 1)) as i64);
/// Total number of slots across all six wheels.
pub const TOTAL_WHEEL_SLOTS: usize =
    SECONDS.get() + MINUTES.get() + HOURS.get() + DAYS.get() + WEEKS.get() + YEARS.get();
/// Creates a new `Haw` from the given configuration; all wheels start
/// uninitialized and are created lazily on first rotation.
pub fn new(conf: HawConf) -> Self {
    Self {
        watermark: conf.watermark,
        seconds_wheel: MaybeWheel::new(conf.seconds),
        minutes_wheel: MaybeWheel::new(conf.minutes),
        hours_wheel: MaybeWheel::new(conf.hours),
        days_wheel: MaybeWheel::new(conf.days),
        weeks_wheel: MaybeWheel::new(conf.weeks),
        years_wheel: MaybeWheel::new(conf.years),
        conf,
        delta: DeltaState::new(conf.watermark, Vec::new()),
        window_manager: None,
        #[cfg(feature = "timer")]
        timer: TimerWheel::new(RawTimerWheel::default()),
        #[cfg(feature = "profiler")]
        stats: Stats::default(),
    }
}
#[doc(hidden)]
pub fn set_optimizer_hints(&mut self, hints: bool) {
    // Runtime toggle for the planner's hint usage (see `Optimizer`).
    self.conf.optimizer.use_hints = hints;
}
/// Returns a clone of the recorded delta state
/// (populated only when `HawConf::generate_deltas` is set).
pub fn delta_state(&self) -> DeltaState<A::PartialAggregate> {
    self.delta.clone()
}
/// Total number of occupied slots across every wheel.
pub fn len(&self) -> usize {
    [
        &self.seconds_wheel,
        &self.minutes_wheel,
        &self.hours_wheel,
        &self.days_wheel,
        &self.weeks_wheel,
        &self.years_wheel,
    ]
    .iter()
    .map(|wheel| wheel.len())
    .sum()
}
/// Returns `true` if no wheel holds any slot.
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
/// Returns `true` once every slot in the hierarchy is occupied.
pub fn is_full(&self) -> bool {
    self.len() == Self::TOTAL_WHEEL_SLOTS
}
/// Current watermark as a date-time.
#[inline]
fn now(&self) -> OffsetDateTime {
    Self::to_offset_date(self.watermark)
}
/// Total in-memory footprint of all wheels, in bytes.
pub fn size_bytes(&self) -> usize {
    [
        &self.seconds_wheel,
        &self.minutes_wheel,
        &self.hours_wheel,
        &self.days_wheel,
        &self.weeks_wheel,
        &self.years_wheel,
    ]
    .iter()
    .map(|wheel| wheel.size_bytes())
    .sum()
}
/// Number of one-second ticks left before the hierarchy has cycled through
/// its full span.
pub fn remaining_ticks(&self) -> u64 {
    Self::TOTAL_SECS_IN_WHEEL - self.current_time_in_cycle().whole_seconds() as u64
}
/// Time elapsed within the current hierarchy cycle, derived from each
/// wheel's rotation count weighted by the seconds one rotation represents.
#[inline]
pub fn current_time_in_cycle(&self) -> Duration {
    let rotations: [(u64, u64); 6] = [
        (self.seconds_wheel.rotation_count() as u64, 1),
        (self.minutes_wheel.rotation_count() as u64, Self::MINUTES_AS_SECS),
        (self.hours_wheel.rotation_count() as u64, Self::HOURS_AS_SECS),
        (self.days_wheel.rotation_count() as u64, Self::DAYS_AS_SECS),
        (self.weeks_wheel.rotation_count() as u64, Self::WEEK_AS_SECS),
        (self.years_wheel.rotation_count() as u64, Self::YEAR_AS_SECS),
    ];
    let cycle_time: u64 = rotations.iter().map(|(count, secs)| count * secs).sum();
    Duration::seconds(cycle_time as i64)
}
/// Converts unix seconds to milliseconds, saturating on overflow.
#[inline]
const fn to_ms(ts: u64) -> u64 {
    ts.saturating_mul(1000)
}
/// Converts a unix-millisecond timestamp to a date-time at whole-second
/// precision. Panics if the timestamp is outside `OffsetDateTime`'s range.
#[inline]
fn to_offset_date(ts: u64) -> OffsetDateTime {
    OffsetDateTime::from_unix_timestamp(ts as i64 / 1000).unwrap()
}
/// Converts the seconds, minutes, hours, and days wheels to the prefix-sum
/// layout (the granularities that range queries touch).
///
/// # Panics
/// Panics if the aggregator is not invertible, since prefix sums require
/// `combine_inverse`.
pub fn to_prefix_wheels(&mut self) {
    assert!(
        A::invertible(),
        "Aggregator is not invertible, must implement combine_inverse."
    );
    // Convert each initialized wheel independently. The previous nested
    // if-let structure silently skipped lower wheels whenever a higher one
    // was uninitialized (e.g. minutes was never converted if seconds was
    // absent), even though the wheels are independent here.
    if let Some(seconds) = self.seconds_wheel.as_mut() {
        seconds.to_prefix();
    }
    if let Some(minutes) = self.minutes_wheel.as_mut() {
        minutes.to_prefix();
    }
    if let Some(hours) = self.hours_wheel.as_mut() {
        hours.to_prefix();
    }
    if let Some(days) = self.days_wheel.as_mut() {
        days.to_prefix();
    }
}
/// Converts the seconds, minutes, hours, and days wheels to a
/// SIMD-friendly layout.
pub fn to_simd_wheels(&mut self) {
    // Convert each initialized wheel independently. The previous nested
    // if-let structure skipped lower wheels whenever a higher one was
    // uninitialized, even though the conversions are independent.
    if let Some(seconds) = self.seconds_wheel.as_mut() {
        seconds.to_simd();
    }
    if let Some(minutes) = self.minutes_wheel.as_mut() {
        minutes.to_simd();
    }
    if let Some(hours) = self.hours_wheel.as_mut() {
        hours.to_simd();
    }
    if let Some(days) = self.days_wheel.as_mut() {
        days.to_simd();
    }
}
/// Converts the queryable wheels (seconds..days) to the deque layout.
///
/// # Panics
/// Panics if any of those wheels has not been initialized yet; the
/// `expect` messages identify which wheel was missing (the previous bare
/// `unwrap`s gave no context).
#[doc(hidden)]
pub fn convert_all_to_array(&mut self) {
    self.seconds_wheel
        .as_mut()
        .expect("seconds wheel not initialized")
        .to_deque();
    self.minutes_wheel
        .as_mut()
        .expect("minutes wheel not initialized")
        .to_deque();
    self.hours_wheel
        .as_mut()
        .expect("hours wheel not initialized")
        .to_deque();
    self.days_wheel
        .as_mut()
        .expect("days wheel not initialized")
        .to_deque();
}
/// Installs a window definition; subsequent ticks will emit
/// [`WindowAggregate`]s according to it. Replaces any existing window.
pub fn window(&mut self, window: Window) {
    self.window_manager = Some(WindowManager::new(self.watermark, window));
}
/// Advances the wheel up to the given watermark (unix ms), draining slices
/// from `waw`. A watermark at or behind the current one is a no-op.
#[inline]
pub(crate) fn advance_to(
    &mut self,
    watermark: u64,
    waw: &mut WriterWheel<A>,
) -> Vec<WindowAggregate<A::PartialAggregate>> {
    let diff = watermark.saturating_sub(self.watermark());
    self.advance(Duration::milliseconds(diff as i64), waw)
}
/// Advances the wheel by applying one pre-computed delta per second tick,
/// returning any window aggregates produced along the way.
#[inline]
pub fn delta_advance(
    &mut self,
    deltas: impl IntoIterator<Item = Option<A::PartialAggregate>>,
) -> Vec<WindowAggregate<A::PartialAggregate>> {
    let mut windows = Vec::new();
    for delta in deltas {
        // Tick first, then record the delta if configured, then let the
        // window machinery observe it.
        self.tick(delta.clone());
        if self.conf.generate_deltas {
            self.delta.push(delta.clone());
        }
        self.handle_window_maybe(delta, &mut windows);
    }
    windows
}
/// Advances the wheel by `duration`, one second per tick, pulling each
/// tick's slice from the writer wheel `waw`.
#[inline(always)]
pub fn advance(
    &mut self,
    duration: Duration,
    waw: &mut WriterWheel<A>,
) -> Vec<WindowAggregate<A::PartialAggregate>> {
    let ticks: usize = duration.whole_seconds() as usize;
    let mut windows = Vec::new();
    if ticks <= Self::CYCLE_LENGTH_SECS as usize {
        for _ in 0..ticks {
            // Freeze the writer wheel's head slice into a partial aggregate.
            let delta = waw.tick().map(A::freeze);
            // NOTE(review): the delta is recorded before tick() here, while
            // delta_advance ticks first — confirm the ordering is intentional.
            if self.conf.generate_deltas {
                self.delta.push(delta.clone());
            }
            let delta_for_tick = delta.clone();
            self.tick(delta_for_tick);
            self.handle_window_maybe(delta, &mut windows);
        }
    } else {
        // Skipping further than a full cycle: all slot data is stale.
        // NOTE(review): the watermark is NOT advanced on this path — verify
        // callers expect that.
        self.clear();
    }
    windows
}
/// Routes a tick's delta to the active window handler, if any window has
/// been installed.
fn handle_window_maybe(
    &mut self,
    delta: Option<A::PartialAggregate>,
    windows: &mut Vec<WindowAggregate<A::PartialAggregate>>,
) {
    let Some(window) = self.window_manager.as_ref().map(|wm| wm.window) else {
        return;
    };
    match window {
        Window::Session { .. } => self.handle_session_window(delta, windows),
        // Sliding and tumbling windows share the slicing implementation.
        Window::Sliding { .. } | Window::Tumbling { .. } => {
            self.handle_slicing_window(windows)
        }
    }
}
fn handle_session_window(
&mut self,
delta: Option<A::PartialAggregate>,
windows: &mut Vec<WindowAggregate<A::PartialAggregate>>,
) {
let manager = self.window_manager.as_mut().unwrap();
let (ref mut state, ref mut aggregator) = manager.aggregator.session_as_mut();
if let Some(partial) = delta {
if !state.has_active_session() {
state.activate_session(self.watermark.saturating_sub(1000)); }
aggregator.aggregate_session(partial);
state.reset_inactive_period();
} else {
if state.has_active_session() {
state.bump_inactive_period(Duration::SECOND);
if state.is_inactive() {
let session_aggregate = aggregator.get_and_reset();
let window_start_ms = state.reset().unwrap();
let window_end_ms = self.watermark.saturating_sub(1000);
windows.push(WindowAggregate {
window_start_ms,
window_end_ms,
aggregate: session_aggregate,
});
}
}
}
}
/// Sliding/tumbling window handling based on pair slicing: each completed
/// pair is aggregated from the wheels and pushed into the slicing
/// aggregator; when the watermark reaches a window boundary the window's
/// aggregate is emitted and its pairs evicted.
fn handle_slicing_window(&mut self, windows: &mut Vec<WindowAggregate<A::PartialAggregate>>) {
    // Count down the current pair; the closure's mutation persists because
    // slicing_as_mut hands back mutable state.
    if let Some((pairs_remaining, current_pair_len)) = self.window_manager.as_mut().map(|wm| {
        let (ref mut state, _) = wm.aggregator.slicing_as_mut();
        state.pair_ticks_remaining -= 1;
        (state.pair_ticks_remaining, state.current_pair_len)
    }) {
        if pairs_remaining == 0 {
            // Pair complete: aggregate [watermark - pair_len, watermark).
            // NOTE(review): this subtraction underflows if a pair is longer
            // than the watermark itself — presumably prevented by window
            // setup; confirm.
            let from = Self::to_offset_date(self.watermark - current_pair_len as u64);
            let to = Self::to_offset_date(self.watermark);
            let pair = self.combine_range(WheelRange {
                start: from,
                end: to,
            });
            let manager = self.window_manager.as_mut().unwrap();
            let (ref mut state, ref mut aggregator) = manager.aggregator.slicing_as_mut();
            // An empty pair contributes the aggregator's identity value.
            aggregator.push(pair.unwrap_or_else(|| A::IDENTITY.clone()));
            state.update_state(self.watermark);
            if self.watermark == state.next_window_end {
                // Window boundary reached: emit and slide forward.
                let window_start_ms = self.watermark.saturating_sub(state.range as u64);
                let window_end_ms = self.watermark;
                windows.push(WindowAggregate {
                    window_start_ms,
                    window_end_ms,
                    aggregate: aggregator.query(),
                });
                // Evict all pairs belonging to the emitted window.
                for _i in 0..state.total_pairs() {
                    aggregator.pop();
                }
                state.next_window_end += state.slide as u64;
            }
        }
    }
}
/// Clears every wheel's slot data. Does not reset the watermark.
pub fn clear(&mut self) {
    let wheels = [
        &mut self.seconds_wheel,
        &mut self.minutes_wheel,
        &mut self.hours_wheel,
        &mut self.days_wheel,
        &mut self.weeks_wheel,
        &mut self.years_wheel,
    ];
    for wheel in wheels {
        wheel.clear();
    }
}
/// Current low watermark in unix milliseconds.
#[inline]
pub fn watermark(&self) -> u64 {
    self.watermark
}
/// Splits `range` into consecutive buckets of length `interval` and returns
/// each bucket's lowered aggregate keyed by its start (unix ms).
///
/// Returns `None` for empty/inverted ranges, non-positive intervals, or a
/// range starting beyond the watermark.
#[inline]
pub fn group_by(
    &self,
    range: WheelRange,
    interval: Duration,
) -> Option<Vec<(u64, A::Aggregate)>> {
    let WheelRange { start, end } = range;
    // `<= 0` (previously `== 0`) also rejects negative intervals, which
    // would step `next` backwards and never terminate the loop below.
    if start >= end
        || interval.whole_seconds() <= 0
        || Self::to_ms(start.unix_timestamp() as u64) > self.watermark
    {
        return None;
    }
    let mut result = Vec::new();
    let mut step = start;
    while step < end {
        // NOTE: the final bucket may extend past `end`; its query range is
        // not clamped, matching the established behavior.
        let next: OffsetDateTime = step + time::Duration::seconds(interval.whole_seconds());
        let query_range = WheelRange {
            start: step,
            end: next,
        };
        result.push((
            Self::to_ms(step.unix_timestamp() as u64),
            // Missing data lowers the aggregator's identity value.
            A::lower(
                self.combine_range(query_range)
                    .unwrap_or_else(|| A::IDENTITY.clone()),
            ),
        ));
        step = next;
    }
    Some(result)
}
#[inline]
pub fn range(&self, range: impl Into<WheelRange>) -> Option<Vec<(u64, A::PartialAggregate)>> {
let range = range.into();
let start = range.start;
let end = range.end;
match range.lowest_granularity() {
Granularity::Second => {
let seconds = (end - start).whole_seconds() as usize;
self.seconds_wheel
.range(start, seconds, Granularity::Second)
}
Granularity::Minute => {
let minutes = (end - start).whole_minutes() as usize;
self.minutes_wheel
.range(start, minutes, Granularity::Minute)
}
Granularity::Hour => {
let hours = (end - start).whole_hours() as usize;
self.hours_wheel.range(start, hours, Granularity::Hour)
}
Granularity::Day => {
let days = (end - start).whole_days() as usize;
self.days_wheel.range(start, days, Granularity::Day)
}
}
}
/// Like [`Haw::range`] but lowers each partial aggregate to its final form.
#[inline]
pub fn range_and_lower(
    &self,
    range: impl Into<WheelRange>,
) -> Option<Vec<(u64, A::Aggregate)>> {
    let partials = self.range(range)?;
    let mut lowered = Vec::with_capacity(partials.len());
    for (ts, partial) in partials {
        lowered.push((ts, A::lower(partial)));
    }
    Some(lowered)
}
/// Returns the execution plan that `combine_range` would use, for
/// inspection/debugging, without running it.
#[inline]
pub fn explain_combine_range(&self, range: impl Into<WheelRange>) -> Option<ExecutionPlan> {
    self.create_exec_plan(range.into())
}
/// Combines the range and lowers the result to the final aggregate type.
#[inline]
pub fn combine_range_and_lower(&self, range: impl Into<WheelRange>) -> Option<A::Aggregate> {
    self.combine_range(range).map(A::lower)
}
/// Combines all slots within `range` into a single partial aggregate.
#[inline]
pub fn combine_range(&self, range: impl Into<WheelRange>) -> Option<A::PartialAggregate> {
    self.combine_range_inner(range).0
}
/// Like [`Haw::combine_range`] but also returns the plan's cost
/// (number of combine operations).
#[inline]
pub fn analyze_combine_range(
    &self,
    range: impl Into<WheelRange>,
) -> (Option<A::PartialAggregate>, usize) {
    self.combine_range_inner(range)
}
/// Plans and executes a range combine, returning the aggregate (if any
/// data exists) together with the executed plan's cost.
#[inline]
fn combine_range_inner(
    &self,
    range: impl Into<WheelRange>,
) -> (Option<A::PartialAggregate>, usize) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.combine_range);
    let range = range.into();
    let WheelRange { start, end } = range;
    // Inverted ranges are rejected outright.
    if start > end {
        return (None, 0);
    }
    // Dispatch on the plan chosen by the optimizer.
    match self.create_exec_plan(range) {
        Some(ExecutionPlan::WheelAggregation(wheel_agg)) => {
            (self.wheel_aggregation(wheel_agg), wheel_agg.cost())
        }
        Some(ExecutionPlan::CombinedAggregation(combined)) => {
            self.combined_aggregation(combined)
        }
        Some(ExecutionPlan::LandmarkAggregation) => self.analyze_landmark(),
        // Landmark total minus the parts outside the requested range.
        Some(ExecutionPlan::InverseLandmarkAggregation(wheel_aggs)) => {
            let (result, cost) = self.inverse_landmark_aggregation(wheel_aggs);
            (Some(result), cost)
        }
        None => (None, 0),
    }
}
/// Chooses the cheapest execution plan for `range`.
///
/// Strategy: clamp the range to the data the hierarchy still retains,
/// short-circuit to a landmark plan when the range covers everything,
/// otherwise weigh a single-wheel scan against a combined multi-wheel plan.
#[inline]
fn create_exec_plan(&self, mut range: WheelRange) -> Option<ExecutionPlan> {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.exec_plan);
    let mut best_plan: Option<ExecutionPlan> = None;
    // Earliest instant still covered by the wheels.
    let wheel_start = self
        .watermark()
        .saturating_sub(self.current_time_in_cycle().whole_milliseconds() as u64);
    range.start = cmp::max(range.start, Self::to_offset_date(wheel_start));
    let end_ms = Self::to_ms(range.end.unix_timestamp() as u64);
    let start_ms = Self::to_ms(range.start.unix_timestamp() as u64);
    // Range spans all retained data: the landmark total answers it exactly.
    if start_ms <= wheel_start && end_ms >= self.watermark() {
        return Some(ExecutionPlan::LandmarkAggregation);
    }
    if let Some(plan) = self.wheel_aggregation_plan(range) {
        // Prefix layouts answer in O(1); scans under one minute of slots
        // are cheap enough to take immediately.
        if plan.is_prefix() || plan.cost() < SECONDS.get() {
            return Some(ExecutionPlan::WheelAggregation(plan));
        }
        best_plan = Some(ExecutionPlan::WheelAggregation(plan));
    }
    // With SIMD hints enabled, only bother with a combined plan when the
    // single-wheel scan exceeds the SIMD threshold.
    let use_combined_aggregation = {
        let single_wheel_cost = best_plan.as_ref().map(|p| p.cost()).unwrap_or(0);
        if self.conf.optimizer.use_hints && A::simd_support() {
            single_wheel_cost > self.conf.optimizer.heuristics.simd_threshold
        } else {
            true
        }
    };
    if use_combined_aggregation {
        Self::maybe_update_plan_or_insert(
            self.combined_aggregation_plan(Self::split_wheel_ranges(range))
                .map(ExecutionPlan::CombinedAggregation),
            &mut best_plan,
        );
    }
    best_plan
}
/// Installs `plan` into `current` if it beats (or fills) the current best;
/// when both exist the cheaper plan (per `Ord`) wins.
#[inline]
fn maybe_update_plan_or_insert(
    plan: Option<ExecutionPlan>,
    current: &mut Option<ExecutionPlan>,
) {
    if let Some(candidate) = plan {
        *current = Some(match current.take() {
            Some(existing) => cmp::min(existing, candidate),
            None => candidate,
        });
    }
}
/// Builds a single-wheel aggregation plan for `range` at its lowest
/// granularity, if that wheel can serve it.
#[inline]
fn wheel_aggregation_plan(&self, range: WheelRange) -> Option<WheelAggregation> {
    let span = range.end - range.start;
    // Pick the wheel and slot count matching the range's granularity.
    let (wheel, slots, granularity) = match range.lowest_granularity() {
        Granularity::Second => (&self.seconds_wheel, span.whole_seconds(), Granularity::Second),
        Granularity::Minute => (&self.minutes_wheel, span.whole_minutes(), Granularity::Minute),
        Granularity::Hour => (&self.hours_wheel, span.whole_hours(), Granularity::Hour),
        Granularity::Day => (&self.days_wheel, span.whole_days(), Granularity::Day),
    };
    wheel.plan(range.start, slots as usize, range, granularity)
}
/// Splits `range` into sub-ranges, each aligned so that a single wheel
/// granularity can serve it (see `new_curr_end` for the alignment rules).
#[inline]
#[doc(hidden)]
pub fn split_wheel_ranges(range: WheelRange) -> WheelRanges {
    let mut ranges = WheelRanges::default();
    let mut cursor = range.start;
    while cursor < range.end {
        let boundary = Self::new_curr_end(cursor, range.end);
        ranges.push(WheelRange {
            start: cursor,
            end: boundary,
        });
        cursor = boundary;
    }
    ranges
}
/// Computes the end of the next aligned sub-range starting at
/// `current_start`: advance to the next minute/hour/day/month boundary
/// (whichever is the finest non-zero component), clamped so the result
/// never passes `end`.
#[inline]
fn new_curr_end(current_start: OffsetDateTime, end: OffsetDateTime) -> OffsetDateTime {
    let second = current_start.second();
    let minute = current_start.minute();
    let hour = current_start.hour();
    let day = current_start.day();
    // Distance to the next boundary of the coarser unit.
    let next = if second > 0 {
        time::Duration::seconds(60 - second as i64)
    } else if minute > 0 {
        time::Duration::minutes(60 - minute as i64)
    } else if hour > 0 {
        time::Duration::hours(24 - hour as i64)
    } else if day > 0 {
        // Day alignment targets the first of the next month.
        let year = current_start.year();
        let month = current_start.month() as u8;
        let days_in_current_month = Self::days_in_month(year, month);
        let remaining_days_in_month = days_in_current_month as i64 - day as i64;
        let days_to_add = if remaining_days_in_month > 0 {
            remaining_days_in_month
        } else {
            // Already on the last day: step one day into the next month.
            1
        };
        time::Duration::days(days_to_add)
    } else {
        unimplemented!("Weeks and Years not supported yet");
    };
    let next_aligned = current_start + next;
    if next_aligned > end {
        // Boundary overshoots the query end: instead take the coarsest
        // whole unit of the remaining span that is non-zero.
        let rem = end - current_start;
        let rem_secs = rem.whole_seconds();
        let rem_mins = rem.whole_minutes();
        let rem_hours = rem.whole_hours();
        let rem_days = rem.whole_days();
        let remains = [
            (rem_secs, time::Duration::seconds(rem_secs)),
            (rem_mins, time::Duration::minutes(rem_mins)),
            (rem_hours, time::Duration::hours(rem_hours)),
            (rem_days, time::Duration::days(rem_days)),
        ];
        // rposition picks the coarsest non-empty unit; fall back to days.
        let idx = remains
            .iter()
            .rposition(|&(rem, _)| rem > 0)
            .unwrap_or(remains.len() - 1);
        current_start + remains[idx].1
    } else {
        next_aligned
    }
}
/// Number of days in the given month of the given year, with Gregorian
/// leap-year handling for February.
#[inline]
fn days_in_month(year: i32, month: u8) -> u8 {
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        // February — and, matching the original fallback, any out-of-range
        // month value.
        _ => {
            let leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
            if leap { 29 } else { 28 }
        }
    }
}
/// Builds a combined plan from per-range single-wheel plans; fails if any
/// sub-range cannot be served. Plans are ordered finest-granularity first
/// (seconds = 0 ... days = 3, sorted ascending).
#[doc(hidden)]
#[inline]
pub fn combined_aggregation_plan(&self, ranges: WheelRanges) -> Option<CombinedAggregation> {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.combined_aggregation_plan);
    let mut aggregations = WheelAggregations::default();
    for range in ranges.into_iter() {
        match self.wheel_aggregation_plan(range) {
            Some(plan) => aggregations.push(plan),
            // Any unservable sub-range invalidates the whole combined plan.
            None => return None,
        }
    }
    // Score a range by the coarsest unit that represents it exactly.
    let granularity_score = |start: &OffsetDateTime, end: &OffsetDateTime| {
        let (sh, sm, ss) = start.time().as_hms();
        let (eh, em, es) = end.time().as_hms();
        if ss > 0 || es > 0 {
            0
        } else if sm > 0 || em > 0 {
            1
        } else if sh > 0 || eh > 0 {
            2
        } else {
            3
        }
    };
    aggregations.sort_unstable_by(|a, b| {
        let a_score = granularity_score(&a.range.start, &a.range.end);
        let b_score = granularity_score(&b.range.start, &b.range.end);
        a_score.cmp(&b_score)
    });
    Some(CombinedAggregation::from(aggregations))
}
/// Computes a range aggregate as the landmark total minus the given
/// excluded sub-ranges, using the aggregator's inverse combine.
///
/// # Panics
/// Panics if the aggregator does not provide `combine_inverse`; the planner
/// only emits this plan for invertible aggregators.
fn inverse_landmark_aggregation(
    &self,
    wheel_aggregations: WheelAggregations,
) -> (A::PartialAggregate, usize) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.inverse_landmark);
    let (landmark, lcost) = self.analyze_landmark();
    let combine_inverse = A::combine_inverse().unwrap();
    // Subtract each excluded sub-range from the landmark total, summing
    // the per-plan costs on top of the landmark cost.
    wheel_aggregations.into_iter().fold(
        (landmark.unwrap_or_else(|| A::IDENTITY.clone()), lcost),
        |mut acc, plan| {
            let cost = plan.cost();
            let agg = self
                .wheel_aggregation(plan)
                .unwrap_or_else(|| A::IDENTITY.clone());
            acc.0 = combine_inverse(acc.0, agg);
            acc.1 += cost;
            acc
        },
    )
}
/// Executes a combined plan by running each wheel aggregation and folding
/// the partial results together; sub-ranges without data are skipped.
#[inline]
fn combined_aggregation(
    &self,
    combined: CombinedAggregation,
) -> (Option<A::PartialAggregate>, usize) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.combined_aggregation);
    let cost = combined.cost();
    let mut result: Option<A::PartialAggregate> = None;
    for wheel_agg in combined.aggregations {
        if let Some(partial) = self.wheel_aggregation(wheel_agg) {
            combine_or_insert::<A>(&mut result, partial);
        }
    }
    (result, cost)
}
/// Executes a single-wheel plan: combines the plan's slot range on the
/// wheel matching its granularity.
fn wheel_aggregation(&self, agg: WheelAggregation) -> Option<A::PartialAggregate> {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.wheel_aggregation);
    let (start, end) = agg.slots;
    let wheel = match agg.granularity {
        Granularity::Second => &self.seconds_wheel,
        Granularity::Minute => &self.minutes_wheel,
        Granularity::Hour => &self.hours_wheel,
        Granularity::Day => &self.days_wheel,
    };
    wheel.combine_range(start..end)
}
/// Combines the last `dur` of data ending at the current watermark.
pub fn interval(&self, dur: Duration) -> Option<A::PartialAggregate> {
    self.interval_with_stats(dur).0
}
/// Like [`Haw::interval`] but lowers the result to the final aggregate.
pub fn interval_and_lower(&self, dur: Duration) -> Option<A::Aggregate> {
    // Pass `A::lower` directly (was a redundant closure), matching
    // `combine_range_and_lower`.
    self.interval(dur).map(A::lower)
}
/// Like [`Haw::interval`] but also returns the executed plan's cost.
#[inline]
pub fn interval_with_stats(&self, dur: Duration) -> (Option<A::PartialAggregate>, usize) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.interval);
    // Query the window [watermark - dur, watermark) at second precision.
    let to = self.now();
    let from = to.saturating_sub(time::Duration::seconds(dur.whole_seconds()));
    self.analyze_combine_range(WheelRange {
        start: from,
        end: to,
    })
}
/// Combines every wheel's running total into the all-time aggregate.
#[inline]
pub fn landmark(&self) -> Option<A::PartialAggregate> {
    self.analyze_landmark().0
}
/// Like [`Haw::landmark`] but lowers the result to the final aggregate.
#[inline]
pub fn landmark_and_lower(&self) -> Option<A::Aggregate> {
    // Pass `A::lower` directly (was a redundant closure), matching
    // `combine_range_and_lower`.
    self.landmark().map(A::lower)
}
/// Landmark aggregate together with the number of combine operations used.
#[inline]
pub(crate) fn analyze_landmark(&self) -> (Option<A::PartialAggregate>, usize) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.landmark);
    let wheels = [
        self.seconds_wheel.total(),
        self.minutes_wheel.total(),
        self.hours_wheel.total(),
        self.days_wheel.total(),
        self.weeks_wheel.total(),
        self.years_wheel.total(),
    ];
    Self::reduce(wheels)
}
#[inline]
fn reduce(
partial_aggs: impl IntoIterator<Item = Option<A::PartialAggregate>>,
) -> (Option<A::PartialAggregate>, usize) {
let mut combines = 0;
let mut accumulator: Option<A::PartialAggregate> = None;
for value in partial_aggs.into_iter().flatten() {
match accumulator.take() {
Some(current) => {
combines += 1;
accumulator = Some(A::combine(current, value));
}
None => {
accumulator = Some(value);
}
}
}
(accumulator, combines)
}
/// Schedules `f` to run once when the watermark reaches `time` (unix ms).
///
/// # Errors
/// Returns a [`TimerError`] if the timer wheel rejects the schedule
/// (e.g. the time is already in the past).
#[cfg(feature = "timer")]
pub fn schedule_once(
    &self,
    time: u64,
    f: impl Fn(&Haw<A>) + 'static,
) -> Result<(), TimerError<TimerAction<A>>> {
    self.timer
        .write()
        .schedule_at(time, TimerAction::Oneshot(Box::new(f)))
}
/// Schedules `f` to first run at `at` (unix ms) and then repeat every
/// `interval`; re-scheduling happens inside `tick`.
#[cfg(feature = "timer")]
pub fn schedule_repeat(
    &self,
    at: u64,
    interval: Duration,
    f: impl Fn(&Haw<A>) + 'static,
) -> Result<(), TimerError<TimerAction<A>>> {
    self.timer
        .write()
        .schedule_at(at, TimerAction::Repeat((at, interval, Box::new(f))))
}
/// Advances the wheel by one second: inserts the tick's partial aggregate
/// into the seconds wheel and cascades any full rotation upwards
/// (seconds → minutes → hours → days → weeks → years), then fires due
/// timers.
#[inline]
fn tick(&mut self, partial_opt: Option<A::PartialAggregate>) {
    #[cfg(feature = "profiler")]
    profile_scope!(&self.stats.tick);
    self.watermark += Self::SECOND_AS_MS;
    // An empty tick contributes the aggregator's identity value.
    let partial = partial_opt.unwrap_or_else(|| A::IDENTITY.clone());
    let seconds = self.seconds_wheel.get_or_insert();
    seconds.insert_head(partial);
    // Each tick() returns Some(rotation data) when that wheel completes a
    // full rotation; the data becomes one slot of the next-coarser wheel.
    if let Some(rot_data) = seconds.tick() {
        let minutes = self.minutes_wheel.get_or_insert();
        minutes.insert_slot(rot_data);
        if let Some(rot_data) = minutes.tick() {
            let hours = self.hours_wheel.get_or_insert();
            hours.insert_slot(rot_data);
            if let Some(rot_data) = hours.tick() {
                let days = self.days_wheel.get_or_insert();
                days.insert_slot(rot_data);
                if let Some(rot_data) = days.tick() {
                    let weeks = self.weeks_wheel.get_or_insert();
                    weeks.insert_slot(rot_data);
                    if let Some(rot_data) = weeks.tick() {
                        let years = self.years_wheel.get_or_insert();
                        years.insert_slot(rot_data);
                        // Top of the hierarchy: rotation data is discarded.
                        let _ = years.tick();
                    }
                }
            }
        }
    }
    #[cfg(feature = "timer")]
    {
        let mut timer = self.timer.write();
        for action in timer.advance_to(self.watermark) {
            match action {
                TimerAction::Oneshot(udf) => {
                    udf(self);
                }
                // Repeating timers re-schedule themselves one interval out;
                // a failed re-schedule is silently dropped.
                TimerAction::Repeat((at, interval, udf)) => {
                    udf(self);
                    let new_at = at + interval.whole_milliseconds() as u64;
                    let _ =
                        timer.schedule_at(new_at, TimerAction::Repeat((new_at, interval, udf)));
                }
            }
        }
    }
}
/// Reference to the seconds wheel, if initialized.
pub fn seconds(&self) -> Option<&Wheel<A>> {
    self.seconds_wheel.as_ref()
}
/// Seconds wheel; panics if it has not been initialized.
pub fn seconds_unchecked(&self) -> &Wheel<A> {
    self.seconds_wheel.as_ref().unwrap()
}
/// Reference to the minutes wheel, if initialized.
pub fn minutes(&self) -> Option<&Wheel<A>> {
    self.minutes_wheel.as_ref()
}
/// Minutes wheel; panics if it has not been initialized.
pub fn minutes_unchecked(&self) -> &Wheel<A> {
    self.minutes_wheel.as_ref().unwrap()
}
/// Reference to the hours wheel, if initialized.
pub fn hours(&self) -> Option<&Wheel<A>> {
    self.hours_wheel.as_ref()
}
/// Hours wheel; panics if it has not been initialized.
pub fn hours_unchecked(&self) -> &Wheel<A> {
    self.hours_wheel.as_ref().unwrap()
}
/// Reference to the days wheel, if initialized.
pub fn days(&self) -> Option<&Wheel<A>> {
    self.days_wheel.as_ref()
}
/// Days wheel; panics if it has not been initialized.
pub fn days_unchecked(&self) -> &Wheel<A> {
    self.days_wheel.as_ref().unwrap()
}
/// Reference to the weeks wheel, if initialized.
pub fn weeks(&self) -> Option<&Wheel<A>> {
    self.weeks_wheel.as_ref()
}
/// Weeks wheel; panics if it has not been initialized.
pub fn weeks_unchecked(&self) -> &Wheel<A> {
    self.weeks_wheel.as_ref().unwrap()
}
/// Reference to the years wheel, if initialized.
pub fn years(&self) -> Option<&Wheel<A>> {
    self.years_wheel.as_ref()
}
/// Years wheel; panics if it has not been initialized.
pub fn years_unchecked(&self) -> &Wheel<A> {
    self.years_wheel.as_ref().unwrap()
}
/// Merges `other` into `self`, first advancing whichever wheel lags so
/// both share the same watermark, then merging per-granularity wheels.
///
/// NOTE(review): the laggard is advanced with a fresh default
/// `WriterWheel`, so every catch-up tick contributes an empty delta —
/// confirm that is the intended merge semantics.
pub(crate) fn merge(&mut self, other: &mut Self) {
    let other_watermark = other.watermark();
    if self.watermark() > other_watermark {
        other.advance_to(self.watermark(), &mut WriterWheel::default());
    } else {
        self.advance_to(other_watermark, &mut WriterWheel::default());
    }
    self.seconds_wheel.merge(&other.seconds_wheel);
    self.minutes_wheel.merge(&other.minutes_wheel);
    self.hours_wheel.merge(&other.hours_wheel);
    self.days_wheel.merge(&other.days_wheel);
    self.weeks_wheel.merge(&other.weeks_wheel);
    self.years_wheel.merge(&other.years_wheel);
}
/// Profiler statistics collected by this wheel.
#[cfg(feature = "profiler")]
pub fn stats(&self) -> &Stats {
    &self.stats
}
}
#[cfg(test)]
mod tests {
use crate::{
aggregator::sum::{U32SumAggregator, U64SumAggregator},
duration::NumericalDuration,
wheels::read::plan::Aggregation,
};
use proptest::prelude::*;
use time::macros::datetime;
use super::*;
#[test]
fn delta_advance_test() {
    // Intervals are measured backwards from the watermark, so the most
    // recent delta (None => 0) is covered first.
    let mut haw: Haw<U64SumAggregator> = Haw::default();
    let deltas = vec![Some(10), None, Some(50), None];
    haw.delta_advance(deltas);
    assert_eq!(haw.interval(1.seconds()), Some(0));
    assert_eq!(haw.interval(2.seconds()), Some(50));
    assert_eq!(haw.interval(3.seconds()), Some(50));
    assert_eq!(haw.interval(4.seconds()), Some(60));
}
#[test]
fn wheel_range_test() {
    // Valid unix-second timestamps must produce a well-formed range.
    let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
    let end = datetime!(2023 - 11 - 12 00:00:00 UTC);
    assert!(
        WheelRange::new(start.unix_timestamp() as u64, end.unix_timestamp() as u64).is_ok()
    );
}
#[test]
fn out_of_bounds_aggregation() {
    // Querying past the advanced watermark must yield no aggregate.
    let watermark = 1699488000000;
    let conf = HawConf::default().with_watermark(watermark);
    let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
    let days_as_secs = 3600 * 24;
    let days = days_as_secs * 3;
    let deltas: Vec<Option<u64>> = (0..days).map(|_| Some(1)).collect();
    haw.delta_advance(deltas);
    let start = datetime!(2023 - 11 - 09 15:50:50 UTC);
    let end = datetime!(2023 - 11 - 11 12:30:45 UTC);
    assert_eq!(haw.combine_range(WheelRange { start, end }), None);
}
#[test]
fn range_query_sec_test() {
let watermark = 1699488000000;
let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let deltas = vec![Some(10), None, Some(50), None];
haw.delta_advance(deltas);
assert_eq!(haw.interval(1.seconds()), Some(0));
assert_eq!(haw.interval(2.seconds()), Some(50));
assert_eq!(haw.interval(3.seconds()), Some(50));
assert_eq!(haw.interval(4.seconds()), Some(60));
let start = datetime!(2023-11-09 00:00:00 UTC);
let end = datetime!(2023-11-09 00:00:02 UTC);
let agg = haw.combine_range(WheelRange { start, end });
assert_eq!(agg, Some(10));
assert_eq!(
haw.range(WheelRange { start, end }),
Some(vec![(1699488000000, 10), (1699488001000, 0)])
);
let start = datetime!(2023-11-09 00:00:00 UTC);
let end = datetime!(2023-11-09 00:00:04 UTC);
let agg = haw.combine_range(WheelRange { start, end });
assert_eq!(agg, Some(60));
assert_eq!(
haw.range(WheelRange { start, end }),
Some(vec![
(1699488000000, 10),
(1699488001000, 0),
(1699488002000, 50),
(1699488003000, 0),
])
);
}
#[test]
fn range_query_min_test() {
let watermark = 1699488000000;
let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let deltas: Vec<Option<u64>> = (0..180).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
assert_eq!(haw.watermark(), 1699488180000);
let start = datetime!(2023-11-09 00:00 UTC);
let end = datetime!(2023-11-09 00:03 UTC);
let agg = haw.combine_range(WheelRange { start, end });
assert_eq!(agg, Some(180));
}
#[test]
fn range_query_hour_test() {
let watermark = 1699488000000;
let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let hours_as_secs = 3600;
let hours = hours_as_secs * 3;
let deltas: Vec<Option<u64>> = (0..hours).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
let start = datetime!(2023-11-09 00:00 UTC);
let end = datetime!(2023-11-09 01:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(3600));
assert_eq!(
haw.range(WheelRange { start, end }),
Some(vec![(1699488000000, 3600)])
);
let start = datetime!(2023-11-09 00:00 UTC);
let end = datetime!(2023-11-09 03:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(10800));
assert_eq!(
haw.range(WheelRange { start, end }),
Some(vec![
(1699488000000, 3600),
(1699491600000, 3600),
(1699495200000, 3600),
])
);
}
#[test]
fn range_query_day_test() {
let watermark = 1699488000000;
let conf = HawConf::default()
.with_watermark(watermark)
.with_retention_policy(RetentionPolicy::Keep);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let days_as_secs = 3600 * 24;
let days = days_as_secs * 3;
let deltas: Vec<Option<u64>> = (0..days).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 10 00:00:00 UTC);
assert_eq!(
haw.combine_range(WheelRange { start, end }),
Some(3600 * 24)
);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 12 00:00:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(259200));
let start = datetime!(2023 - 11 - 09 15:50:50 UTC);
let end = datetime!(2023 - 11 - 11 12:30:45 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(160795));
let plan = haw
.explain_combine_range(WheelRange { start, end })
.unwrap();
let combined_plan = match plan {
ExecutionPlan::CombinedAggregation(plan) => plan,
_ => panic!("not supposed to happen"),
};
let mut expected = WheelAggregations::default();
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 09 15:50:50 UTC),
end: datetime!(2023 - 11 - 09 15:51:00 UTC),
},
plan: Aggregation::Scan(10),
slots: (202140, 202150),
granularity: Granularity::Second,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 11 12:30:00 UTC),
end: datetime!(2023 - 11 - 11 12:30:45 UTC),
},
plan: Aggregation::Scan(45),
slots: (41355, 41400),
granularity: Granularity::Second,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 09 15:51:00 UTC),
end: datetime!(2023 - 11 - 09 16:00:00 UTC),
},
plan: Aggregation::Scan(9),
slots: (3360, 3369),
granularity: Granularity::Minute,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 11 12:00:00 UTC),
end: datetime!(2023 - 11 - 11 12:30:00 UTC),
},
plan: Aggregation::Scan(30),
slots: (690, 720),
granularity: Granularity::Minute,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 09 16:00:00 UTC),
end: datetime!(2023 - 11 - 10 00:00:00 UTC),
},
plan: Aggregation::Scan(8),
slots: (48, 56),
granularity: Granularity::Hour,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 11 00:00:00 UTC),
end: datetime!(2023 - 11 - 11 12:00:00 UTC),
},
plan: Aggregation::Scan(12),
slots: (12, 24),
granularity: Granularity::Hour,
});
expected.push(WheelAggregation {
range: WheelRange {
start: datetime!(2023 - 11 - 10 00:00:00 UTC),
end: datetime!(2023 - 11 - 11 00:00:00 UTC),
},
plan: Aggregation::Scan(1),
slots: (1, 2),
granularity: Granularity::Day,
});
assert_eq!(combined_plan.aggregations, expected);
let start = datetime!(2023 - 11 - 09 05:00:00 UTC);
let end = datetime!(2023 - 11 - 12 00:00:00 UTC);
let result = haw.combine_range(WheelRange { start, end });
let plan = haw.create_exec_plan(WheelRange { start, end });
assert!(matches!(plan, Some(ExecutionPlan::CombinedAggregation(_))));
assert_eq!(result, Some(241200));
}
#[test]
fn month_cross_test() {
let watermark = 1535673600000;
let conf = HawConf::default()
.with_watermark(watermark)
.with_retention_policy(RetentionPolicy::Keep);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let days_as_secs = 3600 * 24;
let days = days_as_secs * 3;
let deltas: Vec<Option<u64>> = (0..days).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
let start = datetime!(2018 - 08 - 31 00:00:00 UTC);
let end = datetime!(2018 - 08 - 31 00:05:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(300));
let start = datetime!(2018 - 08 - 31 00:00:00 UTC);
let end = datetime!(2018 - 09 - 01 12:00:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(129600));
let start = datetime!(2018 - 08 - 31 00:00:00 UTC);
let end = datetime!(2018 - 09 - 02 00:00:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(172800));
}
#[test]
fn year_cross_test() {
let watermark = 1672444800000;
let conf = HawConf::default()
.with_watermark(watermark)
.with_retention_policy(RetentionPolicy::Keep);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let days_as_secs = 3600 * 24;
let days = days_as_secs * 366;
let deltas: Vec<Option<u64>> = (0..days).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
let start = datetime!(2022 - 12 - 31 00:00:00 UTC);
let end = datetime!(2022 - 12 - 31 23:59:59 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(86399));
let start = datetime!(2022 - 12 - 31 00:00:00 UTC);
let end = datetime!(2023 - 01 - 01 12:00:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(129600));
let start = datetime!(2022 - 12 - 31 00:00:00 UTC);
let end = datetime!(2024 - 01 - 01 00:00:00 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(31622400));
}
#[test]
fn overlapping_ranges_test() {
let watermark = 1699488000000; let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let deltas = vec![Some(10), Some(20), Some(30), Some(40)];
haw.delta_advance(deltas);
let range1 = WheelRange {
start: datetime!(2023-11-09 00:00:00 UTC),
end: datetime!(2023-11-09 00:00:02 UTC),
};
let range2 = WheelRange {
start: datetime!(2023-11-09 00:00:01 UTC),
end: datetime!(2023-11-09 00:00:03 UTC),
};
let range3 = WheelRange {
start: datetime!(2023-11-09 00:00:02 UTC),
end: datetime!(2023-11-09 00:00:04 UTC),
};
assert_eq!(haw.combine_range(range1), Some(30));
assert_eq!(haw.combine_range(range2), Some(50));
assert_eq!(haw.combine_range(range3), Some(70));
}
#[test]
fn sparse_data_test() {
let watermark = 1699488000000; let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let deltas = vec![
Some(10),
None,
None,
None,
None,
Some(20),
None,
None,
None,
None,
Some(30),
];
haw.delta_advance(deltas);
let start = datetime!(2023-11-09 00:00:00 UTC);
let end = datetime!(2023-11-09 00:00:11 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), Some(60));
assert_eq!(
haw.range(WheelRange { start, end }),
Some(vec![
(1699488000000, 10),
(1699488001000, 0),
(1699488002000, 0),
(1699488003000, 0),
(1699488004000, 0),
(1699488005000, 20),
(1699488006000, 0),
(1699488007000, 0),
(1699488008000, 0),
(1699488009000, 0),
(1699488010000, 30),
])
);
}
#[test]
fn empty_haw_test() {
let watermark = 1699488000000; let conf = HawConf::default().with_watermark(watermark);
let haw: Haw<U64SumAggregator> = Haw::new(conf);
assert_eq!(haw.interval(1.seconds()), None);
let start = datetime!(2023-11-09 00:00:00 UTC);
let end = datetime!(2023-11-09 00:00:02 UTC);
assert_eq!(haw.combine_range(WheelRange { start, end }), None);
assert_eq!(haw.range(WheelRange { start, end }), None);
}
#[test]
fn out_of_bounds_query_test() {
let watermark = 1699488000000; let conf = HawConf::default().with_watermark(watermark);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let deltas = vec![Some(10), Some(20), Some(30), Some(40)];
haw.delta_advance(deltas);
let before_and_in_range = WheelRange {
start: datetime!(2023-11-08 23:59:58 UTC),
end: datetime!(2023-11-09 00:00:01 UTC),
};
assert_eq!(haw.combine_range(before_and_in_range), Some(10));
let outside_range = WheelRange {
start: datetime!(2023-11-09 00:00:05 UTC),
end: datetime!(2023-11-09 00:00:08 UTC),
};
assert_eq!(haw.combine_range(outside_range), None);
}
#[test]
fn group_by_test() {
let watermark = 1699488000000;
let conf = HawConf::default()
.with_watermark(watermark)
.with_retention_policy(RetentionPolicy::Keep);
let mut haw: Haw<U64SumAggregator> = Haw::new(conf);
let days = Duration::days(10);
let deltas: Vec<Option<u64>> = (0..days.whole_seconds()).map(|_| Some(1)).collect();
haw.delta_advance(deltas);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 10 00:00:00 UTC);
assert_eq!(
haw.group_by(WheelRange { start, end }, 1.days()),
Some(vec![(1699488000000, 86400)])
);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 12 00:00:00 UTC);
assert_eq!(
haw.group_by(WheelRange { start, end }, 2.days()),
Some(vec![(1699488000000, 172800), (1699660800000, 172800)])
);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 11 00:00:00 UTC);
assert_eq!(
haw.group_by(WheelRange { start, end }, 12.hours()),
Some(vec![
(1699488000000, 43200),
(1699531200000, 43200),
(1699574400000, 43200),
(1699617600000, 43200)
])
);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 09 00:00:00 UTC);
assert_eq!(haw.group_by(WheelRange { start, end }, 1.days()), None,);
let start = datetime!(2023 - 11 - 20 00:00:00 UTC);
let end = datetime!(2023 - 11 - 21 00:00:00 UTC);
assert_eq!(haw.group_by(WheelRange { start, end }, 1.days()), None);
let start = datetime!(2023 - 11 - 10 00:00:00 UTC);
let end = datetime!(2023 - 11 - 09 00:00:00 UTC);
assert_eq!(haw.group_by(WheelRange { start, end }, 1.days()), None);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 10 00:00:00 UTC);
assert_eq!(haw.group_by(WheelRange { start, end }, 0.seconds()), None);
let start = datetime!(2023 - 11 - 09 00:00:00 UTC);
let end = datetime!(2023 - 11 - 09 01:00:00 UTC);
assert_eq!(
haw.group_by(WheelRange { start, end }, 15.minutes()),
Some(vec![
(1699488000000, 900), (1699488900000, 900), (1699489800000, 900), (1699490700000, 900), ])
);
}
proptest! {
#[test]
fn split_wheel_ranges(range in any::<WheelRange>()) {
let ranges = Haw::<U32SumAggregator>::split_wheel_ranges(range);
let mut current_start = range.start;
for r in ranges.iter() {
prop_assert!(r.start >= current_start, "Start time should be non-decreasing");
prop_assert!(r.end <= range.end, "End time should not exceed the original range's end");
prop_assert!(r.start < r.end, "Each split range must have a valid start and end");
current_start = r.end;
}
if !ranges.is_empty() {
prop_assert_eq!(ranges.first().unwrap().start, range.start, "First range should start at the original start");
prop_assert_eq!(ranges.last().unwrap().end, range.end, "Last range should end at the original end");
}
}
#[test]
fn combine_range_empty_haw(range in any::<WheelRange>()) {
let haw = Haw::<U32SumAggregator>::new(HawConf::default());
haw.combine_range(range);
}
#[test]
fn range_empty_haw(range in any::<WheelRange>()) {
let haw = Haw::<U32SumAggregator>::new(HawConf::default());
haw.range(range);
}
}
}