use std::collections::HashSet;
use tokio::sync::broadcast;
use tracing::debug;
use super::{Event, EventBus, EventTopic, RuntimeEvent};
pub struct TopicSubscriber {
receiver: broadcast::Receiver<RuntimeEvent>,
topics: HashSet<EventTopic>,
}
impl TopicSubscriber {
pub(super) fn new(
receiver: broadcast::Receiver<RuntimeEvent>,
topics: HashSet<EventTopic>,
) -> Self {
Self { receiver, topics }
}
pub async fn recv(&mut self) -> Option<RuntimeEvent> {
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.topics.contains(&event.topic()) {
return Some(event);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("TopicSubscriber lagged, missed {n} events");
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn topics(&self) -> &HashSet<EventTopic> {
&self.topics
}
}
pub struct FilteredSubscriber {
receiver: broadcast::Receiver<RuntimeEvent>,
event_types: Option<Vec<String>>,
}
impl FilteredSubscriber {
pub fn new(bus: &EventBus, event_types: Option<Vec<String>>) -> Self {
Self {
receiver: bus.subscribe(),
event_types,
}
}
pub async fn recv(&mut self) -> Option<Event> {
loop {
match self.receiver.recv().await {
Ok(runtime_event) => {
let legacy = match &runtime_event {
RuntimeEvent::Custom {
event_type,
source,
data,
timestamp_ms,
} => Event {
event_type: event_type.clone(),
source: source.clone(),
data: data.clone(),
timestamp_ms: *timestamp_ms,
},
other => Event {
event_type: format!("{:?}", other.topic()),
source: String::new(),
data: serde_json::to_value(other).unwrap_or(serde_json::Value::Null),
timestamp_ms: other.timestamp_ms(),
},
};
if let Some(ref types) = self.event_types
&& !types.iter().any(|t| t == &legacy.event_type)
{
continue;
}
return Some(legacy);
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("Subscriber lagged, missed {n} events");
continue;
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
}