actix_dynamic_threadpool/
lib.rs1#![forbid(unsafe_code)]
39
40pub(crate) mod builder;
41pub(crate) mod error;
42pub(crate) mod pool;
43pub(crate) mod pool_inner;
44
45use crate::{error::ThreadPoolError, pool::ThreadPool};
46
47use core::fmt;
48use core::future::Future;
49use core::pin::Pin;
50use core::task::{Context, Poll};
51use core::time::Duration;
52
53use derive_more::Display;
54use futures_channel::oneshot;
55use parking_lot::Mutex;
56
57const ENV_MAX_THREADS: &str = "ACTIX_THREADPOOL";
59
60const ENV_MIN_THREADS: &str = "ACTIX_THREADPOOL_MIN";
62
63const ENV_IDLE_TIMEOUT: &str = "ACTIX_THREADPOOL_TIMEOUT";
65
66lazy_static::lazy_static! {
67 pub(crate) static ref POOL: Mutex<ThreadPool> = {
68 let max = parse_env(ENV_MAX_THREADS).unwrap_or_else(|| num_cpus::get() * 5);
69 let min = parse_env(ENV_MIN_THREADS).unwrap_or(1);
70 let dur = parse_env(ENV_IDLE_TIMEOUT).unwrap_or(30);
71
72 Mutex::new(ThreadPool::builder()
73 .thread_name("actix-dynamic-threadpool")
74 .max_threads(max)
75 .min_threads(min)
76 .idle_timeout(Duration::from_secs(dur))
77 .build())
78 };
79}
80
81thread_local! {
82 static POOL_LOCAL: ThreadPool = {
83 POOL.lock().clone()
84 }
85}
86
87fn parse_env<R: std::str::FromStr>(env: &str) -> Option<R> {
88 std::env::var(env).ok().and_then(|val| {
89 val.parse()
90 .map_err(|_| log::warn!("Can not parse {} value, using default", env))
91 .ok()
92 })
93}
94
95#[derive(Debug, Display)]
97pub enum BlockingError<E: fmt::Debug> {
98 #[display(fmt = "{:?}", _0)]
99 Error(E),
100 #[display(fmt = "Thread pool is gone")]
101 Canceled,
102}
103
104impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
105
106pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
109where
110 F: FnOnce() -> Result<I, E> + Send + 'static,
111 I: Send + 'static,
112 E: Send + fmt::Debug + 'static,
113{
114 let (tx, rx) = oneshot::channel();
115
116 POOL_LOCAL.with(|pool| {
117 let _ = pool.execute(move || {
118 if !tx.is_canceled() {
119 let _ = tx.send(f());
120 }
121 });
122 });
123
124 CpuFuture { rx }
125}
126
127pub struct CpuFuture<I, E> {
130 rx: oneshot::Receiver<Result<I, E>>,
131}
132
133impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
134 type Output = Result<I, BlockingError<E>>;
135
136 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137 let rx = Pin::new(&mut self.rx);
138 let res = match rx.poll(cx) {
139 Poll::Pending => return Poll::Pending,
140 Poll::Ready(res) => res
141 .map_err(|_| BlockingError::Canceled)
142 .and_then(|res| res.map_err(BlockingError::Error)),
143 };
144 Poll::Ready(res)
145 }
146}
147
148#[cfg(test)]
149mod test {
150 use super::*;
151
152 use core::time::Duration;
153
154 use std::thread;
155
156 #[test]
157 fn init() {
158 let pool = ThreadPool::builder()
159 .max_threads(12)
160 .min_threads(7)
161 .thread_name("test-pool")
162 .idle_timeout(Duration::from_secs(5))
163 .build();
164
165 let state = pool.state();
166
167 assert_eq!(state.active_threads, 7);
168 assert_eq!(state.max_threads, 12);
169 assert_eq!(state.name, Some("test-pool-worker"));
170 }
171
172 #[test]
173 fn recycle() {
174 let pool = ThreadPool::builder()
175 .max_threads(12)
176 .min_threads(3)
177 .thread_name("test-pool")
178 .idle_timeout(Duration::from_millis(300))
179 .build();
180
181 (0..1024).for_each(|_| {
182 let _ = pool.execute(|| {
183 thread::sleep(Duration::from_nanos(1));
184 });
185 });
186
187 let state = pool.state();
188 assert_eq!(12, state.active_threads);
189
190 thread::sleep(Duration::from_secs(4));
191
192 let state = pool.state();
195 assert_eq!(3, state.active_threads);
196 }
197
198 #[test]
199 fn panic_recover() {
200 let pool = ThreadPool::builder()
201 .max_threads(12)
202 .min_threads(3)
203 .thread_name("test-pool")
204 .build();
205
206 let _ = pool.execute(|| {
207 panic!("This is a on purpose panic for testing panic recovery");
208 });
209
210 thread::sleep(Duration::from_millis(100));
211
212 let state = pool.state();
214 assert_eq!(3, state.active_threads);
215
216 (0..128).for_each(|_| {
217 let _ = pool.execute(|| {
218 thread::sleep(Duration::from_millis(1));
219 });
220 });
221
222 let _ = pool.execute(|| {
223 panic!("This is a on purpose panic for testing panic recovery");
224 });
225
226 thread::sleep(Duration::from_millis(100));
227
228 let state = pool.state();
231 assert_eq!(11, state.active_threads);
232 }
233
234 #[test]
235 fn no_eager_spawn() {
236 let pool = ThreadPool::builder()
237 .max_threads(12)
238 .thread_name("test-pool")
239 .build();
240
241 let _a = pool.execute(|| {
242 thread::sleep(Duration::from_millis(1));
243 });
244
245 thread::sleep(Duration::from_millis(100));
246
247 let _a = pool.execute(|| {
248 thread::sleep(Duration::from_millis(1));
249 });
250
251 thread::sleep(Duration::from_millis(100));
252
253 let state = pool.state();
254
255 assert_eq!(1, state.active_threads);
256 }
257}