fp_rust/
handler.rs

/*!
This module contains the implementations & tests of `Handler`.
(Inspired by `Android Handler`)
*/
5use std::sync::{
6    atomic::{AtomicBool, Ordering},
7    Arc, Mutex,
8};
9use std::thread;
10
11use super::common::RawFunc;
12use super::sync as fpSync;
13use super::sync::Queue;
14
15/**
16`Handler` `trait` defines the interface which could receive `FnMut` and run them on its own `thread`.
17
18# Remarks
19
20This is highly inspired by `Android Handler` concepts.
21
22*/
23pub trait Handler: Send + Sync + 'static {
24    /**
25    Did this `Handler` start?
26    Return `true` when it did started (no matter it has stopped or not)
27
28    */
29    fn is_started(&mut self) -> bool;
30
31    /**
32    Is this `Handler` alive?
33    Return `true` when it has started and not stopped yet.
34    */
35    fn is_alive(&mut self) -> bool;
36
37    /**
38    Start `Handler`.
39    */
40    fn start(&mut self);
41
42    /**
43
44    Stop `Cor`.
45    This will make self.`is_alive`() returns `false`,
46    and all `FnMut` posted by self.`post`() will not be executed.
47    (Because it has stopped :P, that's reasonable)
48
49    */
50    fn stop(&mut self);
51
52    /**
53    Post a job which will run on this `Handler`.
54
55    # Arguments
56
57    * `func` - The posted job.
58    ``
59    */
60    fn post(&mut self, func: RawFunc);
61}
62
63/**
64`HandlerThread` could receive `FnMut` and run them on its own `thread`.
65It implements `Handler` `trait` simply and works well.
66
67This is kind of facade which just handles `thread`,
68and bypasses jobs to `HandlerThreadInner`(private implementations).
69
70# Remarks
71
72This is highly inspired by `Android Handler` concepts.
73
74*/
75#[derive(Clone)]
76pub struct HandlerThread {
77    started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
78
79    inner: Arc<HandlerThreadInner>,
80
81    handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
82}
83
84impl Default for HandlerThread {
85    fn default() -> Self {
86        HandlerThread {
87            started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
88            inner: Arc::new(HandlerThreadInner::new()),
89
90            handle: Arc::new(Mutex::new(None)),
91        }
92    }
93}
94
95impl HandlerThread {
96    pub fn new() -> HandlerThread {
97        Default::default()
98    }
99    pub fn new_with_mutex() -> Arc<Mutex<HandlerThread>> {
100        Arc::new(Mutex::new(HandlerThread::new()))
101    }
102}
103
104impl Handler for HandlerThread {
105    fn is_started(&mut self) -> bool {
106        let started_alive = self.started_alive.lock().unwrap();
107        let &(ref started, _) = &*started_alive;
108        started.load(Ordering::SeqCst)
109    }
110
111    fn is_alive(&mut self) -> bool {
112        let started_alive = self.started_alive.lock().unwrap();
113        let &(_, ref alive) = &*started_alive;
114        alive.load(Ordering::SeqCst)
115    }
116
117    fn start(&mut self) {
118        {
119            let started_alive = self.started_alive.lock().unwrap();
120            let &(ref started, ref alive) = &*started_alive;
121
122            if started.load(Ordering::SeqCst) {
123                return;
124            }
125            started.store(true, Ordering::SeqCst);
126            if alive.load(Ordering::SeqCst) {
127                return;
128            }
129            alive.store(true, Ordering::SeqCst);
130        }
131
132        let mut _inner = self.inner.clone();
133        let mut this = self.clone();
134        self.handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
135            Arc::make_mut(&mut _inner).start();
136
137            this.stop();
138        }))));
139    }
140
141    fn stop(&mut self) {
142        {
143            let started_alive = self.started_alive.lock().unwrap();
144            let &(ref started, ref alive) = &*started_alive;
145
146            if !started.load(Ordering::SeqCst) {
147                return;
148            }
149            if !alive.load(Ordering::SeqCst) {
150                return;
151            }
152            alive.store(false, Ordering::SeqCst);
153        }
154
155        Arc::make_mut(&mut self.inner).stop();
156
157        // NOTE: Kill thread <- OS depending
158        // let mut handle = self.handle.lock().unwrap();
159        // handle
160        //     .take()
161        //     .expect("Called stop on non-running thread")
162        //     .join()
163        //     .expect("Could not join spawned thread");
164    }
165
166    fn post(&mut self, func: RawFunc) {
167        Arc::make_mut(&mut self.inner).post(func);
168    }
169}
170
171#[derive(Clone)]
172struct HandlerThreadInner {
173    // this: Option<Arc<HandlerThreadInner>>,
174    started: Arc<AtomicBool>,
175    alive: Arc<AtomicBool>,
176    q: Arc<fpSync::BlockingQueue<RawFunc>>,
177}
178
179impl HandlerThreadInner {
180    pub fn new() -> HandlerThreadInner {
181        HandlerThreadInner {
182            started: Arc::new(AtomicBool::new(false)),
183            alive: Arc::new(AtomicBool::new(false)),
184            q: Arc::new(<fpSync::BlockingQueue<RawFunc>>::new()),
185        }
186    }
187}
188
189impl Handler for HandlerThreadInner {
190    fn is_started(&mut self) -> bool {
191        self.started.load(Ordering::SeqCst)
192    }
193
194    fn is_alive(&mut self) -> bool {
195        self.alive.load(Ordering::SeqCst)
196    }
197
198    fn start(&mut self) {
199        self.alive.store(true, Ordering::SeqCst);
200
201        if self.is_started() {
202            return;
203        }
204        self.started.store(true, Ordering::SeqCst);
205
206        let q = Arc::make_mut(&mut self.q);
207
208        while self.alive.load(Ordering::SeqCst) {
209            let v = q.take();
210
211            match v {
212                Some(f) => {
213                    f.invoke();
214                }
215                None => {
216                    self.alive.store(false, Ordering::SeqCst);
217                }
218            }
219        }
220    }
221
222    fn stop(&mut self) {
223        self.alive.store(false, Ordering::SeqCst);
224    }
225
226    fn post(&mut self, func: RawFunc) {
227        let q = Arc::make_mut(&mut self.q);
228
229        q.put(func);
230    }
231}
232
/**
Walk a `HandlerThread` through its life cycle:
`stop` before `start` changes nothing, `start` raises both flags,
a posted job runs (and drives a second, nested `HandlerThread`),
and `stop` lowers `is_alive` while `is_started` stays `true`.
*/
#[test]
fn test_handler_new() {
    use super::sync::CountDownLatch;

    let shared = HandlerThread::new_with_mutex();
    let mut h = shared.lock().unwrap();

    assert!(!h.is_alive());
    assert!(!h.is_started());

    // Stopping a handler which was never started must not change its state.
    h.stop();
    h.stop();
    assert!(!h.is_alive());
    assert!(!h.is_started());

    h.start();
    assert!(h.is_alive());
    assert!(h.is_started());

    let latch = CountDownLatch::new(1);
    let latch2 = latch.clone();

    h.post(RawFunc::new(move || {
        println!("Executed !");

        let latch3 = latch2.clone();

        let nested = HandlerThread::new_with_mutex();
        let nested_for_job = nested.clone();

        let mut h2 = nested.lock().unwrap();
        h2.start();

        h2.post(RawFunc::new(move || {
            latch3.countdown();

            // The nested handler is stopped from inside its own job.
            {
                nested_for_job.lock().unwrap().stop();
            }
        }));
    }));
    println!("Test");

    // Wait for the nested job before stopping `h`: stopping first could prevent
    // the posted job from ever running, and then this wait would never return.
    latch.clone().wait();

    assert!(h.is_alive());
    assert!(h.is_started());

    // `stop` updates the flags synchronously, so no sleep is needed before checking them.
    h.stop();

    assert!(!h.is_alive());
    assert!(h.is_started());
}