hel_thread_pool/
thread_pool.rs

1use std::sync::{
2	mpsc::{self, Sender},
3	Arc, Mutex,
4};
5
6use crate::{thread_pool::worker::THREAD_LOCAL_PANIC_HOOK, utils::optimal_number_of_threads};
7
8use self::{
9	state::{state_cell::StateCell, State},
10	worker::{Task, Worker},
11};
12
13///
14#[derive(Debug)]
15pub struct ThreadPool {
16	_workers: Vec<Worker>,
17	sender: Sender<Task>,
18	capacity: usize,
19	state: Arc<State>,
20}
21
22/// Max number or thread supported atm
23pub const MAX_THREADS: usize = std::mem::size_of::<StateCell>() * 8;
24
25impl ThreadPool {
26	/// Will return [`ThreadPool`] with `capacity = logical-cores - 1`
27	/// all the threads will spawn immediately
28	#[inline]
29	pub fn new() -> Self {
30		Self::with_capacity(optimal_number_of_threads(u16::MAX as usize))
31	}
32
33	/// Will return [`ThreadPool`] with user defined capacity
34	#[inline]
35	pub fn with_capacity(capacity: usize) -> Self {
36		assert!(
37			capacity <= MAX_THREADS,
38			"ThreadPool: Does not support capacity over {}",
39			MAX_THREADS
40		);
41
42		let mut _workers = Vec::with_capacity(capacity);
43
44		let (sender, receiver) = mpsc::channel();
45		let receiver = Arc::new(Mutex::new(receiver));
46
47		let state = Arc::new(State::default());
48
49		let prev_hook = std::panic::take_hook();
50
51		std::panic::set_hook(Box::new(move |info| {
52			unsafe {
53				match THREAD_LOCAL_PANIC_HOOK {
54					Some(f) => (*f)(),
55					None => {}
56				}
57			}
58
59			prev_hook(info);
60		}));
61
62		for id in 0..capacity {
63			_workers.push(Worker::new(receiver.clone(), id, state.clone()));
64		}
65
66		Self {
67			_workers,
68			sender,
69			capacity,
70			state,
71		}
72	}
73
74	/// Returns [`ThreadPool`] capacity
75	#[inline(always)]
76	pub fn capacity(&self) -> usize {
77		self.capacity
78	}
79
80	/// Executes passed function in
81	#[inline]
82	pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
83		self
84			.sender
85			.send(Task::New(Box::new(f)))
86			.expect("Error while sending job to thread worker")
87	}
88
89	/// Returns an iterator for capacity
90	#[inline]
91	pub fn iter(&self) -> std::ops::Range<usize> {
92		0..self.capacity
93	}
94
95	/// Does not actually call `join` on a thread
96	/// Instead breaks from internal loop
97	#[inline]
98	pub fn join(self) {
99		drop(self)
100	}
101
102	/// Returns an amount of threads panicking
103	#[inline(always)]
104	pub fn check_panics(&self) -> usize {
105		self.state.panicking.count()
106	}
107
108	/// Returns an amount of busy threads
109	#[inline(always)]
110	pub fn check_busy(&self) -> usize {
111		self.state.busy.count()
112	}
113}
114
115impl Drop for ThreadPool {
116	fn drop(&mut self) {
117		for _ in self.iter() {
118			self.sender.send(Task::Break).unwrap()
119		}
120	}
121}
122
123impl Default for ThreadPool {
124	#[inline(always)]
125	fn default() -> Self {
126		Self::new()
127	}
128}
129
130mod state;
131mod worker;