use std::sync::Arc;
use std::time::Duration;
use futures::Stream;
use tokio::sync::broadcast;
use tokio::time::timeout;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt as _;
use crate::{
error::{ClawDBError, ClawDBResult},
events::types::ClawEvent,
};
/// Receiving side of the event bus.
///
/// Holds a Tokio broadcast receiver whose items are reference-counted
/// [`ClawEvent`]s, so delivering an event to a subscriber never copies the
/// event payload itself.
pub struct EventSubscriber {
    /// Broadcast receiver that all events for this subscriber are read from.
    receiver: broadcast::Receiver<Arc<ClawEvent>>,
}
impl EventSubscriber {
pub fn new(receiver: broadcast::Receiver<Arc<ClawEvent>>) -> Self {
Self { receiver }
}
pub async fn recv(&mut self) -> ClawDBResult<Arc<ClawEvent>> {
loop {
match self.receiver.recv().await {
Ok(event) => return Ok(event),
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::debug!(skipped, "EventSubscriber lagged, skipping events");
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return Err(ClawDBError::EventBusError(
"event bus channel closed".to_string(),
));
}
}
}
}
pub async fn recv_matching<F>(
&mut self,
predicate: F,
deadline: Duration,
) -> ClawDBResult<Option<Arc<ClawEvent>>>
where
F: Fn(&ClawEvent) -> bool,
{
let result = timeout(deadline, async {
loop {
let event = self.recv().await?;
if predicate(&event) {
return Ok::<_, ClawDBError>(event);
}
}
})
.await;
match result {
Ok(Ok(event)) => Ok(Some(event)),
Ok(Err(e)) => Err(e),
Err(_elapsed) => Ok(None),
}
}
pub fn into_stream(self) -> impl Stream<Item = Arc<ClawEvent>> {
BroadcastStream::new(self.receiver).filter_map(|res| res.ok())
}
}