scoped_pool/
lib.rs

#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]

//! # scoped-pool
//!
//! A flexible thread pool providing scoped threads.
//!
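//! ## Example
//!
//! A minimal usage sketch, mirroring the crate's own tests: `scoped` blocks
//! until the scheduler closure and every job queued on the scope have run,
//! so jobs may borrow and mutate local data.
//!
//! ```rust
//! use scoped_pool::Pool;
//!
//! let pool = Pool::new(4);
//!
//! let mut buf = [0, 0, 0, 0];
//!
//! pool.scoped(|scope| {
//!     for i in &mut buf {
//!         scope.execute(move || *i += 1);
//!     }
//! });
//!
//! assert_eq!(&buf, &[1, 1, 1, 1]);
//! ```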

extern crate variance;
extern crate crossbeam;

#[macro_use]
extern crate scopeguard;

use variance::InvariantLifetime as Id;
use crossbeam::sync::MsQueue;

use std::{thread, mem};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Condvar};

/// A thread-pool providing scoped and unscoped threads.
///
/// The primary ways of interacting with the `Pool` are
/// the `spawn` and `scoped` convenience methods or through
/// the `Scope` type directly.
#[derive(Clone, Default)]
pub struct Pool {
    wait: Arc<WaitGroup>,
    inner: Arc<PoolInner>
}

impl Pool {
    /// Create a new Pool with `size` threads.
    ///
    /// If `size` is zero, no threads will be spawned. Threads can
    /// be added later via `expand`.
    ///
    /// NOTE: Since Pool can be freely cloned, it does not represent a unique
    /// handle to the thread pool. As a consequence, the thread pool is not
    /// automatically shut down; you must explicitly call `Pool::shutdown` to
    /// shut down the pool.
    #[inline]
    pub fn new(size: usize) -> Pool {
        // Create an empty pool.
        let pool = Pool::empty();

        // Start the requested number of threads.
        for _ in 0..size { pool.expand(); }

        pool
    }

    /// Create a new Pool with `size` threads and given thread config.
    ///
    /// If `size` is zero, no threads will be spawned. Threads can
    /// be added later via `expand`.
    ///
    /// NOTE: Since Pool can be freely cloned, it does not represent a unique
    /// handle to the thread pool. As a consequence, the thread pool is not
    /// automatically shut down; you must explicitly call `Pool::shutdown` to
    /// shut down the pool.
    #[inline]
    pub fn with_thread_config(size: usize, thread_config: ThreadConfig) -> Pool {
        // Create an empty pool with configuration.
        let pool = Pool {
            inner: Arc::new(PoolInner::with_thread_config(thread_config)),
            ..Pool::default()
        };

        // Start the requested number of threads.
        for _ in 0..size { pool.expand(); }

        pool
    }

    /// Create an empty Pool, with no threads.
    ///
    /// Note that no jobs will run until `expand` is called and
    /// worker threads are added.
    #[inline]
    pub fn empty() -> Pool {
        Pool::default()
    }

    /// How many worker threads are currently active.
    #[inline]
    pub fn workers(&self) -> usize {
        // All threads submit themselves when they start and
        // complete when they stop, so the threads we are waiting
        // for are still active.
        self.wait.waiting()
    }

    /// Spawn a `'static` job to be run on this pool.
    ///
    /// We do not wait on the job to complete.
    ///
    /// Panics in the job will propagate to the calling thread.
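    ///
    /// A minimal sketch: the job runs on a pool thread in the background,
    /// and `spawn` itself returns immediately.
    ///
    /// ```rust
    /// use scoped_pool::Pool;
    ///
    /// let pool = Pool::new(1);
    ///
    /// // Queued and run on the pool; the caller is not blocked.
    /// pool.spawn(move || println!("hello from the pool"));
    /// ```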
    #[inline]
    pub fn spawn<F: FnOnce() + Send + 'static>(&self, job: F) {
        // Run the job on a scope which lasts forever, and won't block.
        Scope::forever(self.clone()).execute(job)
    }

    /// Create a Scope for scheduling a group of jobs in `'scope`.
    ///
    /// `scoped` will return only when the `scheduler` function and
    /// all jobs queued on the given Scope have been run.
    ///
    /// Panics in any of the jobs or in the scheduler function itself
    /// will propagate to the calling thread.
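    ///
    /// A small sketch: jobs may borrow data on the caller's stack, and the
    /// scheduler's return value is passed through as the result of `scoped`.
    ///
    /// ```rust
    /// use scoped_pool::Pool;
    ///
    /// let pool = Pool::new(4);
    ///
    /// let mut data = vec![1, 2, 3, 4];
    ///
    /// let answer = pool.scoped(|scope| {
    ///     for item in &mut data {
    ///         scope.execute(move || *item *= 2);
    ///     }
    ///
    ///     // Returned by `scoped` once all jobs above have completed.
    ///     42
    /// });
    ///
    /// assert_eq!(answer, 42);
    /// assert_eq!(data, vec![2, 4, 6, 8]);
    /// ```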
    #[inline]
    pub fn scoped<'scope, F, R>(&self, scheduler: F) -> R
    where F: FnOnce(&Scope<'scope>) -> R {
        // Zoom to the correct scope, then run the scheduler.
        Scope::forever(self.clone()).zoom(scheduler)
    }

    /// Shut down the Pool.
    ///
    /// WARNING: Extreme care should be taken not to call `shutdown` concurrently
    /// with any scoped calls, or deadlock can occur.
    ///
    /// All threads will be shut down eventually, but only threads started before the
    /// call to shutdown are guaranteed to be shut down before the call to shutdown
    /// returns.
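    ///
    /// A minimal sketch, as in the crate's own tests:
    ///
    /// ```rust
    /// use scoped_pool::Pool;
    ///
    /// let pool = Pool::new(4);
    ///
    /// // ... use the pool ...
    ///
    /// // Blocks until the threads started above have exited.
    /// pool.shutdown();
    /// ```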
    #[inline]
    pub fn shutdown(&self) {
        // Start the shutdown process.
        self.inner.queue.push(PoolMessage::Quit);

        // Wait for it to complete.
        self.wait.join()
    }

    /// Expand the Pool by spawning an additional thread.
    ///
    /// Can accelerate the completion of running jobs.
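    ///
    /// A small sketch, starting from an empty pool. `expand` registers the
    /// new thread with the pool's wait group before returning, so `workers`
    /// reflects it immediately:
    ///
    /// ```rust
    /// use scoped_pool::Pool;
    ///
    /// let pool = Pool::empty();
    /// assert_eq!(pool.workers(), 0);
    ///
    /// // Add a single worker thread.
    /// pool.expand();
    /// assert_eq!(pool.workers(), 1);
    /// ```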
    #[inline]
    pub fn expand(&self) {
        let pool = self.clone();

        // Submit the new thread to the thread waitgroup.
        pool.wait.submit();

        let thread_number = self.inner.thread_counter.fetch_add(1, Ordering::SeqCst);

        // Deal with thread configuration.
        let mut builder = thread::Builder::new();
        if let Some(ref prefix) = self.inner.thread_config.prefix {
            let name = format!("{}{}", prefix, thread_number);
            builder = builder.name(name);
        }
        if let Some(stack_size) = self.inner.thread_config.stack_size {
            builder = builder.stack_size(stack_size);
        }

        // Start the actual thread.
        builder.spawn(move || pool.run_thread()).unwrap();
    }

    fn run_thread(self) {
        // Create a sentinel to capture panics on this thread.
        let mut thread_sentinel = ThreadSentinel(Some(self.clone()));

        loop {
            match self.inner.queue.pop() {
                // On Quit, repropagate the message and quit.
                PoolMessage::Quit => {
                    // Repropagate the Quit message to other threads.
                    self.inner.queue.push(PoolMessage::Quit);

                    // Cancel the thread sentinel so we don't poison the
                    // pool's waitgroup (which would panic threads waiting
                    // on shutdown) and don't restart this thread.
                    thread_sentinel.cancel();

                    // Terminate the thread.
                    break
                },

                // On Task, run the task then complete the WaitGroup.
                PoolMessage::Task(job, wait) => {
                    let sentinel = Sentinel(self.clone(), Some(wait.clone()));
                    job.run();
                    sentinel.cancel();
                }
            }
        }
    }
}

struct PoolInner {
    queue: MsQueue<PoolMessage>,
    thread_config: ThreadConfig,
    thread_counter: AtomicUsize
}

impl PoolInner {
    fn with_thread_config(thread_config: ThreadConfig) -> Self {
        PoolInner { thread_config: thread_config, ..Self::default() }
    }
}

impl Default for PoolInner {
    fn default() -> Self {
        PoolInner {
            queue: MsQueue::new(),
            thread_config: ThreadConfig::default(),
            thread_counter: AtomicUsize::new(1)
        }
    }
}

/// Thread configuration. Provides detailed control over the properties and behavior of new
/// threads.
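///
/// A small sketch of the builder-style usage (the prefix and stack size here
/// are arbitrary example values):
///
/// ```rust
/// use scoped_pool::{Pool, ThreadConfig};
///
/// // Worker threads will be named "worker-1", "worker-2", ...
/// let config = ThreadConfig::new()
///     .prefix("worker-")
///     .stack_size(2 * 1024 * 1024);
///
/// let pool = Pool::with_thread_config(4, config);
/// ```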
#[derive(Default)]
pub struct ThreadConfig {
    prefix: Option<String>,
    stack_size: Option<usize>,
}

impl ThreadConfig {
    /// Generates the base configuration for spawning a thread, from which configuration methods
    /// can be chained.
    pub fn new() -> ThreadConfig {
        ThreadConfig {
            prefix: None,
            stack_size: None,
        }
    }

    /// Name prefix of spawned threads. Thread number will be appended to this prefix to form each
    /// thread's unique name. Currently the name is used for identification only in panic
    /// messages.
    pub fn prefix<S: Into<String>>(self, prefix: S) -> ThreadConfig {
        ThreadConfig {
            prefix: Some(prefix.into()),
            ..self
        }
    }

    /// Sets the size of the stack for the new thread.
    pub fn stack_size(self, stack_size: usize) -> ThreadConfig {
        ThreadConfig {
            stack_size: Some(stack_size),
            ..self
        }
    }
}

/// An execution scope, representing a set of jobs running on a Pool.
///
/// ## Understanding Scope lifetimes
///
/// Besides `Scope<'static>`, all `Scope` objects are accessed behind a
/// reference of the form `&'scheduler Scope<'scope>`.
///
/// `'scheduler` is the lifetime associated with the *body* of the
/// "scheduler" function (functions passed to `zoom`/`scoped`).
///
/// `'scope` is the lifetime which data captured in `execute` or `recurse`
/// closures must outlive - in other words, `'scope` is the maximum lifetime
/// of all jobs scheduled on a `Scope`.
///
/// Note that since `'scope: 'scheduler` (`'scope` outlives `'scheduler`),
/// `&'scheduler Scope<'scope>` can't be captured in an `execute` closure;
/// this is the reason for the existence of the `recurse` API, which will
/// inject the same scope with a new `'scheduler` lifetime (this time set
/// to the body of the function passed to `recurse`).
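///
/// ## Example
///
/// A short sketch of `recurse`, adapted from the crate's tests: the job is
/// handed a new `&Scope` with the same `'scope`, so it can queue further
/// work borrowing the same data.
///
/// ```rust
/// use scoped_pool::Pool;
///
/// let pool = Pool::new(4);
///
/// let mut buf = [0, 0];
///
/// pool.scoped(|scope| {
///     scope.recurse(|scope| {
///         buf[0] = 1;
///
///         scope.execute(|| {
///             buf[1] = 1;
///         });
///     });
/// });
///
/// assert_eq!(&buf, &[1, 1]);
/// ```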
pub struct Scope<'scope> {
    pool: Pool,
    wait: Arc<WaitGroup>,
    _scope: Id<'scope>
}

impl<'scope> Scope<'scope> {
    /// Create a Scope which lasts forever.
    #[inline]
    pub fn forever(pool: Pool) -> Scope<'static> {
        Scope {
            pool: pool,
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }

    /// Add a job to this scope.
    ///
    /// Subsequent calls to `join` will wait for this job to complete.
    pub fn execute<F>(&self, job: F)
    where F: FnOnce() + Send + 'scope {
        // Submit to the wait group *before* pushing the job onto the queue.
        self.wait.submit();

        let task = unsafe {
            // Safe because we ensure the task finishes executing before
            // `'scope` ends, by joining this scope before `'scope` is resolved.
            mem::transmute::<Box<Task + Send + 'scope>,
                             Box<Task + Send + 'static>>(Box::new(job))
        };

        // Submit the task to be executed.
        self.pool.inner.queue.push(PoolMessage::Task(task, self.wait.clone()));
    }

    /// Add a job to this scope which itself will get access to the scope.
    ///
    /// Like with `execute`, subsequent calls to `join` will wait for this
    /// job (and all jobs scheduled on the scope it receives) to complete.
    pub fn recurse<F>(&self, job: F)
    where F: FnOnce(&Self) + Send + 'scope {
        // Create another scope with the *same* lifetime.
        let this = unsafe { self.clone() };

        self.execute(move || job(&this));
    }

    /// Create a new subscope, bound to a lifetime smaller than our existing Scope.
    ///
    /// The subscope has a different job set, and is joined before zoom returns.
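    ///
    /// A small sketch, adapted from the crate's tests:
    ///
    /// ```rust
    /// use scoped_pool::Pool;
    ///
    /// let pool = Pool::new(4);
    ///
    /// let mut outer = 0;
    ///
    /// pool.scoped(|scope| {
    ///     let mut inner = 0;
    ///
    ///     // The subscope is joined before `zoom` returns, so `inner` is
    ///     // visible immediately afterwards.
    ///     scope.zoom(|subscope| subscope.execute(|| inner = 1));
    ///     assert_eq!(inner, 1);
    ///
    ///     outer = 1;
    /// });
    ///
    /// assert_eq!(outer, 1);
    /// ```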
    pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R
    where F: FnOnce(&Scope<'smaller>) -> R,
          'scope: 'smaller {
        let scope = unsafe { self.refine::<'smaller>() };

        // Join the scope either on completion of the scheduler or panic.
        defer!(scope.join());

        // Schedule all tasks, then join all tasks.
        scheduler(&scope)
    }

    /// Awaits the completion of all jobs submitted on this Scope.
    ///
    /// Only guaranteed to join jobs which were `execute`d logically
    /// prior to `join`. Jobs `execute`d concurrently with `join` may
    /// or may not be completed before `join` returns.
    #[inline]
    pub fn join(&self) {
        self.wait.join()
    }

    #[inline]
    unsafe fn clone(&self) -> Self {
        Scope {
            pool: self.pool.clone(),
            wait: self.wait.clone(),
            _scope: Id::default()
        }
    }

    // Create a new scope with a smaller lifetime on the same pool.
    #[inline]
    unsafe fn refine<'other>(&self) -> Scope<'other> where 'scope: 'other {
        Scope {
            pool: self.pool.clone(),
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }
}

enum PoolMessage {
    Quit,
    Task(Box<Task + Send>, Arc<WaitGroup>)
}

/// A synchronization primitive for awaiting a set of actions.
///
/// Adding new jobs is done with `submit`, jobs are completed with `complete`,
/// and any thread may wait for all jobs to be `complete`d with `join`.
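///
/// A small sketch (the helper thread here is purely illustrative):
///
/// ```rust
/// use std::sync::Arc;
/// use std::thread;
///
/// use scoped_pool::WaitGroup;
///
/// let wait = Arc::new(WaitGroup::new());
///
/// // Register one unit of work before starting it.
/// wait.submit();
///
/// let handle = wait.clone();
/// thread::spawn(move || {
///     // ... do the work ...
///     handle.complete();
/// });
///
/// // Blocks until every `submit` has been matched by a `complete`.
/// wait.join();
/// ```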
pub struct WaitGroup {
    pending: AtomicUsize,
    poisoned: AtomicBool,
    lock: Mutex<()>,
    cond: Condvar
}

impl Default for WaitGroup {
    fn default() -> Self {
        WaitGroup {
            pending: AtomicUsize::new(0),
            poisoned: AtomicBool::new(false),
            lock: Mutex::new(()),
            cond: Condvar::new()
        }
    }
}

impl WaitGroup {
    /// Create a new empty WaitGroup.
    #[inline]
    pub fn new() -> Self {
        WaitGroup::default()
    }

    /// How many submitted tasks are waiting for completion.
    #[inline]
    pub fn waiting(&self) -> usize {
        self.pending.load(Ordering::SeqCst)
    }

    /// Submit to this WaitGroup, causing `join` to wait
    /// for an additional `complete`.
    #[inline]
    pub fn submit(&self) {
        self.pending.fetch_add(1, Ordering::SeqCst);
    }

    /// Complete a previous `submit`.
    #[inline]
    pub fn complete(&self) {
        // Mark the current job complete.
        let old = self.pending.fetch_sub(1, Ordering::SeqCst);

        // If that was the last job, wake joiners.
        if old == 1 {
            let _lock = self.lock.lock().unwrap();
            self.cond.notify_all()
        }
    }

    /// Poison the WaitGroup so all `join`ing threads panic.
    #[inline]
    pub fn poison(&self) {
        // Poison the waitgroup.
        self.poisoned.store(true, Ordering::SeqCst);

        // Mark the current job complete.
        let old = self.pending.fetch_sub(1, Ordering::SeqCst);

        // If that was the last job, wake joiners.
        if old == 1 {
            let _lock = self.lock.lock().unwrap();
            self.cond.notify_all()
        }
    }

    /// Wait for `submit`s to this WaitGroup to be `complete`d.
    ///
    /// Submits occurring strictly before a `join` will always be waited on.
    ///
    /// Submits occurring concurrently with a `join` may or may not
    /// be waited for.
    ///
    /// If nothing has been submitted, `join` returns immediately.
    #[inline]
    pub fn join(&self) {
        let mut lock = self.lock.lock().unwrap();

        while self.pending.load(Ordering::SeqCst) > 0 {
            lock = self.cond.wait(lock).unwrap();
        }

        if self.poisoned.load(Ordering::SeqCst) {
            panic!("WaitGroup explicitly poisoned!")
        }
    }
}

// Completes the attached WaitGroup when canceled; poisons it on drop otherwise.
//
// Used to ensure panics propagate from jobs to the threads waiting on their scope.
struct Sentinel(Pool, Option<Arc<WaitGroup>>);

impl Sentinel {
    fn cancel(mut self) {
        self.1.take().map(|wait| wait.complete());
    }
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        self.1.take().map(|wait| wait.poison());
    }
}

// On a clean shutdown, `cancel` marks this worker thread complete; if the
// thread instead unwinds from a panicking job, the uncanceled drop restarts
// a replacement thread and poisons the pool's waitgroup.
struct ThreadSentinel(Option<Pool>);

impl ThreadSentinel {
    fn cancel(&mut self) {
        self.0.take().map(|pool| {
            pool.wait.complete();
        });
    }
}

impl Drop for ThreadSentinel {
    fn drop(&mut self) {
        self.0.take().map(|pool| {
            // NOTE: We restart the thread first so we don't accidentally
            // hit zero threads before restarting.

            // Restart the thread.
            pool.expand();

            // Poison the pool.
            pool.wait.poison();
        });
    }
}

trait Task {
    fn run(self: Box<Self>);
}

impl<F: FnOnce()> Task for F {
    fn run(self: Box<Self>) { (*self)() }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::time::Duration;
    use std::thread::sleep;

    use {Pool, Scope, ThreadConfig};

    #[test]
    fn test_simple_use() {
        let pool = Pool::new(4);

        let mut buf = [0, 0, 0, 0];

        pool.scoped(|scope| {
            for i in &mut buf {
                scope.execute(move || *i += 1);
            }
        });

        assert_eq!(&buf, &[1, 1, 1, 1]);
    }

    #[test]
    fn test_zoom() {
        let pool = Pool::new(4);

        let mut outer = 0;

        pool.scoped(|scope| {
            let mut inner = 0;
            scope.zoom(|scope2| scope2.execute(|| inner = 1));
            assert_eq!(inner, 1);

            outer = 1;
        });

        assert_eq!(outer, 1);
    }

    #[test]
    fn test_recurse() {
        let pool = Pool::new(12);

        let mut buf = [0, 0, 0, 0];

        pool.scoped(|next| {
            next.recurse(|next| {
                buf[0] = 1;

                next.execute(|| {
                    buf[1] = 1;
                });
            });
        });

        assert_eq!(&buf, &[1, 1, 0, 0]);
    }

    #[test]
    fn test_spawn_doesnt_hang() {
        let pool = Pool::new(1);
        pool.spawn(move || loop {});
    }

    #[test]
    fn test_forever_zoom() {
        let pool = Pool::new(16);
        let forever = Scope::forever(pool.clone());

        let ran = AtomicBool::new(false);

        forever.zoom(|scope| scope.execute(|| ran.store(true, Ordering::SeqCst)));

        assert!(ran.load(Ordering::SeqCst));
    }

    #[test]
    fn test_shutdown() {
        let pool = Pool::new(4);
        pool.shutdown();
    }

    #[test]
    #[should_panic]
    fn test_scheduler_panic() {
        let pool = Pool::new(4);
        pool.scoped(|_| panic!());
    }

    #[test]
    #[should_panic]
    fn test_scoped_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.execute(|| panic!()));
    }

    #[test]
    #[should_panic]
    fn test_pool_panic() {
        let _pool = Pool::new(1);
        panic!();
    }

    #[test]
    #[should_panic]
    fn test_zoomed_scoped_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.zoom(|scope2| scope2.execute(|| panic!())));
    }

    #[test]
    #[should_panic]
    fn test_recurse_scheduler_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.recurse(|_| panic!()));
    }

    #[test]
    #[should_panic]
    fn test_recurse_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.recurse(|scope2| scope2.execute(|| panic!())));
    }

    struct Canary<'a> {
        drops: DropCounter<'a>,
        expected: usize
    }

    #[derive(Clone)]
    struct DropCounter<'a>(&'a AtomicUsize);

    impl<'a> Drop for DropCounter<'a> {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    impl<'a> Drop for Canary<'a> {
        fn drop(&mut self) {
            let drops = self.drops.0.load(Ordering::SeqCst);
            assert_eq!(drops, self.expected);
        }
    }

    #[test]
    #[should_panic]
    fn test_scoped_panic_waits_for_all_tasks() {
        let tasks = 50;
        let panicking_task_fraction = 10;
        let panicking_tasks = tasks / panicking_task_fraction;
        let expected_drops = tasks + panicking_tasks;

        let counter = Box::new(AtomicUsize::new(0));
        let drops = DropCounter(&*counter);

        // Actual check occurs on drop of this during unwinding.
        let _canary = Canary {
            drops: drops.clone(),
            expected: expected_drops
        };

        let pool = Pool::new(12);

        pool.scoped(|scope| {
            for task in 0..tasks {
                let drop_counter = drops.clone();

                scope.execute(move || {
                    sleep(Duration::from_millis(10));

                    drop::<DropCounter>(drop_counter);
                });

                if task % panicking_task_fraction == 0 {
                    let drop_counter = drops.clone();

                    scope.execute(move || {
                        // Just make sure we capture it.
                        let _drops = drop_counter;
                        panic!();
                    });
                }
            }
        });
    }

    #[test]
    #[should_panic]
    fn test_scheduler_panic_waits_for_tasks() {
        let tasks = 50;
        let counter = Box::new(AtomicUsize::new(0));
        let drops = DropCounter(&*counter);

        let _canary = Canary {
            drops: drops.clone(),
            expected: tasks
        };

        let pool = Pool::new(12);

        pool.scoped(|scope| {
            for _ in 0..tasks {
                let drop_counter = drops.clone();

                scope.execute(move || {
                    sleep(Duration::from_millis(25));
                    drop::<DropCounter>(drop_counter);
                });
            }

            panic!();
        });
    }

    #[test]
    fn test_no_thread_config() {
        let pool = Pool::new(1);

        pool.scoped(|scope| {
            scope.execute(|| {
                assert!(::std::thread::current().name().is_none());
            });
        });
    }

    #[test]
    fn test_with_thread_config() {
        let config = ThreadConfig::new().prefix("pool-");

        let pool = Pool::with_thread_config(1, config);

        pool.scoped(|scope| {
            scope.execute(|| {
                assert_eq!(::std::thread::current().name().unwrap(), "pool-1");
            });
        });
    }
}