uwheel/wheels/
mod.rs

1/// Reader Wheel
2///
3/// Single reader or multi-reader with the ``sync`` feature enabled.
4pub mod read;
5/// Extension trait for implementing a custom wheel
6pub mod wheel_ext;
7/// Writer Wheel
8///
9/// Optimised for a single writer
10pub mod write;
11
12#[cfg(feature = "profiler")]
13mod stats;
14
15/// Hierarchical Wheel Timer
16#[allow(dead_code)]
17mod timer;
18
19use crate::{Entry, aggregator::Aggregator, duration::Duration, window::WindowAggregate};
20use core::{fmt::Debug, num::NonZeroUsize};
21use write::DEFAULT_WRITE_AHEAD_SLOTS;
22
23pub use read::{DAYS, HOURS, MINUTES, SECONDS, WEEKS, YEARS};
24pub use wheel_ext::WheelExt;
25pub use write::WriterWheel;
26
27use self::read::{ReaderWheel, hierarchical::HawConf};
28
29use crate::window::Window;
30
31#[cfg(not(feature = "std"))]
32use alloc::vec::Vec;
33
34#[cfg(feature = "profiler")]
35use uwheel_stats::profile_scope;
36
37/// A Reader-Writer aggregation wheel with decoupled read and write paths.
38///
39/// # How it works
40/// The design of µWheel is centered around [low-watermarking](http://www.vldb.org/pvldb/vol14/p3135-begoli.pdf), a concept found in modern streaming systems (e.g., Apache Flink, RisingWave, Arroyo).
41/// Wheels in µWheel are low-watermark indexed which enables fast writes and lookups. Additionally, it lets us avoid storing explicit timestamps since they are implicit in the wheels.
42///
43/// A low watermark `w` indicates that all records with timestamps `t` where `t <= w` have been ingested.
/// µWheel exploits this property and separates the write and read paths. Writes (timestamps above `w`) are handled by a Writer Wheel which is optimized for single-threaded ingestion of out-of-order aggregates.
45/// Reads (queries with `time < w`) are managed by a hierarchically indexed Reader Wheel that employs a wheel-based query optimizer whose cost function
46/// minimizes the number of aggregate operations for a query.
47///
48/// µWheel adopts a Lazy Synchronization aggregation approach. Aggregates are only shifted over from the `WriterWheel` to the `ReaderWheel`
49/// once the internal low watermark has been advanced.
50///
51/// ![](https://raw.githubusercontent.com/uwheel/uwheel/a63799ca63b0d50a25565b150120570603b6d4cf/assets/overview.svg)
52///
53/// ## Queries
54///
55/// [RwWheel] supports both streaming window aggregation queries and temporal adhoc queries over arbitrary time ranges.
56/// A window may be installed through [RwWheel::window] and queries can be executed through its ReaderWheel which is accessible through [RwWheel::read].
57///
58/// ## Example
59///
60/// Here is an example showing the use of a U32 SUM aggregator
61///
62/// ```
63/// use uwheel::{aggregator::sum::U32SumAggregator, NumericalDuration, Entry, RwWheel};
64///
65/// let time = 0;
66/// // initialize the wheel with a low watermark
67/// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(time);
68/// // Insert an entry into the wheel
69/// wheel.insert(Entry::new(100, time));
70/// // advance the wheel to 1000
71/// wheel.advance_to(1000);
72/// // verify the new low watermark
73/// assert_eq!(wheel.watermark(), 1000);
74/// // query the last second of aggregates
75/// assert_eq!(wheel.read().interval(1.seconds()), Some(100));
76/// ```
77#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
78#[cfg_attr(feature = "serde", serde(bound = "A: Default"))]
79pub struct RwWheel<A>
80where
81    A: Aggregator,
82{
83    /// A single-writer wheel designed for high-throughput ingestion of stream aggregates
84    writer: WriterWheel<A>,
85    /// A multiple-reader wheel designed for efficient querying of aggregate across arbitrary time ranges
86    reader: ReaderWheel<A>,
87    #[cfg(feature = "profiler")]
88    stats: stats::Stats,
89}
90
91impl<A: Aggregator> Default for RwWheel<A> {
92    fn default() -> Self {
93        Self::with_conf(Default::default())
94    }
95}
96
97impl<A> RwWheel<A>
98where
99    A: Aggregator,
100{
101    /// Creates a new Wheel starting from the given time
102    ///
103    /// Time is represented as milliseconds since unix timestamp
104    ///
105    /// See [`RwWheel::with_conf`] for more detailed configuration possibilities
106    pub fn new(time: u64) -> Self {
107        let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(time));
108        Self::with_conf(conf)
109    }
110    /// Creates a new wheel using the specified configuration
111    ///
112    /// # Example
113    ///
114    /// ```
115    /// use uwheel::{RwWheel, Conf, aggregator::sum::U32SumAggregator, HawConf};
116    /// let conf = Conf::default().with_haw_conf(HawConf::default().with_watermark(10000));
117    /// let wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
118    /// ```
119    pub fn with_conf(conf: Conf) -> Self {
120        Self {
121            writer: WriterWheel::with_capacity_and_watermark(
122                conf.writer_conf.write_ahead_capacity,
123                conf.reader_conf.haw_conf.watermark,
124            ),
125            reader: ReaderWheel::with_conf(conf.reader_conf.haw_conf),
126            #[cfg(feature = "profiler")]
127            stats: stats::Stats::default(),
128        }
129    }
130    /// Installs a periodic window aggregation query
131    ///
132    /// Results of the window are returned when advancing the wheel with [Self::advance_to] or [Self::advance].
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// use uwheel::{Window, aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
138    ///
139    /// // Define a window query
140    /// let window = Window::sliding(10.seconds(), 3.seconds());
141    /// // Initialize a Reader-Writer Wheel and install the window
142    /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
143    /// wheel.window(window);
144    /// ```
145    pub fn window(&mut self, window: impl Into<Window>) {
146        self.reader.window(window.into());
147    }
148
149    /// Inserts an entry into the wheel
150    ///
151    /// # Safety
152    ///
153    /// Entries with timestamps below the current low watermark ([Self::watermark]) are dropped.
154    ///
155    /// # Example
156    ///
157    /// ```
158    /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, Entry};
159    ///
160    /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
161    /// let data = 10;
162    /// let timestamp = 1000;
163    /// wheel.insert(Entry::new(data, timestamp));
164    /// ```
165    #[inline]
166    pub fn insert(&mut self, e: impl Into<Entry<A::Input>>) {
167        #[cfg(feature = "profiler")]
168        profile_scope!(&self.stats.insert);
169
170        self.writer.insert(e);
171    }
172
173    /// Returns a reference to the writer wheel
174    pub fn write(&self) -> &WriterWheel<A> {
175        &self.writer
176    }
177    /// Returns a reference to the underlying reader wheel
178    pub fn read(&self) -> &ReaderWheel<A> {
179        &self.reader
180    }
181    /// Merges another read wheel with same size into this one
182    pub fn merge_read_wheel(&self, other: &ReaderWheel<A>) {
183        self.read().merge(other);
184    }
185    /// Returns the current watermark of this wheel
186    pub fn watermark(&self) -> u64 {
187        self.writer.watermark()
188    }
189    /// Advance the watermark of the wheel by the given [Duration]
190    ///
191    /// May return possible window aggregates if any window is installed (see [RwWheel::window]).
192    ///
193    /// # Example
194    ///
195    /// ```
196    /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
197    ///
198    /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
199    /// wheel.advance(5.seconds());
200    /// assert_eq!(wheel.watermark(), 5000);
201    /// ```
202    #[inline]
203    pub fn advance(&mut self, duration: Duration) -> Vec<WindowAggregate<A::PartialAggregate>> {
204        let to = self.watermark() + duration.whole_milliseconds() as u64;
205        self.advance_to(to)
206    }
207
208    /// Advances the time of the wheel to the specified watermark.
209    ///
210    /// May return possible window aggregates if any window is installed (see [RwWheel::window]).
211    ///
212    /// # Safety
213    ///
214    /// This function assumes advancement to occur in atomic units (e.g., 5 not 4.5 seconds)
215    ///
216    /// # Example
217    ///
218    /// ```
219    /// use uwheel::{aggregator::sum::U32SumAggregator, RwWheel, NumericalDuration};
220    ///
221    /// let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
222    /// wheel.advance_to(5000);
223    /// assert_eq!(wheel.watermark(), 5000);
224    /// ```
225    #[inline]
226    pub fn advance_to(&mut self, watermark: u64) -> Vec<WindowAggregate<A::PartialAggregate>> {
227        #[cfg(feature = "profiler")]
228        profile_scope!(&self.stats.advance);
229
230        self.reader.advance_to(watermark, &mut self.writer)
231    }
232
233    /// Returns an estimation of bytes used by the wheel
234    pub fn size_bytes(&self) -> usize {
235        let read = self.reader.as_ref().size_bytes();
236        let write = self.writer.size_bytes().unwrap();
237        read + write
238    }
239
240    #[cfg(feature = "profiler")]
241    /// Prints the stats of the [RwWheel]
242    pub fn print_stats(&self) {
243        use prettytable::{Table, row};
244        use uwheel_stats::Sketch;
245        let mut table = Table::new();
246        table.add_row(row![
247            "name", "count", "min", "p50", "p99", "p99.9", "p99.99", "p99.999", "max",
248        ]);
249        // helper fn to format percentile
250        let percentile_fmt = |p: f64| -> String { format!("{:.2}ns", p) };
251
252        // helper fn to add row to the table
253        let add_row = |id: &str, table: &mut Table, sketch: &Sketch| {
254            let percentiles = sketch.percentiles();
255            table.add_row(row![
256                id,
257                percentiles.count,
258                percentiles.min,
259                percentile_fmt(percentiles.p50),
260                percentile_fmt(percentiles.p99),
261                percentile_fmt(percentiles.p99_9),
262                percentile_fmt(percentiles.p99_99),
263                percentile_fmt(percentiles.p99_999),
264                percentiles.min,
265            ]);
266        };
267
268        add_row("insert", &mut table, &self.stats.insert);
269        add_row("advance", &mut table, &self.stats.advance);
270
271        let read = self.reader.as_ref();
272        add_row("tick", &mut table, &read.stats().tick);
273        add_row("interval", &mut table, &read.stats().interval);
274        add_row("landmark", &mut table, &read.stats().landmark);
275        add_row("combine range", &mut table, &read.stats().combine_range);
276        add_row(
277            "combine range plan",
278            &mut table,
279            &read.stats().combine_range_plan,
280        );
281        add_row(
282            "combined aggregation",
283            &mut table,
284            &read.stats().combined_aggregation,
285        );
286
287        add_row("execution plan", &mut table, &read.stats().exec_plan);
288
289        add_row(
290            "combined aggregation plan",
291            &mut table,
292            &read.stats().combined_aggregation_plan,
293        );
294        add_row(
295            "wheel aggregation",
296            &mut table,
297            &read.stats().wheel_aggregation,
298        );
299
300        println!("====RwWheel Profiler Dump====");
301        table.printstd();
302    }
303}
304
305impl<A> Drop for RwWheel<A>
306where
307    A: Aggregator,
308{
309    fn drop(&mut self) {
310        #[cfg(feature = "profiler")]
311        self.print_stats();
312    }
313}
314
/// Configuration of the [`WriterWheel`]
#[derive(Clone, Copy, Debug)]
pub struct WriterConf {
    /// Number of write-ahead slots the writer wheel is created with
    write_ahead_capacity: NonZeroUsize,
}
321impl Default for WriterConf {
322    fn default() -> Self {
323        Self {
324            write_ahead_capacity: NonZeroUsize::new(DEFAULT_WRITE_AHEAD_SLOTS).unwrap(),
325        }
326    }
327}
328
329/// [`ReaderWheel`] Configuration
330#[derive(Debug, Default, Copy, Clone)]
331pub struct ReaderConf {
332    /// Hierarchical Aggregation Wheel configuration
333    haw_conf: HawConf,
334}
335
336/// Reader-Writer Wheel Configuration
337#[derive(Debug, Default, Copy, Clone)]
338pub struct Conf {
339    /// Writer Wheel Configuration
340    writer_conf: WriterConf,
341    /// Reader Wheel Configuration
342    reader_conf: ReaderConf,
343}
344
345impl Conf {
346    /// Configure the number of write-ahead slots
347    ///
348    /// The default value is [DEFAULT_WRITE_AHEAD_SLOTS]
349    ///
350    /// # Example
351    ///
352    /// ```
353    /// use uwheel::Conf;
354    /// use core::num::NonZeroUsize;
355    ///
356    /// // Configure a write-ahead capacity of 128 (seconds)
357    /// let rw_conf = Conf::default().with_write_ahead(NonZeroUsize::new(128).unwrap());
358    /// ```
359    pub fn with_write_ahead(mut self, capacity: NonZeroUsize) -> Self {
360        self.writer_conf.write_ahead_capacity = capacity;
361        self
362    }
363    /// Configures the reader wheel to use the given [HawConf]
364    ///
365    /// # Example
366    ///
367    /// ```
368    /// use uwheel::{HawConf, Conf, RetentionPolicy};
369    ///
370    /// // Configure all wheels in the HAW to maintain data
371    /// let haw_conf = HawConf::default().with_retention_policy(RetentionPolicy::Keep);
372    /// let rw_conf = Conf::default().with_haw_conf(haw_conf);
373    /// ```
374    pub fn with_haw_conf(mut self, conf: HawConf) -> Self {
375        self.reader_conf.haw_conf = conf;
376        self
377    }
378}
379
380#[cfg(test)]
381mod tests {
382    #[cfg(feature = "timer")]
383    use core::cell::RefCell;
384    #[cfg(feature = "timer")]
385    use std::rc::Rc;
386
387    use super::*;
388    use crate::{aggregator::sum::U32SumAggregator, duration::*, *};
389    use proptest::prelude::*;
390
391    #[test]
392    fn delta_generate_test() {
393        let haw_conf = HawConf::default().with_deltas();
394        let conf = Conf::default().with_haw_conf(haw_conf);
395
396        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::with_conf(conf);
397        rw_wheel.insert(Entry::new(250, 1000));
398        rw_wheel.insert(Entry::new(250, 2000));
399        rw_wheel.insert(Entry::new(250, 3000));
400        rw_wheel.insert(Entry::new(250, 4000));
401
402        rw_wheel.advance_to(5000);
403        let delta_state = rw_wheel.read().delta_state();
404
405        assert_eq!(delta_state.oldest_ts, 0);
406        assert_eq!(
407            delta_state.deltas,
408            vec![None, Some(250), Some(250), Some(250), Some(250)]
409        );
410
411        assert_eq!(rw_wheel.read().interval(4.seconds()), Some(1000));
412
413        // create a new read wheel from deltas
414        let read: ReaderWheel<U32SumAggregator> = ReaderWheel::from_delta_state(delta_state);
415        // verify the watermark
416        assert_eq!(read.watermark(), 5000);
417        // verify the the results
418        assert_eq!(read.interval(4.seconds()), Some(1000));
419    }
420
421    #[test]
422    fn insert_test() {
423        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
424        rw_wheel.insert(Entry::new(250, 1000));
425        rw_wheel.insert(Entry::new(250, 2000));
426        rw_wheel.insert(Entry::new(250, 3000));
427        rw_wheel.insert(Entry::new(250, 4000));
428
429        // should be inserted into the timer wheel
430        rw_wheel.insert(Entry::new(250, 150000));
431
432        rw_wheel.advance_to(5000);
433
434        // should trigger insert
435        rw_wheel.advance_to(86000);
436    }
437
438    #[cfg(feature = "serde")]
439    #[test]
440    fn rw_wheel_serde_test() {
441        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::new(0);
442        rw_wheel.insert(Entry::new(250, 1000));
443        rw_wheel.insert(Entry::new(250, 2000));
444        rw_wheel.insert(Entry::new(250, 3000));
445        rw_wheel.insert(Entry::new(250, 4000));
446
447        rw_wheel.insert(Entry::new(250, 100000)); // insert entry that will go into overflow wheel
448
449        let serialized = bincode::serialize(&rw_wheel).unwrap();
450
451        let mut deserialized_wheel =
452            bincode::deserialize::<RwWheel<U32SumAggregator>>(&serialized).unwrap();
453
454        assert_eq!(deserialized_wheel.watermark(), 0);
455        deserialized_wheel.advance_to(5000);
456        assert_eq!(deserialized_wheel.read().interval(4.seconds()), Some(1000));
457
458        // advance passed the overflow wheel entry and make sure its still there
459        deserialized_wheel.advance_to(101000);
460        assert_eq!(deserialized_wheel.read().interval(1.seconds()), Some(250));
461    }
462
463    #[cfg(feature = "timer")]
464    #[test]
465    fn timer_once_test() {
466        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
467        let gate = Rc::new(RefCell::new(false));
468        let inner_gate = gate.clone();
469
470        let _ = rw_wheel.read().schedule_once(5000, move |read| {
471            if let Some(last_five) = read.interval(5.seconds()) {
472                *inner_gate.borrow_mut() = true;
473                assert_eq!(last_five, 1000);
474            }
475        });
476        rw_wheel.insert(Entry::new(250, 1000));
477        rw_wheel.insert(Entry::new(250, 2000));
478        rw_wheel.insert(Entry::new(250, 3000));
479        rw_wheel.insert(Entry::new(250, 4000));
480
481        rw_wheel.advance_to(5000);
482
483        // assert that the timer action was triggered
484        assert!(*gate.borrow());
485    }
486
487    #[cfg(feature = "timer")]
488    #[test]
489    fn timer_repeat_test() {
490        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
491        let sum = Rc::new(RefCell::new(0));
492        let inner_sum = sum.clone();
493
494        // schedule a repeat action
495        let _ = rw_wheel
496            .read()
497            .schedule_repeat(5000, 5.seconds(), move |read| {
498                if let Some(last_five) = read.interval(5.seconds()) {
499                    *inner_sum.borrow_mut() += last_five;
500                }
501            });
502
503        rw_wheel.insert(Entry::new(250, 1000));
504        rw_wheel.insert(Entry::new(250, 2000));
505        rw_wheel.insert(Entry::new(250, 3000));
506        rw_wheel.insert(Entry::new(250, 4000));
507
508        // trigger first timer to add sum of last 5 seconds
509        rw_wheel.advance_to(5000);
510        assert_eq!(*sum.borrow(), 1000);
511
512        rw_wheel.insert(Entry::new(250, 5000));
513        rw_wheel.insert(Entry::new(250, 6000));
514        rw_wheel.insert(Entry::new(250, 7000));
515
516        // trigger second timer to add sum of last 5 seconds
517        rw_wheel.advance_to(10000);
518        assert_eq!(*sum.borrow(), 1750);
519    }
520
521    #[cfg(feature = "sync")]
522    #[test]
523    fn read_wheel_move_thread_test() {
524        let mut rw_wheel: RwWheel<U32SumAggregator> = RwWheel::default();
525        rw_wheel.insert(Entry::new(1, 999));
526        rw_wheel.advance(1.seconds());
527
528        let read = rw_wheel.read().clone();
529
530        let handle = std::thread::spawn(move || {
531            assert_eq!(read.interval(1.seconds()), Some(1));
532        });
533
534        handle.join().expect("Failed to join the thread.");
535    }
536
537    #[test]
538    fn interval_test() {
539        let mut time = 0;
540        let mut wheel = RwWheel::<U32SumAggregator>::new(time);
541        wheel.advance(1.seconds());
542
543        wheel.insert(Entry::new(1u32, 1000));
544        wheel.insert(Entry::new(5u32, 5000));
545        wheel.insert(Entry::new(11u32, 11000));
546
547        wheel.advance(5.seconds());
548        assert_eq!(wheel.watermark(), 6000);
549
550        // let expected: &[_] = &[&0, &1u32, &0, &0, &0, &5];
551        // assert_eq!(
552        //     &wheel
553        //         .read()
554        //         .as_ref()
555        //         .seconds_unchecked()
556        //         .iter()
557        //         .collect::<Vec<&u32>>(),
558        //     expected
559        // );
560
561        assert_eq!(
562            wheel.read().as_ref().seconds_unchecked().interval(5).0,
563            Some(6u32)
564        );
565        assert_eq!(
566            wheel.read().as_ref().seconds_unchecked().interval(1).0,
567            Some(5u32)
568        );
569
570        time = 12000;
571        wheel.advance_to(time);
572
573        wheel.insert(Entry::new(100u32, 61000));
574        wheel.insert(Entry::new(100u32, 63000));
575        wheel.insert(Entry::new(100u32, 67000));
576
577        // go pass seconds wheel
578        time = 65000;
579        wheel.advance_to(time);
580    }
581
582    #[test]
583    fn mixed_timestamp_insertions_test() {
584        let mut time = 1000;
585        let mut wheel = RwWheel::<U32SumAggregator>::new(time);
586        wheel.advance_to(time);
587
588        wheel.insert(Entry::new(1u32, 1000));
589        wheel.insert(Entry::new(5u32, 5000));
590        wheel.insert(Entry::new(11u32, 11000));
591
592        time = 6000; // new watermark
593        wheel.advance_to(time);
594
595        assert_eq!(
596            wheel.read().as_ref().seconds_unchecked().total(),
597            Some(6u32)
598        );
599        // check we get the same result by combining the range of last 6 seconds
600        assert_eq!(
601            wheel
602                .read()
603                .as_ref()
604                .seconds_unchecked()
605                .combine_range_and_lower(0..5),
606            Some(6u32)
607        );
608    }
609
610    // #[test]
611    // fn full_cycle_test() {
612    //     let mut wheel = RwWheel::<U32SumAggregator>::new(0);
613
614    //     let ticks = wheel.read().remaining_ticks() - 1;
615    //     wheel.advance(Duration::seconds(ticks as i64));
616
617    //     // one tick away from full cycle clear
618    //     assert_eq!(
619    //         wheel.read().as_ref().seconds_unchecked().rotation_count(),
620    //         SECONDS - 1
621    //     );
622    //     assert_eq!(
623    //         wheel.read().as_ref().minutes_unchecked().rotation_count(),
624    //         MINUTES - 1
625    //     );
626    //     assert_eq!(
627    //         wheel.read().as_ref().hours_unchecked().rotation_count(),
628    //         HOURS - 1
629    //     );
630    //     assert_eq!(
631    //         wheel.read().as_ref().days_unchecked().rotation_count(),
632    //         DAYS - 1
633    //     );
634    //     assert_eq!(
635    //         wheel.read().as_ref().weeks_unchecked().rotation_count(),
636    //         WEEKS - 1
637    //     );
638    //     assert_eq!(
639    //         wheel.read().as_ref().years_unchecked().rotation_count(),
640    //         YEARS - 1
641    //     );
642
643    //     // force full cycle clear
644    //     wheel.advance(1.seconds());
645
646    //     // rotation count of all wheels should be zero
647    //     assert_eq!(
648    //         wheel.read().as_ref().seconds_unchecked().rotation_count(),
649    //         0,
650    //     );
651    //     assert_eq!(
652    //         wheel.read().as_ref().minutes_unchecked().rotation_count(),
653    //         0,
654    //     );
655    //     assert_eq!(wheel.read().as_ref().hours_unchecked().rotation_count(), 0,);
656    //     assert_eq!(wheel.read().as_ref().days_unchecked().rotation_count(), 0,);
657    //     assert_eq!(wheel.read().as_ref().weeks_unchecked().rotation_count(), 0,);
658    //     assert_eq!(wheel.read().as_ref().years_unchecked().rotation_count(), 0,);
659
660    //     // Verify len of all wheels
661    //     assert_eq!(wheel.read().as_ref().seconds_unchecked().len(), SECONDS);
662    //     assert_eq!(wheel.read().as_ref().minutes_unchecked().len(), MINUTES);
663    //     assert_eq!(wheel.read().as_ref().hours_unchecked().len(), HOURS);
664    //     assert_eq!(wheel.read().as_ref().days_unchecked().len(), DAYS);
665    //     assert_eq!(wheel.read().as_ref().weeks_unchecked().len(), WEEKS);
666    //     assert_eq!(wheel.read().as_ref().years_unchecked().len(), YEARS);
667
668    //     assert!(wheel.read().is_full());
669    //     assert!(!wheel.read().is_empty());
670    //     assert!(wheel.read().landmark().is_none());
671    // }
672
673    #[test]
674    fn merge_test() {
675        let time = 0;
676        let mut wheel = RwWheel::<U32SumAggregator>::new(time);
677
678        let entry = Entry::new(1u32, 5000);
679        wheel.insert(entry);
680
681        wheel.advance(60.minutes());
682
683        let fresh_wheel_time = 0;
684        let fresh_wheel = RwWheel::<U32SumAggregator>::new(fresh_wheel_time);
685        fresh_wheel.read().merge(wheel.read());
686
687        assert_eq!(fresh_wheel.read().watermark(), wheel.read().watermark());
688        assert_eq!(fresh_wheel.read().landmark(), wheel.read().landmark());
689        assert_eq!(
690            fresh_wheel.read().remaining_ticks(),
691            wheel.read().remaining_ticks()
692        );
693    }
694
695    #[test]
696    fn merge_test_low_to_high() {
697        let time = 0;
698        let mut wheel = RwWheel::<U32SumAggregator>::new(time);
699
700        let entry = Entry::new(1u32, 5000);
701        wheel.insert(entry);
702
703        wheel.advance(10.seconds());
704
705        let mut fresh_wheel = RwWheel::<U32SumAggregator>::new(time);
706        fresh_wheel.insert(Entry::new(5u32, 8000));
707        fresh_wheel.advance(9.seconds());
708
709        wheel.read().merge(fresh_wheel.read());
710
711        assert_eq!(wheel.read().landmark(), Some(6));
712        // assert_eq!(wheel.read().interval(2.seconds()), Some(5));
713        assert_eq!(wheel.read().interval(10.seconds()), Some(6));
714    }
715
716    fn create_and_advance_wheel(start: u64, end: u64) -> u64 {
717        let mut wheel: RwWheel<U32SumAggregator> = RwWheel::new(start);
718        wheel.advance_to(end);
719        wheel.watermark()
720    }
721
722    proptest! {
723        #[test]
724        fn advance_to(start in 0u64..u64::MAX, watermark in 0u64..u64::MAX) {
725            let advanced_time = create_and_advance_wheel(start, watermark);
726            prop_assert!(advanced_time >= start);
727        }
728    }
729}