use std::fmt;
pub struct EventBus<E: Clone + Send + 'static> {
tx: tokio::sync::broadcast::Sender<E>,
}
impl<E: Clone + Send + 'static> EventBus<E> {
pub fn new(capacity: usize) -> Self {
let (tx, _rx) = tokio::sync::broadcast::channel(capacity);
drop(_rx);
Self { tx }
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<E> {
self.tx.subscribe()
}
pub fn publish(&self, event: E) -> anyhow::Result<()> {
let _ = self.tx.send(event);
Ok(())
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
}
impl<E: Clone + Send + 'static> Clone for EventBus<E> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
impl<E: Clone + Send + 'static> fmt::Debug for EventBus<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventBus")
.field("subscribers", &self.tx.receiver_count())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestEvent {
name: String,
value: i32,
}
#[test]
fn test_new_bus() {
let bus: EventBus<TestEvent> = EventBus::new(16);
assert_eq!(bus.subscriber_count(), 0);
}
#[test]
fn test_publish_with_no_subscribers() {
let bus: EventBus<TestEvent> = EventBus::new(16);
assert!(bus
.publish(TestEvent {
name: "test".into(),
value: 1
})
.is_ok());
}
#[tokio::test]
async fn test_single_subscriber() {
let bus: EventBus<TestEvent> = EventBus::new(16);
let mut rx = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
bus.publish(TestEvent {
name: "hello".into(),
value: 42,
})
.unwrap();
let event = rx.recv().await.unwrap();
assert_eq!(event.name, "hello");
assert_eq!(event.value, 42);
}
#[tokio::test]
async fn test_multiple_subscribers() {
let bus: EventBus<TestEvent> = EventBus::new(16);
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
bus.publish(TestEvent {
name: "broadcast".into(),
value: 99,
})
.unwrap();
let e1 = rx1.recv().await.unwrap();
let e2 = rx2.recv().await.unwrap();
assert_eq!(e1, e2);
}
#[tokio::test]
async fn test_late_subscriber_misses_events() {
let bus: EventBus<TestEvent> = EventBus::new(16);
bus.publish(TestEvent {
name: "early".into(),
value: 1,
})
.unwrap();
let mut rx = bus.subscribe();
assert!(rx.try_recv().is_err());
}
#[test]
fn test_clone() {
let bus: EventBus<TestEvent> = EventBus::new(16);
let bus2 = bus.clone();
assert_eq!(bus.subscriber_count(), 0);
assert_eq!(bus2.subscriber_count(), 0);
let _rx = bus.subscribe();
assert_eq!(bus2.subscriber_count(), 1);
}
#[tokio::test]
async fn test_generic_with_string() {
let bus: EventBus<String> = EventBus::new(64);
let mut rx = bus.subscribe();
bus.publish("hello world".into()).unwrap();
let msg = rx.recv().await.unwrap();
assert_eq!(msg, "hello world");
}
}