async_resource/pool/
executor.rs1use futures_util::future::BoxFuture;
2
3pub trait Executor: Send + Sync {
4 fn spawn_ok(&self, task: BoxFuture<'static, ()>);
5}
6
7#[cfg(feature = "exec-multitask")]
8mod exec_multitask {
9 use super::BoxFuture;
10 use super::Executor;
11 use crate::pool::Sentinel;
12 use crate::util::thread_waker;
13 use multitask::Executor as MTExecutor;
14 use option_lock::{self, OptionLock};
15 use std::sync::{
16 atomic::{AtomicBool, Ordering},
17 Arc,
18 };
19 use std::thread;
20
21 const GLOBAL_INST: OptionLock<MultitaskExecutor> = OptionLock::new();
22
23 pub struct MultitaskExecutor {
24 inner: Sentinel<(
25 MTExecutor,
26 Vec<(thread::JoinHandle<()>, thread_waker::Waker)>,
27 )>,
28 }
29
30 impl MultitaskExecutor {
31 pub fn new(threads: usize) -> Self {
32 assert_ne!(threads, 0);
33 let ex = MTExecutor::new();
34 let running = Arc::new(AtomicBool::new(true));
35 let tickers = (0..threads)
36 .map(|_| {
37 let (waker, waiter) = thread_waker::pair();
38 let waker_copy = waker.clone();
39 let ticker = ex.ticker(move || waker_copy.wake());
40 let running = running.clone();
41 (
42 thread::spawn(move || loop {
43 if !ticker.tick() {
44 waiter.prepare_wait();
45 if !running.load(Ordering::Acquire) {
46 break;
47 }
48 waiter.wait();
49 }
50 }),
51 waker,
52 )
53 })
54 .collect();
55
56 Self {
57 inner: Sentinel::new(Arc::new((ex, tickers)), move |inner, count| {
58 if count == 0 {
59 running.store(false, Ordering::Release);
60 if let Ok((_, threads)) = Arc::try_unwrap(inner) {
61 for (thread, waker) in threads {
62 waker.wake();
63 thread.join().unwrap();
64 }
65 } else {
66 panic!("Error unwrapping executor state")
67 }
68 }
69 }),
70 }
71 }
72
73 pub fn global() -> Self {
74 loop {
75 match GLOBAL_INST.try_read() {
76 Ok(read) => break read.clone(),
77 Err(option_lock::ReadError::Empty) => {
78 if let Ok(mut guard) = GLOBAL_INST.try_lock() {
79 let inst = Self::new(5);
80 guard.replace(inst.clone());
81 break inst;
82 }
83 }
84 Err(_) => {}
85 }
86 std::thread::yield_now();
88 }
89 }
90 }
91
92 impl Clone for MultitaskExecutor {
93 fn clone(&self) -> Self {
94 Self {
95 inner: self.inner.clone(),
96 }
97 }
98 }
99
100 impl Executor for MultitaskExecutor {
101 fn spawn_ok(&self, task: BoxFuture<'static, ()>) {
102 self.inner.0.spawn(task).detach()
103 }
104 }
105}
106
107#[cfg(feature = "exec-multitask")]
108pub use exec_multitask::MultitaskExecutor;
109
110#[cfg(feature = "exec-multitask")]
111pub fn default_executor() -> Box<dyn Executor> {
112 Box::new(exec_multitask::MultitaskExecutor::global())
113}