worker_sentinel/
lib.rs

1use std::thread;
2use std::sync::{atomic, Arc};
3
/// A unit of work that is run to completion on a worker thread.
///
/// Each step consumes the current value and yields the value to run next,
/// so a long-running job is expressed as a chain of `work` calls.
pub trait Work
where
    Self: Send + Sized + 'static,
{
    /// Runs one step of the job.
    ///
    /// Returns `Some(next)` to keep the worker going with `next`, or `None`
    /// when there is nothing left to do. In this file a `None` makes the
    /// worker set the pool's desired size to zero and exit.
    fn work(self) -> Option<Self>;
}
7
8pub trait WorkFactory: Sync + Send + Sized + 'static {
9    type Work: Work + 'static;
10    fn build(&self) -> Self::Work;
11}
12impl<W, F> WorkFactory for F
13where
14    W: Work,
15    F: Fn() -> W + Sync + Send + 'static,
16{
17    type Work = W;
18    fn build(&self) -> W {
19        self()
20    }
21}
22
23pub struct Baton<F>
24where
25    F: WorkFactory,
26{
27    sentinel: Arc<Sentinel<F>>,
28}
29
30struct Sentinel<F>
31where
32    F: WorkFactory,
33{
34    work_factory: F,
35    desired: atomic::AtomicUsize,
36    count: atomic::AtomicUsize,
37}
38impl<F> Sentinel<F>
39where
40    F: WorkFactory,
41{
42    fn new(desired: usize, work_factory: F) -> Self {
43        let count = atomic::AtomicUsize::new(0);
44        let desired = atomic::AtomicUsize::new(desired);
45        Sentinel { work_factory, desired, count }
46    }
47}
48
49fn balance<F>(sentinel: &Arc<Sentinel<F>>)
50where
51    F: WorkFactory,
52{
53    loop {
54        let curr = sentinel.count.load(atomic::Ordering::SeqCst);
55        if curr >= sentinel.desired.load(atomic::Ordering::Relaxed) {
56            break;
57        }
58        let prev = sentinel.count.compare_and_swap(curr, curr + 1, atomic::Ordering::SeqCst);
59        if prev != curr {
60            continue;
61        }
62        let baton_sentinel = sentinel.clone();
63        let work = sentinel.work_factory.build();
64        thread::spawn(move || {
65            let baton = Baton { sentinel: baton_sentinel };
66            let mut work = work;
67            loop {
68                match work.work() {
69                    Some(next_work) => work = next_work,
70                    None => {
71                        baton.sentinel.desired.store(0, atomic::Ordering::SeqCst);
72                        return;
73                    },
74                }
75            }
76        });
77    }
78}
79
80impl<F> Drop for Baton<F>
81where
82    F: WorkFactory,
83{
84    fn drop(&mut self) {
85        self.sentinel.count.fetch_sub(1, atomic::Ordering::SeqCst);
86
87        balance(&self.sentinel);
88    }
89}
90
91pub fn spawn<F>(desired: usize, work_factory: F)
92where
93    F: WorkFactory
94{
95    let sentinel = Arc::new(Sentinel::new(desired, work_factory));
96    balance(&sentinel);
97}