1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
use std::thread; use std::sync::{atomic, Arc}; pub trait Work: Send + Sized + 'static { fn work(self) -> Option<Self>; } pub trait WorkFactory: Sync + Send + Sized + 'static { type Work: Work + 'static; fn build(&self) -> Self::Work; } impl<W, F> WorkFactory for F where W: Work, F: Fn() -> W + Sync + Send + 'static, { type Work = W; fn build(&self) -> W { self() } } pub struct Baton<F> where F: WorkFactory, { sentinel: Arc<Sentinel<F>>, } struct Sentinel<F> where F: WorkFactory, { work_factory: F, desired: atomic::AtomicUsize, count: atomic::AtomicUsize, } impl<F> Sentinel<F> where F: WorkFactory, { fn new(desired: usize, work_factory: F) -> Self { let count = atomic::AtomicUsize::new(0); let desired = atomic::AtomicUsize::new(desired); Sentinel { work_factory, desired, count } } } fn balance<F>(sentinel: &Arc<Sentinel<F>>) where F: WorkFactory, { loop { let curr = sentinel.count.load(atomic::Ordering::SeqCst); if curr >= sentinel.desired.load(atomic::Ordering::Relaxed) { break; } let prev = sentinel.count.compare_and_swap(curr, curr + 1, atomic::Ordering::SeqCst); if prev != curr { continue; } let baton_sentinel = sentinel.clone(); let work = sentinel.work_factory.build(); thread::spawn(move || { let baton = Baton { sentinel: baton_sentinel }; let mut work = work; loop { match work.work() { Some(next_work) => work = next_work, None => { baton.sentinel.desired.store(0, atomic::Ordering::SeqCst); return; }, } } }); } } impl<F> Drop for Baton<F> where F: WorkFactory, { fn drop(&mut self) { self.sentinel.count.fetch_sub(1, atomic::Ordering::SeqCst); balance(&self.sentinel); } } pub fn spawn<F>(desired: usize, work_factory: F) where F: WorkFactory { let sentinel = Arc::new(Sentinel::new(desired, work_factory)); balance(&sentinel); }