lustre_executor/core/
timer.rs1use std::collections::VecDeque;
4use std::task::Waker;
5use std::time::{Duration, Instant};
6
/// A single pending timer parked in one slot of the wheel.
#[derive(Clone)]
struct TimerEntry {
    /// Absolute tick, counted from `TimerData::start_time`, at which the timer is due.
    expires_at: u64,
    /// Sequential registration id; it is assigned but nothing reads it yet.
    #[allow(dead_code)] // kept so every entry stays individually identifiable
    id: usize,
    /// Waker invoked once the timer is due.
    waker: Waker,
}

/// Hashed timer wheel that wakes tasks once their deadline has passed.
///
/// Time is divided into ticks of `resolution` each, counted from `start_time`.
/// A timer due at tick `t` is stored in slot `t % size`. Each entry remembers
/// its own expiry tick, so timers further away than one full revolution
/// (`size * resolution`) share slots with nearer ones without firing early.
#[derive(Clone)]
pub struct TimerData {
    /// One queue of pending timers per slot.
    wheel: Vec<VecDeque<TimerEntry>>,
    /// First tick that `check_expired` has not processed yet.
    current_tick: u64,
    /// Length of one tick.
    resolution: Duration,
    /// Number of slots in `wheel`.
    size: usize,
    /// Id handed to the next registered timer.
    next_timer_id: usize,
    /// Instant corresponding to tick zero.
    start_time: Instant,
}

impl TimerData {
    /// Creates an empty wheel with 1024 slots and a resolution of one millisecond.
    pub fn new() -> Self {
        let size = 1024;
        let resolution = Duration::from_millis(1);
        Self {
            wheel: (0..size).map(|_| VecDeque::new()).collect(),
            current_tick: 0,
            resolution,
            size,
            next_timer_id: 0,
            start_time: Instant::now(),
        }
    }

    /// Length of one tick in nanoseconds, never zero so it is safe to divide by.
    fn tick_len_nanos(&self) -> u128 {
        self.resolution.as_nanos().max(1)
    }

    /// Maps an absolute tick to the index of the slot that stores it.
    fn slot_of(&self, tick: u64) -> usize {
        (tick % self.size as u64) as usize
    }

    /// Schedules `waker` to be woken by `check_expired` once `deadline` has passed.
    ///
    /// A deadline that already lies behind the wheel's cursor is scheduled for the
    /// next tick to be processed, so it fires on the next `check_expired` call.
    pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
        let delay = deadline.saturating_duration_since(self.start_time);
        let tick_len = self.tick_len_nanos();
        // Round up so the timer is never considered due before its deadline.
        let due_tick = (delay.as_nanos() + tick_len - 1) / tick_len;
        let due_tick = u64::try_from(due_tick).unwrap_or(u64::MAX);
        // `due_tick` is already absolute (measured from `start_time`), so it must
        // not be added to `current_tick`; it is only clamped so that it never lands
        // in a slot the cursor has already passed.
        let expires_at = due_tick.max(self.current_tick);
        let slot = self.slot_of(expires_at);
        let id = self.next_timer_id;
        self.next_timer_id = self.next_timer_id.wrapping_add(1);
        self.wheel[slot].push_back(TimerEntry {
            expires_at,
            id,
            waker,
        });
    }

    /// Wakes every timer whose expiry tick has been reached and advances the cursor.
    ///
    /// Each slot is visited at most once per call, regardless of how much time
    /// has passed since the previous call.
    pub fn check_expired(&mut self) {
        let elapsed = Instant::now().saturating_duration_since(self.start_time);
        let target_tick =
            u64::try_from(elapsed.as_nanos() / self.tick_len_nanos()).unwrap_or(u64::MAX);
        if target_tick < self.current_tick {
            return;
        }

        // After one full revolution every slot has been seen, so cap the sweep.
        let pending_ticks = (target_tick - self.current_tick).saturating_add(1);
        let visits = pending_ticks.min(self.size as u64);
        for offset in 0..visits {
            let slot = self.slot_of(self.current_tick + offset);
            let bucket = &mut self.wheel[slot];
            // Rotate through the bucket exactly once: due entries are woken,
            // entries belonging to a later revolution go back in original order.
            for _ in 0..bucket.len() {
                if let Some(entry) = bucket.pop_front() {
                    if entry.expires_at <= target_tick {
                        entry.waker.wake();
                    } else {
                        bucket.push_back(entry);
                    }
                }
            }
        }
        self.current_tick = target_tick.saturating_add(1);
    }

    /// Returns how long from now until the earliest pending timer is due.
    ///
    /// Returns `None` when no timer is registered and a zero duration when the
    /// earliest timer is already overdue.
    pub fn next_timeout(&self) -> Option<Duration> {
        let earliest = self
            .wheel
            .iter()
            .flatten()
            .map(|entry| entry.expires_at)
            .min()?;
        let offset_nanos = u128::from(earliest).saturating_mul(self.tick_len_nanos());
        let offset = Duration::from_nanos(u64::try_from(offset_nanos).unwrap_or(u64::MAX));
        let remaining = self
            .start_time
            .checked_add(offset)
            .map_or(Duration::MAX, |due| {
                due.saturating_duration_since(Instant::now())
            });
        Some(remaining)
    }

    /// Returns `true` when no timer is pending in any slot.
    pub fn is_empty(&self) -> bool {
        self.wheel.iter().all(|bucket| bucket.is_empty())
    }
}
71
72impl Default for TimerData {
73 fn default() -> Self {
74 Self::new()
75 }
76}