pub mod aggregation;
pub mod hierarchical;
mod plan;
#[cfg(feature = "profiler")]
pub(crate) mod stats;
#[cfg(feature = "timer")]
use crate::wheels::timer::{TimerAction, TimerError};
use crate::{
WheelRange,
cfg_not_sync,
cfg_sync,
delta::DeltaState,
duration::Duration,
window::WindowAggregate,
};
pub use hierarchical::{DAYS, HOURS, Haw, MINUTES, SECONDS, WEEKS, YEARS};
pub use plan::ExecutionPlan;
use crate::aggregator::Aggregator;
use self::hierarchical::HawConf;
use crate::window::Window;
use super::write::WriterWheel;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
/// Handle to a shared hierarchical aggregation wheel ([`Haw`]).
///
/// The wheel itself lives behind `Inner`, so cloning a `ReaderWheel` clones
/// the handle and not the wheel: every clone observes the same underlying
/// [`Haw`].
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
#[derive(Clone)]
pub struct ReaderWheel<A: Aggregator> {
    /// Shared, interior-mutable storage for the wheel.
    inner: Inner<A>,
}
impl<A> ReaderWheel<A>
where
    A: Aggregator,
{
    /// Creates a reader wheel backed by a [`Haw`] built from the default
    /// [`HawConf`] with its watermark set to `time`.
    pub fn new(time: u64) -> Self {
        Self::with_conf(HawConf::default().with_watermark(time))
    }

    /// Creates a reader wheel backed by a [`Haw`] built from `conf`.
    pub fn with_conf(conf: HawConf) -> Self {
        Self {
            inner: Inner::new(Haw::new(conf)),
        }
    }

    /// Rebuilds a reader wheel from a [`DeltaState`].
    ///
    /// The wheel is created with `state.oldest_ts` as its starting watermark,
    /// after which `state.deltas` are applied through [`Self::delta_advance`].
    pub fn from_delta_state(state: DeltaState<A::PartialAggregate>) -> Self {
        let rw = Self::new(state.oldest_ts);
        rw.delta_advance(state.deltas);
        rw
    }

    /// Returns the [`DeltaState`] reported by the underlying [`Haw`].
    pub fn delta_state(&self) -> DeltaState<A::PartialAggregate> {
        self.inner.read().delta_state()
    }

    /// Returns the length reported by the underlying [`Haw`]
    /// (see `Haw::len` for what exactly is counted).
    pub fn len(&self) -> usize {
        self.inner.read().len()
    }

    /// Returns `true` if the underlying [`Haw`] reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.inner.read().is_empty()
    }

    /// Returns `true` if the underlying [`Haw`] reports itself as full.
    pub fn is_full(&self) -> bool {
        self.inner.read().is_full()
    }

    /// Returns the remaining tick count reported by the underlying [`Haw`].
    pub fn remaining_ticks(&self) -> u64 {
        self.inner.read().remaining_ticks()
    }

    /// Forwards the `hints` flag to `Haw::set_optimizer_hints`.
    #[doc(hidden)]
    pub fn set_optimizer_hints(&self, hints: bool) {
        self.inner.write().set_optimizer_hints(hints);
    }

    /// Invokes `Haw::to_prefix_wheels` on the underlying wheel.
    pub fn to_prefix_wheels(&self) {
        self.inner.write().to_prefix_wheels();
    }

    /// Invokes `Haw::to_simd_wheels` on the underlying wheel.
    pub fn to_simd_wheels(&self) {
        self.inner.write().to_simd_wheels();
    }

    /// Invokes `Haw::convert_all_to_array` on the underlying wheel.
    #[doc(hidden)]
    pub fn convert_all_to_array(&self) {
        self.inner.write().convert_all_to_array();
    }

    /// Returns the current time in the cycle as reported by the underlying
    /// [`Haw`].
    #[inline]
    pub fn current_time_in_cycle(&self) -> Duration {
        self.inner.read().current_time_in_cycle()
    }

    /// Schedules the callback `f` for time `at` by forwarding to
    /// `Haw::schedule_once`.
    ///
    /// Only available with the `timer` feature.
    ///
    /// # Errors
    ///
    /// Returns whatever [`TimerError`] the underlying wheel reports.
    #[cfg(feature = "timer")]
    pub fn schedule_once(
        &self,
        at: u64,
        f: impl Fn(&Haw<A>) + 'static,
    ) -> Result<(), TimerError<TimerAction<A>>> {
        self.inner.write().schedule_once(at, f)
    }

    /// Schedules the callback `f` starting at `at` and repeating with
    /// `interval` by forwarding to `Haw::schedule_repeat`.
    ///
    /// Only available with the `timer` feature.
    ///
    /// # Errors
    ///
    /// Returns whatever [`TimerError`] the underlying wheel reports.
    #[cfg(feature = "timer")]
    pub fn schedule_repeat(
        &self,
        at: u64,
        interval: Duration,
        f: impl Fn(&Haw<A>) + 'static,
    ) -> Result<(), TimerError<TimerAction<A>>> {
        self.inner.write().schedule_repeat(at, interval, f)
    }

    /// Forwards `window` to `Haw::window` on the underlying wheel.
    ///
    /// The `&mut self` receiver is kept for API compatibility; the call itself
    /// only relies on the interior write borrow.
    #[doc(hidden)]
    pub fn window(&mut self, window: Window) {
        self.inner.write().window(window);
    }

    /// Advances the underlying wheel by `duration` using the writer wheel
    /// `waw`, returning the window aggregates produced by `Haw::advance`.
    #[inline]
    #[doc(hidden)]
    pub fn advance(
        &self,
        duration: Duration,
        waw: &mut WriterWheel<A>,
    ) -> Vec<WindowAggregate<A::PartialAggregate>> {
        self.inner.write().advance(duration, waw)
    }

    /// Advances the underlying wheel to `watermark` using the writer wheel
    /// `waw`, returning the window aggregates produced by `Haw::advance_to`.
    #[inline]
    pub(crate) fn advance_to(
        &self,
        watermark: u64,
        waw: &mut WriterWheel<A>,
    ) -> Vec<WindowAggregate<A::PartialAggregate>> {
        self.inner.write().advance_to(watermark, waw)
    }

    /// Applies `deltas` to the underlying wheel through `Haw::delta_advance`.
    #[inline]
    pub fn delta_advance(&self, deltas: impl IntoIterator<Item = Option<A::PartialAggregate>>) {
        self.inner.write().delta_advance(deltas);
    }

    /// Clears the underlying wheel through `Haw::clear`.
    pub fn clear(&self) {
        self.inner.write().clear();
    }

    /// Returns the watermark of the underlying wheel.
    #[inline]
    pub fn watermark(&self) -> u64 {
        self.inner.read().watermark()
    }

    /// Same as [`Self::interval`], with the partial aggregate lowered through
    /// `A::lower`.
    pub fn interval_and_lower(&self, dur: Duration) -> Option<A::Aggregate> {
        self.interval(dur).map(A::lower)
    }

    /// Returns the partial aggregate that `Haw::interval` produces for `dur`,
    /// or `None` if the wheel yields no result.
    #[inline]
    pub fn interval(&self, dur: Duration) -> Option<A::PartialAggregate> {
        self.inner.read().interval(dur)
    }

    /// Returns the result of `Haw::interval_with_stats` for `dur`: the optional
    /// partial aggregate together with a `usize` statistic (presumably the
    /// number of operations performed; confirm against `Haw`).
    #[inline]
    pub fn interval_with_ops(&self, dur: Duration) -> (Option<A::PartialAggregate>, usize) {
        self.inner.read().interval_with_stats(dur)
    }

    /// Returns the partial aggregate that `Haw::combine_range` produces for
    /// `range`, or `None` if the wheel yields no result.
    #[inline]
    pub fn combine_range(&self, range: impl Into<WheelRange>) -> Option<A::PartialAggregate> {
        self.inner.read().combine_range(range)
    }

    /// Same as [`Self::combine_range`], with the partial aggregate lowered
    /// through `A::lower`.
    #[inline]
    pub fn combine_range_and_lower(&self, range: impl Into<WheelRange>) -> Option<A::Aggregate> {
        self.combine_range(range).map(A::lower)
    }

    /// Returns the `(u64, aggregate)` pairs that `Haw::group_by` produces for
    /// `range` and `interval`, or `None` if the wheel yields no result.
    #[inline]
    pub fn group_by(
        &self,
        range: WheelRange,
        interval: Duration,
    ) -> Option<Vec<(u64, A::Aggregate)>> {
        self.inner.read().group_by(range, interval)
    }

    /// Returns the `(u64, partial aggregate)` pairs that `Haw::range` produces
    /// for `range`, or `None` if the wheel yields no result.
    #[inline]
    pub fn range(&self, range: impl Into<WheelRange>) -> Option<Vec<(u64, A::PartialAggregate)>> {
        self.inner.read().range(range)
    }

    /// Returns the `(u64, aggregate)` pairs that `Haw::range_and_lower`
    /// produces for `range`, or `None` if the wheel yields no result.
    #[inline]
    pub fn range_and_lower(
        &self,
        range: impl Into<WheelRange>,
    ) -> Option<Vec<(u64, A::Aggregate)>> {
        self.inner.read().range_and_lower(range)
    }

    /// Returns the partial aggregate that `Haw::landmark` produces, or `None`
    /// if the wheel yields no result.
    #[inline]
    pub fn landmark(&self) -> Option<A::PartialAggregate> {
        self.inner.read().landmark()
    }

    /// Merges `other` into this wheel through `Haw::merge`.
    ///
    /// Write access is held on both wheels for the duration of the call.
    ///
    /// # Panics
    ///
    /// Panics if `other` shares its underlying wheel with `self` (for example
    /// when it is a clone of this handle). Both write borrows would then
    /// target the same cell, which would otherwise end in a `RefCell`
    /// double-borrow panic or a deadlock on the `RwLock`, depending on which
    /// `Inner` implementation is compiled in.
    #[inline]
    pub fn merge(&self, other: &Self) {
        // Compare the addresses of the shared cells; this works for both the
        // `Rc<RefCell<..>>` and the `Arc<RwLock<..>>` backed `Inner`.
        assert!(
            !core::ptr::eq(&*self.inner.0, &*other.inner.0),
            "cannot merge a ReaderWheel with a handle to the same underlying wheel"
        );
        self.inner.write().merge(&mut other.inner.write());
    }

    /// Returns a read guard over the underlying [`Haw`].
    ///
    /// The read borrow is held for as long as the returned guard is alive, so
    /// methods on this handle that need write access cannot obtain it in the
    /// meantime.
    pub fn as_ref(&self) -> HawRef<'_, A> {
        self.inner.read()
    }
}
/// Wraps an already constructed [`Haw`] in a [`ReaderWheel`] handle.
impl<A> From<Haw<A>> for ReaderWheel<A>
where
    A: Aggregator,
{
    fn from(value: Haw<A>) -> Self {
        let inner = Inner::new(value);
        Self { inner }
    }
}
cfg_not_sync! {
    // Backing store used by this configuration: the `Haw` is kept in an
    // `Rc<RefCell<..>>`, so handles are reference counted and borrow checking
    // happens at runtime.
    #[cfg(not(feature = "std"))]
    use alloc::rc::Rc;
    use core::cell::RefCell;
    #[cfg(feature = "std")]
    use std::rc::Rc;

    /// Shared (read) borrow of the wheel.
    pub type HawRef<'a, T> = core::cell::Ref<'a, Haw<T>>;
    /// Exclusive (write) borrow of the wheel.
    pub type HawRefMut<'a, T> = core::cell::RefMut<'a, Haw<T>>;

    /// Reference-counted, interior-mutable container for a [`Haw`].
    ///
    /// Cloning yields another handle to the same wheel.
    #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
    #[cfg_attr(feature = "serde", serde(bound = "T: Default"))]
    #[derive(Clone)]
    #[doc(hidden)]
    pub struct Inner<T: Aggregator>(Rc<RefCell<Haw<T>>>);

    impl<T: Aggregator> Inner<T> {
        /// Moves `val` into a fresh shared cell.
        #[inline(always)]
        pub fn new(val: Haw<T>) -> Self {
            let cell = RefCell::new(val);
            Self(Rc::new(cell))
        }

        /// Takes a shared borrow of the wheel.
        ///
        /// Panics, per `RefCell::borrow`, if the wheel is currently mutably
        /// borrowed.
        #[inline(always)]
        pub fn read(&self) -> HawRef<'_, T> {
            RefCell::borrow(&self.0)
        }

        /// Takes an exclusive borrow of the wheel.
        ///
        /// Panics, per `RefCell::borrow_mut`, if the wheel is currently
        /// borrowed.
        #[inline(always)]
        pub fn write(&self) -> HawRefMut<'_, T> {
            RefCell::borrow_mut(&self.0)
        }
    }
}
cfg_sync! {
    // Backing store used by this configuration: the `Haw` is kept in an
    // `Arc<parking_lot::RwLock<..>>`.
    use parking_lot::{MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock};
    use std::sync::Arc;

    /// Read guard over the wheel.
    pub type HawRef<'a, T> = MappedRwLockReadGuard<'a, Haw<T>>;
    /// Write guard over the wheel.
    pub type HawRefMut<'a, T> = MappedRwLockWriteGuard<'a, Haw<T>>;

    /// Atomically reference-counted, lock-protected container for a [`Haw`].
    ///
    /// Cloning yields another handle to the same wheel.
    #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
    #[cfg_attr(feature = "serde", serde(bound = "T: Default"))]
    #[derive(Clone)]
    #[doc(hidden)]
    pub struct Inner<T: Aggregator>(Arc<RwLock<Haw<T>>>);

    impl<T: Aggregator> Inner<T> {
        /// Moves `val` behind a fresh lock.
        #[inline(always)]
        pub fn new(val: Haw<T>) -> Self {
            let lock = RwLock::new(val);
            Self(Arc::new(lock))
        }

        /// Acquires the read lock and returns it as a mapped guard.
        #[inline(always)]
        pub fn read(&self) -> HawRef<'_, T> {
            let guard = self.0.read();
            // Identity mapping: only converts the guard into the mapped guard
            // type named by `HawRef`.
            parking_lot::RwLockReadGuard::map(guard, |haw| haw)
        }

        /// Acquires the write lock and returns it as a mapped guard.
        #[inline(always)]
        pub fn write(&self) -> HawRefMut<'_, T> {
            let guard = self.0.write();
            // Identity mapping: only converts the guard into the mapped guard
            // type named by `HawRefMut`.
            parking_lot::RwLockWriteGuard::map(guard, |haw| haw)
        }
    }
}