Skip to main content

nexus_async_rt/
timer.rs

1//! Timer driver backed by nexus-timer wheel.
2//!
3//! O(1) insert and cancel via hierarchical timer wheel. Expired timers
4//! are collected into a pre-allocated buffer and their wakers fired.
5//! Integrates with the mio poll timeout — the nearest deadline
6//! determines how long epoll blocks.
7
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11use std::time::{Duration, Instant};
12
13use nexus_timer::{Wheel, WheelBuilder};
14
15// =============================================================================
16// TimerDriver — owned by Runtime
17// =============================================================================
18
19/// Timer wheel driver. O(1) insert, O(1) cancel, no-cascade poll.
20pub(crate) struct TimerDriver {
21    wheel: Wheel<Waker>,
22    /// Pre-allocated buffer for expired wakers. Reused across cycles.
23    expired: Vec<Waker>,
24}
25
26impl TimerDriver {
27    pub(crate) fn new(capacity: usize) -> Self {
28        let now = Instant::now();
29        let wheel = WheelBuilder::default().unbounded(capacity).build(now);
30        Self {
31            wheel,
32            expired: Vec::with_capacity(64),
33        }
34    }
35
36    /// Schedule a deadline with a waker to call on expiry.
37    /// Fire-and-forget — no handle returned (the Sleep future
38    /// doesn't need to cancel).
39    pub(crate) fn schedule(&mut self, deadline: Instant, waker: Waker) {
40        self.wheel.schedule_forget(deadline, waker);
41    }
42
43    /// Returns the nearest deadline, or `None` if no timers are pending.
44    pub(crate) fn next_deadline(&self) -> Option<Instant> {
45        self.wheel.next_deadline()
46    }
47
48    /// Drain all expired timers and wake their tasks.
49    ///
50    /// Returns the number of timers fired.
51    pub(crate) fn fire_expired(&mut self, now: Instant) -> usize {
52        self.expired.clear();
53        let fired = self.wheel.poll(now, &mut self.expired);
54        for waker in self.expired.drain(..) {
55            waker.wake();
56        }
57        fired
58    }
59}
60
61// =============================================================================
62// TimerHandle — Copy handle for tasks
63// =============================================================================
64
65/// [`Copy`] handle for scheduling timers from async tasks.
66#[derive(Clone, Copy)]
67pub struct TimerHandle {
68    driver: *mut TimerDriver,
69}
70
71impl TimerHandle {
72    pub(crate) fn new(driver: &mut TimerDriver) -> Self {
73        Self {
74            driver: std::ptr::from_mut(driver),
75        }
76    }
77
78    /// Create a [`Sleep`] future that completes after `duration`.
79    pub fn sleep(&self, duration: Duration) -> Sleep {
80        Sleep {
81            deadline: Instant::now() + duration,
82            driver: self.driver,
83            registered: false,
84            waker: None,
85        }
86    }
87
88    /// Create a [`Sleep`] future that completes at `deadline`.
89    pub fn sleep_until(&self, deadline: Instant) -> Sleep {
90        Sleep {
91            deadline,
92            driver: self.driver,
93            registered: false,
94            waker: None,
95        }
96    }
97}
98
99// =============================================================================
100// Sleep future
101// =============================================================================
102
103/// Future that completes when a deadline expires.
104///
105/// On first poll, registers the deadline with the timer wheel. On
106/// subsequent polls, re-registers if the waker has changed (the timer
107/// wheel stores a clone of the waker — a stale waker means the
108/// expiry notification goes to the wrong task).
109pub struct Sleep {
110    deadline: Instant,
111    driver: *mut TimerDriver,
112    registered: bool,
113    waker: Option<Waker>,
114}
115
116impl Future for Sleep {
117    type Output = ();
118
119    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
120        if Instant::now() >= self.deadline {
121            return Poll::Ready(());
122        }
123
124        let needs_register =
125            !self.registered || self.waker.as_ref().is_none_or(|w| !w.will_wake(cx.waker()));
126
127        if needs_register {
128            // SAFETY: driver pointer is valid (Runtime lifetime).
129            let driver = unsafe { &mut *self.driver };
130            driver.schedule(self.deadline, cx.waker().clone());
131            self.registered = true;
132            self.waker = Some(cx.waker().clone());
133        }
134
135        Poll::Pending
136    }
137}
138
139// =============================================================================
140// Timeout — wraps a future with a deadline
141// =============================================================================
142
143/// Error returned when a [`Timeout`] expires.
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub struct Elapsed;
146
147impl std::fmt::Display for Elapsed {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.write_str("deadline elapsed")
150    }
151}
152
153impl std::error::Error for Elapsed {}
154
155/// Future that completes with `Ok(T)` if the inner future finishes
156/// before the deadline, or `Err(Elapsed)` if the deadline fires first.
157pub struct Timeout<F> {
158    future: F,
159    sleep: Sleep,
160}
161
162impl<F> Timeout<F> {
163    pub(crate) fn new(future: F, sleep: Sleep) -> Self {
164        Self { future, sleep }
165    }
166
167    /// Recover the wrapped future, discarding the timeout.
168    pub fn into_inner(self) -> F {
169        self.future
170    }
171}
172
173impl<F: Future> Future for Timeout<F> {
174    type Output = Result<F::Output, Elapsed>;
175
176    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
177        // SAFETY: we never move the inner fields out of the Pin.
178        let this = unsafe { self.get_unchecked_mut() };
179
180        // Check the deadline first so already-expired timeouts reliably
181        // return Err(Elapsed) even if the inner future is also ready.
182        if Pin::new(&mut this.sleep).poll(cx).is_ready() {
183            return Poll::Ready(Err(Elapsed));
184        }
185
186        // SAFETY: this.future is pinned because self is pinned.
187        if let Poll::Ready(val) = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx) {
188            return Poll::Ready(Ok(val));
189        }
190
191        Poll::Pending
192    }
193}
194
195// =============================================================================
196// Interval — periodic ticks
197// =============================================================================
198
199/// Strategy for handling missed interval ticks.
200///
201/// When processing takes longer than the interval period, ticks are
202/// "missed." This enum controls how the interval catches up.
203#[derive(Debug, Clone, Copy, PartialEq, Eq)]
204pub enum MissedTickBehavior {
205    /// Fire missed ticks immediately to catch up (default).
206    /// Maintains the original schedule timestamps.
207    Burst,
208    /// Skip missed ticks and jump to the next future tick aligned
209    /// with the original start time.
210    Skip,
211    /// Reschedule from now — the next tick fires one full period
212    /// from the current time, discarding the original schedule.
213    Delay,
214}
215
216/// Periodic timer that ticks at a fixed interval.
217///
218/// Created via [`crate::context::interval`]. Call `.tick().await` to
219/// wait for the next tick.
220pub struct Interval {
221    period: Duration,
222    start: Instant,
223    next_deadline: Instant,
224    sleep: Option<Sleep>,
225    missed_tick_behavior: MissedTickBehavior,
226}
227
228impl Interval {
229    pub(crate) fn new(period: Duration) -> Self {
230        assert!(!period.is_zero(), "interval period must be non-zero");
231        let now = Instant::now();
232        Self {
233            period,
234            start: now,
235            next_deadline: now + period,
236            sleep: None,
237            missed_tick_behavior: MissedTickBehavior::Burst,
238        }
239    }
240
241    pub(crate) fn new_at(start: Instant, period: Duration) -> Self {
242        assert!(!period.is_zero(), "interval period must be non-zero");
243        Self {
244            period,
245            start,
246            next_deadline: start,
247            sleep: None,
248            missed_tick_behavior: MissedTickBehavior::Burst,
249        }
250    }
251
252    /// Wait for the next tick.
253    pub async fn tick(&mut self) {
254        if self.sleep.is_none() {
255            self.sleep = Some(crate::context::sleep_until(self.next_deadline));
256        }
257
258        if let Some(ref mut sleep) = self.sleep {
259            Pin::new(sleep).await;
260        }
261
262        let now = Instant::now();
263        self.sleep = None;
264
265        match self.missed_tick_behavior {
266            MissedTickBehavior::Burst => {
267                // Advance by one period. If behind, next tick fires immediately.
268                self.next_deadline += self.period;
269            }
270            MissedTickBehavior::Skip => {
271                // Jump to the next tick aligned with the original start.
272                if now >= self.next_deadline {
273                    let elapsed = now.duration_since(self.start);
274                    let period_nanos = self.period.as_nanos();
275                    let periods = elapsed.as_nanos() / period_nanos;
276                    // Compute next deadline in nanos to avoid u32 truncation
277                    // (Duration * u32 wraps after ~49 days at 1ms intervals).
278                    let next_nanos = (periods + 1).saturating_mul(period_nanos);
279                    let offset =
280                        Duration::from_nanos(u64::try_from(next_nanos).unwrap_or(u64::MAX));
281                    self.next_deadline = self.start + offset;
282                } else {
283                    self.next_deadline += self.period;
284                }
285            }
286            MissedTickBehavior::Delay => {
287                // Reschedule from now.
288                self.next_deadline = now + self.period;
289            }
290        }
291    }
292
293    /// Reset the interval to fire one period from now.
294    pub fn reset(&mut self) {
295        self.next_deadline = Instant::now() + self.period;
296        self.sleep = None;
297    }
298
299    /// Reset the interval to fire at a specific instant.
300    pub fn reset_at(&mut self, deadline: Instant) {
301        self.next_deadline = deadline;
302        self.sleep = None;
303    }
304
305    /// Get the interval period.
306    pub fn period(&self) -> Duration {
307        self.period
308    }
309
310    /// Get the current missed tick behavior.
311    pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
312        self.missed_tick_behavior
313    }
314
315    /// Set the missed tick behavior.
316    pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
317        self.missed_tick_behavior = behavior;
318    }
319}
320
321// =============================================================================
322// YieldNow — cooperative yield
323// =============================================================================
324
325/// Future that yields once, then completes.
326///
327/// Returns `Pending` on first poll, wakes itself, completes on second poll.
328/// Other ready tasks get a turn before this task resumes.
329///
330/// # Caveat: cross-thread waits
331///
332/// `yield_now` is a *cooperative* yield within the executor. It does not
333/// park the executor or yield CPU to other OS threads. On a single-threaded
334/// runtime, a tight wait loop like
335///
336/// ```ignore
337/// while !cross_thread_state_ready() {
338///     yield_now().await;
339/// }
340/// ```
341///
342/// will busy-spin and starve other OS threads (a tokio worker thread, an
343/// Aeron media driver, a separate sender thread) of CPU. The producer
344/// can't fire its wake in time, the loop appears hung even though the
345/// external work would have completed eventually.
346///
347/// For cross-thread waits, use a parking primitive instead:
348/// - `await rx.recv()` on a channel — parks until the sender wakes
349/// - `await notify.notified()` on a `Notify` — parks until `notify_one()`
350/// - mix `yield_now` with periodic `sleep` — bounded park gives the OS
351///   time to schedule producer threads
352pub struct YieldNow(pub(crate) bool);
353
354impl Future for YieldNow {
355    type Output = ();
356
357    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
358        if self.0 {
359            Poll::Ready(())
360        } else {
361            self.0 = true;
362            cx.waker().wake_by_ref();
363            Poll::Pending
364        }
365    }
366}
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371    use std::task::{RawWaker, RawWakerVTable};
372
373    fn noop_waker() -> Waker {
374        fn noop(_: *const ()) {}
375        fn clone(p: *const ()) -> RawWaker {
376            RawWaker::new(p, &VTABLE)
377        }
378        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
379        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
380    }
381
382    #[test]
383    fn timer_driver_fire_expired() {
384        let mut driver = TimerDriver::new(64);
385        let now = Instant::now();
386        let waker = noop_waker();
387
388        driver.schedule(now - Duration::from_millis(10), waker.clone());
389        driver.schedule(now + Duration::from_secs(100), waker);
390
391        let fired = driver.fire_expired(now);
392        assert_eq!(fired, 1);
393        assert!(driver.next_deadline().unwrap() > now);
394    }
395
396    #[test]
397    fn timer_driver_next_deadline() {
398        let mut driver = TimerDriver::new(64);
399        assert!(driver.next_deadline().is_none());
400
401        let now = Instant::now();
402        let soon = now + Duration::from_millis(10);
403        let later = now + Duration::from_millis(100);
404        let waker = noop_waker();
405
406        driver.schedule(later, waker.clone());
407        driver.schedule(soon, waker);
408
409        let next = driver.next_deadline().unwrap();
410        // Timer wheel has tick-resolution quantization, so check within 2ms.
411        assert!(next <= soon + Duration::from_millis(2));
412    }
413}