1pub mod scheduler;
23pub mod task;
24pub mod task_local;
25pub mod waker;
26pub mod work_stealing;
27pub mod worker;
28
29use std::cell::Cell;
30use std::collections::HashMap;
31use std::future::Future;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Mutex};
34use std::task::{Context, Poll};
35
36use crate::platform::sys::{create_pipe, events_with_capacity, Interest};
37use crate::reactor::{with_reactor, with_reactor_mut};
38use crate::time::{next_timer_deadline, tick_timer_wheel};
39
40#[cfg(unix)]
41use crate::signal::{on_signal_readable, SIGNAL_TOKEN};
42
43use scheduler::{GlobalQueue, LocalQueue};
44use task::{JoinHandle, Task, STATE_CANCELLED, STATE_COMPLETED};
45use waker::{make_waker, make_waker_with_notifier, WorkerNotifier};
46use work_stealing::{StealableQueue, WorkStealingPool};
47use worker::{clear_current_worker_wake_tx, set_current_worker_wake_tx, WorkerThread};
48
/// Single-threaded task executor driven by [`Executor::block_on`].
pub struct Executor {
    /// Local run queue; `next_task` pops from it before falling back to `global`.
    local: LocalQueue,
    /// Shared run queue; `spawn` pushes new task headers here and task wakers are
    /// built with a handle to it.
    global: Arc<GlobalQueue>,
    /// Live spawned tasks, keyed by the address of their header allocation.
    tasks: HashMap<usize, Task>,
    /// Read end of the self-pipe; registered with the reactor under `WAKE_TOKEN`.
    wake_rx: i32,
    /// Write end of the self-pipe; on unix the root waker writes one byte to it.
    wake_tx: i32,
}
64
65impl Executor {
66 fn new() -> std::io::Result<Self> {
67 let (wake_rx, wake_tx) = create_pipe()?;
68 with_reactor(|r| r.register(wake_rx, WAKE_TOKEN, Interest::READABLE))?;
69 Ok(Self {
70 local: LocalQueue::new(),
71 global: Arc::new(GlobalQueue::new()),
72 tasks: HashMap::new(),
73 wake_rx,
74 wake_tx,
75 })
76 }
77
78 pub fn spawn<F>(&mut self, future: F) -> JoinHandle<F::Output>
80 where
81 F: Future + 'static,
82 F::Output: Send + 'static,
83 {
84 let (task, jh) = Task::new(future);
85 let key = Arc::as_ptr(&task.header) as usize;
86 self.global.push_header(Arc::clone(&task.header));
87 self.tasks.insert(key, task);
88 jh
89 }
90
91 pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
93 let mut root = std::pin::pin!(future);
94 let mut root_done = false;
95 let mut root_output: Option<F::Output> = None;
96
97 let root_waker = self.make_root_waker();
98
99 loop {
100 let expired = tick_timer_wheel(std::time::Instant::now());
102 for w in expired {
103 w.wake();
104 }
105
106 if !root_done {
108 let mut cx = Context::from_waker(&root_waker);
109 if let Poll::Ready(val) = root.as_mut().poll(&mut cx) {
110 root_output = Some(val);
111 root_done = true;
112 }
113 }
114
115 if root_done && self.tasks.is_empty() {
117 break;
118 }
119
120 let mut did_work = false;
122 loop {
123 let Some(header) = self.next_task() else {
124 break;
125 };
126 did_work = true;
127 let key = Arc::as_ptr(&header) as usize;
128 let state = header.state.load(Ordering::Acquire);
129
130 if state == STATE_CANCELLED {
131 if let Some(task) = self.tasks.remove(&key) {
132 task.cancel();
133 }
134 continue;
135 }
136 if state == STATE_COMPLETED {
137 self.tasks.remove(&key);
138 continue;
139 }
140
141 let waker = make_waker(Arc::clone(&header), Arc::clone(&self.global));
142 let mut cx = Context::from_waker(&waker);
143
144 if let Some(task) = self.tasks.get(&key) {
145 let completed = task.poll_task(&mut cx);
146 if completed {
147 self.tasks.remove(&key);
148 }
149 }
150 }
151
152 if root_done && self.tasks.is_empty() {
154 break;
155 }
156
157 if !did_work && self.local.is_empty() && self.global.len() == 0 {
159 self.park();
160 }
161 }
162
163 root_output.expect("root future must complete before block_on returns")
164 }
165
166 fn next_task(&mut self) -> Option<Arc<task::TaskHeader>> {
168 self.local.pop().or_else(|| self.global.pop())
169 }
170
171 fn park(&self) {
173 const MAX_PARK_MS: u64 = 10;
174
175 let timeout_ms = match next_timer_deadline() {
176 None => MAX_PARK_MS,
177 Some(deadline) => {
178 let now = std::time::Instant::now();
179 if deadline <= now {
180 0
181 } else {
182 let ms = deadline.duration_since(now).as_millis() as u64;
183 ms.min(MAX_PARK_MS)
184 }
185 }
186 };
187
188 let mut events = events_with_capacity(64);
189 let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
190 self.drain_wake_pipe();
191
192 #[cfg(unix)]
193 {
194 let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
195 if signal_fired {
196 on_signal_readable();
197 }
198 }
199 }
200
201 #[cfg(unix)]
202 fn drain_wake_pipe(&self) {
203 let mut buf = [0u8; 64];
204 loop {
205 let n = unsafe { libc::read(self.wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
207 if n <= 0 {
208 break;
209 }
210 }
211 }
212
213 #[cfg(not(unix))]
214 fn drain_wake_pipe(&self) {}
215
216 #[cfg(unix)]
217 fn make_root_waker(&self) -> std::task::Waker {
218 use std::task::{RawWaker, RawWakerVTable};
219
220 let tx = self.wake_tx;
221
222 unsafe fn clone_root(ptr: *const ()) -> RawWaker {
223 RawWaker::new(ptr, &ROOT_VTABLE)
224 }
225 unsafe fn wake_root(ptr: *const ()) {
226 let fd = ptr as usize as i32;
227 let b: u8 = 1;
228 libc::write(fd, &b as *const u8 as *const _, 1);
230 }
231 unsafe fn wake_root_by_ref(ptr: *const ()) {
232 wake_root(ptr);
233 }
234 unsafe fn drop_root(_: *const ()) {}
235
236 static ROOT_VTABLE: RawWakerVTable =
237 RawWakerVTable::new(clone_root, wake_root, wake_root_by_ref, drop_root);
238
239 let raw = std::task::RawWaker::new(tx as usize as *const (), &ROOT_VTABLE);
240 unsafe { std::task::Waker::from_raw(raw) }
242 }
243
244 #[cfg(not(unix))]
245 fn make_root_waker(&self) -> std::task::Waker {
246 use std::task::{RawWaker, RawWakerVTable};
247 static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
248 |p| RawWaker::new(p, &NOOP_VTABLE),
249 |_| {},
250 |_| {},
251 |_| {},
252 );
253 unsafe { std::task::Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
254 }
255}
256
257impl Drop for Executor {
258 fn drop(&mut self) {
259 let _ = with_reactor(|r| r.deregister(self.wake_rx));
260 #[cfg(unix)]
262 unsafe {
263 libc::close(self.wake_rx);
264 libc::close(self.wake_tx);
265 }
266 }
267}
268
/// Reactor token under which the read end of an executor/worker-0 self-pipe is
/// registered.
// NOTE(review): `usize::MAX` is presumably chosen to stay clear of tokens used
// for ordinary I/O sources — confirm against the reactor's token allocation.
const WAKE_TOKEN: usize = usize::MAX;
271
thread_local! {
    /// Raw pointer to the executor installed by `block_on_with_spawn` on this
    /// thread; null when none is installed. Read by the free `spawn` function.
    static CURRENT_EXECUTOR: Cell<*mut Executor> = const { Cell::new(std::ptr::null_mut()) };
}
279
/// State shared between the calling thread (worker 0) and the worker threads
/// started by `block_on_multi`.
struct MultiState {
    /// Run queue shared by all workers.
    global: Arc<GlobalQueue>,
    /// NOTE(review): nothing in this file reads this field — `block_on_multi`
    /// builds its own `WorkStealingPool`. Confirm whether it is still needed.
    steal_pool: Arc<Mutex<WorkStealingPool>>,
    /// Live tasks keyed by the address of their header allocation.
    tasks: Arc<Mutex<HashMap<usize, Task>>>,
    /// Set to `true` by `block_on_multi` once worker 0 has returned.
    shutdown: Arc<AtomicBool>,
    /// Collects the workers' wake descriptors (`add_fd`) and wakes them
    /// (`notify_one`).
    notifier: Arc<WorkerNotifier>,
}
293
294impl MultiState {
295 fn new() -> Self {
296 Self {
297 global: Arc::new(GlobalQueue::new()),
298 steal_pool: Arc::new(Mutex::new(WorkStealingPool::new())),
299 tasks: Arc::new(Mutex::new(HashMap::new())),
300 shutdown: Arc::new(AtomicBool::new(false)),
301 notifier: Arc::new(WorkerNotifier::new()),
302 }
303 }
304}
305
thread_local! {
    /// Pointer to the shared run queue of the `block_on_multi` runtime this
    /// thread belongs to; null outside of such a runtime.
    static MT_GLOBAL_QUEUE: Cell<*const GlobalQueue> = const { Cell::new(std::ptr::null()) };
    /// Pointer to the shared task map of the `block_on_multi` runtime this
    /// thread belongs to; null outside of such a runtime.
    static MT_TASKS: Cell<*const Mutex<HashMap<usize, Task>>> = const { Cell::new(std::ptr::null()) };
}
313
314pub fn block_on<F: Future>(future: F) -> F::Output {
318 let mut exec = Executor::new().expect("executor init failed");
319 exec.block_on(future)
320}
321
322pub fn block_on_with_spawn<F: Future>(future: F) -> F::Output {
326 let mut exec = Executor::new().expect("executor init failed");
327 CURRENT_EXECUTOR.with(|c| c.set(&mut exec as *mut Executor));
328 let result = exec.block_on(future);
329 CURRENT_EXECUTOR.with(|c| c.set(std::ptr::null_mut()));
330 result
331}
332
333pub fn block_on_multi<F>(future: F, num_workers: usize) -> F::Output
340where
341 F: Future + Send + 'static,
342 F::Output: Send + 'static,
343{
344 if num_workers <= 1 {
345 return block_on_with_spawn(future);
346 }
347
348 let state = MultiState::new();
349
350 let steal_pool_arc = Arc::new({
352 let mut pool = WorkStealingPool::new();
353 for _ in 0..num_workers {
354 pool.add_worker(Arc::new(StealableQueue::new()));
355 }
356 pool
357 });
358
359 let global_ptr = Arc::as_ptr(&state.global);
361 let tasks_ptr = Arc::as_ptr(&state.tasks);
362 MT_GLOBAL_QUEUE.with(|c| c.set(global_ptr));
363 MT_TASKS.with(|c| c.set(tasks_ptr));
364
365 let mut handles = Vec::new();
367 for worker_id in 1..num_workers {
368 let global = Arc::clone(&state.global);
369 let steal_pool = Arc::clone(&steal_pool_arc);
370 let tasks = Arc::clone(&state.tasks);
371 let shutdown = Arc::clone(&state.shutdown);
372 let notifier = Arc::clone(&state.notifier);
373
374 let handle = std::thread::spawn(move || {
375 let global_ptr = Arc::as_ptr(&global);
377 let tasks_ptr = Arc::as_ptr(&tasks);
378 MT_GLOBAL_QUEUE.with(|c| c.set(global_ptr));
379 MT_TASKS.with(|c| c.set(tasks_ptr));
380
381 let mut worker = WorkerThread::new(
382 worker_id,
383 global,
384 steal_pool,
385 tasks,
386 shutdown,
387 Arc::clone(¬ifier),
388 )
389 .expect("worker init failed");
390
391 notifier.add_fd(worker.wake_tx());
393
394 set_current_worker_wake_tx(worker.wake_tx());
395 worker.run();
396 clear_current_worker_wake_tx();
397
398 MT_GLOBAL_QUEUE.with(|c| c.set(std::ptr::null()));
399 MT_TASKS.with(|c| c.set(std::ptr::null()));
400 });
401 handles.push(handle);
402 }
403
404 let result = run_worker_0(future, &state, steal_pool_arc);
406
407 state.shutdown.store(true, Ordering::Release);
409 for _ in 0..num_workers {
411 state.notifier.notify_one();
412 }
413
414 for h in handles {
416 let _ = h.join();
417 }
418
419 MT_GLOBAL_QUEUE.with(|c| c.set(std::ptr::null()));
420 MT_TASKS.with(|c| c.set(std::ptr::null()));
421
422 result
423}
424
425fn run_worker_0<F>(future: F, state: &MultiState, steal_pool: Arc<WorkStealingPool>) -> F::Output
427where
428 F: Future + Send + 'static,
429 F::Output: Send + 'static,
430{
431 let (wake_rx, wake_tx) =
433 create_pipe().expect("worker 0 self-pipe failed");
434 with_reactor(|r| {
435 r.register(wake_rx, WAKE_TOKEN, Interest::READABLE)
436 .expect("worker 0 wake pipe register failed")
437 });
438
439 state.notifier.add_fd(wake_tx);
441 set_current_worker_wake_tx(wake_tx);
442
443 let mut root = std::pin::pin!(future);
444 let mut root_done = false;
445 let mut root_output: Option<F::Output> = None;
446
447 let root_waker = make_worker0_root_waker(wake_tx);
448
449 let mut local = LocalQueue::new();
451
452 loop {
453 let expired = tick_timer_wheel(std::time::Instant::now());
455 for w in expired {
456 w.wake();
457 }
458
459 if !root_done {
461 let mut cx = Context::from_waker(&root_waker);
462 if let Poll::Ready(val) = root.as_mut().poll(&mut cx) {
463 root_output = Some(val);
464 root_done = true;
465 }
466 }
467
468 if root_done && state.tasks.lock().unwrap().is_empty() {
470 break;
471 }
472
473 let mut did_work = false;
475 loop {
476 let header = local.pop().or_else(|| state.global.pop()).or_else(|| {
478 let n = steal_pool.steal_one(0, &mut local, &state.global);
480 if n > 0 { local.pop() } else { None }
481 });
482
483 let Some(header) = header else { break };
484 did_work = true;
485
486 let key = Arc::as_ptr(&header) as usize;
487 let task_state = header.state.load(Ordering::Acquire);
488
489 if task_state == STATE_CANCELLED {
490 let t = state.tasks.lock().unwrap().remove(&key);
491 if let Some(task) = t {
492 task.cancel();
493 }
494 continue;
495 }
496 if task_state == STATE_COMPLETED {
497 state.tasks.lock().unwrap().remove(&key);
498 continue;
499 }
500
501 let waker = make_waker_with_notifier(
502 Arc::clone(&header),
503 Arc::clone(&state.global),
504 Some(Arc::clone(&state.notifier)),
505 );
506 let mut cx = Context::from_waker(&waker);
507
508 let task = state.tasks.lock().unwrap().remove(&key);
510 if let Some(task) = task {
511 let completed = task.poll_task(&mut cx);
512 if !completed {
513 state.tasks.lock().unwrap().insert(key, task);
514 }
515 }
516 }
517
518 if root_done && state.tasks.lock().unwrap().is_empty() {
520 break;
521 }
522
523 if !did_work {
525 park_worker(wake_rx);
526 }
527 }
528
529 clear_current_worker_wake_tx();
530
531 let _ = with_reactor(|r| r.deregister(wake_rx));
533 #[cfg(unix)]
534 unsafe {
535 libc::close(wake_rx);
536 libc::close(wake_tx);
537 }
538
539 root_output.expect("root future must complete")
540}
541
542fn park_worker(wake_rx: i32) {
544 const MAX_PARK_MS: u64 = 10;
545
546 let timeout_ms = match next_timer_deadline() {
547 None => MAX_PARK_MS,
548 Some(deadline) => {
549 let now = std::time::Instant::now();
550 if deadline <= now {
551 0
552 } else {
553 let ms = deadline.duration_since(now).as_millis() as u64;
554 ms.min(MAX_PARK_MS)
555 }
556 }
557 };
558
559 let mut events = events_with_capacity(64);
560 let _ = with_reactor_mut(|r| r.poll(&mut events, Some(timeout_ms)));
561
562 #[cfg(unix)]
564 {
565 let mut buf = [0u8; 64];
566 loop {
567 let n = unsafe { libc::read(wake_rx, buf.as_mut_ptr() as *mut _, buf.len()) };
569 if n <= 0 {
570 break;
571 }
572 }
573
574 let signal_fired = events.iter().any(|ev| ev.token == SIGNAL_TOKEN && ev.readable);
575 if signal_fired {
576 on_signal_readable();
577 }
578 }
579
580 #[cfg(not(unix))]
581 let _ = wake_rx;
582}
583
584#[cfg(unix)]
586fn make_worker0_root_waker(wake_tx: i32) -> std::task::Waker {
587 use std::task::{RawWaker, RawWakerVTable};
588
589 unsafe fn clone_root(ptr: *const ()) -> RawWaker {
590 RawWaker::new(ptr, &ROOT_VTABLE)
591 }
592 unsafe fn wake_root(ptr: *const ()) {
593 let fd = ptr as usize as i32;
594 let b: u8 = 1;
595 libc::write(fd, &b as *const u8 as *const _, 1);
597 }
598 unsafe fn wake_root_by_ref(ptr: *const ()) {
599 wake_root(ptr);
600 }
601 unsafe fn drop_root(_: *const ()) {}
602
603 static ROOT_VTABLE: RawWakerVTable =
604 RawWakerVTable::new(clone_root, wake_root, wake_root_by_ref, drop_root);
605
606 let raw = std::task::RawWaker::new(wake_tx as usize as *const (), &ROOT_VTABLE);
607 unsafe { std::task::Waker::from_raw(raw) }
609}
610
/// Non-unix variant: returns a waker whose wake, clone and drop do nothing
/// beyond producing another identical no-op waker on clone.
#[cfg(not(unix))]
fn make_worker0_root_waker(_wake_tx: i32) -> std::task::Waker {
    use std::task::{RawWaker, RawWakerVTable, Waker};

    /// Clone: same (null) data pointer, same vtable.
    unsafe fn clone_noop(data: *const ()) -> RawWaker {
        RawWaker::new(data, &NOOP_VTABLE)
    }

    /// Wake, wake-by-ref and drop: nothing to do.
    unsafe fn ignore(_: *const ()) {}

    static NOOP_VTABLE: RawWakerVTable =
        RawWakerVTable::new(clone_noop, ignore, ignore, ignore);

    // SAFETY: no vtable function ever dereferences the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
}
622
623pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
630where
631 F: Future + 'static,
632 F::Output: Send + 'static,
633{
634 let st_ptr = CURRENT_EXECUTOR.with(|c| c.get());
636 if !st_ptr.is_null() {
637 return unsafe { (*st_ptr).spawn(future) };
639 }
640
641 let mt_global = MT_GLOBAL_QUEUE.with(|c| c.get());
643 let mt_tasks = MT_TASKS.with(|c| c.get());
644
645 if !mt_global.is_null() && !mt_tasks.is_null() {
646 let (task, jh) = Task::new(future);
647 let key = Arc::as_ptr(&task.header) as usize;
648 let header_clone = Arc::clone(&task.header);
649 unsafe {
653 (*mt_tasks).lock().unwrap().insert(key, task);
654 (*mt_global).push_header(header_clone);
655 }
656 return jh;
657 }
658
659 panic!("spawn() called outside of block_on_with_spawn or block_on_multi context");
660}
661
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering as Ord};

    // ---- shared helpers ---------------------------------------------------

    /// Spawns `n` tasks that each add one to `counter`, then awaits them all.
    async fn bump_n_times(counter: Arc<AtomicUsize>, n: usize) {
        let handles: Vec<_> = (0..n)
            .map(|_| {
                let c = Arc::clone(&counter);
                spawn(async move {
                    c.fetch_add(1, Ord::SeqCst);
                })
            })
            .collect();
        for h in handles {
            h.await.unwrap();
        }
    }

    /// Runs `n` counting tasks on the single-threaded executor and returns the
    /// final counter value.
    fn count_on_single_thread(n: usize) -> usize {
        let counter = Arc::new(AtomicUsize::new(0));
        block_on_with_spawn(bump_n_times(Arc::clone(&counter), n));
        counter.load(Ord::SeqCst)
    }

    /// Runs `n` counting tasks on `workers` workers and returns the final
    /// counter value.
    fn count_on_workers(n: usize, workers: usize) -> usize {
        let counter = Arc::new(AtomicUsize::new(0));
        block_on_multi(bump_n_times(Arc::clone(&counter), n), workers);
        counter.load(Ord::SeqCst)
    }

    // ---- block_on: plain values -------------------------------------------

    #[test]
    fn block_on_simple_value() {
        assert_eq!(block_on(async { 42u32 }), 42);
    }

    #[test]
    fn block_on_chain_of_awaits() {
        async fn double(x: u32) -> u32 {
            x * 2
        }
        async fn compute() -> u32 {
            double(double(3).await).await
        }
        assert_eq!(block_on(compute()), 12);
    }

    #[test]
    fn block_on_string_output() {
        assert_eq!(block_on(async { String::from("hello") }), "hello");
    }

    #[test]
    fn block_on_returns_unit() {
        block_on(async {});
    }

    #[test]
    fn block_on_returns_string() {
        let s = block_on(async { String::from("hello world") });
        assert_eq!(s, "hello world");
    }

    #[test]
    fn block_on_returns_vec() {
        let v = block_on(async { vec![1u32, 2, 3] });
        assert_eq!(v, vec![1, 2, 3]);
    }

    #[test]
    fn block_on_returns_option() {
        let v = block_on(async { Some(42u32) });
        assert_eq!(v, Some(42));
    }

    #[test]
    fn block_on_returns_tuple() {
        let (a, b) = block_on(async { (1u32, 2u32) });
        assert_eq!(a, 1);
        assert_eq!(b, 2);
    }

    #[test]
    fn block_on_f64_value() {
        // Deliberately not 3.14: that literal trips clippy's `approx_constant`.
        let v: f64 = block_on(async { 2.5 });
        assert!((v - 2.5).abs() < 1e-10);
    }

    #[test]
    fn block_on_nested_async_fns() {
        async fn add(a: u32, b: u32) -> u32 {
            a + b
        }
        async fn multiply(a: u32, b: u32) -> u32 {
            a * b
        }
        let result = block_on(async {
            let sum = add(3, 4).await;
            multiply(sum, 2).await
        });
        assert_eq!(result, 14);
    }

    #[test]
    fn block_on_complex_expression() {
        let result = block_on(async {
            let a = 10u32;
            let b = 20u32;
            a + b + 12
        });
        assert_eq!(result, 42);
    }

    // ---- block_on_with_spawn: single-threaded spawning ----------------------

    #[test]
    fn block_on_with_spawn_returns_unit() {
        block_on_with_spawn(async {});
    }

    #[test]
    fn spawn_and_join() {
        let result = block_on_with_spawn(async {
            let jh = spawn(async { 100u32 });
            jh.await.unwrap()
        });
        assert_eq!(result, 100);
    }

    #[test]
    fn spawn_multiple_and_join_all() {
        assert_eq!(count_on_single_thread(2), 2);
    }

    #[test]
    fn spawn_10_independent_tasks_all_increment() {
        assert_eq!(count_on_single_thread(10), 10);
    }

    #[test]
    fn spawn_50_tasks_all_complete_with_counter() {
        assert_eq!(count_on_single_thread(50), 50);
    }

    #[test]
    fn spawn_1000_tasks_single_thread_all_complete() {
        assert_eq!(count_on_single_thread(1000), 1000);
    }

    #[test]
    fn block_on_with_spawn_multiple_spawn_waves() {
        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        block_on_with_spawn(async move {
            // The second wave is only spawned after the first one was joined.
            bump_n_times(Arc::clone(&c), 5).await;
            bump_n_times(c, 5).await;
        });
        assert_eq!(counter.load(Ord::SeqCst), 10);
    }

    #[test]
    fn join_handle_abort_returns_cancelled() {
        use std::future::poll_fn;
        use std::task::Poll as P;

        let result = block_on_with_spawn(async {
            let jh = spawn(async { poll_fn(|_| P::<()>::Pending).await });
            jh.abort();
            jh.await
        });
        assert!(matches!(result, Err(task::JoinError::Cancelled)));
    }

    #[test]
    fn abort_before_poll_returns_cancelled() {
        let result = block_on_with_spawn(async {
            let jh = spawn(async {
                std::future::poll_fn(|_| std::task::Poll::<()>::Pending).await
            });
            jh.abort();
            jh.await
        });
        assert!(matches!(result, Err(task::JoinError::Cancelled)));
    }

    #[test]
    fn block_on_nested_spawn_ordering() {
        let order = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
        let o1 = order.clone();
        let o2 = order.clone();
        block_on_with_spawn(async move {
            let jh1 = spawn(async move {
                o1.lock().unwrap().push(1);
            });
            let jh2 = spawn(async move {
                o2.lock().unwrap().push(2);
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
        });
        // Only the number of recorded entries is checked, not their order.
        let v = order.lock().unwrap();
        assert_eq!(v.len(), 2);
    }

    #[test]
    fn block_on_with_spawn_sequential_ordering() {
        let order = Arc::new(std::sync::Mutex::new(Vec::<u32>::new()));
        let o = order.clone();
        block_on_with_spawn(async move {
            let o1 = o.clone();
            let o2 = o.clone();
            let o3 = o.clone();
            let jh1 = spawn(async move {
                o1.lock().unwrap().push(1);
            });
            let jh2 = spawn(async move {
                o2.lock().unwrap().push(2);
            });
            let jh3 = spawn(async move {
                o3.lock().unwrap().push(3);
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            jh3.await.unwrap();
        });
        // Only the number of recorded entries is checked, not their order.
        assert_eq!(order.lock().unwrap().len(), 3);
    }

    #[test]
    fn spawn_in_spawned_task() {
        let result = block_on_with_spawn(async {
            let jh = spawn(async {
                let inner = spawn(async { 42u32 });
                inner.await.unwrap()
            });
            jh.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn spawn_chain_3_deep() {
        let result = block_on_with_spawn(async {
            let h1 = spawn(async {
                let h2 = spawn(async {
                    let h3 = spawn(async { 7u32 });
                    h3.await.unwrap() * 2
                });
                h2.await.unwrap() + 1
            });
            h1.await.unwrap()
        });
        // (7 * 2) + 1
        assert_eq!(result, 15);
    }

    #[test]
    fn join_handle_dropped_without_await_no_panic() {
        block_on_with_spawn(async move {
            // The first handle is dropped without ever being awaited.
            drop(spawn(async move { 42u32 }));
            let jh2 = spawn(async move { 99u32 });
            jh2.await.unwrap()
        });
    }

    #[test]
    fn spawn_returns_computed_value() {
        let result = block_on_with_spawn(async {
            let jh = spawn(async { 2u32 * 21 });
            jh.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn spawn_with_move_captures_outer() {
        let data = Arc::new(AtomicUsize::new(55));
        let d = data.clone();
        let result = block_on_with_spawn(async move {
            let jh = spawn(async move { d.load(Ord::SeqCst) });
            jh.await.unwrap()
        });
        assert_eq!(result, 55);
    }

    #[test]
    fn block_on_with_spawn_arc_shared_across_tasks() {
        let shared = Arc::new(AtomicUsize::new(0));
        let s = shared.clone();
        block_on_with_spawn(async move {
            let s1 = s.clone();
            let s2 = s.clone();
            let h1 = spawn(async move { s1.fetch_add(10, Ord::SeqCst) });
            let h2 = spawn(async move { s2.fetch_add(20, Ord::SeqCst) });
            h1.await.unwrap();
            h2.await.unwrap();
        });
        let v = shared.load(Ord::SeqCst);
        assert_eq!(v, 30);
    }

    #[test]
    fn spawn_returns_unit_output() {
        block_on_with_spawn(async {
            let jh = spawn(async {});
            jh.await.unwrap();
        });
    }

    #[test]
    fn spawn_computes_product_of_two_values() {
        let result = block_on_with_spawn(async {
            let a = spawn(async { 6u32 });
            let b = spawn(async { 7u32 });
            a.await.unwrap() * b.await.unwrap()
        });
        assert_eq!(result, 42);
    }

    #[test]
    fn block_on_with_spawn_returns_bool() {
        let v = block_on_with_spawn(async {
            let jh = spawn(async { true });
            jh.await.unwrap()
        });
        assert!(v);
    }

    #[test]
    fn spawn_task_with_string_return() {
        let result = block_on_with_spawn(async {
            let jh = spawn(async { "hello".to_string() });
            jh.await.unwrap()
        });
        assert_eq!(result, "hello");
    }

    // ---- block_on_multi: multi-threaded runtime -----------------------------

    #[test]
    fn multi_thread_simple_spawn() {
        let result = block_on_multi(
            async {
                let jh = spawn(async { 42u32 });
                jh.await.unwrap()
            },
            2,
        );
        assert_eq!(result, 42);
    }

    #[test]
    fn multi_thread_falls_back_to_single_with_one_worker() {
        let result = block_on_multi(async { 99u32 }, 1);
        assert_eq!(result, 99);
    }

    #[test]
    fn multi_thread_0_workers_fallback_to_single() {
        let result = block_on_multi(async { 7u32 }, 0);
        assert_eq!(result, 7);
    }

    #[test]
    fn multi_thread_many_tasks_complete() {
        assert_eq!(count_on_workers(100, 4), 100);
    }

    #[test]
    fn multi_thread_1000_tasks_4_workers() {
        assert_eq!(count_on_workers(1000, 4), 1000);
    }

    #[test]
    fn multi_thread_3_workers_all_join() {
        assert_eq!(count_on_workers(30, 3), 30);
    }

    #[test]
    fn multi_thread_2_workers_count_50() {
        assert_eq!(count_on_workers(50, 2), 50);
    }

    #[test]
    fn multi_thread_5_workers_500_tasks() {
        assert_eq!(count_on_workers(500, 5), 500);
    }

    #[test]
    fn multi_thread_6_workers_200_tasks() {
        assert_eq!(count_on_workers(200, 6), 200);
    }

    #[test]
    fn multi_thread_nested_spawn() {
        let result = block_on_multi(
            async {
                let jh = spawn(async {
                    let inner = spawn(async { 99u32 });
                    inner.await.unwrap()
                });
                jh.await.unwrap()
            },
            2,
        );
        assert_eq!(result, 99);
    }

    #[test]
    fn multi_thread_result_type_roundtrip() {
        let result: Result<u32, String> = block_on_multi(
            async {
                let jh = spawn(async { Ok::<u32, String>(42) });
                jh.await.unwrap()
            },
            2,
        );
        assert_eq!(result, Ok(42));
    }

    #[test]
    fn multi_thread_result_err_type_roundtrip() {
        let result: Result<u32, String> = block_on_multi(
            async {
                let jh = spawn(async { Err::<u32, String>("fail".to_string()) });
                jh.await.unwrap()
            },
            2,
        );
        assert_eq!(result, Err("fail".to_string()));
    }

    #[test]
    fn multi_thread_join_handle_result_preserved() {
        let results: Vec<u32> = block_on_multi(
            async {
                let handles: Vec<_> = (0..8u32)
                    .map(|i| spawn(async move { i * i }))
                    .collect();
                let mut results = Vec::new();
                for h in handles {
                    results.push(h.await.unwrap());
                }
                results
            },
            4,
        );
        assert_eq!(results.len(), 8);
        // Handles are awaited in spawn order, so result `i` belongs to task `i`.
        for (i, &v) in results.iter().enumerate() {
            assert_eq!(v, (i as u32) * (i as u32));
        }
    }
}