blocking_permit/dispatch_pool.rs
1use std::collections::VecDeque;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread;
6use std::panic::{catch_unwind, AssertUnwindSafe};
7
8use tao_log::{error, trace};
9use parking_lot::{Condvar, Mutex};
10
11/// A specialized thread pool and queue for dispatching _blocking_
12/// (synchronous, long running) operations.
13///
14/// This pool is not an _executor_, has no _waking_ facilities, etc. As
15/// compared with other thread pools supporting `spawn`, or `spawn_blocking`
16/// in _tokio_, here also called [`dispatch()`](crate::dispatch()) or
17/// [`dispatch_rx()`](crate::dispatch_rx()) this pool has some unique features:
18///
19/// * A configurable, fixed number of threads created before return from
20/// construction and terminated on `Drop::drop`. Consistent memory
21/// footprint. No warmup required. No per-task thread management overhead.
22///
23/// * Configurable panic handling policy: Either catches and logs dispatch
24/// panics, or aborts the process, on panic unwind.
25///
26/// * Supports fixed (bounded) or unbounded queue length.
27///
28/// * When the queue is bounded and becomes full, [`DispatchPool::spawn`] pops
29/// the oldest operation off the queue before pushing the newest passed
30/// operation, to ensure space while holding a lock. Then as a fallback it
31/// runs the old operation. Thus we enlist calling threads once the queue
32/// reaches limit, but operation order (at least from perspective of a single
33/// thread) is preserved.
34///
35/// ## Usage
36///
37/// By default, the pool uses an unbounded queue, with the assumption that
38/// resource/capacity is externally constrained. Once constructed, a fixed
39/// number of threads are spawned and the instance acts as a handle to the
40/// pool. This may be inexpensively cloned for additional handles to the same
41/// pool.
42///
43/// See [`DispatchPoolBuilder`] for an extensive set of options.
44///
45/// ### With tokio's threaded runtime
46///
47/// One can schedule a clone of the `DispatchPool` (handle) on each tokio
48/// runtime thread (tokio's _rt-threaded_ feature).
49///
50#[cfg_attr(feature = "tokio-threaded", doc = r##"
51``` rust
52use blocking_permit::{
53 DispatchPool, register_dispatch_pool, deregister_dispatch_pool
54};
55
56let pool = DispatchPool::builder().create();
57
58let mut rt = tokio::runtime::Builder::new_multi_thread()
59 .on_thread_start(move || {
60 register_dispatch_pool(pool.clone());
61 })
62 .on_thread_stop(|| {
63 deregister_dispatch_pool();
64 })
65 .build()
66 .unwrap();
67```
68"##)]
69
70#[derive(Clone)]
71pub struct DispatchPool {
72 sender: Arc<Sender>,
73 ignore_panics: bool,
74}
75
76// `Arc`s may look a bit redundant above and below, but `Sender` has the `Drop`
77// implementation, and counter and ws are used/moved independently in the work
78// loop.
79
80#[derive(Debug)]
81struct Sender {
82 ws: Arc<WorkState>,
83 counter: Arc<AtomicUsize>,
84}
85
// Shared, thread-safe callback that receives a pool thread's index. Used for
// the `after_start` and `before_stop` hooks.
type AroundFn = Arc<dyn Fn(usize) + Send + Sync>;
87
/// A builder for [`DispatchPool`] supporting an extensive set of
/// configuration options.
90pub struct DispatchPoolBuilder {
91 pool_size: Option<usize>,
92 queue_length: Option<usize>,
93 stack_size: Option<usize>,
94 name_prefix: Option<String>,
95 after_start: Option<AroundFn>,
96 before_stop: Option<AroundFn>,
97 ignore_panics: bool
98}
99
// A message carried by the dispatch queue.
enum Work {
    // A task run as-is: a panic propagates when run on the calling thread
    // and aborts the process when run on a pool thread.
    Unit(Box<dyn FnOnce() + Send>),

    // A task run under `catch_unwind`, so a panic is logged and ignored.
    SafeUnit(AssertUnwindSafe<Box<dyn FnOnce() + Send>>),

    // Tells the pool thread that receives it to leave its work loop.
    Terminate,
}
105
106impl DispatchPool {
107 /// Create new pool using defaults.
108 pub fn new() -> DispatchPool {
109 DispatchPoolBuilder::default().create()
110 }
111
112 /// Create a new builder for configuring a new pool.
113 pub fn builder() -> DispatchPoolBuilder {
114 DispatchPoolBuilder::new()
115 }
116
117 /// Enqueue a blocking operation to be executed.
118 ///
119 /// This first attempts to send to the associated queue, which will always
120 /// succeed if _unbounded_, e.g. no [`DispatchPoolBuilder::queue_length`]
121 /// is set, the default. If however the queue is _bounded_ and at capacity,
122 /// then this task will be pushed after taking the oldest task, which is
123 /// then run on the calling thread.
124 pub fn spawn(&self, f: Box<dyn FnOnce() + Send>) {
125 let work = if self.ignore_panics {
126 Work::SafeUnit(AssertUnwindSafe(f))
127 } else {
128 Work::Unit(f)
129 };
130
131 let work = self.sender.send(work);
132
133 match work {
134 None => {},
135 // Full, so run here. Panics will propagate.
136 Some(Work::Unit(f)) => f(),
137 // Full, so run here. Ignore panic unwinds.
138 Some(Work::SafeUnit(af)) => {
139 if catch_unwind(af).is_err() {
140 error!("DispatchPool: panic on calling thread \
141 was caught and ignored");
142 }
143 }
144 _ => {
145 // Safety: `send` will never return anything but Unit or
146 // SafeUnit.
147 unsafe { std::hint::unreachable_unchecked() }
148 }
149
150 }
151 }
152}
153
154// Guard type that can increment thread count, then on Drop: decrements count
155// and runs any before_stop function on thread.
156struct Turnstile {
157 index: usize,
158 counter: Arc<AtomicUsize>,
159 before_stop: Option<AroundFn>
160}
161
162impl Turnstile {
163 fn increment(&self) {
164 self.counter.fetch_add(1, Ordering::SeqCst);
165 }
166}
167
168impl Drop for Turnstile {
169 fn drop(&mut self) {
170 trace!("Turnstile::drop entered");
171 self.counter.fetch_sub(1, Ordering::SeqCst);
172 if let Some(bsfn) = &self.before_stop {
173 bsfn(self.index);
174 }
175 }
176}
177
// Guard that aborts the process when it is dropped. The work loop disarms it
// with `std::mem::forget` once the guarded task has returned normally, so the
// drop only runs while unwinding from a panic.
struct AbortOnPanic;
180
181impl Drop for AbortOnPanic {
182 fn drop(&mut self) {
183 error!("DispatchPool: aborting due to panic on dispatch thread");
184 tao_log::log::logger().flush();
185 std::process::abort();
186 }
187}
188
189struct WorkState {
190 queue: Mutex<VecDeque<Work>>,
191 limit: usize,
192 condvar: Condvar,
193}
194
195impl fmt::Debug for WorkState {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("WorkState")
198 .finish()
199 }
200}
201
202fn work(
203 index: usize,
204 counter: Arc<AtomicUsize>,
205 after_start: Option<AroundFn>,
206 before_stop: Option<AroundFn>,
207 ws: Arc<WorkState>)
208{
209 if let Some(ref asfn) = after_start {
210 asfn(index);
211 }
212 drop(after_start);
213
214 {
215 let ts = Turnstile { index, counter, before_stop };
216 let ws = ws; // moved to here so it drops before ts.
217 let mut lock = ws.queue.lock();
218 ts.increment();
219 'worker: loop {
220 while let Some(w) = lock.pop_front() {
221 drop(lock);
222 match w {
223 Work::Unit(bfn) => {
224 let abort = AbortOnPanic;
225 bfn();
226 std::mem::forget(abort);
227 }
228 Work::SafeUnit(abfn) => {
229 if catch_unwind(abfn).is_err() {
230 error!("DispatchPool: panic on pool \
231 was caught and ignored");
232 }
233 }
234 Work::Terminate => break 'worker,
235 }
236 lock = ws.queue.lock();
237 }
238
239 ws.condvar.wait(&mut lock);
240 }
241 }
242}
243
244impl Default for DispatchPool {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250impl Sender {
251 // Send new work, possibly returning some, different, older work if the
252 // queue is bound and its limit is reached. If queue has limit 0, then
253 // always return the work given.
254 fn send(&self, work: Work) -> Option<Work> {
255 let mut queue = self.ws.queue.lock();
256 let qlen = queue.len();
257 if matches!(work, Work::Terminate) || qlen < self.ws.limit {
258 queue.push_back(work);
259 self.ws.condvar.notify_one();
260 None
261 } else if qlen > 0 && qlen == self.ws.limit {
262 // Avoid the swap if front (oldest) element is a `Terminate`
263 if let Some(&Work::Terminate) = queue.front() {
264 Some(work)
265 } else {
266 // Otherwise swap old for new work
267 let old = queue.pop_front().unwrap();
268 queue.push_back(work);
269 self.ws.condvar.notify_one();
270 Some(old)
271 }
272 } else {
273 Some(work)
274 }
275 }
276}
277
278impl Drop for Sender {
279 fn drop(&mut self) {
280 trace!("Sender::drop entered");
281 let threads = self.counter.load(Ordering::SeqCst);
282 for _ in 0..threads {
283 assert!(self.send(Work::Terminate).is_none());
284 }
285
286 // This intentionally only yields a number of times equivalent to the
287 // termination messages sent, to avoid risk of hanging.
288 for _ in 0..threads {
289 let size = self.counter.load(Ordering::SeqCst);
290 if size > 0 {
291 trace!("DipatchPool::(Sender::)drop yielding, \
292 pool size: {}", size);
293 thread::yield_now();
294 } else {
295 break;
296 }
297 }
298 }
299}
300
301impl fmt::Debug for DispatchPool {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 f.debug_struct("DispatchPool")
304 .field("threads", &self.sender.counter.load(Ordering::Relaxed))
305 .field("ignore_panics", &self.ignore_panics)
306 .finish()
307 }
308}
309
310impl DispatchPoolBuilder {
311 /// Create new dispatch pool builder, for configuration.
312 pub fn new() -> DispatchPoolBuilder {
313 DispatchPoolBuilder {
314 pool_size: None,
315 queue_length: None,
316 stack_size: None,
317 name_prefix: None,
318 after_start: None,
319 before_stop: None,
320 ignore_panics: false,
321 }
322 }
323
324 /// Set the fixed number of threads in the pool.
325 ///
326 /// This must at least be one (1) thread, asserted. However the value is
327 /// ignored (and no threads are spawned) if queue_length is zero (0).
328 ///
329 /// Default: the number of logical CPU's minus one, but one at minimum:
330 ///
331 /// | Detected CPUs | Default Pool Size |
332 /// | -------------:| -----------------:|
333 /// | 0 | 1 |
334 /// | 1 | 1 |
335 /// | 2 | 1 |
336 /// | 3 | 2 |
337 /// | 4 | 3 |
338 ///
339 /// Detected CPUs may be influenced by simultaneous multithreading (SMT,
340 /// e.g. Intel hyper-threading) or scheduler affinity. Zero (0) detected
341 /// CPUs is likely an error.
342 pub fn pool_size(&mut self, size: usize) -> &mut Self {
343 assert!(size > 0);
344 self.pool_size = Some(size);
345 self
346 }
347
348 /// Set the length (aka maximum capacity or depth) of the associated
349 /// dispatch task queue.
350 ///
351 /// The length may be zero, in which case the pool is always considered
352 /// _full_ and no threads are spawned. If the queue is ever _full_, the
353 /// oldest tasks will be executed on the _calling thread_, see
354 /// [`DispatchPool::spawn`].
355 ///
356 /// Default: unbounded (unlimited)
357 pub fn queue_length(&mut self, length: usize) -> &mut Self {
358 self.queue_length = Some(length);
359 self
360 }
361
362 /// Set whether to catch and ignore unwinds for dispatch tasks that panic,
363 /// or to abort.
364 ///
365 /// If true, panics are ignored. Note that the unwind safety of dispatched
366 /// tasks is not well assured by the `UnwindSafe` marker trait and may
367 /// later result in undefined behavior (UB) or logic bugs.
368 ///
369 /// If false, a panic in a dispatch pool thread will result in process
370 /// abort.
371 ///
372 /// Default: false
373 pub fn ignore_panics(&mut self, ignore: bool) -> &mut Self {
374 self.ignore_panics = ignore;
375 self
376 }
377
378 /// Set the stack size in bytes for each thread in the pool.
379 ///
380 /// Default: the default thread stack size.
381 pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
382 self.stack_size = Some(stack_size);
383 self
384 }
385
386 /// Set name prefix for threads in the pool.
387 ///
388 /// The (unique) thread index is appended to form the complete thread name.
389 ///
390 /// Default: "dpx-pool-N-" where N is a 0-based global pool counter.
391 pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
392 self.name_prefix = Some(name_prefix.into());
393 self
394 }
395
396 /// Set a closure to be called immediately after each thread is started.
397 ///
398 /// The closure is passed a 0-based index of the thread.
399 ///
400 /// Default: None
401 pub fn after_start<F>(&mut self, f: F) -> &mut Self
402 where F: Fn(usize) + Send + Sync + 'static
403 {
404 self.after_start = Some(Arc::new(f));
405 self
406 }
407
408 /// Set a closure to be called immediately before a pool thread exits.
409 ///
410 /// The closure is passed a 0-based index of the thread.
411 ///
412 /// Default: None
413 pub fn before_stop<F>(&mut self, f: F) -> &mut Self
414 where F: Fn(usize) + Send + Sync + 'static
415 {
416 self.before_stop = Some(Arc::new(f));
417 self
418 }
419
420 /// Create a new [`DispatchPool`] with the provided configuration.
421 pub fn create(&mut self) -> DispatchPool {
422
423 let pool_size = if let Some(0) = self.queue_length {
424 // Zero pool size if zero queue length
425 0
426 } else if let Some(size) = self.pool_size {
427 size
428 } else {
429 let mut size = num_cpus::get();
430 if size > 1 {
431 size -= 1;
432 }
433 if size == 0 {
434 size = 1;
435 }
436 size
437 };
438
439 static POOL_CNT: AtomicUsize = AtomicUsize::new(0);
440 let name_prefix = if let Some(ref prefix) = self.name_prefix {
441 prefix.to_owned()
442 } else {
443 format!(
444 "dpx-pool-{}-",
445 POOL_CNT.fetch_add(1, Ordering::SeqCst))
446 };
447
448 let ws = if let Some(l) = self.queue_length {
449 Arc::new(WorkState {
450 queue: Mutex::new(VecDeque::with_capacity(l)),
451 limit: l,
452 condvar: Condvar::new(),
453 })
454 } else {
455 Arc::new(WorkState {
456 queue: Mutex::new(VecDeque::with_capacity(pool_size*2)),
457 limit: usize::max_value(),
458 condvar: Condvar::new()
459 })
460 };
461
462 let sender = Arc::new(Sender {
463 ws: ws.clone(),
464 counter: Arc::new(AtomicUsize::new(0))
465 });
466
467 for i in 0..pool_size {
468 let after_start = self.after_start.clone();
469 let before_stop = self.before_stop.clone();
470 let ws = ws.clone();
471
472 let mut builder = thread::Builder::new();
473 builder = builder.name(format!("{}{}", name_prefix, i));
474 if let Some(size) = self.stack_size {
475 builder = builder.stack_size(size);
476 }
477 let cnt = sender.counter.clone();
478 builder
479 .spawn(move || work(i, cnt, after_start, before_stop, ws))
480 .expect("DispatchPoolBuilder::create thread spawn");
481 }
482
483 // Wait until counter reaches pool size.
484 while sender.counter.load(Ordering::SeqCst) < pool_size {
485 thread::yield_now();
486 }
487
488 DispatchPool {
489 sender,
490 ignore_panics: self.ignore_panics,
491 }
492 }
493}
494
495impl Default for DispatchPoolBuilder {
496 fn default() -> Self {
497 Self::new()
498 }
499}