nexus_async_rt/timer.rs
1//! Timer driver backed by nexus-timer wheel.
2//!
3//! O(1) insert and cancel via hierarchical timer wheel. Expired timers
4//! are collected into a pre-allocated buffer and their wakers fired.
5//! Integrates with the mio poll timeout — the nearest deadline
6//! determines how long epoll blocks.
7
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll, Waker};
11use std::time::{Duration, Instant};
12
13use nexus_timer::{Wheel, WheelBuilder};
14
15// =============================================================================
16// TimerDriver — owned by Runtime
17// =============================================================================
18
19/// Timer wheel driver. O(1) insert, O(1) cancel, no-cascade poll.
20pub(crate) struct TimerDriver {
21 wheel: Wheel<Waker>,
22 /// Pre-allocated buffer for expired wakers. Reused across cycles.
23 expired: Vec<Waker>,
24}
25
26impl TimerDriver {
27 pub(crate) fn new(capacity: usize) -> Self {
28 let now = Instant::now();
29 let wheel = WheelBuilder::default().unbounded(capacity).build(now);
30 Self {
31 wheel,
32 expired: Vec::with_capacity(64),
33 }
34 }
35
36 /// Schedule a deadline with a waker to call on expiry.
37 /// Fire-and-forget — no handle returned (the Sleep future
38 /// doesn't need to cancel).
39 pub(crate) fn schedule(&mut self, deadline: Instant, waker: Waker) {
40 self.wheel.schedule_forget(deadline, waker);
41 }
42
43 /// Returns the nearest deadline, or `None` if no timers are pending.
44 pub(crate) fn next_deadline(&self) -> Option<Instant> {
45 self.wheel.next_deadline()
46 }
47
48 /// Drain all expired timers and wake their tasks.
49 ///
50 /// Returns the number of timers fired.
51 pub(crate) fn fire_expired(&mut self, now: Instant) -> usize {
52 self.expired.clear();
53 let fired = self.wheel.poll(now, &mut self.expired);
54 for waker in self.expired.drain(..) {
55 waker.wake();
56 }
57 fired
58 }
59}
60
61// =============================================================================
62// TimerHandle — Copy handle for tasks
63// =============================================================================
64
65/// [`Copy`] handle for scheduling timers from async tasks.
66#[derive(Clone, Copy)]
67pub struct TimerHandle {
68 driver: *mut TimerDriver,
69}
70
71impl TimerHandle {
72 pub(crate) fn new(driver: &mut TimerDriver) -> Self {
73 Self {
74 driver: std::ptr::from_mut(driver),
75 }
76 }
77
78 /// Create a [`Sleep`] future that completes after `duration`.
79 pub fn sleep(&self, duration: Duration) -> Sleep {
80 Sleep {
81 deadline: Instant::now() + duration,
82 driver: self.driver,
83 registered: false,
84 waker: None,
85 }
86 }
87
88 /// Create a [`Sleep`] future that completes at `deadline`.
89 pub fn sleep_until(&self, deadline: Instant) -> Sleep {
90 Sleep {
91 deadline,
92 driver: self.driver,
93 registered: false,
94 waker: None,
95 }
96 }
97}
98
99// =============================================================================
100// Sleep future
101// =============================================================================
102
103/// Future that completes when a deadline expires.
104///
105/// On first poll, registers the deadline with the timer wheel. On
106/// subsequent polls, re-registers if the waker has changed (the timer
107/// wheel stores a clone of the waker — a stale waker means the
108/// expiry notification goes to the wrong task).
109pub struct Sleep {
110 deadline: Instant,
111 driver: *mut TimerDriver,
112 registered: bool,
113 waker: Option<Waker>,
114}
115
116impl Future for Sleep {
117 type Output = ();
118
119 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
120 if Instant::now() >= self.deadline {
121 return Poll::Ready(());
122 }
123
124 let needs_register =
125 !self.registered || self.waker.as_ref().is_none_or(|w| !w.will_wake(cx.waker()));
126
127 if needs_register {
128 // SAFETY: driver pointer is valid (Runtime lifetime).
129 let driver = unsafe { &mut *self.driver };
130 driver.schedule(self.deadline, cx.waker().clone());
131 self.registered = true;
132 self.waker = Some(cx.waker().clone());
133 }
134
135 Poll::Pending
136 }
137}
138
139// =============================================================================
140// Timeout — wraps a future with a deadline
141// =============================================================================
142
/// Error yielded by a [`Timeout`] whose deadline fired before the
/// wrapped future produced a value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Elapsed;

impl std::fmt::Display for Elapsed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MESSAGE: &str = "deadline elapsed";
        f.write_str(MESSAGE)
    }
}

// No underlying cause: the default `Error` methods are sufficient.
impl std::error::Error for Elapsed {}
154
155/// Future that completes with `Ok(T)` if the inner future finishes
156/// before the deadline, or `Err(Elapsed)` if the deadline fires first.
157pub struct Timeout<F> {
158 future: F,
159 sleep: Sleep,
160}
161
162impl<F> Timeout<F> {
163 pub(crate) fn new(future: F, sleep: Sleep) -> Self {
164 Self { future, sleep }
165 }
166
167 /// Recover the wrapped future, discarding the timeout.
168 pub fn into_inner(self) -> F {
169 self.future
170 }
171}
172
173impl<F: Future> Future for Timeout<F> {
174 type Output = Result<F::Output, Elapsed>;
175
176 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
177 // SAFETY: we never move the inner fields out of the Pin.
178 let this = unsafe { self.get_unchecked_mut() };
179
180 // Check the deadline first so already-expired timeouts reliably
181 // return Err(Elapsed) even if the inner future is also ready.
182 if Pin::new(&mut this.sleep).poll(cx).is_ready() {
183 return Poll::Ready(Err(Elapsed));
184 }
185
186 // SAFETY: this.future is pinned because self is pinned.
187 if let Poll::Ready(val) = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx) {
188 return Poll::Ready(Ok(val));
189 }
190
191 Poll::Pending
192 }
193}
194
195// =============================================================================
196// Interval — periodic ticks
197// =============================================================================
198
/// Strategy for handling missed interval ticks.
///
/// When processing takes longer than the interval period, ticks are
/// "missed." This enum controls how the interval catches up.
///
/// The [`Default`] implementation returns [`MissedTickBehavior::Burst`],
/// matching the behavior a freshly created interval starts with.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
    /// Fire missed ticks immediately to catch up (default).
    /// Maintains the original schedule timestamps.
    #[default]
    Burst,
    /// Skip missed ticks and jump to the next future tick aligned
    /// with the original start time.
    Skip,
    /// Reschedule from now — the next tick fires one full period
    /// from the current time, discarding the original schedule.
    Delay,
}
215
216/// Periodic timer that ticks at a fixed interval.
217///
218/// Created via [`crate::context::interval`]. Call `.tick().await` to
219/// wait for the next tick.
220pub struct Interval {
221 period: Duration,
222 start: Instant,
223 next_deadline: Instant,
224 sleep: Option<Sleep>,
225 missed_tick_behavior: MissedTickBehavior,
226}
227
228impl Interval {
229 pub(crate) fn new(period: Duration) -> Self {
230 assert!(!period.is_zero(), "interval period must be non-zero");
231 let now = Instant::now();
232 Self {
233 period,
234 start: now,
235 next_deadline: now + period,
236 sleep: None,
237 missed_tick_behavior: MissedTickBehavior::Burst,
238 }
239 }
240
241 pub(crate) fn new_at(start: Instant, period: Duration) -> Self {
242 assert!(!period.is_zero(), "interval period must be non-zero");
243 Self {
244 period,
245 start,
246 next_deadline: start,
247 sleep: None,
248 missed_tick_behavior: MissedTickBehavior::Burst,
249 }
250 }
251
252 /// Wait for the next tick.
253 pub async fn tick(&mut self) {
254 if self.sleep.is_none() {
255 self.sleep = Some(crate::context::sleep_until(self.next_deadline));
256 }
257
258 if let Some(ref mut sleep) = self.sleep {
259 Pin::new(sleep).await;
260 }
261
262 let now = Instant::now();
263 self.sleep = None;
264
265 match self.missed_tick_behavior {
266 MissedTickBehavior::Burst => {
267 // Advance by one period. If behind, next tick fires immediately.
268 self.next_deadline += self.period;
269 }
270 MissedTickBehavior::Skip => {
271 // Jump to the next tick aligned with the original start.
272 if now >= self.next_deadline {
273 let elapsed = now.duration_since(self.start);
274 let period_nanos = self.period.as_nanos();
275 let periods = elapsed.as_nanos() / period_nanos;
276 // Compute next deadline in nanos to avoid u32 truncation
277 // (Duration * u32 wraps after ~49 days at 1ms intervals).
278 let next_nanos = (periods + 1).saturating_mul(period_nanos);
279 let offset =
280 Duration::from_nanos(u64::try_from(next_nanos).unwrap_or(u64::MAX));
281 self.next_deadline = self.start + offset;
282 } else {
283 self.next_deadline += self.period;
284 }
285 }
286 MissedTickBehavior::Delay => {
287 // Reschedule from now.
288 self.next_deadline = now + self.period;
289 }
290 }
291 }
292
293 /// Reset the interval to fire one period from now.
294 pub fn reset(&mut self) {
295 self.next_deadline = Instant::now() + self.period;
296 self.sleep = None;
297 }
298
299 /// Reset the interval to fire at a specific instant.
300 pub fn reset_at(&mut self, deadline: Instant) {
301 self.next_deadline = deadline;
302 self.sleep = None;
303 }
304
305 /// Get the interval period.
306 pub fn period(&self) -> Duration {
307 self.period
308 }
309
310 /// Get the current missed tick behavior.
311 pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
312 self.missed_tick_behavior
313 }
314
315 /// Set the missed tick behavior.
316 pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
317 self.missed_tick_behavior = behavior;
318 }
319}
320
321// =============================================================================
322// YieldNow — cooperative yield
323// =============================================================================
324
/// Future that gives up the executor once and then completes.
///
/// The first poll wakes the task's own waker and returns `Pending`; the
/// second poll returns `Ready`. Other ready tasks get a turn in between.
///
/// # Caveat: cross-thread waits
///
/// `yield_now` is a *cooperative* yield inside the executor. It neither
/// parks the executor nor hands the CPU to other OS threads. On a
/// single-threaded runtime, a tight wait loop such as
///
/// ```ignore
/// while !cross_thread_state_ready() {
///     yield_now().await;
/// }
/// ```
///
/// busy-spins and starves other OS threads (a tokio worker thread, an
/// Aeron media driver, a separate sender thread) of CPU. The producer
/// cannot fire its wake in time, so the loop looks hung even though the
/// external work would eventually have finished.
///
/// For cross-thread waits, prefer a parking primitive:
/// - `await rx.recv()` on a channel — parks until the sender wakes
/// - `await notify.notified()` on a `Notify` — parks until `notify_one()`
/// - mix `yield_now` with periodic `sleep` — a bounded park gives the OS
///   time to schedule producer threads
pub struct YieldNow(pub(crate) bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // The flag records whether the single yield has already happened.
        let already_yielded = std::mem::replace(&mut self.0, true);
        if already_yielded {
            return Poll::Ready(());
        }

        // Request an immediate re-poll so the task is queued again behind
        // whatever else is ready.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::{RawWaker, RawWakerVTable};

    /// Waker whose clone, wake and drop operations all do nothing.
    fn noop_waker() -> Waker {
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, ignore, ignore, ignore);

        fn clone_raw(data: *const ()) -> RawWaker {
            RawWaker::new(data, &VTABLE)
        }
        fn ignore(_: *const ()) {}

        // SAFETY: every vtable entry ignores the data pointer, so a null
        // pointer is never dereferenced.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    #[test]
    fn timer_driver_fire_expired() {
        let mut driver = TimerDriver::new(64);
        let now = Instant::now();
        let already_due = now - Duration::from_millis(10);
        let far_away = now + Duration::from_secs(100);

        driver.schedule(already_due, noop_waker());
        driver.schedule(far_away, noop_waker());

        // Only the past deadline fires; the distant one stays pending.
        assert_eq!(driver.fire_expired(now), 1);
        let remaining = driver.next_deadline().unwrap();
        assert!(remaining > now);
    }

    #[test]
    fn timer_driver_next_deadline() {
        let mut driver = TimerDriver::new(64);
        assert!(driver.next_deadline().is_none());

        let now = Instant::now();
        let soon = now + Duration::from_millis(10);
        let later = now + Duration::from_millis(100);

        // Insert out of order to check that the nearest deadline wins.
        driver.schedule(later, noop_waker());
        driver.schedule(soon, noop_waker());

        let next = driver.next_deadline().unwrap();
        // The wheel quantizes deadlines to its tick resolution, so allow
        // 2ms of slack.
        let tolerance = Duration::from_millis(2);
        assert!(next <= soon + tolerance);
    }
}
413}