runtime/runtime/
mod.rs

1use std::{clone::Clone, sync::Arc, thread};
2
3use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::Mutex;
5
6use crate::{Installer, RuntimeError, Status, ThreadStatuses, Threadable};
7
/// Name under which the runtime registers itself in the `ThreadStatuses` and reports its own status on the status
/// channel.
const RUNTIME_THREAD_NAME: &str = "runtime";
9
/// A system that manages the lifetime of threads. This includes ensuring errors are handled, threads are paused and
/// resumed on request and that once the main application is completed, all threads complete and end.
// NOTE(review): `Debug` is not implemented, presumably because the stored `dyn Threadable` references are not
// required to be `Debug` - confirm before removing this allow.
#[allow(missing_debug_implementations)]
pub struct Runtime<'runtime> {
	/// Receiving end of the status channel; `join` reads `(thread name, status)` messages from it.
	receiver: Receiver<(String, Status)>,
	/// Sending end of the status channel; a clone is handed to the `Installer` on `join`.
	sender: Sender<(String, Status)>,
	/// Status tracking for the runtime itself and the threads it manages.
	thread_statuses: ThreadStatuses,
	/// The `Threadable`s added through `register`, borrowed for the lifetime of the runtime.
	threadables: Arc<Mutex<Vec<&'runtime mut dyn Threadable>>>,
}
19
20impl<'runtime> Runtime<'runtime> {
21	/// Create a new instances of the `Runtime`.
22	#[inline]
23	#[must_use]
24	pub fn new(thread_statuses: ThreadStatuses) -> Self {
25		let (sender, receiver) = unbounded();
26
27		thread_statuses.register_thread(RUNTIME_THREAD_NAME, Status::Waiting);
28
29		Self {
30			receiver,
31			sender,
32			thread_statuses,
33			threadables: Arc::new(Mutex::new(vec![])),
34		}
35	}
36
37	/// Get a cloned copy of the `ThreadStatuses`.
38	#[inline]
39	#[must_use]
40	pub fn statuses(&self) -> ThreadStatuses {
41		self.thread_statuses.clone()
42	}
43
44	/// Register a new `Threadable`.
45	#[inline]
46	pub fn register(&self, threadable: &'runtime mut (dyn Threadable)) {
47		self.threadables.lock().push(threadable);
48	}
49
50	/// Join the runtime thread, waiting for all threads to finish.
51	///
52	/// # Errors
53	/// Returns and error if any of the threads registered to the runtime produce an error.
54	#[inline]
55	pub fn join(&self) -> Result<(), RuntimeError> {
56		let installer = Installer::new(self.thread_statuses.clone(), self.sender.clone());
57		{
58			let threadables = self.threadables.lock();
59			for threadable in threadables.iter() {
60				threadable.install(&installer);
61			}
62		}
63		let mut handles = vec![];
64
65		for (name, op) in installer.into_ops().drain() {
66			handles.push(
67				thread::Builder::new()
68					.name(String::from(name.as_str()))
69					.spawn(op)
70					.map_err(|_err| RuntimeError::ThreadSpawnError(name))?,
71			);
72		}
73
74		let mut result = Ok(());
75
76		for (name, status) in &self.receiver {
77			match status {
78				Status::Error(err) => {
79					// since we entered an error state, we attempt to shutdown the other threads, but
80					// they could fail due to the error state, but keeping the shutdown error is less
81					// important than the original error.
82					let _result = self.shutdown();
83					result = Err(err);
84					break;
85				},
86				Status::RequestPause => {
87					for threadable in self.threadables.lock().iter() {
88						threadable.pause();
89					}
90				},
91				Status::RequestResume => {
92					for threadable in self.threadables.lock().iter() {
93						threadable.resume();
94					}
95				},
96				Status::RequestEnd => {
97					self.thread_statuses.update_thread(RUNTIME_THREAD_NAME, Status::Ended);
98					for threadable in self.threadables.lock().iter() {
99						threadable.end();
100					}
101				},
102				Status::New | Status::Busy | Status::Waiting | Status::Ended => {},
103			}
104
105			self.thread_statuses.update_thread(name.as_str(), status);
106
107			if self.thread_statuses.all_ended() {
108				result = self.shutdown();
109				break;
110			}
111		}
112
113		while let Some(handle) = handles.pop() {
114			let _result = handle.join();
115		}
116
117		result
118	}
119
120	#[inline]
121	fn shutdown(&self) -> Result<(), RuntimeError> {
122		if self.thread_statuses.all_ended() {
123			return Ok(());
124		}
125
126		for threadable in self.threadables.lock().iter() {
127			threadable.end();
128		}
129		self.sender
130			.send((String::from(RUNTIME_THREAD_NAME), Status::Ended))
131			.map_err(|_err| RuntimeError::SendError)
132	}
133}
134
135#[cfg(test)]
136mod tests {
137	use std::{
138		sync::atomic::{AtomicBool, Ordering},
139		thread::sleep,
140		time::Duration,
141	};
142
143	use claim::assert_err;
144
145	use super::*;
146
147	#[test]
148	fn run_thread_finish() {
149		struct Thread;
150
151		impl Thread {
152			const fn new() -> Self {
153				Self {}
154			}
155		}
156
157		impl Threadable for Thread {
158			fn install(&self, installer: &Installer) {
159				installer.spawn("name", |notifier| {
160					move || {
161						notifier.end();
162						notifier.request_end();
163					}
164				});
165			}
166		}
167
168		let runtime = Runtime::new(ThreadStatuses::new());
169		let mut thread = Thread::new();
170		runtime.register(&mut thread);
171		runtime.join().unwrap();
172		assert!(runtime.statuses().all_ended());
173	}
174
175	#[test]
176	fn run_thread_error() {
177		struct Thread1;
178
179		impl Thread1 {
180			const fn new() -> Self {
181				Self {}
182			}
183		}
184
185		impl Threadable for Thread1 {
186			fn install(&self, installer: &Installer) {
187				installer.spawn("name0", |notifier| {
188					move || {
189						notifier.error(RuntimeError::ThreadError(String::from("error")));
190					}
191				});
192			}
193		}
194
195		struct Thread2 {
196			ended: Arc<AtomicBool>,
197		}
198
199		impl Thread2 {
200			fn new() -> Self {
201				Self {
202					ended: Arc::new(AtomicBool::new(false)),
203				}
204			}
205		}
206
207		impl Threadable for Thread2 {
208			fn install(&self, installer: &Installer) {
209				let ended = Arc::clone(&self.ended);
210				installer.spawn("name1", |notifier| {
211					move || {
212						while !ended.load(Ordering::Acquire) {
213							sleep(Duration::from_millis(10));
214						}
215						notifier.end();
216					}
217				});
218			}
219
220			fn end(&self) {
221				self.ended.store(true, Ordering::Release);
222			}
223		}
224
225		let runtime = Runtime::new(ThreadStatuses::new());
226		let mut thread1 = Thread1::new();
227		let mut thread2 = Thread2::new();
228		runtime.register(&mut thread1);
229		runtime.register(&mut thread2);
230		assert_err!(runtime.join());
231	}
232
233	#[test]
234	fn run_thread_request_pause() {
235		struct Thread1;
236
237		impl Thread1 {
238			const fn new() -> Self {
239				Self {}
240			}
241		}
242
243		impl Threadable for Thread1 {
244			fn install(&self, installer: &Installer) {
245				installer.spawn("name0", |notifier| {
246					move || {
247						notifier.request_pause();
248						notifier.end();
249					}
250				});
251			}
252		}
253
254		struct Thread2 {
255			paused: Arc<AtomicBool>,
256		}
257
258		impl Thread2 {
259			fn new() -> Self {
260				Self {
261					paused: Arc::new(AtomicBool::new(false)),
262				}
263			}
264		}
265
266		impl Threadable for Thread2 {
267			fn install(&self, installer: &Installer) {
268				let paused = Arc::clone(&self.paused);
269				installer.spawn("name1", |notifier| {
270					move || {
271						while !paused.load(Ordering::Acquire) {
272							sleep(Duration::from_millis(10));
273						}
274						notifier.end();
275						notifier.request_end();
276					}
277				});
278			}
279
280			fn pause(&self) {
281				self.paused.store(true, Ordering::Release);
282			}
283		}
284
285		let runtime = Runtime::new(ThreadStatuses::new());
286		let mut thread1 = Thread1::new();
287		let mut thread2 = Thread2::new();
288		runtime.register(&mut thread1);
289		runtime.register(&mut thread2);
290		runtime.join().unwrap();
291		assert!(thread2.paused.load(Ordering::Acquire));
292	}
293
294	#[test]
295	fn run_thread_request_resume() {
296		struct Thread1;
297
298		impl Thread1 {
299			const fn new() -> Self {
300				Self {}
301			}
302		}
303
304		impl Threadable for Thread1 {
305			fn install(&self, installer: &Installer) {
306				installer.spawn("name0", |notifier| {
307					move || {
308						notifier.request_resume();
309						notifier.end();
310					}
311				});
312			}
313		}
314
315		struct Thread2 {
316			resumed: Arc<AtomicBool>,
317		}
318
319		impl Thread2 {
320			fn new() -> Self {
321				Self {
322					resumed: Arc::new(AtomicBool::new(false)),
323				}
324			}
325		}
326
327		impl Threadable for Thread2 {
328			fn install(&self, installer: &Installer) {
329				let resumed = Arc::clone(&self.resumed);
330				installer.spawn("name1", |notifier| {
331					move || {
332						while !resumed.load(Ordering::Acquire) {
333							sleep(Duration::from_millis(10));
334						}
335						notifier.end();
336						notifier.request_end();
337					}
338				});
339			}
340
341			fn resume(&self) {
342				self.resumed.store(true, Ordering::Release);
343			}
344		}
345
346		let runtime = Runtime::new(ThreadStatuses::new());
347		let mut thread1 = Thread1::new();
348		let mut thread2 = Thread2::new();
349		runtime.register(&mut thread1);
350		runtime.register(&mut thread2);
351		runtime.join().unwrap();
352		assert!(thread2.resumed.load(Ordering::Acquire));
353	}
354
355	#[test]
356	fn run_thread_request_end() {
357		struct Thread1;
358
359		impl Thread1 {
360			const fn new() -> Self {
361				Self {}
362			}
363		}
364
365		impl Threadable for Thread1 {
366			fn install(&self, installer: &Installer) {
367				installer.spawn("name0", |notifier| {
368					move || {
369						notifier.request_end();
370						notifier.end();
371					}
372				});
373			}
374		}
375
376		struct Thread2 {
377			ended: Arc<AtomicBool>,
378		}
379
380		impl Thread2 {
381			fn new() -> Self {
382				Self {
383					ended: Arc::new(AtomicBool::new(false)),
384				}
385			}
386		}
387
388		impl Threadable for Thread2 {
389			fn install(&self, installer: &Installer) {
390				let ended = Arc::clone(&self.ended);
391				installer.spawn("name1", |notifier| {
392					move || {
393						while !ended.load(Ordering::Acquire) {
394							sleep(Duration::from_millis(10));
395						}
396						notifier.end();
397					}
398				});
399			}
400
401			fn end(&self) {
402				self.ended.store(true, Ordering::Release);
403			}
404		}
405
406		let runtime = Runtime::new(ThreadStatuses::new());
407		let mut thread1 = Thread1::new();
408		let mut thread2 = Thread2::new();
409		runtime.register(&mut thread1);
410		runtime.register(&mut thread2);
411		runtime.join().unwrap();
412		assert!(thread2.ended.load(Ordering::Acquire));
413	}
414}