tokio/runtime/time_alt/
timer.rs

1use super::{EntryHandle, TempLocalContext};
2use crate::runtime::scheduler::Handle as SchedulerHandle;
3use crate::time::Instant;
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[cfg(any(feature = "rt", feature = "rt-multi-thread"))]
9use crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR;
10
11pub(crate) struct Timer {
12    sched_handle: SchedulerHandle,
13
14    /// The entry in the timing wheel.
15    ///
16    /// - `Some` if the timer is registered / pending / woken up / cancelling.
17    /// - `None` if the timer is unregistered.
18    entry: Option<EntryHandle>,
19
20    /// The deadline for the timer.
21    deadline: Instant,
22}
23
24impl std::fmt::Debug for Timer {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("Timer")
27            .field("deadline", &self.deadline)
28            .finish()
29    }
30}
31
32impl Drop for Timer {
33    fn drop(&mut self) {
34        if let Some(entry) = self.entry.take() {
35            entry.cancel();
36        }
37    }
38}
39
40impl Timer {
41    #[track_caller]
42    pub(crate) fn new(sched_hdl: SchedulerHandle, deadline: Instant) -> Self {
43        // Panic if the time driver is not enabled
44        let _ = sched_hdl.driver().time();
45        Timer {
46            sched_handle: sched_hdl,
47            entry: None,
48            deadline,
49        }
50    }
51
52    pub(crate) fn deadline(&self) -> Instant {
53        self.deadline
54    }
55
56    pub(crate) fn is_elapsed(&self) -> bool {
57        self.entry.as_ref().is_some_and(|entry| entry.is_woken_up())
58    }
59
60    fn register(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
61        let this = self.get_mut();
62
63        with_current_temp_local_context(&this.sched_handle, |maybe_time_cx| {
64            let deadline = deadline_to_tick(&this.sched_handle, this.deadline);
65
66            match maybe_time_cx {
67                Some(TempLocalContext::Running {
68                    registration_queue: _,
69                    elapsed,
70                }) if deadline <= elapsed => Poll::Ready(()),
71
72                Some(TempLocalContext::Running {
73                    registration_queue,
74                    elapsed: _,
75                }) => {
76                    let hdl = EntryHandle::new(deadline, cx.waker().clone());
77                    this.entry = Some(hdl.clone());
78                    unsafe {
79                        registration_queue.push_front(hdl);
80                    }
81                    Poll::Pending
82                }
83                #[cfg(feature = "rt-multi-thread")]
84                Some(TempLocalContext::Shutdown) => panic!("{RUNTIME_SHUTTING_DOWN_ERROR}"),
85
86                _ => {
87                    let hdl = EntryHandle::new(deadline, cx.waker().clone());
88                    this.entry = Some(hdl.clone());
89                    push_from_remote(&this.sched_handle, hdl);
90                    Poll::Pending
91                }
92            }
93        })
94    }
95
96    pub(crate) fn poll_elapsed(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
97        match self.entry.as_ref() {
98            Some(entry) if entry.is_woken_up() => Poll::Ready(()),
99            Some(entry) => {
100                entry.register_waker(cx.waker().clone());
101                Poll::Pending
102            }
103            None => self.register(cx),
104        }
105    }
106
107    pub(crate) fn scheduler_handle(&self) -> &SchedulerHandle {
108        &self.sched_handle
109    }
110
111    #[cfg(all(tokio_unstable, feature = "tracing"))]
112    pub(crate) fn driver(&self) -> &crate::runtime::time::Handle {
113        self.sched_handle.driver().time()
114    }
115
116    #[cfg(all(tokio_unstable, feature = "tracing"))]
117    pub(crate) fn clock(&self) -> &crate::time::Clock {
118        self.sched_handle.driver().clock()
119    }
120}
121
122fn with_current_temp_local_context<F, R>(hdl: &SchedulerHandle, f: F) -> R
123where
124    F: FnOnce(Option<TempLocalContext<'_>>) -> R,
125{
126    #[cfg(not(feature = "rt"))]
127    {
128        let (_, _) = (hdl, f);
129        panic!("Tokio runtime is not enabled, cannot access the current wheel");
130    }
131
132    #[cfg(feature = "rt")]
133    {
134        use crate::runtime::context;
135
136        let is_same_rt =
137            context::with_current(|cur_hdl| cur_hdl.is_same_runtime(hdl)).unwrap_or_default();
138
139        if !is_same_rt {
140            // We don't want to create the timer in one runtime,
141            // but register it in a different runtime's timer wheel.
142            f(None)
143        } else {
144            context::with_scheduler(|maybe_cx| match maybe_cx {
145                Some(cx) => cx.with_time_temp_local_context(f),
146                None => f(None),
147            })
148        }
149    }
150}
151
152fn push_from_remote(sched_hdl: &SchedulerHandle, entry_hdl: EntryHandle) {
153    #[cfg(not(feature = "rt"))]
154    {
155        let (_, _) = (sched_hdl, entry_hdl);
156        panic!("Tokio runtime is not enabled, cannot access the current wheel");
157    }
158
159    #[cfg(feature = "rt")]
160    {
161        assert!(!sched_hdl.is_shutdown(), "{RUNTIME_SHUTTING_DOWN_ERROR}");
162        sched_hdl.push_remote_timer(entry_hdl);
163    }
164}
165
166fn deadline_to_tick(sched_hdl: &SchedulerHandle, deadline: Instant) -> u64 {
167    let time_hdl = sched_hdl.driver().time();
168    time_hdl.time_source().deadline_to_tick(deadline)
169}