maniac_runtime/runtime/
timer.rs

1use super::task::TaskSlot;
2use super::worker::{
3    cancel_timer_for_current_task, current_worker_now_ns, schedule_timer_for_current_task,
4};
5use std::cell::Cell;
6use std::future::Future;
7use std::pin::Pin;
8use std::ptr::{self, NonNull};
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant};
11
12#[repr(u8)]
13#[derive(Clone, Copy, Debug, PartialEq, Eq)]
14pub enum TimerState {
15    Idle = 0,
16    Scheduled = 1,
17    Cancelled = 2,
18}
19
20#[derive(Clone, Copy, Debug)]
21pub struct TimerHandle {
22    task_slot: NonNull<TaskSlot>,
23    task_id: u32,
24    worker_id: u32,
25    timer_id: u64,
26}
27
28impl TimerHandle {
29    #[inline(always)]
30    pub(crate) fn new(
31        task_slot: NonNull<TaskSlot>,
32        task_id: u32,
33        worker_id: u32,
34        timer_id: u64,
35    ) -> Self {
36        Self {
37            task_slot,
38            task_id,
39            worker_id,
40            timer_id,
41        }
42    }
43
44    #[inline(always)]
45    pub(crate) fn task_slot(&self) -> NonNull<TaskSlot> {
46        self.task_slot
47    }
48
49    #[inline(always)]
50    pub(crate) fn task_id(&self) -> u32 {
51        self.task_id
52    }
53
54    #[inline(always)]
55    pub(crate) fn worker_id(&self) -> u32 {
56        self.worker_id
57    }
58
59    #[inline(always)]
60    pub(crate) fn timer_id(&self) -> u64 {
61        self.timer_id
62    }
63}
64
65unsafe impl Send for TimerHandle {}
66unsafe impl Sync for TimerHandle {}
67
68#[derive(Debug)]
69pub struct Timer {
70    state: Cell<TimerState>,
71    deadline_ns: Cell<u64>,
72    task_slot: Cell<*mut TaskSlot>,
73    task_id: Cell<u32>,
74    worker_id: Cell<u32>,
75    timer_id: Cell<u64>,
76}
77
78impl Timer {
79    pub const fn new() -> Self {
80        Self {
81            state: Cell::new(TimerState::Idle),
82            deadline_ns: Cell::new(0),
83            task_slot: Cell::new(ptr::null_mut()),
84            task_id: Cell::new(0),
85            worker_id: Cell::new(u32::MAX),
86            timer_id: Cell::new(0),
87        }
88    }
89
90    #[inline(always)]
91    pub fn delay(&self, duration: Duration) -> TimerDelay<'_> {
92        TimerDelay::new(self, duration)
93    }
94
95    #[inline(always)]
96    pub fn state(&self) -> TimerState {
97        self.state.get()
98    }
99
100    #[inline(always)]
101    pub fn deadline_ns(&self) -> u64 {
102        self.deadline_ns.get()
103    }
104
105    #[inline(always)]
106    pub fn is_scheduled(&self) -> bool {
107        self.state.get() == TimerState::Scheduled
108    }
109
110    #[inline(always)]
111    pub fn cancel(&self) -> bool {
112        cancel_timer_for_current_task(self)
113    }
114
115    #[inline(always)]
116    pub(crate) fn prepare(
117        &self,
118        task_slot: NonNull<TaskSlot>,
119        task_id: u32,
120        worker_id: u32,
121    ) -> TimerHandle {
122        self.task_slot.set(task_slot.as_ptr());
123        self.task_id.set(task_id);
124        self.worker_id.set(worker_id);
125        self.timer_id.set(0);
126        self.deadline_ns.set(0);
127        self.state.set(TimerState::Idle);
128
129        TimerHandle::new(task_slot, task_id, worker_id, 0)
130    }
131
132    #[inline(always)]
133    pub(crate) fn commit_schedule(&self, timer_id: u64, deadline_ns: u64) {
134        self.timer_id.set(timer_id);
135        self.deadline_ns.set(deadline_ns);
136        self.state.set(TimerState::Scheduled);
137    }
138
139    #[inline(always)]
140    pub(crate) fn mark_cancelled(&self, timer_id: u64) -> bool {
141        if self.timer_id.get() != timer_id {
142            return false;
143        }
144        self.state.set(TimerState::Cancelled);
145        self.clear_identity();
146        true
147    }
148
149    #[inline(always)]
150    pub(crate) fn reset(&self) {
151        self.state.set(TimerState::Idle);
152        self.clear_identity();
153    }
154
155    #[inline(always)]
156    pub(crate) fn worker_id(&self) -> Option<u32> {
157        match self.worker_id.get() {
158            id if id == u32::MAX => None,
159            id => Some(id),
160        }
161    }
162
163    #[inline(always)]
164    pub(crate) fn timer_id(&self) -> u64 {
165        self.timer_id.get()
166    }
167
168    #[inline(always)]
169    fn clear_identity(&self) {
170        self.deadline_ns.set(0);
171        self.timer_id.set(0);
172        self.worker_id.set(u32::MAX);
173        self.task_slot.set(ptr::null_mut());
174    }
175}
176
177unsafe impl Send for Timer {}
178unsafe impl Sync for Timer {}
179
180impl Default for Timer {
181    fn default() -> Self {
182        Self::new()
183    }
184}
185
186pub struct TimerDelay<'a> {
187    timer: &'a Timer,
188    delay: Duration,
189    deadline: Instant,
190    scheduled: bool,
191}
192
193impl<'a> TimerDelay<'a> {
194    #[inline(always)]
195    pub fn new(timer: &'a Timer, delay: Duration) -> Self {
196        Self {
197            timer,
198            delay,
199            deadline: Instant::now().checked_add(delay).unwrap(),
200            scheduled: false,
201        }
202    }
203}
204
205impl<'a> Future for TimerDelay<'a> {
206    type Output = ();
207
208    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209        if !self.scheduled {
210            if schedule_timer_for_current_task(cx, self.timer, self.delay).is_some() {
211                self.scheduled = true;
212            }
213            return Poll::Pending;
214        }
215
216        // Check timer deadline using the worker's time source (same as timer wheel)
217        // rather than Instant::now() which may use a different clock on some platforms
218        let deadline = self.timer.deadline_ns();
219        if let Some(now_ns) = current_worker_now_ns() {
220            if now_ns >= deadline {
221                self.timer.reset();
222                self.scheduled = false;
223                return Poll::Ready(());
224            }
225        }
226
227        Poll::Pending
228
229        // match self.timer.state() {
230        //     TimerState::Idle | TimerState::Cancelled => {
231        //         println!("TimerDelay::Idle | Cancelled");
232        //         self.scheduled = false;
233        //         Poll::Ready(())
234        //     }
235        //     TimerState::Scheduled => {
236        //         let deadline = self.timer.deadline_ns();
237        //         if deadline == 0 {
238        //             println!("TimerDelay deadline = 0");
239        //             return Poll::Pending;
240        //         }
241
242        //         if let Some(now_ns) = current_worker_now_ns() {
243        //             println!("TimerDelay now_ns = {}", now_ns);
244        //             if now_ns >= deadline {
245        //                 self.timer.reset();
246        //                 self.scheduled = false;
247        //                 println!("TimerDelay now_ns >= deadline!");
248        //                 return Poll::Ready(());
249        //             }
250        //         }
251        //         println!("TimerDelay Pending");
252        //         Poll::Pending
253        //     }
254        // }
255    }
256}
257
258impl<'a> Drop for TimerDelay<'a> {
259    fn drop(&mut self) {
260        if self.scheduled {
261            let _ = self.timer.cancel();
262        }
263    }
264}