// moduvex_runtime/executor/scheduler.rs
//! Per-core task queues: `LocalQueue` (ring buffer) + `GlobalQueue` (mutex deque).
//!
//! # Design
//! - `LocalQueue` — fixed-capacity 256-slot ring buffer for LIFO local dequeuing.
//!   Overflow tasks (when the ring is full) are spilled to the `GlobalQueue`.
//! - `GlobalQueue` — `Mutex<VecDeque<Arc<TaskHeader>>>` for cross-thread injection
//!   and work-stealing. Also stores `Task` handles for executor ownership.
//!
//! Both queues operate on `Arc<TaskHeader>` for the waker path, and `Task` for
//! the executor-ownership path. The distinction matters for drop semantics:
//! - Wakers push `Arc<TaskHeader>` (no `Future` ownership).
//! - The executor pops `Arc<TaskHeader>` and looks up its owned `Task` by pointer.
//!
//! For simplicity in the single-threaded executor, both queues store
//! `Arc<TaskHeader>` and the executor maintains a separate slab of `Task` owners.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use super::task::TaskHeader;

// ── LocalQueue ────────────────────────────────────────────────────────────────

24/// Fixed-size ring-buffer local queue (256 slots).
25///
26/// Operates as a LIFO stack for cache-friendly reuse of recently-scheduled
27/// tasks. When full, `push` returns the overflow item for the caller to spill
28/// to the global queue.
29pub(crate) struct LocalQueue {
30    /// Ring buffer storage. `head` is the next pop index; `tail` is the next
31    /// push index. Full when `(tail - head) == CAPACITY`.
32    buf: Box<[Option<Arc<TaskHeader>>; CAPACITY]>,
33    head: usize,
34    tail: usize,
35}
36
37const CAPACITY: usize = 256;
38
39impl LocalQueue {
40    pub(crate) fn new() -> Self {
41        // SAFETY: Option<Arc<TaskHeader>> is safely zero-initialised as None
42        // via the MaybeUninit → assume_init pattern below.
43        let buf = {
44            // Box<[Option<Arc<TaskHeader>>; CAPACITY]> cannot be created with
45            // a const initialiser because Arc is not Copy. Use a vec-based
46            // approach instead.
47            let v: Vec<Option<Arc<TaskHeader>>> = (0..CAPACITY).map(|_| None).collect();
48            // Convert Vec → Box<[_; CAPACITY]>.
49            let boxed_slice = v.into_boxed_slice();
50            // SAFETY: We constructed exactly CAPACITY elements above.
51            unsafe {
52                Box::from_raw(Box::into_raw(boxed_slice) as *mut [Option<Arc<TaskHeader>>; CAPACITY])
53            }
54        };
55        Self {
56            buf,
57            head: 0,
58            tail: 0,
59        }
60    }
61
62    /// Number of items currently held.
63    #[inline]
64    pub(crate) fn len(&self) -> usize {
65        self.tail.wrapping_sub(self.head)
66    }
67
68    /// `true` if the queue holds no items.
69    #[inline]
70    pub(crate) fn is_empty(&self) -> bool {
71        self.len() == 0
72    }
73
74    /// `true` when the ring buffer is at capacity.
75    #[inline]
76    fn is_full(&self) -> bool {
77        self.len() == CAPACITY
78    }
79
80    /// Push `header` onto the local queue.
81    ///
82    /// Returns `Some(header)` if the queue was full (caller must spill to
83    /// global), `None` on success.
84    pub(crate) fn push(&mut self, header: Arc<TaskHeader>) -> Option<Arc<TaskHeader>> {
85        if self.is_full() {
86            return Some(header);
87        }
88        let idx = self.tail % CAPACITY;
89        self.buf[idx] = Some(header);
90        self.tail = self.tail.wrapping_add(1);
91        None
92    }
93
94    /// Pop the most-recently-pushed item (LIFO).
95    pub(crate) fn pop(&mut self) -> Option<Arc<TaskHeader>> {
96        if self.is_empty() {
97            return None;
98        }
99        // Decrement tail for LIFO behaviour.
100        self.tail = self.tail.wrapping_sub(1);
101        let idx = self.tail % CAPACITY;
102        self.buf[idx].take()
103    }
104
105    /// Drain up to `count` items from the front (FIFO) into `dest`.
106    /// Used by the work-stealer to grab a batch.
107    pub(crate) fn drain_front(&mut self, dest: &mut Vec<Arc<TaskHeader>>, count: usize) {
108        let to_take = count.min(self.len());
109        for _ in 0..to_take {
110            let idx = self.head % CAPACITY;
111            if let Some(item) = self.buf[idx].take() {
112                dest.push(item);
113            }
114            self.head = self.head.wrapping_add(1);
115        }
116    }
117}
118
// ── GlobalQueue ───────────────────────────────────────────────────────────────

121/// Cross-thread injection queue for wakers and stolen tasks.
122///
123/// Guarded by a `Mutex`; only accessed when the local queue is empty or a
124/// waker fires from another thread.
125pub(crate) struct GlobalQueue {
126    inner: Mutex<VecDeque<Arc<TaskHeader>>>,
127}
128
129impl GlobalQueue {
130    pub(crate) fn new() -> Self {
131        Self {
132            inner: Mutex::new(VecDeque::new()),
133        }
134    }
135
136    /// Append `header` to the back of the queue.
137    pub(crate) fn push_header(&self, header: Arc<TaskHeader>) {
138        // Unwrap: only fails if the mutex is poisoned, which is a programming error.
139        self.inner.lock().unwrap().push_back(header);
140    }
141
142    /// Remove and return the front item, or `None` if empty.
143    pub(crate) fn pop(&self) -> Option<Arc<TaskHeader>> {
144        self.inner.lock().unwrap().pop_front()
145    }
146
147    /// Steal up to half the queue's contents into `local`.
148    ///
149    /// Returns the number of tasks stolen.
150    pub(crate) fn steal_batch(&self, local: &mut LocalQueue) -> usize {
151        let mut guard = self.inner.lock().unwrap();
152        let count = (guard.len() / 2).max(1).min(guard.len());
153        let mut stolen = 0;
154        for _ in 0..count {
155            match guard.pop_front() {
156                Some(h) => {
157                    if local.push(h).is_none() {
158                        stolen += 1;
159                    }
160                    // If local overflows, stop stealing.
161                    else {
162                        break;
163                    }
164                }
165                None => break,
166            }
167        }
168        stolen
169    }
170
171    /// Number of items waiting in the global queue.
172    pub(crate) fn len(&self) -> usize {
173        self.inner.lock().unwrap().len()
174    }
175}
176
// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::task::Task;

    /// Build a standalone `Arc<TaskHeader>` by spawning a trivial task and
    /// cloning out its header; the `Task` owner is dropped immediately.
    fn make_header() -> Arc<TaskHeader> {
        let (task, _jh) = Task::new(async { 0u32 });
        Arc::clone(&task.header)
        // task drops here but the header Arc stays alive
    }

    // --- LocalQueue tests ---

    #[test]
    fn local_queue_push_pop_lifo() {
        let mut q = LocalQueue::new();
        let h1 = make_header();
        let h2 = make_header();
        let p1 = Arc::as_ptr(&h1);
        let p2 = Arc::as_ptr(&h2);
        assert!(q.push(h1).is_none());
        assert!(q.push(h2).is_none());
        // LIFO: last in, first out
        assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p2);
        assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p1);
        assert!(q.pop().is_none());
    }

    #[test]
    fn local_queue_overflow_returns_item() {
        let mut q = LocalQueue::new();
        // Fill to capacity
        for _ in 0..CAPACITY {
            assert!(q.push(make_header()).is_none());
        }
        assert!(q.is_full());
        let overflow = q.push(make_header());
        assert!(overflow.is_some(), "full queue must return overflow item");
    }

    #[test]
    fn local_queue_drain_front() {
        let mut q = LocalQueue::new();
        for _ in 0..6 {
            q.push(make_header());
        }
        let mut dest = Vec::new();
        q.drain_front(&mut dest, 3);
        assert_eq!(dest.len(), 3);
        assert_eq!(q.len(), 3);
    }

    // --- GlobalQueue tests ---

    #[test]
    fn global_queue_push_pop() {
        let gq = GlobalQueue::new();
        let h = make_header();
        let p = Arc::as_ptr(&h);
        gq.push_header(h);
        let popped = gq.pop().unwrap();
        assert_eq!(Arc::as_ptr(&popped), p);
        assert!(gq.pop().is_none());
    }

    #[test]
    fn global_queue_steal_batch_half() {
        let gq = GlobalQueue::new();
        for _ in 0..8 {
            gq.push_header(make_header());
        }
        let mut local = LocalQueue::new();
        let stolen = gq.steal_batch(&mut local);
        assert!(
            (1..=4).contains(&stolen),
            "should steal ~half: got {stolen}"
        );
        assert_eq!(local.len(), stolen);
    }
}