pi_timer/
lib.rs

1//! 不可撤销的定时器
2
3use std::{cmp::Reverse, fmt};
4
5use pi_ext_heap::{empty as heap_empty, ExtHeap};
6use pi_wheel::{TimeoutItem, Wheel};
7
8/// 不可撤销的定时器
9pub struct Timer<T, const N0: usize, const N: usize, const L: usize> {
10    wheel: Wheel<T, N0, N, L>, // 定时轮
11    heap: ExtHeap<Reverse<TimeoutItem<T>>>, // 最小堆
12    add_count: usize,
13    remove_count: usize,
14    roll_count: u64,
15}
16
17impl<T: fmt::Debug, const N0: usize, const N: usize, const L: usize> fmt::Debug
18    for Timer<T, N0, N, L>
19{
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("Timer")
22            .field("wheel", &self.wheel)
23            .field("heap", &self.heap)
24            .field("add_count", &self.add_count)
25            .field("remove_count", &self.remove_count)
26            .field("roll_count", &self.roll_count)
27            .finish()
28    }
29}
30impl<T, const N0: usize, const N: usize, const L: usize> Default for Timer<T, N0, N, L> {
31    fn default() -> Self {
32        Timer {
33            wheel: Default::default(),
34            heap: Default::default(),
35            add_count: 0,
36            remove_count: 0,
37            roll_count: 0,
38        }
39    }
40}
41
42impl<T, const N0: usize, const N: usize, const L: usize> Timer<T, N0, N, L> {
43    /// 获得添加任务数量
44    pub fn add_count(&self) -> usize {
45        self.add_count
46    }
47    /// 获得移除任务数量
48    pub fn remove_count(&self) -> usize {
49        self.remove_count
50    }
51    /// 获得滚动次数
52    pub fn roll_count(&self) -> u64 {
53        self.roll_count
54    }
55
56     /// 在当前时间之后,放入一个定时任务
57    pub fn push_time(&mut self, time: u64, el: T) {
58        self.push(match time.checked_sub(self.roll_count) {
59            Some(r) => r as usize,
60            _ => 0,
61        }, el)
62    }
63
64    /// 放入一个定时任务
65    pub fn push(&mut self, timeout: usize, el: T) {
66        self.add_count += 1;
67        if let Some(r) = self.wheel.push(TimeoutItem::new(timeout, el)) {
68            // 没有放入的定时任务的时间已经被转换成绝对时间,放入堆中
69            self.heap.push(Reverse(r), &mut (), heap_empty);
70        }
71    }
72    /// 弹出一个定时任务
73    /// * `now` 当前时间
74    /// * @return `Option<T>` 弹出的定时元素
75    pub fn pop(&mut self, now: u64) -> Option<T> {
76        loop {
77            if let Some(r) = self.wheel.pop() {
78                self.remove_count += 1;
79                return Some(r.el)
80            }
81            if self.roll_count >= now {
82                return None
83            }
84            self.roll();
85        }
86    }
87    /// 判断指定时间内是否还有定时任务
88    pub fn is_ok(&mut self, now: u64) -> bool {
89        loop {
90            if !self.wheel.is_cur_over() {
91                return true
92            }
93            if self.roll_count >= now {
94                return false
95            }
96            self.roll();
97        }
98    }
99    /// 轮滚动 - 向后滚动一个最小粒度, 可能会造成轮的逐层滚动。如果滚动到底,则修正堆上全部的定时任务,并将堆上的到期任务放入轮中
100    pub fn roll(&mut self) {
101        self.roll_count += 1;
102        if self.wheel.roll() {
103            // 修正堆上全部的定时任务
104            for i in 0..self.heap.len() {
105                unsafe { self.heap.get_unchecked_mut(i).0.timeout -= self.wheel.max_time() };
106            }
107            // 如果滚到轮的最后一层的最后一个, 则将堆上的到期任务放入轮中
108            // 检查堆顶的最近的任务
109            while let Some(it) = self.heap.peek() {
110                // 判断任务是否需要放入轮中
111                if it.0.timeout >= self.wheel.max_time() {
112                    break;
113                }
114                let it = self.heap.pop(&mut (), heap_empty).unwrap();
115                // 时间已经修正过了,可以直接放入定时轮中
116                self.wheel.push(it.0);
117            }
118        }
119    }
120}
121
122// 测试定时器得延时情况
123#[cfg(test)]
124mod test_mod {
125    extern crate pcg_rand;
126    extern crate rand_core;
127
128    use std::{
129        thread,
130        time::{Duration, Instant},
131    };
132
133    use self::rand_core::{RngCore, SeedableRng};
134    use crate::*;
135
136    #[test]
137    fn test() {
138        let mut timer: Timer<(u64, u64), 128, 16, 1> = Default::default();
139        let mut rng = pcg_rand::Pcg32::seed_from_u64(22222);
140        let start = Instant::now();
141        println!("max_time:{}", timer.wheel.max_time());
142        for i in 1..100000 {
143            let t = (rng.next_u32() % 16100) as u64;
144            let now = Instant::now();
145            let tt = now.duration_since(start).as_millis() as u64;
146            if i < 100 {
147                println!("push: timeout:{} realtime:{:?}", t, (i, t + tt));
148                timer.push(t as usize, (i, t + tt));
149            }
150            if t == 9937 || t == 15280 {
151                println!("{:?}", timer.wheel);
152            }
153            //while let Some(it) = timer.pop(tt) {
154            while timer.is_ok(tt) {
155                let it = timer.pop(tt).unwrap();
156                println!("ppp:{:?}, now:{}", it, tt);
157            }
158            if i > 100 && timer.add_count == timer.remove_count {
159                //println!("vec:{:?}", vec);
160                println!(
161                    "return: add_count:{:?}",
162                    timer.add_count
163                );
164                return;
165            }
166            thread::sleep(Duration::from_millis(1 as u64));
167        }
168    }
169
170}