uwheel/wheels/mod.rs
1/// Reader Wheel
2///
3/// Single reader or multi-reader with the ``sync`` feature enabled.
4pub mod read;
5/// Extension trait for implementing a custom wheel
6pub mod wheel_ext;
7/// Writer Wheel
8///
9/// Optimised for a single writer
10pub mod write;
11
12#[cfg(feature = "profiler")]
13mod stats;
14
15/// Hierarchical Wheel Timer
16#[allow(dead_code)]
17mod timer;
18
19use crate::{Entry, aggregator::Aggregator, duration::Duration, window::WindowAggregate};
20use core::{fmt::Debug, num::NonZeroUsize};
21use write::DEFAULT_WRITE_AHEAD_SLOTS;
22
23pub use read::{DAYS, HOURS, MINUTES, SECONDS, WEEKS, YEARS};
24pub use wheel_ext::WheelExt;
25pub use write::WriterWheel;
26
27use self::read::{ReaderWheel, hierarchical::HawConf};
28
29use crate::window::Window;
30
31#[cfg(not(feature = "std"))]
32use alloc::vec::Vec;
33
34#[cfg(feature = "profiler")]
35use uwheel_stats::profile_scope;
36
37/// A Reader-Writer aggregation wheel with decoupled read and write paths.
38///
39/// # How it works
40/// The design of µWheel is centered around [low-watermarking](http://www.vldb.org/pvldb/vol14/p3135-begoli.pdf), a concept found in modern streaming systems (e.g., Apache Flink, RisingWave, Arroyo).
41/// Wheels in µWheel are low-watermark indexed which enables fast writes and lookups. Additionally, it lets us avoid storing explicit timestamps since they are implicit in the wheels.
42///
43/// A low watermark `w` indicates that all records with timestamps `t` where `t <= w` have been ingested.
44/// µWheel exploits this property and seperates the write and read paths. Writes (timestamps above `w`) are handled by a Writer Wheel which is optimized for single-threaded ingestion of out-of-order aggregates.
45/// Reads (queries with `time < w`) are managed by a hierarchically indexed Reader Wheel that employs a wheel-based query optimizer whose cost function
46/// minimizes the number of aggregate operations for a query.
47///
48/// µWheel adopts a Lazy Synchronization aggregation approach. Aggregates are only shifted over from the `WriterWheel` to the `ReaderWheel`
49/// once the internal low watermark has been advanced.
50///
51/// 
52///
53/// ## Queries
54///
55/// [RwWheel] supports both streaming window aggregation queries and temporal adhoc queries over arbitrary time ranges.
56/// A window may be installed through [RwWheel::window] and queries can be executed through its ReaderWheel which is accessible through [RwWheel::read].
57///
58/// ## Example
59///
60/// Here is an example showing the use of a U32 SUM aggregator
61///
62/// ```
63/// use uwheel::{aggregator::sum::U32SumAggregator, NumericalDuration, Entry, RwWheel};
64///
65/// let time = 0;
66/// // initialize the wheel with a low watermark
67/// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(time);
68/// // Insert an entry into the wheel
69/// wheel.insert(Entry::new(100, time));
70/// // advance the wheel to 1000
71/// wheel.advance_to(1000);
72/// // verify the new low watermark
73/// assert_eq!(wheel.watermark(), 1000);
74/// // query the last second of aggregates
75/// assert_eq!(wheel.read().interval(1.seconds()), Some(100));
76/// ```
77#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
78#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
79pub struct RwWheel<A>
80where
81 A: Aggregator,
82{
83 /// A single-writer wheel designed for high-throughput ingestion of stream aggregates
84 writer: WriterWheel<A>,
85 /// A multiple-reader wheel designed for efficient querying of aggregate across arbitrary time ranges
86 reader: ReaderWheel<A>,
87 #[cfg(feature = "profiler")]
88 stats: stats::Stats,
89}
90
91impl<A: Aggregator> Default for RwWheel<A> {
92 fn default() -> Self {
93 Self::with_conf(Default::default())
94 }
95}
96
97impl<A> RwWheel<A>
98where
99 A: Aggregator,
100{
101 /// Creates a new Wheel starting from the given time
102 ///
103 /// Time is represented as milliseconds since unix timestamp
104 ///
105 /// See [`RwWheel::with_conf`] for more detailed configuration possibilities
106 pub fn new(time: u64) -> Self {
107 let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(time));
108 Self::with_conf(conf)
109 }
110 /// Creates a new wheel using the specified configuration
111 ///
112 /// # Example
113 ///
114 /// ```
115 /// use uwheel::{RwWheel, Conf, aggregator::sum::U32SumAggregator, HawConf};
116 /// let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(10000));
117 /// let wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
118 /// ```
119 pub fn with_conf(conf: Conf) -> Self {
120 Self {
121 writer: WriterWheel::with_capacity_and_watermark(
122 conf.writer_conf.write_ahead_capacity,
123 conf.reader_conf.haw_conf.watermark,
124 ),
125 reader: ReaderWheel::with_conf(conf.reader_conf.haw_conf),
126 #[cfg(feature = "profiler")]
127 stats: stats::Stats::default(),
128 }
129 }
130 /// Installs a periodic window aggregation query
131 ///
132 /// Results of the window are returned when advancing the wheel with [Self::advance_to] or [Self::advance].
133 ///
134 /// # Example
135 ///
136 /// ```
137 /// use uwheel::{Window, aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
138 ///
139 /// // Define a window query
140 /// let window = Window::sliding(10.seconds(), 3.seconds());
141 /// // Initialize a Reader-Writer Wheel and install the window
142 /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
143 /// wheel.window(window);
144 /// ```
145 pub fn window(&mut self, window: impl Into<Window>) {
146 self.reader.window(window.into());
147 }
148
149 /// Inserts an entry into the wheel
150 ///
151 /// # Safety
152 ///
153 /// Entries with timestamps below the current low watermark ([Self::watermark]) are dropped.
154 ///
155 /// # Example
156 ///
157 /// ```
158 /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, Entry};
159 ///
160 /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
161 /// let data = 10;
162 /// let timestamp = 1000;
163 /// wheel.insert(Entry::new(data, timestamp));
164 /// ```
165 #[inline]
166 pub fn insert(&mut self, e: impl Into<Entry<A::Input>>) {
167 #[cfg(feature = "profiler")]
168 profile_scope!(&self.stats.insert);
169
170 self.writer.insert(e);
171 }
172
173 /// Returns a reference to the writer wheel
174 pub fn write(&self) -> &WriterWheel<A> {
175 &self.writer
176 }
177 /// Returns a reference to the underlying reader wheel
178 pub fn read(&self) -> &ReaderWheel<A> {
179 &self.reader
180 }
181 /// Merges another read wheel with same size into this one
182 pub fn merge_read_wheel(&self, other: &ReaderWheel<A>) {
183 self.read().merge(other);
184 }
185 /// Returns the current watermark of this wheel
186 pub fn watermark(&self) -> u64 {
187 self.writer.watermark()
188 }
189 /// Advance the watermark of the wheel by the given [Duration]
190 ///
191 /// May return possible window aggregates if any window is installed (see [RwWheel::window]).
192 ///
193 /// # Example
194 ///
195 /// ```
196 /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
197 ///
198 /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
199 /// wheel.advance(5.seconds());
200 /// assert_eq!(wheel.watermark(), 5000);
201 /// ```
202 #[inline]
203 pub fn advance(&mut self, duration: Duration) -> Vec<WindowAggregate<A::PartialAggregate>> {
204 let to = self.watermark() + duration.whole_milliseconds() as u64;
205 self.advance_to(to)
206 }
207
208 /// Advances the time of the wheel to the specified watermark.
209 ///
210 /// May return possible window aggregates if any window is installed (see [RwWheel::window]).
211 ///
212 /// # Safety
213 ///
214 /// This function assumes advancement to occur in atomic units (e.g., 5 not 4.5 seconds)
215 ///
216 /// # Example
217 ///
218 /// ```
219 /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
220 ///
221 /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
222 /// wheel.advance_to(5000);
223 /// assert_eq!(wheel.watermark(), 5000);
224 /// ```
225 #[inline]
226 pub fn advance_to(&mut self, watermark: u64) -> Vec<WindowAggregate<A::PartialAggregate>> {
227 #[cfg(feature = "profiler")]
228 profile_scope!(&self.stats.advance);
229
230 self.reader.advance_to(watermark, &mut self.writer)
231 }
232
233 /// Returns an estimation of bytes used by the wheel
234 pub fn size_bytes(&self) -> usize {
235 let read = self.reader.as_ref().size_bytes();
236 let write = self.writer.size_bytes().unwrap();
237 read + write
238 }
239
240 #[cfg(feature = "profiler")]
241 /// Prints the stats of the [RwWheel]
242 pub fn print_stats(&self) {
243 use prettytable::{Table, row};
244 use uwheel_stats::Sketch;
245 let mut table = Table::new();
246 table.add_row(row![
247 "name", "count", "min", "p50", "p99", "p99.9", "p99.99", "p99.999", "max",
248 ]);
249 // helper fn to format percentile
250 let percentile_fmt = |p: f64| -> String { format!("{:.2}ns", p) };
251
252 // helper fn to add row to the table
253 let add_row = |id: &str, table: &mut Table, sketch: &Sketch| {
254 let percentiles = sketch.percentiles();
255 table.add_row(row![
256 id,
257 percentiles.count,
258 percentiles.min,
259 percentile_fmt(percentiles.p50),
260 percentile_fmt(percentiles.p99),
261 percentile_fmt(percentiles.p99_9),
262 percentile_fmt(percentiles.p99_99),
263 percentile_fmt(percentiles.p99_999),
264 percentiles.min,
265 ]);
266 };
267
268 add_row("insert", &mut table, &self.stats.insert);
269 add_row("advance", &mut table, &self.stats.advance);
270
271 let read = self.reader.as_ref();
272 add_row("tick", &mut table, &read.stats().tick);
273 add_row("interval", &mut table, &read.stats().interval);
274 add_row("landmark", &mut table, &read.stats().landmark);
275 add_row("combine range", &mut table, &read.stats().combine_range);
276 add_row(
277 "combine range plan",
278 &mut table,
279 &read.stats().combine_range_plan,
280 );
281 add_row(
282 "combined aggregation",
283 &mut table,
284 &read.stats().combined_aggregation,
285 );
286
287 add_row("execution plan", &mut table, &read.stats().exec_plan);
288
289 add_row(
290 "combined aggregation plan",
291 &mut table,
292 &read.stats().combined_aggregation_plan,
293 );
294 add_row(
295 "wheel aggregation",
296 &mut table,
297 &read.stats().wheel_aggregation,
298 );
299
300 println!("====RwWheel Profiler Dump====");
301 table.printstd();
302 }
303}
304
305impl<A> Drop for RwWheel<A>
306where
307 A: Aggregator,
308{
309 fn drop(&mut self) {
310 #[cfg(feature = "profiler")]
311 self.print_stats();
312 }
313}
314
315/// [`WriterWheel`] Configuration
316#[derive(Debug, Copy, Clone)]
317pub struct WriterConf {
318 /// Defines the capacity of write-ahead slots
319 write_ahead_capacity: NonZeroUsize,
320}
321impl Default for WriterConf {
322 fn default() -> Self {
323 Self {
324 write_ahead_capacity: NonZeroUsize::new(DEFAULT_WRITE_AHEAD_SLOTS).unwrap(),
325 }
326 }
327}
328
329/// [`ReaderWheel`] Configuration
330#[derive(Debug, Default, Copy, Clone)]
331pub struct ReaderConf {
332 /// Hierarchical Aggregation Wheel configuration
333 haw_conf: HawConf,
334}
335
336/// Reader-Writer Wheel Configuration
337#[derive(Debug, Default, Copy, Clone)]
338pub struct Conf {
339 /// Writer Wheel Configuration
340 writer_conf: WriterConf,
341 /// Reader Wheel Configuration
342 reader_conf: ReaderConf,
343}
344
345impl Conf {
346 /// Configure the number of write-ahead slots
347 ///
348 /// The default value is [DEFAULT_WRITE_AHEAD_SLOTS]
349 ///
350 /// # Example
351 ///
352 /// ```
353 /// use uwheel::Conf;
354 /// use core::num::NonZeroUsize;
355 ///
356 /// // Configure a write-ahead capacity of 128 (seconds)
357 /// let rw_conf = Conf::default().with_write_ahead(NonZeroUsize::new(128).unwrap());
358 /// ```
359 pub fn with_write_ahead(mut self, capacity: NonZeroUsize) -> Self {
360 self.writer_conf.write_ahead_capacity = capacity;
361 self
362 }
363 /// Configures the reader wheel to use the given [HawConf]
364 ///
365 /// # Example
366 ///
367 /// ```
368 /// use uwheel::{HawConf, Conf, RetentionPolicy};
369 ///
370 /// // Configure all wheels in the HAW to maintain data
371 /// let haw_conf = HawConf::default().with_retention_policy(RetentionPolicy::Keep);
372 /// let rw_conf = Conf::default().with_haw_conf(haw_conf);
373 /// ```
374 pub fn with_haw_conf(mut self, conf: HawConf) -> Self {
375 self.reader_conf.haw_conf = conf;
376 self
377 }
378}
379
#[cfg(test)]
mod tests {
    #[cfg(feature = "timer")]
    use core::cell::RefCell;
    #[cfg(feature = "timer")]
    use std::rc::Rc;

    use super::*;
    use crate::{aggregator::sum::U32SumAggregator, duration::*, *};
    use proptest::prelude::*;

    /// Inserts every `(value, timestamp)` pair into `wheel`, in the given order.
    fn insert_all(wheel: &mut RwWheel<U32SumAggregator>, entries: &[(u32, u64)]) {
        for &(value, timestamp) in entries {
            wheel.insert(Entry::new(value, timestamp));
        }
    }

    #[test]
    fn delta_generate_test() {
        let conf = Conf::default().with_haw_conf(HawConf::default().with_deltas());
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
        insert_all(
            &mut wheel,
            &[(250, 1000), (250, 2000), (250, 3000), (250, 4000)],
        );

        wheel.advance_to(5000);
        let delta_state = wheel.read().delta_state();

        assert_eq!(delta_state.oldest_ts, 0);
        assert_eq!(
            delta_state.deltas,
            vec![None, Some(250), Some(250), Some(250), Some(250)]
        );

        assert_eq!(wheel.read().interval(4.seconds()), Some(1000));

        // rebuild a reader wheel from the deltas alone
        let rebuilt: ReaderWheel<U32SumAggregator> = ReaderWheel::from_delta_state(delta_state);
        // the watermark must carry over
        assert_eq!(rebuilt.watermark(), 5000);
        // and so must the query result
        assert_eq!(rebuilt.interval(4.seconds()), Some(1000));
    }

    #[test]
    fn insert_test() {
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::default();
        insert_all(
            &mut wheel,
            &[(250, 1000), (250, 2000), (250, 3000), (250, 4000)],
        );

        // this entry should end up in the timer wheel
        wheel.insert(Entry::new(250, 150000));

        wheel.advance_to(5000);

        // advancing this far should trigger the insert
        wheel.advance_to(86000);
    }

    #[cfg(feature = "serde")]
    #[test]
    fn rw_wheel_serde_test() {
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
        insert_all(
            &mut wheel,
            &[(250, 1000), (250, 2000), (250, 3000), (250, 4000)],
        );

        // this entry will go into the overflow wheel
        wheel.insert(Entry::new(250, 100000));

        let bytes = bincode::serialize(&wheel).unwrap();

        let mut restored = bincode::deserialize::<RwWheel<U32SumAggregator>>(&bytes).unwrap();

        assert_eq!(restored.watermark(), 0);
        restored.advance_to(5000);
        assert_eq!(restored.read().interval(4.seconds()), Some(1000));

        // advance past the overflow wheel entry and make sure it is still there
        restored.advance_to(101000);
        assert_eq!(restored.read().interval(1.seconds()), Some(250));
    }

    #[cfg(feature = "timer")]
    #[test]
    fn timer_once_test() {
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::default();
        let fired = Rc::new(RefCell::new(false));
        let fired_in_timer = Rc::clone(&fired);

        let _ = wheel.read().schedule_once(5000, move |read| {
            if let Some(last_five) = read.interval(5.seconds()) {
                *fired_in_timer.borrow_mut() = true;
                assert_eq!(last_five, 1000);
            }
        });
        insert_all(
            &mut wheel,
            &[(250, 1000), (250, 2000), (250, 3000), (250, 4000)],
        );

        wheel.advance_to(5000);

        // the timer action must have run
        assert!(*fired.borrow());
    }

    #[cfg(feature = "timer")]
    #[test]
    fn timer_repeat_test() {
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::default();
        let total = Rc::new(RefCell::new(0));
        let total_in_timer = Rc::clone(&total);

        // schedule a repeating action
        let _ = wheel
            .read()
            .schedule_repeat(5000, 5.seconds(), move |read| {
                if let Some(last_five) = read.interval(5.seconds()) {
                    *total_in_timer.borrow_mut() += last_five;
                }
            });

        insert_all(
            &mut wheel,
            &[(250, 1000), (250, 2000), (250, 3000), (250, 4000)],
        );

        // first timer firing adds the sum of the last 5 seconds
        wheel.advance_to(5000);
        assert_eq!(*total.borrow(), 1000);

        insert_all(&mut wheel, &[(250, 5000), (250, 6000), (250, 7000)]);

        // second timer firing adds the sum of the last 5 seconds
        wheel.advance_to(10000);
        assert_eq!(*total.borrow(), 1750);
    }

    #[cfg(feature = "sync")]
    #[test]
    fn read_wheel_move_thread_test() {
        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::default();
        wheel.insert(Entry::new(1, 999));
        wheel.advance(1.seconds());

        let reader = wheel.read().clone();

        std::thread::spawn(move || {
            assert_eq!(reader.interval(1.seconds()), Some(1));
        })
        .join()
        .expect("Failed to join the thread.");
    }

    #[test]
    fn interval_test() {
        let mut wheel = RwWheel::<U32SumAggregator>::new(0);
        wheel.advance(1.seconds());

        insert_all(&mut wheel, &[(1u32, 1000), (5u32, 5000), (11u32, 11000)]);

        wheel.advance(5.seconds());
        assert_eq!(wheel.watermark(), 6000);

        // let expected: &[_] = &[&0, &1u32, &0, &0, &0, &5];
        // assert_eq!(
        //     &wheel
        //         .read()
        //         .as_ref()
        //         .seconds_unchecked()
        //         .iter()
        //         .collect::<Vec<&u32>>(),
        //     expected
        // );

        assert_eq!(
            wheel.read().as_ref().seconds_unchecked().interval(5).0,
            Some(6u32)
        );
        assert_eq!(
            wheel.read().as_ref().seconds_unchecked().interval(1).0,
            Some(5u32)
        );

        wheel.advance_to(12000);

        insert_all(
            &mut wheel,
            &[(100u32, 61000), (100u32, 63000), (100u32, 67000)],
        );

        // go past the seconds wheel
        wheel.advance_to(65000);
    }

    #[test]
    fn mixed_timestamp_insertions_test() {
        let mut wheel = RwWheel::<U32SumAggregator>::new(1000);
        wheel.advance_to(1000);

        insert_all(&mut wheel, &[(1u32, 1000), (5u32, 5000), (11u32, 11000)]);

        // new watermark
        wheel.advance_to(6000);

        assert_eq!(
            wheel.read().as_ref().seconds_unchecked().total(),
            Some(6u32)
        );
        // combining the range of the last 6 seconds must give the same result
        assert_eq!(
            wheel
                .read()
                .as_ref()
                .seconds_unchecked()
                .combine_range_and_lower(0..5),
            Some(6u32)
        );
    }

    // #[test]
    // fn full_cycle_test() {
    //     let mut wheel = RwWheel::<U32SumAggregator>::new(0);

    //     let ticks = wheel.read().remaining_ticks() - 1;
    //     wheel.advance(Duration::seconds(ticks as i64));

    //     // one tick away from full cycle clear
    //     assert_eq!(
    //         wheel.read().as_ref().seconds_unchecked().rotation_count(),
    //         SECONDS - 1
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().minutes_unchecked().rotation_count(),
    //         MINUTES - 1
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().hours_unchecked().rotation_count(),
    //         HOURS - 1
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().days_unchecked().rotation_count(),
    //         DAYS - 1
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().weeks_unchecked().rotation_count(),
    //         WEEKS - 1
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().years_unchecked().rotation_count(),
    //         YEARS - 1
    //     );

    //     // force full cycle clear
    //     wheel.advance(1.seconds());

    //     // rotation count of all wheels should be zero
    //     assert_eq!(
    //         wheel.read().as_ref().seconds_unchecked().rotation_count(),
    //         0,
    //     );
    //     assert_eq!(
    //         wheel.read().as_ref().minutes_unchecked().rotation_count(),
    //         0,
    //     );
    //     assert_eq!(wheel.read().as_ref().hours_unchecked().rotation_count(), 0,);
    //     assert_eq!(wheel.read().as_ref().days_unchecked().rotation_count(), 0,);
    //     assert_eq!(wheel.read().as_ref().weeks_unchecked().rotation_count(), 0,);
    //     assert_eq!(wheel.read().as_ref().years_unchecked().rotation_count(), 0,);

    //     // Verify len of all wheels
    //     assert_eq!(wheel.read().as_ref().seconds_unchecked().len(), SECONDS);
    //     assert_eq!(wheel.read().as_ref().minutes_unchecked().len(), MINUTES);
    //     assert_eq!(wheel.read().as_ref().hours_unchecked().len(), HOURS);
    //     assert_eq!(wheel.read().as_ref().days_unchecked().len(), DAYS);
    //     assert_eq!(wheel.read().as_ref().weeks_unchecked().len(), WEEKS);
    //     assert_eq!(wheel.read().as_ref().years_unchecked().len(), YEARS);

    //     assert!(wheel.read().is_full());
    //     assert!(!wheel.read().is_empty());
    //     assert!(wheel.read().landmark().is_none());
    // }

    #[test]
    fn merge_test() {
        let mut wheel = RwWheel::<U32SumAggregator>::new(0);
        wheel.insert(Entry::new(1u32, 5000));

        wheel.advance(60.minutes());

        let fresh_wheel = RwWheel::<U32SumAggregator>::new(0);
        fresh_wheel.read().merge(wheel.read());

        // after merging, the fresh reader must mirror the advanced one
        assert_eq!(fresh_wheel.read().watermark(), wheel.read().watermark());
        assert_eq!(fresh_wheel.read().landmark(), wheel.read().landmark());
        assert_eq!(
            fresh_wheel.read().remaining_ticks(),
            wheel.read().remaining_ticks()
        );
    }

    #[test]
    fn merge_test_low_to_high() {
        let start = 0;
        let mut wheel = RwWheel::<U32SumAggregator>::new(start);
        wheel.insert(Entry::new(1u32, 5000));

        wheel.advance(10.seconds());

        let mut fresh_wheel = RwWheel::<U32SumAggregator>::new(start);
        fresh_wheel.insert(Entry::new(5u32, 8000));
        fresh_wheel.advance(9.seconds());

        wheel.read().merge(fresh_wheel.read());

        assert_eq!(wheel.read().landmark(), Some(6));
        // assert_eq!(wheel.read().interval(2.seconds()), Some(5));
        assert_eq!(wheel.read().interval(10.seconds()), Some(6));
    }

    /// Creates a wheel at `start`, advances it to `end` and returns the resulting watermark.
    fn create_and_advance_wheel(start: u64, end: u64) -> u64 {
        let mut wheel = RwWheel::<U32SumAggregator>::new(start);
        wheel.advance_to(end);
        wheel.watermark()
    }

    proptest! {
        #[test]
        fn advance_to(start in 0u64..u64::MAX, watermark in 0u64..u64::MAX) {
            let reached = create_and_advance_wheel(start, watermark);
            prop_assert!(reached >= start);
        }
    }
}