Struct uwheel::wheels::RwWheel

source ·
pub struct RwWheel<A>
where A: Aggregator,
{ /* private fields */ }
Expand description

A Reader-Writer aggregation wheel with decoupled read and write paths.

§How it works

The design of µWheel is centered around low-watermarking, a concept found in modern streaming systems (e.g., Apache Flink, RisingWave, Arroyo). Wheels in µWheel are low-watermark indexed which enables fast writes and lookups. Additionally, it lets us avoid storing explicit timestamps since they are implicit in the wheels.

A low watermark w indicates that all records with timestamps t where t <= w have been ingested. µWheel exploits this property and seperates the write and read paths. Writes (timestamps above w) are handled by a Writer Wheel which is optimized for single-threaded ingestion of out-of-order aggregates. Reads (queries with time < w) are managed by a hierarchically indexed Reader Wheel that employs a wheel-based query optimizer whose cost function minimizes the number of aggregate operations for a query.

µWheel adopts a Lazy Synchronization aggregation approach. Aggregates are only shifted over from the WriterWheel to the ReaderWheel once the internal low watermark has been advanced.

§Queries

RwWheel supports both streaming window aggregation queries and temporal adhoc queries over arbitrary time ranges. A window may be installed through RwWheel::window and queries can be executed through its ReaderWheel which is accessible through RwWheel::read.

§Example

Here is an example showing the use of a U32 SUM aggregator

use uwheel::{aggregator::sum::U32SumAggregator, NumericalDuration, Entry, RwWheel};

let time = 0;
// initialize the wheel with a low watermark
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(time);
// Insert an entry into the wheel
wheel.insert(Entry::new(100, time));
// advance the wheel to 1000
wheel.advance_to(1000);
// verify the new low watermark
assert_eq!(wheel.watermark(), 1000);
// query the last second of aggregates
assert_eq!(wheel.read().interval(1.seconds()), Some(100));

Implementations§

source§

impl<A> RwWheel<A>
where A: Aggregator,

source

pub fn new(time: u64) -> Self

Creates a new Wheel starting from the given time

Time is represented as milliseconds since unix timestamp

See RwWheel::with_conf for more detailed configuration possibilities

source

pub fn with_conf(conf: Conf) -> Self

Creates a new wheel using the specified configuration

§Example
use uwheel::{RwWheel, Conf, aggregator::sum::U32SumAggregator, HawConf};
let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(10000));
let wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
source

pub fn window(&mut self, window: impl Into<Window>)

Installs a periodic window aggregation query

Results of the window are returned when advancing the wheel with Self::advance_to or Self::advance.

§Example
use uwheel::{Window, aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

// Define a window query
let window = Window::sliding(10.seconds(), 3.seconds());
// Initialize a Reader-Writer Wheel and install the window
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.window(window);
source

pub fn insert(&mut self, e: impl Into<Entry<A::Input>>)

Inserts an entry into the wheel

§Safety

Entries with timestamps below the current low watermark (Self::watermark) are dropped.

§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, Entry};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
let data = 10;
let timestamp = 1000;
wheel.insert(Entry::new(data, timestamp));
source

pub fn write(&self) -> &WriterWheel<A>

Returns a reference to the writer wheel

source

pub fn read(&self) -> &ReaderWheel<A>

Returns a reference to the underlying reader wheel

source

pub fn merge_read_wheel(&self, other: &ReaderWheel<A>)

Merges another read wheel with same size into this one

source

pub fn watermark(&self) -> u64

Returns the current watermark of this wheel

source

pub fn advance( &mut self, duration: Duration, ) -> Vec<WindowAggregate<A::PartialAggregate>>

Advance the watermark of the wheel by the given Duration

May return possible window aggregates if any window is installed (see RwWheel::window).

§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.advance(5.seconds());
assert_eq!(wheel.watermark(), 5000);
source

pub fn advance_to( &mut self, watermark: u64, ) -> Vec<WindowAggregate<A::PartialAggregate>>

Advances the time of the wheel to the specified watermark.

May return possible window aggregates if any window is installed (see RwWheel::window).

§Safety

This function assumes advancement to occur in atomic units (e.g., 5 not 4.5 seconds)

§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.advance_to(5000);
assert_eq!(wheel.watermark(), 5000);
source

pub fn size_bytes(&self) -> usize

Returns an estimation of bytes used by the wheel

source

pub fn print_stats(&self)

Available on crate feature profiler only.

Prints the stats of the RwWheel

Trait Implementations§

source§

impl<A: Aggregator> Default for RwWheel<A>

source§

fn default() -> Self

Returns the “default value” for a type. Read more
source§

impl<'de, A> Deserialize<'de> for RwWheel<A>
where A: Aggregator + Default,

source§

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more
source§

impl<A> Drop for RwWheel<A>
where A: Aggregator,

source§

fn drop(&mut self)

Executes the destructor for this type. Read more
source§

impl<A> Serialize for RwWheel<A>
where A: Aggregator + Default,

source§

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more

Auto Trait Implementations§

§

impl<A> Freeze for RwWheel<A>

§

impl<A> !RefUnwindSafe for RwWheel<A>

§

impl<A> !Send for RwWheel<A>

§

impl<A> !Sync for RwWheel<A>

§

impl<A> Unpin for RwWheel<A>
where <A as Aggregator>::Input: Unpin,

§

impl<A> !UnwindSafe for RwWheel<A>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,