fast_threadpool/
state.rs

1use crate::*;
2
3#[derive(Clone, Debug)]
4pub(crate) struct State {
5    inner: Arc<AtomicU32>,
6    max_available_workers: u32,
7    max_workers: u32,
8    min_available_workers: u32,
9    min_workers: u32,
10}
11
12const AVAILABLE_MASK: u32 = 0b_0000_0000_0000_0000_1111_1111_1111_1111;
13const WORKER_COUNT_BASE: u32 = 0b_0001_0000_0000_0000_0000;
14
15impl State {
16    pub(crate) fn new(conf: ThreadPoolConfig) -> Self {
17        Self {
18            inner: Arc::new(AtomicU32::new(0)),
19            max_available_workers: conf.max_available_workers.get() as u32,
20            max_workers: conf.max_workers.get() as u32,
21            min_available_workers: conf.min_available_workers.get() as u32,
22            min_workers: conf.min_workers.get() as u32,
23        }
24    }
25    pub(crate) fn decrease_available(&self) -> bool {
26        let previous_state = self.inner.fetch_sub(1, Ordering::SeqCst);
27        let available_count = previous_state & AVAILABLE_MASK;
28        //println!("TMP DEBUG: available_count={}", available_count);
29        (available_count - 1) <= self.max_available_workers
30    }
31    pub(crate) fn increment_available(&self) -> bool {
32        let previous_state = self.inner.fetch_add(1, Ordering::SeqCst);
33        let available_count = previous_state & AVAILABLE_MASK;
34        let worker_count = previous_state >> 16;
35        (available_count + 1) <= self.max_available_workers && worker_count < self.max_workers
36    }
37    pub(crate) fn allow_new_worker(&self) -> bool {
38        self.inner
39            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
40                let worker_count = state >> 16;
41                if worker_count < self.max_workers {
42                    Some(state + WORKER_COUNT_BASE + 1)
43                } else {
44                    None
45                }
46            })
47            .is_ok()
48    }
49    pub(crate) fn need_spawn(&self) -> bool {
50        let state = self.inner.load(Ordering::SeqCst);
51        let available_count = state & AVAILABLE_MASK;
52        let worker_count = state >> 16;
53        available_count <= self.max_available_workers && worker_count < self.max_workers
54    }
55    pub(crate) fn allow_to_shutdown(&self) -> bool {
56        self.inner
57            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
58                let available_count = state & AVAILABLE_MASK;
59                let worker_count = state >> 16;
60                if available_count > self.min_available_workers && worker_count > self.min_workers {
61                    Some(state - WORKER_COUNT_BASE)
62                } else {
63                    None
64                }
65            })
66            .is_ok()
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use super::*;
73
74    const EIGHT: NonZeroU16 = unsafe { NonZeroU16::new_unchecked(8) };
75
76    #[test]
77    fn test_state() {
78        let state = State::new(
79            ThreadPoolConfig::default()
80                .min_workers(EIGHT)
81                .max_available_workers(EIGHT),
82        );
83
84        for _ in 0..8 {
85            assert!(state.allow_new_worker());
86        }
87
88        for _ in 0..8 {
89            assert!(state.decrease_available());
90        }
91    }
92}