1use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11use std::time::{Duration, Instant};
12
13use nexus_timer::{Wheel, WheelBuilder};
14
15pub(crate) struct TimerDriver {
21 wheel: Wheel<Waker>,
22 expired: Vec<Waker>,
24}
25
26impl TimerDriver {
27 pub(crate) fn new(capacity: usize) -> Self {
28 let now = Instant::now();
29 let wheel = WheelBuilder::default().unbounded(capacity).build(now);
30 Self {
31 wheel,
32 expired: Vec::with_capacity(64),
33 }
34 }
35
36 pub(crate) fn schedule(&mut self, deadline: Instant, waker: Waker) {
40 self.wheel.schedule_forget(deadline, waker);
41 }
42
43 pub(crate) fn next_deadline(&self) -> Option<Instant> {
45 self.wheel.next_deadline()
46 }
47
48 pub(crate) fn fire_expired(&mut self, now: Instant) -> usize {
52 self.expired.clear();
53 let fired = self.wheel.poll(now, &mut self.expired);
54 for waker in self.expired.drain(..) {
55 waker.wake();
56 }
57 fired
58 }
59}
60
61#[derive(Clone, Copy)]
67pub struct TimerHandle {
68 driver: *mut TimerDriver,
69}
70
71impl TimerHandle {
72 pub(crate) fn new(driver: &mut TimerDriver) -> Self {
73 Self {
74 driver: std::ptr::from_mut(driver),
75 }
76 }
77
78 pub fn sleep(&self, duration: Duration) -> Sleep {
80 Sleep {
81 deadline: Instant::now() + duration,
82 driver: self.driver,
83 registered: false,
84 waker: None,
85 }
86 }
87
88 pub fn sleep_until(&self, deadline: Instant) -> Sleep {
90 Sleep {
91 deadline,
92 driver: self.driver,
93 registered: false,
94 waker: None,
95 }
96 }
97}
98
99pub struct Sleep {
110 deadline: Instant,
111 driver: *mut TimerDriver,
112 registered: bool,
113 waker: Option<Waker>,
114}
115
116impl Future for Sleep {
117 type Output = ();
118
119 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
120 if Instant::now() >= self.deadline {
121 return Poll::Ready(());
122 }
123
124 let needs_register =
125 !self.registered || self.waker.as_ref().is_none_or(|w| !w.will_wake(cx.waker()));
126
127 if needs_register {
128 let driver = unsafe { &mut *self.driver };
130 driver.schedule(self.deadline, cx.waker().clone());
131 self.registered = true;
132 self.waker = Some(cx.waker().clone());
133 }
134
135 Poll::Pending
136 }
137}
138
/// Error returned by [`Timeout`] when the deadline passes before the wrapped
/// future completes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed;

impl std::fmt::Display for Elapsed {
    /// Writes the fixed message `deadline elapsed`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(out, "deadline elapsed")
    }
}

/// `Elapsed` carries no underlying cause, so the trait defaults suffice.
impl std::error::Error for Elapsed {}
154
155pub struct Timeout<F> {
158 future: F,
159 sleep: Sleep,
160}
161
162impl<F> Timeout<F> {
163 pub(crate) fn new(future: F, sleep: Sleep) -> Self {
164 Self { future, sleep }
165 }
166
167 pub fn into_inner(self) -> F {
169 self.future
170 }
171}
172
173impl<F: Future> Future for Timeout<F> {
174 type Output = Result<F::Output, Elapsed>;
175
176 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
177 let this = unsafe { self.get_unchecked_mut() };
179
180 if Pin::new(&mut this.sleep).poll(cx).is_ready() {
183 return Poll::Ready(Err(Elapsed));
184 }
185
186 if let Poll::Ready(val) = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx) {
188 return Poll::Ready(Ok(val));
189 }
190
191 Poll::Pending
192 }
193}
194
/// Strategy an [`Interval`] uses to pick its next deadline after a tick
/// completes, which matters when ticks are observed late.
///
/// The default is [`MissedTickBehavior::Burst`], matching what the `Interval`
/// constructors use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MissedTickBehavior {
    /// Advance the deadline by exactly one period from the previous deadline,
    /// so late ticks are followed by back-to-back ticks until the schedule
    /// has caught up.
    #[default]
    Burst,
    /// Drop the missed ticks: the next deadline is the first multiple of the
    /// period, counted from the interval's start, that lies after the moment
    /// the tick completed.
    Skip,
    /// Restart the cadence: the next deadline is one period after the moment
    /// the tick completed.
    Delay,
}
215
216pub struct Interval {
221 period: Duration,
222 start: Instant,
223 next_deadline: Instant,
224 sleep: Option<Sleep>,
225 missed_tick_behavior: MissedTickBehavior,
226}
227
228impl Interval {
229 pub(crate) fn new(period: Duration) -> Self {
230 assert!(!period.is_zero(), "interval period must be non-zero");
231 let now = Instant::now();
232 Self {
233 period,
234 start: now,
235 next_deadline: now + period,
236 sleep: None,
237 missed_tick_behavior: MissedTickBehavior::Burst,
238 }
239 }
240
241 pub(crate) fn new_at(start: Instant, period: Duration) -> Self {
242 assert!(!period.is_zero(), "interval period must be non-zero");
243 Self {
244 period,
245 start,
246 next_deadline: start,
247 sleep: None,
248 missed_tick_behavior: MissedTickBehavior::Burst,
249 }
250 }
251
252 pub async fn tick(&mut self) {
254 if self.sleep.is_none() {
255 self.sleep = Some(crate::context::sleep_until(self.next_deadline));
256 }
257
258 if let Some(ref mut sleep) = self.sleep {
259 Pin::new(sleep).await;
260 }
261
262 let now = Instant::now();
263 self.sleep = None;
264
265 match self.missed_tick_behavior {
266 MissedTickBehavior::Burst => {
267 self.next_deadline += self.period;
269 }
270 MissedTickBehavior::Skip => {
271 if now >= self.next_deadline {
273 let elapsed = now.duration_since(self.start);
274 let period_nanos = self.period.as_nanos();
275 let periods = elapsed.as_nanos() / period_nanos;
276 let next_nanos = (periods + 1).saturating_mul(period_nanos);
279 let offset =
280 Duration::from_nanos(u64::try_from(next_nanos).unwrap_or(u64::MAX));
281 self.next_deadline = self.start + offset;
282 } else {
283 self.next_deadline += self.period;
284 }
285 }
286 MissedTickBehavior::Delay => {
287 self.next_deadline = now + self.period;
289 }
290 }
291 }
292
293 pub fn reset(&mut self) {
295 self.next_deadline = Instant::now() + self.period;
296 self.sleep = None;
297 }
298
299 pub fn reset_at(&mut self, deadline: Instant) {
301 self.next_deadline = deadline;
302 self.sleep = None;
303 }
304
305 pub fn period(&self) -> Duration {
307 self.period
308 }
309
310 pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
312 self.missed_tick_behavior
313 }
314
315 pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
317 self.missed_tick_behavior = behavior;
318 }
319}
320
/// Future that returns `Pending` exactly once before resolving, giving other
/// tasks a chance to run.
///
/// The boolean records whether that single `Pending` has already been
/// returned.
pub struct YieldNow(pub(crate) bool);

impl Future for YieldNow {
    type Output = ();

    /// The first poll marks the future as yielded, requests an immediate
    /// re-poll through the waker, and returns `Pending`; every later poll
    /// returns `Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let yielded = &mut self.get_mut().0;

        // Set the flag unconditionally and branch on its previous value.
        if std::mem::replace(yielded, true) {
            return Poll::Ready(());
        }

        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
344
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::{RawWaker, RawWakerVTable};

    /// Builds a waker whose clone, wake, wake-by-ref and drop hooks all do
    /// nothing, so it can be scheduled without a real task behind it.
    fn noop_waker() -> Waker {
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone_raw, do_nothing, do_nothing, do_nothing);

        fn do_nothing(_: *const ()) {}

        fn clone_raw(data: *const ()) -> RawWaker {
            RawWaker::new(data, &VTABLE)
        }

        let raw = RawWaker::new(std::ptr::null(), &VTABLE);
        // SAFETY: no vtable function dereferences the data pointer, so the
        // null pointer is never read.
        unsafe { Waker::from_raw(raw) }
    }

    /// Only the timer whose deadline is already behind `now` fires; the
    /// far-future one stays pending.
    #[test]
    fn timer_driver_fire_expired() {
        let mut driver = TimerDriver::new(64);
        let now = Instant::now();
        let already_due = now - Duration::from_millis(10);
        let far_away = now + Duration::from_secs(100);
        let waker = noop_waker();

        driver.schedule(already_due, waker.clone());
        driver.schedule(far_away, waker);

        assert_eq!(driver.fire_expired(now), 1);

        let remaining = driver.next_deadline().unwrap();
        assert!(remaining > now);
    }

    /// An empty driver reports no deadline; with two timers it reports the
    /// earlier one, allowing a 2 ms tolerance on the reported value.
    #[test]
    fn timer_driver_next_deadline() {
        let mut driver = TimerDriver::new(64);
        assert!(driver.next_deadline().is_none());

        let now = Instant::now();
        let soon = now + Duration::from_millis(10);
        let later = now + Duration::from_millis(100);
        let waker = noop_waker();

        // Insert out of order to show the result does not depend on it.
        driver.schedule(later, waker.clone());
        driver.schedule(soon, waker);

        let tolerance = Duration::from_millis(2);
        let next = driver.next_deadline().unwrap();
        assert!(next <= soon + tolerance);
    }
}