pub struct RwWheel<A>where
A: Aggregator,{ /* private fields */ }
Expand description
A Reader-Writer aggregation wheel with decoupled read and write paths.
§How it works
The design of µWheel is centered around low-watermarking, a concept found in modern streaming systems (e.g., Apache Flink, RisingWave, Arroyo). Wheels in µWheel are low-watermark indexed which enables fast writes and lookups. Additionally, it lets us avoid storing explicit timestamps since they are implicit in the wheels.
A low watermark w
indicates that all records with timestamps t
where t <= w
have been ingested.
µWheel exploits this property and seperates the write and read paths. Writes (timestamps above w
) are handled by a Writer Wheel which is optimized for single-threaded ingestion of out-of-order aggregates.
Reads (queries with time < w
) are managed by a hierarchically indexed Reader Wheel that employs a wheel-based query optimizer whose cost function
minimizes the number of aggregate operations for a query.
µWheel adopts a Lazy Synchronization aggregation approach. Aggregates are only shifted over from the WriterWheel
to the ReaderWheel
once the internal low watermark has been advanced.
§Queries
RwWheel supports both streaming window aggregation queries and temporal adhoc queries over arbitrary time ranges. A window may be installed through RwWheel::window and queries can be executed through its ReaderWheel which is accessible through RwWheel::read.
§Example
Here is an example showing the use of a U32 SUM aggregator
use uwheel::{aggregator::sum::U32SumAggregator, NumericalDuration, Entry, RwWheel};
let time = 0;
// initialize the wheel with a low watermark
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(time);
// Insert an entry into the wheel
wheel.insert(Entry::new(100, time));
// advance the wheel to 1000
wheel.advance_to(1000);
// verify the new low watermark
assert_eq!(wheel.watermark(), 1000);
// query the last second of aggregates
assert_eq!(wheel.read().interval(1.seconds()), Some(100));
Implementations§
Source§impl<A> RwWheel<A>where
A: Aggregator,
impl<A> RwWheel<A>where
A: Aggregator,
Sourcepub fn new(time: u64) -> Self
pub fn new(time: u64) -> Self
Creates a new Wheel starting from the given time
Time is represented as milliseconds since unix timestamp
See RwWheel::with_conf
for more detailed configuration possibilities
Sourcepub fn with_conf(conf: Conf) -> Self
pub fn with_conf(conf: Conf) -> Self
Creates a new wheel using the specified configuration
§Example
use uwheel::{RwWheel, Conf, aggregator::sum::U32SumAggregator, HawConf};
let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(10000));
let wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
Sourcepub fn window(&mut self, window: impl Into<Window>)
pub fn window(&mut self, window: impl Into<Window>)
Installs a periodic window aggregation query
Results of the window are returned when advancing the wheel with Self::advance_to or Self::advance.
§Example
use uwheel::{Window, aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
// Define a window query
let window = Window::sliding(10.seconds(), 3.seconds());
// Initialize a Reader-Writer Wheel and install the window
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.window(window);
Sourcepub fn insert(&mut self, e: impl Into<Entry<A::Input>>)
pub fn insert(&mut self, e: impl Into<Entry<A::Input>>)
Inserts an entry into the wheel
§Safety
Entries with timestamps below the current low watermark (Self::watermark) are dropped.
§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, Entry};
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
let data = 10;
let timestamp = 1000;
wheel.insert(Entry::new(data, timestamp));
Sourcepub fn write(&self) -> &WriterWheel<A>
pub fn write(&self) -> &WriterWheel<A>
Returns a reference to the writer wheel
Sourcepub fn read(&self) -> &ReaderWheel<A>
pub fn read(&self) -> &ReaderWheel<A>
Returns a reference to the underlying reader wheel
Sourcepub fn merge_read_wheel(&self, other: &ReaderWheel<A>)
pub fn merge_read_wheel(&self, other: &ReaderWheel<A>)
Merges another read wheel with same size into this one
Sourcepub fn advance(
&mut self,
duration: Duration,
) -> Vec<WindowAggregate<A::PartialAggregate>>
pub fn advance( &mut self, duration: Duration, ) -> Vec<WindowAggregate<A::PartialAggregate>>
Advance the watermark of the wheel by the given Duration
May return possible window aggregates if any window is installed (see RwWheel::window).
§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.advance(5.seconds());
assert_eq!(wheel.watermark(), 5000);
Sourcepub fn advance_to(
&mut self,
watermark: u64,
) -> Vec<WindowAggregate<A::PartialAggregate>>
pub fn advance_to( &mut self, watermark: u64, ) -> Vec<WindowAggregate<A::PartialAggregate>>
Advances the time of the wheel to the specified watermark.
May return possible window aggregates if any window is installed (see RwWheel::window).
§Safety
This function assumes advancement to occur in atomic units (e.g., 5 not 4.5 seconds)
§Example
use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
wheel.advance_to(5000);
assert_eq!(wheel.watermark(), 5000);
Sourcepub fn size_bytes(&self) -> usize
pub fn size_bytes(&self) -> usize
Returns an estimation of bytes used by the wheel
Sourcepub fn print_stats(&self)
Available on crate feature profiler
only.
pub fn print_stats(&self)
profiler
only.Prints the stats of the RwWheel