use std::sync::Arc;
use std::{thread};
use std::thread::JoinHandle;
use crossbeam::channel::{TryRecvError, unbounded};
pub trait Worker: Default {
type Data: 'static + Send;
type Result: 'static + Send;
type Context: 'static + Send + Sync;
fn execute(&mut self, data: Self::Data, context: &Arc<Self::Context>) -> Self::Result;
}
pub struct WorkersPool<W: Worker> {
result_receiver: crossbeam::channel::Receiver<W::Result>,
work_sender: crossbeam::channel::Sender<W::Data>,
#[allow(dead_code)]
workers: Vec<JoinHandle<()>>
}
impl<W: Worker> WorkersPool<W> {
pub fn new(context: W::Context) -> Self {
let (result_sender,result_receiver) = unbounded();
let (work_sender,work_receiver) = unbounded();
let context = Arc::new(context);
let thread_count = num_cpus::get();
let mut workers = vec![];
for _ in 0..thread_count {
let work_receiver = work_receiver.clone();
let result_sender = result_sender.clone();
let context_clone = context.clone();
let thread = thread::spawn(move || {
let mut worker = W::default();
let context = context_clone;
loop {
let work = work_receiver.recv();
let work = match work {
Err(_) => {
return;
},
Ok(work) => {
work
}
};
let result = worker.execute(work, &context);
let send_result = result_sender.send(result);
match send_result {
Ok(_) => {}
Err(_) => {
return;
}
}
}
});
workers.push(thread);
}
Self {
result_receiver,
work_sender,
workers
}
}
pub fn add_work(&mut self, work: W::Data) -> Result<(),()>{
self.work_sender.send(work)
.map_err(|_| ())?;
Ok(())
}
pub fn receive_result(&mut self) -> Result<W::Result, ()> {
self.result_receiver.recv().map_err(|_| ())
}
pub fn try_receive_result(&mut self) -> Result<Option<W::Result>, ()> {
let result = self.result_receiver.try_recv();
match result {
Err(err) => {
match err {
TryRecvError::Empty => {
Ok(None)
}
TryRecvError::Disconnected => {
Err(())
}
}
}
Ok(ok) => {
Ok(Some(ok))
}
}
}
pub fn collect_finished(&mut self) -> Result<Vec<W::Result>, ()> {
let mut results = vec![];
loop {
let result = self.try_receive_result()?;
match result {
None => break,
Some(result) => {
results.push(result);
}
}
}
Ok(results)
}
pub fn has_work_left(&self) -> bool {
self.work_sender.is_empty()
}
pub fn has_results(&self) -> bool {
!self.result_receiver.is_empty()
}
}