rtcom_core/event.rs
1//! Cross-task event bus for `rtcom-core`.
2//!
3//! Every meaningful thing that happens in a [`Session`](crate::Session) — a
4//! chunk of bytes from the wire, a chunk pending transmission, a config
5//! change, a fatal error — flows through the [`EventBus`] as an [`Event`].
6//! The bus is a thin wrapper over [`tokio::sync::broadcast`] so any number
7//! of subscribers (terminal renderer, log writer, scripting engine, ...)
8//! can tap in without coupling to each other.
9//!
10//! ## Subscription timing
11//!
12//! Broadcast channels do **not** replay history for late subscribers — only
13//! events sent *after* a subscription are observable. Subscribe via
14//! [`EventBus::subscribe`] before any code that may publish events of
15//! interest, typically before calling [`Session::run`](crate::Session::run).
16
17use std::sync::Arc;
18
19use bytes::Bytes;
20use tokio::sync::broadcast;
21
22use crate::command::Command;
23use crate::config::SerialConfig;
24use crate::error::Error;
25
26/// Default channel capacity. Large enough to absorb burst traffic from
27/// 3 Mbaud ports while keeping memory bounded; lagging subscribers see
28/// [`broadcast::error::RecvError::Lagged`] and can resync.
29pub const DEFAULT_BUS_CAPACITY: usize = 1024;
30
31/// One unit of work that flowed through (or originated inside) a session.
32///
33/// `#[non_exhaustive]` so future variants (`UserInput`, `Command`, ...)
34/// added in later issues do not break downstream code that matches.
35#[derive(Clone, Debug)]
36#[non_exhaustive]
37pub enum Event {
38 /// Bytes just read from the serial device.
39 RxBytes(Bytes),
40 /// Bytes pending transmission to the serial device. Publishing this
41 /// asks the writer task to send them.
42 TxBytes(Bytes),
43 /// A runtime command produced by the keyboard state machine
44 /// (Issue #6); subscribed by the command-handler dispatcher in
45 /// Issue #7.
46 Command(Command),
47 /// The session opened the device and is ready to do I/O.
48 DeviceConnected,
49 /// The session lost the device (EOF, write failure, hot-unplug).
50 DeviceDisconnected {
51 /// Human-readable reason intended for logs and the status bar.
52 reason: String,
53 },
54 /// The serial configuration changed at runtime (e.g. `^T b 9600`).
55 ConfigChanged(SerialConfig),
56 /// Human-readable status text emitted by the session itself
57 /// (Help banner, `ShowConfig`, line-toggle acknowledgements, ...).
58 /// The terminal renderer renders these with a `*** rtcom: ` prefix
59 /// to keep them distinct from serial data; log writers
60 /// (Issue #10) must drop them so they do not pollute capture
61 /// files.
62 SystemMessage(String),
63 /// A non-fatal error worth surfacing to subscribers. Wrapped in `Arc`
64 /// so the broadcast channel can clone it cheaply across receivers.
65 Error(Arc<Error>),
66}
67
68/// Multi-producer, multi-consumer event hub.
69///
70/// `EventBus` is `Clone` because it is meant to be handed to as many tasks
71/// as need to publish or subscribe; clones share the same underlying
72/// channel.
73#[derive(Clone, Debug)]
74pub struct EventBus {
75 inner: broadcast::Sender<Event>,
76}
77
78impl EventBus {
79 /// Creates a new bus with the given channel capacity.
80 ///
81 /// A capacity of zero is silently raised to one so the underlying
82 /// broadcast channel does not panic.
83 #[must_use]
84 pub fn new(capacity: usize) -> Self {
85 let (tx, _) = broadcast::channel(capacity.max(1));
86 Self { inner: tx }
87 }
88
89 /// Publishes an event to all current subscribers.
90 ///
91 /// Returns the number of subscribers that received the event, or 0 if
92 /// none were attached. Unlike [`broadcast::Sender::send`], a missing
93 /// subscriber is *not* treated as an error: events are best-effort and
94 /// callers should not block their own work because nobody is listening.
95 pub fn publish(&self, event: Event) -> usize {
96 self.inner.send(event).unwrap_or(0)
97 }
98
99 /// Returns a fresh subscription that yields every event published from
100 /// this point on.
101 #[must_use]
102 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
103 self.inner.subscribe()
104 }
105
106 /// Returns the current number of active subscribers.
107 #[must_use]
108 pub fn receiver_count(&self) -> usize {
109 self.inner.receiver_count()
110 }
111}
112
113impl Default for EventBus {
114 fn default() -> Self {
115 Self::new(DEFAULT_BUS_CAPACITY)
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 #[tokio::test]
124 async fn publish_round_trips_to_subscribers() {
125 let bus = EventBus::new(8);
126 let mut rx = bus.subscribe();
127 let delivered = bus.publish(Event::DeviceConnected);
128 assert_eq!(delivered, 1);
129 assert!(matches!(rx.recv().await.unwrap(), Event::DeviceConnected));
130 }
131
132 #[tokio::test]
133 async fn publish_with_no_subscribers_returns_zero() {
134 let bus = EventBus::new(8);
135 assert_eq!(bus.publish(Event::DeviceConnected), 0);
136 }
137
138 #[tokio::test]
139 async fn system_message_round_trips() {
140 let bus = EventBus::new(8);
141 let mut rx = bus.subscribe();
142 bus.publish(Event::SystemMessage("hello".into()));
143 match rx.recv().await.unwrap() {
144 Event::SystemMessage(text) => assert_eq!(text, "hello"),
145 other => panic!("unexpected event: {other:?}"),
146 }
147 }
148
149 #[tokio::test]
150 async fn command_event_round_trips() {
151 use crate::Command;
152 let bus = EventBus::new(8);
153 let mut rx = bus.subscribe();
154 bus.publish(Event::Command(Command::Quit));
155 match rx.recv().await.unwrap() {
156 Event::Command(Command::Quit) => {}
157 other => panic!("unexpected event: {other:?}"),
158 }
159 }
160
161 #[tokio::test]
162 async fn each_subscriber_sees_each_event() {
163 let bus = EventBus::new(8);
164 let mut a = bus.subscribe();
165 let mut b = bus.subscribe();
166 bus.publish(Event::DeviceConnected);
167 assert!(matches!(a.recv().await.unwrap(), Event::DeviceConnected));
168 assert!(matches!(b.recv().await.unwrap(), Event::DeviceConnected));
169 }
170
171 #[test]
172 fn zero_capacity_is_promoted_to_one() {
173 // Mostly a smoke check: broadcast::channel(0) panics; we must not.
174 let _bus = EventBus::new(0);
175 }
176}