moduvex_runtime/executor/waker.rs
1//! Custom `RawWakerVTable` implementation.
2//!
3//! Each `Waker` holds an `Arc<TaskHeader>` cast to a raw `*const ()`.
4//! The four vtable functions implement the `RawWaker` contract:
5//!
6//! | function | action |
7//! |---------------|-----------------------------------------------------|
8//! | `clone_waker` | `Arc::clone` — increments refcount |
9//! | `wake` | schedule task, consume (decrement) Arc |
10//! | `wake_by_ref` | schedule task, keep Arc alive |
11//! | `drop_waker` | `Arc::from_raw` then drop — decrements refcount |
12//!
13//! Safety contract: the data pointer is always a valid `Arc<TaskHeader>` that
14//! was created via `Arc::into_raw`. All four functions restore it to an `Arc`
15//! before performing any operation, maintaining the reference count correctly.
16
17use std::sync::Arc;
18use std::task::{RawWaker, RawWakerVTable, Waker};
19
20use super::scheduler::GlobalQueue;
21use super::task::{TaskHeader, STATE_IDLE, STATE_SCHEDULED};
22
23use std::sync::atomic::Ordering;
24
25// ── Vtable ────────────────────────────────────────────────────────────────────
26
27/// The single static vtable shared by all task wakers.
28static TASK_WAKER_VTABLE: RawWakerVTable =
29 RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
30
31// ── Public entry point ────────────────────────────────────────────────────────
32
33/// Construct a `Waker` from an `Arc<TaskHeader>` and a reference to the
34/// global queue into which the waker will push the task when fired.
35///
36/// Ownership of the `Arc` is transferred into the waker (the Arc's refcount
37/// is incremented by the caller before passing, or the caller gives up their
38/// `Arc` — here we use `Arc::clone` to keep the caller's handle alive).
39pub(crate) fn make_waker(header: Arc<TaskHeader>, queue: Arc<GlobalQueue>) -> Waker {
40 // Combine header + queue into a single heap allocation so the data pointer
41 // carries both pieces of information needed by `wake`.
42 let data = Arc::new(WakerData { header, queue });
43 let ptr = Arc::into_raw(data) as *const ();
44 let raw = RawWaker::new(ptr, &TASK_WAKER_VTABLE);
45 // SAFETY: The vtable functions correctly implement the RawWaker contract
46 // (see module doc). `ptr` is a valid Arc pointer.
47 unsafe { Waker::from_raw(raw) }
48}
49
50// ── WakerData ─────────────────────────────────────────────────────────────────
51
52/// Heap allocation backing each `Waker`. Bundles the task header with the
53/// queue reference needed to reschedule the task.
54struct WakerData {
55 header: Arc<TaskHeader>,
56 queue: Arc<GlobalQueue>,
57}
58
59// SAFETY: WakerData contains only Send+Sync types.
60unsafe impl Send for WakerData {}
61unsafe impl Sync for WakerData {}
62
63// ── Vtable functions ──────────────────────────────────────────────────────────
64
65/// Reconstruct an `Arc<WakerData>` from a raw pointer WITHOUT consuming it,
66/// then immediately `forget` the Arc so the refcount is unchanged.
67///
68/// # Safety
69/// `ptr` must be a valid `Arc<WakerData>` pointer produced by `Arc::into_raw`.
70#[inline]
71unsafe fn data_ref(ptr: *const ()) -> std::mem::ManuallyDrop<Arc<WakerData>> {
72 // SAFETY: `ptr` is always `Arc::into_raw(Arc<WakerData>)`.
73 std::mem::ManuallyDrop::new(Arc::from_raw(ptr as *const WakerData))
74}
75
76unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
77 // SAFETY: `ptr` is a valid Arc<WakerData> pointer (contract of RawWaker).
78 let data = data_ref(ptr);
79 // Increment refcount by cloning, then leak the clone.
80 let cloned = Arc::clone(&*data);
81 let new_ptr = Arc::into_raw(cloned) as *const ();
82 RawWaker::new(new_ptr, &TASK_WAKER_VTABLE)
83}
84
85unsafe fn wake(ptr: *const ()) {
86 // SAFETY: `ptr` is `Arc::into_raw(Arc<WakerData>)`; consuming it here
87 // correctly decrements the refcount when `data` is dropped at end of fn.
88 let data = Arc::from_raw(ptr as *const WakerData);
89 schedule_task(&data);
90 // `data` drops here → Arc refcount decremented.
91}
92
93unsafe fn wake_by_ref(ptr: *const ()) {
94 // SAFETY: same pointer contract; we borrow without consuming.
95 let data = data_ref(ptr);
96 schedule_task(&data);
97 // ManuallyDrop — refcount unchanged.
98}
99
100unsafe fn drop_waker(ptr: *const ()) {
101 // SAFETY: Reconstruct and immediately drop to decrement Arc refcount.
102 drop(Arc::from_raw(ptr as *const WakerData));
103}
104
105// ── Scheduling helper ─────────────────────────────────────────────────────────
106
107/// Attempt to transition the task from IDLE → SCHEDULED and push it to the
108/// global queue. If the task is already SCHEDULED/RUNNING, skip (it will be
109/// re-polled automatically).
110fn schedule_task(data: &WakerData) {
111 let header = &data.header;
112 // Only transition IDLE → SCHEDULED. Other states:
113 // SCHEDULED: already queued, nothing to do.
114 // RUNNING: executor holds it; it will check for re-schedule after poll.
115 // COMPLETED/CANCELLED: done, ignore wake.
116 let prev = header.state.compare_exchange(
117 STATE_IDLE,
118 STATE_SCHEDULED,
119 Ordering::AcqRel,
120 Ordering::Relaxed,
121 );
122 if prev.is_ok() {
123 data.queue.push_header(Arc::clone(header));
124 }
125}
126
127// ── Tests ─────────────────────────────────────────────────────────────────────
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132 use crate::executor::task::{Task, STATE_IDLE, STATE_SCHEDULED};
133 use std::sync::atomic::Ordering;
134
135 fn make_test_waker(task: &Task) -> (Waker, Arc<GlobalQueue>) {
136 let q = Arc::new(GlobalQueue::new());
137 let w = make_waker(Arc::clone(&task.header), Arc::clone(&q));
138 (w, q)
139 }
140
141 #[test]
142 fn waker_clone_increments_refcount() {
143 let (task, _jh) = Task::new(async { 1u32 });
144 task.header.state.store(STATE_IDLE, Ordering::Release);
145 let q = Arc::new(GlobalQueue::new());
146 let w1 = make_waker(Arc::clone(&task.header), Arc::clone(&q));
147 let w2 = w1.clone();
148 // Both wakers exist — refcount is at least 2 on top of task.header.
149 drop(w1);
150 drop(w2);
151 // No panic = correct refcount management.
152 }
153
154 #[test]
155 fn wake_by_ref_schedules_idle_task() {
156 let (task, _jh) = Task::new(async { 2u32 });
157 task.header.state.store(STATE_IDLE, Ordering::Release);
158 let (waker, queue) = make_test_waker(&task);
159 waker.wake_by_ref();
160 assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
161 assert!(queue.pop().is_some());
162 }
163
164 #[test]
165 fn wake_consumes_and_schedules() {
166 let (task, _jh) = Task::new(async { 3u32 });
167 task.header.state.store(STATE_IDLE, Ordering::Release);
168 let (waker, queue) = make_test_waker(&task);
169 waker.wake(); // consumes the waker
170 assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
171 assert!(queue.pop().is_some());
172 }
173
174 #[test]
175 fn wake_noop_when_already_scheduled() {
176 let (task, _jh) = Task::new(async { 4u32 });
177 task.header.state.store(STATE_SCHEDULED, Ordering::Release);
178 let (waker, queue) = make_test_waker(&task);
179 waker.wake_by_ref();
180 // State stays SCHEDULED, queue stays empty (CAS rejected).
181 assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
182 assert!(queue.pop().is_none());
183 }
184}