Skip to main content

rtcom_core/
event.rs

1//! Cross-task event bus for `rtcom-core`.
2//!
3//! Every meaningful thing that happens in a [`Session`](crate::Session) — a
4//! chunk of bytes from the wire, a chunk pending transmission, a config
5//! change, a fatal error — flows through the [`EventBus`] as an [`Event`].
6//! The bus is a thin wrapper over [`tokio::sync::broadcast`] so any number
7//! of subscribers (terminal renderer, log writer, scripting engine, ...)
8//! can tap in without coupling to each other.
9//!
10//! ## Subscription timing
11//!
12//! Broadcast channels do **not** replay history for late subscribers — only
13//! events sent *after* a subscription are observable. Subscribe via
14//! [`EventBus::subscribe`] before any code that may publish events of
15//! interest, typically before calling [`Session::run`](crate::Session::run).
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use tokio::sync::broadcast;
22
23use crate::command::Command;
24use crate::config::SerialConfig;
25use crate::error::Error;
26
/// Capacity used by [`EventBus::default`] (1024 events).
///
/// Sized to soak up bursts from 3 Mbaud ports without letting memory
/// grow unbounded. A subscriber that falls behind observes
/// [`broadcast::error::RecvError::Lagged`] and may resynchronise.
pub const DEFAULT_BUS_CAPACITY: usize = 1 << 10;
31
32/// One unit of work that flowed through (or originated inside) a session.
33///
34/// `#[non_exhaustive]` so future variants (`UserInput`, `Command`, ...)
35/// added in later issues do not break downstream code that matches.
36#[derive(Clone, Debug)]
37#[non_exhaustive]
38pub enum Event {
39    /// Bytes just read from the serial device.
40    RxBytes(Bytes),
41    /// Bytes pending transmission to the serial device. Publishing this
42    /// asks the writer task to send them.
43    TxBytes(Bytes),
44    /// A runtime command produced by the keyboard state machine
45    /// (Issue #6); subscribed by the command-handler dispatcher in
46    /// Issue #7.
47    Command(Command),
48    /// The session opened the device and is ready to do I/O.
49    DeviceConnected,
50    /// The session lost the device (EOF, write failure, hot-unplug).
51    DeviceDisconnected {
52        /// Human-readable reason intended for logs and the status bar.
53        reason: String,
54    },
55    /// The serial configuration changed at runtime (e.g. `^T b 9600`).
56    ConfigChanged(SerialConfig),
57    /// Human-readable status text emitted by the session itself
58    /// (Help banner, `ShowConfig`, line-toggle acknowledgements, ...).
59    /// The terminal renderer renders these with a `*** rtcom: ` prefix
60    /// to keep them distinct from serial data; log writers
61    /// (Issue #10) must drop them so they do not pollute capture
62    /// files.
63    SystemMessage(String),
64    /// A non-fatal error worth surfacing to subscribers. Wrapped in `Arc`
65    /// so the broadcast channel can clone it cheaply across receivers.
66    Error(Arc<Error>),
67    /// The TUI menu opened. Informational signal so log writers / scripts
68    /// can react (e.g., pause disk flushing while the UI is interactive).
69    MenuOpened,
70    /// The TUI menu closed.
71    MenuClosed,
72    /// A profile was successfully written to disk.
73    ProfileSaved {
74        /// Destination path on disk.
75        path: PathBuf,
76    },
77    /// A profile read or write failed. The session continues with the
78    /// last-known-good configuration; subscribers surface this to the user
79    /// (e.g. as a toast) but must not treat it as fatal.
80    ProfileLoadFailed {
81        /// Path that failed to load or save.
82        path: PathBuf,
83        /// Source error, shareable across the broadcast fan-out.
84        error: Arc<Error>,
85    },
86    /// DTR / RTS output-line state changed. Published by the session
87    /// after a successful
88    /// [`ToggleDtr`](crate::command::Command::ToggleDtr) /
89    /// [`ToggleRts`](crate::command::Command::ToggleRts) /
90    /// [`SetDtrAbs`](crate::command::Command::SetDtrAbs) /
91    /// [`SetRtsAbs`](crate::command::Command::SetRtsAbs) dispatch so
92    /// subscribers (notably the TUI) can refresh their cached
93    /// [`ModemLineSnapshot`](crate::config::ModemLineSnapshot) without
94    /// re-reading the device.
95    ModemLinesChanged {
96        /// Current DTR state after the change.
97        dtr: bool,
98        /// Current RTS state after the change.
99        rts: bool,
100    },
101}
102
103/// Multi-producer, multi-consumer event hub.
104///
105/// `EventBus` is `Clone` because it is meant to be handed to as many tasks
106/// as need to publish or subscribe; clones share the same underlying
107/// channel.
108#[derive(Clone, Debug)]
109pub struct EventBus {
110    inner: broadcast::Sender<Event>,
111}
112
113impl EventBus {
114    /// Creates a new bus with the given channel capacity.
115    ///
116    /// A capacity of zero is silently raised to one so the underlying
117    /// broadcast channel does not panic.
118    #[must_use]
119    pub fn new(capacity: usize) -> Self {
120        let (tx, _) = broadcast::channel(capacity.max(1));
121        Self { inner: tx }
122    }
123
124    /// Publishes an event to all current subscribers.
125    ///
126    /// Returns the number of subscribers that received the event, or 0 if
127    /// none were attached. Unlike [`broadcast::Sender::send`], a missing
128    /// subscriber is *not* treated as an error: events are best-effort and
129    /// callers should not block their own work because nobody is listening.
130    pub fn publish(&self, event: Event) -> usize {
131        self.inner.send(event).unwrap_or(0)
132    }
133
134    /// Returns a fresh subscription that yields every event published from
135    /// this point on.
136    #[must_use]
137    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
138        self.inner.subscribe()
139    }
140
141    /// Returns the current number of active subscribers.
142    #[must_use]
143    pub fn receiver_count(&self) -> usize {
144        self.inner.receiver_count()
145    }
146}
147
148impl Default for EventBus {
149    fn default() -> Self {
150        Self::new(DEFAULT_BUS_CAPACITY)
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Channel size for tests that do not care about the exact capacity.
    const TEST_CAPACITY: usize = 8;

    /// A published event reaches the one attached subscriber, and the
    /// reported delivery count is one.
    #[tokio::test]
    async fn publish_round_trips_to_subscribers() {
        let bus = EventBus::new(TEST_CAPACITY);
        let mut receiver = bus.subscribe();

        assert_eq!(bus.publish(Event::DeviceConnected), 1);

        let received = receiver.recv().await.unwrap();
        assert!(matches!(received, Event::DeviceConnected));
    }

    /// Publishing with nobody subscribed is not an error and reports zero.
    #[tokio::test]
    async fn publish_with_no_subscribers_returns_zero() {
        let delivered = EventBus::new(TEST_CAPACITY).publish(Event::DeviceConnected);
        assert_eq!(delivered, 0);
    }

    /// The text payload of a `SystemMessage` survives the trip unchanged.
    #[tokio::test]
    async fn system_message_round_trips() {
        let bus = EventBus::new(TEST_CAPACITY);
        let mut receiver = bus.subscribe();
        bus.publish(Event::SystemMessage("hello".into()));

        let received = receiver.recv().await.unwrap();
        if let Event::SystemMessage(text) = &received {
            assert_eq!(text.as_str(), "hello");
        } else {
            panic!("unexpected event: {received:?}");
        }
    }

    /// A `Command` event is delivered with its command intact.
    #[tokio::test]
    async fn command_event_round_trips() {
        use crate::Command;

        let bus = EventBus::new(TEST_CAPACITY);
        let mut receiver = bus.subscribe();
        bus.publish(Event::Command(Command::Quit));

        let received = receiver.recv().await.unwrap();
        if !matches!(received, Event::Command(Command::Quit)) {
            panic!("unexpected event: {received:?}");
        }
    }

    /// Every subscriber attached before the publish gets its own copy.
    #[tokio::test]
    async fn each_subscriber_sees_each_event() {
        let bus = EventBus::new(TEST_CAPACITY);
        let mut receivers = vec![bus.subscribe(), bus.subscribe()];
        bus.publish(Event::DeviceConnected);

        for receiver in &mut receivers {
            let received = receiver.recv().await.unwrap();
            assert!(matches!(received, Event::DeviceConnected));
        }
    }

    /// Smoke check: `broadcast::channel(0)` panics, `EventBus::new(0)`
    /// must not.
    #[test]
    fn zero_capacity_is_promoted_to_one() {
        drop(EventBus::new(0));
    }

    /// `Event` is `Clone` (checked at compile time) and the menu variants
    /// can be constructed and matched.
    #[test]
    fn event_menu_opened_closed_are_clone() {
        const fn requires_clone<T: Clone>() {}
        requires_clone::<Event>();

        let opened = Event::MenuOpened;
        let closed = Event::MenuClosed;
        assert!(matches!(opened, Event::MenuOpened));
        assert!(matches!(closed, Event::MenuClosed));
    }

    /// `ProfileSaved` carries the path it was built with.
    #[test]
    fn event_profile_saved_has_path() {
        let expected = std::path::PathBuf::from("/tmp/x.toml");
        let ev = Event::ProfileSaved {
            path: expected.clone(),
        };

        if let Event::ProfileSaved { path } = ev {
            assert_eq!(path, expected);
        } else {
            panic!("wrong variant");
        }
    }

    /// `ModemLinesChanged` keeps DTR and RTS as independent flags.
    #[test]
    fn event_modem_lines_changed_carries_both_booleans() {
        let ev = Event::ModemLinesChanged {
            dtr: true,
            rts: false,
        };

        if let Event::ModemLinesChanged { dtr, rts } = ev {
            assert!(dtr);
            assert!(!rts);
        } else {
            panic!("wrong variant");
        }
    }

    /// `ProfileLoadFailed` exposes both the failing path and the wrapped
    /// error.
    #[test]
    fn event_profile_load_failed_has_path_and_error() {
        use std::sync::Arc;

        let expected = std::path::PathBuf::from("/tmp/bad.toml");
        let source = Arc::new(crate::error::Error::InvalidConfig("boom".into()));
        let ev = Event::ProfileLoadFailed {
            path: expected.clone(),
            error: source,
        };

        if let Event::ProfileLoadFailed { path, error } = ev {
            assert_eq!(path, expected);
            assert!(error.to_string().contains("boom"));
        } else {
            panic!("wrong variant");
        }
    }
}