workers_pool/
lib.rs

1
//! A worker pool for computing a large number of relatively small tasks in parallel.
//!
//! Tasks are computed on separate threads and are given read-only access to a common context.
//!
//! Threads are spawned when the pool is created and do not currently recover from panics.
7
8use std::sync::Arc;
9use std::{thread};
10use std::thread::JoinHandle;
11use crossbeam::channel::{TryRecvError, unbounded};
12
13
/// A unit of computation logic that can be run against incoming work items.
///
/// Every call to [`Worker::execute`] is handed the shared context, but a `Worker` value may also
/// carry state of its own; that state does not have to be shareable across threads.
///
/// `execute` should be non-blocking: a blocking implementation can lead to deadlocks when the
/// `WorkersPool` is dropped.
/// `execute` should also be non-panicking: a panic shuts down the thread executing the worker.
pub trait Worker: Default {
    /// Input consumed by a single call to `execute`.
    type Data: Send + 'static;
    /// Output produced by a single call to `execute`.
    type Result: Send + 'static;
    /// Common state made available to every computation.
    type Context: Send + Sync + 'static;

    /// Computes the result for `data`, with access to the shared `context`.
    fn execute(&mut self, data: Self::Data, context: &Arc<Self::Context>) -> Self::Result;
}
27
28/// Abstraction of a threadpool for executing units of computation in parallel.
29pub struct WorkersPool<W: Worker> {
30    result_receiver: crossbeam::channel::Receiver<W::Result>,
31    work_sender: crossbeam::channel::Sender<W::Data>,
32    #[allow(dead_code)]
33    workers: Vec<JoinHandle<()>>
34}
35
36impl<W: Worker> WorkersPool<W> {
37    pub fn new(context: W::Context) -> Self {
38        let (result_sender,result_receiver) = unbounded();
39        let (work_sender,work_receiver) = unbounded();
40
41        let context = Arc::new(context);
42
43        let thread_count = num_cpus::get();
44
45        let mut workers = vec![];
46
47        for _ in 0..thread_count {
48            let work_receiver = work_receiver.clone();
49            let result_sender = result_sender.clone();
50
51            let context_clone = context.clone();
52
53            let thread = thread::spawn(move || {
54                let mut worker = W::default();
55                let context = context_clone;
56
57                loop {
58                    let work = work_receiver.recv();
59
60                    let work = match work {
61                        Err(_) => {
62                            return;
63                        },
64                        Ok(work) => {
65                            work
66                        }
67                    };
68
69                    let result = worker.execute(work, &context);
70
71                    let send_result = result_sender.send(result);
72
73                    match send_result {
74                        Ok(_) => {}
75                        Err(_) => {
76                            return;
77                        }
78                    }
79                }
80            });
81
82            workers.push(thread);
83        }
84
85        Self {
86            result_receiver,
87            work_sender,
88            workers
89        }
90    }
91
92    /// Adds work to be executed on one of the threads of this pool
93    /// This function is non-blocking
94    pub fn add_work(&mut self, work: W::Data) -> Result<(),()>{
95        self.work_sender.send(work)
96            .map_err(|_| ())?;
97
98        Ok(())
99    }
100
101    /// Receives the result of a computation
102    /// This function blocks until a result is available or all threads have panicked
103    pub fn receive_result(&mut self) -> Result<W::Result, ()> {
104        self.result_receiver.recv().map_err(|_| ())
105    }
106
107    /// Tries to receive the result of a computation
108    /// This function does not block if no result is available but rather returns Ok(None)
109    pub fn try_receive_result(&mut self) -> Result<Option<W::Result>, ()> {
110        let result = self.result_receiver.try_recv();
111
112        match result {
113            Err(err) => {
114                match err {
115                    TryRecvError::Empty => {
116                        Ok(None)
117                    }
118                    TryRecvError::Disconnected => {
119                        Err(())
120                    }
121                }
122            }
123            Ok(ok) => {
124                Ok(Some(ok))
125            }
126        }
127    }
128
129    /// Collects all available results
130    pub fn collect_finished(&mut self) -> Result<Vec<W::Result>, ()> {
131        let mut results = vec![];
132
133        loop {
134            let result = self.try_receive_result()?;
135            match result {
136                None => break,
137                Some(result) => {
138                    results.push(result);
139                }
140            }
141        }
142
143        Ok(results)
144    }
145
146    /// Checks if any work is left in the work queue
147    ///
148    /// Note: An empty work queue does not mean that none of the worker threads is still busy
149    pub fn has_work_left(&self) -> bool {
150        self.work_sender.is_empty()
151    }
152
153    /// Checks if there are results available
154    pub fn has_results(&self) -> bool {
155        !self.result_receiver.is_empty()
156    }
157}