blocking_permit/
dispatch_pool.rs

1use std::collections::VecDeque;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::thread;
6use std::panic::{catch_unwind, AssertUnwindSafe};
7
8use tao_log::{error, trace};
9use parking_lot::{Condvar, Mutex};
10
11/// A specialized thread pool and queue for dispatching _blocking_
12/// (synchronous, long running) operations.
13///
14/// This pool is not an _executor_, has no _waking_ facilities, etc. As
15/// compared with other thread pools supporting `spawn`, or `spawn_blocking`
16/// in _tokio_, here also called [`dispatch()`](crate::dispatch()) or
17/// [`dispatch_rx()`](crate::dispatch_rx()) this pool has some unique features:
18///
19/// * A configurable, fixed number of threads created before return from
20///   construction and terminated on `Drop::drop`.  Consistent memory
21///   footprint. No warmup required. No per-task thread management overhead.
22///
23/// * Configurable panic handling policy: Either catches and logs dispatch
24///   panics, or aborts the process, on panic unwind.
25///
26/// * Supports fixed (bounded) or unbounded queue length.
27///
28/// * When the queue is bounded and becomes full, [`DispatchPool::spawn`] pops
29///   the oldest operation off the queue before pushing the newest passed
30///   operation, to ensure space while holding a lock. Then as a fallback it
31///   runs the old operation. Thus we enlist calling threads once the queue
32///   reaches limit, but operation order (at least from perspective of a single
33///   thread) is preserved.
34///
35/// ## Usage
36///
37/// By default, the pool uses an unbounded queue, with the assumption that
38/// resource/capacity is externally constrained. Once constructed, a fixed
39/// number of threads are spawned and the instance acts as a handle to the
40/// pool. This may be inexpensively cloned for additional handles to the same
41/// pool.
42///
43/// See [`DispatchPoolBuilder`] for an extensive set of options.
44///
45/// ### With tokio's threaded runtime
46///
47/// One can schedule a clone of the `DispatchPool` (handle) on each tokio
48/// runtime thread (tokio's _rt-threaded_ feature).
49///
50#[cfg_attr(feature = "tokio-threaded", doc = r##"
51``` rust
52use blocking_permit::{
53    DispatchPool, register_dispatch_pool, deregister_dispatch_pool
54};
55
56let pool = DispatchPool::builder().create();
57
58let mut rt = tokio::runtime::Builder::new_multi_thread()
59    .on_thread_start(move || {
60        register_dispatch_pool(pool.clone());
61    })
62    .on_thread_stop(|| {
63        deregister_dispatch_pool();
64    })
65    .build()
66    .unwrap();
67```
68"##)]
69
#[derive(Clone)]
pub struct DispatchPool {
    // Shared handle to the queue and the live-thread counter. `Sender` has
    // the `Drop` implementation that enqueues `Work::Terminate` messages.
    sender: Arc<Sender>,
    // When true, `spawn` wraps tasks as `Work::SafeUnit` so that panic
    // unwinds are caught and logged; when false tasks are sent as
    // `Work::Unit`.
    ignore_panics: bool,
}
75
76// `Arc`s may look a bit redundant above and below, but `Sender` has the `Drop`
77// implementation, and counter and ws are used/moved independently in the work
78// loop.
79
#[derive(Debug)]
struct Sender {
    // The queue, its limit, and the condvar that `send` notifies after a
    // push.
    ws: Arc<WorkState>,
    // Count of running worker threads, maintained by `Turnstile`. Read on
    // drop to decide how many `Work::Terminate` messages to enqueue.
    counter: Arc<AtomicUsize>,
}
85
// Shared, thread-safe callback invoked with the 0-based index of a pool
// thread. This is the type of the `after_start` and `before_stop` hooks.
type AroundFn = Arc<dyn Fn(usize) + Send + Sync>;
87
/// A builder for [`DispatchPool`] supporting an extensive set of
/// configuration options.
pub struct DispatchPoolBuilder {
    // Fixed number of threads; `None` derives a default from detected CPUs.
    pool_size: Option<usize>,
    // Queue limit; `None` is unbounded, `Some(0)` spawns no threads at all.
    queue_length: Option<usize>,
    // Stack size in bytes for each thread; `None` keeps the default.
    stack_size: Option<usize>,
    // Thread name prefix; `None` generates "dpx-pool-N-".
    name_prefix: Option<String>,
    // Hook called on each pool thread, with its index, right after start.
    after_start: Option<AroundFn>,
    // Hook called on each pool thread, with its index, just before exit.
    before_stop: Option<AroundFn>,
    // true: catch and log task panics. false: a panic on a pool thread
    // aborts the process.
    ignore_panics: bool
}
99
// An entry in the dispatch queue: either a task to run or a request for the
// worker that pops it to exit.
enum Work {
    // Task that is run without catching panic unwinds.
    Unit(Box<dyn FnOnce() + Send>),
    // Task that is run under `catch_unwind`; a panic is logged and ignored.
    SafeUnit(AssertUnwindSafe<Box<dyn FnOnce() + Send>>),
    // Makes the worker thread that pops it leave its work loop.
    Terminate,
}
105
106impl DispatchPool {
107    /// Create new pool using defaults.
108    pub fn new() -> DispatchPool {
109        DispatchPoolBuilder::default().create()
110    }
111
112    /// Create a new builder for configuring a new pool.
113    pub fn builder() -> DispatchPoolBuilder {
114        DispatchPoolBuilder::new()
115    }
116
117    /// Enqueue a blocking operation to be executed.
118    ///
119    /// This first attempts to send to the associated queue, which will always
120    /// succeed if _unbounded_, e.g. no [`DispatchPoolBuilder::queue_length`]
121    /// is set, the default. If however the queue is _bounded_ and at capacity,
122    /// then this task will be pushed after taking the oldest task, which is
123    /// then run on the calling thread.
124    pub fn spawn(&self, f: Box<dyn FnOnce() + Send>) {
125        let work = if self.ignore_panics {
126            Work::SafeUnit(AssertUnwindSafe(f))
127        } else {
128            Work::Unit(f)
129        };
130
131        let work = self.sender.send(work);
132
133        match work {
134            None => {},
135            // Full, so run here. Panics will propagate.
136            Some(Work::Unit(f)) => f(),
137            // Full, so run here. Ignore panic unwinds.
138            Some(Work::SafeUnit(af)) => {
139                if catch_unwind(af).is_err() {
140                    error!("DispatchPool: panic on calling thread \
141                            was caught and ignored");
142                }
143            }
144            _ => {
145                // Safety: `send` will never return anything but Unit or
146                // SafeUnit.
147                unsafe { std::hint::unreachable_unchecked() }
148            }
149
150        }
151    }
152}
153
154// Guard type that can increment thread count, then on Drop: decrements count
155// and runs any before_stop function on thread.
156struct Turnstile {
157    index: usize,
158    counter: Arc<AtomicUsize>,
159    before_stop: Option<AroundFn>
160}
161
162impl Turnstile {
163    fn increment(&self) {
164        self.counter.fetch_add(1, Ordering::SeqCst);
165    }
166}
167
168impl Drop for Turnstile {
169    fn drop(&mut self) {
170        trace!("Turnstile::drop entered");
171        self.counter.fetch_sub(1, Ordering::SeqCst);
172        if let Some(bsfn) = &self.before_stop {
173            bsfn(self.index);
174        }
175    }
176}
177
178// Aborts the process if dropped
179struct AbortOnPanic;
180
181impl Drop for AbortOnPanic {
182    fn drop(&mut self) {
183        error!("DispatchPool: aborting due to panic on dispatch thread");
184        tao_log::log::logger().flush();
185        std::process::abort();
186    }
187}
188
189struct WorkState {
190    queue: Mutex<VecDeque<Work>>,
191    limit: usize,
192    condvar: Condvar,
193}
194
195impl fmt::Debug for WorkState {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("WorkState")
198            .finish()
199    }
200}
201
202fn work(
203    index: usize,
204    counter: Arc<AtomicUsize>,
205    after_start: Option<AroundFn>,
206    before_stop: Option<AroundFn>,
207    ws: Arc<WorkState>)
208{
209    if let Some(ref asfn) = after_start {
210        asfn(index);
211    }
212    drop(after_start);
213
214    {
215        let ts = Turnstile { index, counter, before_stop };
216        let ws = ws; // moved to here so it drops before ts.
217        let mut lock = ws.queue.lock();
218        ts.increment();
219        'worker: loop {
220            while let Some(w) = lock.pop_front() {
221                drop(lock);
222                match w {
223                    Work::Unit(bfn) => {
224                        let abort = AbortOnPanic;
225                        bfn();
226                        std::mem::forget(abort);
227                    }
228                    Work::SafeUnit(abfn) => {
229                        if catch_unwind(abfn).is_err() {
230                            error!("DispatchPool: panic on pool \
231                                    was caught and ignored");
232                        }
233                    }
234                    Work::Terminate => break 'worker,
235                }
236                lock = ws.queue.lock();
237            }
238
239            ws.condvar.wait(&mut lock);
240        }
241    }
242}
243
244impl Default for DispatchPool {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl Sender {
251    // Send new work, possibly returning some, different, older work if the
252    // queue is bound and its limit is reached. If queue has limit 0, then
253    // always return the work given.
254    fn send(&self, work: Work) -> Option<Work> {
255        let mut queue = self.ws.queue.lock();
256        let qlen = queue.len();
257        if matches!(work, Work::Terminate) || qlen < self.ws.limit {
258            queue.push_back(work);
259            self.ws.condvar.notify_one();
260            None
261        } else if qlen > 0 && qlen == self.ws.limit {
262            // Avoid the swap if front (oldest) element is a `Terminate`
263            if let Some(&Work::Terminate) = queue.front() {
264                Some(work)
265            } else {
266                // Otherwise swap old for new work
267                let old = queue.pop_front().unwrap();
268                queue.push_back(work);
269                self.ws.condvar.notify_one();
270                Some(old)
271            }
272        } else {
273            Some(work)
274        }
275    }
276}
277
278impl Drop for Sender {
279    fn drop(&mut self) {
280        trace!("Sender::drop entered");
281        let threads = self.counter.load(Ordering::SeqCst);
282        for _ in 0..threads {
283            assert!(self.send(Work::Terminate).is_none());
284        }
285
286        // This intentionally only yields a number of times equivalent to the
287        // termination messages sent, to avoid risk of hanging.
288        for _ in 0..threads {
289            let size = self.counter.load(Ordering::SeqCst);
290            if size > 0 {
291                trace!("DipatchPool::(Sender::)drop yielding, \
292                        pool size: {}", size);
293                thread::yield_now();
294            } else {
295                break;
296            }
297        }
298    }
299}
300
301impl fmt::Debug for DispatchPool {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        f.debug_struct("DispatchPool")
304            .field("threads", &self.sender.counter.load(Ordering::Relaxed))
305            .field("ignore_panics", &self.ignore_panics)
306            .finish()
307    }
308}
309
310impl DispatchPoolBuilder {
311    /// Create new dispatch pool builder, for configuration.
312    pub fn new() -> DispatchPoolBuilder {
313        DispatchPoolBuilder {
314            pool_size: None,
315            queue_length: None,
316            stack_size: None,
317            name_prefix: None,
318            after_start: None,
319            before_stop: None,
320            ignore_panics: false,
321        }
322    }
323
324    /// Set the fixed number of threads in the pool.
325    ///
326    /// This must at least be one (1) thread, asserted. However the value is
327    /// ignored (and no threads are spawned) if queue_length is zero (0).
328    ///
329    /// Default: the number of logical CPU's minus one, but one at minimum:
330    ///
331    /// | Detected CPUs | Default Pool Size |
332    /// | -------------:| -----------------:|
333    /// |       0       |         1         |
334    /// |       1       |         1         |
335    /// |       2       |         1         |
336    /// |       3       |         2         |
337    /// |       4       |         3         |
338    ///
339    /// Detected CPUs may be influenced by simultaneous multithreading (SMT,
340    /// e.g. Intel hyper-threading) or scheduler affinity. Zero (0) detected
341    /// CPUs is likely an error.
342    pub fn pool_size(&mut self, size: usize) -> &mut Self {
343        assert!(size > 0);
344        self.pool_size = Some(size);
345        self
346    }
347
348    /// Set the length (aka maximum capacity or depth) of the associated
349    /// dispatch task queue.
350    ///
351    /// The length may be zero, in which case the pool is always considered
352    /// _full_ and no threads are spawned.  If the queue is ever _full_, the
353    /// oldest tasks will be executed on the _calling thread_, see
354    /// [`DispatchPool::spawn`].
355    ///
356    /// Default: unbounded (unlimited)
357    pub fn queue_length(&mut self, length: usize) -> &mut Self {
358        self.queue_length = Some(length);
359        self
360    }
361
362    /// Set whether to catch and ignore unwinds for dispatch tasks that panic,
363    /// or to abort.
364    ///
365    /// If true, panics are ignored. Note that the unwind safety of dispatched
366    /// tasks is not well assured by the `UnwindSafe` marker trait and may
367    /// later result in undefined behavior (UB) or logic bugs.
368    ///
369    /// If false, a panic in a dispatch pool thread will result in process
370    /// abort.
371    ///
372    /// Default: false
373    pub fn ignore_panics(&mut self, ignore: bool) -> &mut Self {
374        self.ignore_panics = ignore;
375        self
376    }
377
378    /// Set the stack size in bytes for each thread in the pool.
379    ///
380    /// Default: the default thread stack size.
381    pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
382        self.stack_size = Some(stack_size);
383        self
384    }
385
386    /// Set name prefix for threads in the pool.
387    ///
388    /// The (unique) thread index is appended to form the complete thread name.
389    ///
390    /// Default: "dpx-pool-N-" where N is a 0-based global pool counter.
391    pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
392        self.name_prefix = Some(name_prefix.into());
393        self
394    }
395
396    /// Set a closure to be called immediately after each thread is started.
397    ///
398    /// The closure is passed a 0-based index of the thread.
399    ///
400    /// Default: None
401    pub fn after_start<F>(&mut self, f: F) -> &mut Self
402        where F: Fn(usize) + Send + Sync + 'static
403    {
404        self.after_start = Some(Arc::new(f));
405        self
406    }
407
408    /// Set a closure to be called immediately before a pool thread exits.
409    ///
410    /// The closure is passed a 0-based index of the thread.
411    ///
412    /// Default: None
413    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
414        where F: Fn(usize) + Send + Sync + 'static
415    {
416        self.before_stop = Some(Arc::new(f));
417        self
418    }
419
420    /// Create a new [`DispatchPool`] with the provided configuration.
421    pub fn create(&mut self) -> DispatchPool {
422
423        let pool_size = if let Some(0) = self.queue_length {
424            // Zero pool size if zero queue length
425            0
426        } else if let Some(size) = self.pool_size {
427            size
428        } else {
429            let mut size = num_cpus::get();
430            if size > 1 {
431                size -= 1;
432            }
433            if size == 0 {
434                size = 1;
435            }
436            size
437        };
438
439        static POOL_CNT: AtomicUsize = AtomicUsize::new(0);
440        let name_prefix = if let Some(ref prefix) = self.name_prefix {
441            prefix.to_owned()
442        } else {
443            format!(
444                "dpx-pool-{}-",
445                POOL_CNT.fetch_add(1, Ordering::SeqCst))
446        };
447
448        let ws = if let Some(l) = self.queue_length {
449            Arc::new(WorkState {
450                queue: Mutex::new(VecDeque::with_capacity(l)),
451                limit: l,
452                condvar: Condvar::new(),
453            })
454        } else {
455            Arc::new(WorkState {
456                queue: Mutex::new(VecDeque::with_capacity(pool_size*2)),
457                limit: usize::max_value(),
458                condvar: Condvar::new()
459            })
460        };
461
462        let sender = Arc::new(Sender {
463            ws: ws.clone(),
464            counter: Arc::new(AtomicUsize::new(0))
465        });
466
467        for i in 0..pool_size {
468            let after_start = self.after_start.clone();
469            let before_stop = self.before_stop.clone();
470            let ws = ws.clone();
471
472            let mut builder = thread::Builder::new();
473            builder = builder.name(format!("{}{}", name_prefix, i));
474            if let Some(size) = self.stack_size {
475                builder = builder.stack_size(size);
476            }
477            let cnt = sender.counter.clone();
478            builder
479                .spawn(move || work(i, cnt, after_start, before_stop, ws))
480                .expect("DispatchPoolBuilder::create thread spawn");
481        }
482
483        // Wait until counter reaches pool size.
484        while sender.counter.load(Ordering::SeqCst) < pool_size {
485            thread::yield_now();
486        }
487
488        DispatchPool {
489            sender,
490            ignore_panics: self.ignore_panics,
491        }
492    }
493}
494
495impl Default for DispatchPoolBuilder {
496    fn default() -> Self {
497        Self::new()
498    }
499}