actix_dynamic_threadpool/
lib.rs

//! Dynamic thread pool for blocking operations
//!
//! The pool lazily spawns threads according to the workload, up to a total of your
//! machine's `logical CPU cores * 5` threads. Any spawned thread that stays idle for
//! 30 seconds is recycled and despawned.
//!
//! *. Settings are configurable through env variables.
8//!
9//! # Example:
10//! ```rust
11//! use std::env::set_var;
12//!
13//! #[actix_rt::main]
14//! async fn main() {
15//!     // Optional: Set the max thread count for the blocking pool.
16//!     set_var("ACTIX_THREADPOOL", "30");
17//!     // Optional: Set the min thread count for the blocking pool.
18//!     set_var("ACTIX_THREADPOOL_MIN", "1");
19//!     // Optional: Set the timeout duration IN SECONDS for the blocking pool's idle threads.
20//!     set_var("ACTIX_THREADPOOL_TIMEOUT", "30");
21//!
22//!     let future = actix_dynamic_threadpool::run(|| {
23//!         /* Some blocking code with a Result<T, E> as return type */
24//!         Ok::<usize, ()>(1usize)
25//!     });
26//!
27//!     /*
28//!         We can await on this blocking code and NOT block our runtime.
//!         While we are waiting, our actix runtime can switch to other async tasks.
30//!     */
31//!
32//!     let result: Result<usize, actix_dynamic_threadpool::BlockingError<()>> = future.await;
33//!
34//!     assert_eq!(1usize, result.unwrap())
35//! }
36//! ```
37
38#![forbid(unsafe_code)]
39
40pub(crate) mod builder;
41pub(crate) mod error;
42pub(crate) mod pool;
43pub(crate) mod pool_inner;
44
45use crate::{error::ThreadPoolError, pool::ThreadPool};
46
47use core::fmt;
48use core::future::Future;
49use core::pin::Pin;
50use core::task::{Context, Poll};
51use core::time::Duration;
52
53use derive_more::Display;
54use futures_channel::oneshot;
55use parking_lot::Mutex;
56
/// Env variable overriding the pool's max thread count
/// (default: `logical CPU cores * 5`).
const ENV_MAX_THREADS: &str = "ACTIX_THREADPOOL";

/// Env variable overriding the pool's min thread count (default: 1).
const ENV_MIN_THREADS: &str = "ACTIX_THREADPOOL_MIN";

/// Env variable overriding the idle-thread timeout, in whole seconds
/// (default: 30).
const ENV_IDLE_TIMEOUT: &str = "ACTIX_THREADPOOL_TIMEOUT";
65
66lazy_static::lazy_static! {
67    pub(crate) static ref POOL: Mutex<ThreadPool> = {
68        let max = parse_env(ENV_MAX_THREADS).unwrap_or_else(|| num_cpus::get() * 5);
69        let min = parse_env(ENV_MIN_THREADS).unwrap_or(1);
70        let dur = parse_env(ENV_IDLE_TIMEOUT).unwrap_or(30);
71
72        Mutex::new(ThreadPool::builder()
73            .thread_name("actix-dynamic-threadpool")
74            .max_threads(max)
75            .min_threads(min)
76            .idle_timeout(Duration::from_secs(dur))
77            .build())
78    };
79}
80
thread_local! {
    // Per-thread clone of the global [`POOL`]: the mutex is taken once, on the
    // first `run` call from a given thread, instead of on every submission.
    // NOTE(review): assumes `ThreadPool::clone` produces a handle to the same
    // underlying pool rather than a new pool — confirm in `pool.rs`.
    static POOL_LOCAL: ThreadPool = {
        POOL.lock().clone()
    }
}
86
87fn parse_env<R: std::str::FromStr>(env: &str) -> Option<R> {
88    std::env::var(env).ok().and_then(|val| {
89        val.parse()
90            .map_err(|_| log::warn!("Can not parse {} value, using default", env))
91            .ok()
92    })
93}
94
/// Blocking operation execution error
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
    /// The blocking closure itself ran to completion but returned an error.
    #[display(fmt = "{:?}", _0)]
    Error(E),
    /// The result channel's sender was dropped before a value was sent —
    /// e.g. the pool rejected the job or shut down before running it.
    #[display(fmt = "Thread pool is gone")]
    Canceled,
}
103
// `Debug` and `Display` are derived above, so the default `Error` methods suffice.
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
105
106/// Execute blocking function on a thread pool, returns future that resolves
107/// to result of the function execution.
108pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
109where
110    F: FnOnce() -> Result<I, E> + Send + 'static,
111    I: Send + 'static,
112    E: Send + fmt::Debug + 'static,
113{
114    let (tx, rx) = oneshot::channel();
115
116    POOL_LOCAL.with(|pool| {
117        let _ = pool.execute(move || {
118            if !tx.is_canceled() {
119                let _ = tx.send(f());
120            }
121        });
122    });
123
124    CpuFuture { rx }
125}
126
127/// Blocking operation completion future. It resolves with results
128/// of blocking function execution.
129pub struct CpuFuture<I, E> {
130    rx: oneshot::Receiver<Result<I, E>>,
131}
132
133impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
134    type Output = Result<I, BlockingError<E>>;
135
136    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137        let rx = Pin::new(&mut self.rx);
138        let res = match rx.poll(cx) {
139            Poll::Pending => return Poll::Pending,
140            Poll::Ready(res) => res
141                .map_err(|_| BlockingError::Canceled)
142                .and_then(|res| res.map_err(BlockingError::Error)),
143        };
144        Poll::Ready(res)
145    }
146}
147
#[cfg(test)]
mod test {
    use super::*;

    use core::time::Duration;

    use std::thread;

    /// Builder settings are reflected in the pool's initial state:
    /// `min_threads` workers are up immediately after `build()`, and the
    /// configured name apparently gets a `-worker` suffix.
    #[test]
    fn init() {
        let pool = ThreadPool::builder()
            .max_threads(12)
            .min_threads(7)
            .thread_name("test-pool")
            .idle_timeout(Duration::from_secs(5))
            .build();

        let state = pool.state();

        assert_eq!(state.active_threads, 7);
        assert_eq!(state.max_threads, 12);
        assert_eq!(state.name, Some("test-pool-worker"));
    }

    /// Flooding the pool grows it to `max_threads`; once the work is done
    /// and the idle timeout has long expired, it shrinks back to
    /// `min_threads`.
    ///
    /// NOTE(review): timing based — could be flaky on a slow or heavily
    /// loaded machine.
    #[test]
    fn recycle() {
        let pool = ThreadPool::builder()
            .max_threads(12)
            .min_threads(3)
            .thread_name("test-pool")
            .idle_timeout(Duration::from_millis(300))
            .build();

        (0..1024).for_each(|_| {
            let _ = pool.execute(|| {
                thread::sleep(Duration::from_nanos(1));
            });
        });

        let state = pool.state();
        assert_eq!(12, state.active_threads);

        thread::sleep(Duration::from_secs(4));

        // The idle timeout is 300 ms and we slept for 4 s, so every surplus
        // thread has been recycled and we are back at min_threads.

        let state = pool.state();
        assert_eq!(3, state.active_threads);
    }

    /// A panicking job must not permanently shrink the pool below
    /// `min_threads`; above that level, no replacement thread is spawned.
    #[test]
    fn panic_recover() {
        let pool = ThreadPool::builder()
            .max_threads(12)
            .min_threads(3)
            .thread_name("test-pool")
            .build();

        let _ = pool.execute(|| {
            panic!("This is a on purpose panic for testing panic recovery");
        });

        thread::sleep(Duration::from_millis(100));

        // A replacement thread is spawned after a panic when the pool would
        // otherwise drop below min_threads.
        let state = pool.state();
        assert_eq!(3, state.active_threads);

        (0..128).for_each(|_| {
            let _ = pool.execute(|| {
                thread::sleep(Duration::from_millis(1));
            });
        });

        let _ = pool.execute(|| {
            panic!("This is a on purpose panic for testing panic recovery");
        });

        thread::sleep(Duration::from_millis(100));

        // We didn't try to spawn new thread after previous panic
        // because we are still above the min_threads count
        let state = pool.state();
        assert_eq!(11, state.active_threads);
    }

    /// With jobs arriving slower than a single worker can drain them, the
    /// pool should keep exactly one thread rather than spawning eagerly.
    #[test]
    fn no_eager_spawn() {
        let pool = ThreadPool::builder()
            .max_threads(12)
            .thread_name("test-pool")
            .build();

        let _a = pool.execute(|| {
            thread::sleep(Duration::from_millis(1));
        });

        thread::sleep(Duration::from_millis(100));

        let _a = pool.execute(|| {
            thread::sleep(Duration::from_millis(1));
        });

        thread::sleep(Duration::from_millis(100));

        let state = pool.state();

        assert_eq!(1, state.active_threads);
    }
}
257}