// moduvex_runtime/executor/waker.rs
//! Custom `RawWakerVTable` implementation.
//!
//! Each `Waker` holds an `Arc<WakerData>` (the task's `TaskHeader` bundled
//! with the global queue) cast to a raw `*const ()`.
//! The four vtable functions implement the `RawWaker` contract:
//!
//! | function      | action                                              |
//! |---------------|-----------------------------------------------------|
//! | `clone_waker` | `Arc::clone` — increments refcount                  |
//! | `wake`        | schedule task, consume (decrement) Arc              |
//! | `wake_by_ref` | schedule task, keep Arc alive                       |
//! | `drop_waker`  | `Arc::from_raw` then drop — decrements refcount     |
//!
//! Safety contract: the data pointer is always a valid `Arc<WakerData>` that
//! was created via `Arc::into_raw`. All four functions restore it to an `Arc`
//! before performing any operation, maintaining the reference count correctly.
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};

use super::scheduler::GlobalQueue;
use super::task::{TaskHeader, STATE_IDLE, STATE_SCHEDULED};

use std::sync::atomic::Ordering;

// ── Vtable ────────────────────────────────────────────────────────────────────

/// The single static vtable shared by all task wakers.
///
/// `RawWakerVTable` must be `'static`, so one shared instance serves every
/// waker built by `make_waker`.
static TASK_WAKER_VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
// ── Public entry point ────────────────────────────────────────────────────────

/// Construct a `Waker` that, when fired, schedules the task identified by
/// `header` onto `queue`.
///
/// Both `Arc`s are moved into a single heap-allocated `WakerData`; ownership
/// of that allocation transfers to the returned `Waker` and is released again
/// by `wake`/`drop_waker` via `Arc::from_raw`. Callers who need to keep their
/// own handles must clone before calling.
pub(crate) fn make_waker(header: Arc<TaskHeader>, queue: Arc<GlobalQueue>) -> Waker {
    // Combine header + queue into a single heap allocation so the data pointer
    // carries both pieces of information needed by `wake`.
    let data = Arc::new(WakerData { header, queue });
    let ptr = Arc::into_raw(data) as *const ();
    let raw = RawWaker::new(ptr, &TASK_WAKER_VTABLE);
    // SAFETY: The vtable functions correctly implement the RawWaker contract
    // (see module doc). `ptr` is a valid Arc pointer.
    unsafe { Waker::from_raw(raw) }
}

// ── WakerData ─────────────────────────────────────────────────────────────────

/// Heap allocation backing each `Waker`. Bundles the task header with the
/// queue reference needed to reschedule the task.
///
/// The waker's raw data pointer is `Arc::into_raw(Arc<WakerData>)`; every
/// vtable function converts it back to an `Arc` before touching either field.
struct WakerData {
    // Task state machine / identity that the wake transitions and enqueues.
    header: Arc<TaskHeader>,
    // Queue the task is pushed onto when the waker fires.
    queue: Arc<GlobalQueue>,
}

// NOTE: `Send`/`Sync` for `WakerData` are left to the compiler's auto-trait
// derivation. Both fields are `Arc`s, so `WakerData` is automatically
// `Send + Sync` exactly when `TaskHeader` and `GlobalQueue` are. The previous
// blanket `unsafe impl Send/Sync` blocks were redundant in that case, and
// worse, would have kept asserting thread-safety even if a non-thread-safe
// field were added later — silently creating unsoundness. `Waker` itself is
// unconditionally `Send + Sync`, so the data behind it must genuinely be
// shareable across threads; letting the compiler enforce that is safer.
62
63// ── Vtable functions ──────────────────────────────────────────────────────────
64
65/// Reconstruct an `Arc<WakerData>` from a raw pointer WITHOUT consuming it,
66/// then immediately `forget` the Arc so the refcount is unchanged.
67///
68/// # Safety
69/// `ptr` must be a valid `Arc<WakerData>` pointer produced by `Arc::into_raw`.
70#[inline]
71unsafe fn data_ref(ptr: *const ()) -> std::mem::ManuallyDrop<Arc<WakerData>> {
72    // SAFETY: `ptr` is always `Arc::into_raw(Arc<WakerData>)`.
73    std::mem::ManuallyDrop::new(Arc::from_raw(ptr as *const WakerData))
74}
75
76unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
77    // SAFETY: `ptr` is a valid Arc<WakerData> pointer (contract of RawWaker).
78    let data = data_ref(ptr);
79    // Increment refcount by cloning, then leak the clone.
80    let cloned = Arc::clone(&*data);
81    let new_ptr = Arc::into_raw(cloned) as *const ();
82    RawWaker::new(new_ptr, &TASK_WAKER_VTABLE)
83}
84
85unsafe fn wake(ptr: *const ()) {
86    // SAFETY: `ptr` is `Arc::into_raw(Arc<WakerData>)`; consuming it here
87    // correctly decrements the refcount when `data` is dropped at end of fn.
88    let data = Arc::from_raw(ptr as *const WakerData);
89    schedule_task(&data);
90    // `data` drops here → Arc refcount decremented.
91}
92
93unsafe fn wake_by_ref(ptr: *const ()) {
94    // SAFETY: same pointer contract; we borrow without consuming.
95    let data = data_ref(ptr);
96    schedule_task(&data);
97    // ManuallyDrop — refcount unchanged.
98}
99
100unsafe fn drop_waker(ptr: *const ()) {
101    // SAFETY: Reconstruct and immediately drop to decrement Arc refcount.
102    drop(Arc::from_raw(ptr as *const WakerData));
103}
104
105// ── Scheduling helper ─────────────────────────────────────────────────────────
106
107/// Attempt to transition the task from IDLE → SCHEDULED and push it to the
108/// global queue. If the task is already SCHEDULED/RUNNING, skip (it will be
109/// re-polled automatically).
110fn schedule_task(data: &WakerData) {
111    let header = &data.header;
112    // Only transition IDLE → SCHEDULED. Other states:
113    //   SCHEDULED: already queued, nothing to do.
114    //   RUNNING:   executor holds it; it will check for re-schedule after poll.
115    //   COMPLETED/CANCELLED: done, ignore wake.
116    let prev = header.state.compare_exchange(
117        STATE_IDLE,
118        STATE_SCHEDULED,
119        Ordering::AcqRel,
120        Ordering::Relaxed,
121    );
122    if prev.is_ok() {
123        data.queue.push_header(Arc::clone(header));
124    }
125}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::task::{Task, STATE_IDLE, STATE_SCHEDULED};
    use std::sync::atomic::Ordering;

    /// Build a waker for `task` backed by a fresh global queue.
    fn make_test_waker(task: &Task) -> (Waker, Arc<GlobalQueue>) {
        let queue = Arc::new(GlobalQueue::new());
        let waker = make_waker(Arc::clone(&task.header), Arc::clone(&queue));
        (waker, queue)
    }

    #[test]
    fn waker_clone_increments_refcount() {
        let (task, _jh) = Task::new(async { 1u32 });
        task.header.state.store(STATE_IDLE, Ordering::Release);
        let queue = Arc::new(GlobalQueue::new());
        let original = make_waker(Arc::clone(&task.header), Arc::clone(&queue));
        let duplicate = original.clone();
        // Dropping both without a crash shows clone/drop balance the refcount.
        drop(original);
        drop(duplicate);
    }

    #[test]
    fn wake_by_ref_schedules_idle_task() {
        let (task, _jh) = Task::new(async { 2u32 });
        task.header.state.store(STATE_IDLE, Ordering::Release);
        let (waker, queue) = make_test_waker(&task);
        waker.wake_by_ref();
        assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
        assert!(queue.pop().is_some());
    }

    #[test]
    fn wake_consumes_and_schedules() {
        let (task, _jh) = Task::new(async { 3u32 });
        task.header.state.store(STATE_IDLE, Ordering::Release);
        let (waker, queue) = make_test_waker(&task);
        // `wake` takes the waker by value, consuming its reference.
        waker.wake();
        assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
        assert!(queue.pop().is_some());
    }

    #[test]
    fn wake_noop_when_already_scheduled() {
        let (task, _jh) = Task::new(async { 4u32 });
        task.header.state.store(STATE_SCHEDULED, Ordering::Release);
        let (waker, queue) = make_test_waker(&task);
        waker.wake_by_ref();
        // The IDLE→SCHEDULED CAS fails, so nothing is pushed and state is kept.
        assert_eq!(task.header.state.load(Ordering::Acquire), STATE_SCHEDULED);
        assert!(queue.pop().is_none());
    }
}