1use crate::*;
2
3#[derive(Clone, Debug)]
4pub(crate) struct State {
5 inner: Arc<AtomicU32>,
6 max_available_workers: u32,
7 max_workers: u32,
8 min_available_workers: u32,
9 min_workers: u32,
10}
11
12const AVAILABLE_MASK: u32 = 0b_0000_0000_0000_0000_1111_1111_1111_1111;
13const WORKER_COUNT_BASE: u32 = 0b_0001_0000_0000_0000_0000;
14
15impl State {
16 pub(crate) fn new(conf: ThreadPoolConfig) -> Self {
17 Self {
18 inner: Arc::new(AtomicU32::new(0)),
19 max_available_workers: conf.max_available_workers.get() as u32,
20 max_workers: conf.max_workers.get() as u32,
21 min_available_workers: conf.min_available_workers.get() as u32,
22 min_workers: conf.min_workers.get() as u32,
23 }
24 }
25 pub(crate) fn decrease_available(&self) -> bool {
26 let previous_state = self.inner.fetch_sub(1, Ordering::SeqCst);
27 let available_count = previous_state & AVAILABLE_MASK;
28 (available_count - 1) <= self.max_available_workers
30 }
31 pub(crate) fn increment_available(&self) -> bool {
32 let previous_state = self.inner.fetch_add(1, Ordering::SeqCst);
33 let available_count = previous_state & AVAILABLE_MASK;
34 let worker_count = previous_state >> 16;
35 (available_count + 1) <= self.max_available_workers && worker_count < self.max_workers
36 }
37 pub(crate) fn allow_new_worker(&self) -> bool {
38 self.inner
39 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
40 let worker_count = state >> 16;
41 if worker_count < self.max_workers {
42 Some(state + WORKER_COUNT_BASE + 1)
43 } else {
44 None
45 }
46 })
47 .is_ok()
48 }
49 pub(crate) fn need_spawn(&self) -> bool {
50 let state = self.inner.load(Ordering::SeqCst);
51 let available_count = state & AVAILABLE_MASK;
52 let worker_count = state >> 16;
53 available_count <= self.max_available_workers && worker_count < self.max_workers
54 }
55 pub(crate) fn allow_to_shutdown(&self) -> bool {
56 self.inner
57 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
58 let available_count = state & AVAILABLE_MASK;
59 let worker_count = state >> 16;
60 if available_count > self.min_available_workers && worker_count > self.min_workers {
61 Some(state - WORKER_COUNT_BASE)
62 } else {
63 None
64 }
65 })
66 .is_ok()
67 }
68}
69
70#[cfg(test)]
71mod tests {
72 use super::*;
73
74 const EIGHT: NonZeroU16 = unsafe { NonZeroU16::new_unchecked(8) };
75
76 #[test]
77 fn test_state() {
78 let state = State::new(
79 ThreadPoolConfig::default()
80 .min_workers(EIGHT)
81 .max_available_workers(EIGHT),
82 );
83
84 for _ in 0..8 {
85 assert!(state.allow_new_worker());
86 }
87
88 for _ in 0..8 {
89 assert!(state.decrease_available());
90 }
91 }
92}