1use std::{clone::Clone, sync::Arc, thread};
2
3use crossbeam_channel::{unbounded, Receiver, Sender};
4use parking_lot::Mutex;
5
6use crate::{Installer, RuntimeError, Status, ThreadStatuses, Threadable};
7
8const RUNTIME_THREAD_NAME: &str = "runtime";
9
/// Coordinates the threads belonging to a set of registered [`Threadable`]s.
///
/// Threadables are added with `register`. `join` then installs them, spawns their threads and
/// processes the `(thread name, Status)` messages that arrive on the status channel until the
/// runtime shuts down.
// NOTE(review): no `Debug` implementation is provided; presumably because the stored
// `dyn Threadable` trait objects are not required to be `Debug` -- confirm.
#[allow(missing_debug_implementations)]
pub struct Runtime<'runtime> {
    /// Receiving end of the status channel; iterated by `join`.
    receiver: Receiver<(String, Status)>,
    /// Sending end of the status channel; cloned into the `Installer` built by `join` and used
    /// by `shutdown` to post the runtime's own `Status::Ended`.
    sender: Sender<(String, Status)>,
    /// Status registry handed to `new`; the runtime registers itself in it and updates it for
    /// every message it receives.
    thread_statuses: ThreadStatuses,
    /// The threadables added through `register`.
    threadables: Arc<Mutex<Vec<&'runtime mut dyn Threadable>>>,
}
19
20impl<'runtime> Runtime<'runtime> {
21 #[inline]
23 #[must_use]
24 pub fn new(thread_statuses: ThreadStatuses) -> Self {
25 let (sender, receiver) = unbounded();
26
27 thread_statuses.register_thread(RUNTIME_THREAD_NAME, Status::Waiting);
28
29 Self {
30 receiver,
31 sender,
32 thread_statuses,
33 threadables: Arc::new(Mutex::new(vec![])),
34 }
35 }
36
37 #[inline]
39 #[must_use]
40 pub fn statuses(&self) -> ThreadStatuses {
41 self.thread_statuses.clone()
42 }
43
44 #[inline]
46 pub fn register(&self, threadable: &'runtime mut (dyn Threadable)) {
47 self.threadables.lock().push(threadable);
48 }
49
50 #[inline]
55 pub fn join(&self) -> Result<(), RuntimeError> {
56 let installer = Installer::new(self.thread_statuses.clone(), self.sender.clone());
57 {
58 let threadables = self.threadables.lock();
59 for threadable in threadables.iter() {
60 threadable.install(&installer);
61 }
62 }
63 let mut handles = vec![];
64
65 for (name, op) in installer.into_ops().drain() {
66 handles.push(
67 thread::Builder::new()
68 .name(String::from(name.as_str()))
69 .spawn(op)
70 .map_err(|_err| RuntimeError::ThreadSpawnError(name))?,
71 );
72 }
73
74 let mut result = Ok(());
75
76 for (name, status) in &self.receiver {
77 match status {
78 Status::Error(err) => {
79 let _result = self.shutdown();
83 result = Err(err);
84 break;
85 },
86 Status::RequestPause => {
87 for threadable in self.threadables.lock().iter() {
88 threadable.pause();
89 }
90 },
91 Status::RequestResume => {
92 for threadable in self.threadables.lock().iter() {
93 threadable.resume();
94 }
95 },
96 Status::RequestEnd => {
97 self.thread_statuses.update_thread(RUNTIME_THREAD_NAME, Status::Ended);
98 for threadable in self.threadables.lock().iter() {
99 threadable.end();
100 }
101 },
102 Status::New | Status::Busy | Status::Waiting | Status::Ended => {},
103 }
104
105 self.thread_statuses.update_thread(name.as_str(), status);
106
107 if self.thread_statuses.all_ended() {
108 result = self.shutdown();
109 break;
110 }
111 }
112
113 while let Some(handle) = handles.pop() {
114 let _result = handle.join();
115 }
116
117 result
118 }
119
120 #[inline]
121 fn shutdown(&self) -> Result<(), RuntimeError> {
122 if self.thread_statuses.all_ended() {
123 return Ok(());
124 }
125
126 for threadable in self.threadables.lock().iter() {
127 threadable.end();
128 }
129 self.sender
130 .send((String::from(RUNTIME_THREAD_NAME), Status::Ended))
131 .map_err(|_err| RuntimeError::SendError)
132 }
133}
134
// Tests for `Runtime::join`. Each test defines small local `Threadable`s whose installed threads
// drive the runtime through their notifier.
// NOTE(review): the notifier methods are defined outside this file; `end`, `error`,
// `request_pause`, `request_resume` and `request_end` presumably send the matching `Status`
// for the calling thread -- confirm against the notifier implementation.
#[cfg(test)]
mod tests {
    use std::{
        sync::atomic::{AtomicBool, Ordering},
        thread::sleep,
        time::Duration,
    };

    use claim::assert_err;

    use super::*;

    // A single thread that ends itself and then requests the runtime to end: `join` must
    // return `Ok` and `all_ended()` must be true afterwards.
    #[test]
    fn run_thread_finish() {
        struct Thread;

        impl Thread {
            const fn new() -> Self {
                Self {}
            }
        }

        impl Threadable for Thread {
            fn install(&self, installer: &Installer) {
                installer.spawn("name", |notifier| {
                    move || {
                        notifier.end();
                        notifier.request_end();
                    }
                });
            }
        }

        let runtime = Runtime::new(ThreadStatuses::new());
        let mut thread = Thread::new();
        runtime.register(&mut thread);
        runtime.join().unwrap();
        assert!(runtime.statuses().all_ended());
    }

    // One thread reports an error while another spins until its `end` hook is called: `join`
    // must return an error. Because `join` also joins every spawned thread, it can only return
    // here if the runtime called `end` on the second threadable.
    #[test]
    fn run_thread_error() {
        struct Thread1;

        impl Thread1 {
            const fn new() -> Self {
                Self {}
            }
        }

        impl Threadable for Thread1 {
            fn install(&self, installer: &Installer) {
                installer.spawn("name0", |notifier| {
                    move || {
                        notifier.error(RuntimeError::ThreadError(String::from("error")));
                    }
                });
            }
        }

        struct Thread2 {
            // Set by `end`, polled by the installed thread.
            ended: Arc<AtomicBool>,
        }

        impl Thread2 {
            fn new() -> Self {
                Self {
                    ended: Arc::new(AtomicBool::new(false)),
                }
            }
        }

        impl Threadable for Thread2 {
            fn install(&self, installer: &Installer) {
                let ended = Arc::clone(&self.ended);
                installer.spawn("name1", |notifier| {
                    move || {
                        // Spin until the runtime calls `end` on this threadable.
                        while !ended.load(Ordering::Acquire) {
                            sleep(Duration::from_millis(10));
                        }
                        notifier.end();
                    }
                });
            }

            fn end(&self) {
                self.ended.store(true, Ordering::Release);
            }
        }

        let runtime = Runtime::new(ThreadStatuses::new());
        let mut thread1 = Thread1::new();
        let mut thread2 = Thread2::new();
        runtime.register(&mut thread1);
        runtime.register(&mut thread2);
        assert_err!(runtime.join());
    }

    // A pause request from one thread must reach the `pause` hook of the other threadable.
    #[test]
    fn run_thread_request_pause() {
        struct Thread1;

        impl Thread1 {
            const fn new() -> Self {
                Self {}
            }
        }

        impl Threadable for Thread1 {
            fn install(&self, installer: &Installer) {
                installer.spawn("name0", |notifier| {
                    move || {
                        notifier.request_pause();
                        notifier.end();
                    }
                });
            }
        }

        struct Thread2 {
            // Set by `pause`, polled by the installed thread.
            paused: Arc<AtomicBool>,
        }

        impl Thread2 {
            fn new() -> Self {
                Self {
                    paused: Arc::new(AtomicBool::new(false)),
                }
            }
        }

        impl Threadable for Thread2 {
            fn install(&self, installer: &Installer) {
                let paused = Arc::clone(&self.paused);
                installer.spawn("name1", |notifier| {
                    move || {
                        // Spin until the runtime calls `pause`, then end and ask the runtime
                        // to end so that `join` can return.
                        while !paused.load(Ordering::Acquire) {
                            sleep(Duration::from_millis(10));
                        }
                        notifier.end();
                        notifier.request_end();
                    }
                });
            }

            fn pause(&self) {
                self.paused.store(true, Ordering::Release);
            }
        }

        let runtime = Runtime::new(ThreadStatuses::new());
        let mut thread1 = Thread1::new();
        let mut thread2 = Thread2::new();
        runtime.register(&mut thread1);
        runtime.register(&mut thread2);
        runtime.join().unwrap();
        assert!(thread2.paused.load(Ordering::Acquire));
    }

    // A resume request from one thread must reach the `resume` hook of the other threadable.
    #[test]
    fn run_thread_request_resume() {
        struct Thread1;

        impl Thread1 {
            const fn new() -> Self {
                Self {}
            }
        }

        impl Threadable for Thread1 {
            fn install(&self, installer: &Installer) {
                installer.spawn("name0", |notifier| {
                    move || {
                        notifier.request_resume();
                        notifier.end();
                    }
                });
            }
        }

        struct Thread2 {
            // Set by `resume`, polled by the installed thread.
            resumed: Arc<AtomicBool>,
        }

        impl Thread2 {
            fn new() -> Self {
                Self {
                    resumed: Arc::new(AtomicBool::new(false)),
                }
            }
        }

        impl Threadable for Thread2 {
            fn install(&self, installer: &Installer) {
                let resumed = Arc::clone(&self.resumed);
                installer.spawn("name1", |notifier| {
                    move || {
                        // Spin until the runtime calls `resume`, then end and ask the runtime
                        // to end so that `join` can return.
                        while !resumed.load(Ordering::Acquire) {
                            sleep(Duration::from_millis(10));
                        }
                        notifier.end();
                        notifier.request_end();
                    }
                });
            }

            fn resume(&self) {
                self.resumed.store(true, Ordering::Release);
            }
        }

        let runtime = Runtime::new(ThreadStatuses::new());
        let mut thread1 = Thread1::new();
        let mut thread2 = Thread2::new();
        runtime.register(&mut thread1);
        runtime.register(&mut thread2);
        runtime.join().unwrap();
        assert!(thread2.resumed.load(Ordering::Acquire));
    }

    // An end request from one thread must reach the `end` hook of the other threadable.
    #[test]
    fn run_thread_request_end() {
        struct Thread1;

        impl Thread1 {
            const fn new() -> Self {
                Self {}
            }
        }

        impl Threadable for Thread1 {
            fn install(&self, installer: &Installer) {
                installer.spawn("name0", |notifier| {
                    move || {
                        notifier.request_end();
                        notifier.end();
                    }
                });
            }
        }

        struct Thread2 {
            // Set by `end`, polled by the installed thread.
            ended: Arc<AtomicBool>,
        }

        impl Thread2 {
            fn new() -> Self {
                Self {
                    ended: Arc::new(AtomicBool::new(false)),
                }
            }
        }

        impl Threadable for Thread2 {
            fn install(&self, installer: &Installer) {
                let ended = Arc::clone(&self.ended);
                installer.spawn("name1", |notifier| {
                    move || {
                        // Spin until the runtime calls `end` on this threadable.
                        while !ended.load(Ordering::Acquire) {
                            sleep(Duration::from_millis(10));
                        }
                        notifier.end();
                    }
                });
            }

            fn end(&self) {
                self.ended.store(true, Ordering::Release);
            }
        }

        let runtime = Runtime::new(ThreadStatuses::new());
        let mut thread1 = Thread1::new();
        let mut thread2 = Thread2::new();
        runtime.register(&mut thread1);
        runtime.register(&mut thread2);
        runtime.join().unwrap();
        assert!(thread2.ended.load(Ordering::Acquire));
    }
}