1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
use std::thread;
use std::sync::{mpsc, mpsc::Sender, Arc, Mutex};
pub type Job = Box<dyn FnOnce() + Send + 'static>;
/// A ThreadPool that runs the given task with the given threads.
/// If all the threads are occupied then it waits until a new thread is available.
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Sender<Message>,
}
struct Worker {
thread: Option<thread::JoinHandle<()>>,
id: usize,
}
enum Message {
Terminate,
Job(Job)
}
impl ThreadPool {
/// Returns a new instce of ThreadPool.
/// The ThreadPool::new() method takes an argument which is the number of threads
/// the threadpool has.
///
/// # Examples
///
/// ```
/// let pool = ThreadPool::new(4);
/// ```
pub fn new(size: usize) -> Self {
let (sender, reciever) = mpsc::channel();
let reciever = Arc::new(Mutex::new(reciever));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
let reciever = Arc::clone(&reciever);
workers.push(Worker { thread: Some(thread::spawn(move || loop {
let msg: Message = match reciever.lock().unwrap().recv() {
Ok(n) => n,
Err(_) => continue,
};
match msg {
Message::Job(msg) => {
msg()
},
_ => {
break;
}
}
})), id: id + 1 });
}
ThreadPool {
sender,
workers,
}
}
/// Runs a FnOnce() with the number of threads it has.
///
/// # Examples
///
/// ```
/// let pool = ThreadPool::new();
/// ```
pub fn execute<F: FnOnce() + Send + 'static>(&self, job: F) {
self.sender.send(Message::Job(Box::new(job))).unwrap();
}
pub fn shutdown(&mut self) {
for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("ThreadPool 1.0.1: Shutting down all threads.");
for worker in &mut self.workers {
println!("ThreadPool 1.0.1: Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
self.shutdown();
}
}