use crate::error::Error;
use crate::server::pool::Executable;
use flume::Receiver;
use std::hash::{BuildHasher, Hasher, RandomState};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::thread::Builder;
use std::time::Duration;
pub struct Worker<T, const STACK_SIZE: usize> {
queue_rx: Receiver<T>,
worker: Arc<AtomicUsize>,
}
impl<T, const STACK_SIZE: usize> Worker<T, STACK_SIZE> {
const TIMEOUT: Duration = Duration::from_secs(4);
const TERMCHANCE: u64 = 7;
pub fn spawn(queue_rx: Receiver<T>, worker: Arc<AtomicUsize>) -> Result<(), Error>
where
T: Executable + Send + 'static,
{
worker.fetch_add(1, SeqCst);
let this = Self { queue_rx, worker };
let builder = Builder::new().stack_size(STACK_SIZE).name("threadpool worker thread".to_string());
builder.spawn(|| this.runloop())?;
Ok(())
}
fn runloop(self)
where
T: Executable,
{
'runloop: loop {
let Ok(job) = self.queue_rx.recv_timeout(Self::TIMEOUT) else {
let hasher = RandomState::new().build_hasher();
match hasher.finish() % Self::TERMCHANCE {
0 => break 'runloop,
_ => continue 'runloop,
}
};
job.exec();
}
}
}
impl<T, const STACK_SIZE: usize> Drop for Worker<T, STACK_SIZE> {
fn drop(&mut self) {
self.worker.fetch_sub(1, SeqCst);
}
}