pub struct RwWheel<A>
where A: Aggregator,
{ /* private fields */ }
Expand description

A Reader-Writer aggregation wheel with decoupled read and write paths.

§How it works

The design of µWheel is centered around low-watermarking, a concept found in modern streaming systems (e.g., Apache Flink, RisingWave, Arroyo). Wheels in µWheel are low-watermark indexed which enables fast writes and lookups. Additionally, it lets us avoid storing explicit timestamps since they are implicit in the wheels.

A low watermark w indicates that all records with timestamps t where t <= w have been ingested. µWheel exploits this property and seperates the write and read paths. Writes (timestamps above w) are handled by a Writer Wheel which is optimized for single-threaded ingestion of out-of-order aggregates. Reads (queries with time < w) are managed by a hierarchically indexed Reader Wheel that employs a wheel-based query optimizer whose cost function minimizes the number of aggregate operations for a query.

µWheel adopts a Lazy Synchronization aggregation approach. Aggregates are only shifted over from the WriterWheel to the ReaderWheel once the internal low watermark has been advanced.


RwWheel supports both streaming window aggregation queries and temporal adhoc queries over arbitrary time ranges. A window may be installed through RwWheel::window and queries can be executed through its ReaderWheel which is accessible through RwWheel::read.


Here is an example showing the use of a U32 SUM aggregator

use uwheel::{aggregator::sum::U32SumAggregator, NumericalDuration, Entry, RwWheel};

let time = 0;
// initialize the wheel with a low watermark
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(time);
// Insert an entry into the wheel
wheel.insert(Entry::new(100, time));
// advance the wheel to 1000
// verify the new low watermark
assert_eq!(wheel.watermark(), 1000);
// query the last second of aggregates
assert_eq!(, Some(100));



impl<A> RwWheel<A>
where A: Aggregator,


pub fn new(time: u64) -> Self

Creates a new Wheel starting from the given time

Time is represented as milliseconds since unix timestamp

See RwWheel::with_conf for more detailed configuration possibilities


pub fn with_conf(conf: Conf) -> Self

Creates a new wheel using the specified configuration

use uwheel::{RwWheel, Conf, aggregator::sum::U32SumAggregator, HawConf};
let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(10000));
let wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);

pub fn window(&mut self, window: impl Into<Window>)

Installs a periodic window aggregation query

Results of the window are returned when advancing the wheel with Self::advance_to or Self::advance.

use uwheel::{Window, aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

// Define a window query
let window = Window::sliding(10.seconds(), 3.seconds());
// Initialize a Reader-Writer Wheel and install the window
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);

pub fn insert(&mut self, e: impl Into<Entry<A::Input>>)

Inserts an entry into the wheel


Entries with timestamps below the current low watermark (Self::watermark) are dropped.

use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, Entry};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
let data = 10;
let timestamp = 1000;
wheel.insert(Entry::new(data, timestamp));

pub fn write(&self) -> &WriterWheel<A>

Returns a reference to the writer wheel


pub fn read(&self) -> &ReaderWheel<A>

Returns a reference to the underlying reader wheel


pub fn merge_read_wheel(&self, other: &ReaderWheel<A>)

Merges another read wheel with same size into this one


pub fn watermark(&self) -> u64

Returns the current watermark of this wheel


pub fn advance( &mut self, duration: Duration, ) -> Vec<WindowAggregate<A::PartialAggregate>>

Advance the watermark of the wheel by the given Duration

May return possible window aggregates if any window is installed (see RwWheel::window).

use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
assert_eq!(wheel.watermark(), 5000);

pub fn advance_to( &mut self, watermark: u64, ) -> Vec<WindowAggregate<A::PartialAggregate>>

Advances the time of the wheel to the specified watermark.

May return possible window aggregates if any window is installed (see RwWheel::window).


This function assumes advancement to occur in atomic units (e.g., 5 not 4.5 seconds)

use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};

let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
assert_eq!(wheel.watermark(), 5000);

pub fn size_bytes(&self) -> usize

Returns an estimation of bytes used by the wheel


pub fn print_stats(&self)

Available on crate feature profiler only.

Prints the stats of the RwWheel

