1#[macro_use]
72extern crate log;
73extern crate mxo_env_logger;
74extern crate crossbeam_channel;
75extern crate num_cpus;
76
77use crossbeam_channel::{unbounded, Sender, Receiver, RecvTimeoutError};
78use mxo_env_logger::{init, LogErr};
79
80use std::sync::atomic::{Ordering, AtomicUsize, AtomicBool};
81use std::sync::{Arc, Once, ONCE_INIT};
82use std::fmt::{self, Debug, Display};
83use std::time::Duration;
84use std::mem::transmute;
85use std::marker::PhantomData;
86use std::error::Error;
87use std::thread;
88use std::io;
// NOTE(review): these impls assert that `Pool` may be moved to, and shared between, threads.
// `Pool` only wraps an `Inner`, whose definition is not visible in this part of the file, so the
// invariant that makes this sound cannot be checked from here — confirm it against `Inner` and
// record it as a proper `// SAFETY:` justification.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
91
/// Public handle to the thread pool.
///
/// `Pool` is a thin wrapper: every method on it forwards to the wrapped `Inner`.
#[derive(Debug)]
pub struct Pool {
    // The state that actually implements the pool; all `Pool` methods delegate to it.
    inner: Inner,
}
97
98impl Pool {
99 pub fn new() -> Result<Self, PoolError> {
100 Self::with_builder(Builder::default())
101 }
102 pub fn with_builder(b: Builder) -> Result<Self, PoolError> {
103 assert!(b.max >= b.min, "min > max");
104 assert!(b.max != 0, "max == 0");
105 let _ = init();
106
107 let mut new = Pool { inner: Inner::with_builder(b) };
108
109 match (&mut new).inner.run() {
110 Ok(_) => Ok(new),
111 Err(e) => Err(PoolError::new(new, e)),
112 }
113 }
114 pub fn as_builder(&self) -> &Builder {
116 self.inner.as_builder()
117 }
118 pub fn is_empty(&self) -> bool {
120 self.inner.is_empty()
121 }
122 pub fn tasks_len(&self) -> usize {
124 self.inner.tasks_len()
125 }
126 pub fn threads_future(&self) -> usize {
129 self.inner.threads_future()
130 }
131 pub fn threads_alive(&self) -> usize {
133 self.inner.threads_alive()
134 }
135 pub fn threads_waiting(&self) -> usize {
137 self.inner.threads_waiting()
138 }
139 pub fn daemon_alive(&self) -> bool {
141 self.inner.daemon_alive()
142 }
143 #[doc(hidden)]
144 pub fn dropped(&self) -> bool {
145 self.inner.dropped()
146 }
147 pub fn push<T>(&self, task: T)
151 where
152 T: Runable + Send + 'static,
153 {
154 self.inner.push(
155 Box::new(task) as Box<Runable + Send + 'static>,
156 )
157 }
158 pub fn add_threads(&self, add_num: usize) -> Result<(), (usize, io::Error)> {
160 self.inner.add_threads(add_num)
161 }
162 pub fn join(&self) {
164 self.join_ms(10);
165 }
166 #[doc(hidden)]
167 pub fn join_ms(&self, ms: u64) {
168 while !self.is_empty() {
169 thread::sleep(Duration::from_millis(ms)); }
171 }
172}
173
174impl Drop for Pool {
175 fn drop(&mut self) {
176 self.inner.as_builder().dropped.store(
177 true,
178 Ordering::SeqCst,
179 )
180 }
181}
182
183include!("inner.rs");
184include!("scope.rs");
185
/// Error returned when a `Pool` could not be started.
///
/// It carries both the pool that was being built and the `std::io::Error` that occurred;
/// use `into_inner` or `into_error` to take either one out.
#[derive(Debug)]
pub struct PoolError {
    // The pool that `Pool::with_builder` had constructed when starting it failed.
    pool: Pool,
    // The I/O error reported while starting the pool.
    error: std::io::Error,
}
192
193impl PoolError {
194 #[inline]
195 fn new(pool: Pool, error: std::io::Error) -> Self {
196 PoolError {
197 pool: pool,
198 error: error,
199 }
200 }
201 #[inline]
203 pub fn into_inner(self) -> Pool {
204 self.pool
205 }
206 #[inline]
208 pub fn into_error(self) -> std::io::Error {
209 self.error
210 }
211}
212
213impl Error for PoolError {
214 fn description(&self) -> &str {
215 self.error.description()
216 }
217}
218
219impl Display for PoolError {
220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221 use std::error::Error;
222 write!(
223 f,
224 "PoolError {{ pool : Pool, err : {} }}",
225 self.error.description()
226 )
227 }
228}
229
230
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use std::thread;

    /// Building with `min(100).max(99)` is expected to panic.
    #[should_panic]
    #[test]
    fn min_bq_max() {
        let _pool = Builder::new().min(100).max(99).build();
    }

    /// Building with `max(0)` is expected to panic.
    #[should_panic]
    #[test]
    fn max_zero() {
        let _pool = Builder::new().max(0).build();
    }

    /// Building with `min == max` must not panic.
    #[test]
    fn min_eq_max() {
        let _pool = Builder::new().min(100).max(100).build();
    }

    /// Checks how `Builder::min` / `Builder::max` adjust the stored bounds.
    #[test]
    fn min_max() {
        let default_min = Builder::min_default();
        let default_max = Builder::max_default();

        let lowered = Builder::new().max(default_min - 1);
        assert_eq!(default_min - 1, lowered.max);
        assert_eq!(default_min - 1, lowered.min);

        let raised = Builder::new().min(default_max + 1);
        assert_eq!(raised.min, default_max + 1);
        assert_eq!(raised.max, default_max + 1);

        let swapped = Builder::new().min(default_max).max(default_min);
        assert_eq!(swapped.min, default_max);
        assert_eq!(swapped.max, default_min);
    }

    /// Pushes a plain `fn`, closures that mutate / consume their captures, and a
    /// looping closure, then waits for the pool to drain.
    #[test]
    fn fn_fnmut_fnonce_closure() {
        fn fnn() {
            println!("call Fn() push");
        }

        fn fnm(msg: &mut String) {
            println!("{}", msg);
            *msg = "call FnMut() return".to_owned()
        }

        fn fno(msg: String) {
            println!("{}", msg);
        }

        let mut mutated = std::env::args().nth(0).unwrap();
        let consumed = std::env::args().nth(0).unwrap();
        let measured = std::env::args().nth(0).unwrap();

        let pool = Pool::new().unwrap();
        pool.push(fnn);
        pool.push(move || fnm(&mut mutated));
        pool.push(move || fno(consumed));

        let closure = move || {
            for _ in 0..measured.len() {
                if std::env::args().count() > 100_0000 {
                    println!("Fake");
                }
            }
        };
        pool.push(closure);
        pool.join()
    }

    /// Asserts that `pool` still reports the default configuration, a live daemon and
    /// that it has not been marked as dropped.
    fn assert_default_config(pool: &Pool) {
        assert_eq!(*pool.as_builder().min_get(), Builder::min_default());
        assert_eq!(*pool.as_builder().max_get(), Builder::max_default());
        assert_eq!(
            pool.as_builder().timeout_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.as_builder().name_get().is_none());
        assert!(pool.as_builder().stack_size_get().is_none());
        assert_eq!(
            *pool.as_builder().load_limit_get(),
            Builder::num_cpus() * Builder::num_cpus()
        );
        assert_eq!(
            pool.as_builder().daemon_get(),
            Some(&Duration::from_millis(TIME_OUT_MS))
        );
        assert!(pool.daemon_alive());
        assert!(!pool.dropped());
    }

    /// Runs 33 Fibonacci tasks on a default pool and checks that the configuration is
    /// the default one both before and after the work.
    #[test]
    fn pool() {
        let pool = Pool::new().unwrap();
        assert!(Builder::num_cpus() >= 1);
        assert_default_config(&pool);

        let array = (0..33usize).map(|i| (i, 0)).collect::<Vec<_>>();

        let map = Arc::new(Mutex::new(array));
        for i in 0..33 {
            let mutex = map.clone();
            pool.push(move || test(i, mutex));
        }

        while !pool.is_empty() {
            thread::sleep(Duration::from_millis(10));
            // One progress line per poll, so successive samples do not run together.
            eprintln!(
                "len()/min()/max(): {}/{}/{}",
                pool.threads_alive(),
                pool.as_builder().min_get(),
                pool.as_builder().max_get()
            );
        }

        for &(k, v) in map.lock().unwrap().iter() {
            println!("key: {}\tvalue: {}", k, v);
        }

        assert!(pool.threads_alive() > 0);
        assert!(pool.threads_waiting() > 0);
        assert_default_config(&pool);
        println!("{:?}", pool);
    }

    /// Task body: computes `fib(msg)` outside the lock, then stores it in slot `msg`.
    fn test(msg: usize, map: Arc<Mutex<Vec<(usize, usize)>>>) {
        let res = fib(msg);
        let mut maplock = map.lock().unwrap();
        maplock[msg].1 = res;
    }

    /// Naive recursive Fibonacci; returns 1 for 0, 1 and 2.
    fn fib(msg: usize) -> usize {
        match msg {
            0 | 1 | 2 => 1,
            x => fib(x - 1) + fib(x - 2),
        }
    }

    /// Computes the same Fibonacci table through `push` (shared `Mutex`) and through
    /// `scoped` (borrowed slice) and checks that both agree.
    #[test]
    fn scope_fib() {
        let pool = Pool::new().unwrap();
        let mut array = (0..33usize).map(|i| (i, 0)).collect::<Vec<_>>();

        let mutex = Arc::new(Mutex::new(array.clone()));
        for i in 0..33usize {
            let mutex = mutex.clone();
            pool.push(move || test(i, mutex));
        }

        pool.scoped(|scope| for entry in array.iter_mut() {
            scope.push(move || entry.1 = fib(entry.0));
        });

        pool.join();
        let array_true = mutex.lock().unwrap();
        assert_eq!(*array_true, array);
        for (i, j) in array {
            println!("key: {}\tvalue: {}", i, j);
        }
    }

    /// Computes a table of squares through `push` and through `scoped` and checks
    /// that both agree.
    #[test]
    fn scope_x2() {
        let pool = Pool::new().unwrap();
        let mut array = (0..100usize).map(|i| (i, 0)).collect::<Vec<_>>();

        let mutex = Arc::new(Mutex::new(array.clone()));
        for i in 0..100 {
            let mutex = mutex.clone();
            pool.push(move || x2(i, mutex));
        }

        pool.scoped(|scope| for entry in array.iter_mut() {
            scope.push(move || entry.1 = entry.0 * entry.0);
        });

        pool.join();
        let array_true = mutex.lock().unwrap();
        assert_eq!(*array_true, array);
        for (i, j) in array {
            println!("key: {}\tvalue: {}", i, j);
        }
    }

    /// Task body: stores `msg * msg` in slot `msg` of the shared table.
    fn x2(msg: usize, map: Arc<Mutex<Vec<(usize, usize)>>>) {
        let res = msg * msg;
        let mut maplock = map.lock().unwrap();
        maplock[msg].1 = res;
    }
}