Skip to main content

tokio/runtime/time_alt/
entry.rs

1use super::cancellation_queue::Sender;
2use crate::loom::sync::{Arc, Mutex};
3use crate::util::linked_list;
4
5use std::marker::PhantomPinned;
6use std::ptr::NonNull;
7use std::task::Waker;
8
9pub(super) type EntryList = linked_list::LinkedList<Entry, Entry>;
10
11#[derive(Debug)]
12struct State {
13    cancelled: bool,
14    woken_up: bool,
15    waker: Option<Waker>,
16    cancel_tx: Option<Sender>,
17}
18
19#[derive(Debug)]
20pub(crate) struct Entry {
21    /// The intrusive pointer used by [`super::cancellation_queue`].
22    cancel_pointers: linked_list::Pointers<Entry>,
23
24    /// The intrusive pointer used by any of the following queues:
25    ///
26    /// - [`Wheel`]
27    /// - [`RegistrationQueue`]
28    /// - [`WakeQueue`]
29    ///
30    /// We can guarantee that this pointer is only used by one of the above
31    /// at any given time. See below for the journey of this pointer.
32    ///
33    /// Initially, this pointer is used by the [`RegistrationQueue`].
34    ///
35    /// And then, before parking the resource driver,
36    /// the scheduler removes the entry from the [`RegistrationQueue`]
37    /// [`RegistrationQueue`] and insert it into the [`Wheel`].
38    ///
39    /// Finally, after parking the resource driver, the scheduler removes
40    /// the entry from the [`Wheel`] and insert it into the [`WakeQueue`].
41    ///
42    /// [`RegistrationQueue`]: super::RegistrationQueue
43    /// [`Wheel`]: super::Wheel
44    /// [`WakeQueue`]: super::WakeQueue
45    extra_pointers: linked_list::Pointers<Entry>,
46
47    /// The tick when this entry is scheduled to expire.
48    deadline: u64,
49
50    state: Mutex<State>,
51
52    /// Make the type `!Unpin` to prevent LLVM from emitting
53    /// the `noalias` attribute for mutable references.
54    ///
55    /// See <https://github.com/rust-lang/rust/pull/82834>.
56    _pin: PhantomPinned,
57}
58
59// Safety: `Entry` is always in an `Arc`.
60unsafe impl linked_list::Link for Entry {
61    type Handle = Handle;
62    type Target = Entry;
63
64    fn as_raw(hdl: &Self::Handle) -> NonNull<Self::Target> {
65        unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) }
66    }
67
68    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
69        Handle {
70            entry: unsafe { Arc::from_raw(ptr.as_ptr()) },
71        }
72    }
73
74    unsafe fn pointers(
75        target: NonNull<Self::Target>,
76    ) -> NonNull<linked_list::Pointers<Self::Target>> {
77        let this = target.as_ptr();
78        let field = unsafe { std::ptr::addr_of_mut!((*this).extra_pointers) };
79        unsafe { NonNull::new_unchecked(field) }
80    }
81}
82
83/// An ZST to allow [`super::registration_queue`] to utilize the [`Entry::extra_pointers`]
84/// by impl [`linked_list::Link`] as we cannot impl it on [`Entry`]
85/// directly due to the conflicting implementations.
86///
87/// This type should never be constructed.
88pub(super) struct RegistrationQueueEntry;
89
90// Safety: `Entry` is always in an `Arc`.
91unsafe impl linked_list::Link for RegistrationQueueEntry {
92    type Handle = Handle;
93    type Target = Entry;
94
95    fn as_raw(hdl: &Self::Handle) -> NonNull<Self::Target> {
96        unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) }
97    }
98
99    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
100        Handle {
101            entry: unsafe { Arc::from_raw(ptr.as_ptr()) },
102        }
103    }
104
105    unsafe fn pointers(
106        target: NonNull<Self::Target>,
107    ) -> NonNull<linked_list::Pointers<Self::Target>> {
108        let this = target.as_ptr();
109        let field = unsafe { std::ptr::addr_of_mut!((*this).extra_pointers) };
110        unsafe { NonNull::new_unchecked(field) }
111    }
112}
113
114/// An ZST to allow [`super::cancellation_queue`] to utilize the [`Entry::cancel_pointers`]
115/// by impl [`linked_list::Link`] as we cannot impl it on [`Entry`]
116/// directly due to the conflicting implementations.
117///
118/// This type should never be constructed.
119pub(super) struct CancellationQueueEntry;
120
121// Safety: `Entry` is always in an `Arc`.
122unsafe impl linked_list::Link for CancellationQueueEntry {
123    type Handle = Handle;
124    type Target = Entry;
125
126    fn as_raw(hdl: &Self::Handle) -> NonNull<Self::Target> {
127        unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) }
128    }
129
130    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
131        Handle {
132            entry: unsafe { Arc::from_raw(ptr.as_ptr()) },
133        }
134    }
135
136    unsafe fn pointers(
137        target: NonNull<Self::Target>,
138    ) -> NonNull<linked_list::Pointers<Self::Target>> {
139        let this = target.as_ptr();
140        let field = unsafe { std::ptr::addr_of_mut!((*this).cancel_pointers) };
141        unsafe { NonNull::new_unchecked(field) }
142    }
143}
144
145/// An ZST to allow [`super::WakeQueue`] to utilize the [`Entry::extra_pointers`]
146/// by impl [`linked_list::Link`] as we cannot impl it on [`Entry`]
147/// directly due to the conflicting implementations.
148///
149/// This type should never be constructed.
150pub(super) struct WakeQueueEntry;
151
152// Safety: `Entry` is always in an `Arc`.
153unsafe impl linked_list::Link for WakeQueueEntry {
154    type Handle = Handle;
155    type Target = Entry;
156
157    fn as_raw(hdl: &Self::Handle) -> NonNull<Self::Target> {
158        unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry).cast_mut()) }
159    }
160
161    unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
162        Handle {
163            entry: unsafe { Arc::from_raw(ptr.as_ptr()) },
164        }
165    }
166
167    unsafe fn pointers(
168        target: NonNull<Self::Target>,
169    ) -> NonNull<linked_list::Pointers<Self::Target>> {
170        let this = target.as_ptr();
171        let field = unsafe { std::ptr::addr_of_mut!((*this).extra_pointers) };
172        unsafe { NonNull::new_unchecked(field) }
173    }
174}
175
176#[derive(Debug, Clone)]
177pub(crate) struct Handle {
178    pub(crate) entry: Arc<Entry>,
179}
180
181impl From<&Handle> for NonNull<Entry> {
182    fn from(hdl: &Handle) -> Self {
183        // Safety: entry is in an `Arc`, so the pointer is valid.
184        unsafe { NonNull::new_unchecked(Arc::as_ptr(&hdl.entry) as *mut Entry) }
185    }
186}
187
188impl Handle {
189    pub(crate) fn new(deadline: u64, waker: Waker) -> Self {
190        let state = State {
191            cancelled: false,
192            woken_up: false,
193            waker: Some(waker),
194            cancel_tx: None,
195        };
196
197        let entry = Arc::new(Entry {
198            cancel_pointers: linked_list::Pointers::new(),
199            extra_pointers: linked_list::Pointers::new(),
200            deadline,
201            state: Mutex::new(state),
202            _pin: PhantomPinned,
203        });
204
205        Handle { entry }
206    }
207
208    /// Wake the entry if it is already in the pending queue of the timer wheel.
209    pub(crate) fn wake(&self) {
210        let mut lock = self.entry.state.lock();
211
212        if !lock.cancelled {
213            lock.woken_up = true;
214            if let Some(waker) = lock.waker.take() {
215                // unlock before calling waker
216                drop(lock);
217                waker.wake();
218            }
219        }
220    }
221
222    pub(crate) fn register_cancel_tx(&self, cancel_tx: Sender) {
223        let mut lock = self.entry.state.lock();
224        if !lock.cancelled && !lock.woken_up {
225            let old_tx = lock.cancel_tx.replace(cancel_tx);
226            // don't unlock — poisoning the `Mutex` stops others from using the bad state.
227            assert!(old_tx.is_none(), "cancel_tx is already registered");
228        }
229    }
230
231    pub(crate) fn register_waker(&self, waker: Waker) {
232        let mut lock = self.entry.state.lock();
233        if !lock.cancelled && !lock.woken_up {
234            let maybe_old_waker = lock.waker.replace(waker);
235            // unlock before calling waker
236            drop(lock);
237            drop(maybe_old_waker);
238        }
239    }
240
241    pub(crate) fn cancel(&self) {
242        let mut lock = self.entry.state.lock();
243        if !lock.cancelled {
244            lock.cancelled = true;
245            if let Some(cancel_tx) = lock.cancel_tx.take() {
246                drop(lock);
247
248                // Safety: we can guarantee that `self` is not in any cancellation queue
249                // because the `self.cancelled` was just set to `true`.
250                unsafe {
251                    cancel_tx.send(self.clone());
252                }
253            }
254        }
255    }
256
257    pub(crate) fn deadline(&self) -> u64 {
258        self.entry.deadline
259    }
260
261    pub(crate) fn is_woken_up(&self) -> bool {
262        let lock = self.entry.state.lock();
263        lock.woken_up
264    }
265
266    pub(crate) fn is_cancelled(&self) -> bool {
267        let lock = self.entry.state.lock();
268        lock.cancelled
269    }
270
271    #[cfg(test)]
272    /// Only used for unit tests.
273    pub(crate) fn inner_strong_count(&self) -> usize {
274        Arc::strong_count(&self.entry)
275    }
276}