lft_rust/single_queue_threadpool.rs
1// Copyright 2014 The Rust Project Developers.
2// Copyright 2022 @yucwang.
3// See the COPYRIGHT file at the top-level directory of this
4// distribution and at http://rust-lang.org/COPYRIGHT.
5//
6// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
7// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
9// option. This file may not be copied, modified, or distributed
10// except according to those terms.
11
12//! A thread pool used to execute functions in parallel.
13//!
14//! Spawns a specified number of worker threads and replenishes the pool if any worker threads
15//! panic.
16//!
17//! # Examples
18//!
19//! ## Synchronized with a channel
20//!
21//! Every thread sends one message over the channel, which then is collected with the `take()`.
22//!
23//! ```
24//! use crossbeam_channel::unbounded;
25//!
26//! let n_workers = 4;
27//! let n_jobs = 8;
28//! let pool = threadpool::builder().num_workers(n_workers).build();
29//!
30//! let (tx, rx) = unbounded();
31//! for _ in 0..n_jobs {
32//! let tx = tx.clone();
33//! pool.execute(move|| {
34//! tx.send(1).expect("channel will be there waiting for the pool");
35//! });
36//! }
37//! drop(tx);
38//!
39//! assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
40//! ```
41//!
42//! ## Synchronized with a barrier
43//!
44//! Keep in mind, if a barrier synchronizes more jobs than you have workers in the pool,
45//! you will end up with a [deadlock](https://en.wikipedia.org/wiki/Deadlock)
46//! at the barrier which is [not considered unsafe](
47//! https://doc.rust-lang.org/reference/behavior-not-considered-unsafe.html).
48//!
49//! ```
50//! use threadpool::SingleQueueThreadpool;
51//! use std::sync::{Arc, Barrier};
52//! use std::sync::atomic::{AtomicUsize, Ordering};
53//!
54//! // create at least as many workers as jobs or you will deadlock yourself
55//! let n_workers = 42;
56//! let n_jobs = 23;
57//! let pool = threadpool::builder().num_workers(n_workers).build();
58//! let an_atomic = Arc::new(AtomicUsize::new(0));
59//!
60//! assert!(n_jobs <= n_workers, "too many jobs, will deadlock");
61//!
62//! // create a barrier that waits for all jobs plus the starter thread
63//! let barrier = Arc::new(Barrier::new(n_jobs + 1));
64//! for _ in 0..n_jobs {
65//! let barrier = barrier.clone();
66//! let an_atomic = an_atomic.clone();
67//!
68//! pool.execute(move|| {
69//! // do the heavy work
70//! an_atomic.fetch_add(1, Ordering::Relaxed);
71//!
72//! // then wait for the other threads
73//! barrier.wait();
74//! });
75//! }
76//!
77//! // wait for the threads to finish the work
78//! barrier.wait();
79//! assert_eq!(an_atomic.load(Ordering::SeqCst), n_jobs);
80//! ```
81
82use num_cpus;
83
84use crossbeam_channel::{ unbounded, Receiver, Sender };
85
86use std::fmt;
87use std::sync::atomic::{ AtomicUsize, Ordering };
88use std::sync::{ Arc, Condvar, Mutex };
89use std::thread;
90
91#[cfg(test)]
92mod test;
93
94/// Creates a new thread pool with the same number of workers as CPUs are detected.
95///
96/// # Examples
97///
98/// Create a new thread pool capable of executing at least one jobs concurrently:
99///
100/// ```
101/// let pool = lft_rust::single_queue_threadpool_auto_config();
102/// ```
103pub fn single_queue_threadpool_auto_config() -> SingleQueueThreadpool {
104 single_queue_threadpool_builder().build()
105}
106
107/// Initiate a new [`SingleQueueThreadpoolBuilder`].
108///
109/// [`SingleQueueThreadpoolBuilder`]: struct.SingleQueueThreadpoolBuilder.html
110///
111/// # Examples
112///
113/// ```
114/// let builder = lft_rust::single_queue_threadpool_builder();
115/// ```
116pub const fn single_queue_threadpool_builder() -> SingleQueueThreadpoolBuilder {
117 SingleQueueThreadpoolBuilder {
118 num_workers: None,
119 worker_name: None,
120 thread_stack_size: None,
121 }
122}
123
124trait FnBox {
125 fn call_box(self: Box<Self>);
126}
127
128impl<F: FnOnce()> FnBox for F {
129 fn call_box(self: Box<F>) {
130 (*self)()
131 }
132}
133
134type Thunk<'a> = Box<dyn FnBox + Send + 'a>;
135
136struct Sentinel<'a> {
137 shared_data: &'a Arc<SingleQueueThreadpoolSharedData>,
138 active: bool,
139}
140
141impl<'a> Sentinel<'a> {
142 fn new(shared_data: &'a Arc<SingleQueueThreadpoolSharedData>) -> Sentinel<'a> {
143 Sentinel {
144 shared_data: shared_data,
145 active: true,
146 }
147 }
148
149 /// Cancel and destroy this sentinel.
150 fn cancel(mut self) {
151 self.active = false;
152 }
153}
154
155impl<'a> Drop for Sentinel<'a> {
156 fn drop(&mut self) {
157 if self.active {
158 self.shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
159 if thread::panicking() {
160 self.shared_data.panic_count.fetch_add(1, Ordering::SeqCst);
161 }
162 self.shared_data.no_work_notify_all();
163 spawn_in_pool(self.shared_data.clone())
164 }
165 }
166}
167
168/// [`SingleQueueThreadpool`] factory, which can be used in order to configure the properties of the
169/// [`SingleQueueThreadpool`].
170///
171/// The three configuration options available:
172///
173/// * `num_workers`: maximum number of threads that will be alive at any given moment by the built
174/// [`SingleQueueThreadpool`]
175/// * `worker_name`: thread name for each of the threads spawned by the built [`SingleQueueThreadpool`]
176/// * `thread_stack_size`: stack size (in bytes) for each of the threads spawned by the built
177/// [`SingleQueueThreadpool`]
178///
179/// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
180///
181/// # Examples
182///
183/// Build a [`SingleQueueThreadpool`] that uses a maximum of eight threads simultaneously and each thread has
184/// a 8 MB stack size:
185///
186/// ```
187/// let pool = lft_rust::single_queue_threadpool_builder()
188/// .num_workers(8)
189/// .thread_stack_size(8 * 1024 * 1024)
190/// .build();
191/// ```
192#[derive(Clone, Default)]
193pub struct SingleQueueThreadpoolBuilder {
194 num_workers: Option<usize>,
195 worker_name: Option<String>,
196 thread_stack_size: Option<usize>,
197}
198
199impl SingleQueueThreadpoolBuilder {
200 /// Set the maximum number of worker-threads that will be alive at any given moment by the built
201 /// [`SingleQueueThreadpool`]. If not specified, defaults the number of threads to the number of CPUs.
202 ///
203 /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
204 ///
205 /// # Panics
206 ///
207 /// This method will panic if `num_workers` is 0.
208 ///
209 /// # Examples
210 ///
211 /// No more than eight threads will be alive simultaneously for this pool:
212 ///
213 /// ```
214 /// use std::thread;
215 ///
216 /// let pool = lft_rust::single_queue_threadpool_builder()
217 /// .num_workers(8)
218 /// .build();
219 ///
220 /// for _ in 0..42 {
221 /// pool.execute(|| {
222 /// println!("Hello from a worker thread!")
223 /// })
224 /// }
225 /// ```
226 pub fn num_workers(mut self, num_workers: usize) -> SingleQueueThreadpoolBuilder {
227 assert!(num_workers > 0);
228 self.num_workers = Some(num_workers);
229 self
230 }
231
232 /// Set the thread name for each of the threads spawned by the built [`SingleQueueThreadpool`]. If not
233 /// specified, threads spawned by the thread pool will be unnamed.
234 ///
235 /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
236 ///
237 /// # Examples
238 ///
239 /// Each thread spawned by this pool will have the name "foo":
240 ///
241 /// ```
242 /// use std::thread;
243 ///
244 /// let pool = lft_rust::single_queue_threadpool_builder()
245 /// .worker_name("foo")
246 /// .build();
247 ///
248 /// for _ in 0..100 {
249 /// pool.execute(|| {
250 /// assert_eq!(thread::current().name(), Some("foo"));
251 /// })
252 /// }
253 /// ```
254 pub fn worker_name<S: AsRef<str>>(mut self, name: S) -> SingleQueueThreadpoolBuilder {
255 // TODO save the copy with Into<String>
256 self.worker_name = Some(name.as_ref().to_owned());
257 self
258 }
259
260 /// Set the stack size (in bytes) for each of the threads spawned by the built [`SingleQueueThreadpool`].
261 /// If not specified, threads spawned by the threadpool will have a stack size [as specified in
262 /// the `std::thread` documentation][thread].
263 ///
264 /// [thread]: https://doc.rust-lang.org/nightly/std/thread/index.html#stack-size
265 /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
266 ///
267 /// # Examples
268 ///
269 /// Each thread spawned by this pool will have a 4 MB stack:
270 ///
271 /// ```
272 /// let pool = lft_rust::single_queue_threadpool_builder()
273 /// .thread_stack_size(4096 * 1024)
274 /// .build();
275 ///
276 /// for _ in 0..100 {
277 /// pool.execute(|| {
278 /// println!("This thread has a 4 MB stack size!");
279 /// })
280 /// }
281 /// ```
282 pub fn thread_stack_size(mut self, size: usize) -> SingleQueueThreadpoolBuilder {
283 self.thread_stack_size = Some(size);
284 self
285 }
286
287 /// Finalize the [`SingleQueueThreadpoolBuilder`] and build the [`SingleQueueThreadpool`].
288 ///
289 /// [`SingleQueueThreadpoolBuilder`]: struct.SingleQueueThreadpoolBuilder.html
290 /// [`SingleQueueThreadpool`]: struct.SingleQueueThreadpool.html
291 ///
292 /// # Examples
293 ///
294 /// ```
295 /// let pool = lft_rust::single_queue_threadpool_builder()
296 /// .num_workers(8)
297 /// .thread_stack_size(16*1024*1024)
298 /// .build();
299 /// ```
300 pub fn build(self) -> SingleQueueThreadpool {
301 let (tx, rx) = unbounded::<Thunk<'static>>();
302
303 let num_workers = self.num_workers.unwrap_or_else(num_cpus::get);
304
305 let shared_data = Arc::new(SingleQueueThreadpoolSharedData {
306 name: self.worker_name,
307 job_receiver: Mutex::new(rx),
308 empty_condvar: Condvar::new(),
309 empty_trigger: Mutex::new(()),
310 join_generation: AtomicUsize::new(0),
311 queued_count: AtomicUsize::new(0),
312 active_count: AtomicUsize::new(0),
313 max_thread_count: AtomicUsize::new(num_workers),
314 panic_count: AtomicUsize::new(0),
315 stack_size: self.thread_stack_size,
316 });
317
318 // Threadpool threads
319 for _ in 0..num_workers {
320 spawn_in_pool(shared_data.clone());
321 }
322
323 SingleQueueThreadpool {
324 jobs: tx,
325 shared_data: shared_data,
326 }
327 }
328}
329
330struct SingleQueueThreadpoolSharedData {
331 name: Option<String>,
332 job_receiver: Mutex<Receiver<Thunk<'static>>>,
333 empty_trigger: Mutex<()>,
334 empty_condvar: Condvar,
335 join_generation: AtomicUsize,
336 queued_count: AtomicUsize,
337 active_count: AtomicUsize,
338 max_thread_count: AtomicUsize,
339 panic_count: AtomicUsize,
340 stack_size: Option<usize>,
341}
342
343impl SingleQueueThreadpoolSharedData {
344 fn has_work(&self) -> bool {
345 self.queued_count.load(Ordering::SeqCst) > 0 || self.active_count.load(Ordering::SeqCst) > 0
346 }
347
348 /// Notify all observers joining this pool if there is no more work to do.
349 fn no_work_notify_all(&self) {
350 if !self.has_work() {
351 *self
352 .empty_trigger
353 .lock()
354 .expect("Unable to notify all joining threads");
355 self.empty_condvar.notify_all();
356 }
357 }
358}
359
360/// Abstraction of a thread pool for basic parallelism.
361pub struct SingleQueueThreadpool {
362 // How the threadpool communicates with subthreads.
363 //
364 // This is the only such Sender, so when it is dropped all subthreads will
365 // quit.
366 jobs: Sender<Thunk<'static>>,
367 shared_data: Arc<SingleQueueThreadpoolSharedData>,
368}
369
370impl SingleQueueThreadpool {
371 /// Executes the function `job` on a thread in the pool.
372 ///
373 /// # Examples
374 ///
375 /// Execute four jobs on a thread pool that can run two jobs concurrently:
376 ///
377 /// ```
378 /// let pool = lft_rust::single_queue_threadpool_auto_config();
379 /// pool.execute(|| println!("hello"));
380 /// pool.execute(|| println!("world"));
381 /// pool.execute(|| println!("foo"));
382 /// pool.execute(|| println!("bar"));
383 /// pool.join();
384 /// ```
385 pub fn execute<F>(&self, job: F)
386 where
387 F: FnOnce() + Send + 'static,
388 {
389 self.shared_data.queued_count.fetch_add(1, Ordering::SeqCst);
390 self.jobs
391 .send(Box::new(job))
392 .expect("SingleQueueThreadpool::execute unable to send job into queue.");
393 }
394
395 /// Returns the number of jobs waiting to executed in the pool.
396 ///
397 /// # Examples
398 ///
399 /// ```
400 /// use lft_rust::SingleQueueThreadpool;
401 /// use std::time::Duration;
402 /// use std::thread::sleep;
403 ///
404 /// let pool = lft_rust::single_queue_threadpool_builder()
405 /// .num_workers(2)
406 /// .build();
407 /// for _ in 0..10 {
408 /// pool.execute(|| {
409 /// sleep(Duration::from_secs(100));
410 /// });
411 /// }
412 ///
413 /// sleep(Duration::from_secs(1)); // wait for threads to start
414 /// assert_eq!(8, pool.queued_count());
415 /// ```
416 pub fn queued_count(&self) -> usize {
417 self.shared_data.queued_count.load(Ordering::Relaxed)
418 }
419
420 /// Returns the number of currently active worker threads.
421 ///
422 /// # Examples
423 ///
424 /// ```
425 /// use std::time::Duration;
426 /// use std::thread::sleep;
427 ///
428 /// let pool = lft_rust::single_queue_threadpool_builder()
429 /// .num_workers(4)
430 /// .build();
431 ///
432 /// for _ in 0..10 {
433 /// pool.execute(move || {
434 /// sleep(Duration::from_secs(100));
435 /// });
436 /// }
437 ///
438 /// sleep(Duration::from_secs(1)); // wait for threads to start
439 /// assert_eq!(4, pool.active_count());
440 /// ```
441 pub fn active_count(&self) -> usize {
442 self.shared_data.active_count.load(Ordering::SeqCst)
443 }
444
445 /// Returns the maximum number of threads the pool will execute concurrently.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// let pool = lft_rust::single_queue_threadpool_builder()
451 /// .num_workers(4)
452 /// .build();
453 /// assert_eq!(4, pool.max_count());
454 ///
455 /// pool.set_num_workers(8);
456 /// assert_eq!(8, pool.max_count());
457 /// ```
458 pub fn max_count(&self) -> usize {
459 self.shared_data.max_thread_count.load(Ordering::Relaxed)
460 }
461
462 /// Returns the number of panicked threads over the lifetime of the pool.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// let pool = lft_rust::single_queue_threadpool_auto_config();
468 /// for n in 0..10 {
469 /// pool.execute(move || {
470 /// // simulate a panic
471 /// if n % 2 == 0 {
472 /// panic!()
473 /// }
474 /// });
475 /// }
476 /// pool.join();
477 ///
478 /// assert_eq!(5, pool.panic_count());
479 /// ```
480 pub fn panic_count(&self) -> usize {
481 self.shared_data.panic_count.load(Ordering::Relaxed)
482 }
483
484 /// Sets the number of worker-threads to use as `num_workers`.
485 /// Can be used to change the threadpool size during runtime.
486 /// Will not abort already running or waiting threads.
487 ///
488 /// # Panics
489 ///
490 /// This function will panic if `num_workers` is 0.
491 ///
492 /// # Examples
493 ///
494 /// ```
495 /// use std::time::Duration;
496 /// use std::thread::sleep;
497 ///
498 /// let pool = lft_rust::single_queue_threadpool_builder()
499 /// .num_workers(4)
500 /// .build();
501 ///
502 /// for _ in 0..10 {
503 /// pool.execute(move || {
504 /// sleep(Duration::from_secs(100));
505 /// });
506 /// }
507 ///
508 /// sleep(Duration::from_secs(1)); // wait for threads to start
509 /// assert_eq!(4, pool.active_count());
510 /// assert_eq!(6, pool.queued_count());
511 ///
512 /// // Increase thread capacity of the pool
513 /// pool.set_num_workers(8);
514 ///
515 /// sleep(Duration::from_secs(1)); // wait for new threads to start
516 /// assert_eq!(8, pool.active_count());
517 /// assert_eq!(2, pool.queued_count());
518 ///
519 /// // Decrease thread capacity of the pool
520 /// // No active threads are killed
521 /// pool.set_num_workers(4);
522 ///
523 /// assert_eq!(8, pool.active_count());
524 /// assert_eq!(2, pool.queued_count());
525 /// ```
526 pub fn set_num_workers(&self, num_workers: usize) {
527 assert!(num_workers >= 1);
528 let prev_num_workers = self
529 .shared_data
530 .max_thread_count
531 .swap(num_workers, Ordering::Release);
532 if let Some(num_spawn) = num_workers.checked_sub(prev_num_workers) {
533 // Spawn new threads
534 for _ in 0..num_spawn {
535 spawn_in_pool(self.shared_data.clone());
536 }
537 }
538 }
539
540 /// Block the current thread until all jobs in the pool have been executed.
541 ///
542 /// Calling `join` on an empty pool will cause an immediate return.
543 /// `join` may be called from multiple threads concurrently.
544 /// A `join` is an atomic point in time. All threads joining before the join
545 /// event will exit together even if the pool is processing new jobs by the
546 /// time they get scheduled.
547 ///
548 /// Calling `join` from a thread within the pool will cause a deadlock. This
549 /// behavior is considered safe.
550 ///
551 /// **Note:** Join will not stop the worker threads. You will need to `drop`
552 /// all instances of `SingleQueueThreadpool` for the worker threads to terminate.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// use lft_rust::SingleQueueThreadpool;
558 /// use std::sync::Arc;
559 /// use std::sync::atomic::{AtomicUsize, Ordering};
560 ///
561 /// let pool = lft_rust::single_queue_threadpool_auto_config();
562 /// let test_count = Arc::new(AtomicUsize::new(0));
563 ///
564 /// for _ in 0..42 {
565 /// let test_count = test_count.clone();
566 /// pool.execute(move || {
567 /// test_count.fetch_add(1, Ordering::Relaxed);
568 /// });
569 /// }
570 ///
571 /// pool.join();
572 /// assert_eq!(42, test_count.load(Ordering::Relaxed));
573 /// ```
574 pub fn join(&self) {
575 // fast path requires no mutex
576 if self.shared_data.has_work() == false {
577 return ();
578 }
579
580 let generation = self.shared_data.join_generation.load(Ordering::SeqCst);
581 let mut lock = self.shared_data.empty_trigger.lock().unwrap();
582
583 while generation == self.shared_data.join_generation.load(Ordering::Relaxed)
584 && self.shared_data.has_work()
585 {
586 lock = self.shared_data.empty_condvar.wait(lock).unwrap();
587 }
588
589 // increase generation if we are the first joining thread to come out of the loop
590 let _ = self.shared_data.join_generation.compare_exchange(
591 generation,
592 generation.wrapping_add(1),
593 Ordering::SeqCst,
594 Ordering::SeqCst,
595 );
596 }
597}
598
599impl Clone for SingleQueueThreadpool {
600 /// Cloning a pool will create a new handle to the pool.
601 /// The behavior is similar to [Arc](https://doc.rust-lang.org/stable/std/sync/struct.Arc.html).
602 ///
603 /// We could for example submit jobs from multiple threads concurrently.
604 ///
605 /// ```
606 /// use std::thread;
607 /// use crossbeam_channel::unbounded;
608 ///
609 /// let pool = lft_rust::single_queue_threadpool_builder()
610 /// .worker_name("clone example")
611 /// .num_workers(2)
612 /// .build();
613 ///
614 /// let results = (0..2)
615 /// .map(|i| {
616 /// let pool = pool.clone();
617 /// thread::spawn(move || {
618 /// let (tx, rx) = unbounded();
619 /// for i in 1..12 {
620 /// let tx = tx.clone();
621 /// pool.execute(move || {
622 /// tx.send(i).expect("channel will be waiting");
623 /// });
624 /// }
625 /// drop(tx);
626 /// if i == 0 {
627 /// rx.iter().fold(0, |accumulator, element| accumulator + element)
628 /// } else {
629 /// rx.iter().fold(1, |accumulator, element| accumulator * element)
630 /// }
631 /// })
632 /// })
633 /// .map(|join_handle| join_handle.join().expect("collect results from threads"))
634 /// .collect::<Vec<usize>>();
635 ///
636 /// assert_eq!(vec![66, 39916800], results);
637 /// ```
638 fn clone(&self) -> SingleQueueThreadpool {
639 SingleQueueThreadpool {
640 jobs: self.jobs.clone(),
641 shared_data: self.shared_data.clone(),
642 }
643 }
644}
645
646impl fmt::Debug for SingleQueueThreadpool {
647 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
648 f.debug_struct("SingleQueueThreadpool")
649 .field("name", &self.shared_data.name)
650 .field("queued_count", &self.queued_count())
651 .field("active_count", &self.active_count())
652 .field("max_count", &self.max_count())
653 .finish()
654 }
655}
656
657impl PartialEq for SingleQueueThreadpool {
658 /// Check if you are working with the same pool
659 ///
660 /// ```
661 /// let a = lft_rust::single_queue_threadpool_auto_config();
662 /// let b = lft_rust::single_queue_threadpool_auto_config();
663 ///
664 /// assert_eq!(a, a);
665 /// assert_eq!(b, b);
666 ///
667 /// assert_ne!(a, b);
668 /// assert_ne!(b, a);
669 /// ```
670 fn eq(&self, other: &SingleQueueThreadpool) -> bool {
671 Arc::ptr_eq(&self.shared_data, &other.shared_data)
672 }
673}
674impl Eq for SingleQueueThreadpool {}
675
676fn spawn_in_pool(shared_data: Arc<SingleQueueThreadpoolSharedData>) {
677 let mut builder = thread::Builder::new();
678 if let Some(ref name) = shared_data.name {
679 builder = builder.name(name.clone());
680 }
681 if let Some(ref stack_size) = shared_data.stack_size {
682 builder = builder.stack_size(stack_size.to_owned());
683 }
684 builder
685 .spawn(move || {
686 // Will spawn a new thread on panic unless it is cancelled.
687 let sentinel = Sentinel::new(&shared_data);
688
689 loop {
690 // Shutdown this thread if the pool has become smaller
691 let thread_counter_val = shared_data.active_count.load(Ordering::Acquire);
692 let max_thread_count_val = shared_data.max_thread_count.load(Ordering::Relaxed);
693 if thread_counter_val >= max_thread_count_val {
694 break;
695 }
696 let message = {
697 // Only lock jobs for the time it takes
698 // to get a job, not run it.
699 let lock = shared_data
700 .job_receiver
701 .lock()
702 .expect("Worker thread unable to lock job_receiver");
703 lock.recv()
704 };
705
706 let job = match message {
707 Ok(job) => job,
708 // The SingleQueueThreadpool was dropped.
709 Err(..) => break,
710 };
711 // Do not allow IR around the job execution
712 shared_data.active_count.fetch_add(1, Ordering::SeqCst);
713 shared_data.queued_count.fetch_sub(1, Ordering::SeqCst);
714
715 job.call_box();
716
717 shared_data.active_count.fetch_sub(1, Ordering::SeqCst);
718 shared_data.no_work_notify_all();
719 }
720
721 sentinel.cancel();
722 })
723 .unwrap();
724}