1use std::io;
6use std::sync::{Arc, Mutex, Weak};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::thread::{Builder, JoinHandle};
9use std::time::{Duration, Instant};
10
11struct Canary {
12 alive: AtomicBool,
13 thread: Mutex<Option<JoinHandle<()>>>,
14}
15
16impl Canary {
17 fn new() -> Self {
18 Self {
19 alive: AtomicBool::new(true),
20 thread: Mutex::new(None),
21 }
22 }
23}
24
25pub struct RunLoop {
26 flag: Weak<Canary>,
27}
28
29impl RunLoop {
30 pub fn new<F, T>(fun: F) -> io::Result<Self>
31 where
32 F: FnOnce(&Fn() -> bool) -> T,
33 F: Send + 'static,
34 {
35 Self::new_with_timeout(fun, 0 )
36 }
37
38 pub fn new_with_timeout<F, T>(fun: F, timeout_ms: u64) -> io::Result<Self>
39 where
40 F: FnOnce(&Fn() -> bool) -> T,
41 F: Send + 'static,
42 {
43 let flag = Arc::new(Canary::new());
44 let flag_ = flag.clone();
45
46 let thread = Builder::new().spawn(move || {
48 let timeout = Duration::from_millis(timeout_ms);
49 let start = Instant::now();
50
51 let still_alive = || {
53 flag.alive.load(Ordering::Relaxed) &&
55 (timeout_ms == 0 || start.elapsed() < timeout)
57 };
58
59 let _ = fun(&still_alive);
61 })?;
62
63 let mut guard = (*flag_).thread.lock().map_err(|_| {
65 io::Error::new(io::ErrorKind::Other, "failed to lock")
66 })?;
67
68 *guard = Some(thread);
70
71 Ok(Self { flag: Arc::downgrade(&flag_) })
72 }
73
74 pub fn cancel(&self) {
77 if let Some(flag) = self.flag.upgrade() {
79 flag.alive.store(false, Ordering::Relaxed);
81
82 if let Ok(mut guard) = flag.thread.lock() {
84 if let Some(handle) = (*guard).take() {
86 let _ = handle.join();
88 }
89 }
90 }
91 }
92
93 pub fn alive(&self) -> bool {
95 if let Some(flag) = self.flag.upgrade() {
97 flag.alive.load(Ordering::Relaxed)
98 } else {
99 false
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use std::sync::{Arc, Barrier};
107 use std::sync::mpsc::channel;
108
109 use super::RunLoop;
110
111 #[test]
112 fn test_empty() {
113 let rloop = RunLoop::new(|_| {}).unwrap();
115 while rloop.alive() { }
116 rloop.cancel(); }
118
119 #[test]
120 fn test_cancel_early() {
121 RunLoop::new(|alive| assert!(!alive())).unwrap().cancel();
123 }
124
125 #[test]
126 fn test_cancel_endless_loop() {
127 let barrier = Arc::new(Barrier::new(2));
128 let b = barrier.clone();
129
130 let rloop = RunLoop::new(move |alive| {
132 b.wait();
133 while alive() { }
134 }).unwrap();
135
136 barrier.wait();
137 assert!(rloop.alive());
138 rloop.cancel();
139 assert!(!rloop.alive());
140 }
141
142 #[test]
143 fn test_timeout() {
144 let rloop = RunLoop::new_with_timeout(|alive| while alive() {}, 1).unwrap();
146
147 while rloop.alive() { }
148 assert!(!rloop.alive());
149 rloop.cancel(); }
151
152 #[test]
153 fn test_channel() {
154 let (tx, rx) = channel();
155
156 let rloop = RunLoop::new(move |alive| while alive() {
158 tx.send(0u8).unwrap();
159 }).unwrap();
160
161 assert_eq!(rx.recv().unwrap(), 0u8);
163
164 assert!(rloop.alive());
165 rloop.cancel();
166 assert!(!rloop.alive());
167 }
168}