use super::{
types::{ExecuteError, Message, MessageSyncSender},
worker::Worker,
};
use std::sync::{
mpsc::{self, TrySendError},
Arc, Mutex,
};
use log::{error, trace};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: MessageSyncSender,
}
impl ThreadPool {
pub fn new(size: usize, max_jobs_queue: usize) -> ThreadPool {
trace!("Creating ThreadPool...");
assert!(size > 0);
assert!(max_jobs_queue > 0);
let (sender, receiver) = mpsc::sync_channel(max_jobs_queue);
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
trace!("Creating ThreadPool with size {}...", size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
trace!("ThreadPool created");
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F) -> Result<(), ExecuteError>
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
match self.sender.try_send(Message::NewJob(job)) {
Ok(()) => {
trace!("Job added to the queue");
Ok(())
}
Err(TrySendError::Full(_)) => Err(ExecuteError::Full),
Err(TrySendError::Disconnected(_)) => {
let msg = "Channel closed by receivers";
error!("{}", msg);
panic!("{}", msg);
}
}
}
pub fn join(&mut self) {
trace!("Joining workers...");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
trace!("Terminate message sent to all workers");
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
trace!("Worker #{} joined", worker.id);
}
}
trace!("Workers joined");
}
}