use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::broadcast;
use crate::command::Command;
use crate::config::SerialConfig;
use crate::error::Error;
pub const DEFAULT_BUS_CAPACITY: usize = 1024;
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Event {
RxBytes(Bytes),
TxBytes(Bytes),
Command(Command),
DeviceConnected,
DeviceDisconnected {
reason: String,
},
ConfigChanged(SerialConfig),
SystemMessage(String),
Error(Arc<Error>),
}
#[derive(Clone, Debug)]
pub struct EventBus {
inner: broadcast::Sender<Event>,
}
impl EventBus {
#[must_use]
pub fn new(capacity: usize) -> Self {
let (tx, _) = broadcast::channel(capacity.max(1));
Self { inner: tx }
}
pub fn publish(&self, event: Event) -> usize {
self.inner.send(event).unwrap_or(0)
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.inner.subscribe()
}
#[must_use]
pub fn receiver_count(&self) -> usize {
self.inner.receiver_count()
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new(DEFAULT_BUS_CAPACITY)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn publish_round_trips_to_subscribers() {
let bus = EventBus::new(8);
let mut rx = bus.subscribe();
let delivered = bus.publish(Event::DeviceConnected);
assert_eq!(delivered, 1);
assert!(matches!(rx.recv().await.unwrap(), Event::DeviceConnected));
}
#[tokio::test]
async fn publish_with_no_subscribers_returns_zero() {
let bus = EventBus::new(8);
assert_eq!(bus.publish(Event::DeviceConnected), 0);
}
#[tokio::test]
async fn system_message_round_trips() {
let bus = EventBus::new(8);
let mut rx = bus.subscribe();
bus.publish(Event::SystemMessage("hello".into()));
match rx.recv().await.unwrap() {
Event::SystemMessage(text) => assert_eq!(text, "hello"),
other => panic!("unexpected event: {other:?}"),
}
}
#[tokio::test]
async fn command_event_round_trips() {
use crate::Command;
let bus = EventBus::new(8);
let mut rx = bus.subscribe();
bus.publish(Event::Command(Command::Quit));
match rx.recv().await.unwrap() {
Event::Command(Command::Quit) => {}
other => panic!("unexpected event: {other:?}"),
}
}
#[tokio::test]
async fn each_subscriber_sees_each_event() {
let bus = EventBus::new(8);
let mut a = bus.subscribe();
let mut b = bus.subscribe();
bus.publish(Event::DeviceConnected);
assert!(matches!(a.recv().await.unwrap(), Event::DeviceConnected));
assert!(matches!(b.recv().await.unwrap(), Event::DeviceConnected));
}
#[test]
fn zero_capacity_is_promoted_to_one() {
let _bus = EventBus::new(0);
}
}