1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use crate::{
error::Error,
threadpool::{counter::Counter, Executable},
};
use flume::Receiver;
use std::{sync::Arc, thread::Builder, time::Duration};
/// A thread-pool worker that pulls jobs of type `T` from a shared queue and executes them.
///
/// * `STACK_SIZE` is passed to [`Builder::stack_size`] when the worker thread is spawned.
/// * `TIMEOUT` is the number of seconds (default `5`) a worker waits for a job before it
///   considers shutting down; it is converted with [`Duration::from_secs`].
pub struct Worker<T, const STACK_SIZE: usize, const TIMEOUT: u64 = 5> {
/// Receiving end of the job queue; cloned from the receiver handed to `spawn`.
queue_rx: Receiver<T>,
/// Shared idle-worker counter: bumped via `increment_tmp` when the run loop starts and
/// adjusted via `decrement_tmp` around each job execution.
/// NOTE(review): the `*_tmp` calls presumably return guards that undo the change on drop —
/// confirm against `Counter`.
worker_idle: Arc<Counter>,
/// Shared total-worker counter: bumped via `increment_tmp` when the run loop starts and read
/// on receive errors to decide whether this worker may exit.
worker_total: Arc<Counter>,
}
impl<T, const STACK_SIZE: usize, const TIMEOUT: u64> Worker<T, STACK_SIZE, TIMEOUT> {
const TIMEOUT: Duration = Duration::from_secs(TIMEOUT);
pub fn spawn(queue_rx: &Receiver<T>, worker_idle: &Arc<Counter>, worker_total: &Arc<Counter>) -> Result<(), Error>
where
T: Executable + Send + 'static,
{
let this =
Self { queue_rx: queue_rx.clone(), worker_idle: worker_idle.clone(), worker_total: worker_total.clone() };
let builder = Builder::new().stack_size(STACK_SIZE).name("threadpool worker thread".to_string());
builder.spawn(|| this.runloop())?;
Ok(())
}
fn runloop(self)
where
T: Executable,
{
let _worker_total_guard = self.worker_total.increment_tmp();
let _worker_idle_guard = self.worker_idle.increment_tmp();
'runloop: loop {
let job = match self.queue_rx.recv_timeout(Self::TIMEOUT) {
Ok(job) => job,
Err(_) if self.worker_total.get() < 2 => continue 'runloop,
Err(_) => break 'runloop,
};
let _worker_idle_guard = self.worker_idle.decrement_tmp();
job.exec();
}
}
}