queuingtask/
lib.rs

1use std::sync::Arc;
2use std::sync::Mutex;
3use std::sync::MutexGuard;
4use std::sync::PoisonError;
5use std::sync::mpsc;
6use std::sync::mpsc::Sender;
7use std::thread::JoinHandle;
8use std::collections::HashMap;
9use std::num::Wrapping;
10
/// Messages exchanged between the worker threads and the dispatcher thread
/// of a [`ThreadQueue`].
#[derive(Clone, Copy, Eq, PartialOrd, PartialEq, Debug)]
pub enum Notify {
	/// Worker -> dispatcher: the thread for task `id` is running and is
	/// waiting for permission to execute its closure.
	Started(u32),
	/// Dispatcher -> worker: it is this task's turn, run the closure now.
	Go,
	/// Worker -> dispatcher: task `id` is over (it returned or panicked).
	Terminated(u32),
}

/// Runs submitted closures strictly one at a time, in submission order,
/// each on its own thread.
///
/// A single dispatcher thread is started lazily by the first call to
/// [`ThreadQueue::submit`]. It exits on its own once the queue has been
/// dropped and every outstanding task has finished, because at that point
/// all senders of its channel are gone.
pub struct ThreadQueue {
	/// Sender half of each pending task's `Go` channel, keyed by task id.
	sender_map:Arc<Mutex<HashMap<u32,Sender<Notify>>>>,
	/// Channel to the dispatcher thread; `None` until the first `submit`.
	sender:Option<Sender<Notify>>,
	/// Id that the next submitted task will receive (wraps on overflow).
	next_id:u32,
}

/// Sends `Notify::Terminated` for a task when dropped, so the dispatcher is
/// informed even if the task's closure unwinds instead of returning.
struct TerminationGuard {
	id:u32,
	dispatcher:Sender<Notify>,
}

impl Drop for TerminationGuard {
	fn drop(&mut self) {
		// A send error means the dispatcher no longer exists, so there is
		// nobody left to inform; ignoring it is the only sensible option.
		let _ = self.dispatcher.send(Notify::Terminated(self.id));
	}
}

impl Default for ThreadQueue {
	fn default() -> ThreadQueue {
		ThreadQueue::new()
	}
}

impl ThreadQueue {
	/// Creates an empty queue. No thread is started until the first
	/// [`ThreadQueue::submit`].
	pub fn new() -> ThreadQueue {
		ThreadQueue {
			sender_map:Arc::new(Mutex::new(HashMap::new())),
			sender:None,
			next_id:0,
		}
	}

	/// Queues `f` for execution and returns the handle of the thread that
	/// will run it.
	///
	/// The worker thread is spawned immediately but blocks until every task
	/// submitted earlier has terminated. A task that panics still releases
	/// the next one; the panic is reported through `JoinHandle::join`.
	///
	/// # Errors
	///
	/// Returns the `PoisonError` if the internal task map mutex is poisoned.
	/// In that case nothing is queued and no thread is spawned.
	///
	/// # Panics
	///
	/// Panics if the operating system refuses to create a thread
	/// (see `std::thread::spawn`).
	pub fn submit<F,T>(&mut self,f:F) ->
		Result<JoinHandle<T>,PoisonError<MutexGuard<HashMap<u32,Sender<Notify>>>>>
		where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static {

		let id = self.next_id;
		let (go_tx,go_rx) = mpsc::channel();

		// Register the task before its worker exists, so the dispatcher can
		// always find the `Go` channel of any id it hears about.
		self.sender_map.lock()?.insert(id,go_tx);
		self.next_id = (Wrapping(id) + Wrapping(1)).0;

		let to_dispatcher = match self.sender {
			Some(ref sender) => sender.clone(),
			None => {
				// First submission: start the one dispatcher that serves
				// this queue for the rest of its life. Keeping it alive
				// (instead of restarting it whenever the queue drains)
				// means a submit can never race with a dispatcher shutdown.
				let (tx,rx) = mpsc::channel();
				let tasks = Arc::clone(&self.sender_map);
				std::thread::spawn(move || ThreadQueue::dispatch(rx,tasks,id));
				self.sender = Some(tx.clone());
				tx
			}
		};

		let worker = std::thread::spawn(move || {
			// Dropped after `f` has returned or unwound, which is what
			// tells the dispatcher to release the next task.
			let guard = TerminationGuard { id, dispatcher:to_dispatcher };

			guard.dispatcher.send(Notify::Started(id))
				.expect("ThreadQueue dispatcher stopped before the task started");
			go_rx.recv()
				.expect("ThreadQueue dispatcher stopped before releasing the task");

			f()
		});

		Ok(worker)
	}

	/// Body of the dispatcher thread: hands out `Notify::Go` so that tasks
	/// run one after another in id order, starting with `first_id`.
	///
	/// Returns when `inbox` is disconnected, i.e. when the queue and all
	/// workers have dropped their senders.
	fn dispatch(inbox:mpsc::Receiver<Notify>,
				tasks:Arc<Mutex<HashMap<u32,Sender<Notify>>>>,
				first_id:u32) {
		let mut current = first_id;
		// Id that has already been sent `Go`. A task can be released either
		// when its predecessor terminates or when it announces `Started`;
		// remembering the release prevents sending `Go` a second time to a
		// worker that may already have finished and dropped its receiver.
		let mut released:Option<u32> = None;

		while let Ok(message) = inbox.recv() {
			match message {
				Notify::Started(id) => {
					if id != current || released == Some(id) {
						continue;
					}
					// The map only ever sees insert/remove, so its contents
					// stay usable after a poisoning; stopping here would
					// strand every waiting worker.
					let map = tasks.lock().unwrap_or_else(PoisonError::into_inner);
					if let Some(worker) = map.get(&id) {
						if worker.send(Notify::Go).is_ok() {
							released = Some(id);
						}
					}
				},
				Notify::Terminated(id) => {
					// Only the running task is expected to terminate.
					if id != current {
						continue;
					}
					let mut map = tasks.lock().unwrap_or_else(PoisonError::into_inner);
					map.remove(&id);
					current = (Wrapping(id) + Wrapping(1)).0;
					released = None;
					// If the successor is already queued, release it right
					// away; otherwise its `Started` message will do so.
					if let Some(worker) = map.get(&current) {
						if worker.send(Notify::Go).is_ok() {
							released = Some(current);
						}
					}
				},
				// `Go` only travels from the dispatcher to workers.
				Notify::Go => (),
			}
		}
	}
}