algorithm/timer/
timer_wheel.rs

1use std::{
2    fmt::{self, Display},
3    ptr, u64,
4};
5
6use super::Timer;
7
/// One scheduled item as it is stored inside a wheel slot.
struct Entry<T: Timer> {
    /// The caller-supplied timer payload, handed back when the entry fires or is deleted.
    val: T,
    /// Delay attached to the entry; used to pick a slot and rewritten by
    /// `OneTimerWheel::update_index` when the entry is cascaded to a child wheel.
    when: u64,
    /// Identifier the entry is looked up and deleted by.
    id: u64,
}
/// A single wheel (one "hand" of the clock) of the hierarchical timer wheel.
pub struct OneTimerWheel<T: Timer> {
    /// Slot the hand currently points at, e.g. the second hand pointing at 3 o'clock.
    index: u64,
    /// Total span covered by this wheel (`num * step`); e.g. 60s may be split into
    /// 30 slots of 2s each. (The field name is a misspelling of "capacity".)
    capation: u64,
    /// Number of slots in this wheel.
    num: u64,
    /// Span of one slot of this wheel, e.g. 60s for a minute wheel.
    step: u64,
    /// Corrected step: this wheel's step multiplied by the base step (`one_step * step`).
    fix_step: u64,
    /// Entries held by each slot.
    slots: Vec<Vec<Entry<T>>>,
    /// Parent (coarser) wheel; for a minute wheel this is the hour wheel. Null when absent.
    parent: *mut OneTimerWheel<T>,
    /// Child (finer) wheel; for a minute wheel this is the second wheel. Null when absent.
    child: *mut OneTimerWheel<T>,
    /// Name of the wheel, kept only to help with diagnostics.
    name: &'static str,
}
34
35impl<T: Timer> OneTimerWheel<T> {
36    pub fn new(num: u64, step: u64, one_step: u64, name: &'static str) -> Self {
37        let mut slots = vec![];
38        for _ in 0..num {
39            slots.push(Vec::new());
40        }
41        Self {
42            index: 0,
43            capation: num * step,
44            num,
45            step,
46            fix_step: one_step * step,
47            slots,
48            parent: ptr::null_mut(),
49            child: ptr::null_mut(),
50            name,
51        }
52    }
53
54    pub fn clear(&mut self) {
55        for idx in 0..self.num as usize {
56            self.slots[idx].clear();
57        }
58    }
59
60    pub fn append(&mut self, next: *mut OneTimerWheel<T>) {
61        if self.child.is_null() {
62            unsafe {
63                (*next).parent = self;
64                self.child = next;
65            }
66        } else {
67            unsafe {
68                (*self.child).append(next);
69            }
70        }
71    }
72
73    fn add_timer(&mut self, entry: Entry<T>) {
74        let offset = entry.when;
75        self.add_timer_with_offset(entry, offset);
76    }
77
78    fn del_timer(&mut self, timer_id: u64) -> Option<T> {
79        for i in 0..self.num as usize {
80            let mut found_idx = None;
81            for (idx, val) in self.slots[i].iter().enumerate() {
82                if val.id == timer_id {
83                    found_idx = Some(idx);
84                    break;
85                }
86            }
87            if let Some(idx) = found_idx {
88                return Some(self.slots[i].remove(idx).val);
89            }
90        }
91        None
92    }
93
94    fn get_timer(&self, timer_id: &u64) -> Option<&T> {
95        for i in 0..self.num as usize {
96            for val in self.slots[i].iter() {
97                if &val.id == timer_id {
98                    return Some(&val.val);
99                }
100            }
101        }
102        None
103    }
104
105    fn get_mut_timer(&mut self, timer_id: &u64) -> Option<&mut T> {
106        for i in 0..self.num as usize {
107            let mut found_idx = None;
108            let v = &mut self.slots[i];
109            for (idx, val) in v.iter().enumerate() {
110                if &val.id == timer_id {
111                    found_idx = Some(idx);
112                    break;
113                }
114            }
115            if let Some(idx) = found_idx {
116                return Some(&mut self.slots[i][idx].val);
117            }
118        }
119        None
120    }
121
122    fn add_step_timer(&mut self, entry: Entry<T>) {
123        let offset = entry.when % self.num;
124        self.add_timer_with_offset(entry, offset);
125    }
126
127    fn add_timer_with_offset(&mut self, entry: Entry<T>, offset: u64) {
128        if offset > self.capation {
129            let index = (self.index + self.num - 1) % self.num;
130            self.slots[index as usize].push(entry);
131        } else if offset <= self.fix_step && !self.child.is_null() {
132            unsafe {
133                (*self.child).add_timer_with_offset(entry, offset);
134            }
135        } else {
136            // 当前偏差值还在自己的容纳范围之前,做容错,排在最后处理位
137            let index = (offset - 1) / self.fix_step;
138            let index = (index + self.index) % self.num;
139            self.slots[index as usize].push(entry);
140        }
141    }
142
    /// Advances the hand of this wheel by `offset` slots and collects what fires.
    ///
    /// * Every slot passed on the way is drained into `result` as `(id, payload)`.
    /// * If this wheel has a child, the slot the hand lands on is drained as well: each
    ///   entry gets `when = (when % step) - remainder` (saturating); entries that reach 0
    ///   fire immediately, the others are handed to the child via `add_step_timer`.
    ///
    /// `remainder` is the value returned by the previous (lower) wheel's call.
    ///
    /// Returns `(carry, remainder)`: `carry = (index + offset) / num` is the number of full
    /// turns to apply to the parent wheel, and `remainder = (index + offset) % num + remainder`
    /// is passed on to the parent's call.
    pub fn update_index(
        &mut self,
        offset: u64,
        remainder: u64,
        result: &mut Vec<(u64, T)>,
    ) -> (u64, u64) {
        let next = self.index + offset;
        // Counts visited slots so a huge `offset` does not loop over the wheel repeatedly.
        let mut all = 0;
        for idx in self.index..next {
            // NOTE(review): the check runs before the increment, so up to `num + 1` slots
            // are visited; the extra visit hits an already drained slot.
            if all > self.num {
                break;
            }
            all += 1;
            let idx = idx % self.num;
            let list = &mut self.slots[idx as usize];
            for val in list.drain(..) {
                result.push((val.id, val.val));
            }
        }
        self.index = next % self.num;
        if !self.child.is_null() {
            unsafe {
                // Cascade the slot the hand now points at down to the child wheel.
                let list = &mut self.slots[self.index as usize];
                for mut val in list.drain(..) {
                    val.when = (val.when % self.step).saturating_sub(remainder);
                    if val.when == 0 {
                        result.push((val.id, val.val));
                    } else {
                        (*self.child).add_step_timer(val);
                    }
                }
            }
        }
        // NOTE(review): `next % self.num` is a slot index of this wheel while `remainder`
        // comes from the lower wheel; the sum looks like it mixes units whenever
        // `step != 1` -- confirm against the intended meaning of `remainder`.
        (next / self.num, next % self.num + remainder)
    }
178}
179
/// Timer wheel: a timer built from a chain of wheels, modelled on a clock face.
///
/// A timer wheel is a ring-shaped structure; picture a clock face divided into slots.
///
/// Each slot stands for a span of time; the shorter the span, the finer the timer
/// precision.
///
/// Each slot keeps the delayed tasks placed on it in a `Vec`.
///
/// Note: Rust's doubly linked list does not yet expose an interface for removing a
/// specific element, so a `Vec` is used instead; deleting shifts the remaining values.
///
/// # Examples
///
/// ```
/// use algorithm::TimerWheel;
/// fn main() {
///     let mut timer = TimerWheel::new();
///     timer.append_timer_wheel(60, "SecondWheel");
///     timer.append_timer_wheel(60, "MinuteWheel");
///     timer.append_timer_wheel(12, "HourWheel");
///     timer.add_timer(30);
///     assert_eq!(timer.get_delay_id(), 30);
///     timer.add_timer(149);
///     assert_eq!(timer.get_delay_id(), 30);
///     let t = timer.add_timer(600);
///     assert_eq!(timer.get_delay_id(), 30);
///     timer.add_timer(1);
///     assert_eq!(timer.get_delay_id(), 1);
///     timer.del_timer(t);
///     timer.add_timer(150);
///     assert_eq!(timer.get_delay_id(), 1);
///     let val = timer.update_deltatime(30).unwrap();
///     assert_eq!(val.iter().map(|(_, v)| *v).collect::<Vec<usize>>(), vec![1, 30]);
///     timer.add_timer(2);
///     let val = timer.update_deltatime(119).unwrap();
///     assert_eq!(val.iter().map(|(_, v)| *v).collect::<Vec<usize>>(), vec![2, 149]);
///     let val = timer.update_deltatime(1).unwrap();
///     assert_eq!(val.iter().map(|(_, v)| *v).collect::<Vec<usize>>(), vec![150]);
///     assert!(timer.is_empty());
/// }
/// ```
pub struct TimerWheel<T: Timer> {
    /// Largest (coarsest) wheel; on a clock this is the hour hand. Null until a wheel is appended.
    greatest: *mut OneTimerWheel<T>,
    /// Smallest (finest) wheel; on a clock this is the second hand. Null until a wheel is appended.
    lessest: *mut OneTimerWheel<T>,
    /// Smallest interval of the wheel; in clock terms, one second.
    one_step: u64,
    /// Next timer id to hand out.
    next_timer_id: u64,
    /// Upper limit for timer ids; the id counter restarts at 1 once it is reached.
    max_timer_id: u64,
    /// Number of steps that must elapse before `update_now` does any work
    /// (distance to the nearest pending slot).
    delay_id: u64,
    /// Elapsed time accumulated so far and not yet consumed by an update.
    all_deltatime: u64,
    /// Number of entries currently held by the wheels.
    len: usize,
}
238
239impl<T: Timer> TimerWheel<T> {
240    /// 创建一个计时器轮
241    /// # Examples
242    ///
243    /// ```
244    /// use algorithm::TimerWheel;
245    /// fn main() {
246    ///     let mut timer = TimerWheel::<u64>::new();
247    ///     assert!(timer.is_empty());
248    /// }
249    /// ```
250    pub fn new() -> Self {
251        Self {
252            greatest: ptr::null_mut(),
253            lessest: ptr::null_mut(),
254            next_timer_id: 1,
255            max_timer_id: u64::MAX,
256            delay_id: 0,
257            one_step: 1,
258            all_deltatime: 0,
259            len: 0,
260        }
261    }
262
    /// Returns the number of timers currently stored in the wheel.
    ///
    /// # Examples
    ///
    /// ```
    /// use algorithm::TimerWheel;
    /// fn main() {
    ///     let mut timer = TimerWheel::<u64>::new();
    ///     timer.append_timer_wheel(60, "SecondWheel");
    ///     assert!(timer.is_empty());
    ///     timer.add_timer(1);
    ///     assert_eq!(timer.len(), 1);
    ///     let t = timer.add_timer(2);
    ///     assert_eq!(timer.len(), 2);
    ///     timer.del_timer(t);
    ///     assert_eq!(timer.len(), 1);
    /// }
    /// ```
    pub fn len(&self) -> usize {
        self.len
    }
283
284    /// 是否为空
285    /// # Examples
286    ///
287    /// ```
288    /// use algorithm::TimerWheel;
289    /// fn main() {
290    ///     let mut timer = TimerWheel::<u64>::new();
291    ///     assert!(timer.is_empty());
292    /// }
293    /// ```
294    pub fn is_empty(&self) -> bool {
295        self.len == 0
296    }
297
298    /// 清除所有的槽位
299    /// # Examples
300    ///
301    /// ```
302    /// use algorithm::TimerWheel;
303    /// fn main() {
304    ///     let mut timer = TimerWheel::<u64>::new();
305    ///     timer.append_timer_wheel(60, "SecondWheel");
306    ///     assert!(timer.is_empty());
307    ///     timer.add_timer(1);
308    ///     timer.add_timer(2);
309    ///     assert_eq!(timer.len(), 2);
310    ///     timer.clear();
311    ///     assert_eq!(timer.len(), 0);
312    /// }
313    /// ```
314    pub fn clear(&mut self) {
315        let mut wheel = self.lessest;
316        while !wheel.is_null() {
317            unsafe {
318                (*wheel).clear();
319                wheel = (*wheel).parent;
320            }
321        }
322        self.len = 0;
323    }
324
    /// Returns the base step (`one_step`) of the timer wheel.
    pub fn get_one_step(&self) -> u64 {
        self.one_step
    }
328
329    pub fn set_one_step(&mut self, step: u64) {
330        self.one_step = step.max(1);
331    }
332    /// 添加计时器轮, 设置槽位和精度值, 名字用来辅助
333    ///
334    /// # Examples
335    ///
336    /// ```
337    /// use algorithm::TimerWheel;
338    /// fn main() {
339    ///     let mut timer = TimerWheel::new();
340    ///     timer.append_timer_wheel(60, "SecondWheel");
341    ///     timer.append_timer_wheel(60, "MinuteWheel");
342    ///     timer.append_timer_wheel(12, "HourWheel");
343    ///     timer.add_timer(30);
344    /// }
345    pub fn append_timer_wheel(&mut self, slots: u64, name: &'static str) {
346        debug_assert!(self.len == 0, "必须时轮为空才可改变时轮");
347        let step = if self.greatest.is_null() {
348            self.one_step
349        } else {
350            let mut now_step = 1;
351            let mut node = self.greatest;
352            unsafe {
353                while !node.is_null() {
354                    now_step *= (*node).capation;
355                    node = (*node).child;
356                }
357            }
358            now_step / self.one_step
359        };
360        let one = Box::into_raw(Box::new(OneTimerWheel::new(slots, step, self.one_step, name)));
361        self.delay_id = self.delay_id.max(slots * step);
362        if self.lessest.is_null() {
363            self.lessest = one;
364            self.greatest = one;
365        } else {
366            unsafe {
367                let child = self.greatest;
368                (*one).append(child);
369                self.greatest = one;
370            }
371        }
372    }
373
374    /// 计时器轮的递进时间
375    ///
376    /// # Examples
377    ///
378    /// ```
379    /// use algorithm::TimerWheel;
380    /// fn main() {
381    ///     let mut timer = TimerWheel::new();
382    ///     timer.append_timer_wheel(60, "SecondWheel");
383    ///     timer.add_timer(30);
384    ///     let val = timer.update_deltatime(30).unwrap();
385    ///     assert_eq!(val, vec![(1, 30)]);
386    /// }
387    pub fn update_deltatime(&mut self, delta: u64) -> Option<Vec<(u64, T)>> {
388        debug_assert!(self.one_step > 0);
389        self.update_now(self.all_deltatime.wrapping_add(delta))
390    }
391
392    /// 计时器轮的递进时间
393    ///
394    /// # Examples
395    ///
396    /// ```
397    /// use algorithm::TimerWheel;
398    /// fn main() {
399    ///     let mut timer = TimerWheel::new();
400    ///     timer.append_timer_wheel(60, "SecondWheel");
401    ///     timer.add_timer(30);
402    ///     let val = timer.update_deltatime(30).unwrap();
403    ///     assert_eq!(val, vec![(1, 30)]);
404    /// }
405    pub fn update_now(&mut self, now: u64) -> Option<Vec<(u64, T)>> {
406        debug_assert!(self.one_step > 0);
407        self.all_deltatime = now;
408        let mut offset = self.all_deltatime / self.one_step;
409        if offset < self.delay_id {
410            return None;
411        }
412
413        self.all_deltatime -= offset * self.one_step;
414        let mut remainder = 0;
415        let mut result = vec![];
416        let mut wheel = self.lessest;
417        while !wheel.is_null() {
418            unsafe {
419                (offset, remainder) = (*wheel).update_index(offset, remainder, &mut result);
420                if offset == 0 {
421                    break;
422                }
423                wheel = (*wheel).parent;
424            }
425        }
426        self.calc_delay_id();
427        self.len -= result.len();
428        Some(result)
429    }
430
431    /// 计时器轮的递进时间
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use algorithm::TimerWheel;
437    /// fn main() {
438    ///     let mut timer = TimerWheel::new();
439    ///     timer.append_timer_wheel(60, "SecondWheel");
440    ///     timer.add_timer(30);
441    ///     let mut idx = 0;
442    ///     timer.update_deltatime_with_callback(30, &mut |_, id, v| {
443    ///         idx = v;
444    ///         None
445    ///     });
446    ///     assert_eq!(idx, 30);
447    /// }
448    pub fn update_deltatime_with_callback<F>(&mut self, delta: u64, f: &mut F)
449    where
450        F: FnMut(&mut Self, u64, T) -> Option<(u64, T)>,
451    {
452        debug_assert!(self.one_step > 0);
453        self.update_now_with_callback(self.all_deltatime.wrapping_add(delta), f);
454    }
455
456    /// 计时器轮的递进时间
457    ///
458    /// # Examples
459    ///
460    /// ```
461    /// use algorithm::TimerWheel;
462    /// fn main() {
463    ///     let mut timer = TimerWheel::new();
464    ///     timer.append_timer_wheel(60, "SecondWheel");
465    ///     timer.add_timer(30);
466    ///     let mut idx = 0;
467    ///     timer.update_deltatime_with_callback(30, &mut |_, _, v| {
468    ///         idx = v;
469    ///         None
470    ///     });
471    ///     assert_eq!(idx, 30);
472    /// }
473    pub fn update_now_with_callback<F>(&mut self, now: u64, f: &mut F)
474    where
475        F: FnMut(&mut Self, u64, T) -> Option<(u64, T)>,
476    {
477        debug_assert!(self.one_step > 0);
478        if let Some(result) = self.update_now(now) {
479            let mut collect_result = vec![];
480            for r in result.into_iter() {
481                if let Some(v) = (*f)(self, r.0, r.1) {
482                    collect_result.push(v);
483                }
484            }
485            for (timer_id, val) in collect_result.drain(..) {
486                self.add_timer_by_id(timer_id, val);
487            }
488        }
489    }
490
491    /// 计算下一个delay_id, 根据容器的密度稀疏有关
492    /// 密度高的基本为O(1)的复杂度, 最差情况为O(n)的复杂度
493    /// 总刻度数以时钟为计秒轮遍历60次,分轮遍历60次,时轮遍历12次,即最高遍历132次
494    ///
495    /// # Examples
496    ///
497    /// ```
498    /// use algorithm::TimerWheel;
499    /// fn main() {
500    ///     let mut timer = TimerWheel::new();
501    ///     timer.append_timer_wheel(60, "SecondWheel");
502    ///     timer.add_timer(30);
503    ///     assert_eq!(timer.get_delay_id(), 30);
504    /// }
505    pub fn calc_delay_id(&mut self) {
506        let mut next_delay_id = 0;
507        let mut wheel = self.lessest;
508        'outer: while !wheel.is_null() {
509            unsafe {
510                let (step, index, cap) = ((*wheel).step, (*wheel).index, (*wheel).num);
511                for i in 0..cap {
512                    let index = (index + i) % cap;
513                    if !(*wheel).slots[index as usize].is_empty() {
514                        next_delay_id = (i + 1) * step;
515                        break 'outer;
516                    }
517                }
518                next_delay_id = cap * step;
519                wheel = (*wheel).parent;
520            }
521        }
522        self.delay_id = next_delay_id / self.one_step;
523    }
524
525    /// 删除指定的定时器,时间复杂度为O(n),
526    /// 该模型删除不具备优势,需要频繁删除请选用其它时间框架
527    ///
528    /// # Examples
529    ///
530    /// ```
531    /// use algorithm::TimerWheel;
532    /// fn main() {
533    ///     let mut timer = TimerWheel::new();
534    ///     timer.append_timer_wheel(60, "SecondWheel");
535    ///     let t = timer.add_timer(30);
536    ///     timer.del_timer(t);
537    ///     assert_eq!(timer.len(), 0);
538    /// }
539    pub fn del_timer(&mut self, timer_id: u64) -> Option<T> {
540        let mut wheel = self.lessest;
541        while !wheel.is_null() {
542            unsafe {
543                if let Some(v) = (*wheel).del_timer(timer_id) {
544                    self.len -= 1;
545                    return Some(v);
546                }
547                wheel = (*wheel).parent;
548            }
549        }
550        None
551    }
552
553    /// 获取指定的定时器,时间复杂度为O(n)
554    /// 该模型获取不具备优势,需要频繁获取请选用其它时间框架
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// use algorithm::TimerWheel;
560    /// fn main() {
561    ///     let mut timer = TimerWheel::new();
562    ///     timer.append_timer_wheel(60, "SecondWheel");
563    ///     let t = timer.add_timer(30);
564    ///     assert_eq!(timer.get_timer(&t), Some(&30));
565    /// }
566    pub fn get_timer(&self, timer_id: &u64) -> Option<&T> {
567        let mut wheel = self.lessest;
568        while !wheel.is_null() {
569            unsafe {
570                if let Some(v) = (*wheel).get_timer(timer_id) {
571                    return Some(v);
572                }
573                wheel = (*wheel).parent;
574            }
575        }
576        None
577    }
578
579    /// 获取指定的定时器,时间复杂度为O(n)
580    /// 该模型获取不具备优势,需要频繁获取请选用其它时间框架
581    ///
582    /// # Examples
583    ///
584    /// ```
585    /// use algorithm::TimerWheel;
586    /// fn main() {
587    ///     let mut timer = TimerWheel::new();
588    ///     timer.append_timer_wheel(60, "SecondWheel");
589    ///     let t = timer.add_timer(30);
590    ///     *timer.get_mut_timer(&t).unwrap() = 33;
591    ///     let val = timer.update_deltatime(30).unwrap();
592    ///     assert_eq!(val, vec![(1, 33)]);
593    /// }
594    pub fn get_mut_timer(&mut self, timer_id: &u64) -> Option<&mut T> {
595        let mut wheel = self.lessest;
596        while !wheel.is_null() {
597            unsafe {
598                if let Some(v) = (*wheel).get_mut_timer(timer_id) {
599                    return Some(v);
600                }
601                wheel = (*wheel).parent;
602            }
603        }
604        None
605    }
606
    /// Returns the upper limit for automatically assigned timer ids.
    pub fn get_max_timerid(&self) -> u64 {
        self.max_timer_id
    }
610
    /// Sets the upper limit for automatically assigned timer ids; once the id counter
    /// reaches it, `get_next_timerid` restarts the counter at 1.
    pub fn set_max_timerid(&mut self, max: u64) {
        self.max_timer_id = max;
    }
614
615    fn get_next_timerid(&mut self) -> u64 {
616        let timer_id = self.next_timer_id;
617        if self.next_timer_id >= self.max_timer_id {
618            self.next_timer_id = 1;
619        } else {
620            self.next_timer_id = self.next_timer_id + 1;
621        }
622        timer_id
623    }
624
625    /// 添加定时器元素, 时间复杂度为O(1)
626    /// # Examples
627    ///
628    /// ```
629    /// use algorithm::TimerWheel;
630    /// fn main() {
631    ///     let mut timer = TimerWheel::new();
632    ///     timer.append_timer_wheel(60, "SecondWheel");
633    ///     timer.add_timer(30);
634    /// }
635    pub fn add_timer(&mut self, val: T) -> u64 {
636        debug_assert!(!self.greatest.is_null(), "必须设置时轮才能添加元素");
637        let timer_id: u64 = self.get_next_timerid();
638        self.add_timer_by_id(timer_id, val);
639        timer_id
640    }
641
642    pub fn add_timer_by_id(&mut self, timer_id: u64, mut val: T) {
643        let entry = Entry {
644            when: val.when_mut().max(1),
645            val,
646            id: timer_id,
647        };
648        self.delay_id = self.delay_id.min(entry.when / self.one_step);
649        unsafe {
650            (*self.greatest).add_timer(entry);
651        }
652        self.len += 1;
653    }
654
    /// Returns the current delay (in steps) until the next update does any work.
    ///
    /// # Examples
    ///
    /// ```
    /// use algorithm::TimerWheel;
    /// fn main() {
    ///     let mut timer = TimerWheel::new();
    ///     timer.append_timer_wheel(60, "SecondWheel");
    ///     timer.add_timer(30);
    ///     assert_eq!(timer.get_delay_id(), 30);
    /// }
    /// ```
    pub fn get_delay_id(&self) -> u64 {
        self.delay_id
    }
669}
670
671impl<T: Timer> Display for TimerWheel<T> {
672    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
673        f.write_str("TimerWheel {\r\n")?;
674        let mut wheel = self.greatest;
675        while !wheel.is_null() {
676            unsafe {
677                f.write_fmt(format_args!(
678                    "{}, slots: {}, step: {}",
679                    (*wheel).name,
680                    (*wheel).slots.len(),
681                    (*wheel).step
682                ))?;
683                wheel = (*wheel).child;
684            }
685        }
686        f.write_str("}")
687    }
688}
689
690impl<T: Timer> Drop for TimerWheel<T> {
691    fn drop(&mut self) {
692        let mut wheel = self.greatest;
693        while !wheel.is_null() {
694            unsafe {
695                let val = *Box::from_raw(wheel);
696                wheel = val.child;
697            }
698        }
699    }
700}