Skip to main content

channel_runtime/
channel_runtime.rs

1//! Channel-based event source.
2//!
3//! Shows how to wire `Runner` to real threads sending events over
4//! `std::sync::mpsc`. A timer thread, a sensor thread, and a fault
5//! injector each push events into a shared channel. The main thread
6//! receives and dispatches them.
7
8#![allow(clippy::print_stdout, clippy::enum_glob_use)]
9
10use std::sync::mpsc;
11use std::thread;
12use std::time::Duration;
13
14use ready_active_safe::prelude::*;
15
/// Operating mode of the sensor state machine.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Mode {
    /// Starting mode; the first `Tick` moves the machine to `Sampling`.
    Initializing,
    /// Normal operation: measurements are recorded and ticks are logged.
    Sampling,
    /// Entered when a `Fault` arrives while sampling; all events are ignored
    /// in this mode.
    Safe,
}

impl core::fmt::Display for Mode {
    /// Writes the variant name as user-facing text.
    ///
    /// The text is identical to the `Debug` output for default formatting,
    /// but unlike delegating to `{:?}` it honors the caller's width, fill,
    /// alignment, and precision flags (e.g. `{:>12}`).
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let name = match self {
            Self::Initializing => "Initializing",
            Self::Sampling => "Sampling",
            Self::Safe => "Safe",
        };
        // `pad` applies the formatter's padding options; a nested `write!`
        // would start from a fresh format spec and drop them.
        f.pad(name)
    }
}
28
/// Input delivered to the state machine over the shared channel.
///
/// `Clone`, `PartialEq`, and `Eq` are derived alongside `Debug` so events can
/// be duplicated and compared, matching the derives on the other enums here.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Event {
    /// Periodic tick produced by the timer thread.
    Tick,
    /// A sensor reading produced by the sensor thread.
    Measurement(u32),
    /// Fault notification produced by the fault injector thread.
    Fault,
}
35
/// Side effect requested by the state machine and handed to the dispatcher.
///
/// Equality here is total (every payload is an integer), so `Eq` is derived
/// together with `PartialEq`; `Clone` lets callers keep a copy of a command.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Command {
    /// Record the measured value carried by a `Measurement` event.
    RecordSample(u32),
    /// Raise the alarm; emitted when a fault moves the machine to `Safe`.
    TriggerAlarm,
    /// Note that a tick was observed while sampling.
    LogTick,
}
42
43struct SensorSystem;
44
45impl Machine for SensorSystem {
46    type Mode = Mode;
47    type Event = Event;
48    type Command = Command;
49
50    fn initial_mode(&self) -> Mode {
51        Mode::Initializing
52    }
53
54    fn on_event(&self, mode: &Mode, event: &Event) -> Decision<Mode, Command> {
55        use Command::*;
56        use Event::*;
57        use Mode::*;
58
59        match (mode, event) {
60            (Initializing, Tick) => transition(Sampling),
61            (Sampling, Measurement(val)) => stay().emit(RecordSample(*val)),
62            (Sampling, Tick) => stay().emit(LogTick),
63            (Sampling, Fault) => transition(Safe).emit(TriggerAlarm),
64            _ => ignore(),
65        }
66    }
67}
68
69fn spawn_timer(tx: mpsc::Sender<Event>) {
70    thread::spawn(move || {
71        for _ in 0..6 {
72            thread::sleep(Duration::from_millis(50));
73            if tx.send(Event::Tick).is_err() {
74                return;
75            }
76        }
77    });
78}
79
80fn spawn_sensor(tx: mpsc::Sender<Event>) {
81    thread::spawn(move || {
82        for i in 0..4 {
83            thread::sleep(Duration::from_millis(80));
84            if tx.send(Event::Measurement(100 + i)).is_err() {
85                return;
86            }
87        }
88    });
89}
90
91fn spawn_fault_injector(tx: mpsc::Sender<Event>) {
92    thread::spawn(move || {
93        thread::sleep(Duration::from_millis(350));
94        let _ = tx.send(Event::Fault);
95    });
96}
97
98fn main() {
99    let system = SensorSystem;
100    let mut runner = Runner::new(&system);
101
102    let (tx, rx) = mpsc::channel();
103
104    spawn_timer(tx.clone());
105    spawn_sensor(tx.clone());
106    spawn_fault_injector(tx);
107
108    println!("Starting in {:?}", runner.mode());
109
110    loop {
111        match rx.recv_timeout(Duration::from_millis(500)) {
112            Ok(event) => {
113                let before = runner.mode().clone();
114                runner.feed_and_dispatch(&event, |cmd| {
115                    println!("  dispatch: {cmd:?}");
116                });
117                let after = runner.mode();
118                println!("{before:?} + {event:?} => {after:?}");
119
120                if *after == Mode::Safe {
121                    println!("Reached Safe mode, shutting down.");
122                    break;
123                }
124            }
125            Err(mpsc::RecvTimeoutError::Timeout) => {
126                println!("No events for 500ms, shutting down.");
127                break;
128            }
129            Err(mpsc::RecvTimeoutError::Disconnected) => {
130                println!("All senders dropped, shutting down.");
131                break;
132            }
133        }
134    }
135
136    println!("Final mode: {:?}", runner.mode());
137}