task_queue/
lib.rs

1//! Task queue
2//! The implementation of the thread pool for Rust.
3//!
4//! # Example
5//! ``` rust
6//! extern crate task_queue;
7//!
8//! let mut queue = task_queue::TaskQueue::new();
9//!
10//! for _ in 0..10 {
11//!    queue.enqueue(|| {
12//!        println!("Hi from pool")
13//!    }).unwrap();
14//! }
15//! # queue.stop_wait();
16//! ```
17//!
//! The library supports dynamic control over the number of threads.
//! To customize it, implement the `SpawnPolicy` trait.
//!
//! For example, here is a `StaticSpawnPolicy` implementation:
22//! # Example
23//! ```
24//! use task_queue::TaskQueueStats;
25//! use task_queue::spawn_policy::SpawnPolicy;
26//!
27//! pub struct StaticSpawnPolicy;
28//!
29//! impl SpawnPolicy for StaticSpawnPolicy {
30//!     fn get_count(&mut self, stats: TaskQueueStats) -> usize {
31//!         stats.threads_max
32//!     }
33//! }
34//! #
35//! # fn main() {
36//! # }
37//! ```
38
39pub mod error;
40pub mod spawn_policy;
41mod pipe;
42
43use std::ops::Index;
44use std::thread::{ JoinHandle, Builder };
45use std::panic;
46use std::panic:: { RefUnwindSafe };
47use std::sync::atomic::{ AtomicBool, Ordering };
48use std::sync::Arc;
49
50use error::TaskQueueError;
51use pipe::Sender;
52use pipe::Receiver;
53use pipe::ReceiverHandle;
54use pipe::Priority;
55use spawn_policy::SpawnPolicy;
56use spawn_policy::StaticSpawnPolicy;
57
58pub struct TaskQueue {
59    sender: Sender<Message>,
60
61    policy: Box<SpawnPolicy>,
62    min_threads: usize,
63    max_threads: usize,
64
65    threads: Vec<ThreadInfo>,
66    closing_threads: Vec<ThreadInfo>
67}
68
69impl TaskQueue {
70    /// Create new task queue with 10 threads.
71    pub fn new() -> Self {
72        TaskQueue::with_threads(10, 10).expect("10 and 10 satisfy with_threads method validation")
73    }
74
75    /// Create new task queue with selected threads count.
76    pub fn with_threads(min: usize, max: usize) -> Result<Self, TaskQueueError> {
77        if min <= 0 || max <= 0 || max < min {
78            return Err(TaskQueueError::illegal_start_threads(min, max));
79        }
80
81        Ok(TaskQueue {
82            sender: Sender::<Message>::new(),
83            policy: Box::new(StaticSpawnPolicy::new()),
84            min_threads: min,
85            max_threads: max,
86            threads: Vec::new(),
87            closing_threads: Vec::new()
88        })
89    }
90
91    /// Schedule task in queue
92    /// # Example
93    /// ``` rust
94    /// extern crate task_queue;
95    ///
96    /// let mut queue = task_queue::TaskQueue::new();
97    ///
98    /// for _ in 0..10 {
99    ///    queue.enqueue(move || {
100    ///        println!("Hi from pool")
101    ///    }).unwrap();
102    /// }
103    /// # queue.stop_wait();
104    /// ```
105    /// # Panics
106    /// If spawn policy returned illegal number of threads.
107    pub fn enqueue<F>(&mut self, f: F) -> Result<(), TaskQueueError> where F: Fn() + Send + 'static, {
108        // Put task
109        let task = Task { value: Box::new(f) };
110        self.sender.put(Message::Task(task));
111
112        // Get threads count from policy
113        let stats = TaskQueueStats::new(self);
114        let count = self.policy.get_count(stats);
115        if self.min_threads > count || count > self.max_threads {
116            return Err(TaskQueueError::illegal_policy_threads(self.min_threads, self.max_threads, count));
117        }
118
119        // Apply threads count if need
120        let mut runned = self.threads.len();
121        while runned != count {
122            if runned > count {
123                let info = self.threads.remove(0);
124                let receiver = info.receiver.clone();
125
126                self.closing_threads.push(info);
127                self.sender.put_with_priority(Some(receiver), Priority::High, Message::CloseThread);
128
129                runned -= 1;
130            } else {
131                let info = self.build_and_run()?;
132                self.threads.push(info);
133
134                runned += 1;
135            }
136        }
137
138        // Check removed threads
139        for i in (0..self.closing_threads.len()).rev() {
140            let is_thread_closed = {
141                let info = self.closing_threads.index(i);
142                info.closed.load(Ordering::SeqCst)
143            };
144
145            if is_thread_closed {
146                self.closing_threads.remove(i);
147            }
148        }
149
150        // Result
151        Ok(())
152    }
153
154    fn build_and_run(&mut self) -> Result<ThreadInfo, TaskQueueError> {
155        let receiver = self.sender.create_receiver();
156        let receiver_handle = receiver.handle();
157        let name = format!("TaskQueue::thread {}", receiver_handle);
158        let close_flag = Arc::new(AtomicBool::new(false));
159        let close_flag_clone = close_flag.clone();
160
161        let handle = Builder::new()
162            .name(name)
163            .spawn(move || Self::thread_update(close_flag_clone, receiver))?;
164
165        Ok(ThreadInfo::new(receiver_handle, handle, close_flag))
166    }
167
168    fn thread_update(close_flag: Arc<AtomicBool>, receiver: Receiver<Message>) {
169        loop {
170            let message = receiver.get();
171            match message {
172                Message::Task(t) => {
173                    let _ = panic::catch_unwind(|| t.run());
174                },
175                Message::CloseThread => {
176                    close_flag.store(true, Ordering::SeqCst);
177                    return;
178                }
179            }
180        }
181    }
182
183    /// Stops tasks queue work.
184    /// All task in queue will be completed by threads.
185    /// Method not block current thread work, but returns threads joinHandles.
186    ///
187    /// # Examples
188    /// ``` rust
189    /// extern crate task_queue;
190    ///
191    /// let mut queue = task_queue::TaskQueue::new();
192    ///
193    /// for _ in 0..10 {
194    ///    queue.enqueue(move || {
195    ///        println!("Hi from pool")
196    ///    }).unwrap();
197    /// }
198    /// let handles = queue.stop();
199    /// for h in handles {
200    ///     h.join().unwrap();
201    /// }
202    /// ```
203    pub fn stop(mut self) -> Vec<JoinHandle<()>> {
204        self.stop_impl()
205    }
206
207    /// Stops tasks queue work.
208    /// All task in queue will be completed by threads.
209    /// Method block current thread work.
210    ///
211    /// # Examples
212    /// ``` rust
213    /// extern crate task_queue;
214    ///
215    /// let mut queue = task_queue::TaskQueue::new();
216    ///
217    /// for _ in 0..10 {
218    ///    queue.enqueue(move || {
219    ///        println!("Hi from pool")
220    ///    }).unwrap();
221    /// }
222    /// queue.stop_wait();
223    /// ```
224    pub fn stop_wait(mut self) {
225        let handles = self.stop_impl();
226        for h in handles {
227            h.join().expect("Join error");
228        }
229    }
230
231    fn stop_impl(&mut self) -> Vec<JoinHandle<()>> {
232        // Close threads only after all tasks (send message with min priority)
233        for info in &self.threads {
234            self.sender.put_with_priority(Some(info.receiver), Priority::Min, Message::CloseThread);
235        }
236
237        self.threads
238            .drain(..)
239            .chain(self.closing_threads.drain(..))
240            .map(|t| t.handle)
241            .collect()
242    }
243
244    /// Stops tasks queue work immediately and return are not completed tasks.
245    /// # Examples
246    /// ``` rust
247    /// extern crate task_queue;
248    ///
249    /// let mut queue = task_queue::TaskQueue::new();
250    ///
251    /// for _ in 0..10 {
252    ///    queue.enqueue(move || {
253    ///        println!("Hi from pool")
254    ///    }).unwrap();
255    /// }
256    /// let not_completed = queue.stop_immediately();
257    /// for t in &not_completed {
258    ///     t.run();
259    /// }
260    /// ```
261    pub fn stop_immediately(mut self) -> Vec<Task> {
262        // Close threads immediately (send message with high priority)
263        for info in &self.threads {
264            self.sender.put_with_priority(Some(info.receiver), Priority::High, Message::CloseThread);
265        }
266
267        let threads : Vec<ThreadInfo> = self.threads
268            .drain(..)
269            .chain(self.closing_threads.drain(..))
270            .collect();
271
272        // Wait threads
273        for info in threads {
274            info.handle.join().expect("Join error");
275        }
276
277        // Cancel all tasks, and check it
278        let not_executed = self.sender.cancel_all();
279        let mut result = Vec::<Task>::new();
280        for m in not_executed {
281            let task = match m {
282                Message::Task(t) => t,
283                Message::CloseThread => panic!("This should never happen")
284            };
285
286            result.push(task);
287        }
288
289        result
290    }
291
292    /// Sets a policy for controlling the amount of threads
293    pub fn set_spawn_policy(&mut self, policy: Box<SpawnPolicy>) {
294        self.policy = policy;
295    }
296
297    /// Returns current threads count
298    pub fn get_threads_count(&self) -> usize {
299        self.threads.len()
300    }
301
302    /// Return max threads count
303    pub fn get_threads_max(&self) -> usize {
304        self.max_threads
305    }
306
307    /// Return min threads count
308    pub fn get_threads_min(&self) -> usize {
309        self.min_threads
310    }
311
312    /// Gets tasks count in queue
313    pub fn tasks_count(&self) -> usize {
314        self.sender.size()
315    }
316}
317
318impl Drop for TaskQueue {
319    /// All task in queue will be completed by threads.
320    fn drop(&mut self) {
321        self.stop_impl();
322    }
323}
324
325struct ThreadInfo {
326    receiver: ReceiverHandle,
327    handle: JoinHandle<()>,
328    closed: Arc<AtomicBool>
329}
330
331impl ThreadInfo {
332    fn new(receiver: ReceiverHandle, handle: JoinHandle<()>, close_flag: Arc<AtomicBool>) -> Self {
333        ThreadInfo {
334            receiver: receiver,
335            handle: handle,
336            closed: close_flag
337        }
338    }
339}
340
341enum Message {
342    Task(Task),
343    CloseThread,
344}
345
/// A unit of work: a boxed closure that can be invoked any number of times.
pub struct Task {
    /// The closure to run; `Send` so it can be handed to a worker thread.
    value: Box<dyn Fn() + Send>,
}

impl Task {
    /// Invokes the wrapped closure on the calling thread.
    pub fn run(&self) {
        (self.value)();
    }
}

// Manual marker impl: the boxed closure is not known to be unwind safe, but
// the worker loop needs to capture `&Task` inside `panic::catch_unwind`.
impl RefUnwindSafe for Task {}
359
360#[derive(Clone, Copy)]
361pub struct TaskQueueStats {
362    pub threads_count: usize,
363    pub threads_max: usize,
364    pub threads_min: usize,
365    pub tasks_count: usize,
366}
367
368impl TaskQueueStats {
369    fn new(queue: &TaskQueue) -> Self {
370        TaskQueueStats {
371            threads_count: queue.get_threads_count(),
372            threads_max: queue.get_threads_max(),
373            threads_min: queue.get_threads_min(),
374            tasks_count: queue.tasks_count(),
375        }
376    }
377
378    pub fn empty() -> Self {
379        TaskQueueStats {
380            threads_count: 0,
381            threads_max: 0,
382            threads_min: 0,
383            tasks_count: 0
384        }
385    }
386}