runloop/
lib.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::io;
6use std::sync::{Arc, Mutex, Weak};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::thread::{Builder, JoinHandle};
9use std::time::{Duration, Instant};
10
/// State shared between a `RunLoop` handle and the thread it spawned.
struct Canary {
    /// `true` until cancellation is requested; polled by the run loop thread.
    alive: AtomicBool,
    /// Join handle of the run loop thread, stored after spawning and taken
    /// out again when the thread is joined.
    thread: Mutex<Option<JoinHandle<()>>>,
}
15
16impl Canary {
17    fn new() -> Self {
18        Self {
19            alive: AtomicBool::new(true),
20            thread: Mutex::new(None),
21        }
22    }
23}
24
25pub struct RunLoop {
26    flag: Weak<Canary>,
27}
28
29impl RunLoop {
30    pub fn new<F, T>(fun: F) -> io::Result<Self>
31    where
32        F: FnOnce(&Fn() -> bool) -> T,
33        F: Send + 'static,
34    {
35        Self::new_with_timeout(fun, 0 /* no timeout */)
36    }
37
38    pub fn new_with_timeout<F, T>(fun: F, timeout_ms: u64) -> io::Result<Self>
39    where
40        F: FnOnce(&Fn() -> bool) -> T,
41        F: Send + 'static,
42    {
43        let flag = Arc::new(Canary::new());
44        let flag_ = flag.clone();
45
46        // Spawn the run loop thread.
47        let thread = Builder::new().spawn(move || {
48            let timeout = Duration::from_millis(timeout_ms);
49            let start = Instant::now();
50
51            // A callback to determine whether the thread should terminate.
52            let still_alive = || {
53                // `flag.alive` will be false after cancel() was called.
54                flag.alive.load(Ordering::Relaxed) &&
55                // If a timeout was provided, we'll check that too.
56                (timeout_ms == 0 || start.elapsed() < timeout)
57            };
58
59            // Ignore return values.
60            let _ = fun(&still_alive);
61        })?;
62
63        // We really should never fail to lock here.
64        let mut guard = (*flag_).thread.lock().map_err(|_| {
65            io::Error::new(io::ErrorKind::Other, "failed to lock")
66        })?;
67
68        // Store the thread handle so we can join later.
69        *guard = Some(thread);
70
71        Ok(Self { flag: Arc::downgrade(&flag_) })
72    }
73
74    // Cancels the run loop and waits for the thread to terminate.
75    // This is a potentially BLOCKING operation.
76    pub fn cancel(&self) {
77        // If the thread still exists...
78        if let Some(flag) = self.flag.upgrade() {
79            // ...let the run loop terminate.
80            flag.alive.store(false, Ordering::Relaxed);
81
82            // Locking should never fail here either.
83            if let Ok(mut guard) = flag.thread.lock() {
84                // This really can't fail.
85                if let Some(handle) = (*guard).take() {
86                    // This might fail, ignore.
87                    let _ = handle.join();
88                }
89            }
90        }
91    }
92
93    // Tells whether the runloop is alive.
94    pub fn alive(&self) -> bool {
95        // If the thread still exists...
96        if let Some(flag) = self.flag.upgrade() {
97            flag.alive.load(Ordering::Relaxed)
98        } else {
99            false
100        }
101    }
102}
103
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::sync::{Arc, Barrier};

    use super::RunLoop;

    /// A run loop whose body returns immediately stops reporting alive by
    /// itself.
    #[test]
    fn test_empty() {
        let run_loop = RunLoop::new(|_| {}).unwrap();

        // Spin until the worker is gone.
        while run_loop.alive() {}

        // Nothing is left to cancel at this point.
        run_loop.cancel();
    }

    /// Cancels right after creation, racing against the start of the worker.
    #[test]
    fn test_cancel_early() {
        // A panic raised by the assertion inside the worker is not
        // propagated: cancel() ignores the result of joining the thread.
        let run_loop = RunLoop::new(|alive| assert!(!alive())).unwrap();
        run_loop.cancel();
    }

    /// A worker that spins until told to stop is terminated by cancel().
    #[test]
    fn test_cancel_endless_loop() {
        let rendezvous = Arc::new(Barrier::new(2));
        let worker_side = Arc::clone(&rendezvous);

        let run_loop = RunLoop::new(move |alive| {
            // Signal that the worker is up before entering the endless loop.
            worker_side.wait();
            while alive() {}
        })
        .unwrap();

        // Meet the worker at the barrier so it is known to be running.
        rendezvous.wait();
        assert!(run_loop.alive());

        run_loop.cancel();
        assert!(!run_loop.alive());
    }

    /// A worker that never exits on its own is stopped by a 1ms timeout.
    #[test]
    fn test_timeout() {
        let run_loop = RunLoop::new_with_timeout(
            |alive| {
                while alive() {}
            },
            1,
        )
        .unwrap();

        // Spin until the timeout has ended the worker.
        while run_loop.alive() {}
        assert!(!run_loop.alive());

        // Nothing is left to cancel at this point.
        run_loop.cancel();
    }

    /// A worker can push data through a channel until it is cancelled.
    #[test]
    fn test_channel() {
        let (sender, receiver) = mpsc::channel();

        let run_loop = RunLoop::new(move |alive| {
            while alive() {
                sender.send(0u8).unwrap();
            }
        })
        .unwrap();

        // Block until the first value has arrived.
        assert_eq!(receiver.recv().unwrap(), 0u8);

        assert!(run_loop.alive());
        run_loop.cancel();
        assert!(!run_loop.alive());
    }
}