poolite/
lib.rs

1/*!
2# [poolite](https://github.com/biluohc/poolite)
3
4A lite threadpool library written for Rust.
5
6## Usage
7
8On Cargo.toml:
9
10```toml
11 [dependencies]
12 poolite = "0.7.1"
13```
14
15## Documentation  
16* Visit [Docs.rs](https://docs.rs/poolite/)  
17
18or 
19
* Run `cargo doc --open` after modifying the toml file.
21
22## Base usage
23```
24extern crate poolite;
25use poolite::Pool;
26
27fn main() {
28    let pool = Pool::new().unwrap();
29    for i in 0..38 {
30        pool.push(move || test(i));
31    }
32
33    pool.join(); //wait for the pool
34}
35
36fn test(msg: i32) {
37    println!("key: {}\tvalue: {}", msg, fib(msg));
38}
39
40fn fib(msg: i32) -> i32 {
41    match msg {
42        0...2 => 1,
43        x => fib(x - 1) + fib(x - 2),
44    }
45}
46```
47
48## `Scoped` `Task`
49```
50extern crate poolite;
51use poolite::Pool;
52
53fn main() {
54    let pool = Pool::new().unwrap();
55    let mut array = (0..100usize).into_iter().map(|i| (i, 0)).collect::<Vec<_>>();
56
    // The scoped method waits until all of the scope's tasks have finished running.
58    pool.scoped(|scope| for i in array.iter_mut() {
59        // have to move
60        scope.push(move|| i.1 = i.0*i.0);
61    });
62
63    for (i, j) in array {
64        println!("key: {}\tvalue: {}", i, j);
65    }
66}
67```
68
69## [More Examples..](https://github.com/biluohc/poolite/blob/master/examples/)
70*/
71#[macro_use]
72extern crate log;
73extern crate mxo_env_logger;
74extern crate crossbeam_channel;
75extern crate num_cpus;
76
77use crossbeam_channel::{unbounded, Sender, Receiver, RecvTimeoutError};
78use mxo_env_logger::{init, LogErr};
79
80use std::sync::atomic::{Ordering, AtomicUsize, AtomicBool};
81use std::sync::{Arc, Once, ONCE_INIT};
82use std::fmt::{self, Debug, Display};
83use std::time::Duration;
84use std::mem::transmute;
85use std::marker::PhantomData;
86use std::error::Error;
87use std::thread;
88use std::io;
// Manually mark `Pool` as transferable to, and shareable between, threads.
// NOTE(review): the soundness of these impls depends on the fields of `Inner`,
// which is defined outside this view — confirm the invariant and record it
// here as a `SAFETY:` note.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
91
/// The `Pool` struct: the public handle of the thread pool.
///
/// Most of its methods simply forward to the private `inner` state.
#[derive(Debug)]
pub struct Pool {
    // Private pool state; built from a `Builder` in `Pool::with_builder`.
    inner: Inner,
}
97
98impl Pool {
99    pub fn new() -> Result<Self, PoolError> {
100       Self::with_builder(Builder::default())
101    }
102    pub fn with_builder(b: Builder) -> Result<Self, PoolError> {
103        assert!(b.max >= b.min, "min > max");
104        assert!(b.max != 0, "max == 0");
105        let _ = init();        
106
107        let mut new =  Pool { inner: Inner::with_builder(b) };
108
109        match (&mut new).inner.run() {
110            Ok(_) => Ok(new),
111            Err(e) => Err(PoolError::new(new, e)),
112        }
113    }
114    /// Get `Pool`'s settings
115    pub fn as_builder(&self) -> &Builder {
116        self.inner.as_builder()
117    }
118    /// All threads are waiting and tasks_queue'length is 0.
119    pub fn is_empty(&self) -> bool {
120        self.inner.is_empty()
121    }
122    /// Returns the length of the tasks_queue.
123    pub fn tasks_len(&self) -> usize {
124        self.inner.tasks_len()
125    }
126    // #[doc(hidden)]
127    /// Contains the number of ready to create
128    pub fn threads_future(&self) -> usize {
129        self.inner.threads_future()
130    }
131    /// Returns the number of threads in the Pool.
132    pub fn threads_alive(&self) -> usize {
133        self.inner.threads_alive()
134    }
135    /// Returns the number of threads that is waiting for Task in the Pool
136    pub fn threads_waiting(&self) -> usize {
137        self.inner.threads_waiting()
138    }
139    /// The daemon thread's status
140    pub fn daemon_alive(&self) -> bool {
141        self.inner.daemon_alive()
142    }
143    #[doc(hidden)]
144    pub fn dropped(&self) -> bool {
145        self.inner.dropped()
146    }
147    /// Appends a task to the Pool,
148    ///
149    /// it receives `Fn() + Send + 'static,FnMut() + Send + 'static` and `FnOnce() + Send + 'static>`.
150    pub fn push<T>(&self, task: T)
151    where
152        T: Runable + Send + 'static,
153    {
154        self.inner.push(
155            Box::new(task) as Box<Runable + Send + 'static>,
156        )
157    }
158    /// Manually add the number of threads to `Pool`
159    pub fn add_threads(&self, add_num: usize) -> Result<(), (usize, io::Error)> {
160        self.inner.add_threads(add_num)
161    }
162    ///wait for the pool
163    pub fn join(&self) {
164        self.join_ms(10);
165    }
166    #[doc(hidden)]
167    pub fn join_ms(&self, ms: u64) {
168        while !self.is_empty() {
169            thread::sleep(Duration::from_millis(ms)); //wait for the pool time(ms).
170        }
171    }
172}
173
174impl Drop for Pool {
175    fn drop(&mut self) {
176        self.inner.as_builder().dropped.store(
177            true,
178            Ordering::SeqCst,
179        )
180    }
181}
182
// Textually splice the sibling source files into this module, so their items
// share this file's imports and namespace.
// NOTE(review): `Inner`, `Builder`, `Runable` and the scoped-task API used in
// this file are presumably defined there — confirm against the two files.
include!("inner.rs");
include!("scope.rs");
185
/// The error type for the pool's `run()` if the pool spawning the daemon thread fails.
///
/// It carries both the `Pool` that failed to start and the underlying
/// `std::io::Error`; use `into_inner()` or `into_error()` to take one of them out.
#[derive(Debug)]
pub struct PoolError {
    // The pool whose start-up failed; returned by `into_inner()`.
    pool: Pool,
    // The I/O error reported by the failed start-up; returned by `into_error()`.
    error: std::io::Error,
}
192
193impl PoolError {
194    #[inline]
195    fn new(pool: Pool, error: std::io::Error) -> Self {
196        PoolError {
197            pool: pool,
198            error: error,
199        }
200    }
201    ///  Into `Pool`
202    #[inline]
203    pub fn into_inner(self) -> Pool {
204        self.pool
205    }
206    /// Into `std::io::Error`
207    #[inline]
208    pub fn into_error(self) -> std::io::Error {
209        self.error
210    }
211}
212
213impl Error for PoolError {
214    fn description(&self) -> &str {
215        self.error.description()
216    }
217}
218
219impl Display for PoolError {
220    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221        use std::error::Error;
222        write!(
223            f,
224            "PoolError {{ pool : Pool, err : {} }}",
225            self.error.description()
226        )
227    }
228}
229
230
// Unit tests for `Builder` validation, task submission, default settings and
// the scoped-task API. Several of them run real worker threads and poll the
// pool until it reports empty.
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use std::thread;
    // Building with `min` (100) greater than `max` (99) is expected to panic.
    #[should_panic]
    #[test]
    fn min_bq_max() {
        let _pool = Builder::new().min(100).max(99).build();
    }
    // Building with `max` set to 0 is expected to panic.
    #[should_panic]
    #[test]
    fn max_zero() {
        let _pool = Builder::new().max(0).build();
    }
    // Building with `min` equal to `max` must not panic.
    #[test]
    fn min_eq_max() {
        let _pool = Builder::new().min(100).max(100).build();
    }
    // Checks how the `min`/`max` setters interact with the default values.
    #[test]
    fn min_max() {
        let min0 = Builder::min_default();
        let max0 = Builder::max_default();

        // Setting `max` below the default `min`: both fields end up at that value.
        let p0 = Builder::new().max(min0 - 1);
        assert_eq!(min0 - 1, p0.max);
        assert_eq!(min0 - 1, p0.min);

        // Setting `min` above the default `max`: both fields end up at that value.
        let p1 = Builder::new().min(max0 + 1);
        assert_eq!(p1.min, max0 + 1);
        assert_eq!(p1.max, max0 + 1);

        // Setting both explicitly: each field keeps the value it was given.
        let p2 = Builder::new().min(max0).max(min0);
        assert_eq!(p2.min, max0);
        assert_eq!(p2.max, min0);
    }
    // `push` must accept a plain fn item and closures that mutate or consume
    // their captured state.
    #[test]
    fn fn_fnmut_fnonce_closure() {
        fn fnn() {
            println!("call Fn() push");
        }

        fn fnm(msg: &mut String) {
            println!("{}", msg);
            *msg = "call FnMut() return".to_owned()
        }

        fn fno(msg: String) {
            println!("{}", msg);
        }
        // Three independent owned strings (the program name), one per closure.
        let mut str = std::env::args().nth(0).unwrap();
        let str1 = std::env::args().nth(0).unwrap();
        let str2 = std::env::args().nth(0).unwrap();

        let pool = Pool::new().unwrap();
        pool.push(fnn);
        pool.push(move || fnm(&mut str));
        pool.push(move || fno(str1));

        // A closure that loops `str2.len()` times; the branch is not expected
        // to be taken (it needs more than 100_0000 command-line arguments).
        let closure = move || for _ in 0..str2.len() {
            if std::env::args().count() > 100_0000 {
                println!("Fake");
            }
        };
        pool.push(closure);
        pool.join()
    }

    // Checks the default settings of a fresh pool, runs 33 fib tasks, and checks
    // that the settings and status flags are unchanged afterwards.
    #[test]
    fn pool() {
        let pool = Pool::new().unwrap();
        assert!(Builder::num_cpus() >= 1);
        assert_eq!(*pool.as_builder().min_get(), Builder::min_default());
        assert_eq!(
            *pool.as_builder().max_get(),
            Builder::max_default()
        );
        assert_eq!(
            pool.as_builder().timeout_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.as_builder().name_get().is_none());
        assert!(pool.as_builder().stack_size_get().is_none());
        // The default load limit is the CPU count squared.
        assert_eq!(
            *pool.as_builder().load_limit_get(),
            Builder::num_cpus() * Builder::num_cpus()
        );

        assert_eq!(
            pool.as_builder().daemon_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.daemon_alive());
        assert!(!pool.dropped());

        // (index, result) pairs; each task fills in the result for its own index.
        let array = (0..33usize).into_iter().map(|i| (i, 0)).collect::<Vec<_>>();

        let map = Arc::new(Mutex::new(array));
        for i in 0..33 {
            let mutex = map.clone();
            pool.push(move || test(i, mutex));
        }

        // Poll until the pool reports empty, logging the thread count each round.
        while !pool.is_empty() {
            thread::sleep(Duration::from_millis(10)); //wait for the pool 10ms.
            eprint!(
                "len()/min()/max(): {}/{}/{}",
                pool.threads_alive(),
                pool.as_builder().min_get(),
                pool.as_builder().max_get()
            );
        }

        for &(k, v) in map.lock().unwrap().iter() {
            println!("key: {}\tvalue: {}", k, v);
        }

        // After the work is done the same settings and status must still hold.
        assert!(pool.threads_alive() > 0);
        assert!(pool.threads_waiting() > 0);
        assert_eq!(*pool.as_builder().min_get(), Builder::min_default());
        assert_eq!(
            *pool.as_builder().max_get(),
            Builder::max_default()
        );
        assert_eq!(
            pool.as_builder().timeout_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.as_builder().name_get().is_none());
        assert!(pool.as_builder().stack_size_get().is_none());
        assert_eq!(
            *pool.as_builder().load_limit_get(),
            Builder::num_cpus() * Builder::num_cpus()
        );

        assert_eq!(
            pool.as_builder().daemon_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.daemon_alive());
        assert!(!pool.dropped());
        println!("{:?}", pool);
    }

    // Task body: computes `fib(msg)` and stores it at index `msg` of the shared vector.
    fn test(msg: usize, map: Arc<Mutex<Vec<(usize,usize)>>>) {
        let res = fib(msg);
        let mut maplock = map.lock().unwrap();
        maplock[msg as usize].1 = res;
    }
    // Naive recursive Fibonacci; inputs 0, 1 and 2 all yield 1.
    fn fib(msg: usize) -> usize {
        match msg {
            0...2 => 1,
            x => fib(x - 1) + fib(x - 2),
        }
    }
    // Computes fib for 0..33 twice — once through `push` into a mutex-guarded
    // copy, once through `scoped` directly into `array` — and expects equal results.
    #[test]
    fn scope_fib() {
        let pool = Pool::new().unwrap();
        let mut array = (0..33usize).into_iter().map(|i| (i, 0)).collect::<Vec<_>>();

        let mutex = Arc::new(Mutex::new(array.clone()));
        for i in 0..33usize {
            let mutex = mutex.clone();
            pool.push(move || test(i, mutex));
        }

        // Scoped tasks borrow the elements of `array` mutably.
        pool.scoped(|scope| for i in array.iter_mut() {
            scope.push(move|| i.1 = fib(i.0));
        });

        pool.join();
        let array_true = mutex.lock().unwrap();
        assert_eq!(*array_true,array);
        for (i, j) in array {
            println!("key: {}\tvalue: {}", i, j);
        }
    }
    // Same comparison as `scope_fib`, but squaring 0..100 instead of computing fib.
    #[test]
    fn scope_x2() {
        let pool = Pool::new().unwrap();
        let mut array = (0..100usize).into_iter().map(|i| (i, 0)).collect::<Vec<_>>();

        let mutex = Arc::new(Mutex::new(array.clone()));
        for i in 0..100 {
            let mutex = mutex.clone();
            pool.push(move || x2(i, mutex));
        }

        // Scoped tasks borrow the elements of `array` mutably.
        pool.scoped(|scope| for i in array.iter_mut() {
            scope.push(move|| i.1 = i.0*i.0);
        });

        pool.join();
        let array_true = mutex.lock().unwrap();
        assert_eq!(*array_true,array);
        for (i, j) in array {
            println!("key: {}\tvalue: {}", i, j);
        }
    }
    // Task body: stores `msg * msg` at index `msg` of the shared vector.
    fn x2(msg: usize, map: Arc<Mutex<Vec<(usize,usize)>>>) {
        let res = msg*msg;
        let mut maplock = map.lock().unwrap();
        maplock[msg].1 = res;
    }
}