1use std::thread;
2use std::sync::{atomic, Arc};
3
4pub trait Work: Send + Sized + 'static {
5 fn work(self) -> Option<Self>;
6}
7
8pub trait WorkFactory: Sync + Send + Sized + 'static {
9 type Work: Work + 'static;
10 fn build(&self) -> Self::Work;
11}
12impl<W, F> WorkFactory for F
13where
14 W: Work,
15 F: Fn() -> W + Sync + Send + 'static,
16{
17 type Work = W;
18 fn build(&self) -> W {
19 self()
20 }
21}
22
23pub struct Baton<F>
24where
25 F: WorkFactory,
26{
27 sentinel: Arc<Sentinel<F>>,
28}
29
30struct Sentinel<F>
31where
32 F: WorkFactory,
33{
34 work_factory: F,
35 desired: atomic::AtomicUsize,
36 count: atomic::AtomicUsize,
37}
38impl<F> Sentinel<F>
39where
40 F: WorkFactory,
41{
42 fn new(desired: usize, work_factory: F) -> Self {
43 let count = atomic::AtomicUsize::new(0);
44 let desired = atomic::AtomicUsize::new(desired);
45 Sentinel { work_factory, desired, count }
46 }
47}
48
49fn balance<F>(sentinel: &Arc<Sentinel<F>>)
50where
51 F: WorkFactory,
52{
53 loop {
54 let curr = sentinel.count.load(atomic::Ordering::SeqCst);
55 if curr >= sentinel.desired.load(atomic::Ordering::Relaxed) {
56 break;
57 }
58 let prev = sentinel.count.compare_and_swap(curr, curr + 1, atomic::Ordering::SeqCst);
59 if prev != curr {
60 continue;
61 }
62 let baton_sentinel = sentinel.clone();
63 let work = sentinel.work_factory.build();
64 thread::spawn(move || {
65 let baton = Baton { sentinel: baton_sentinel };
66 let mut work = work;
67 loop {
68 match work.work() {
69 Some(next_work) => work = next_work,
70 None => {
71 baton.sentinel.desired.store(0, atomic::Ordering::SeqCst);
72 return;
73 },
74 }
75 }
76 });
77 }
78}
79
80impl<F> Drop for Baton<F>
81where
82 F: WorkFactory,
83{
84 fn drop(&mut self) {
85 self.sentinel.count.fetch_sub(1, atomic::Ordering::SeqCst);
86
87 balance(&self.sentinel);
88 }
89}
90
91pub fn spawn<F>(desired: usize, work_factory: F)
92where
93 F: WorkFactory
94{
95 let sentinel = Arc::new(Sentinel::new(desired, work_factory));
96 balance(&sentinel);
97}