use std::path::PathBuf;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::broadcast;
use crate::command::Command;
use crate::config::SerialConfig;
use crate::error::Error;
/// Channel capacity used when an [`EventBus`] is built through its `Default`
/// implementation.
pub const DEFAULT_BUS_CAPACITY: usize = 1024;
/// Events broadcast over the [`EventBus`].
///
/// `Clone` is derived because the bus is backed by a tokio broadcast channel,
/// which hands every subscriber its own copy of each event. The enum is
/// `#[non_exhaustive]`, so code outside this crate must keep a wildcard arm
/// when matching on it.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names and payload types only; confirm them against the code that
/// publishes each event.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Event {
/// Presumably raw bytes received from the device.
RxBytes(Bytes),
/// Presumably raw bytes written to the device.
TxBytes(Bytes),
/// Carries a [`Command`]; who issues and who handles it is not visible here.
Command(Command),
/// Presumably signals that the device connection was established.
DeviceConnected,
/// Presumably signals that the device connection ended; `reason` is a
/// free-form description.
DeviceDisconnected {
reason: String,
},
/// Carries a [`SerialConfig`], presumably the configuration now in effect.
ConfigChanged(SerialConfig),
/// Carries a free-form text message.
SystemMessage(String),
/// Carries a shared [`Error`]; the `Arc` presumably keeps the event
/// cloneable without requiring `Error: Clone`.
Error(Arc<Error>),
/// Presumably signals that a menu was opened.
MenuOpened,
/// Presumably signals that a menu was closed.
MenuClosed,
/// Presumably signals that a profile was written to `path`.
ProfileSaved {
path: PathBuf,
},
/// Presumably signals that loading the profile at `path` failed with `error`.
ProfileLoadFailed {
path: PathBuf,
error: Arc<Error>,
},
/// Carries the DTR and RTS modem-line states as booleans.
ModemLinesChanged {
dtr: bool,
rts: bool,
},
}
/// Publish/subscribe hub for [`Event`] values.
///
/// The bus is a thin wrapper around the sending half of a tokio broadcast
/// channel. Cloning the bus clones that sender, so every clone publishes into
/// the same channel.
#[derive(Clone, Debug)]
pub struct EventBus {
// Sending half of the broadcast channel; receivers are created from it on demand.
inner: broadcast::Sender<Event>,
}
impl EventBus {
    /// Creates a bus whose channel can retain up to `capacity` events.
    ///
    /// A `capacity` of zero is raised to one, because tokio's
    /// `broadcast::channel` rejects a zero-sized buffer. The receiver returned
    /// alongside the sender is discarded, so a new bus starts with no
    /// subscribers.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds the upper limit enforced by
    /// `broadcast::channel` (`usize::MAX / 2` per the tokio documentation).
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        let slots = capacity.max(1);
        let (sender, _) = broadcast::channel(slots);
        Self { inner: sender }
    }

    /// Sends `event` to the channel and returns the receiver count reported by
    /// the underlying `send`.
    ///
    /// A failed send (which is what happens when nobody is subscribed) is not
    /// treated as an error here: the event is dropped and `0` is returned.
    pub fn publish(&self, event: Event) -> usize {
        // `usize::default()` is 0, which is the value reported for a failed send.
        self.inner.send(event).unwrap_or_default()
    }

    /// Registers a new subscriber and returns its receiving half.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        let sender = &self.inner;
        sender.subscribe()
    }

    /// Returns the number of receivers currently attached to the channel.
    #[must_use]
    pub fn receiver_count(&self) -> usize {
        let sender = &self.inner;
        sender.receiver_count()
    }
}
impl Default for EventBus {
    /// Builds a bus sized by [`DEFAULT_BUS_CAPACITY`].
    fn default() -> Self {
        let capacity = DEFAULT_BUS_CAPACITY;
        Self::new(capacity)
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for `EventBus` delivery and for the payloads of a few
    // `Event` variants. All names used below come from the parent module.
    use super::*;

    // One subscriber: `publish` reports one receiver and the event arrives.
    #[tokio::test]
    async fn publish_round_trips_to_subscribers() {
        let bus = EventBus::new(8);
        let mut subscriber = bus.subscribe();

        assert_eq!(bus.publish(Event::DeviceConnected), 1);

        let received = subscriber.recv().await.unwrap();
        assert!(matches!(received, Event::DeviceConnected));
    }

    // No subscribers: `publish` swallows the send failure and reports zero.
    #[tokio::test]
    async fn publish_with_no_subscribers_returns_zero() {
        let bus = EventBus::new(8);
        let delivered = bus.publish(Event::DeviceConnected);
        assert_eq!(delivered, 0);
    }

    // The text payload of `SystemMessage` survives the trip through the bus.
    #[tokio::test]
    async fn system_message_round_trips() {
        let bus = EventBus::new(8);
        let mut subscriber = bus.subscribe();
        bus.publish(Event::SystemMessage(String::from("hello")));

        let received = subscriber.recv().await.unwrap();
        match received {
            Event::SystemMessage(text) => assert_eq!(text, "hello"),
            other => panic!("unexpected event: {other:?}"),
        }
    }

    // A wrapped `Command` survives the trip through the bus.
    #[tokio::test]
    async fn command_event_round_trips() {
        let bus = EventBus::new(8);
        let mut subscriber = bus.subscribe();
        bus.publish(Event::Command(Command::Quit));

        let received = subscriber.recv().await.unwrap();
        match received {
            Event::Command(Command::Quit) => {}
            other => panic!("unexpected event: {other:?}"),
        }
    }

    // Every subscriber gets its own copy of a published event.
    #[tokio::test]
    async fn each_subscriber_sees_each_event() {
        let bus = EventBus::new(8);
        let mut first = bus.subscribe();
        let mut second = bus.subscribe();
        bus.publish(Event::DeviceConnected);

        for subscriber in [&mut first, &mut second] {
            let received = subscriber.recv().await.unwrap();
            assert!(matches!(received, Event::DeviceConnected));
        }
    }

    // Constructing a bus with capacity zero must not panic.
    #[test]
    fn zero_capacity_is_promoted_to_one() {
        drop(EventBus::new(0));
    }

    // Compile-time check that `Event` is `Clone`, plus a sanity match on the
    // two menu variants.
    #[test]
    fn event_menu_opened_closed_are_clone() {
        const fn requires_clone<T: Clone>() {}
        requires_clone::<Event>();

        let opened = Event::MenuOpened;
        let closed = Event::MenuClosed;
        assert!(matches!(opened, Event::MenuOpened));
        assert!(matches!(closed, Event::MenuClosed));
    }

    // `ProfileSaved` hands back the path it was built with.
    #[test]
    fn event_profile_saved_has_path() {
        let expected = PathBuf::from("/tmp/x.toml");
        let saved = Event::ProfileSaved {
            path: expected.clone(),
        };

        if let Event::ProfileSaved { path } = saved {
            assert_eq!(path, expected);
        } else {
            panic!("wrong variant");
        }
    }

    // `ModemLinesChanged` keeps DTR and RTS as independent flags.
    #[test]
    fn event_modem_lines_changed_carries_both_booleans() {
        let changed = Event::ModemLinesChanged {
            dtr: true,
            rts: false,
        };

        if let Event::ModemLinesChanged { dtr, rts } = changed {
            assert!(dtr);
            assert!(!rts);
        } else {
            panic!("wrong variant");
        }
    }

    // `ProfileLoadFailed` exposes both the offending path and the shared error.
    #[test]
    fn event_profile_load_failed_has_path_and_error() {
        let expected = PathBuf::from("/tmp/bad.toml");
        let failure = Error::InvalidConfig("boom".into());
        let failed = Event::ProfileLoadFailed {
            path: expected.clone(),
            error: Arc::new(failure),
        };

        if let Event::ProfileLoadFailed { path, error } = failed {
            assert_eq!(path, expected);
            assert!(error.to_string().contains("boom"));
        } else {
            panic!("wrong variant");
        }
    }
}