moduvex_runtime/executor/
scheduler.rs1use std::collections::VecDeque;
18use std::sync::{Arc, Mutex};
19
20use super::task::TaskHeader;
21
22pub(crate) struct LocalQueue {
30 buf: Box<[Option<Arc<TaskHeader>>; CAPACITY]>,
33 head: usize,
34 tail: usize,
35}
36
37const CAPACITY: usize = 256;
38
39impl LocalQueue {
40 pub(crate) fn new() -> Self {
41 let buf = {
44 let v: Vec<Option<Arc<TaskHeader>>> = (0..CAPACITY).map(|_| None).collect();
48 let boxed_slice = v.into_boxed_slice();
50 unsafe {
52 Box::from_raw(Box::into_raw(boxed_slice) as *mut [Option<Arc<TaskHeader>>; CAPACITY])
53 }
54 };
55 Self {
56 buf,
57 head: 0,
58 tail: 0,
59 }
60 }
61
62 #[inline]
64 pub(crate) fn len(&self) -> usize {
65 self.tail.wrapping_sub(self.head)
66 }
67
68 #[inline]
70 pub(crate) fn is_empty(&self) -> bool {
71 self.len() == 0
72 }
73
74 #[inline]
76 fn is_full(&self) -> bool {
77 self.len() == CAPACITY
78 }
79
80 pub(crate) fn push(&mut self, header: Arc<TaskHeader>) -> Option<Arc<TaskHeader>> {
85 if self.is_full() {
86 return Some(header);
87 }
88 let idx = self.tail % CAPACITY;
89 self.buf[idx] = Some(header);
90 self.tail = self.tail.wrapping_add(1);
91 None
92 }
93
94 pub(crate) fn pop(&mut self) -> Option<Arc<TaskHeader>> {
96 if self.is_empty() {
97 return None;
98 }
99 self.tail = self.tail.wrapping_sub(1);
101 let idx = self.tail % CAPACITY;
102 self.buf[idx].take()
103 }
104
105 pub(crate) fn drain_front(&mut self, dest: &mut Vec<Arc<TaskHeader>>, count: usize) {
108 let to_take = count.min(self.len());
109 for _ in 0..to_take {
110 let idx = self.head % CAPACITY;
111 if let Some(item) = self.buf[idx].take() {
112 dest.push(item);
113 }
114 self.head = self.head.wrapping_add(1);
115 }
116 }
117}
118
119pub(crate) struct GlobalQueue {
126 inner: Mutex<VecDeque<Arc<TaskHeader>>>,
127}
128
129impl GlobalQueue {
130 pub(crate) fn new() -> Self {
131 Self {
132 inner: Mutex::new(VecDeque::new()),
133 }
134 }
135
136 pub(crate) fn push_header(&self, header: Arc<TaskHeader>) {
138 self.inner.lock().unwrap().push_back(header);
140 }
141
142 pub(crate) fn pop(&self) -> Option<Arc<TaskHeader>> {
144 self.inner.lock().unwrap().pop_front()
145 }
146
147 pub(crate) fn steal_batch(&self, local: &mut LocalQueue) -> usize {
151 let mut guard = self.inner.lock().unwrap();
152 let count = (guard.len() / 2).max(1).min(guard.len());
153 let mut stolen = 0;
154 for _ in 0..count {
155 match guard.pop_front() {
156 Some(h) => {
157 if local.push(h).is_none() {
158 stolen += 1;
159 }
160 else {
162 break;
163 }
164 }
165 None => break,
166 }
167 }
168 stolen
169 }
170
171 pub(crate) fn len(&self) -> usize {
173 self.inner.lock().unwrap().len()
174 }
175}
176
177#[cfg(test)]
180mod tests {
181 use super::*;
182 use crate::executor::task::Task;
183
184 fn make_header() -> Arc<TaskHeader> {
185 let (task, _jh) = Task::new(async { 0u32 });
186 Arc::clone(&task.header)
187 }
189
190 #[test]
193 fn local_queue_push_pop_lifo() {
194 let mut q = LocalQueue::new();
195 let h1 = make_header();
196 let h2 = make_header();
197 let p1 = Arc::as_ptr(&h1);
198 let p2 = Arc::as_ptr(&h2);
199 assert!(q.push(h1).is_none());
200 assert!(q.push(h2).is_none());
201 assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p2);
203 assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p1);
204 assert!(q.pop().is_none());
205 }
206
207 #[test]
208 fn local_queue_overflow_returns_item() {
209 let mut q = LocalQueue::new();
210 for _ in 0..CAPACITY {
212 assert!(q.push(make_header()).is_none());
213 }
214 assert!(q.is_full());
215 let overflow = q.push(make_header());
216 assert!(overflow.is_some(), "full queue must return overflow item");
217 }
218
219 #[test]
220 fn local_queue_drain_front() {
221 let mut q = LocalQueue::new();
222 for _ in 0..6 {
223 q.push(make_header());
224 }
225 let mut dest = Vec::new();
226 q.drain_front(&mut dest, 3);
227 assert_eq!(dest.len(), 3);
228 assert_eq!(q.len(), 3);
229 }
230
231 #[test]
234 fn global_queue_push_pop() {
235 let gq = GlobalQueue::new();
236 let h = make_header();
237 let p = Arc::as_ptr(&h);
238 gq.push_header(h);
239 let popped = gq.pop().unwrap();
240 assert_eq!(Arc::as_ptr(&popped), p);
241 assert!(gq.pop().is_none());
242 }
243
244 #[test]
245 fn global_queue_steal_batch_half() {
246 let gq = GlobalQueue::new();
247 for _ in 0..8 {
248 gq.push_header(make_header());
249 }
250 let mut local = LocalQueue::new();
251 let stolen = gq.steal_batch(&mut local);
252 assert!(
253 (1..=4).contains(&stolen),
254 "should steal ~half: got {stolen}"
255 );
256 assert_eq!(local.len(), stolen);
257 }
258
259 #[test]
262 fn local_queue_empty_on_new() {
263 let q = LocalQueue::new();
264 assert!(q.is_empty());
265 assert_eq!(q.len(), 0);
266 }
267
268 #[test]
269 fn local_queue_pop_empty_returns_none() {
270 let mut q = LocalQueue::new();
271 assert!(q.pop().is_none());
272 }
273
274 #[test]
275 fn local_queue_len_increments_on_push() {
276 let mut q = LocalQueue::new();
277 for i in 0..5 {
278 assert_eq!(q.len(), i);
279 assert!(q.push(make_header()).is_none());
280 assert_eq!(q.len(), i + 1);
281 }
282 }
283
284 #[test]
285 fn local_queue_drain_front_empty_is_noop() {
286 let mut q = LocalQueue::new();
287 let mut dest = Vec::new();
288 q.drain_front(&mut dest, 10);
289 assert!(dest.is_empty());
290 }
291
292 #[test]
293 fn local_queue_drain_front_more_than_len_drains_all() {
294 let mut q = LocalQueue::new();
295 for _ in 0..3 {
296 q.push(make_header());
297 }
298 let mut dest = Vec::new();
299 q.drain_front(&mut dest, 100);
300 assert_eq!(dest.len(), 3);
301 assert_eq!(q.len(), 0);
302 }
303
304 #[test]
305 fn global_queue_empty_pop_returns_none() {
306 let gq = GlobalQueue::new();
307 assert!(gq.pop().is_none());
308 }
309
310 #[test]
311 fn global_queue_len_tracks_count() {
312 let gq = GlobalQueue::new();
313 assert_eq!(gq.len(), 0);
314 gq.push_header(make_header());
315 assert_eq!(gq.len(), 1);
316 gq.push_header(make_header());
317 assert_eq!(gq.len(), 2);
318 let _ = gq.pop();
319 assert_eq!(gq.len(), 1);
320 }
321
322 #[test]
323 fn global_queue_fifo_ordering() {
324 let gq = GlobalQueue::new();
325 let h1 = make_header();
326 let h2 = make_header();
327 let p1 = Arc::as_ptr(&h1);
328 let p2 = Arc::as_ptr(&h2);
329 gq.push_header(h1);
330 gq.push_header(h2);
331 assert_eq!(Arc::as_ptr(&gq.pop().unwrap()), p1);
333 assert_eq!(Arc::as_ptr(&gq.pop().unwrap()), p2);
334 }
335
336 #[test]
337 fn global_queue_steal_batch_single_item_returns_one() {
338 let gq = GlobalQueue::new();
339 gq.push_header(make_header());
340 let mut local = LocalQueue::new();
341 let stolen = gq.steal_batch(&mut local);
342 assert_eq!(stolen, 1);
343 assert_eq!(gq.len(), 0);
344 }
345
346 #[test]
347 fn global_queue_steal_batch_empty_returns_zero() {
348 let gq = GlobalQueue::new();
349 let mut local = LocalQueue::new();
350 let stolen = gq.steal_batch(&mut local);
351 assert_eq!(stolen, 0);
352 }
353
354 #[test]
355 fn local_queue_push_many_pop_all() {
356 let mut q = LocalQueue::new();
357 for _ in 0..10 {
358 q.push(make_header());
359 }
360 assert_eq!(q.len(), 10);
361 let mut count = 0;
362 while q.pop().is_some() {
363 count += 1;
364 }
365 assert_eq!(count, 10);
366 assert!(q.is_empty());
367 }
368
369 #[test]
370 fn global_queue_push_many_pop_in_fifo_order() {
371 let gq = GlobalQueue::new();
372 let mut ptrs = Vec::new();
373 for _ in 0..5 {
374 let h = make_header();
375 ptrs.push(Arc::as_ptr(&h));
376 gq.push_header(h);
377 }
378 for ptr in ptrs {
379 let popped = gq.pop().unwrap();
380 assert_eq!(Arc::as_ptr(&popped), ptr);
381 }
382 assert!(gq.pop().is_none());
383 }
384
385 #[test]
386 fn local_queue_interleaved_push_pop() {
387 let mut q = LocalQueue::new();
388 q.push(make_header());
389 q.push(make_header());
390 q.pop();
391 assert_eq!(q.len(), 1);
392 q.push(make_header());
393 q.push(make_header());
394 assert_eq!(q.len(), 3);
395 }
396
397 #[test]
398 fn global_queue_steal_batch_10_items_steals_at_least_1() {
399 let gq = GlobalQueue::new();
400 for _ in 0..10 {
401 gq.push_header(make_header());
402 }
403 let mut local = LocalQueue::new();
404 let stolen = gq.steal_batch(&mut local);
405 assert!(stolen >= 1);
406 assert!(stolen <= 5); }
408
409 #[test]
410 fn local_queue_is_not_empty_after_push() {
411 let mut q = LocalQueue::new();
412 assert!(q.is_empty());
413 q.push(make_header());
414 assert!(!q.is_empty());
415 }
416
417 #[test]
418 fn local_queue_push_then_pop_lifo_2_items() {
419 let mut q = LocalQueue::new();
420 let h1 = make_header();
421 let h2 = make_header();
422 let p1 = Arc::as_ptr(&h1);
423 let p2 = Arc::as_ptr(&h2);
424 q.push(h1);
425 q.push(h2);
426 assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p2);
428 assert_eq!(Arc::as_ptr(&q.pop().unwrap()), p1);
429 }
430
431 #[test]
432 fn global_queue_multiple_push_pop_cycles() {
433 let gq = GlobalQueue::new();
434 for _ in 0..3 {
435 gq.push_header(make_header());
436 gq.push_header(make_header());
437 gq.pop();
438 }
439 assert_eq!(gq.len(), 3);
440 }
441
442 #[test]
443 fn local_queue_drain_front_partial() {
444 let mut q = LocalQueue::new();
445 for _ in 0..10 {
446 q.push(make_header());
447 }
448 let mut dest = Vec::new();
449 q.drain_front(&mut dest, 4);
450 assert_eq!(dest.len(), 4);
451 assert_eq!(q.len(), 6);
452 }
453
454 #[test]
455 fn global_queue_steal_batch_large_queue() {
456 let gq = GlobalQueue::new();
457 for _ in 0..100 {
458 gq.push_header(make_header());
459 }
460 let mut local = LocalQueue::new();
461 let stolen = gq.steal_batch(&mut local);
462 assert!(stolen >= 1);
463 assert!(stolen <= 50);
464 }
465}