#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]
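//! A flexible thread pool providing scoped and unscoped (static) jobs.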

extern crate variance;
extern crate crossbeam;

#[macro_use]
extern crate scopeguard;

use variance::InvariantLifetime as Id;
use crossbeam::sync::MsQueue;

use std::{thread, mem};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Condvar};

#[derive(Clone, Default)]
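/// A pool of worker threads that can run both scoped jobs (borrowing data
/// from the caller) and `'static` jobs.
///
/// `Pool` is cheap to clone; clones are handles to the same underlying pool.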
pub struct Pool {
    wait: Arc<WaitGroup>,
    inner: Arc<PoolInner>
}

impl Pool {
    #[inline]
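    /// Create a new `Pool` with `size` worker threads.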
    pub fn new(size: usize) -> Pool {
        let pool = Pool::empty();

        for _ in 0..size { pool.expand(); }

        pool
    }

    #[inline]
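    /// Create a new `Pool` with `size` worker threads, spawning each thread
    /// with the settings in `thread_config` (name prefix and/or stack size).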
    pub fn with_thread_config(size: usize, thread_config: ThreadConfig) -> Pool {
        let pool = Pool {
            inner: Arc::new(PoolInner::with_thread_config(thread_config)),
            ..Pool::default()
        };

        for _ in 0..size { pool.expand(); }

        pool
    }

    #[inline]
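    /// Create an empty `Pool` with no worker threads.
    ///
    /// Workers can be added later with `expand`.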
    pub fn empty() -> Pool {
        Pool::default()
    }

    #[inline]
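    /// The number of worker threads currently registered with the pool.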
    pub fn workers(&self) -> usize {
        self.wait.waiting()
    }

    #[inline]
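    /// Submit a `'static` job to be run on the pool.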
    pub fn spawn<F: FnOnce() + Send + 'static>(&self, job: F) {
        Scope::forever(self.clone()).execute(job)
    }

    #[inline]
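    /// Create a `Scope` bound to this pool and pass it to `scheduler`.
    ///
    /// Jobs submitted through the scope may borrow data from the caller;
    /// all of them are joined before `scoped` returns, even on panic.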
    pub fn scoped<'scope, F, R>(&self, scheduler: F) -> R
        where F: FnOnce(&Scope<'scope>) -> R {
        Scope::forever(self.clone()).zoom(scheduler)
    }

    #[inline]
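    /// Shut down the pool: queue a quit message for the workers, then wait
    /// for every worker thread to exit.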
    pub fn shutdown(&self) {
        self.inner.queue.push(PoolMessage::Quit);

        self.wait.join()
    }

    #[inline]
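    /// Add one more worker thread to the pool.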
    pub fn expand(&self) {
        let pool = self.clone();

        pool.wait.submit();

        let thread_number = self.inner.thread_counter.fetch_add(1, Ordering::SeqCst);

        let mut builder = thread::Builder::new();
        if let Some(ref prefix) = self.inner.thread_config.prefix {
            let name = format!("{}{}", prefix, thread_number);
            builder = builder.name(name);
        }
        if let Some(stack_size) = self.inner.thread_config.stack_size {
            builder = builder.stack_size(stack_size);
        }

        builder.spawn(move || pool.run_thread()).unwrap();
    }

    fn run_thread(self) {
        let mut thread_sentinel = ThreadSentinel(Some(self.clone()));

        loop {
            match self.inner.queue.pop() {
                PoolMessage::Quit => {
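                    // Put the quit message back so the other workers see it too.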
                    self.inner.queue.push(PoolMessage::Quit);

                    thread_sentinel.cancel();

                    break
                },

                PoolMessage::Task(job, wait) => {
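                    // The sentinel completes the task's WaitGroup on success
                    // and poisons it if the job panics.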
                    let sentinel = Sentinel(self.clone(), Some(wait.clone()));
                    job.run();
                    sentinel.cancel();
                }
            }
        }
    }
}

struct PoolInner {
    queue: MsQueue<PoolMessage>,
    thread_config: ThreadConfig,
    thread_counter: AtomicUsize
}

impl PoolInner {
    fn with_thread_config(thread_config: ThreadConfig) -> Self {
        PoolInner { thread_config: thread_config, ..Self::default() }
    }
}

impl Default for PoolInner {
    fn default() -> Self {
        PoolInner {
            queue: MsQueue::new(),
            thread_config: ThreadConfig::default(),
            thread_counter: AtomicUsize::new(1)
        }
    }
}

#[derive(Default)]
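/// Configuration applied to the worker threads spawned by a `Pool`.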
pub struct ThreadConfig {
    prefix: Option<String>,
    stack_size: Option<usize>,
}

impl ThreadConfig {
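    /// Create a `ThreadConfig` with no name prefix and the default stack size.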
    pub fn new() -> ThreadConfig {
        ThreadConfig {
            prefix: None,
            stack_size: None,
        }
    }

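    /// Set a name prefix for worker threads; each thread is named with the
    /// prefix followed by its number.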
    pub fn prefix<S: Into<String>>(self, prefix: S) -> ThreadConfig {
        ThreadConfig {
            prefix: Some(prefix.into()),
            ..self
        }
    }

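    /// Set the stack size, in bytes, for worker threads.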
    pub fn stack_size(self, stack_size: usize) -> ThreadConfig {
        ThreadConfig {
            stack_size: Some(stack_size),
            ..self
        }
    }
}

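/// An execution scope bound to a `Pool`.
///
/// Jobs submitted through a `Scope<'scope>` may borrow data that lives for
/// `'scope`. `join` blocks until every job submitted to the scope has
/// finished; `zoom` and `Pool::scoped` call it automatically.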
pub struct Scope<'scope> {
    pool: Pool,
    wait: Arc<WaitGroup>,
    _scope: Id<'scope>
}

impl<'scope> Scope<'scope> {
    #[inline]
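    /// Create a `'static` scope backed by the given pool.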
    pub fn forever(pool: Pool) -> Scope<'static> {
        Scope {
            pool: pool,
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }

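    /// Submit a job to run on the pool.
    ///
    /// The job may borrow data valid for `'scope`; the scope must be joined
    /// before that data is dropped.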
    pub fn execute<F>(&self, job: F)
        where F: FnOnce() + Send + 'scope {
        self.wait.submit();

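        // Erase the `'scope` lifetime so the boxed job can be handed to the
        // worker threads. This relies on the scope being joined (as `zoom`
        // and `Pool::scoped` guarantee) before the borrowed data is dropped.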
        let task = unsafe {
            mem::transmute::<Box<Task + Send + 'scope>,
                             Box<Task + Send + 'static>>(Box::new(job))
        };

        self.pool.inner.queue.push(PoolMessage::Task(task, self.wait.clone()));
    }

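    /// Submit a job that is itself given access to this scope, so it can
    /// submit further jobs into the same scope.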
    pub fn recurse<F>(&self, job: F)
        where F: FnOnce(&Self) + Send + 'scope {
        let this = unsafe { self.clone() };

        self.execute(move || job(&this));
    }

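    /// Run `scheduler` with a new, shorter-lived scope, then join every job
    /// submitted to that scope before returning (even if `scheduler` panics).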
    pub fn zoom<'smaller, F, R>(&self, scheduler: F) -> R
        where F: FnOnce(&Scope<'smaller>) -> R,
              'scope: 'smaller {
        let scope = unsafe { self.refine::<'smaller>() };

        defer!(scope.join());

        scheduler(&scope)
    }

    #[inline]
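    /// Block until every job submitted to this scope has completed.
    ///
    /// Panics if any of those jobs panicked.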
    pub fn join(&self) {
        self.wait.join()
    }

    #[inline]
    unsafe fn clone(&self) -> Self {
        Scope {
            pool: self.pool.clone(),
            wait: self.wait.clone(),
            _scope: Id::default()
        }
    }

    #[inline]
    unsafe fn refine<'other>(&self) -> Scope<'other> where 'scope: 'other {
        Scope {
            pool: self.pool.clone(),
            wait: Arc::new(WaitGroup::new()),
            _scope: Id::default()
        }
    }
}

enum PoolMessage {
    Quit,
    Task(Box<Task + Send>, Arc<WaitGroup>)
}

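/// A synchronization primitive that tracks a count of pending tasks and lets
/// callers wait until all of them have completed.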
pub struct WaitGroup {
    pending: AtomicUsize,
    poisoned: AtomicBool,
    lock: Mutex<()>,
    cond: Condvar
}

impl Default for WaitGroup {
    fn default() -> Self {
        WaitGroup {
            pending: AtomicUsize::new(0),
            poisoned: AtomicBool::new(false),
            lock: Mutex::new(()),
            cond: Condvar::new()
        }
    }
}

impl WaitGroup {
    #[inline]
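    /// Create a new, empty `WaitGroup`.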
    pub fn new() -> Self {
        WaitGroup::default()
    }

    #[inline]
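    /// The number of tasks submitted but not yet completed.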
    pub fn waiting(&self) -> usize {
        self.pending.load(Ordering::SeqCst)
    }

    #[inline]
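    /// Register one new pending task.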
    pub fn submit(&self) {
        self.pending.fetch_add(1, Ordering::SeqCst);
    }

    #[inline]
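    /// Mark one pending task as complete, waking all waiters if it was the
    /// last one.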
    pub fn complete(&self) {
        let old = self.pending.fetch_sub(1, Ordering::SeqCst);

        if old == 1 {
            let _lock = self.lock.lock().unwrap();
            self.cond.notify_all()
        }
    }

    #[inline]
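    /// Mark one pending task as complete, but poison the group so that `join`
    /// panics once all tasks have finished.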
    pub fn poison(&self) {
        self.poisoned.store(true, Ordering::SeqCst);

        let old = self.pending.fetch_sub(1, Ordering::SeqCst);

        if old == 1 {
            let _lock = self.lock.lock().unwrap();
            self.cond.notify_all()
        }
    }

    #[inline]
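    /// Block until all pending tasks have completed.
    ///
    /// Panics if the group has been poisoned.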
    pub fn join(&self) {
        let mut lock = self.lock.lock().unwrap();

        while self.pending.load(Ordering::SeqCst) > 0 {
            lock = self.cond.wait(lock).unwrap();
        }

        if self.poisoned.load(Ordering::SeqCst) {
            panic!("WaitGroup explicitly poisoned!")
        }
    }
}

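// Guards a single job: `cancel` marks the job complete; if the job panics
// instead, `Drop` poisons the job's WaitGroup.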
struct Sentinel(Pool, Option<Arc<WaitGroup>>);

impl Sentinel {
    fn cancel(mut self) {
        if let Some(wait) = self.1.take() {
            wait.complete();
        }
    }
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        if let Some(wait) = self.1.take() {
            wait.poison();
        }
    }
}

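// Guards a worker thread: if the thread dies without being cancelled (because
// a job's panic unwound through the worker loop), `Drop` spawns a replacement
// worker and poisons the pool's WaitGroup.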
struct ThreadSentinel(Option<Pool>);

impl ThreadSentinel {
    fn cancel(&mut self) {
        if let Some(pool) = self.0.take() {
            pool.wait.complete();
        }
    }
}

impl Drop for ThreadSentinel {
    fn drop(&mut self) {
        if let Some(pool) = self.0.take() {
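            // Replace this worker with a fresh thread, then poison the
            // pool's WaitGroup to record the abnormal exit.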
            pool.expand();

            pool.wait.poison();
        }
    }
}

trait Task {
    fn run(self: Box<Self>);
}

impl<F: FnOnce()> Task for F {
    fn run(self: Box<Self>) { (*self)() }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::time::Duration;
    use std::thread::sleep;

    use {Pool, Scope, ThreadConfig};

    #[test]
    fn test_simple_use() {
        let pool = Pool::new(4);

        let mut buf = [0, 0, 0, 0];

        pool.scoped(|scope| {
            for i in &mut buf {
                scope.execute(move || *i += 1);
            }
        });

        assert_eq!(&buf, &[1, 1, 1, 1]);
    }

    #[test]
    fn test_zoom() {
        let pool = Pool::new(4);

        let mut outer = 0;

        pool.scoped(|scope| {
            let mut inner = 0;
            scope.zoom(|scope2| scope2.execute(|| inner = 1));
            assert_eq!(inner, 1);

            outer = 1;
        });

        assert_eq!(outer, 1);
    }

    #[test]
    fn test_recurse() {
        let pool = Pool::new(12);

        let mut buf = [0, 0, 0, 0];

        pool.scoped(|next| {
            next.recurse(|next| {
                buf[0] = 1;

                next.execute(|| {
                    buf[1] = 1;
                });
            });
        });

        assert_eq!(&buf, &[1, 1, 0, 0]);
    }

    #[test]
    fn test_spawn_doesnt_hang() {
        let pool = Pool::new(1);
        pool.spawn(move || loop {});
    }

    #[test]
    fn test_forever_zoom() {
        let pool = Pool::new(16);
        let forever = Scope::forever(pool.clone());

        let ran = AtomicBool::new(false);

        forever.zoom(|scope| scope.execute(|| ran.store(true, Ordering::SeqCst)));

        assert!(ran.load(Ordering::SeqCst));
    }

    #[test]
    fn test_shutdown() {
        let pool = Pool::new(4);
        pool.shutdown();
    }

    #[test]
    #[should_panic]
    fn test_scheduler_panic() {
        let pool = Pool::new(4);
        pool.scoped(|_| panic!());
    }

    #[test]
    #[should_panic]
    fn test_scoped_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.execute(|| panic!()));
    }

    #[test]
    #[should_panic]
    fn test_pool_panic() {
        let _pool = Pool::new(1);
        panic!();
    }

    #[test]
    #[should_panic]
    fn test_zoomed_scoped_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.zoom(|scope2| scope2.execute(|| panic!())));
    }

    #[test]
    #[should_panic]
    fn test_recurse_scheduler_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.recurse(|_| panic!()));
    }

    #[test]
    #[should_panic]
    fn test_recurse_execute_panic() {
        let pool = Pool::new(4);
        pool.scoped(|scope| scope.recurse(|scope2| scope2.execute(|| panic!())));
    }

    struct Canary<'a> {
        drops: DropCounter<'a>,
        expected: usize
    }

    #[derive(Clone)]
    struct DropCounter<'a>(&'a AtomicUsize);

    impl<'a> Drop for DropCounter<'a> {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    impl<'a> Drop for Canary<'a> {
        fn drop(&mut self) {
            let drops = self.drops.0.load(Ordering::SeqCst);
            assert_eq!(drops, self.expected);
        }
    }

    #[test]
    #[should_panic]
    fn test_scoped_panic_waits_for_all_tasks() {
        let tasks = 50;
        let panicking_task_fraction = 10;
        let panicking_tasks = tasks / panicking_task_fraction;
        let expected_drops = tasks + panicking_tasks;

        let counter = Box::new(AtomicUsize::new(0));
        let drops = DropCounter(&*counter);

        let _canary = Canary {
            drops: drops.clone(),
            expected: expected_drops
        };

        let pool = Pool::new(12);

        pool.scoped(|scope| {
            for task in 0..tasks {
                let drop_counter = drops.clone();

                scope.execute(move || {
                    sleep(Duration::from_millis(10));

                    drop::<DropCounter>(drop_counter);
                });

                if task % panicking_task_fraction == 0 {
                    let drop_counter = drops.clone();

                    scope.execute(move || {
                        let _drops = drop_counter;
                        panic!();
                    });
                }
            }
        });
    }

    #[test]
    #[should_panic]
    fn test_scheduler_panic_waits_for_tasks() {
        let tasks = 50;
        let counter = Box::new(AtomicUsize::new(0));
        let drops = DropCounter(&*counter);

        let _canary = Canary {
            drops: drops.clone(),
            expected: tasks
        };

        let pool = Pool::new(12);

        pool.scoped(|scope| {
            for _ in 0..tasks {
                let drop_counter = drops.clone();

                scope.execute(move || {
                    sleep(Duration::from_millis(25));
                    drop::<DropCounter>(drop_counter);
                });
            }

            panic!();
        });
    }

    #[test]
    fn test_no_thread_config() {
        let pool = Pool::new(1);

        pool.scoped(|scope| {
            scope.execute(|| {
                assert!(::std::thread::current().name().is_none());
            });
        });
    }

    #[test]
    fn test_with_thread_config() {
        let config = ThreadConfig::new().prefix("pool-");

        let pool = Pool::with_thread_config(1, config);

        pool.scoped(|scope| {
            scope.execute(|| {
                assert_eq!(::std::thread::current().name().unwrap(), "pool-1");
            });
        });
    }
}