pi_timer/
lib.rs

1//! 不可撤销的定时器
2
3use std::{cmp::Reverse, fmt};
4
5use pi_ext_heap::{empty as heap_empty, ExtHeap};
6use pi_wheel::{TimeoutItem, Wheel};
7
8/// 不可撤销的定时器
9pub struct Timer<T, const N0: usize, const N: usize, const L: usize> {
10    wheel: Wheel<T, N0, N, L>, // 定时轮
11    heap: ExtHeap<Reverse<TimeoutItem<T>>>, // 最小堆
12    add_count: usize,
13    remove_count: usize,
14    roll_count: u64,
15}
16
17impl<T: fmt::Debug, const N0: usize, const N: usize, const L: usize> fmt::Debug
18    for Timer<T, N0, N, L>
19{
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("Timer")
22            .field("wheel", &self.wheel)
23            .field("heap", &self.heap)
24            .field("add_count", &self.add_count)
25            .field("remove_count", &self.remove_count)
26            .field("roll_count", &self.roll_count)
27            .finish()
28    }
29}
30impl<T, const N0: usize, const N: usize, const L: usize> Default for Timer<T, N0, N, L> {
31    fn default() -> Self {
32        Timer {
33            wheel: Default::default(),
34            heap: Default::default(),
35            add_count: 0,
36            remove_count: 0,
37            roll_count: 0,
38        }
39    }
40}
41
42impl<T, const N0: usize, const N: usize, const L: usize> Timer<T, N0, N, L> {
43    /// 获得添加任务数量
44    pub fn add_count(&self) -> usize {
45        self.add_count
46    }
47    /// 获得移除任务数量
48    pub fn remove_count(&self) -> usize {
49        self.remove_count
50    }
51    /// 获得滚动次数
52    pub fn roll_count(&self) -> u64 {
53        self.roll_count
54    }
55    /// 放入一个定时任务
56    pub fn push(&mut self, timeout: usize, el: T) {
57        self.add_count += 1;
58        if let Some(r) = self.wheel.push(TimeoutItem::new(timeout, el)) {
59            // 没有放入的定时任务的时间已经被转换成绝对时间,放入堆中
60            self.heap.push(Reverse(r), &mut (), heap_empty);
61        }
62    }
63    /// 弹出一个定时任务
64    /// * `now` 当前时间
65    /// * @return `Option<T>` 弹出的定时元素
66    pub fn pop(&mut self, now: u64) -> Option<T> {
67        loop {
68            if let Some(r) = self.wheel.pop() {
69                self.remove_count += 1;
70                return Some(r.el)
71            }
72            if self.roll_count >= now {
73                return None
74            }
75            self.roll();
76        }
77    }
78    /// 判断指定时间内是否还有定时任务
79    pub fn is_ok(&mut self, now: u64) -> bool {
80        loop {
81            if !self.wheel.is_cur_over() {
82                return true
83            }
84            if self.roll_count >= now {
85                return false
86            }
87            self.roll();
88        }
89    }
90    /// 轮滚动 - 向后滚动一个最小粒度, 可能会造成轮的逐层滚动。如果滚动到底,则修正堆上全部的定时任务,并将堆上的到期任务放入轮中
91    pub fn roll(&mut self) {
92        self.roll_count += 1;
93        if self.wheel.roll() {
94            // 修正堆上全部的定时任务
95            for i in 0..self.heap.len() {
96                unsafe { self.heap.get_unchecked_mut(i).0.timeout -= self.wheel.max_time() };
97            }
98            // 如果滚到轮的最后一层的最后一个, 则将堆上的到期任务放入轮中
99            // 检查堆顶的最近的任务
100            while let Some(it) = self.heap.peek() {
101                // 判断任务是否需要放入轮中
102                if it.0.timeout >= self.wheel.max_time() {
103                    break;
104                }
105                let it = self.heap.pop(&mut (), heap_empty).unwrap();
106                // 时间已经修正过了,可以直接放入定时轮中
107                self.wheel.push(it.0);
108            }
109        }
110    }
111}
112
113// 测试定时器得延时情况
114#[cfg(test)]
115mod test_mod {
116    extern crate pcg_rand;
117    extern crate rand_core;
118
119    use std::{
120        thread,
121        time::{Duration, Instant},
122    };
123
124    use self::rand_core::{RngCore, SeedableRng};
125    use crate::*;
126
127    #[test]
128    fn test() {
129        let mut timer: Timer<(u64, u64), 128, 16, 1> = Default::default();
130        let mut rng = pcg_rand::Pcg32::seed_from_u64(22222);
131        let start = Instant::now();
132        println!("max_time:{}", timer.wheel.max_time());
133        for i in 1..100000 {
134            let t = (rng.next_u32() % 16100) as u64;
135            let now = Instant::now();
136            let tt = now.duration_since(start).as_millis() as u64;
137            if i < 100 {
138                println!("push: timeout:{} realtime:{:?}", t, (i, t + tt));
139                timer.push(t as usize, (i, t + tt));
140            }
141            if t == 9937 || t == 15280 {
142                println!("{:?}", timer.wheel);
143            }
144            //while let Some(it) = timer.pop(tt) {
145            while timer.is_ok(tt) {
146                let it = timer.pop(tt).unwrap();
147                println!("ppp:{:?}, now:{}", it, tt);
148            }
149            if i > 100 && timer.add_count == timer.remove_count {
150                //println!("vec:{:?}", vec);
151                println!(
152                    "return: add_count:{:?}",
153                    timer.add_count
154                );
155                return;
156            }
157            thread::sleep(Duration::from_millis(1 as u64));
158        }
159    }
160
161}