hel_thread_pool/
thread_pool.rs1use std::sync::{
2 mpsc::{self, Sender},
3 Arc, Mutex,
4};
5
6use crate::{thread_pool::worker::THREAD_LOCAL_PANIC_HOOK, utils::optimal_number_of_threads};
7
8use self::{
9 state::{state_cell::StateCell, State},
10 worker::{Task, Worker},
11};
12
13#[derive(Debug)]
15pub struct ThreadPool {
16 _workers: Vec<Worker>,
17 sender: Sender<Task>,
18 capacity: usize,
19 state: Arc<State>,
20}
21
22pub const MAX_THREADS: usize = std::mem::size_of::<StateCell>() * 8;
24
25impl ThreadPool {
26 #[inline]
29 pub fn new() -> Self {
30 Self::with_capacity(optimal_number_of_threads(u16::MAX as usize))
31 }
32
33 #[inline]
35 pub fn with_capacity(capacity: usize) -> Self {
36 assert!(
37 capacity <= MAX_THREADS,
38 "ThreadPool: Does not support capacity over {}",
39 MAX_THREADS
40 );
41
42 let mut _workers = Vec::with_capacity(capacity);
43
44 let (sender, receiver) = mpsc::channel();
45 let receiver = Arc::new(Mutex::new(receiver));
46
47 let state = Arc::new(State::default());
48
49 let prev_hook = std::panic::take_hook();
50
51 std::panic::set_hook(Box::new(move |info| {
52 unsafe {
53 match THREAD_LOCAL_PANIC_HOOK {
54 Some(f) => (*f)(),
55 None => {}
56 }
57 }
58
59 prev_hook(info);
60 }));
61
62 for id in 0..capacity {
63 _workers.push(Worker::new(receiver.clone(), id, state.clone()));
64 }
65
66 Self {
67 _workers,
68 sender,
69 capacity,
70 state,
71 }
72 }
73
74 #[inline(always)]
76 pub fn capacity(&self) -> usize {
77 self.capacity
78 }
79
80 #[inline]
82 pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
83 self
84 .sender
85 .send(Task::New(Box::new(f)))
86 .expect("Error while sending job to thread worker")
87 }
88
89 #[inline]
91 pub fn iter(&self) -> std::ops::Range<usize> {
92 0..self.capacity
93 }
94
95 #[inline]
98 pub fn join(self) {
99 drop(self)
100 }
101
102 #[inline(always)]
104 pub fn check_panics(&self) -> usize {
105 self.state.panicking.count()
106 }
107
108 #[inline(always)]
110 pub fn check_busy(&self) -> usize {
111 self.state.busy.count()
112 }
113}
114
115impl Drop for ThreadPool {
116 fn drop(&mut self) {
117 for _ in self.iter() {
118 self.sender.send(Task::Break).unwrap()
119 }
120 }
121}
122
123impl Default for ThreadPool {
124 #[inline(always)]
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130mod state;
131mod worker;