use crate::err;
use crate::error::Error;
use crate::server::worker::Worker;
use flume::{Receiver, Sender};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
/// A unit of work that a thread-pool worker can run.
///
/// `exec` consumes the job by value; an implementor does its work and is
/// dropped when finished.
pub trait Executable {
// Runs the job to completion on the worker thread that dequeued it.
fn exec(self);
}
/// A demand-scaled pool of worker threads fed through a bounded MPMC queue.
///
/// The queue bound passed to `new` doubles as the maximum worker count.
/// Cloning the pool is cheap: clones share the same queue and counter.
#[derive(Debug)]
pub struct Threadpool<T, const STACK_SIZE: usize> {
// Producer side of the job queue; its capacity also caps the worker count.
queue_tx: Sender<T>,
// Prototype receiver: a clone of it is handed to every spawned worker.
queue_rx_seed: Receiver<T>,
// Shared live-worker count, passed into each spawned worker
// (presumably maintained by `Worker` itself — confirm in worker.rs).
workers: Arc<AtomicUsize>,
}
impl<T, const STACK_SIZE: usize> Threadpool<T, STACK_SIZE> {
    /// Creates a pool whose queue holds at most `workers_max` pending jobs
    /// and which spawns at most `workers_max` workers (the queue bound
    /// doubles as the worker limit).
    ///
    /// No threads are started up front; workers are created on demand by
    /// [`Threadpool::dispatch`].
    pub fn new(workers_max: usize) -> Self
    where
        T: Executable + Send + 'static,
    {
        let (queue_tx, queue_rx_seed) = flume::bounded(workers_max);
        let workers = Arc::new(AtomicUsize::new(0));
        Self { queue_tx, queue_rx_seed, workers }
    }

    /// Enqueues `job`, scaling the pool up when demand warrants it.
    ///
    /// # Errors
    /// Returns an error when the very first worker cannot be spawned, or
    /// when the queue is full and `try_send` refuses the job.
    pub fn dispatch(&self, job: T) -> Result<(), Error>
    where
        T: Executable + Send + 'static,
    {
        let worker_count = self.workers.load(SeqCst);
        if worker_count == 0 {
            // Without at least one worker the job would sit in the queue
            // forever, so a spawn failure here must surface to the caller.
            self.spawn()?;
        } else if worker_count <= self.queue_tx.len() {
            // Queue depth has caught up with the worker count: try to add a
            // worker, but treat hitting the worker limit as best-effort.
            //
            // NOTE: `else if` — the original ran both branches, so the very
            // first dispatch always spawned two workers for a single job.
            let _ = self.spawn();
        }
        self.queue_tx.try_send(job).map_err(|_| err!("Threadpool is congested"))?;
        Ok(())
    }

    /// Starts one new worker thread, honouring the worker limit.
    ///
    /// # Errors
    /// Returns an error when the pool is already at its worker limit, or
    /// when `Worker::spawn` itself fails.
    fn spawn(&self) -> Result<(), Error>
    where
        T: Executable + Send + 'static,
    {
        // `capacity()` is always `Some(workers_max)` for the bounded channel
        // created in `new`. Compare plain `usize`s instead of relying on
        // `Option`'s `PartialOrd` (`Some(_) >= None` is true, which would
        // have read an unbounded channel as permanently "over limit").
        let limit = self.queue_tx.capacity().unwrap_or(usize::MAX);
        if self.workers.load(SeqCst) >= limit {
            return Err(err!("Worker limit exceeded"));
        }
        Worker::<T, STACK_SIZE>::spawn(self.queue_rx_seed.clone(), self.workers.clone())
    }
}
impl<T, const STACK_SIZE: usize> Clone for Threadpool<T, STACK_SIZE> {
fn clone(&self) -> Self {
Self {
queue_tx: self.queue_tx.clone(),
queue_rx_seed: self.queue_rx_seed.clone(),
workers: self.workers.clone(),
}
}
}