use crate::Event;
use async_trait::async_trait;
use tokio::sync::broadcast::{Receiver as EventReceiver, error::RecvError};
use tracing::{debug, warn};
pub struct Sink<E: Event> {
subscriber: Box<dyn Subscriber<E>>,
event_receiver: EventReceiver<E>,
}
impl<E: Event> Sink<E> {
pub fn new(
event_receiver: EventReceiver<E>,
subscriber: impl Subscriber<E>,
) -> Self {
Self {
subscriber: Box::new(subscriber),
event_receiver,
}
}
pub async fn run(&mut self) {
loop {
match self.event_receiver.recv().await {
Ok(event) => {
self.subscriber.notify(event).await;
}
Err(error) => match error {
RecvError::Closed => {
debug!("Event channel closed, sink stopping");
break;
}
RecvError::Lagged(skipped) => {
warn!(skipped, "Sink lagged, skipped events");
continue;
}
},
}
}
}
}
#[async_trait]
pub trait Subscriber<E: Event>: Send + Sync + 'static {
async fn notify(&self, event: E);
}