mod events;
mod subscribers;
mod utils;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::debug;
pub use self::events::{Event, EventTopic, RuntimeEvent, now_ms};
pub use self::subscribers::{FilteredSubscriber, TopicSubscriber};
pub use self::utils::{group_events_by_type, group_runtime_events_by_topic};
/// Channel capacity that `EventBus::new` passes to `EventBus::with_capacity`.
const DEFAULT_CAPACITY: usize = 256;
/// Handle to a broadcast-based bus carrying [`RuntimeEvent`]s.
///
/// The handle is cheap to clone: every clone points at the same shared
/// [`EventBusInner`] through an [`Arc`], so all clones publish to and
/// subscribe from the same underlying channel.
#[derive(Clone)]
pub struct EventBus {
    /// Shared state (the channel's sending half) behind a reference count.
    inner: Arc<EventBusInner>,
}
/// State shared by every clone of an [`EventBus`].
struct EventBusInner {
    /// Sending half of the broadcast channel; new receivers are created
    /// from it on demand.
    sender: broadcast::Sender<RuntimeEvent>,
    /// Capacity the channel was built with. Not read by any code visible in
    /// this file, hence the leading underscore.
    _capacity: usize,
}
impl EventBus {
    /// Creates a bus whose channel is sized with `DEFAULT_CAPACITY`.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Creates a bus backed by a broadcast channel of the given capacity.
    ///
    /// A `capacity` of `0` is raised to `1`, because
    /// `tokio::sync::broadcast::channel` panics when asked for a zero-sized
    /// channel. The receiver returned by the channel constructor is dropped
    /// immediately, so a fresh bus starts with no subscribers.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is larger than `usize::MAX / 2`, the upper limit
    /// documented for `tokio::sync::broadcast::channel`.
    pub fn with_capacity(capacity: usize) -> Self {
        // Guard against the documented zero-capacity panic in tokio.
        let capacity = capacity.max(1);
        let (sender, _) = broadcast::channel(capacity);
        Self {
            inner: Arc::new(EventBusInner {
                sender,
                _capacity: capacity,
            }),
        }
    }

    /// Sends `event` to every current subscriber.
    ///
    /// Publishing never fails from the caller's point of view: when the send
    /// is rejected (no subscribers), the event is discarded and only a
    /// debug-level log line is written.
    pub fn publish(&self, event: RuntimeEvent) {
        // The topic is captured first because `send` takes ownership of the event.
        let topic = event.topic();
        match self.inner.sender.send(event) {
            Ok(n) => debug!("Event {:?} sent to {} subscribers", topic, n),
            Err(_) => debug!("Event {:?} published with no subscribers", topic),
        }
    }

    /// Builds an [`Event`] from its parts, converts it into a
    /// [`RuntimeEvent`], and publishes it via [`EventBus::publish`].
    pub fn emit(&self, event_type: &str, source: &str, data: serde_json::Value) {
        let event = Event::new(event_type, source, data);
        self.publish(event.into_runtime_event());
    }

    /// Returns a new raw receiver for every event on the bus.
    ///
    /// Per tokio's broadcast semantics, the receiver only observes events
    /// sent after this call, and one that falls behind by more than the
    /// channel capacity reports a lag error on its next receive.
    pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
        self.inner.sender.subscribe()
    }

    /// Wraps a new receiver together with `topics` in a [`TopicSubscriber`].
    ///
    /// NOTE(review): presumably the subscriber yields only events whose topic
    /// is contained in `topics`; confirm against `TopicSubscriber`.
    pub fn subscribe_topics(&self, topics: HashSet<EventTopic>) -> TopicSubscriber {
        TopicSubscriber::new(self.inner.sender.subscribe(), topics)
    }

    /// Returns the number of receivers currently attached to the channel.
    pub fn subscriber_count(&self) -> usize {
        self.inner.sender.receiver_count()
    }
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for EventBus {
    /// Formats the bus as `EventBus { subscribers: <count> }`, exposing only
    /// the current subscriber count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscribers = self.subscriber_count();
        let mut repr = f.debug_struct("EventBus");
        repr.field("subscribers", &subscribers);
        repr.finish()
    }
}
#[cfg(test)]
mod tests;