1
2use std::sync::Arc;
9use std::{thread};
10use std::thread::JoinHandle;
11use crossbeam::channel::{TryRecvError, unbounded};
12
13
/// Per-thread processing logic run by a [`WorkersPool`].
///
/// The pool builds one instance per worker thread through `Default`, so an
/// implementor may keep mutable scratch state between calls to
/// [`Worker::execute`] without any synchronisation.
pub trait Worker: Default {
    /// A single unit of work handed to [`Worker::execute`]; it is moved
    /// across threads, hence `Send + 'static`.
    type Data: Send + 'static;
    /// Value produced for one unit of work; it is moved back across threads.
    type Result: Send + 'static;
    /// Read-only state shared by all workers behind an `Arc`.
    type Context: Send + Sync + 'static;

    /// Processes one `data` item, with access to the shared `context`, and
    /// returns its result.
    fn execute(&mut self, data: Self::Data, context: &Arc<Self::Context>) -> Self::Result;
}
27
/// A pool of worker threads that pull `W::Data` items from a shared work
/// channel and push the corresponding `W::Result` values to a shared result
/// channel.
pub struct WorkersPool<W: Worker> {
    /// Receiving end of the channel the worker threads send results into.
    result_receiver: crossbeam::channel::Receiver<W::Result>,
    /// Sending end of the channel the worker threads take work from.
    work_sender: crossbeam::channel::Sender<W::Data>,
    // Join handles of the spawned worker threads. They are stored but never
    // read in the code visible here, which is why the lint is silenced.
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>
}
35
36impl<W: Worker> WorkersPool<W> {
37 pub fn new(context: W::Context) -> Self {
38 let (result_sender,result_receiver) = unbounded();
39 let (work_sender,work_receiver) = unbounded();
40
41 let context = Arc::new(context);
42
43 let thread_count = num_cpus::get();
44
45 let mut workers = vec![];
46
47 for _ in 0..thread_count {
48 let work_receiver = work_receiver.clone();
49 let result_sender = result_sender.clone();
50
51 let context_clone = context.clone();
52
53 let thread = thread::spawn(move || {
54 let mut worker = W::default();
55 let context = context_clone;
56
57 loop {
58 let work = work_receiver.recv();
59
60 let work = match work {
61 Err(_) => {
62 return;
63 },
64 Ok(work) => {
65 work
66 }
67 };
68
69 let result = worker.execute(work, &context);
70
71 let send_result = result_sender.send(result);
72
73 match send_result {
74 Ok(_) => {}
75 Err(_) => {
76 return;
77 }
78 }
79 }
80 });
81
82 workers.push(thread);
83 }
84
85 Self {
86 result_receiver,
87 work_sender,
88 workers
89 }
90 }
91
92 pub fn add_work(&mut self, work: W::Data) -> Result<(),()>{
95 self.work_sender.send(work)
96 .map_err(|_| ())?;
97
98 Ok(())
99 }
100
101 pub fn receive_result(&mut self) -> Result<W::Result, ()> {
104 self.result_receiver.recv().map_err(|_| ())
105 }
106
107 pub fn try_receive_result(&mut self) -> Result<Option<W::Result>, ()> {
110 let result = self.result_receiver.try_recv();
111
112 match result {
113 Err(err) => {
114 match err {
115 TryRecvError::Empty => {
116 Ok(None)
117 }
118 TryRecvError::Disconnected => {
119 Err(())
120 }
121 }
122 }
123 Ok(ok) => {
124 Ok(Some(ok))
125 }
126 }
127 }
128
129 pub fn collect_finished(&mut self) -> Result<Vec<W::Result>, ()> {
131 let mut results = vec![];
132
133 loop {
134 let result = self.try_receive_result()?;
135 match result {
136 None => break,
137 Some(result) => {
138 results.push(result);
139 }
140 }
141 }
142
143 Ok(results)
144 }
145
146 pub fn has_work_left(&self) -> bool {
150 self.work_sender.is_empty()
151 }
152
153 pub fn has_results(&self) -> bool {
155 !self.result_receiver.is_empty()
156 }
157}