satrs_core/
executable.rs

1//! Task scheduling module
2use alloc::string::String;
3use bus::BusReader;
4use std::boxed::Box;
5use std::sync::mpsc::TryRecvError;
6use std::thread::JoinHandle;
7use std::time::Duration;
8use std::vec;
9use std::vec::Vec;
10use std::{io, thread};
11
/// Success value of [Executable::periodic_op] and of the scheduler threads spawned by
/// [exec_sched_single] and [exec_sched_multi].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpResult {
    /// The operation completed normally.
    Ok,
    /// The operation asks for termination.
    ///
    /// NOTE(review): the scheduler functions in this file never produce this variant and do
    /// not inspect it when an executable returns it - confirm the intended semantics.
    TerminationRequested,
}
17
/// Describes how often the scheduler functions call [Executable::periodic_op] for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionType {
    /// The task is executed until termination is requested or an operation fails.
    Infinite,
    /// The task is executed for the given number of cycles.
    Cycles(u32),
    /// The task is executed exactly once.
    OneShot,
}
23
24pub trait Executable: Send {
25    type Error;
26
27    fn exec_type(&self) -> ExecutionType;
28    fn task_name(&self) -> &'static str;
29    fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, Self::Error>;
30}
31
32/// This function allows executing one task which implements the [Executable] trait
33///
34/// # Arguments
35///
36/// * `executable`: Executable task
37/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks.
38///    If [None] is passed, no sleeping will be performed.
39/// * `op_code`: Operation code which is passed to the executable task
40///    [operation call][Executable::periodic_op]
41/// * `termination`: Optional termination handler which can cancel threads with a broadcast
42pub fn exec_sched_single<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
43    mut executable: Box<T>,
44    task_freq: Option<Duration>,
45    op_code: i32,
46    mut termination: Option<BusReader<()>>,
47) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
48    let mut cycle_count = 0;
49    thread::Builder::new()
50        .name(String::from(executable.task_name()))
51        .spawn(move || loop {
52            if let Some(ref mut terminator) = termination {
53                match terminator.try_recv() {
54                    Ok(_) | Err(TryRecvError::Disconnected) => {
55                        return Ok(OpResult::Ok);
56                    }
57                    Err(TryRecvError::Empty) => (),
58                }
59            }
60            match executable.exec_type() {
61                ExecutionType::OneShot => {
62                    executable.periodic_op(op_code)?;
63                    return Ok(OpResult::Ok);
64                }
65                ExecutionType::Infinite => {
66                    executable.periodic_op(op_code)?;
67                }
68                ExecutionType::Cycles(cycles) => {
69                    executable.periodic_op(op_code)?;
70                    cycle_count += 1;
71                    if cycle_count == cycles {
72                        return Ok(OpResult::Ok);
73                    }
74                }
75            }
76            if let Some(freq) = task_freq {
77                thread::sleep(freq);
78            }
79        })
80}
81
82/// This function allows executing multiple tasks as long as the tasks implement the
83/// [Executable] trait
84///
85/// # Arguments
86///
87/// * `executable_vec`: Vector of executable objects
88/// * `task_freq`: Optional frequency of task. Required for periodic and fixed cycle tasks
89/// * `op_code`: Operation code which is passed to the executable task [operation call][Executable::periodic_op]
90/// * `termination`: Optional termination handler which can cancel threads with a broadcast
91pub fn exec_sched_multi<T: Executable<Error = E> + Send + 'static + ?Sized, E: Send + 'static>(
92    task_name: &'static str,
93    mut executable_vec: Vec<Box<T>>,
94    task_freq: Option<Duration>,
95    op_code: i32,
96    mut termination: Option<BusReader<()>>,
97) -> Result<JoinHandle<Result<OpResult, E>>, io::Error> {
98    let mut cycle_counts = vec![0; executable_vec.len()];
99    let mut removal_flags = vec![false; executable_vec.len()];
100
101    thread::Builder::new()
102        .name(String::from(task_name))
103        .spawn(move || loop {
104            if let Some(ref mut terminator) = termination {
105                match terminator.try_recv() {
106                    Ok(_) | Err(TryRecvError::Disconnected) => {
107                        removal_flags.iter_mut().for_each(|x| *x = true);
108                    }
109                    Err(TryRecvError::Empty) => (),
110                }
111            }
112            for (idx, executable) in executable_vec.iter_mut().enumerate() {
113                match executable.exec_type() {
114                    ExecutionType::OneShot => {
115                        executable.periodic_op(op_code)?;
116                        removal_flags[idx] = true;
117                    }
118                    ExecutionType::Infinite => {
119                        executable.periodic_op(op_code)?;
120                    }
121                    ExecutionType::Cycles(cycles) => {
122                        executable.periodic_op(op_code)?;
123                        cycle_counts[idx] += 1;
124                        if cycle_counts[idx] == cycles {
125                            removal_flags[idx] = true;
126                        }
127                    }
128                }
129            }
130            let mut removal_iter = removal_flags.iter();
131            executable_vec.retain(|_| !*removal_iter.next().unwrap());
132            removal_iter = removal_flags.iter();
133            cycle_counts.retain(|_| !*removal_iter.next().unwrap());
134            removal_flags.retain(|&i| !i);
135            if executable_vec.is_empty() {
136                return Ok(OpResult::Ok);
137            }
138            let freq = task_freq.unwrap_or_else(|| panic!("No task frequency specified"));
139            thread::sleep(freq);
140        })
141}
142
#[cfg(test)]
mod tests {
    use super::{exec_sched_multi, exec_sched_single, Executable, ExecutionType, OpResult};
    use bus::Bus;
    use std::boxed::Box;
    use std::error::Error;
    use std::string::{String, ToString};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use std::vec::Vec;
    use std::{fmt, thread, vec};

    /// Bookkeeping shared between a test and the tasks it schedules.
    struct TestInfo {
        /// Number of `periodic_op` calls performed by all tasks sharing this instance.
        exec_num: u32,
        /// Operation code passed to the most recent `periodic_op` call.
        op_code: i32,
    }

    type SharedInfo = Arc<Mutex<TestInfo>>;

    struct OneShotTask {
        info: SharedInfo,
    }
    struct FixedCyclesTask {
        cycles: u32,
        info: SharedInfo,
    }
    struct PeriodicTask {
        info: SharedInfo,
    }

    #[derive(Clone, Debug)]
    struct ExampleError {
        kind: ErrorKind,
    }

    /// The kind of an error that can occur.
    #[derive(Clone, Debug)]
    pub enum ErrorKind {
        Generic(String, i32),
    }

    impl ExampleError {
        fn new(msg: &str, code: i32) -> ExampleError {
            ExampleError {
                kind: ErrorKind::Generic(msg.to_string(), code),
            }
        }

        /// Return the kind of this error.
        pub fn kind(&self) -> &ErrorKind {
            &self.kind
        }
    }

    impl fmt::Display for ExampleError {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            match self.kind() {
                ErrorKind::Generic(str, code) => {
                    write!(f, "{str} with code {code}")
                }
            }
        }
    }

    impl Error for ExampleError {}

    /// Creates the zeroed bookkeeping which is shared between a test and its tasks.
    fn new_shared_info() -> SharedInfo {
        Arc::new(Mutex::new(TestInfo {
            exec_num: 0,
            op_code: 0,
        }))
    }

    /// Common `periodic_op` body of all test tasks: records the call in `info` and fails with
    /// `failure_msg` for negative operation codes.
    fn record_exec(
        info: &Mutex<TestInfo>,
        op_code: i32,
        failure_msg: &str,
    ) -> Result<OpResult, ExampleError> {
        {
            // The guard is scoped so that the lock is released before the result is built.
            let mut data = info.lock().expect("Locking Mutex failed");
            data.exec_num += 1;
            data.op_code = op_code;
        }
        if op_code >= 0 {
            Ok(OpResult::Ok)
        } else {
            Err(ExampleError::new(failure_msg, op_code))
        }
    }

    /// Asserts the exact number of recorded executions and the last recorded operation code.
    fn assert_recorded(info: &Mutex<TestInfo>, expected_exec_num: u32, expected_op_code: i32) {
        let data = info.lock().expect("Locking Mutex failed");
        assert_eq!(data.exec_num, expected_exec_num);
        assert_eq!(data.op_code, expected_op_code);
    }

    const ONE_SHOT_TASK_NAME: &str = "One Shot Task";

    impl Executable for OneShotTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::OneShot
        }

        fn task_name(&self) -> &'static str {
            ONE_SHOT_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.info, op_code, "One Shot Task Failure")
        }
    }

    const CYCLE_TASK_NAME: &str = "Fixed Cycles Task";

    impl Executable for FixedCyclesTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::Cycles(self.cycles)
        }

        fn task_name(&self) -> &'static str {
            CYCLE_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.info, op_code, "Fixed Cycle Task Failure")
        }
    }

    const PERIODIC_TASK_NAME: &str = "Periodic Task";

    impl Executable for PeriodicTask {
        type Error = ExampleError;

        fn exec_type(&self) -> ExecutionType {
            ExecutionType::Infinite
        }

        fn task_name(&self) -> &'static str {
            PERIODIC_TASK_NAME
        }

        fn periodic_op(&mut self, op_code: i32) -> Result<OpResult, ExampleError> {
            record_exec(&self.info, op_code, "Example Task Failure")
        }
    }

    #[test]
    fn test_simple_one_shot() {
        let expected_op_code = 42;
        let shared = new_shared_info();
        let task = Box::new(OneShotTask {
            info: shared.clone(),
        });
        let jhandle = exec_sched_single(
            task,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jhandle.join().expect("One Shot Task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        assert_recorded(&shared, 1, expected_op_code);
    }

    #[test]
    fn test_failed_one_shot() {
        let op_code_inducing_failure = -1;
        let shared = new_shared_info();
        let task = Box::new(OneShotTask {
            info: shared.clone(),
        });
        let jhandle = exec_sched_single(
            task,
            Some(Duration::from_millis(100)),
            op_code_inducing_failure,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jhandle.join().expect("One Shot Task thread panicked");
        let error = thread_res.expect_err("a negative op code must fail the task");
        match error.kind() {
            ErrorKind::Generic(msg, op_code) => {
                assert_eq!(msg.as_str(), "One Shot Task Failure");
                assert_eq!(*op_code, op_code_inducing_failure);
            }
        }
        assert_eq!(error.to_string(), "One Shot Task Failure with code -1");
        // The failing call is still recorded because the bookkeeping happens first.
        assert_recorded(&shared, 1, op_code_inducing_failure);
    }

    #[test]
    fn test_simple_multi_one_shot() {
        let expected_op_code = 43;
        let shared = new_shared_info();
        let exec_task_0 = OneShotTask {
            info: shared.clone(),
        };
        let exec_task_1 = OneShotTask {
            info: shared.clone(),
        };
        let task_vec = vec![Box::new(exec_task_0), Box::new(exec_task_1)];
        for task in task_vec.iter() {
            assert_eq!(task.task_name(), ONE_SHOT_TASK_NAME);
        }
        let jhandle = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jhandle.join().expect("multi task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        assert_recorded(&shared, 2, expected_op_code);
    }

    #[test]
    fn test_cycles_single() {
        let expected_op_code = 44;
        let shared = new_shared_info();
        let cycled_task = Box::new(FixedCyclesTask {
            info: shared.clone(),
            cycles: 1,
        });
        assert_eq!(cycled_task.task_name(), CYCLE_TASK_NAME);
        let jh = exec_sched_single(
            cycled_task,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jh.join().expect("Cycles Task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        assert_recorded(&shared, 1, expected_op_code);
    }

    #[test]
    fn test_single_and_cycles() {
        let expected_op_code = 50;
        let shared = new_shared_info();
        let one_shot_task = Box::new(OneShotTask {
            info: shared.clone(),
        });
        let cycled_task_0 = Box::new(FixedCyclesTask {
            info: shared.clone(),
            cycles: 1,
        });
        let cycled_task_1 = Box::new(FixedCyclesTask {
            info: shared.clone(),
            cycles: 1,
        });
        assert_eq!(cycled_task_0.task_name(), CYCLE_TASK_NAME);
        assert_eq!(one_shot_task.task_name(), ONE_SHOT_TASK_NAME);
        let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
            vec![one_shot_task, cycled_task_0, cycled_task_1];
        let jh = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(100)),
            expected_op_code,
            None,
        )
        .expect("thread creation failed");
        let thread_res = jh.join().expect("multi task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        assert_recorded(&shared, 3, expected_op_code);
    }

    // The periodic tests depend on wall-clock timing and are therefore ignored by default.
    #[test]
    #[ignore]
    fn test_periodic_single() {
        let mut terminator = Bus::new(5);
        let expected_op_code = 45;
        let shared = new_shared_info();
        let periodic_task = Box::new(PeriodicTask {
            info: shared.clone(),
        });
        assert_eq!(periodic_task.task_name(), PERIODIC_TASK_NAME);
        let jh = exec_sched_single(
            periodic_task,
            Some(Duration::from_millis(20)),
            expected_op_code,
            Some(terminator.add_rx()),
        )
        .expect("thread creation failed");
        thread::sleep(Duration::from_millis(40));
        terminator.broadcast(());
        let thread_res = jh.join().expect("Periodic Task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        let data = shared.lock().expect("Locking Mutex failed");
        let range = 2..4;
        assert!(range.contains(&data.exec_num));
        assert_eq!(data.op_code, expected_op_code);
    }

    #[test]
    #[ignore]
    fn test_periodic_multi() {
        let mut terminator = Bus::new(5);
        let expected_op_code = 46;
        let shared = new_shared_info();
        let cycled_task = Box::new(FixedCyclesTask {
            info: shared.clone(),
            cycles: 1,
        });
        let periodic_task_0 = Box::new(PeriodicTask {
            info: shared.clone(),
        });
        let periodic_task_1 = Box::new(PeriodicTask {
            info: shared.clone(),
        });
        assert_eq!(periodic_task_0.task_name(), PERIODIC_TASK_NAME);
        assert_eq!(periodic_task_1.task_name(), PERIODIC_TASK_NAME);
        let task_vec: Vec<Box<dyn Executable<Error = ExampleError>>> =
            vec![cycled_task, periodic_task_0, periodic_task_1];
        let jh = exec_sched_multi(
            "multi-task-name",
            task_vec,
            Some(Duration::from_millis(20)),
            expected_op_code,
            Some(terminator.add_rx()),
        )
        .expect("thread creation failed");
        thread::sleep(Duration::from_millis(60));
        terminator.broadcast(());
        let thread_res = jh.join().expect("multi task thread panicked");
        assert!(matches!(thread_res, Ok(OpResult::Ok)));
        let data = shared.lock().expect("Locking Mutex failed");
        let range = 7..11;
        assert!(range.contains(&data.exec_num));
        assert_eq!(data.op_code, expected_op_code);
    }
}