// moduvex_runtime/executor/mod.rs

//! Async executor: single-threaded (default) and multi-threaded (opt-in).
//!
//! # Single-threaded run loop (default)
//! ```text
//! block_on(future)
//!   └─ Executor::run_loop
//!        ├─ LocalQueue  (LIFO ring, 256 slots)
//!        ├─ GlobalQueue (Mutex<VecDeque> — waker injection)
//!        └─ Reactor     (kqueue/epoll — parks when no work is ready)
//! ```
//!
//! # Multi-threaded run loop (opt-in via RuntimeBuilder::worker_threads(n))
//! ```text
//! block_on_multi(future, n_workers)
//!   ├─ worker 0 (main thread) — polls root future + runs tasks
//!   ├─ worker 1..N-1 (spawned threads) — steal and run tasks
//!   └─ GlobalQueue + WorkStealingPool shared across all workers
//! ```
//!
//! Single-threaded mode is the default. Multi-thread is explicitly opt-in.

22pub mod scheduler;
23pub mod task;
24pub mod task_local;
25pub mod waker;
26pub mod work_stealing;
27pub mod worker;
28
29use std::cell::Cell;
30use std::collections::HashMap;
31use std::future::Future;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex};
34use std::task::{Context, Poll};
35
36use crate::platform::sys::{create_pipe, events_with_capacity, Interest};
37use crate::reactor::{with_reactor, with_reactor_mut};
38use crate::time::{next_timer_deadline, tick_timer_wheel};
39
40#[cfg(unix)]
41use crate::signal::{on_signal_readable, SIGNAL_TOKEN};
42
43use scheduler::{GlobalQueue, LocalQueue};
44use task::{JoinHandle, Task, STATE_CANCELLED, STATE_COMPLETED};
45use waker::{make_waker, make_waker_with_notifier, WorkerNotifier};
46use work_stealing::{StealableQueue, WorkStealingPool};
47use worker::{clear_current_worker_wake_tx, set_current_worker_wake_tx, WorkerThread};
48
// ── Executor ──────────────────────────────────────────────────────────────────

/// Per-thread async executor (single-threaded mode).
///
/// Owns the task map and both run queues, plus a self-pipe whose read end is
/// registered with the reactor under `WAKE_TOKEN` so the root waker can
/// unblock `park`.
pub struct Executor {
    /// LIFO local task queue — popped first each iteration.
    local: LocalQueue,
    /// Shared with all `Waker`s — they push here to re-schedule tasks.
    global: Arc<GlobalQueue>,
    /// Owned `Task` handles keyed by `Arc<TaskHeader>` pointer address.
    tasks: HashMap<usize, Task>,
    /// Read end of the self-pipe, registered with the reactor.
    wake_rx: i32,
    /// Write end of the self-pipe; the root-waker writes here to unblock park.
    wake_tx: i32,
}
64
impl Executor {
    /// Create an executor with a fresh self-pipe registered on the reactor.
    ///
    /// # Errors
    /// Fails if the pipe cannot be created or the read end cannot be
    /// registered with the reactor.
    fn new() -> std::io::Result<Self> {
        let (wake_rx, wake_tx) = create_pipe()?;
        // NOTE(review): if this registration fails, `wake_rx`/`wake_tx` are
        // leaked — `Drop` only runs once `Self` is constructed. Confirm
        // whether fd cleanup matters on this error path.
        with_reactor(|r| r.register(wake_rx, WAKE_TOKEN, Interest::READABLE))?;
        Ok(Self {
            local: LocalQueue::new(),
            global: Arc::new(GlobalQueue::new()),
            tasks: HashMap::new(),
            wake_rx,
            wake_tx,
        })
    }

    /// Spawn a future onto this executor, returning a `JoinHandle<T>`.
    ///
    /// The task's header goes onto the global queue so the run loop polls it
    /// on a later iteration; the owning `Task` is stored in `tasks` keyed by
    /// the header's `Arc` pointer address.
    pub fn spawn<F>(&mut self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: Send + 'static,
    {
        let (task, jh) = Task::new(future);
        let key = Arc::as_ptr(&task.header) as usize;
        self.global.push_header(Arc::clone(&task.header));
        self.tasks.insert(key, task);
        jh
    }

    /// Drive the executor until `root` resolves. Returns root's output.
    ///
    /// Each iteration: tick timers → poll root → drain task queues →
    /// park on the reactor when no work was found.
    pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
        let mut root = std::pin::pin!(future);
        let mut root_done = false;
        let mut root_output: Option<F::Output> = None;

        let root_waker = self.make_root_waker();

        loop {
            // ── 1. Tick timer wheel ────────────────────────────────────────
            let expired = tick_timer_wheel(std::time::Instant::now());
            for w in expired {
                w.wake();
            }

            // ── 2. Poll root ───────────────────────────────────────────────
            if !root_done {
                let mut cx = Context::from_waker(&root_waker);
                if let Poll::Ready(val) = root.as_mut().poll(&mut cx) {
                    root_output = Some(val);
                    root_done = true;
                }
            }

            // ── 3. Exit if root done and no spawned tasks remain ───────────
            if root_done && self.tasks.is_empty() {
                break;
            }

            // ── 4. Drain task queues ───────────────────────────────────────
            let mut did_work = false;
            loop {
                let Some(header) = self.next_task() else {
                    break;
                };
                did_work = true;
                let key = Arc::as_ptr(&header) as usize;
                let state = header.state.load(Ordering::Acquire);

                // Cancelled tasks are torn down via `Task::cancel` so the
                // JoinHandle observes the cancellation.
                if state == STATE_CANCELLED {
                    if let Some(task) = self.tasks.remove(&key) {
                        task.cancel();
                    }
                    continue;
                }
                // Completed tasks only need their owning handle released.
                if state == STATE_COMPLETED {
                    self.tasks.remove(&key);
                    continue;
                }

                let waker = make_waker(Arc::clone(&header), Arc::clone(&self.global));
                let mut cx = Context::from_waker(&waker);

                if let Some(task) = self.tasks.get(&key) {
                    let completed = task.poll_task(&mut cx);
                    if completed {
                        self.tasks.remove(&key);
                    }
                }
            }

            // ── 5. Re-check exit after draining ───────────────────────────
            if root_done && self.tasks.is_empty() {
                break;
            }

            // ── 6. Park on reactor when both queues empty ──────────────────
            if !did_work && self.local.is_empty() && self.global.len() == 0 {
                self.park();
            }
        }

        root_output.expect("root future must complete before block_on returns")
    }

    /// Drain both queues: pop local first, then global.
    fn next_task(&mut self) -> Option<Arc<task::TaskHeader>> {
        self.local.pop().or_else(|| self.global.pop())
    }

    /// Block on the reactor using the next timer deadline as the timeout.
    ///
    /// The park is capped at `MAX_PARK_MS`, so the run loop re-checks timers
    /// and queues at least every 10ms even with no reactor events.
    fn park(&self) {
        const MAX_PARK_MS: u64 = 10;

        let timeout_ms = match next_timer_deadline() {
            None => MAX_PARK_MS,
            Some(deadline) => {
                let now = std::time::Instant::now();
                if deadline <= now {
                    // Deadline already passed — poll without blocking.
                    0
                } else {
                    let ms = deadline.duration_since(now).as_millis() as u64;
                    ms.min(MAX_PARK_MS)
                }
            }
        };

        let mut events = events_with_capacity(64);
        let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
        // Clear queued wake bytes so the self-pipe doesn't stay readable.
        self.drain_wake_pipe();

        #[cfg(unix)]
        {
            let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
            if signal_fired {
                on_signal_readable();
            }
        }
    }

    /// Read and discard every byte queued on the self-pipe (non-blocking).
    #[cfg(unix)]
    fn drain_wake_pipe(&self) {
        let mut buf = [0u8; 64];
        loop {
            // SAFETY: `wake_rx` is a valid O_NONBLOCK fd we own.
            let n = unsafe { libc::read(self.wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
            if n <= 0 {
                break;
            }
        }
    }

    /// No self-pipe on non-unix targets; nothing to drain.
    #[cfg(not(unix))]
    fn drain_wake_pipe(&self) {}

    /// Build the root future's waker: waking writes one byte to the
    /// self-pipe's write end, which makes the reactor poll in `park` return.
    ///
    /// The raw-waker data pointer smuggles the fd *value* itself (not a heap
    /// pointer), so clone/drop are trivial.
    #[cfg(unix)]
    fn make_root_waker(&self) -> std::task::Waker {
        use std::task::{RawWaker, RawWakerVTable};

        let tx = self.wake_tx;

        unsafe fn clone_root(ptr: *const ()) -> RawWaker {
            RawWaker::new(ptr, &ROOT_VTABLE)
        }
        unsafe fn wake_root(ptr: *const ()) {
            // Recover the fd smuggled through the data pointer.
            let fd = ptr as usize as i32;
            let b: u8 = 1;
            // SAFETY: fd is the write end of a non-blocking pipe we own.
            libc::write(fd, &b as *const u8 as *const _, 1);
        }
        unsafe fn wake_root_by_ref(ptr: *const ()) {
            wake_root(ptr);
        }
        unsafe fn drop_root(_: *const ()) {}

        static ROOT_VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone_root, wake_root, wake_root_by_ref, drop_root);

        let raw = std::task::RawWaker::new(tx as usize as *const (), &ROOT_VTABLE);
        // SAFETY: ROOT_VTABLE satisfies the RawWaker contract.
        unsafe { std::task::Waker::from_raw(raw) }
    }

    /// Non-unix fallback: a no-op waker. The run loop still makes progress
    /// because `park` blocks for at most `MAX_PARK_MS` per iteration.
    #[cfg(not(unix))]
    fn make_root_waker(&self) -> std::task::Waker {
        use std::task::{RawWaker, RawWakerVTable};
        static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
            |p| RawWaker::new(p, &NOOP_VTABLE),
            |_| {},
            |_| {},
            |_| {},
        );
        // SAFETY: every vtable entry ignores the (null) data pointer.
        unsafe { std::task::Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
    }
}
256
impl Drop for Executor {
    /// Deregister the self-pipe's read end from the reactor, then close
    /// both pipe fds so they aren't leaked.
    fn drop(&mut self) {
        let _ = with_reactor(|r| r.deregister(self.wake_rx));
        // SAFETY: we own wake_rx and wake_tx exclusively.
        #[cfg(unix)]
        unsafe {
            libc::close(self.wake_rx);
            libc::close(self.wake_tx);
        }
    }
}
268
/// Sentinel reactor token for the self-pipe read end.
/// Used by both `Executor::new` and `run_worker_0` when registering the pipe.
const WAKE_TOKEN: usize = usize::MAX;
271
// ── Thread-local executor pointer (single-threaded path) ──────────────────────

thread_local! {
    /// Raw pointer to the current thread's `Executor`.
    /// Non-null only inside a `block_on_with_spawn` call; `spawn()` reads it
    /// to route single-threaded spawns to that executor.
    static CURRENT_EXECUTOR: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
279
// ── Multi-threaded executor state ─────────────────────────────────────────────

/// Shared state for the multi-threaded executor.
///
/// All workers hold an `Arc` to this. The main thread (worker 0) also drives
/// the root future and signals shutdown when it completes.
struct MultiState {
    /// Injection queue shared by all workers; wakers push re-scheduled tasks here.
    global: Arc<GlobalQueue>,
    /// Work-stealing pool handle.
    /// NOTE(review): `block_on_multi` builds and passes around a separate
    /// `Arc<WorkStealingPool>` and never reads this field — confirm whether
    /// it is still needed.
    steal_pool: Arc<Mutex<WorkStealingPool>>,
    /// Owned tasks keyed by `Arc<TaskHeader>` pointer address, shared by all workers.
    tasks: Arc<Mutex<HashMap<usize, Task>>>,
    /// Set by `block_on_multi` after the root future completes and all tasks drain.
    shutdown: Arc<AtomicBool>,
    /// Wakes parked workers (e.g. so they observe the shutdown flag).
    notifier: Arc<WorkerNotifier>,
}
293
294impl MultiState {
295    fn new() -> Self {
296        Self {
297            global: Arc::new(GlobalQueue::new()),
298            steal_pool: Arc::new(Mutex::new(WorkStealingPool::new())),
299            tasks: Arc::new(Mutex::new(HashMap::new())),
300            shutdown: Arc::new(AtomicBool::new(false)),
301            notifier: Arc::new(WorkerNotifier::new()),
302        }
303    }
304}
305
// Thread-locals for the multi-thread spawn path.
// Set on each worker thread when multi-thread mode is active.
// `spawn()` reads these to route to the shared global queue and task map.
thread_local! {
    /// Raw pointer to the shared `GlobalQueue`; null outside `block_on_multi`.
    static MT_GLOBAL_QUEUE: Cell<*const GlobalQueue> = const { Cell::new(std::ptr::null()) };
    /// Raw pointer to the shared task map; null outside `block_on_multi`.
    static MT_TASKS: Cell<*const Mutex<HashMap<usize, Task>>> = const { Cell::new(std::ptr::null()) };
}
313
314// ── Public API ────────────────────────────────────────────────────────────────
315
316/// Drive `future` to completion on the current thread, returning its output.
317pub fn block_on<F: Future>(future: F) -> F::Output {
318    let mut exec = Executor::new().expect("executor init failed");
319    exec.block_on(future)
320}
321
322/// Drive `future` to completion with `spawn()` available in the async context.
323///
324/// Single-threaded mode (1 worker). Use `block_on_multi` for multi-thread.
325pub fn block_on_with_spawn<F: Future>(future: F) -> F::Output {
326    let mut exec = Executor::new().expect("executor init failed");
327    CURRENT_EXECUTOR.with(|c| c.set(&mut exec as *mut Executor));
328    let result = exec.block_on(future);
329    CURRENT_EXECUTOR.with(|c| c.set(std::ptr::null_mut()));
330    result
331}
332
/// Drive `future` to completion using `num_workers` OS threads.
///
/// Worker 0 is the main thread (drives root future). Workers 1..N-1 are
/// spawned as background threads. All workers share GlobalQueue and steal tasks.
///
/// When `num_workers <= 1`, falls back to `block_on_with_spawn` (single-thread).
pub fn block_on_multi<F>(future: F, num_workers: usize) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    if num_workers <= 1 {
        return block_on_with_spawn(future);
    }

    let state = MultiState::new();

    // Build a single WorkStealingPool for all workers.
    // One StealableQueue slot per worker, including worker 0.
    let steal_pool_arc = Arc::new({
        let mut pool = WorkStealingPool::new();
        for _ in 0..num_workers {
            pool.add_worker(Arc::new(StealableQueue::new()));
        }
        pool
    });

    // Set MT thread-locals on main thread.
    // NOTE(review): if `run_worker_0` panics, these pointers are never
    // cleared and the background workers are never signalled/joined —
    // confirm whether a panic-safe guard is wanted here.
    let global_ptr = Arc::as_ptr(&state.global);
    let tasks_ptr = Arc::as_ptr(&state.tasks);
    MT_GLOBAL_QUEUE.with(|c| c.set(global_ptr));
    MT_TASKS.with(|c| c.set(tasks_ptr));

    // Spawn background worker threads (workers 1..N-1).
    let mut handles = Vec::new();
    for worker_id in 1..num_workers {
        let global = Arc::clone(&state.global);
        let steal_pool = Arc::clone(&steal_pool_arc);
        let tasks = Arc::clone(&state.tasks);
        let shutdown = Arc::clone(&state.shutdown);
        let notifier = Arc::clone(&state.notifier);

        let handle = std::thread::spawn(move || {
            // Set MT thread-locals on this worker thread. The Arcs captured
            // above keep the pointees alive for the whole closure, so these
            // raw pointers stay valid.
            let global_ptr = Arc::as_ptr(&global);
            let tasks_ptr = Arc::as_ptr(&tasks);
            MT_GLOBAL_QUEUE.with(|c| c.set(global_ptr));
            MT_TASKS.with(|c| c.set(tasks_ptr));

            let mut worker = WorkerThread::new(
                worker_id,
                global,
                steal_pool,
                tasks,
                shutdown,
                Arc::clone(&notifier),
            )
            .expect("worker init failed");

            // Register this worker's self-pipe with the notifier.
            notifier.add_fd(worker.wake_tx());

            set_current_worker_wake_tx(worker.wake_tx());
            worker.run();
            clear_current_worker_wake_tx();

            // Clear this thread's MT pointers before the worker exits.
            MT_GLOBAL_QUEUE.with(|c| c.set(std::ptr::null()));
            MT_TASKS.with(|c| c.set(std::ptr::null()));
        });
        handles.push(handle);
    }

    // Worker 0: main thread drives root future.
    let result = run_worker_0(future, &state, steal_pool_arc);

    // Signal all workers to stop.
    state.shutdown.store(true, Ordering::Release);
    // Wake all parked workers so they see shutdown.
    for _ in 0..num_workers {
        state.notifier.notify_one();
    }

    // Join all background workers.
    for h in handles {
        let _ = h.join();
    }

    // Clear the main thread's MT pointers now that multi-thread mode is over.
    MT_GLOBAL_QUEUE.with(|c| c.set(std::ptr::null()));
    MT_TASKS.with(|c| c.set(std::ptr::null()));

    result
}
424
/// Run the main-thread worker (worker 0) which also polls the root future.
///
/// Mirrors `Executor::block_on`, but shares the task map and global queue
/// with the background workers and additionally steals from their queues.
fn run_worker_0<F>(future: F, state: &MultiState, steal_pool: Arc<WorkStealingPool>) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // Worker 0 uses its own self-pipe for reactor wakeup.
    let (wake_rx, wake_tx) =
        create_pipe().expect("worker 0 self-pipe failed");
    with_reactor(|r| {
        r.register(wake_rx, WAKE_TOKEN, Interest::READABLE)
            .expect("worker 0 wake pipe register failed")
    });

    // Register worker 0's self-pipe with the notifier.
    state.notifier.add_fd(wake_tx);
    set_current_worker_wake_tx(wake_tx);

    let mut root = std::pin::pin!(future);
    let mut root_done = false;
    let mut root_output: Option<F::Output> = None;

    let root_waker = make_worker0_root_waker(wake_tx);

    // Local queue for worker 0.
    let mut local = LocalQueue::new();

    loop {
        // Tick timers.
        let expired = tick_timer_wheel(std::time::Instant::now());
        for w in expired {
            w.wake();
        }

        // Poll root future.
        if !root_done {
            let mut cx = Context::from_waker(&root_waker);
            if let Poll::Ready(val) = root.as_mut().poll(&mut cx) {
                root_output = Some(val);
                root_done = true;
            }
        }

        // Check exit: root done + no tasks remaining.
        if root_done && state.tasks.lock().unwrap().is_empty() {
            break;
        }

        // Drain task queues.
        let mut did_work = false;
        loop {
            // Try local first, then the shared global queue, then steal.
            let header = local.pop().or_else(|| state.global.pop()).or_else(|| {
                // Steal from peer workers.
                let n = steal_pool.steal_one(0, &mut local, &state.global);
                if n > 0 { local.pop() } else { None }
            });

            let Some(header) = header else { break };
            did_work = true;

            let key = Arc::as_ptr(&header) as usize;
            let task_state = header.state.load(Ordering::Acquire);

            // Cancelled: tear down via `cancel` so the JoinHandle sees it.
            if task_state == STATE_CANCELLED {
                // The lock guard is a temporary, released at the end of the
                // statement — `cancel()` runs with the map unlocked.
                let t = state.tasks.lock().unwrap().remove(&key);
                if let Some(task) = t {
                    task.cancel();
                }
                continue;
            }
            if task_state == STATE_COMPLETED {
                state.tasks.lock().unwrap().remove(&key);
                continue;
            }

            let waker = make_waker_with_notifier(
                Arc::clone(&header),
                Arc::clone(&state.global),
                Some(Arc::clone(&state.notifier)),
            );
            let mut cx = Context::from_waker(&waker);

            // Extract task, poll, re-insert or drop.
            // Removing before polling keeps the map lock released while the
            // task body runs on this thread.
            let task = state.tasks.lock().unwrap().remove(&key);
            if let Some(task) = task {
                let completed = task.poll_task(&mut cx);
                if !completed {
                    state.tasks.lock().unwrap().insert(key, task);
                }
            }
        }

        // Re-check exit.
        if root_done && state.tasks.lock().unwrap().is_empty() {
            break;
        }

        // Park.
        if !did_work {
            park_worker(wake_rx);
        }
    }

    clear_current_worker_wake_tx();

    // Deregister and close self-pipe.
    let _ = with_reactor(|r| r.deregister(wake_rx));
    // SAFETY: worker 0 exclusively owns both pipe fds.
    #[cfg(unix)]
    unsafe {
        libc::close(wake_rx);
        libc::close(wake_tx);
    }

    root_output.expect("root future must complete")
}
541
542/// Park the current worker on the reactor.
543fn park_worker(wake_rx: i32) {
544    const MAX_PARK_MS: u64 = 10;
545
546    let timeout_ms = match next_timer_deadline() {
547        None => MAX_PARK_MS,
548        Some(deadline) => {
549            let now = std::time::Instant::now();
550            if deadline <= now {
551                0
552            } else {
553                let ms = deadline.duration_since(now).as_millis() as u64;
554                ms.min(MAX_PARK_MS)
555            }
556        }
557    };
558
559    let mut events = events_with_capacity(64);
560    let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
561
562    // Drain self-pipe.
563    #[cfg(unix)]
564    {
565        let mut buf = [0u8; 64];
566        loop {
567            // SAFETY: wake_rx is a valid O_NONBLOCK fd.
568            let n = unsafe { libc::read(wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
569            if n <= 0 {
570                break;
571            }
572        }
573
574        let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
575        if signal_fired {
576            on_signal_readable();
577        }
578    }
579
580    #[cfg(not(unix))]
581    let _ = wake_rx;
582}
583
/// Build a root waker for worker 0 (writes to its self-pipe to unpark).
///
/// The raw-waker data pointer carries the fd *value* itself (not a heap
/// allocation), so clone and drop are trivial.
#[cfg(unix)]
fn make_worker0_root_waker(wake_tx: i32) -> std::task::Waker {
    use std::task::{RawWaker, RawWakerVTable};

    unsafe fn clone_root(ptr: *const ()) -> RawWaker {
        RawWaker::new(ptr, &ROOT_VTABLE)
    }
    unsafe fn wake_root(ptr: *const ()) {
        // Recover the fd smuggled through the data pointer and write one
        // byte so the reactor poll in `park_worker` returns.
        let fd = ptr as usize as i32;
        let b: u8 = 1;
        // SAFETY: fd is the write end of a non-blocking pipe.
        libc::write(fd, &b as *const u8 as *const _, 1);
    }
    unsafe fn wake_root_by_ref(ptr: *const ()) {
        wake_root(ptr);
    }
    unsafe fn drop_root(_: *const ()) {}

    static ROOT_VTABLE: RawWakerVTable =
        RawWakerVTable::new(clone_root, wake_root, wake_root_by_ref, drop_root);

    let raw = std::task::RawWaker::new(wake_tx as usize as *const (), &ROOT_VTABLE);
    // SAFETY: ROOT_VTABLE is correct; fd lives for the duration of the call.
    unsafe { std::task::Waker::from_raw(raw) }
}
610
/// Non-unix fallback: a no-op waker (no self-pipe to signal). The worker
/// loop still makes progress because `park_worker` bounds its timeout.
#[cfg(not(unix))]
fn make_worker0_root_waker(_wake_tx: i32) -> std::task::Waker {
    use std::task::{RawWaker, RawWakerVTable};
    static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
        |p| RawWaker::new(p, &NOOP_VTABLE),
        |_| {},
        |_| {},
        |_| {},
    );
    // SAFETY: every vtable entry ignores the (null) data pointer.
    unsafe { std::task::Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
}
622
// ── Spawn — routes to single-thread or multi-thread context ───────────────────

/// Spawn a future onto the current thread's executor.
///
/// Works in both single-threaded (`block_on_with_spawn`) and multi-threaded
/// (`block_on_multi`) contexts. Panics if called outside both.
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + 'static,
    F::Output: Send + 'static,
{
    // Try single-threaded path first.
    let st_ptr = CURRENT_EXECUTOR.with(|c| c.get());
    if !st_ptr.is_null() {
        // SAFETY: ptr valid for duration of `block_on_with_spawn`.
        // NOTE(review): this materializes a `&mut Executor` while `block_on`
        // still holds `&mut self` further up the stack — confirm this
        // aliasing is sound, or rework to raw-pointer-only field access.
        return unsafe { (*st_ptr).spawn(future) };
    }

    // Try multi-threaded path.
    let mt_global = MT_GLOBAL_QUEUE.with(|c| c.get());
    let mt_tasks = MT_TASKS.with(|c| c.get());

    if !mt_global.is_null() && !mt_tasks.is_null() {
        let (task, jh) = Task::new(future);
        let key = Arc::as_ptr(&task.header) as usize;
        let header_clone = Arc::clone(&task.header);
        // SAFETY: pointers are valid for the duration of block_on_multi.
        // Insert task BEFORE pushing header — prevents race where a background
        // worker pops the header but finds no Task in the map.
        unsafe {
            (*mt_tasks).lock().unwrap().insert(key, task);
            (*mt_global).push_header(header_clone);
        }
        return jh;
    }

    panic!("spawn() called outside of block_on_with_spawn or block_on_multi context");
}
661
662// ── Tests ─────────────────────────────────────────────────────────────────────
663
664#[cfg(test)]
665mod tests {
666    use super::*;
667    use std::sync::atomic::{AtomicUsize, Ordering as Ord};
668
669    #[test]
670    fn block_on_simple_value() {
671        assert_eq!(block_on(async { 42u32 }), 42);
672    }
673
674    #[test]
675    fn block_on_chain_of_awaits() {
676        async fn double(x: u32) -> u32 {
677            x * 2
678        }
679        async fn compute() -> u32 {
680            double(double(3).await).await
681        }
682        assert_eq!(block_on(compute()), 12);
683    }
684
685    #[test]
686    fn block_on_string_output() {
687        assert_eq!(block_on(async { String::from("hello") }), "hello");
688    }
689
690    #[test]
691    fn spawn_and_join() {
692        let result = block_on_with_spawn(async {
693            let jh = spawn(async { 100u32 });
694            jh.await.unwrap()
695        });
696        assert_eq!(result, 100);
697    }
698
699    #[test]
700    fn spawn_multiple_and_join_all() {
701        let counter = Arc::new(AtomicUsize::new(0));
702        let c1 = counter.clone();
703        let c2 = counter.clone();
704        block_on_with_spawn(async move {
705            let jh1 = spawn(async move {
706                c1.fetch_add(1, Ord::SeqCst);
707            });
708            let jh2 = spawn(async move {
709                c2.fetch_add(1, Ord::SeqCst);
710            });
711            jh1.await.unwrap();
712            jh2.await.unwrap();
713        });
714        assert_eq!(counter.load(Ord::SeqCst), 2);
715    }
716
717    #[test]
718    fn join_handle_abort_returns_cancelled() {
719        use std::future::poll_fn;
720        use std::task::Poll as P;
721
722        let result = block_on_with_spawn(async {
723            let jh = spawn(async { poll_fn(|_| P::<()>::Pending).await });
724            jh.abort();
725            jh.await
726        });
727        assert!(matches!(result, Err(task::JoinError::Cancelled)));
728    }
729
730    #[test]
731    fn block_on_nested_spawn_ordering() {
732        let order = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
733        let o1 = order.clone();
734        let o2 = order.clone();
735        block_on_with_spawn(async move {
736            let jh1 = spawn(async move {
737                o1.lock().unwrap().push(1);
738            });
739            let jh2 = spawn(async move {
740                o2.lock().unwrap().push(2);
741            });
742            jh1.await.unwrap();
743            jh2.await.unwrap();
744        });
745        let v = order.lock().unwrap();
746        assert_eq!(v.len(), 2);
747    }
748
749    // ── Multi-threaded tests ───────────────────────────────────────────────
750
751    #[test]
752    fn multi_thread_simple_spawn() {
753        let result = block_on_multi(
754            async {
755                let jh = spawn(async { 42u32 });
756                jh.await.unwrap()
757            },
758            2,
759        );
760        assert_eq!(result, 42);
761    }
762
763    #[test]
764    fn multi_thread_many_tasks_complete() {
765        const N: usize = 100;
766        let counter = Arc::new(AtomicUsize::new(0));
767
768        let c = counter.clone();
769        block_on_multi(
770            async move {
771                let mut handles = Vec::new();
772                for _ in 0..N {
773                    let cc = c.clone();
774                    handles.push(spawn(async move {
775                        cc.fetch_add(1, Ord::SeqCst);
776                    }));
777                }
778                for h in handles {
779                    h.await.unwrap();
780                }
781            },
782            4,
783        );
784
785        assert_eq!(counter.load(Ord::SeqCst), N);
786    }
787
788    #[test]
789    fn multi_thread_falls_back_to_single_with_one_worker() {
790        // num_workers=1 uses single-thread path, must still work.
791        let result = block_on_multi(async { 99u32 }, 1);
792        assert_eq!(result, 99);
793    }
794
795    #[test]
796    fn multi_thread_1000_tasks_4_workers() {
797        const N: usize = 1000;
798        let counter = Arc::new(AtomicUsize::new(0));
799
800        let c = counter.clone();
801        block_on_multi(
802            async move {
803                let mut handles = Vec::new();
804                for _ in 0..N {
805                    let cc = c.clone();
806                    handles.push(spawn(async move {
807                        cc.fetch_add(1, Ord::SeqCst);
808                    }));
809                }
810                for h in handles {
811                    h.await.unwrap();
812                }
813            },
814            4,
815        );
816
817        assert_eq!(counter.load(Ord::SeqCst), N);
818    }
819
820    // ── Additional executor tests ──────────────────────────────────────────
821
822    #[test]
823    fn block_on_returns_unit() {
824        block_on(async {});
825    }
826
827    #[test]
828    fn block_on_with_spawn_returns_unit() {
829        block_on_with_spawn(async {});
830    }
831
832    #[test]
833    fn spawn_1000_tasks_single_thread_all_complete() {
834        let counter = Arc::new(AtomicUsize::new(0));
835        let c = counter.clone();
836        block_on_with_spawn(async move {
837            let mut handles = Vec::new();
838            for _ in 0..1000 {
839                let cc = c.clone();
840                handles.push(spawn(async move {
841                    cc.fetch_add(1, Ord::SeqCst);
842                }));
843            }
844            for h in handles {
845                h.await.unwrap();
846            }
847        });
848        assert_eq!(counter.load(Ord::SeqCst), 1000);
849    }
850
851    #[test]
852    fn spawn_in_spawned_task() {
853        let result = block_on_with_spawn(async {
854            let jh = spawn(async {
855                let inner = spawn(async { 42u32 });
856                inner.await.unwrap()
857            });
858            jh.await.unwrap()
859        });
860        assert_eq!(result, 42);
861    }
862
863    #[test]
864    fn join_handle_dropped_without_await_no_panic() {
865        // Drop the JoinHandle without awaiting; executor must not panic or deadlock.
866        block_on_with_spawn(async move {
867            // Spawn a task and immediately drop the handle (detach it).
868            drop(spawn(async move { 42u32 }));
869            // Spawn a second task to give the executor a reason to keep running.
870            // This second task ensures the root future doesn't exit before the
871            // first task has had a chance to run.
872            let jh2 = spawn(async move { 99u32 });
873            jh2.await.unwrap()
874        });
875        // No assertion needed — we just verify no panic/hang.
876    }
877
878    #[test]
879    fn multi_thread_0_workers_fallback_to_single() {
880        // num_workers=0 edge case: should not panic, falls back to single.
881        let result = block_on_multi(async { 7u32 }, 0);
882        assert_eq!(result, 7);
883    }
884
885    #[test]
886    fn multi_thread_3_workers_all_join() {
887        let counter = Arc::new(AtomicUsize::new(0));
888        let c = counter.clone();
889        block_on_multi(
890            async move {
891                let mut handles = Vec::new();
892                for _ in 0..30 {
893                    let cc = c.clone();
894                    handles.push(spawn(async move {
895                        cc.fetch_add(1, Ord::SeqCst);
896                    }));
897                }
898                for h in handles {
899                    h.await.unwrap();
900                }
901            },
902            3,
903        );
904        assert_eq!(counter.load(Ord::SeqCst), 30);
905    }
906
907    #[test]
908    fn multi_thread_nested_spawn() {
909        let result = block_on_multi(
910            async {
911                let jh = spawn(async {
912                    let inner = spawn(async { 99u32 });
913                    inner.await.unwrap()
914                });
915                jh.await.unwrap()
916            },
917            2,
918        );
919        assert_eq!(result, 99);
920    }
921
922    #[test]
923    fn block_on_with_spawn_sequential_ordering() {
924        let order = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
925        let o = order.clone();
926        block_on_with_spawn(async move {
927            let o1 = o.clone();
928            let o2 = o.clone();
929            let o3 = o.clone();
930            let jh1 = spawn(async move {
931                o1.lock().unwrap().push(1);
932            });
933            let jh2 = spawn(async move {
934                o2.lock().unwrap().push(2);
935            });
936            let jh3 = spawn(async move {
937                o3.lock().unwrap().push(3);
938            });
939            jh1.await.unwrap();
940            jh2.await.unwrap();
941            jh3.await.unwrap();
942        });
943        assert_eq!(order.lock().unwrap().len(), 3);
944    }
945
946    #[test]
947    fn multi_thread_result_type_roundtrip() {
948        let result: Result<u32, String> = block_on_multi(
949            async {
950                let jh = spawn(async { Ok::<u32, String>(42) });
951                jh.await.unwrap()
952            },
953            2,
954        );
955        assert_eq!(result, Ok(42));
956    }
957
958    #[test]
959    fn block_on_returns_string() {
960        let s = block_on(async { String::from("hello world") });
961        assert_eq!(s, "hello world");
962    }
963
964    #[test]
965    fn block_on_returns_vec() {
966        let v = block_on(async { vec![1u32, 2, 3] });
967        assert_eq!(v, vec![1, 2, 3]);
968    }
969
970    #[test]
971    fn spawn_returns_computed_value() {
972        let result = block_on_with_spawn(async {
973            let jh = spawn(async { 2u32 * 21 });
974            jh.await.unwrap()
975        });
976        assert_eq!(result, 42);
977    }
978
979    #[test]
980    fn spawn_with_move_captures_outer() {
981        let data = Arc::new(AtomicUsize::new(55));
982        let d = data.clone();
983        let result = block_on_with_spawn(async move {
984            let jh = spawn(async move { d.load(Ord::SeqCst) });
985            jh.await.unwrap()
986        });
987        assert_eq!(result, 55);
988    }
989
990    #[test]
991    fn multi_thread_2_workers_count_50() {
992        let counter = Arc::new(AtomicUsize::new(0));
993        let c = counter.clone();
994        block_on_multi(
995            async move {
996                let mut handles = Vec::new();
997                for _ in 0..50 {
998                    let cc = c.clone();
999                    handles.push(spawn(async move {
1000                        cc.fetch_add(1, Ord::SeqCst);
1001                    }));
1002                }
1003                for h in handles {
1004                    h.await.unwrap();
1005                }
1006            },
1007            2,
1008        );
1009        assert_eq!(counter.load(Ord::SeqCst), 50);
1010    }
1011
1012    #[test]
1013    fn spawn_chain_3_deep() {
1014        let result = block_on_with_spawn(async {
1015            let h1 = spawn(async {
1016                let h2 = spawn(async {
1017                    let h3 = spawn(async { 7u32 });
1018                    h3.await.unwrap() * 2
1019                });
1020                h2.await.unwrap() + 1
1021            });
1022            h1.await.unwrap()
1023        });
1024        assert_eq!(result, 15); // 7*2+1 = 15
1025    }
1026
1027    #[test]
1028    fn block_on_returns_option() {
1029        let v = block_on(async { Some(42u32) });
1030        assert_eq!(v, Some(42));
1031    }
1032
1033    #[test]
1034    fn block_on_returns_tuple() {
1035        let (a, b) = block_on(async { (1u32, 2u32) });
1036        assert_eq!(a, 1);
1037        assert_eq!(b, 2);
1038    }
1039
1040    #[test]
1041    fn spawn_10_independent_tasks_all_increment() {
1042        let counter = Arc::new(AtomicUsize::new(0));
1043        let c = counter.clone();
1044        block_on_with_spawn(async move {
1045            let mut handles: Vec<_> = (0..10)
1046                .map(|_| {
1047                    let cc = c.clone();
1048                    spawn(async move {
1049                        cc.fetch_add(1, Ord::SeqCst);
1050                    })
1051                })
1052                .collect();
1053            for h in handles.drain(..) {
1054                h.await.unwrap();
1055            }
1056        });
1057        assert_eq!(counter.load(Ord::SeqCst), 10);
1058    }
1059
1060    #[test]
1061    fn multi_thread_5_workers_500_tasks() {
1062        let counter = Arc::new(AtomicUsize::new(0));
1063        let c = counter.clone();
1064        block_on_multi(
1065            async move {
1066                let handles: Vec<_> = (0..500)
1067                    .map(|_| {
1068                        let cc = c.clone();
1069                        spawn(async move {
1070                            cc.fetch_add(1, Ord::SeqCst);
1071                        })
1072                    })
1073                    .collect();
1074                for h in handles {
1075                    h.await.unwrap();
1076                }
1077            },
1078            5,
1079        );
1080        assert_eq!(counter.load(Ord::SeqCst), 500);
1081    }
1082
1083    #[test]
1084    fn block_on_with_spawn_arc_shared_across_tasks() {
1085        let shared = Arc::new(AtomicUsize::new(0));
1086        let s = shared.clone();
1087        block_on_with_spawn(async move {
1088            let s1 = s.clone();
1089            let s2 = s.clone();
1090            let h1 = spawn(async move { s1.fetch_add(10, Ord::SeqCst) });
1091            let h2 = spawn(async move { s2.fetch_add(20, Ord::SeqCst) });
1092            h1.await.unwrap();
1093            h2.await.unwrap();
1094        });
1095        let v = shared.load(Ord::SeqCst);
1096        assert_eq!(v, 30);
1097    }
1098
1099    #[test]
1100    fn abort_before_poll_returns_cancelled() {
1101        let result = block_on_with_spawn(async {
1102            let jh = spawn(async {
1103                // This future never completes on its own
1104                std::future::poll_fn(|_| std::task::Poll::<()>::Pending).await
1105            });
1106            jh.abort();
1107            jh.await
1108        });
1109        assert!(matches!(result, Err(task::JoinError::Cancelled)));
1110    }
1111
1112    #[test]
1113    fn spawn_returns_unit_output() {
1114        block_on_with_spawn(async {
1115            let jh = spawn(async {});
1116            jh.await.unwrap(); // output is ()
1117        });
1118    }
1119
1120    #[test]
1121    fn multi_thread_result_err_type_roundtrip() {
1122        let result: Result<u32, String> = block_on_multi(
1123            async {
1124                let jh = spawn(async { Err::<u32, String>("fail".to_string()) });
1125                jh.await.unwrap()
1126            },
1127            2,
1128        );
1129        assert_eq!(result, Err("fail".to_string()));
1130    }
1131
1132    #[test]
1133    fn block_on_f64_value() {
1134        let v: f64 = block_on(async { 3.14 });
1135        assert!((v - 3.14).abs() < 1e-10);
1136    }
1137
1138    #[test]
1139    fn spawn_computes_product_of_two_values() {
1140        let result = block_on_with_spawn(async {
1141            let a = spawn(async { 6u32 });
1142            let b = spawn(async { 7u32 });
1143            a.await.unwrap() * b.await.unwrap()
1144        });
1145        assert_eq!(result, 42);
1146    }
1147
1148    #[test]
1149    fn block_on_with_spawn_returns_bool() {
1150        let v = block_on_with_spawn(async {
1151            let jh = spawn(async { true });
1152            jh.await.unwrap()
1153        });
1154        assert!(v);
1155    }
1156
1157    #[test]
1158    fn multi_thread_6_workers_200_tasks() {
1159        let counter = Arc::new(AtomicUsize::new(0));
1160        let c = counter.clone();
1161        block_on_multi(
1162            async move {
1163                let handles: Vec<_> = (0..200)
1164                    .map(|_| {
1165                        let cc = c.clone();
1166                        spawn(async move {
1167                            cc.fetch_add(1, Ord::SeqCst);
1168                        })
1169                    })
1170                    .collect();
1171                for h in handles {
1172                    h.await.unwrap();
1173                }
1174            },
1175            6,
1176        );
1177        assert_eq!(counter.load(Ord::SeqCst), 200);
1178    }
1179
1180    #[test]
1181    fn spawn_task_with_string_return() {
1182        let result = block_on_with_spawn(async {
1183            let jh = spawn(async { "hello".to_string() });
1184            jh.await.unwrap()
1185        });
1186        assert_eq!(result, "hello");
1187    }
1188
1189    #[test]
1190    fn block_on_nested_async_fns() {
1191        async fn add(a: u32, b: u32) -> u32 {
1192            a + b
1193        }
1194        async fn multiply(a: u32, b: u32) -> u32 {
1195            a * b
1196        }
1197        let result = block_on(async {
1198            let sum = add(3, 4).await;
1199            multiply(sum, 2).await
1200        });
1201        assert_eq!(result, 14);
1202    }
1203
1204    #[test]
1205    fn block_on_complex_expression() {
1206        let result = block_on(async {
1207            let a = 10u32;
1208            let b = 20u32;
1209            a + b + 12
1210        });
1211        assert_eq!(result, 42);
1212    }
1213
1214    #[test]
1215    fn spawn_50_tasks_all_complete_with_counter() {
1216        let counter = Arc::new(AtomicUsize::new(0));
1217        let c = counter.clone();
1218        block_on_with_spawn(async move {
1219            let handles: Vec<_> = (0..50)
1220                .map(|_| {
1221                    let cc = c.clone();
1222                    spawn(async move { cc.fetch_add(1, Ord::SeqCst) })
1223                })
1224                .collect();
1225            for h in handles {
1226                h.await.unwrap();
1227            }
1228        });
1229        assert_eq!(counter.load(Ord::SeqCst), 50);
1230    }
1231
1232    #[test]
1233    fn multi_thread_join_handle_result_preserved() {
1234        // Spawned task computes unique value, JoinHandle returns it correctly
1235        let values: Vec<u32> = (0..8).collect();
1236        let results: Vec<u32> = block_on_multi(
1237            async {
1238                let handles: Vec<_> = (0..8u32)
1239                    .map(|i| spawn(async move { i * i }))
1240                    .collect();
1241                let mut results = Vec::new();
1242                for h in handles {
1243                    results.push(h.await.unwrap());
1244                }
1245                results
1246            },
1247            4,
1248        );
1249        assert_eq!(results.len(), 8);
1250        for (i, &v) in results.iter().enumerate() {
1251            assert_eq!(v, (i as u32) * (i as u32));
1252        }
1253    }
1254
1255    #[test]
1256    fn block_on_with_spawn_multiple_spawn_waves() {
1257        // Spawn tasks in waves to exercise queue cycling
1258        let counter = Arc::new(AtomicUsize::new(0));
1259        let c = counter.clone();
1260        block_on_with_spawn(async move {
1261            // First wave
1262            let handles1: Vec<_> = (0..5)
1263                .map(|_| {
1264                    let cc = c.clone();
1265                    spawn(async move { cc.fetch_add(1, Ord::SeqCst) })
1266                })
1267                .collect();
1268            for h in handles1 {
1269                h.await.unwrap();
1270            }
1271            // Second wave
1272            let handles2: Vec<_> = (0..5)
1273                .map(|_| {
1274                    let cc = c.clone();
1275                    spawn(async move { cc.fetch_add(1, Ord::SeqCst) })
1276                })
1277                .collect();
1278            for h in handles2 {
1279                h.await.unwrap();
1280            }
1281        });
1282        assert_eq!(counter.load(Ord::SeqCst), 10);
1283    }
1284}