lft_rust/
single_queue_threadpool.rs

1// Copyright 2014 The Rust Project Developers. 
2// Copyright 2022 @yucwang.
3// See the COPYRIGHT file at the top-level directory of this 
4// distribution and at http://rust-lang.org/COPYRIGHT.
5//
6// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
7// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
9// option. This file may not be copied, modified, or distributed
10// except according to those terms.
11
12//! A thread pool used to execute functions in parallel.
13//!
14//! Spawns a specified number of worker threads and replenishes the pool if any worker threads
15//! panic.
16//!
17//! # Examples
18//!
19//! ## Synchronized with a channel
20//!
//! Every job sends one message over the channel; the messages are then collected with `take()`.
22//!
23//! ```
24//! use crossbeam_channel::unbounded;
25//!
26//! let n_workers = 4;
27//! let n_jobs = 8;
//! let pool = lft_rust::single_queue_threadpool_builder().num_workers(n_workers).build();
29//!
30//! let (tx, rx) = unbounded();
31//! for _ in 0..n_jobs {
32//!     let tx = tx.clone();
33//!     pool.execute(move|| {
34//!         tx.send(1).expect("channel will be there waiting for the pool");
35//!     });
36//! }
37//! drop(tx);
38//!
39//! assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
40//! ```
41//!
42//! ## Synchronized with a barrier
43//!
44//! Keep in mind, if a barrier synchronizes more jobs than you have workers in the pool,
45//! you will end up with a [deadlock](https://en.wikipedia.org/wiki/Deadlock)
46//! at the barrier which is [not considered unsafe](
47//! https://doc.rust-lang.org/reference/behavior-not-considered-unsafe.html).
48//!
49//! ```
//! use lft_rust::SingleQueueThreadpool;
51//! use std::sync::{Arc, Barrier};
52//! use std::sync::atomic::{AtomicUsize, Ordering};
53//!
54//! // create at least as many workers as jobs or you will deadlock yourself
55//! let n_workers = 42;
56//! let n_jobs = 23;
//! let pool = lft_rust::single_queue_threadpool_builder().num_workers(n_workers).build();
58//! let an_atomic = Arc::new(AtomicUsize::new(0));
59//!
60//! assert!(n_jobs <= n_workers, "too many jobs, will deadlock");
61//!
62//! // create a barrier that waits for all jobs plus the starter thread
63//! let barrier = Arc::new(Barrier::new(n_jobs + 1));
64//! for _ in 0..n_jobs {
65//!     let barrier = barrier.clone();
66//!     let an_atomic = an_atomic.clone();
67//!
68//!     pool.execute(move|| {
69//!         // do the heavy work
70//!         an_atomic.fetch_add(1, Ordering::Relaxed);
71//!
72//!         // then wait for the other threads
73//!         barrier.wait();
74//!     });
75//! }
76//!
77//! // wait for the threads to finish the work
78//! barrier.wait();
79//! assert_eq!(an_atomic.load(Ordering::SeqCst), n_jobs);
80//! ```
81
82use num_cpus;
83
84use crossbeam_channel::{ unbounded, Receiver, Sender };
85
86use std::fmt;
87use std::sync::atomic::{ AtomicUsize, Ordering };
88use std::sync::{ Arc, Condvar, Mutex };
89use std::thread;
90
91#[cfg(test)]
92mod test;
93
94/// Creates a new thread pool with the same number of workers as CPUs are detected.
95///
96/// # Examples
97///
/// Create a new thread pool capable of executing at least one job concurrently:
99///
100/// ```
101/// let pool = lft_rust::single_queue_threadpool_auto_config();
102/// ```
103pub fn single_queue_threadpool_auto_config() -> SingleQueueThreadpool {
104    single_queue_threadpool_builder().build()
105}
106
107/// Initiate a new [`SingleQueueThreadpoolBuilder`].
108///
109/// [`SingleQueueThreadpoolBuilder`]: struct.SingleQueueThreadpoolBuilder.html
110///
111/// # Examples
112///
113/// ```
114/// let builder = lft_rust::single_queue_threadpool_builder();
115/// ```
116pub const fn single_queue_threadpool_builder() -> SingleQueueThreadpoolBuilder {
117    SingleQueueThreadpoolBuilder {
118        num_workers: None,
119        worker_name: None,
120        thread_stack_size: None,
121    }
122}
123
/// Object-safe adapter that lets a boxed one-shot closure be invoked.
trait FnBox {
    /// Consume the box and run the closure it holds.
    fn call_box(self: Box<Self>);
}

/// Every `FnOnce()` closure can be called through its box.
impl<F> FnBox for F
where
    F: FnOnce(),
{
    fn call_box(self: Box<F>) {
        // Move the closure out of the box, then invoke it exactly once.
        let closure: F = *self;
        closure()
    }
}

/// A boxed, sendable job as stored in the pool's queue.
type Thunk<'a> = Box<dyn FnBox + Send + 'a>;
135
/// Drop guard held by each worker thread for the duration of its loop.
///
/// While `active` is still `true` at drop time, the `Drop` impl adjusts the
/// shared counters and spawns a replacement worker.
struct Sentinel<'a> {
    // Shared pool state that the guard updates when it is dropped while active.
    shared_data: &'a Arc<SingleQueueThreadpoolSharedData>,
    // `true` until `cancel` is called; an inactive guard does nothing on drop.
    active: bool,
}
140
141impl<'a> Sentinel<'a> {
142    fn new(shared_data: &'a Arc<SingleQueueThreadpoolSharedData>) -> Sentinel<'a> {
143        Sentinel {
144            shared_data: shared_data,
145            active: true,
146        }
147    }
148
149    /// Cancel and destroy this sentinel.
150    fn cancel(mut self) {
151        self.active = false;
152    }
153}
154
155impl<'a> Drop for Sentinel<'a> {
156    fn drop(&mut self) {
157        if self.active {
158            self.shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
159            if thread::panicking() {
160                self.shared_data.panic_count.fetch_add(1, Ordering::SeqCst);
161            }
162            self.shared_data.no_work_notify_all();
163            spawn_in_pool(self.shared_data.clone())
164        }
165    }
166}
167
168/// [`SingleQueueThreadpool`] factory, which can be used in order to configure the properties of the
169/// [`SingleQueueThreadpool`].
170///
171/// The three configuration options available:
172///
173/// * `num_workers`: maximum number of threads that will be alive at any given moment by the built
174///   [`SingleQueueThreadpool`]
175/// * `worker_name`: thread name for each of the threads spawned by the built [`SingleQueueThreadpool`]
176/// * `thread_stack_size`: stack size (in bytes) for each of the threads spawned by the built
177///   [`SingleQueueThreadpool`]
178///
179/// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
180///
181/// # Examples
182///
183/// Build a [`SingleQueueThreadpool`] that uses a maximum of eight threads simultaneously and each thread has
/// an 8 MB stack size:
185///
186/// ```
187/// let pool = lft_rust::single_queue_threadpool_builder()
188///     .num_workers(8)
189///     .thread_stack_size(8 * 1024 * 1024)
190///     .build();
191/// ```
#[derive(Clone, Debug, Default)]
pub struct SingleQueueThreadpoolBuilder {
    /// Requested number of worker threads; `None` lets `build` pick the
    /// default.
    num_workers: Option<usize>,
    /// Name given to every spawned worker thread; `None` leaves them unnamed.
    worker_name: Option<String>,
    /// Stack size in bytes for every spawned worker thread; `None` leaves the
    /// thread builder's default in place.
    thread_stack_size: Option<usize>,
}
198
199impl SingleQueueThreadpoolBuilder {
200    /// Set the maximum number of worker-threads that will be alive at any given moment by the built
201    /// [`SingleQueueThreadpool`]. If not specified, defaults the number of threads to the number of CPUs.
202    ///
203    /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
204    ///
205    /// # Panics
206    ///
207    /// This method will panic if `num_workers` is 0.
208    ///
209    /// # Examples
210    ///
211    /// No more than eight threads will be alive simultaneously for this pool:
212    ///
213    /// ```
214    /// use std::thread;
215    ///
216    /// let pool = lft_rust::single_queue_threadpool_builder()
217    ///     .num_workers(8)
218    ///     .build();
219    ///
220    /// for _ in 0..42 {
221    ///     pool.execute(|| {
222    ///         println!("Hello from a worker thread!")
223    ///     })
224    /// }
225    /// ```
226    pub fn num_workers(mut self, num_workers: usize) -> SingleQueueThreadpoolBuilder {
227        assert!(num_workers > 0);
228        self.num_workers = Some(num_workers);
229        self
230    }
231
232    /// Set the thread name for each of the threads spawned by the built [`SingleQueueThreadpool`]. If not
233    /// specified, threads spawned by the thread pool will be unnamed.
234    ///
235    /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
236    ///
237    /// # Examples
238    ///
239    /// Each thread spawned by this pool will have the name "foo":
240    ///
241    /// ```
242    /// use std::thread;
243    ///
244    /// let pool = lft_rust::single_queue_threadpool_builder()
245    ///     .worker_name("foo")
246    ///     .build();
247    ///
248    /// for _ in 0..100 {
249    ///     pool.execute(|| {
250    ///         assert_eq!(thread::current().name(), Some("foo"));
251    ///     })
252    /// }
253    /// ```
254    pub fn worker_name<S: AsRef<str>>(mut self, name: S) -> SingleQueueThreadpoolBuilder {
255        // TODO save the copy with Into<String>
256        self.worker_name = Some(name.as_ref().to_owned());
257        self
258    }
259
260    /// Set the stack size (in bytes) for each of the threads spawned by the built [`SingleQueueThreadpool`].
261    /// If not specified, threads spawned by the threadpool will have a stack size [as specified in
262    /// the `std::thread` documentation][thread].
263    ///
264    /// [thread]: https://doc.rust-lang.org/nightly/std/thread/index.html#stack-size
265    /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
266    ///
267    /// # Examples
268    ///
269    /// Each thread spawned by this pool will have a 4 MB stack:
270    ///
271    /// ```
272    /// let pool = lft_rust::single_queue_threadpool_builder()
273    ///     .thread_stack_size(4096 * 1024)
274    ///     .build();
275    ///
276    /// for _ in 0..100 {
277    ///     pool.execute(|| {
278    ///         println!("This thread has a 4 MB stack size!");
279    ///     })
280    /// }
281    /// ```
282    pub fn thread_stack_size(mut self, size: usize) -> SingleQueueThreadpoolBuilder {
283        self.thread_stack_size = Some(size);
284        self
285    }
286
287    /// Finalize the [`SingleQueueThreadpoolBuilder`] and build the [`SingleQueueThreadpool`].
288    ///
289    /// [`SingleQueueThreadpoolBuilder`]: struct.SingleQueueThreadpoolBuilder.html
290    /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
291    ///
292    /// # Examples
293    ///
294    /// ```
295    /// let pool = lft_rust::single_queue_threadpool_builder()
296    ///     .num_workers(8)
297    ///     .thread_stack_size(16*1024*1024)
298    ///     .build();
299    /// ```
300    pub fn build(self) -> SingleQueueThreadpool {
301        let (tx, rx) = unbounded::<Thunk<'static>>();
302
303        let num_workers = self.num_workers.unwrap_or_else(num_cpus::get);
304
305        let shared_data = Arc::new(SingleQueueThreadpoolSharedData {
306            name: self.worker_name,
307            job_receiver: Mutex::new(rx),
308            empty_condvar: Condvar::new(),
309            empty_trigger: Mutex::new(()),
310            join_generation: AtomicUsize::new(0),
311            queued_count: AtomicUsize::new(0),
312            active_count: AtomicUsize::new(0),
313            max_thread_count: AtomicUsize::new(num_workers),
314            panic_count: AtomicUsize::new(0),
315            stack_size: self.thread_stack_size,
316        });
317
318        // Threadpool threads
319        for _ in 0..num_workers {
320            spawn_in_pool(shared_data.clone());
321        }
322
323        SingleQueueThreadpool {
324            jobs: tx,
325            shared_data: shared_data,
326        }
327    }
328}
329
/// State shared between every pool handle and every worker thread.
struct SingleQueueThreadpoolSharedData {
    // Optional name applied to each worker thread when it is spawned.
    name: Option<String>,
    // Receiving end of the job queue; a worker locks it only while fetching a job.
    job_receiver: Mutex<Receiver<Thunk<'static>>>,
    // Mutex paired with `empty_condvar`; it guards no data of its own.
    empty_trigger: Mutex<()>,
    // Condition variable that `join` waits on and `no_work_notify_all` signals.
    empty_condvar: Condvar,
    // Counter that `join` reads on entry and tries to advance once its wait ends.
    join_generation: AtomicUsize,
    // Incremented by `execute`, decremented when a worker picks a job up.
    queued_count: AtomicUsize,
    // Incremented just before a job runs, decremented right after it returns.
    active_count: AtomicUsize,
    // Worker limit set by `build` / `set_num_workers`; workers compare
    // `active_count` against it before fetching the next job.
    max_thread_count: AtomicUsize,
    // Incremented when an active `Sentinel` is dropped on a panicking thread.
    panic_count: AtomicUsize,
    // Optional stack size in bytes applied to each worker thread when it is spawned.
    stack_size: Option<usize>,
}
342
343impl SingleQueueThreadpoolSharedData {
344    fn has_work(&self) -> bool {
345        self.queued_count.load(Ordering::SeqCst) > 0 || self.active_count.load(Ordering::SeqCst) > 0
346    }
347
348    /// Notify all observers joining this pool if there is no more work to do.
349    fn no_work_notify_all(&self) {
350        if !self.has_work() {
351            *self
352                .empty_trigger
353                .lock()
354                .expect("Unable to notify all joining threads");
355            self.empty_condvar.notify_all();
356        }
357    }
358}
359
/// Abstraction of a thread pool for basic parallelism.
///
/// A value of this type is a handle: cloning it copies the job sender and the
/// `Arc` to the shared state, so every clone refers to the same pool.
pub struct SingleQueueThreadpool {
    // How the threadpool communicates with subthreads.
    //
    // Every clone of the pool holds its own copy of this Sender. Worker
    // threads leave their loop when receiving a job fails, which is expected
    // once all handles (and therefore all Senders) have been dropped.
    jobs: Sender<Thunk<'static>>,
    // Counters, queue receiver and join synchronization shared with the workers.
    shared_data: Arc<SingleQueueThreadpoolSharedData>,
}
369
370impl SingleQueueThreadpool {
371    /// Executes the function `job` on a thread in the pool.
372    ///
373    /// # Examples
374    ///
    /// Execute four jobs on a thread pool sized to the number of detected CPUs:
376    ///
377    /// ```
378    /// let pool = lft_rust::single_queue_threadpool_auto_config();
379    /// pool.execute(|| println!("hello"));
380    /// pool.execute(|| println!("world"));
381    /// pool.execute(|| println!("foo"));
382    /// pool.execute(|| println!("bar"));
383    /// pool.join();
384    /// ```
385    pub fn execute<F>(&self, job: F)
386    where
387        F: FnOnce() + Send + 'static,
388    {
389        self.shared_data.queued_count.fetch_add(1, Ordering::SeqCst);
390        self.jobs
391            .send(Box::new(job))
392            .expect("SingleQueueThreadpool::execute unable to send job into queue.");
393    }
394
    /// Returns the number of jobs waiting to be executed in the pool.
396    ///
397    /// # Examples
398    ///
399    /// ```
400    /// use lft_rust::SingleQueueThreadpool;
401    /// use std::time::Duration;
402    /// use std::thread::sleep;
403    ///
404    /// let pool = lft_rust::single_queue_threadpool_builder()
405    ///                         .num_workers(2)
406    ///                         .build();
407    /// for _ in 0..10 {
408    ///     pool.execute(|| {
409    ///         sleep(Duration::from_secs(100));
410    ///     });
411    /// }
412    ///
413    /// sleep(Duration::from_secs(1)); // wait for threads to start
414    /// assert_eq!(8, pool.queued_count());
415    /// ```
416    pub fn queued_count(&self) -> usize {
417        self.shared_data.queued_count.load(Ordering::Relaxed)
418    }
419
420    /// Returns the number of currently active worker threads.
421    ///
422    /// # Examples
423    ///
424    /// ```
425    /// use std::time::Duration;
426    /// use std::thread::sleep;
427    ///
428    /// let pool = lft_rust::single_queue_threadpool_builder()
429    ///                         .num_workers(4)
430    ///                         .build();
431    ///
432    /// for _ in 0..10 {
433    ///     pool.execute(move || {
434    ///         sleep(Duration::from_secs(100));
435    ///     });
436    /// }
437    ///
438    /// sleep(Duration::from_secs(1)); // wait for threads to start
439    /// assert_eq!(4, pool.active_count());
440    /// ```
441    pub fn active_count(&self) -> usize {
442        self.shared_data.active_count.load(Ordering::SeqCst)
443    }
444
445    /// Returns the maximum number of threads the pool will execute concurrently.
446    ///
447    /// # Examples
448    ///
449    /// ```
450    /// let pool = lft_rust::single_queue_threadpool_builder()
451    ///                         .num_workers(4)
452    ///                         .build();
453    /// assert_eq!(4, pool.max_count());
454    ///
455    /// pool.set_num_workers(8);
456    /// assert_eq!(8, pool.max_count());
457    /// ```
458    pub fn max_count(&self) -> usize {
459        self.shared_data.max_thread_count.load(Ordering::Relaxed)
460    }
461
462    /// Returns the number of panicked threads over the lifetime of the pool.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// let pool = lft_rust::single_queue_threadpool_auto_config();
468    /// for n in 0..10 {
469    ///     pool.execute(move || {
470    ///         // simulate a panic
471    ///         if n % 2 == 0 {
472    ///             panic!()
473    ///         }
474    ///     });
475    /// }
476    /// pool.join();
477    ///
478    /// assert_eq!(5, pool.panic_count());
479    /// ```
480    pub fn panic_count(&self) -> usize {
481        self.shared_data.panic_count.load(Ordering::Relaxed)
482    }
483
484    /// Sets the number of worker-threads to use as `num_workers`.
485    /// Can be used to change the threadpool size during runtime.
486    /// Will not abort already running or waiting threads.
487    ///
488    /// # Panics
489    ///
490    /// This function will panic if `num_workers` is 0.
491    ///
492    /// # Examples
493    ///
494    /// ```
495    /// use std::time::Duration;
496    /// use std::thread::sleep;
497    ///
498    /// let pool = lft_rust::single_queue_threadpool_builder()
499    ///                             .num_workers(4)
500    ///                             .build();
501    ///
502    /// for _ in 0..10 {
503    ///     pool.execute(move || {
504    ///         sleep(Duration::from_secs(100));
505    ///     });
506    /// }
507    ///
508    /// sleep(Duration::from_secs(1)); // wait for threads to start
509    /// assert_eq!(4, pool.active_count());
510    /// assert_eq!(6, pool.queued_count());
511    ///
512    /// // Increase thread capacity of the pool
513    /// pool.set_num_workers(8);
514    ///
515    /// sleep(Duration::from_secs(1)); // wait for new threads to start
516    /// assert_eq!(8, pool.active_count());
517    /// assert_eq!(2, pool.queued_count());
518    ///
519    /// // Decrease thread capacity of the pool
520    /// // No active threads are killed
521    /// pool.set_num_workers(4);
522    ///
523    /// assert_eq!(8, pool.active_count());
524    /// assert_eq!(2, pool.queued_count());
525    /// ```
526    pub fn set_num_workers(&self, num_workers: usize) {
527        assert!(num_workers >= 1);
528        let prev_num_workers = self
529            .shared_data
530            .max_thread_count
531            .swap(num_workers, Ordering::Release);
532        if let Some(num_spawn) = num_workers.checked_sub(prev_num_workers) {
533            // Spawn new threads
534            for _ in 0..num_spawn {
535                spawn_in_pool(self.shared_data.clone());
536            }
537        }
538    }
539
540    /// Block the current thread until all jobs in the pool have been executed.
541    ///
542    /// Calling `join` on an empty pool will cause an immediate return.
543    /// `join` may be called from multiple threads concurrently.
544    /// A `join` is an atomic point in time. All threads joining before the join
545    /// event will exit together even if the pool is processing new jobs by the
546    /// time they get scheduled.
547    ///
548    /// Calling `join` from a thread within the pool will cause a deadlock. This
549    /// behavior is considered safe.
550    ///
551    /// **Note:** Join will not stop the worker threads. You will need to `drop`
552    /// all instances of `SingleQueueThreadpool` for the worker threads to terminate.
553    ///
554    /// # Examples
555    ///
556    /// ```
557    /// use lft_rust::SingleQueueThreadpool;
558    /// use std::sync::Arc;
559    /// use std::sync::atomic::{AtomicUsize, Ordering};
560    ///
561    /// let pool = lft_rust::single_queue_threadpool_auto_config();
562    /// let test_count = Arc::new(AtomicUsize::new(0));
563    ///
564    /// for _ in 0..42 {
565    ///     let test_count = test_count.clone();
566    ///     pool.execute(move || {
567    ///         test_count.fetch_add(1, Ordering::Relaxed);
568    ///     });
569    /// }
570    ///
571    /// pool.join();
572    /// assert_eq!(42, test_count.load(Ordering::Relaxed));
573    /// ```
574    pub fn join(&self) {
575        // fast path requires no mutex
576        if self.shared_data.has_work() == false {
577            return ();
578        }
579
580        let generation = self.shared_data.join_generation.load(Ordering::SeqCst);
581        let mut lock = self.shared_data.empty_trigger.lock().unwrap();
582
583        while generation == self.shared_data.join_generation.load(Ordering::Relaxed)
584            && self.shared_data.has_work()
585        {
586            lock = self.shared_data.empty_condvar.wait(lock).unwrap();
587        }
588
589        // increase generation if we are the first joining thread to come out of the loop
590        let _ = self.shared_data.join_generation.compare_exchange(
591            generation,
592            generation.wrapping_add(1),
593            Ordering::SeqCst,
594            Ordering::SeqCst,
595        );
596    }
597}
598
599impl Clone for SingleQueueThreadpool {
600    /// Cloning a pool will create a new handle to the pool.
601    /// The behavior is similar to [Arc](https://doc.rust-lang.org/stable/std/sync/struct.Arc.html).
602    ///
603    /// We could for example submit jobs from multiple threads concurrently.
604    ///
605    /// ```
606    /// use std::thread;
607    /// use crossbeam_channel::unbounded;
608    ///
609    /// let pool = lft_rust::single_queue_threadpool_builder()
610    ///                 .worker_name("clone example")
611    ///                 .num_workers(2)
612    ///                 .build();
613    ///
614    /// let results = (0..2)
615    ///     .map(|i| {
616    ///         let pool = pool.clone();
617    ///         thread::spawn(move || {
618    ///             let (tx, rx) = unbounded();
619    ///             for i in 1..12 {
620    ///                 let tx = tx.clone();
621    ///                 pool.execute(move || {
622    ///                     tx.send(i).expect("channel will be waiting");
623    ///                 });
624    ///             }
625    ///             drop(tx);
626    ///             if i == 0 {
627    ///                 rx.iter().fold(0, |accumulator, element| accumulator + element)
628    ///             } else {
629    ///                 rx.iter().fold(1, |accumulator, element| accumulator * element)
630    ///             }
631    ///         })
632    ///     })
633    ///     .map(|join_handle| join_handle.join().expect("collect results from threads"))
634    ///     .collect::<Vec<usize>>();
635    ///
636    /// assert_eq!(vec![66, 39916800], results);
637    /// ```
638    fn clone(&self) -> SingleQueueThreadpool {
639        SingleQueueThreadpool {
640            jobs: self.jobs.clone(),
641            shared_data: self.shared_data.clone(),
642        }
643    }
644}
645
646impl fmt::Debug for SingleQueueThreadpool {
647    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
648        f.debug_struct("SingleQueueThreadpool")
649            .field("name", &self.shared_data.name)
650            .field("queued_count", &self.queued_count())
651            .field("active_count", &self.active_count())
652            .field("max_count", &self.max_count())
653            .finish()
654    }
655}
656
657impl PartialEq for SingleQueueThreadpool {
658    /// Check if you are working with the same pool
659    ///
660    /// ```
661    /// let a = lft_rust::single_queue_threadpool_auto_config();
662    /// let b = lft_rust::single_queue_threadpool_auto_config();
663    ///
664    /// assert_eq!(a, a);
665    /// assert_eq!(b, b);
666    ///
667    /// assert_ne!(a, b);
668    /// assert_ne!(b, a);
669    /// ```
670    fn eq(&self, other: &SingleQueueThreadpool) -> bool {
671        Arc::ptr_eq(&self.shared_data, &other.shared_data)
672    }
673}
// Marker impl: `eq` compares the shared-state pointers with `Arc::ptr_eq`,
// so every handle is equal to itself.
impl Eq for SingleQueueThreadpool {}
675
676fn spawn_in_pool(shared_data: Arc<SingleQueueThreadpoolSharedData>) {
677    let mut builder = thread::Builder::new();
678    if let Some(ref name) = shared_data.name {
679        builder = builder.name(name.clone());
680    }
681    if let Some(ref stack_size) = shared_data.stack_size {
682        builder = builder.stack_size(stack_size.to_owned());
683    }
684    builder
685        .spawn(move || {
686            // Will spawn a new thread on panic unless it is cancelled.
687            let sentinel = Sentinel::new(&shared_data);
688
689            loop {
690                // Shutdown this thread if the pool has become smaller
691                let thread_counter_val = shared_data.active_count.load(Ordering::Acquire);
692                let max_thread_count_val = shared_data.max_thread_count.load(Ordering::Relaxed);
693                if thread_counter_val >= max_thread_count_val {
694                    break;
695                }
696                let message = {
697                    // Only lock jobs for the time it takes
698                    // to get a job, not run it.
699                    let lock = shared_data
700                        .job_receiver
701                        .lock()
702                        .expect("Worker thread unable to lock job_receiver");
703                    lock.recv()
704                };
705
706                let job = match message {
707                    Ok(job) => job,
708                    // The SingleQueueThreadpool was dropped.
709                    Err(..) => break,
710                };
711                // Do not allow IR around the job execution
712                shared_data.active_count.fetch_add(1, Ordering::SeqCst);
713                shared_data.queued_count.fetch_sub(1, Ordering::SeqCst);
714
715                job.call_box();
716
717                shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
718                shared_data.no_work_notify_all();
719            }
720
721            sentinel.cancel();
722        })
723        .unwrap();
724}