1pub mod error;
40pub mod spawn_policy;
41mod pipe;
42
43use std::ops::Index;
44use std::thread::{ JoinHandle, Builder };
45use std::panic;
46use std::panic:: { RefUnwindSafe };
47use std::sync::atomic::{ AtomicBool, Ordering };
48use std::sync::Arc;
49
50use error::TaskQueueError;
51use pipe::Sender;
52use pipe::Receiver;
53use pipe::ReceiverHandle;
54use pipe::Priority;
55use spawn_policy::SpawnPolicy;
56use spawn_policy::StaticSpawnPolicy;
57
58pub struct TaskQueue {
59 sender: Sender<Message>,
60
61 policy: Box<SpawnPolicy>,
62 min_threads: usize,
63 max_threads: usize,
64
65 threads: Vec<ThreadInfo>,
66 closing_threads: Vec<ThreadInfo>
67}
68
69impl TaskQueue {
70 pub fn new() -> Self {
72 TaskQueue::with_threads(10, 10).expect("10 and 10 satisfy with_threads method validation")
73 }
74
75 pub fn with_threads(min: usize, max: usize) -> Result<Self, TaskQueueError> {
77 if min <= 0 || max <= 0 || max < min {
78 return Err(TaskQueueError::illegal_start_threads(min, max));
79 }
80
81 Ok(TaskQueue {
82 sender: Sender::<Message>::new(),
83 policy: Box::new(StaticSpawnPolicy::new()),
84 min_threads: min,
85 max_threads: max,
86 threads: Vec::new(),
87 closing_threads: Vec::new()
88 })
89 }
90
91 pub fn enqueue<F>(&mut self, f: F) -> Result<(), TaskQueueError> where F: Fn() + Send + 'static, {
108 let task = Task { value: Box::new(f) };
110 self.sender.put(Message::Task(task));
111
112 let stats = TaskQueueStats::new(self);
114 let count = self.policy.get_count(stats);
115 if self.min_threads > count || count > self.max_threads {
116 return Err(TaskQueueError::illegal_policy_threads(self.min_threads, self.max_threads, count));
117 }
118
119 let mut runned = self.threads.len();
121 while runned != count {
122 if runned > count {
123 let info = self.threads.remove(0);
124 let receiver = info.receiver.clone();
125
126 self.closing_threads.push(info);
127 self.sender.put_with_priority(Some(receiver), Priority::High, Message::CloseThread);
128
129 runned -= 1;
130 } else {
131 let info = self.build_and_run()?;
132 self.threads.push(info);
133
134 runned += 1;
135 }
136 }
137
138 for i in (0..self.closing_threads.len()).rev() {
140 let is_thread_closed = {
141 let info = self.closing_threads.index(i);
142 info.closed.load(Ordering::SeqCst)
143 };
144
145 if is_thread_closed {
146 self.closing_threads.remove(i);
147 }
148 }
149
150 Ok(())
152 }
153
154 fn build_and_run(&mut self) -> Result<ThreadInfo, TaskQueueError> {
155 let receiver = self.sender.create_receiver();
156 let receiver_handle = receiver.handle();
157 let name = format!("TaskQueue::thread {}", receiver_handle);
158 let close_flag = Arc::new(AtomicBool::new(false));
159 let close_flag_clone = close_flag.clone();
160
161 let handle = Builder::new()
162 .name(name)
163 .spawn(move || Self::thread_update(close_flag_clone, receiver))?;
164
165 Ok(ThreadInfo::new(receiver_handle, handle, close_flag))
166 }
167
168 fn thread_update(close_flag: Arc<AtomicBool>, receiver: Receiver<Message>) {
169 loop {
170 let message = receiver.get();
171 match message {
172 Message::Task(t) => {
173 let _ = panic::catch_unwind(|| t.run());
174 },
175 Message::CloseThread => {
176 close_flag.store(true, Ordering::SeqCst);
177 return;
178 }
179 }
180 }
181 }
182
183 pub fn stop(mut self) -> Vec<JoinHandle<()>> {
204 self.stop_impl()
205 }
206
207 pub fn stop_wait(mut self) {
225 let handles = self.stop_impl();
226 for h in handles {
227 h.join().expect("Join error");
228 }
229 }
230
231 fn stop_impl(&mut self) -> Vec<JoinHandle<()>> {
232 for info in &self.threads {
234 self.sender.put_with_priority(Some(info.receiver), Priority::Min, Message::CloseThread);
235 }
236
237 self.threads
238 .drain(..)
239 .chain(self.closing_threads.drain(..))
240 .map(|t| t.handle)
241 .collect()
242 }
243
244 pub fn stop_immediately(mut self) -> Vec<Task> {
262 for info in &self.threads {
264 self.sender.put_with_priority(Some(info.receiver), Priority::High, Message::CloseThread);
265 }
266
267 let threads : Vec<ThreadInfo> = self.threads
268 .drain(..)
269 .chain(self.closing_threads.drain(..))
270 .collect();
271
272 for info in threads {
274 info.handle.join().expect("Join error");
275 }
276
277 let not_executed = self.sender.cancel_all();
279 let mut result = Vec::<Task>::new();
280 for m in not_executed {
281 let task = match m {
282 Message::Task(t) => t,
283 Message::CloseThread => panic!("This should never happen")
284 };
285
286 result.push(task);
287 }
288
289 result
290 }
291
292 pub fn set_spawn_policy(&mut self, policy: Box<SpawnPolicy>) {
294 self.policy = policy;
295 }
296
297 pub fn get_threads_count(&self) -> usize {
299 self.threads.len()
300 }
301
302 pub fn get_threads_max(&self) -> usize {
304 self.max_threads
305 }
306
307 pub fn get_threads_min(&self) -> usize {
309 self.min_threads
310 }
311
312 pub fn tasks_count(&self) -> usize {
314 self.sender.size()
315 }
316}
317
318impl Drop for TaskQueue {
319 fn drop(&mut self) {
321 self.stop_impl();
322 }
323}
324
325struct ThreadInfo {
326 receiver: ReceiverHandle,
327 handle: JoinHandle<()>,
328 closed: Arc<AtomicBool>
329}
330
331impl ThreadInfo {
332 fn new(receiver: ReceiverHandle, handle: JoinHandle<()>, close_flag: Arc<AtomicBool>) -> Self {
333 ThreadInfo {
334 receiver: receiver,
335 handle: handle,
336 closed: close_flag
337 }
338 }
339}
340
341enum Message {
342 Task(Task),
343 CloseThread,
344}
345
/// A unit of work: a boxed closure that can be sent to a worker thread.
pub struct Task {
    /// The wrapped closure; `Fn` (not `FnOnce`) so it can be called through
    /// `&self` as often as needed.
    value: Box<dyn Fn() + Send>,
}

impl Task {
    /// Calls the wrapped closure once.
    ///
    /// A panic raised by the closure propagates to the caller.
    pub fn run(&self) {
        (self.value)();
    }
}

/// Lets a `&Task` be captured by the closure passed to `panic::catch_unwind`,
/// which is how the worker loop runs tasks.
// NOTE(review): this is an assertion by the author, not something the compiler
// checks - a closure with interior mutability may be observed in a
// half-updated state if it is run again after it panicked.
impl RefUnwindSafe for Task {}
359
/// Snapshot of a queue's counters, handed to the spawn policy.
#[derive(Clone, Copy)]
pub struct TaskQueueStats {
    /// Number of active worker threads when the snapshot was taken.
    pub threads_count: usize,
    /// Upper bound for the number of worker threads.
    pub threads_max: usize,
    /// Lower bound for the number of worker threads.
    pub threads_min: usize,
    /// Queue size reported by the sender when the snapshot was taken.
    pub tasks_count: usize,
}
367
368impl TaskQueueStats {
369 fn new(queue: &TaskQueue) -> Self {
370 TaskQueueStats {
371 threads_count: queue.get_threads_count(),
372 threads_max: queue.get_threads_max(),
373 threads_min: queue.get_threads_min(),
374 tasks_count: queue.tasks_count(),
375 }
376 }
377
378 pub fn empty() -> Self {
379 TaskQueueStats {
380 threads_count: 0,
381 threads_max: 0,
382 threads_min: 0,
383 tasks_count: 0
384 }
385 }
386}