1use std::sync::Arc;
2use std::sync::Mutex;
3use std::sync::MutexGuard;
4use std::sync::PoisonError;
5use std::sync::mpsc;
6use std::sync::mpsc::Sender;
7use std::thread::JoinHandle;
8use std::collections::HashMap;
9use std::num::Wrapping;
10
11#[derive(Clone, Copy, Eq, PartialOrd, PartialEq, Debug)]
12pub enum Notify {
13 Started(u32),
14 Go,
15 Terminated(u32),
16}
17pub struct ThreadQueue {
18 sender_map:Arc<Mutex<HashMap<u32,Sender<Notify>>>>,
19 sender:Option<Sender<Notify>>,
20 current_id:Arc<Mutex<u32>>,
21 last_id:Arc<Mutex<u32>>,
22}
23impl ThreadQueue {
24 pub fn new() -> ThreadQueue {
25 ThreadQueue {
26 sender_map:Arc::new(Mutex::new(HashMap::new())),
27 sender:None,
28 current_id:Arc::new(Mutex::new(0)),
29 last_id:Arc::new(Mutex::new(0)),
30 }
31 }
32
33 pub fn submit<F,T>(&mut self,f:F) ->
34 Result<JoinHandle<T>,PoisonError<MutexGuard<HashMap<u32,Sender<Notify>>>>>
35 where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static {
36
37 if self.sender_map.lock()?.len() == 0 {
38 let (ss,sr) = mpsc::channel();
39 let current_id = self.current_id.clone();
40 let last_id = self.last_id.clone();
41 let sender_map = self.sender_map.clone();
42
43 std::thread::spawn(move || {
44 while *last_id.lock().unwrap() == *current_id.lock().unwrap() {
45 std::thread::sleep(std::time::Duration::from_millis(1));
46 }
47 while sender_map.lock().unwrap().len() > 0 {
48 match sr.recv().unwrap() {
49 Notify::Started(id) => {
50 match current_id.lock() {
51 Ok(ref mut current_id) if id == **current_id => {
52 sender_map.lock()
53 .unwrap().get(&id)
54 .unwrap().send(Notify::Go).unwrap();
55 },
56 Ok(_) => (),
57 Err(ref e) => {
58 panic!(format!("{:?}",e));
59 }
60 }
61 },
62 Notify::Terminated(id) => {
63 match current_id.lock() {
64 Ok(ref mut current_id) if id == **current_id => {
65 match sender_map.lock() {
66 Ok(mut map) => {
67 map.remove(&id);
68 let id = Wrapping(id) + Wrapping(1);
69 let id = id.0;
70 **current_id = id;
71 match map.get(&id) {
72 Some(ref sender) => {
73 sender.send(Notify::Go).unwrap();
74 },
75 None => ()
76 }
77 },
78 Err(ref e) => {
79 panic!(format!("{:?}",e));
80 }
81 };
82 },
83 Ok(_) => (),
84 Err(ref e) => {
85 panic!(format!("{:?}",e));
86 }
87 }
88 },
89 _ => (),
90 }
91 }
92 });
93 self.sender = Some(ss)
94 }
95
96 match self.last_id.lock() {
97 Ok(ref mut last_id) => {
98 let (cs,cr) = mpsc::channel();
99
100 self.sender_map.lock()?.insert(**last_id, cs.clone());
101
102 let ss = match self.sender {
103 Some(ref ss) => ss.clone(),
104 None => panic!("Sender is not initialized."),
105 };
106
107 let id = **last_id;
108 let next_id = Wrapping(**last_id) + Wrapping(1);
109
110 **last_id = next_id.0;
111
112 let r = std::thread::spawn(move || {
113 ss.send(Notify::Started(id)).unwrap();
114 cr.recv().unwrap();
115
116 let r = f();
117
118 ss.send(Notify::Terminated(id)).unwrap();
119 r
120 });
121 Ok(r)
122 },
123 Err(ref e) => {
124 panic!(format!("{:?}",e));
125 }
126 }
127 }
128}