use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, broadcast};
use crate::error::EventBusError;
use crate::event::Event;
use crate::source::EventSource;
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
pub struct EventBus {
sources: HashMap<String, Box<dyn EventSource>>,
sender: broadcast::Sender<Event>,
running: Arc<Mutex<bool>>,
capacity: usize,
}
impl EventBus {
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sources: HashMap::new(),
sender,
running: Arc::new(Mutex::new(false)),
capacity,
}
}
pub fn register<S: EventSource + 'static>(&mut self, source: S) {
let name = source.name().to_string();
self.sources.insert(name, Box::new(source));
}
pub fn has_source(&self, name: &str) -> bool {
self.sources.contains_key(name)
}
pub fn source_names(&self) -> Vec<&str> {
self.sources.keys().map(String::as_str).collect()
}
pub fn source_count(&self) -> usize {
self.sources.len()
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
pub async fn is_running(&self) -> bool {
*self.running.lock().await
}
pub fn publish(&self, event: Event) -> Result<(), EventBusError> {
self.sender
.send(event)
.map(|_| ())
.map_err(|_| EventBusError::ChannelClosed)
}
pub async fn poll_sources(&self) -> Result<usize, EventBusError> {
let mut count = 0;
for source in self.sources.values() {
match source.next_event().await {
Ok(Some(event)) => {
let _ = self.sender.send(event);
count += 1;
}
Ok(None) => {}
Err(e) => {
tracing::warn!(source = source.name(), error = %e, "source poll failed");
}
}
}
Ok(count)
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
struct FakeSource {
source_name: String,
events: tokio::sync::Mutex<Vec<Event>>,
}
impl FakeSource {
fn new(name: &str, events: Vec<Event>) -> Self {
Self {
source_name: name.to_string(),
events: tokio::sync::Mutex::new(events),
}
}
fn empty(name: &str) -> Self {
Self::new(name, vec![])
}
}
#[async_trait::async_trait]
impl EventSource for FakeSource {
fn name(&self) -> &str {
&self.source_name
}
#[allow(clippy::unnecessary_literal_bound)]
fn description(&self) -> &str {
"Fake source"
}
async fn next_event(&self) -> Result<Option<Event>, EventBusError> {
let mut events = self.events.lock().await;
if events.is_empty() {
Ok(None)
} else {
Ok(Some(events.remove(0)))
}
}
}
#[test]
fn new_creates_empty_bus() {
let bus = EventBus::new();
assert_eq!(bus.source_count(), 0);
assert!(bus.source_names().is_empty());
}
#[test]
fn default_creates_empty_bus() {
let bus = EventBus::default();
assert_eq!(bus.source_count(), 0);
}
#[test]
fn with_capacity_sets_capacity() {
let bus = EventBus::with_capacity(64);
assert_eq!(bus.capacity(), 64);
}
#[test]
fn register_adds_source() {
let mut bus = EventBus::new();
bus.register(FakeSource::empty("test"));
assert!(bus.has_source("test"));
assert_eq!(bus.source_count(), 1);
}
#[test]
fn register_multiple_sources() {
let mut bus = EventBus::new();
bus.register(FakeSource::empty("a"));
bus.register(FakeSource::empty("b"));
assert_eq!(bus.source_count(), 2);
assert!(bus.has_source("a"));
assert!(bus.has_source("b"));
}
#[test]
fn register_replaces_existing_source() {
let mut bus = EventBus::new();
bus.register(FakeSource::empty("src"));
bus.register(FakeSource::empty("src"));
assert_eq!(bus.source_count(), 1);
}
#[test]
fn has_source_returns_false_for_unknown() {
let bus = EventBus::new();
assert!(!bus.has_source("nonexistent"));
}
#[tokio::test]
async fn publish_sends_event_to_subscriber() {
let bus = EventBus::new();
let mut rx = bus.subscribe();
let event = Event::new("src", "test", json!(42));
bus.publish(event.clone()).unwrap();
let received = rx.recv().await.unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn publish_sends_to_multiple_subscribers() {
let bus = EventBus::new();
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
let event = Event::new("src", "test", json!(null));
bus.publish(event.clone()).unwrap();
assert_eq!(rx1.recv().await.unwrap(), event);
assert_eq!(rx2.recv().await.unwrap(), event);
}
#[tokio::test]
async fn poll_sources_returns_zero_with_no_sources() {
let bus = EventBus::new();
let count = bus.poll_sources().await.unwrap();
assert_eq!(count, 0);
}
#[tokio::test]
async fn poll_sources_collects_event_from_source() {
let mut bus = EventBus::new();
let event = Event::new("fake", "polled", json!("data"));
bus.register(FakeSource::new("fake", vec![event.clone()]));
let mut rx = bus.subscribe();
let count = bus.poll_sources().await.unwrap();
assert_eq!(count, 1);
let received = rx.recv().await.unwrap();
assert_eq!(received, event);
}
#[tokio::test]
async fn poll_sources_returns_zero_when_sources_empty() {
let mut bus = EventBus::new();
bus.register(FakeSource::empty("empty"));
let count = bus.poll_sources().await.unwrap();
assert_eq!(count, 0);
}
#[tokio::test]
async fn poll_sources_collects_from_multiple_sources() {
let mut bus = EventBus::new();
let e1 = Event::new("src-a", "type", json!(1));
let e2 = Event::new("src-b", "type", json!(2));
bus.register(FakeSource::new("src-a", vec![e1]));
bus.register(FakeSource::new("src-b", vec![e2]));
let _rx = bus.subscribe();
let count = bus.poll_sources().await.unwrap();
assert_eq!(count, 2);
}
#[tokio::test]
async fn initially_not_running() {
let bus = EventBus::new();
assert!(!bus.is_running().await);
}
#[test]
fn source_names_returns_registered_names() {
let mut bus = EventBus::new();
bus.register(FakeSource::empty("alpha"));
bus.register(FakeSource::empty("beta"));
let mut names = bus.source_names();
names.sort_unstable();
assert_eq!(names, vec!["alpha", "beta"]);
}
}