Skip to main content

rtcom_core/
event.rs

1//! Cross-task event bus for `rtcom-core`.
2//!
3//! Every meaningful thing that happens in a [`Session`](crate::Session) — a
4//! chunk of bytes from the wire, a chunk pending transmission, a config
5//! change, a fatal error — flows through the [`EventBus`] as an [`Event`].
6//! The bus is a thin wrapper over [`tokio::sync::broadcast`] so any number
7//! of subscribers (terminal renderer, log writer, scripting engine, ...)
8//! can tap in without coupling to each other.
9//!
10//! ## Subscription timing
11//!
12//! Broadcast channels do **not** replay history for late subscribers — only
13//! events sent *after* a subscription are observable. Subscribe via
14//! [`EventBus::subscribe`] before any code that may publish events of
15//! interest, typically before calling [`Session::run`](crate::Session::run).
16
17use std::sync::Arc;
18
19use bytes::Bytes;
20use tokio::sync::broadcast;
21
22use crate::command::Command;
23use crate::config::SerialConfig;
24use crate::error::Error;
25
26/// Default channel capacity. Large enough to absorb burst traffic from
27/// 3 Mbaud ports while keeping memory bounded; lagging subscribers see
28/// [`broadcast::error::RecvError::Lagged`] and can resync.
29pub const DEFAULT_BUS_CAPACITY: usize = 1024;
30
31/// One unit of work that flowed through (or originated inside) a session.
32///
33/// `#[non_exhaustive]` so future variants (`UserInput`, `Command`, ...)
34/// added in later issues do not break downstream code that matches.
35#[derive(Clone, Debug)]
36#[non_exhaustive]
37pub enum Event {
38    /// Bytes just read from the serial device.
39    RxBytes(Bytes),
40    /// Bytes pending transmission to the serial device. Publishing this
41    /// asks the writer task to send them.
42    TxBytes(Bytes),
43    /// A runtime command produced by the keyboard state machine
44    /// (Issue #6); subscribed by the command-handler dispatcher in
45    /// Issue #7.
46    Command(Command),
47    /// The session opened the device and is ready to do I/O.
48    DeviceConnected,
49    /// The session lost the device (EOF, write failure, hot-unplug).
50    DeviceDisconnected {
51        /// Human-readable reason intended for logs and the status bar.
52        reason: String,
53    },
54    /// The serial configuration changed at runtime (e.g. `^T b 9600`).
55    ConfigChanged(SerialConfig),
56    /// Human-readable status text emitted by the session itself
57    /// (Help banner, `ShowConfig`, line-toggle acknowledgements, ...).
58    /// The terminal renderer renders these with a `*** rtcom: ` prefix
59    /// to keep them distinct from serial data; log writers
60    /// (Issue #10) must drop them so they do not pollute capture
61    /// files.
62    SystemMessage(String),
63    /// A non-fatal error worth surfacing to subscribers. Wrapped in `Arc`
64    /// so the broadcast channel can clone it cheaply across receivers.
65    Error(Arc<Error>),
66}
67
68/// Multi-producer, multi-consumer event hub.
69///
70/// `EventBus` is `Clone` because it is meant to be handed to as many tasks
71/// as need to publish or subscribe; clones share the same underlying
72/// channel.
73#[derive(Clone, Debug)]
74pub struct EventBus {
75    inner: broadcast::Sender<Event>,
76}
77
78impl EventBus {
79    /// Creates a new bus with the given channel capacity.
80    ///
81    /// A capacity of zero is silently raised to one so the underlying
82    /// broadcast channel does not panic.
83    #[must_use]
84    pub fn new(capacity: usize) -> Self {
85        let (tx, _) = broadcast::channel(capacity.max(1));
86        Self { inner: tx }
87    }
88
89    /// Publishes an event to all current subscribers.
90    ///
91    /// Returns the number of subscribers that received the event, or 0 if
92    /// none were attached. Unlike [`broadcast::Sender::send`], a missing
93    /// subscriber is *not* treated as an error: events are best-effort and
94    /// callers should not block their own work because nobody is listening.
95    pub fn publish(&self, event: Event) -> usize {
96        self.inner.send(event).unwrap_or(0)
97    }
98
99    /// Returns a fresh subscription that yields every event published from
100    /// this point on.
101    #[must_use]
102    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
103        self.inner.subscribe()
104    }
105
106    /// Returns the current number of active subscribers.
107    #[must_use]
108    pub fn receiver_count(&self) -> usize {
109        self.inner.receiver_count()
110    }
111}
112
113impl Default for EventBus {
114    fn default() -> Self {
115        Self::new(DEFAULT_BUS_CAPACITY)
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122
123    #[tokio::test]
124    async fn publish_round_trips_to_subscribers() {
125        let bus = EventBus::new(8);
126        let mut rx = bus.subscribe();
127        let delivered = bus.publish(Event::DeviceConnected);
128        assert_eq!(delivered, 1);
129        assert!(matches!(rx.recv().await.unwrap(), Event::DeviceConnected));
130    }
131
132    #[tokio::test]
133    async fn publish_with_no_subscribers_returns_zero() {
134        let bus = EventBus::new(8);
135        assert_eq!(bus.publish(Event::DeviceConnected), 0);
136    }
137
138    #[tokio::test]
139    async fn system_message_round_trips() {
140        let bus = EventBus::new(8);
141        let mut rx = bus.subscribe();
142        bus.publish(Event::SystemMessage("hello".into()));
143        match rx.recv().await.unwrap() {
144            Event::SystemMessage(text) => assert_eq!(text, "hello"),
145            other => panic!("unexpected event: {other:?}"),
146        }
147    }
148
149    #[tokio::test]
150    async fn command_event_round_trips() {
151        use crate::Command;
152        let bus = EventBus::new(8);
153        let mut rx = bus.subscribe();
154        bus.publish(Event::Command(Command::Quit));
155        match rx.recv().await.unwrap() {
156            Event::Command(Command::Quit) => {}
157            other => panic!("unexpected event: {other:?}"),
158        }
159    }
160
161    #[tokio::test]
162    async fn each_subscriber_sees_each_event() {
163        let bus = EventBus::new(8);
164        let mut a = bus.subscribe();
165        let mut b = bus.subscribe();
166        bus.publish(Event::DeviceConnected);
167        assert!(matches!(a.recv().await.unwrap(), Event::DeviceConnected));
168        assert!(matches!(b.recv().await.unwrap(), Event::DeviceConnected));
169    }
170
171    #[test]
172    fn zero_capacity_is_promoted_to_one() {
173        // Mostly a smoke check: broadcast::channel(0) panics; we must not.
174        let _bus = EventBus::new(0);
175    }
176}