use crate::scope::Scope;
use crate::worker::{Job, Worker, Message};
use crate::{channel, Counter, PoolConfig, Result};
use crate::channel::SenderWrapper;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: SenderWrapper<Message>,
job_count: Counter,
max_jobs: Option<u16>,
}
impl ThreadPool {
pub fn new(config: PoolConfig) -> Result<ThreadPool> {
config.validate()?;
let size = config.n_workers as usize;
let (sender,receiver) =
if let Some(max) = config.incoming_buf_size {
channel::sync_channel(max as usize)
} else {
channel::channel()
};
let mut workers = Vec::with_capacity(size);
for _ in 0..size-1 {
let worker = Worker::new(receiver.clone());
workers.push(worker);
}
let worker = Worker::new(receiver);
workers.push(worker);
let global = Counter::new();
Ok(ThreadPool {
workers,
job_count: global,
max_jobs: config.max_jobs,
sender,
})
}
#[inline]
pub fn with_default_config() -> Self {
let conf = PoolConfig::default();
Self::new(conf).expect("The default config is valid")
}
#[inline]
pub fn with_size(size: u16) -> Result<Self> {
let conf = PoolConfig::builder()
.n_workers(size)
.build();
Self::new(conf)
}
pub fn pending_jobs(&self) -> usize {
self.job_count.count() as usize
}
pub(crate) fn execute_inside_scope(&self, job: Box<dyn Job<'static>>, scope_counter: Counter) {
self.job_count.inc(self.max_jobs);
scope_counter.inc(None);
let msg = Message::Job {
job: Box::new(job),
global_counter: self.job_count.clone(),
scope_counter: Some(scope_counter),
};
self.sender.send(msg).unwrap()
}
pub fn execute(&self, job: impl Job<'static>) {
self.job_count.inc(self.max_jobs);
let msg = Message::Job {
job: Box::new(job),
global_counter: self.job_count.clone(),
scope_counter: None
};
self.sender.send(msg).unwrap();
}
pub fn scope<'scope, 'pool, F, R>(&'pool self, f: F) -> R
where
F: FnOnce(&Scope<'scope, 'pool>) -> R,
'pool: 'scope
{
let scope = Scope::new(self);
f(&scope)
}
pub fn join(&self) {
self.job_count.join();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for _ in 0..self.workers.len() {
self.sender.send(Message::Shutdown).unwrap();
}
for worker in &mut self.workers {
worker.shutdown();
}
}
}
impl Default for ThreadPool {
fn default() -> Self {
Self::with_default_config()
}
}