Skip to main content

lustre_executor/core/
timer.rs

1//! Timer management with efficient O(1) hashed wheel timer.
2
3use std::collections::VecDeque;
4use std::task::Waker;
5use std::time::{Duration, Instant};
6
/// A single pending timer stored in one slot of the wheel.
#[derive(Clone)]
struct TimerEntry {
    /// Sequential registration id. It is assigned to every timer but the wheel
    /// itself never consults it, hence the lint exemption.
    #[allow(dead_code)]
    id: usize,
    /// Absolute tick (counted from `TimerData::start_time`) at which the timer is due.
    deadline_tick: u64,
    /// Waker notified once `deadline_tick` has been reached.
    waker: Waker,
}

/// Hashed wheel timer: pending wakers are bucketed by the tick at which they fall due.
///
/// Time is measured in ticks of `resolution`, counted from `start_time`. A timer
/// due at tick `t` is stored in slot `t % size`, and every entry remembers its own
/// tick, so a timer that is more than one wheel revolution away is left in place
/// when the wheel passes its slot early.
#[derive(Clone)]
pub struct TimerData {
    /// One queue of pending timers per slot; `wheel.len() == size`.
    wheel: Vec<VecDeque<TimerEntry>>,
    /// First tick that `check_expired` has not swept yet. Every stored entry has
    /// `deadline_tick >= current_tick`.
    current_tick: u64,
    /// Duration of one tick.
    resolution: Duration,
    /// Number of slots in the wheel.
    size: usize,
    /// Id handed to the next registered timer.
    next_timer_id: usize,
    /// Instant corresponding to tick 0.
    start_time: Instant,
    /// Number of entries currently stored across all slots.
    pending: usize,
}

impl TimerData {
    /// Creates an empty wheel with 1024 slots of 1 ms each, starting at the current instant.
    pub fn new() -> Self {
        let size = 1024; // 1024 slots
        let resolution = Duration::from_millis(1); // 1ms per slot
        Self {
            wheel: (0..size).map(|_| VecDeque::new()).collect(),
            current_tick: 0,
            resolution,
            size,
            next_timer_id: 0,
            start_time: Instant::now(),
            pending: 0,
        }
    }

    /// Schedules `waker` to be woken by [`check_expired`](Self::check_expired)
    /// once `deadline` has passed.
    ///
    /// The deadline is rounded *up* to a whole tick so the waker is never woken
    /// before `deadline`. A deadline that is already in the past (or inside a tick
    /// that has already been swept) is scheduled for the next tick to be swept.
    pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
        // The tick is absolute (relative to `start_time`), so it must not be offset
        // by `current_tick` again; it is only clamped so it cannot land behind the sweep.
        let deadline_tick = self.tick_at_or_after(deadline).max(self.current_tick);
        let slot = self.slot_of(deadline_tick);
        let id = self.next_timer_id;
        self.next_timer_id = id.wrapping_add(1);
        self.wheel[slot].push_back(TimerEntry {
            id,
            deadline_tick,
            waker,
        });
        self.pending += 1;
    }

    /// Wakes every timer whose deadline tick has been reached and advances the wheel.
    ///
    /// At most one full revolution of slots is visited per call, regardless of how
    /// much time has passed since the previous call.
    pub fn check_expired(&mut self) {
        let target_tick = self.tick_at_or_before(Instant::now());
        if target_tick < self.current_tick {
            // Still inside a tick that has already been swept.
            return;
        }

        if self.pending > 0 {
            // Ticks `current_tick..=target_tick` are due. Visiting more than `size`
            // of them would only revisit the same slots, so cap the sweep there.
            let span = (target_tick - self.current_tick).saturating_add(1);
            let sweeps = span.min(self.size as u64);
            for offset in 0..sweeps {
                // Cannot overflow: `offset < span`, so the sum is at most `target_tick`.
                let slot = self.slot_of(self.current_tick + offset);
                let bucket = &mut self.wheel[slot];
                // Rotate through the bucket exactly once: due entries are woken,
                // entries from later revolutions are re-queued in their original order.
                for _ in 0..bucket.len() {
                    if let Some(entry) = bucket.pop_front() {
                        if entry.deadline_tick <= target_tick {
                            self.pending -= 1;
                            entry.waker.wake();
                        } else {
                            bucket.push_back(entry);
                        }
                    }
                }
            }
        }

        self.current_tick = target_tick.saturating_add(1);
    }

    /// Returns how long from now until the earliest pending timer becomes due.
    ///
    /// Returns `None` when no timer is registered and `Duration::ZERO`-equivalent
    /// (an empty duration) when the earliest timer is already due.
    pub fn next_timeout(&self) -> Option<Duration> {
        if self.pending == 0 {
            return None;
        }

        // Walk the slots in the order the wheel will sweep them. Every entry in the
        // slot visited at `tick` is due at `tick` or later, so once the best
        // candidate is not later than `tick`, no remaining slot can beat it.
        let mut earliest: Option<u64> = None;
        for offset in 0..self.size as u64 {
            let tick = self.current_tick.saturating_add(offset);
            if earliest.map_or(false, |best| best <= tick) {
                break;
            }
            let slot_min = self.wheel[self.slot_of(tick)]
                .iter()
                .map(|entry| entry.deadline_tick)
                .min();
            if let Some(candidate) = slot_min {
                earliest = Some(earliest.map_or(candidate, |best| best.min(candidate)));
            }
        }
        let due_tick = earliest?;

        // Translate the tick back to an offset from `start_time`, then subtract the
        // time that has already elapsed. Values beyond `u64` nanoseconds are clamped.
        let due_nanos = u128::from(due_tick).saturating_mul(self.resolution_nanos());
        let due = Duration::from_nanos(due_nanos.min(u128::from(u64::MAX)) as u64);
        let elapsed = Instant::now().saturating_duration_since(self.start_time);
        Some(due.saturating_sub(elapsed))
    }

    /// Returns `true` when no timer is pending.
    pub fn is_empty(&self) -> bool {
        self.pending == 0
    }

    /// Length of one tick in nanoseconds, never zero so it is safe to divide by.
    fn resolution_nanos(&self) -> u128 {
        self.resolution.as_nanos().max(1)
    }

    /// Index of the slot that holds timers due at `tick`.
    fn slot_of(&self, tick: u64) -> usize {
        // The remainder is strictly below `size`, so it always fits in `usize`.
        (tick % self.size as u64) as usize
    }

    /// Last tick boundary at or before `instant` (rounds down); 0 for instants
    /// before `start_time`.
    fn tick_at_or_before(&self, instant: Instant) -> u64 {
        let nanos = instant
            .saturating_duration_since(self.start_time)
            .as_nanos();
        let ticks = nanos / self.resolution_nanos();
        // Clamp before narrowing so the cast cannot truncate.
        ticks.min(u128::from(u64::MAX)) as u64
    }

    /// First tick boundary at or after `instant` (rounds up); 0 for instants
    /// before `start_time`.
    fn tick_at_or_after(&self, instant: Instant) -> u64 {
        let nanos = instant
            .saturating_duration_since(self.start_time)
            .as_nanos();
        let resolution = self.resolution_nanos();
        let ticks = nanos / resolution + u128::from(nanos % resolution != 0);
        // Clamp before narrowing so the cast cannot truncate.
        ticks.min(u128::from(u64::MAX)) as u64
    }
}
71
72impl Default for TimerData {
73    fn default() -> Self {
74        Self::new()
75    }
76}