// unblock/lib.rs

//! A thread pool for isolating blocking operations in async programs.
//!
//! With the `mt` feature enabled, the default number of threads (one per
//! CPU) can be overridden by setting the `BLOCK_THREADS` environment
//! variable to the desired count.

7use std::cmp::max;
8use std::collections::VecDeque;
9use std::fmt::{Display, Formatter};
10use std::future::Future;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
13use std::task::{Context, Poll};
14use std::thread;
15use std::thread::JoinHandle;
16
17use parking_lot::{Condvar, Mutex};
18use tokio::sync::oneshot::channel as oneshot;
19use tokio::sync::oneshot::Receiver;
20
/// Builds a fresh [`Executor`] value; shared by both the lazy and the
/// ctor-based global initializers.
macro_rules! exec {
    () => {{
        let limit = Executor::max_threads();
        Executor {
            // Reserve a generous queue so early bursts don't reallocate.
            queue: Mutex::new(VecDeque::with_capacity(max(limit, 256))),
            thread_count: AtomicUsize::new(0),
            join: Mutex::new(Vec::with_capacity(limit)),
            shutdown: AtomicBool::new(false),
            cvar: Condvar::new(),
            thread_limit: limit,
        }
    }};
}
34
#[cfg(feature = "lazy")]
use once_cell::sync::Lazy;

/// Lazily initialized global executor; constructed on first use.
#[cfg(feature = "lazy")]
static EXECUTOR: Lazy<Executor> = Lazy::new(|| exec!());

/// Global executor constructed at program startup via `ctor`.
// NOTE(review): without the "lazy" feature and under miri, EXECUTOR is not
// defined at all, so the rest of the crate would fail to compile — confirm
// that miri builds always enable the "lazy" feature.
#[ctor::ctor]
#[cfg(not(miri))]
#[cfg(not(feature = "lazy"))]
static EXECUTOR: Executor = exec!();
47
/// Zero-sized error returned when a task's result can no longer be
/// delivered (e.g. the task was discarded during shutdown).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Error;

impl Display for Error {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Error")
    }
}

impl std::error::Error for Error {}

impl From<Error> for std::io::Error {
    /// Maps the pool error onto a generic `Other` I/O error.
    fn from(_: Error) -> std::io::Error {
        std::io::ErrorKind::Other.into()
    }
}
65
/// Marker trait for values that may cross thread boundaries: `Send + 'static`.
pub trait Val: Send + 'static {}
impl<T: Send + 'static> Val for T {}

/// Marker trait for closures runnable on the pool that produce a [`Val`].
pub trait Fun<T: Val>: FnOnce() -> T + Val {}
impl<T: Val, F: FnOnce() -> T + Val> Fun<T> for F {}

/// Type-erased unit of work executed by a worker thread.
type Runnable = Box<dyn FnOnce() + Send + 'static>;
73
/// The unblock executor: a work queue plus a lazily grown pool of worker
/// threads, torn down via [`Executor::drop`] at process exit.
struct Executor {
    /// Pending tasks waiting to be picked up by a worker thread.
    queue: Mutex<VecDeque<Runnable>>,

    /// Number of spawned worker threads currently accounted for.
    thread_count: AtomicUsize,

    /// Join handles of spawned workers; drained and joined on shutdown.
    join: Mutex<Vec<JoinHandle<()>>>,

    /// Set once at shutdown; workers exit their loop when they observe it.
    shutdown: AtomicBool,

    /// Used to put idle threads to sleep and wake them up when new work comes in.
    cvar: Condvar,

    /// Maximum number of threads in the pool
    thread_limit: usize,
}
94
/// RAII guard held by each worker thread for the duration of its main loop;
/// restores pool capacity if the thread dies from a panicking task.
struct LiveMonitor;

impl Drop for LiveMonitor {
    fn drop(&mut self) {
        // Only act when the worker is unwinding because a task panicked:
        // release this thread's slot in the count, then spawn a replacement
        // so the pool does not shrink permanently.
        if thread::panicking() {
            EXECUTOR.thread_count.fetch_sub(1, Ordering::SeqCst);
            EXECUTOR.grow_pool();
        }
    }
}
105
106#[derive(Debug)]
107pub struct Join<T>(Receiver<T>);
108
109impl<T> Future for Join<T> {
110    type Output = Result<T, Error>;
111
112    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113        Poll::Ready(
114            match Pin::new(&mut self.0).poll(cx) {
115                Poll::Ready(t) => t,
116                Poll::Pending => return Poll::Pending,
117            }
118            .map_err(|_| Error),
119        )
120    }
121}
122
/// Creates a `Runnable` from closure `$f`, schedules it on `$_self`, and
/// returns the `Join` handle for its result.
///
/// The plain form uses `schedule` (grows the pool per task); the `'s` form
/// uses `schedules` (no growth, used for batch spawning where the pool is
/// grown once afterwards).
macro_rules! run {
    ($f:ident in $_self:ident) => {
        run!(inside $_self, $f, schedule)
    };
    ($f:ident 's in $_self:ident ) => {
        run!(inside $_self, $f, schedules)
    };
    (inside $_self:ident, $f:ident, $m:ident) => {{
        // Oneshot channel carries the task's result back to the Join future.
        let (tx, rx) = oneshot();

        $_self.$m(Box::new(move || {
            // The receiver may already be gone (Join dropped); ignore the
            // send error in that case.
            let _ = tx.send($f());
        }));
        Join(rx)
    }};
}
140
141impl Executor {
142    #[inline(always)]
143    fn max_threads() -> usize {
144        #[allow(unused_mut, unused_assignments)]
145        let mut threads = 1usize;
146        #[cfg(feature = "mt")]
147        {
148            threads = match std::env::var("BLOCK_THREADS")
149                .ok()
150                .and_then(|x| x.parse().ok())
151            {
152                Some(num_cpus) => num_cpus,
153                None => num_cpus::get(),
154            };
155        };
156
157        threads
158    }
159
160    /// Spawns futures onto this executor.
161    #[inline(always)]
162    fn spawns<T: Val>(&'static self, f: impl IntoIterator<Item = impl Fun<T>>) -> Vec<Join<T>> {
163        let tasks = f.into_iter().map(|f| run!(f 's in self)).collect();
164        self.grow_pool();
165        tasks
166    }
167
168    /// Spawns a future onto this executor.
169    #[inline(always)]
170    fn spawn<T: Val>(&'static self, f: impl Fun<T>) -> Join<T> {
171        run!(f in self)
172    }
173
174    /// Runs the main loop on the current thread.
175    ///
176    /// This function runs unblock tasks until it becomes idle.
177    fn main_loop(&'static self) {
178        let _live = LiveMonitor;
179        let mut queue = self.queue.lock();
180        loop {
181            // Run tasks in the queue.
182            while let Some(runnable) = queue.pop_front() {
183                drop(queue);
184                runnable();
185                queue = self.queue.lock();
186            }
187
188            if self.shutdown.load(Ordering::Relaxed) {
189                break;
190            }
191
192            // Put the thread to sleep until another task is scheduled.
193            self.cvar.wait(&mut queue);
194        }
195    }
196    /// Schedules a runnable task for execution.
197    #[inline(always)]
198    fn schedules(&'static self, runnable: Runnable) {
199        if self.shutdown.load(Ordering::Relaxed) {
200            return;
201        }
202        self.queue.lock().push_back(runnable);
203
204        // Notify a sleeping thread
205        self.cvar.notify_one();
206    }
207
208    /// Schedules a runnable task for execution and grow thread pool if needed
209    #[inline(always)]
210    fn schedule(&'static self, runnable: Runnable) {
211        self.schedules(runnable);
212        // spawn more threads if needed.
213        self.grow_pool();
214    }
215
216    /// Spawns more block threads
217    #[inline(always)]
218    fn grow_pool(&'static self) {
219        while self.thread_count.load(Ordering::SeqCst) < self.thread_limit
220            && !self.shutdown.load(Ordering::Relaxed)
221        {
222            let id = self.thread_count.fetch_add(1, Ordering::Relaxed);
223
224            // Spawn the new thread.
225            self.join.lock().push(
226                thread::Builder::new()
227                    .name(format!("unblock-{}", id))
228                    .spawn(move || self.main_loop())
229                    .unwrap(),
230            );
231        }
232    }
233
234    /// Put executor in shutdown
235    fn drop(&'static self) {
236        self.shutdown.store(true, Ordering::SeqCst);
237        self.queue.lock().drain(..);
238        self.cvar.notify_all();
239        for j in self.join.lock().drain(..) {
240            let _ = j.join();
241        }
242    }
243}
244
/// Runtime destructor: shuts the global pool down before process exit,
/// discarding any still-queued tasks and joining all worker threads.
#[ctor::dtor]
fn des() {
    EXECUTOR.drop();
}
249
/// Runs blocking code on the thread pool and returns a future for its result.
///
/// The returned [`Join`] resolves to `Err(Error)` if the task is discarded
/// before completion (e.g. executor shutdown).
pub fn unblock<T: Val>(f: impl Fun<T>) -> Join<T> {
    EXECUTOR.spawn(f)
}
254
/// Runs multiple blocking closures on the thread pool, returning futures in
/// the same order as the input iterator.
///
/// Each [`Join`] resolves to `Err(Error)` if its task is discarded before
/// completion (e.g. executor shutdown).
pub fn unblocks<T: Val>(f: impl IntoIterator<Item = impl Fun<T>>) -> Vec<Join<T>> {
    EXECUTOR.spawns(f)
}