rtcom_core/event.rs
1//! Cross-task event bus for `rtcom-core`.
2//!
3//! Every meaningful thing that happens in a [`Session`](crate::Session) — a
4//! chunk of bytes from the wire, a chunk pending transmission, a config
5//! change, a fatal error — flows through the [`EventBus`] as an [`Event`].
6//! The bus is a thin wrapper over [`tokio::sync::broadcast`] so any number
7//! of subscribers (terminal renderer, log writer, scripting engine, ...)
8//! can tap in without coupling to each other.
9//!
10//! ## Subscription timing
11//!
12//! Broadcast channels do **not** replay history for late subscribers — only
13//! events sent *after* a subscription are observable. Subscribe via
14//! [`EventBus::subscribe`] before any code that may publish events of
15//! interest, typically before calling [`Session::run`](crate::Session::run).
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use tokio::sync::broadcast;
22
23use crate::command::Command;
24use crate::config::SerialConfig;
25use crate::error::Error;
26
27/// Default channel capacity. Large enough to absorb burst traffic from
28/// 3 Mbaud ports while keeping memory bounded; lagging subscribers see
29/// [`broadcast::error::RecvError::Lagged`] and can resync.
30pub const DEFAULT_BUS_CAPACITY: usize = 1024;
31
32/// One unit of work that flowed through (or originated inside) a session.
33///
34/// `#[non_exhaustive]` so future variants (`UserInput`, `Command`, ...)
35/// added in later issues do not break downstream code that matches.
36#[derive(Clone, Debug)]
37#[non_exhaustive]
38pub enum Event {
39 /// Bytes just read from the serial device.
40 RxBytes(Bytes),
41 /// Bytes pending transmission to the serial device. Publishing this
42 /// asks the writer task to send them.
43 TxBytes(Bytes),
44 /// A runtime command produced by the keyboard state machine
45 /// (Issue #6); subscribed by the command-handler dispatcher in
46 /// Issue #7.
47 Command(Command),
48 /// The session opened the device and is ready to do I/O.
49 DeviceConnected,
50 /// The session lost the device (EOF, write failure, hot-unplug).
51 DeviceDisconnected {
52 /// Human-readable reason intended for logs and the status bar.
53 reason: String,
54 },
55 /// The serial configuration changed at runtime (e.g. `^T b 9600`).
56 ConfigChanged(SerialConfig),
57 /// Human-readable status text emitted by the session itself
58 /// (Help banner, `ShowConfig`, line-toggle acknowledgements, ...).
59 /// The terminal renderer renders these with a `*** rtcom: ` prefix
60 /// to keep them distinct from serial data; log writers
61 /// (Issue #10) must drop them so they do not pollute capture
62 /// files.
63 SystemMessage(String),
64 /// A non-fatal error worth surfacing to subscribers. Wrapped in `Arc`
65 /// so the broadcast channel can clone it cheaply across receivers.
66 Error(Arc<Error>),
67 /// The TUI menu opened. Informational signal so log writers / scripts
68 /// can react (e.g., pause disk flushing while the UI is interactive).
69 MenuOpened,
70 /// The TUI menu closed.
71 MenuClosed,
72 /// A profile was successfully written to disk.
73 ProfileSaved {
74 /// Destination path on disk.
75 path: PathBuf,
76 },
77 /// A profile read or write failed. The session continues with the
78 /// last-known-good configuration; subscribers surface this to the user
79 /// (e.g. as a toast) but must not treat it as fatal.
80 ProfileLoadFailed {
81 /// Path that failed to load or save.
82 path: PathBuf,
83 /// Source error, shareable across the broadcast fan-out.
84 error: Arc<Error>,
85 },
86 /// DTR / RTS output-line state changed. Published by the session
87 /// after a successful
88 /// [`ToggleDtr`](crate::command::Command::ToggleDtr) /
89 /// [`ToggleRts`](crate::command::Command::ToggleRts) /
90 /// [`SetDtrAbs`](crate::command::Command::SetDtrAbs) /
91 /// [`SetRtsAbs`](crate::command::Command::SetRtsAbs) dispatch so
92 /// subscribers (notably the TUI) can refresh their cached
93 /// [`ModemLineSnapshot`](crate::config::ModemLineSnapshot) without
94 /// re-reading the device.
95 ModemLinesChanged {
96 /// Current DTR state after the change.
97 dtr: bool,
98 /// Current RTS state after the change.
99 rts: bool,
100 },
101}
102
103/// Multi-producer, multi-consumer event hub.
104///
105/// `EventBus` is `Clone` because it is meant to be handed to as many tasks
106/// as need to publish or subscribe; clones share the same underlying
107/// channel.
108#[derive(Clone, Debug)]
109pub struct EventBus {
110 inner: broadcast::Sender<Event>,
111}
112
113impl EventBus {
114 /// Creates a new bus with the given channel capacity.
115 ///
116 /// A capacity of zero is silently raised to one so the underlying
117 /// broadcast channel does not panic.
118 #[must_use]
119 pub fn new(capacity: usize) -> Self {
120 let (tx, _) = broadcast::channel(capacity.max(1));
121 Self { inner: tx }
122 }
123
124 /// Publishes an event to all current subscribers.
125 ///
126 /// Returns the number of subscribers that received the event, or 0 if
127 /// none were attached. Unlike [`broadcast::Sender::send`], a missing
128 /// subscriber is *not* treated as an error: events are best-effort and
129 /// callers should not block their own work because nobody is listening.
130 pub fn publish(&self, event: Event) -> usize {
131 self.inner.send(event).unwrap_or(0)
132 }
133
134 /// Returns a fresh subscription that yields every event published from
135 /// this point on.
136 #[must_use]
137 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
138 self.inner.subscribe()
139 }
140
141 /// Returns the current number of active subscribers.
142 #[must_use]
143 pub fn receiver_count(&self) -> usize {
144 self.inner.receiver_count()
145 }
146}
147
148impl Default for EventBus {
149 fn default() -> Self {
150 Self::new(DEFAULT_BUS_CAPACITY)
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157
158 #[tokio::test]
159 async fn publish_round_trips_to_subscribers() {
160 let bus = EventBus::new(8);
161 let mut rx = bus.subscribe();
162 let delivered = bus.publish(Event::DeviceConnected);
163 assert_eq!(delivered, 1);
164 assert!(matches!(rx.recv().await.unwrap(), Event::DeviceConnected));
165 }
166
167 #[tokio::test]
168 async fn publish_with_no_subscribers_returns_zero() {
169 let bus = EventBus::new(8);
170 assert_eq!(bus.publish(Event::DeviceConnected), 0);
171 }
172
173 #[tokio::test]
174 async fn system_message_round_trips() {
175 let bus = EventBus::new(8);
176 let mut rx = bus.subscribe();
177 bus.publish(Event::SystemMessage("hello".into()));
178 match rx.recv().await.unwrap() {
179 Event::SystemMessage(text) => assert_eq!(text, "hello"),
180 other => panic!("unexpected event: {other:?}"),
181 }
182 }
183
184 #[tokio::test]
185 async fn command_event_round_trips() {
186 use crate::Command;
187 let bus = EventBus::new(8);
188 let mut rx = bus.subscribe();
189 bus.publish(Event::Command(Command::Quit));
190 match rx.recv().await.unwrap() {
191 Event::Command(Command::Quit) => {}
192 other => panic!("unexpected event: {other:?}"),
193 }
194 }
195
196 #[tokio::test]
197 async fn each_subscriber_sees_each_event() {
198 let bus = EventBus::new(8);
199 let mut a = bus.subscribe();
200 let mut b = bus.subscribe();
201 bus.publish(Event::DeviceConnected);
202 assert!(matches!(a.recv().await.unwrap(), Event::DeviceConnected));
203 assert!(matches!(b.recv().await.unwrap(), Event::DeviceConnected));
204 }
205
206 #[test]
207 fn zero_capacity_is_promoted_to_one() {
208 // Mostly a smoke check: broadcast::channel(0) panics; we must not.
209 let _bus = EventBus::new(0);
210 }
211
212 #[test]
213 fn event_menu_opened_closed_are_clone() {
214 const fn assert_clone<T: Clone>() {}
215 assert_clone::<Event>();
216 assert!(matches!(Event::MenuOpened, Event::MenuOpened));
217 assert!(matches!(Event::MenuClosed, Event::MenuClosed));
218 }
219
220 #[test]
221 fn event_profile_saved_has_path() {
222 let ev = Event::ProfileSaved {
223 path: std::path::PathBuf::from("/tmp/x.toml"),
224 };
225 match ev {
226 Event::ProfileSaved { path } => {
227 assert_eq!(path, std::path::PathBuf::from("/tmp/x.toml"));
228 }
229 _ => panic!("wrong variant"),
230 }
231 }
232
233 #[test]
234 fn event_modem_lines_changed_carries_both_booleans() {
235 let ev = Event::ModemLinesChanged {
236 dtr: true,
237 rts: false,
238 };
239 match ev {
240 Event::ModemLinesChanged { dtr, rts } => {
241 assert!(dtr);
242 assert!(!rts);
243 }
244 _ => panic!("wrong variant"),
245 }
246 }
247
248 #[test]
249 fn event_profile_load_failed_has_path_and_error() {
250 use std::sync::Arc;
251 let err = crate::error::Error::InvalidConfig("boom".into());
252 let ev = Event::ProfileLoadFailed {
253 path: std::path::PathBuf::from("/tmp/bad.toml"),
254 error: Arc::new(err),
255 };
256 match ev {
257 Event::ProfileLoadFailed { path, error } => {
258 assert_eq!(path, std::path::PathBuf::from("/tmp/bad.toml"));
259 assert!(error.to_string().contains("boom"));
260 }
261 _ => panic!("wrong variant"),
262 }
263 }
264}