use tokio::sync::broadcast;
use crate::DEFAULT_CHANNEL_SIZE;
/// Event bus built on the sending half of a Tokio broadcast channel.
///
/// Events are published with `send` and consumed through receivers created
/// by `subscribe`. Cloning the bus clones the inner `broadcast::Sender`.
#[derive(Clone)]
pub struct EventsBus<T> {
/// Sender half of the broadcast channel; used both to publish events and to
/// create new receivers.
event_tx: broadcast::Sender<T>,
}
impl<T: Clone> EventsBus<T> {
    /// Creates a bus whose channel capacity is `DEFAULT_CHANNEL_SIZE`.
    // Silences clippy's request for a matching `Default` implementation.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self::new_with_size(DEFAULT_CHANNEL_SIZE)
    }

    /// Creates a bus backed by a broadcast channel of capacity `size`.
    ///
    /// The receiver returned alongside the sender is discarded immediately;
    /// receivers are created on demand via [`EventsBus::subscribe`].
    ///
    /// # Panics
    ///
    /// Propagates any panic raised by `broadcast::channel` for an invalid
    /// capacity (Tokio documents a panic for a capacity of zero).
    pub fn new_with_size(size: usize) -> Self {
        let (event_tx, _) = broadcast::channel(size);
        Self { event_tx }
    }

    /// Publishes `event` on the bus.
    ///
    /// This is best-effort: the outcome of the underlying send is discarded,
    /// so a failed send is silently ignored.
    pub fn send(&self, event: T) {
        drop(self.event_tx.send(event));
    }

    /// Returns a new receiver for this bus.
    ///
    /// The returned [`EventsRx`] borrows the bus's sender, so it cannot
    /// outlive this `EventsBus`.
    pub fn subscribe(&self) -> EventsRx<'_, T> {
        let sender = &self.event_tx;
        EventsRx::subscribe(sender)
    }
}
/// Receiving side of an event-bus subscription.
///
/// Pairs a broadcast receiver with a borrow of the sender it was created
/// from, tying the receiver's lifetime `'a` to that sender.
pub struct EventsRx<'a, T> {
/// Never read. Holding this borrow keeps a sender alive for as long as the
/// receiver exists, which is why `recv_filtered` treats a closed channel as
/// unreachable.
_event_tx: &'a broadcast::Sender<T>,
/// Receiver from which events are pulled.
event_rx: broadcast::Receiver<T>,
}
impl<'a, T: Clone> EventsRx<'a, T> {
    /// Creates a receiver subscribed to `event_tx`, keeping a borrow of the
    /// sender alongside it.
    fn subscribe(event_tx: &'a broadcast::Sender<T>) -> Self {
        Self {
            _event_tx: event_tx,
            event_rx: event_tx.subscribe(),
        }
    }

    /// Waits for the next event and returns it.
    ///
    /// Equivalent to [`EventsRx::recv_filtered`] with a filter that accepts
    /// every event; lag notifications are skipped in the same way.
    pub async fn recv(&mut self) -> T {
        self.recv_filtered(|_| true).await
    }

    /// Waits for the next event accepted by `filter` and returns it.
    ///
    /// Events for which `filter` returns `false` are discarded and the wait
    /// continues. `filter` may be stateful (`FnMut`); it is called once per
    /// received event, in the order the events are received.
    ///
    /// A `Lagged` error from the receiver is ignored and receiving simply
    /// continues, so events missed due to lag are never reported to the
    /// caller.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports `Closed`. This is not expected to
    /// happen, because this receiver holds a borrow of the sender for its
    /// whole lifetime.
    pub async fn recv_filtered(&mut self, mut filter: impl FnMut(&T) -> bool) -> T {
        use tokio::sync::broadcast::error::RecvError;

        loop {
            match self.event_rx.recv().await {
                Ok(event) => {
                    if filter(&event) {
                        return event;
                    }
                }
                // Missed events are not surfaced; keep waiting for the next one.
                Err(RecvError::Lagged(_)) => {}
                Err(RecvError::Closed) => unreachable!(
                    "This cannot happen. We currently have a handle to the \
                     `event_tx` sender, so the channel cannot be closed."
                ),
            }
        }
    }
}