Skip to main content

moduvex_runtime/executor/
scheduler.rs

//! Per-core task queues: `LocalQueue` (ring buffer) + `GlobalQueue` (mutex deque).
//!
//! # Design
//! - `LocalQueue` — fixed-capacity 256-slot ring buffer for LIFO local dequeuing.
//!   Overflow tasks (when ring is full) are spilled to the `GlobalQueue`.
//! - `GlobalQueue` — `Mutex<VecDeque<Arc<TaskHeader>>>` for cross-thread injection
//!   and work-stealing. Also stores `Task` handles for executor ownership.
//!
//! Both queues operate on `Arc<TaskHeader>` for the waker path, and `Task` for
//! the executor-ownership path. The distinction matters for drop semantics:
//! - Wakers push `Arc<TaskHeader>` (no Future ownership).
//! - Executor pops `Arc<TaskHeader>` and looks up its owned `Task` by pointer.
//!
//! For simplicity in the single-threaded executor, both queues store
//! `Arc<TaskHeader>` and the executor maintains a separate slab of `Task` owners.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use super::task::TaskHeader;

// ── LocalQueue ────────────────────────────────────────────────────────────────

24/// Fixed-size ring-buffer local queue (256 slots).
25///
26/// Operates as a LIFO stack for cache-friendly reuse of recently-scheduled
27/// tasks. When full, `push` returns the overflow item for the caller to spill
28/// to the global queue.
29pub(crate) struct LocalQueue {
30    /// Ring buffer storage. `head` is the next pop index; `tail` is the next
31    /// push index. Full when `(tail - head) == CAPACITY`.
32    buf: Box<[Option<Arc<TaskHeader>>; CAPACITY]>,
33    head: usize,
34    tail: usize,
35}
36
37const CAPACITY: usize = 256;
38
39impl LocalQueue {
40    pub(crate) fn new() -> Self {
41        // SAFETY: Option<Arc<TaskHeader>> is safely zero-initialised as None
42        // via the MaybeUninit → assume_init pattern below.
43        let buf = {
44            // Box<[Option<Arc<TaskHeader>>; CAPACITY]> cannot be created with
45            // a const initialiser because Arc is not Copy. Use a vec-based
46            // approach instead.
47            let v: Vec<Option<Arc<TaskHeader>>> = (0..CAPACITY).map(|_| None).collect();
48            // Convert Vec → Box<[_; CAPACITY]>.
49            let boxed_slice = v.into_boxed_slice();
50            // SAFETY: We constructed exactly CAPACITY elements above.
51            unsafe {
52                Box::from_raw(Box::into_raw(boxed_slice) as *mut [Option<Arc<TaskHeader>>; CAPACITY])
53            }
54        };
55        Self {
56            buf,
57            head: 0,
58            tail: 0,
59        }
60    }
61
62    /// Number of items currently held.
63    #[inline]
64    pub(crate) fn len(&self) -> usize {
65        self.tail.wrapping_sub(self.head)
66    }
67
68    /// `true` if the queue holds no items.
69    #[inline]
70    pub(crate) fn is_empty(&self) -> bool {
71        self.len() == 0
72    }
73
74    /// `true` when the ring buffer is at capacity.
75    #[inline]
76    fn is_full(&self) -> bool {
77        self.len() == CAPACITY
78    }
79
80    /// Push `header` onto the local queue.
81    ///
82    /// Returns `Some(header)` if the queue was full (caller must spill to
83    /// global), `None` on success.
84    pub(crate) fn push(&mut self, header: Arc<TaskHeader>) -> Option<Arc<TaskHeader>> {
85        if self.is_full() {
86            return Some(header);
87        }
88        let idx = self.tail % CAPACITY;
89        self.buf[idx] = Some(header);
90        self.tail = self.tail.wrapping_add(1);
91        None
92    }
93
94    /// Pop the most-recently-pushed item (LIFO).
95    pub(crate) fn pop(&mut self) -> Option<Arc<TaskHeader>> {
96        if self.is_empty() {
97            return None;
98        }
99        // Decrement tail for LIFO behaviour.
100        self.tail = self.tail.wrapping_sub(1);
101        let idx = self.tail % CAPACITY;
102        self.buf[idx].take()
103    }
104
105    /// Drain up to `count` items from the front (FIFO) into `dest`.
106    /// Used by the work-stealer to grab a batch.
107    pub(crate) fn drain_front(&mut self, dest: &mut Vec<Arc<TaskHeader>>, count: usize) {
108        let to_take = count.min(self.len());
109        for _ in 0..to_take {
110            let idx = self.head % CAPACITY;
111            if let Some(item) = self.buf[idx].take() {
112                dest.push(item);
113            }
114            self.head = self.head.wrapping_add(1);
115        }
116    }
117}

// ── GlobalQueue ───────────────────────────────────────────────────────────────

121/// Cross-thread injection queue for wakers and stolen tasks.
122///
123/// Guarded by a `Mutex`; only accessed when the local queue is empty or a
124/// waker fires from another thread.
125pub(crate) struct GlobalQueue {
126    inner: Mutex<VecDeque<Arc<TaskHeader>>>,
127}
128
129impl GlobalQueue {
130    pub(crate) fn new() -> Self {
131        Self {
132            inner: Mutex::new(VecDeque::new()),
133        }
134    }
135
136    /// Append `header` to the back of the queue.
137    pub(crate) fn push_header(&self, header: Arc<TaskHeader>) {
138        // Unwrap: only fails if the mutex is poisoned, which is a programming error.
139        self.inner.lock().unwrap().push_back(header);
140    }
141
142    /// Remove and return the front item, or `None` if empty.
143    pub(crate) fn pop(&self) -> Option<Arc<TaskHeader>> {
144        self.inner.lock().unwrap().pop_front()
145    }
146
147    /// Steal up to half the queue's contents into `local`.
148    ///
149    /// Returns the number of tasks stolen.
150    pub(crate) fn steal_batch(&self, local: &mut LocalQueue) -> usize {
151        let mut guard = self.inner.lock().unwrap();
152        let count = (guard.len() / 2).max(1).min(guard.len());
153        let mut stolen = 0;
154        for _ in 0..count {
155            match guard.pop_front() {
156                Some(h) => {
157                    if local.push(h).is_none() {
158                        stolen += 1;
159                    }
160                    // If local overflows, stop stealing.
161                    else {
162                        break;
163                    }
164                }
165                None => break,
166            }
167        }
168        stolen
169    }
170
171    /// Number of items waiting in the global queue.
172    pub(crate) fn len(&self) -> usize {
173        self.inner.lock().unwrap().len()
174    }
175}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::task::Task;

    /// Construct a header whose owning `Task` is dropped immediately; the
    /// returned `Arc` keeps the `TaskHeader` alive on its own.
    fn make_header() -> Arc<TaskHeader> {
        let (task, _jh) = Task::new(async { 0u32 });
        Arc::clone(&task.header)
    }

    // --- LocalQueue tests ---

    #[test]
    fn local_queue_push_pop_lifo() {
        let mut queue = LocalQueue::new();
        let first = make_header();
        let second = make_header();
        let first_ptr = Arc::as_ptr(&first);
        let second_ptr = Arc::as_ptr(&second);
        assert!(queue.push(first).is_none());
        assert!(queue.push(second).is_none());
        // LIFO order: the most recent push comes back first.
        assert_eq!(Arc::as_ptr(&queue.pop().unwrap()), second_ptr);
        assert_eq!(Arc::as_ptr(&queue.pop().unwrap()), first_ptr);
        assert!(queue.pop().is_none());
    }

    #[test]
    fn local_queue_overflow_returns_item() {
        let mut queue = LocalQueue::new();
        // Fill every slot.
        (0..CAPACITY).for_each(|_| assert!(queue.push(make_header()).is_none()));
        assert!(queue.is_full());
        let overflow = queue.push(make_header());
        assert!(overflow.is_some(), "full queue must return overflow item");
    }

    #[test]
    fn local_queue_drain_front() {
        let mut queue = LocalQueue::new();
        for _ in 0..6 {
            queue.push(make_header());
        }
        let mut grabbed = Vec::new();
        queue.drain_front(&mut grabbed, 3);
        assert_eq!(grabbed.len(), 3);
        assert_eq!(queue.len(), 3);
    }

    // --- GlobalQueue tests ---

    #[test]
    fn global_queue_push_pop() {
        let global = GlobalQueue::new();
        let header = make_header();
        let ptr = Arc::as_ptr(&header);
        global.push_header(header);
        assert_eq!(Arc::as_ptr(&global.pop().unwrap()), ptr);
        assert!(global.pop().is_none());
    }

    #[test]
    fn global_queue_steal_batch_half() {
        let global = GlobalQueue::new();
        for _ in 0..8 {
            global.push_header(make_header());
        }
        let mut local = LocalQueue::new();
        let stolen = global.steal_batch(&mut local);
        assert!(
            (1..=4).contains(&stolen),
            "should steal ~half: got {stolen}"
        );
        assert_eq!(local.len(), stolen);
    }

    // ── Additional scheduler tests ─────────────────────────────────────────

    #[test]
    fn local_queue_empty_on_new() {
        let queue = LocalQueue::new();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn local_queue_pop_empty_returns_none() {
        assert!(LocalQueue::new().pop().is_none());
    }

    #[test]
    fn local_queue_len_increments_on_push() {
        let mut queue = LocalQueue::new();
        for expected in 0..5 {
            assert_eq!(queue.len(), expected);
            assert!(queue.push(make_header()).is_none());
            assert_eq!(queue.len(), expected + 1);
        }
    }

    #[test]
    fn local_queue_drain_front_empty_is_noop() {
        let mut queue = LocalQueue::new();
        let mut grabbed = Vec::new();
        queue.drain_front(&mut grabbed, 10);
        assert!(grabbed.is_empty());
    }

    #[test]
    fn local_queue_drain_front_more_than_len_drains_all() {
        let mut queue = LocalQueue::new();
        for _ in 0..3 {
            queue.push(make_header());
        }
        let mut grabbed = Vec::new();
        // Requesting far more than available drains only what exists.
        queue.drain_front(&mut grabbed, 100);
        assert_eq!(grabbed.len(), 3);
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn global_queue_empty_pop_returns_none() {
        assert!(GlobalQueue::new().pop().is_none());
    }

    #[test]
    fn global_queue_len_tracks_count() {
        let global = GlobalQueue::new();
        assert_eq!(global.len(), 0);
        global.push_header(make_header());
        assert_eq!(global.len(), 1);
        global.push_header(make_header());
        assert_eq!(global.len(), 2);
        let _ = global.pop();
        assert_eq!(global.len(), 1);
    }

    #[test]
    fn global_queue_fifo_ordering() {
        let global = GlobalQueue::new();
        let first = make_header();
        let second = make_header();
        let first_ptr = Arc::as_ptr(&first);
        let second_ptr = Arc::as_ptr(&second);
        global.push_header(first);
        global.push_header(second);
        // FIFO order: the oldest entry comes out first.
        assert_eq!(Arc::as_ptr(&global.pop().unwrap()), first_ptr);
        assert_eq!(Arc::as_ptr(&global.pop().unwrap()), second_ptr);
    }

    #[test]
    fn global_queue_steal_batch_single_item_returns_one() {
        let global = GlobalQueue::new();
        global.push_header(make_header());
        let mut local = LocalQueue::new();
        assert_eq!(global.steal_batch(&mut local), 1);
        assert_eq!(global.len(), 0);
    }

    #[test]
    fn global_queue_steal_batch_empty_returns_zero() {
        let global = GlobalQueue::new();
        let mut local = LocalQueue::new();
        assert_eq!(global.steal_batch(&mut local), 0);
    }

    #[test]
    fn local_queue_push_many_pop_all() {
        let mut queue = LocalQueue::new();
        for _ in 0..10 {
            queue.push(make_header());
        }
        assert_eq!(queue.len(), 10);
        let mut popped = 0;
        while queue.pop().is_some() {
            popped += 1;
        }
        assert_eq!(popped, 10);
        assert!(queue.is_empty());
    }

    #[test]
    fn global_queue_push_many_pop_in_fifo_order() {
        let global = GlobalQueue::new();
        let ptrs: Vec<_> = (0..5)
            .map(|_| {
                let header = make_header();
                let ptr = Arc::as_ptr(&header);
                global.push_header(header);
                ptr
            })
            .collect();
        for expected in ptrs {
            assert_eq!(Arc::as_ptr(&global.pop().unwrap()), expected);
        }
        assert!(global.pop().is_none());
    }

    #[test]
    fn local_queue_interleaved_push_pop() {
        let mut queue = LocalQueue::new();
        queue.push(make_header());
        queue.push(make_header());
        queue.pop();
        assert_eq!(queue.len(), 1);
        queue.push(make_header());
        queue.push(make_header());
        assert_eq!(queue.len(), 3);
    }

    #[test]
    fn global_queue_steal_batch_10_items_steals_at_least_1() {
        let global = GlobalQueue::new();
        for _ in 0..10 {
            global.push_header(make_header());
        }
        let mut local = LocalQueue::new();
        let stolen = global.steal_batch(&mut local);
        // Between one item and half the queue.
        assert!(stolen >= 1);
        assert!(stolen <= 5);
    }

    #[test]
    fn local_queue_is_not_empty_after_push() {
        let mut queue = LocalQueue::new();
        assert!(queue.is_empty());
        queue.push(make_header());
        assert!(!queue.is_empty());
    }

    #[test]
    fn local_queue_push_then_pop_lifo_2_items() {
        let mut queue = LocalQueue::new();
        let first = make_header();
        let second = make_header();
        let first_ptr = Arc::as_ptr(&first);
        let second_ptr = Arc::as_ptr(&second);
        queue.push(first);
        queue.push(second);
        // The second push is the first pop.
        assert_eq!(Arc::as_ptr(&queue.pop().unwrap()), second_ptr);
        assert_eq!(Arc::as_ptr(&queue.pop().unwrap()), first_ptr);
    }

    #[test]
    fn global_queue_multiple_push_pop_cycles() {
        let global = GlobalQueue::new();
        // Each cycle nets one extra item: two pushes, one pop.
        for _ in 0..3 {
            global.push_header(make_header());
            global.push_header(make_header());
            global.pop();
        }
        assert_eq!(global.len(), 3);
    }

    #[test]
    fn local_queue_drain_front_partial() {
        let mut queue = LocalQueue::new();
        for _ in 0..10 {
            queue.push(make_header());
        }
        let mut grabbed = Vec::new();
        queue.drain_front(&mut grabbed, 4);
        assert_eq!(grabbed.len(), 4);
        assert_eq!(queue.len(), 6);
    }

    #[test]
    fn global_queue_steal_batch_large_queue() {
        let global = GlobalQueue::new();
        for _ in 0..100 {
            global.push_header(make_header());
        }
        let mut local = LocalQueue::new();
        let stolen = global.steal_batch(&mut local);
        assert!(stolen >= 1);
        assert!(stolen <= 50);
    }
}