use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace, warn};
use crate::event::AstridEvent;
use crate::subscriber::SubscriberRegistry;
pub const DEFAULT_CHANNEL_CAPACITY: usize = 1024;
#[derive(Debug)]
pub struct EventBus {
sender: broadcast::Sender<Arc<AstridEvent>>,
registry: SubscriberRegistry,
capacity: usize,
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
registry: SubscriberRegistry::new(),
capacity,
}
}
pub fn publish(&self, event: AstridEvent) -> usize {
let event = Arc::new(event);
trace!(event_type = %event.event_type(), "Publishing event");
self.registry.notify(&event);
if let Ok(count) = self.sender.send(Arc::clone(&event)) {
debug!(
event_type = %event.event_type(),
receiver_count = count,
"Event published"
);
count
} else {
trace!(event_type = %event.event_type(), "No receivers for event");
0
}
}
#[must_use]
pub fn subscribe(&self) -> EventReceiver {
EventReceiver {
receiver: self.sender.subscribe(),
}
}
#[must_use]
pub fn registry(&self) -> &SubscriberRegistry {
&self.registry
}
pub fn registry_mut(&mut self) -> &mut SubscriberRegistry {
&mut self.registry
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl Clone for EventBus {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
registry: SubscriberRegistry::new(),
capacity: self.capacity,
}
}
}
pub struct EventReceiver {
receiver: broadcast::Receiver<Arc<AstridEvent>>,
}
impl EventReceiver {
pub async fn recv(&mut self) -> Option<Arc<AstridEvent>> {
loop {
match self.receiver.recv().await {
Ok(event) => return Some(event),
Err(broadcast::error::RecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
},
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn try_recv(&mut self) -> Option<Arc<AstridEvent>> {
loop {
match self.receiver.try_recv() {
Ok(event) => return Some(event),
Err(broadcast::error::TryRecvError::Lagged(count)) => {
warn!(skipped = count, "Event receiver lagged, events dropped");
},
Err(
broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed,
) => return None,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::EventMetadata;
#[tokio::test]
async fn test_event_bus_creation() {
let bus = EventBus::new();
assert_eq!(bus.capacity(), DEFAULT_CHANNEL_CAPACITY);
assert_eq!(bus.subscriber_count(), 0);
}
#[tokio::test]
async fn test_event_bus_with_capacity() {
let bus = EventBus::with_capacity(100);
assert_eq!(bus.capacity(), 100);
}
#[tokio::test]
async fn test_publish_and_receive() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 1);
let received = receiver.recv().await.unwrap();
assert_eq!(received.event_type(), "runtime_started");
}
#[tokio::test]
async fn test_multiple_subscribers() {
let bus = EventBus::new();
let mut receiver1 = bus.subscribe();
let mut receiver2 = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 2);
let received1 = receiver1.recv().await.unwrap();
let received2 = receiver2.recv().await.unwrap();
assert_eq!(received1.event_type(), "runtime_started");
assert_eq!(received2.event_type(), "runtime_started");
}
#[tokio::test]
async fn test_no_subscribers() {
let bus = EventBus::new();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_try_recv_empty() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result = receiver.try_recv();
assert!(result.is_none());
}
#[tokio::test]
async fn test_try_recv_with_event() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
let result = receiver.try_recv();
assert!(result.is_some());
}
#[tokio::test]
async fn test_subscriber_count() {
let bus = EventBus::new();
assert_eq!(bus.subscriber_count(), 0);
let _receiver1 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
let _receiver2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
drop(_receiver1);
}
}