use std::fmt;
use tokio::sync::broadcast;
/// Marker trait for values that can be carried by an event bus.
///
/// It declares no methods; implementing it only asserts that the type is
/// cloneable, debuggable, thread-safe (`Send + Sync`) and `'static`.
pub trait Event
where
    Self: Clone + Send + Sync + fmt::Debug + 'static,
{
}
/// Publish/subscribe hub for events of type `E`.
///
/// Wraps the sending half of a `tokio::sync::broadcast` channel.
#[derive(Debug)]
pub struct EventBus<E>
where
    E: Event,
{
    /// Sending half of the broadcast channel; new receivers are created from it.
    sender: broadcast::Sender<E>,
}
impl<E: Event> EventBus<E> {
    /// Creates a bus backed by a `tokio::sync::broadcast` channel.
    ///
    /// `capacity` is forwarded to `broadcast::channel`. That function panics
    /// when given a capacity of zero, so a zero request is raised to `1`
    /// instead of panicking. Larger values are passed through unchanged and
    /// remain subject to whatever upper limit `broadcast::channel` enforces.
    ///
    /// The receiver returned alongside the sender is dropped immediately;
    /// receivers are obtained later through [`EventBus::subscribe`].
    pub fn new(capacity: usize) -> Self {
        // A zero-capacity broadcast channel is rejected by tokio with a panic;
        // clamp to the smallest valid capacity instead.
        let (sender, _) = broadcast::channel(capacity.max(1));
        Self { sender }
    }

    /// Sends `event` to the channel.
    ///
    /// Returns the count reported by `broadcast::Sender::send` on success, or
    /// `0` when the send fails (per tokio's documentation, that happens when
    /// there are no active receivers). A failed send is deliberately not
    /// treated as an error.
    pub fn publish(&self, event: E) -> usize {
        self.sender.send(event).unwrap_or(0)
    }

    /// Creates a new subscription backed by a fresh receiver on this bus's
    /// channel.
    pub fn subscribe(&self) -> EventSubscription<E> {
        EventSubscription {
            receiver: self.sender.subscribe(),
        }
    }

    /// Returns the receiver count reported by the underlying sender.
    pub fn subscriber_count(&self) -> usize {
        self.sender.receiver_count()
    }
}
impl<E: Event> Default for EventBus<E> {
fn default() -> Self {
Self::new(256)
}
}
impl<E: Event> Clone for EventBus<E> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
/// Receiving side of an event bus for events of type `E`.
///
/// Wraps a `tokio::sync::broadcast::Receiver`.
pub struct EventSubscription<E>
where
    E: Event,
{
    /// Receiving half of the broadcast channel.
    receiver: broadcast::Receiver<E>,
}
impl<E: Event> EventSubscription<E> {
pub async fn recv(&mut self) -> Option<E> {
loop {
match self.receiver.recv().await {
Ok(event) => return Some(event),
Err(broadcast::error::RecvError::Lagged(count)) => {
tracing::warn!(count, "event subscription lagged, dropped events");
continue;
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
}
impl<E> fmt::Debug for EventSubscription<E>
where
    E: Event,
{
    /// Formats the subscription as an opaque `EventSubscription { .. }`;
    /// the receiver field is intentionally not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("EventSubscription");
        builder.finish_non_exhaustive()
    }
}