1use alloc::string::String;
3use bus::BusReader;
4use std::boxed::Box;
5use std::sync::mpsc::TryRecvError;
6use std::thread::JoinHandle;
7use std::time::Duration;
8use std::vec;
9use std::vec::Vec;
10use std::{io, thread};
11
/// Outcome reported by an [`Executable`] operation and by the scheduler threads.
///
/// The type is a plain tag without payload, so it is `Copy` and can be compared and
/// passed around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpResult {
    /// The operation (or the whole scheduled task) completed normally.
    Ok,
    /// A termination was requested.
    TerminationRequested,
}
17
/// Describes how often the scheduler should run an [`Executable`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionType {
    /// Run the operation over and over until the scheduler is terminated or the
    /// operation fails.
    Infinite,
    /// Run the operation for the given number of cycles, then retire the task.
    Cycles(u32),
    /// Run the operation exactly once, then retire the task.
    OneShot,
}
23
24pub trait Executable: Send {
25 type Error;
26
27 fn exec_type(&self) -> ExecutionType;
28 fn task_name(&self) -> &'static str;
29 fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, Self::Error>;
30}
31
32pub fn exec_sched_single<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
43 mut executable: Box<T>,
44 task_freq: Option<Duration>,
45 op_code: i32,
46 mut termination: Option<BusReader<()>>,
47) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
48 let mut cycle_count = 0;
49 thread::Builder::new()
50 .name(String::from(executable.task_name()))
51 .spawn(move || loop {
52 if let Some(ref mut terminator) = termination {
53 match terminator.try_recv() {
54 Ok(_) | Err(TryRecvError::Disconnected) => {
55 return Ok(OpResult::Ok);
56 }
57 Err(TryRecvError::Empty) => (),
58 }
59 }
60 match executable.exec_type() {
61 ExecutionType::OneShot => {
62 executable.periodic_op(op_code)?;
63 return Ok(OpResult::Ok);
64 }
65 ExecutionType::Infinite => {
66 executable.periodic_op(op_code)?;
67 }
68 ExecutionType::Cycles(cycles) => {
69 executable.periodic_op(op_code)?;
70 cycle_count += 1;
71 if cycle_count == cycles {
72 return Ok(OpResult::Ok);
73 }
74 }
75 }
76 if let Some(freq) = task_freq {
77 thread::sleep(freq);
78 }
79 })
80}
81
82pub fn exec_sched_multi<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
92 task_name: &'static str,
93 mut executable_vec: Vec<Box<T>>,
94 task_freq: Option<Duration>,
95 op_code: i32,
96 mut termination: Option<BusReader<()>>,
97) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
98 let mut cycle_counts = vec![0; executable_vec.len()];
99 let mut removal_flags = vec![false; executable_vec.len()];
100
101 thread::Builder::new()
102 .name(String::from(task_name))
103 .spawn(move || loop {
104 if let Some(ref mut terminator) = termination {
105 match terminator.try_recv() {
106 Ok(_) | Err(TryRecvError::Disconnected) => {
107 removal_flags.iter_mut().for_each(|x| *x = true);
108 }
109 Err(TryRecvError::Empty) => (),
110 }
111 }
112 for (idx, executable) in executable_vec.iter_mut().enumerate() {
113 match executable.exec_type() {
114 ExecutionType::OneShot => {
115 executable.periodic_op(op_code)?;
116 removal_flags[idx] = true;
117 }
118 ExecutionType::Infinite => {
119 executable.periodic_op(op_code)?;
120 }
121 ExecutionType::Cycles(cycles) => {
122 executable.periodic_op(op_code)?;
123 cycle_counts[idx] += 1;
124 if cycle_counts[idx] == cycles {
125 removal_flags[idx] = true;
126 }
127 }
128 }
129 }
130 let mut removal_iter = removal_flags.iter();
131 executable_vec.retain(|_| !*removal_iter.next().unwrap());
132 removal_iter = removal_flags.iter();
133 cycle_counts.retain(|_| !*removal_iter.next().unwrap());
134 removal_flags.retain(|&i| !i);
135 if executable_vec.is_empty() {
136 return Ok(OpResult::Ok);
137 }
138 let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
139 thread::sleep(freq);
140 })
141}
142
#[cfg(test)]
mod tests {
    use super::{exec_sched_multi, exec_sched_single, Executable, ExecutionType, OpResult};
    use bus::Bus;
    use std::boxed::Box;
    use std::error::Error;
    use std::string::{String, ToString};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use std::vec::Vec;
    use std::{fmt, thread, vec};

    /// Data recorded by the test tasks: how often they ran and the last op code seen.
    struct TestInfo {
        exec_num: u32,
        op_code: i32,
    }

    /// Task which asks to be run exactly once.
    struct OneShotTask {
        exec_num: Arc<Mutex<TestInfo>>,
    }

    /// Task which asks to be run for a fixed number of cycles.
    struct FixedCyclesTask {
        cycles: u32,
        exec_num: Arc<Mutex<TestInfo>>,
    }

    /// Task which asks to be run until terminated.
    struct PeriodicTask {
        exec_num: Arc<Mutex<TestInfo>>,
    }

    #[derive(Clone, Debug)]
    struct ExampleError {
        kind: ErrorKind,
    }

    /// The kind of error the test tasks can produce.
    #[derive(Clone, Debug)]
    pub enum ErrorKind {
        Generic(String, i32),
    }

    impl ExampleError {
        fn new(msg: &str, code: i32) -> ExampleError {
            let kind = ErrorKind::Generic(msg.to_string(), code);
            ExampleError { kind }
        }

        /// Returns the kind of this error.
        pub fn kind(&self) -> &ErrorKind {
            &self.kind
        }
    }

    impl fmt::Display for ExampleError {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            match self.kind() {
                ErrorKind::Generic(msg, code) => write!(f, "{msg} with code {code}"),
            }
        }
    }

    impl Error for ExampleError {}

    /// Creates a fresh, zeroed record which is shared between a test and its tasks.
    fn new_shared_info() -> Arc<Mutex<TestInfo>> {
        Arc::new(Mutex::new(TestInfo {
            exec_num: 0,
            op_code: 0,
        }))
    }

    /// Common body of all test tasks: records the run and fails for negative op codes
    /// with an error carrying `failure_msg`.
    fn record_exec(
        info: &Mutex<TestInfo>,
        op_code: i32,
        failure_msg: &str,
    ) -> Result<OpResult, ExampleError> {
        {
            // The guard is released at the end of this scope, before the result is built.
            let mut guard = info.lock().expect("Locking Mutex failed");
            guard.exec_num += 1;
            guard.op_code = op_code;
        }
        if op_code < 0 {
            return Err(ExampleError::new(failure_msg, op_code));
        }
        Ok(OpResult::Ok)
    }

    const ONE_SHOT_TASK_NAME: &str = "One Shot Task";

    impl Executable for OneShotTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::OneShot
        }

        fn task_name(&self) -> &'static str {
            ONE_SHOT_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.exec_num, op_code, "One Shot Task Failure")
        }
    }

    const CYCLE_TASK_NAME: &str = "Fixed Cycles Task";

    impl Executable for FixedCyclesTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::Cycles(self.cycles)
        }

        fn task_name(&self) -> &'static str {
            CYCLE_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.exec_num, op_code, "Fixed Cycle Task Failure")
        }
    }

    const PERIODIC_TASK_NAME: &str = "Periodic Task";

    impl Executable for PeriodicTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::Infinite
        }

        fn task_name(&self) -> &'static str {
            PERIODIC_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.exec_num, op_code, "Example Task Failure")
        }
    }

    #[test]
    fn test_simple_one_shot() {
        let expected_op_code = 42;
        let shared = new_shared_info();
        let task = Box::new(OneShotTask {
            exec_num: shared.clone(),
        });
        let period = Some(Duration::from_millis(100));
        let jhandle = exec_sched_single(task, period, expected_op_code, None)
            .expect("thread creation failed");
        let thread_res = jhandle.join().expect("One Shot Task failed");
        assert!(thread_res.is_ok());
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(info.exec_num, 1);
        assert_eq!(info.op_code, expected_op_code);
    }

    #[test]
    fn test_failed_one_shot() {
        let op_code_inducing_failure = -1;
        let shared = new_shared_info();
        let task = Box::new(OneShotTask {
            exec_num: shared.clone(),
        });
        let period = Some(Duration::from_millis(100));
        let jhandle = exec_sched_single(task, period, op_code_inducing_failure, None)
            .expect("thread creation failed");
        let thread_res = jhandle.join().expect("One Shot Task failed");
        assert!(thread_res.is_err());
        let error = thread_res.unwrap_err();
        let err = error.kind();
        assert!(matches!(err, &ErrorKind::Generic { .. }));
        match err {
            ErrorKind::Generic(msg, code) => {
                assert_eq!(msg, &String::from("One Shot Task Failure"));
                assert_eq!(code, &op_code_inducing_failure);
            }
        }
        assert_eq!(error.to_string(), "One Shot Task Failure with code -1");
        // The failing run is still recorded before the error is returned.
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(info.exec_num, 1);
        assert_eq!(info.op_code, op_code_inducing_failure);
    }

    #[test]
    fn test_simple_multi_one_shot() {
        let expected_op_code = 43;
        let shared = new_shared_info();
        let first = OneShotTask {
            exec_num: shared.clone(),
        };
        let second = OneShotTask {
            exec_num: shared.clone(),
        };
        let task_vec = vec![Box::new(first), Box::new(second)];
        assert!(task_vec
            .iter()
            .all(|task| task.task_name() == ONE_SHOT_TASK_NAME));
        let jhandle = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jhandle.join().expect("One Shot Task failed");
        assert!(thread_res.is_ok());
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(info.exec_num, 2);
        assert_eq!(info.op_code, expected_op_code);
    }

    #[test]
    fn test_cycles_single() {
        let expected_op_code = 44;
        let shared = new_shared_info();
        let cycled_task = Box::new(FixedCyclesTask {
            exec_num: shared.clone(),
            cycles: 1,
        });
        assert_eq!(cycled_task.task_name(), CYCLE_TASK_NAME);
        let period = Some(Duration::from_millis(100));
        let jh = exec_sched_single(cycled_task, period, expected_op_code, None)
            .expect("thread creation failed");
        let thread_res = jh.join().expect("Cycles Task failed");
        assert!(thread_res.is_ok());
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        assert_eq!(info.exec_num, 1);
        assert_eq!(info.op_code, expected_op_code);
    }

    #[test]
    fn test_single_and_cycles() {
        let expected_op_code = 50;
        let shared = new_shared_info();
        let one_shot_task = Box::new(OneShotTask {
            exec_num: shared.clone(),
        });
        let cycled_task_0 = Box::new(FixedCyclesTask {
            exec_num: shared.clone(),
            cycles: 1,
        });
        let cycled_task_1 = Box::new(FixedCyclesTask {
            exec_num: shared.clone(),
            cycles: 1,
        });
        assert_eq!(cycled_task_0.task_name(), CYCLE_TASK_NAME);
        assert_eq!(one_shot_task.task_name(), ONE_SHOT_TASK_NAME);
        // Mixed task types need trait objects to live in one vector.
        let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
            vec![one_shot_task, cycled_task_0, cycled_task_1];
        let jh = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jh.join().expect("Cycles Task failed");
        assert!(thread_res.is_ok());
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        assert_eq!(info.exec_num, 3);
        assert_eq!(info.op_code, expected_op_code);
    }

    #[test]
    #[ignore]
    fn test_periodic_single() {
        let mut terminator = Bus::new(5);
        let expected_op_code = 45;
        let shared = new_shared_info();
        let periodic_task = Box::new(PeriodicTask {
            exec_num: shared.clone(),
        });
        assert_eq!(periodic_task.task_name(), PERIODIC_TASK_NAME);
        let jh = exec_sched_single(
            periodic_task,
            Some(Duration::from_millis(20)),
            expected_op_code,
            Some(terminator.add_rx()),
        )
        .expect("thread creation failed");
        thread::sleep(Duration::from_millis(40));
        terminator.broadcast(());
        let thread_res = jh.join().expect("Periodic Task failed");
        assert!(thread_res.is_ok());
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        // The exact count depends on thread timing, so only a range is checked.
        assert!((2..4).contains(&info.exec_num));
        assert_eq!(info.op_code, expected_op_code);
    }

    #[test]
    #[ignore]
    fn test_periodic_multi() {
        let mut terminator = Bus::new(5);
        let expected_op_code = 46;
        let shared = new_shared_info();
        let cycled_task = Box::new(FixedCyclesTask {
            exec_num: shared.clone(),
            cycles: 1,
        });
        let periodic_task_0 = Box::new(PeriodicTask {
            exec_num: shared.clone(),
        });
        let periodic_task_1 = Box::new(PeriodicTask {
            exec_num: shared.clone(),
        });
        assert_eq!(periodic_task_0.task_name(), PERIODIC_TASK_NAME);
        assert_eq!(periodic_task_1.task_name(), PERIODIC_TASK_NAME);
        let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
            vec![cycled_task, periodic_task_0, periodic_task_1];
        let jh = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(20)),
            expected_op_code,
            Some(terminator.add_rx()),
        )
        .expect("thread creation failed");
        thread::sleep(Duration::from_millis(60));
        terminator.broadcast(());
        let thread_res = jh.join().expect("Periodic Task failed");
        assert!(thread_res.is_ok());
        let info = shared.lock().expect("Locking Mutex failed");
        assert_eq!(thread_res.unwrap(), OpResult::Ok);
        // The exact count depends on thread timing, so only a range is checked.
        assert!((7..11).contains(&info.exec_num));
        assert_eq!(info.op_code, expected_op_code);
    }
}