1use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc, Mutex,
8};
9use std::thread;
10
11use super::common::RawFunc;
12use super::sync as fpSync;
13use super::sync::Queue;
14
15pub trait Handler: Send + Sync + 'static {
24 fn is_started(&mut self) -> bool;
30
31 fn is_alive(&mut self) -> bool;
36
37 fn start(&mut self);
41
42 fn stop(&mut self);
51
52 fn post(&mut self, func: RawFunc);
61}
62
63#[derive(Clone)]
76pub struct HandlerThread {
77 started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
78
79 inner: Arc<HandlerThreadInner>,
80
81 handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
82}
83
84impl Default for HandlerThread {
85 fn default() -> Self {
86 HandlerThread {
87 started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
88 inner: Arc::new(HandlerThreadInner::new()),
89
90 handle: Arc::new(Mutex::new(None)),
91 }
92 }
93}
94
95impl HandlerThread {
96 pub fn new() -> HandlerThread {
97 Default::default()
98 }
99 pub fn new_with_mutex() -> Arc<Mutex<HandlerThread>> {
100 Arc::new(Mutex::new(HandlerThread::new()))
101 }
102}
103
104impl Handler for HandlerThread {
105 fn is_started(&mut self) -> bool {
106 let started_alive = self.started_alive.lock().unwrap();
107 let &(ref started, _) = &*started_alive;
108 started.load(Ordering::SeqCst)
109 }
110
111 fn is_alive(&mut self) -> bool {
112 let started_alive = self.started_alive.lock().unwrap();
113 let &(_, ref alive) = &*started_alive;
114 alive.load(Ordering::SeqCst)
115 }
116
117 fn start(&mut self) {
118 {
119 let started_alive = self.started_alive.lock().unwrap();
120 let &(ref started, ref alive) = &*started_alive;
121
122 if started.load(Ordering::SeqCst) {
123 return;
124 }
125 started.store(true, Ordering::SeqCst);
126 if alive.load(Ordering::SeqCst) {
127 return;
128 }
129 alive.store(true, Ordering::SeqCst);
130 }
131
132 let mut _inner = self.inner.clone();
133 let mut this = self.clone();
134 self.handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
135 Arc::make_mut(&mut _inner).start();
136
137 this.stop();
138 }))));
139 }
140
141 fn stop(&mut self) {
142 {
143 let started_alive = self.started_alive.lock().unwrap();
144 let &(ref started, ref alive) = &*started_alive;
145
146 if !started.load(Ordering::SeqCst) {
147 return;
148 }
149 if !alive.load(Ordering::SeqCst) {
150 return;
151 }
152 alive.store(false, Ordering::SeqCst);
153 }
154
155 Arc::make_mut(&mut self.inner).stop();
156
157 }
165
166 fn post(&mut self, func: RawFunc) {
167 Arc::make_mut(&mut self.inner).post(func);
168 }
169}
170
171#[derive(Clone)]
172struct HandlerThreadInner {
173 started: Arc<AtomicBool>,
175 alive: Arc<AtomicBool>,
176 q: Arc<fpSync::BlockingQueue<RawFunc>>,
177}
178
179impl HandlerThreadInner {
180 pub fn new() -> HandlerThreadInner {
181 HandlerThreadInner {
182 started: Arc::new(AtomicBool::new(false)),
183 alive: Arc::new(AtomicBool::new(false)),
184 q: Arc::new(<fpSync::BlockingQueue<RawFunc>>::new()),
185 }
186 }
187}
188
189impl Handler for HandlerThreadInner {
190 fn is_started(&mut self) -> bool {
191 self.started.load(Ordering::SeqCst)
192 }
193
194 fn is_alive(&mut self) -> bool {
195 self.alive.load(Ordering::SeqCst)
196 }
197
198 fn start(&mut self) {
199 self.alive.store(true, Ordering::SeqCst);
200
201 if self.is_started() {
202 return;
203 }
204 self.started.store(true, Ordering::SeqCst);
205
206 let q = Arc::make_mut(&mut self.q);
207
208 while self.alive.load(Ordering::SeqCst) {
209 let v = q.take();
210
211 match v {
212 Some(f) => {
213 f.invoke();
214 }
215 None => {
216 self.alive.store(false, Ordering::SeqCst);
217 }
218 }
219 }
220 }
221
222 fn stop(&mut self) {
223 self.alive.store(false, Ordering::SeqCst);
224 }
225
226 fn post(&mut self, func: RawFunc) {
227 let q = Arc::make_mut(&mut self.q);
228
229 q.put(func);
230 }
231}
232
233#[test]
234fn test_handler_new() {
235 use super::sync::CountDownLatch;
236 use std::time;
237
238 let mut _h = HandlerThread::new_with_mutex();
239 let mut h = _h.lock().unwrap();
240
241 assert_eq!(false, h.is_alive());
242 assert_eq!(false, h.is_started());
243
244 h.stop();
245 h.stop();
246 assert_eq!(false, h.is_alive());
247 assert_eq!(false, h.is_started());
248 h.start();
250 assert_eq!(true, h.is_alive());
251 assert_eq!(true, h.is_started());
252
253 let latch = CountDownLatch::new(1);
254 let latch2 = latch.clone();
255
256 h.post(RawFunc::new(move || {
258 println!("Executed !");
259
260 let latch3 = latch2.clone();
261
262 let mut _h2 = HandlerThread::new_with_mutex();
263 let mut _h2_inside = _h2.clone();
264
265 let mut h2 = _h2.lock().unwrap();
266 h2.start();
267
268 h2.post(RawFunc::new(move || {
269 latch3.countdown();
270
271 {
272 _h2_inside.lock().unwrap().stop();
273 }
274 }));
275 }));
276 println!("Test");
277
278 thread::sleep(time::Duration::from_millis(1));
279
280 assert_eq!(true, h.is_alive());
281 assert_eq!(true, h.is_started());
282
283 h.stop();
284 thread::sleep(time::Duration::from_millis(1));
285
286 assert_eq!(false, h.is_alive());
287 assert_eq!(true, h.is_started());
288
289 latch.clone().wait();
290}