1use std::cmp::max;
8use std::collections::VecDeque;
9use std::fmt::{Display, Formatter};
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::task::{Context, Poll};
14use std::thread;
15use std::thread::JoinHandle;
16
17use parking_lot::{Condvar, Mutex};
18use tokio::sync::oneshot::channel as oneshot;
19use tokio::sync::oneshot::Receiver;
20
21macro_rules! exec {
22 () => {{
23 let thread_limit = Executor::max_threads();
24 Executor {
25 queue: Mutex::new(VecDeque::with_capacity(max(thread_limit, 256))),
26 thread_count: AtomicUsize::new(0),
27 join: Mutex::new(Vec::with_capacity(thread_limit)),
28 shutdown: AtomicBool::new(false),
29 cvar: Condvar::new(),
30 thread_limit,
31 }
32 }};
33}
34
35#[cfg(feature = "lazy")]
36use once_cell::sync::Lazy;
37
38#[cfg(feature = "lazy")]
40static EXECUTOR: Lazy<Executor> = Lazy::new(|| exec!());
41
42#[ctor::ctor]
44#[cfg(not(miri))]
45#[cfg(not(feature = "lazy"))]
46static EXECUTOR: Executor = exec!();
47
48#[derive(Clone, Copy, PartialEq, Eq, Debug)]
50pub struct Error;
51
52impl Display for Error {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 "Error".fmt(f)
55 }
56}
57
58impl std::error::Error for Error {}
59
60impl From<Error> for std::io::Error {
61 fn from(_: Error) -> std::io::Error {
62 std::io::Error::from(std::io::ErrorKind::Other)
63 }
64}
65
66pub trait Val: Send + 'static {}
67impl<T: Send + 'static> Val for T {}
68
69pub trait Fun<T: Val>: FnOnce() -> T + Val {}
70impl<T: Val, F: FnOnce() -> T + Val> Fun<T> for F {}
71
72type Runnable = Box<dyn FnOnce() + Send + 'static>;
73
74struct Executor {
76 queue: Mutex<VecDeque<Runnable>>,
78
79 thread_count: AtomicUsize,
81
82 join: Mutex<Vec<JoinHandle<()>>>,
84
85 shutdown: AtomicBool,
87
88 cvar: Condvar,
90
91 thread_limit: usize,
93}
94
95struct LiveMonitor;
96
97impl Drop for LiveMonitor {
98 fn drop(&mut self) {
99 if thread::panicking() {
100 EXECUTOR.thread_count.fetch_sub(1, Ordering::SeqCst);
101 EXECUTOR.grow_pool();
102 }
103 }
104}
105
106#[derive(Debug)]
107pub struct Join<T>(Receiver<T>);
108
109impl<T> Future for Join<T> {
110 type Output = Result<T, Error>;
111
112 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113 Poll::Ready(
114 match Pin::new(&mut self.0).poll(cx) {
115 Poll::Ready(t) => t,
116 Poll::Pending => return Poll::Pending,
117 }
118 .map_err(|_| Error),
119 )
120 }
121}
122
123macro_rules! run {
125 ($f:ident in $_self:ident) => {
126 run!(inside $_self, $f, schedule)
127 };
128 ($f:ident 's in $_self:ident ) => {
129 run!(inside $_self, $f, schedules)
130 };
131 (inside $_self:ident, $f:ident, $m:ident) => {{
132 let (tx, rx) = oneshot();
133
134 $_self.$m(Box::new(move || {
135 let _ = tx.send($f());
136 }));
137 Join(rx)
138 }};
139}
140
141impl Executor {
142 #[inline(always)]
143 fn max_threads() -> usize {
144 #[allow(unused_mut, unused_assignments)]
145 let mut threads = 1usize;
146 #[cfg(feature = "mt")]
147 {
148 threads = match std::env::var("BLOCK_THREADS")
149 .ok()
150 .and_then(|x| x.parse().ok())
151 {
152 Some(num_cpus) => num_cpus,
153 None => num_cpus::get(),
154 };
155 };
156
157 threads
158 }
159
160 #[inline(always)]
162 fn spawns<T: Val>(&'static self, f: impl IntoIterator<Item = impl Fun<T>>) -> Vec<Join<T>> {
163 let tasks = f.into_iter().map(|f| run!(f 's in self)).collect();
164 self.grow_pool();
165 tasks
166 }
167
168 #[inline(always)]
170 fn spawn<T: Val>(&'static self, f: impl Fun<T>) -> Join<T> {
171 run!(f in self)
172 }
173
174 fn main_loop(&'static self) {
178 let _live = LiveMonitor;
179 let mut queue = self.queue.lock();
180 loop {
181 while let Some(runnable) = queue.pop_front() {
183 drop(queue);
184 runnable();
185 queue = self.queue.lock();
186 }
187
188 if self.shutdown.load(Ordering::Relaxed) {
189 break;
190 }
191
192 self.cvar.wait(&mut queue);
194 }
195 }
196 #[inline(always)]
198 fn schedules(&'static self, runnable: Runnable) {
199 if self.shutdown.load(Ordering::Relaxed) {
200 return;
201 }
202 self.queue.lock().push_back(runnable);
203
204 self.cvar.notify_one();
206 }
207
208 #[inline(always)]
210 fn schedule(&'static self, runnable: Runnable) {
211 self.schedules(runnable);
212 self.grow_pool();
214 }
215
216 #[inline(always)]
218 fn grow_pool(&'static self) {
219 while self.thread_count.load(Ordering::SeqCst) < self.thread_limit
220 && !self.shutdown.load(Ordering::Relaxed)
221 {
222 let id = self.thread_count.fetch_add(1, Ordering::Relaxed);
223
224 self.join.lock().push(
226 thread::Builder::new()
227 .name(format!("unblock-{}", id))
228 .spawn(move || self.main_loop())
229 .unwrap(),
230 );
231 }
232 }
233
234 fn drop(&'static self) {
236 self.shutdown.store(true, Ordering::SeqCst);
237 self.queue.lock().drain(..);
238 self.cvar.notify_all();
239 for j in self.join.lock().drain(..) {
240 let _ = j.join();
241 }
242 }
243}
244
245#[ctor::dtor]
246fn des() {
247 EXECUTOR.drop();
248}
249
250pub fn unblock<T: Val>(f: impl Fun<T>) -> Join<T> {
252 EXECUTOR.spawn(f)
253}
254
255pub fn unblocks<T: Val>(f: impl IntoIterator<Item = impl Fun<T>>) -> Vec<Join<T>> {
257 EXECUTOR.spawns(f)
258}