async_resource/pool/
executor.rs

1use futures_util::future::BoxFuture;
2
3pub trait Executor: Send + Sync {
4    fn spawn_ok(&self, task: BoxFuture<'static, ()>);
5}
6
7#[cfg(feature = "exec-multitask")]
8mod exec_multitask {
9    use super::BoxFuture;
10    use super::Executor;
11    use crate::pool::Sentinel;
12    use crate::util::thread_waker;
13    use multitask::Executor as MTExecutor;
14    use option_lock::{self, OptionLock};
15    use std::sync::{
16        atomic::{AtomicBool, Ordering},
17        Arc,
18    };
19    use std::thread;
20
21    const GLOBAL_INST: OptionLock<MultitaskExecutor> = OptionLock::new();
22
23    pub struct MultitaskExecutor {
24        inner: Sentinel<(
25            MTExecutor,
26            Vec<(thread::JoinHandle<()>, thread_waker::Waker)>,
27        )>,
28    }
29
30    impl MultitaskExecutor {
31        pub fn new(threads: usize) -> Self {
32            assert_ne!(threads, 0);
33            let ex = MTExecutor::new();
34            let running = Arc::new(AtomicBool::new(true));
35            let tickers = (0..threads)
36                .map(|_| {
37                    let (waker, waiter) = thread_waker::pair();
38                    let waker_copy = waker.clone();
39                    let ticker = ex.ticker(move || waker_copy.wake());
40                    let running = running.clone();
41                    (
42                        thread::spawn(move || loop {
43                            if !ticker.tick() {
44                                waiter.prepare_wait();
45                                if !running.load(Ordering::Acquire) {
46                                    break;
47                                }
48                                waiter.wait();
49                            }
50                        }),
51                        waker,
52                    )
53                })
54                .collect();
55
56            Self {
57                inner: Sentinel::new(Arc::new((ex, tickers)), move |inner, count| {
58                    if count == 0 {
59                        running.store(false, Ordering::Release);
60                        if let Ok((_, threads)) = Arc::try_unwrap(inner) {
61                            for (thread, waker) in threads {
62                                waker.wake();
63                                thread.join().unwrap();
64                            }
65                        } else {
66                            panic!("Error unwrapping executor state")
67                        }
68                    }
69                }),
70            }
71        }
72
73        pub fn global() -> Self {
74            loop {
75                match GLOBAL_INST.try_read() {
76                    Ok(read) => break read.clone(),
77                    Err(option_lock::ReadError::Empty) => {
78                        if let Ok(mut guard) = GLOBAL_INST.try_lock() {
79                            let inst = Self::new(5);
80                            guard.replace(inst.clone());
81                            break inst;
82                        }
83                    }
84                    Err(_) => {}
85                }
86                // wait for another thread to populate the instance
87                std::thread::yield_now();
88            }
89        }
90    }
91
92    impl Clone for MultitaskExecutor {
93        fn clone(&self) -> Self {
94            Self {
95                inner: self.inner.clone(),
96            }
97        }
98    }
99
100    impl Executor for MultitaskExecutor {
101        fn spawn_ok(&self, task: BoxFuture<'static, ()>) {
102            self.inner.0.spawn(task).detach()
103        }
104    }
105}
106
107#[cfg(feature = "exec-multitask")]
108pub use exec_multitask::MultitaskExecutor;
109
/// Return the default executor as a boxed trait object.
///
/// Only available with the `exec-multitask` feature; the value is whatever
/// `MultitaskExecutor::global()` returns.
#[cfg(feature = "exec-multitask")]
pub fn default_executor() -> Box<dyn Executor> {
    let shared = exec_multitask::MultitaskExecutor::global();
    Box::new(shared)
}