use fluers_core::{EventSink, RunEvent};
use tokio::sync::broadcast;
/// Short alias for [`RunEvent`], the event type carried by [`EventBus`].
pub type Event = RunEvent;
/// Cloneable publisher of [`RunEvent`]s built on a Tokio broadcast channel.
///
/// The bus only stores the sending half of the channel; receivers are handed
/// out on demand. Cloning the bus clones that sender handle.
///
/// `Debug` is derived so the bus can be embedded in other `Debug` types and
/// shown in diagnostics; Tokio's broadcast sender implements `Debug` for any
/// payload type, so this places no requirement on [`RunEvent`].
#[derive(Clone, Debug)]
pub struct EventBus {
    /// Sending half of the broadcast channel used to publish events.
    sender: broadcast::Sender<RunEvent>,
}
impl EventBus {
    /// Channel capacity used by [`EventBus::new_default`].
    pub const DEFAULT_CAPACITY: usize = 256;

    /// Creates a bus whose broadcast channel is built with `capacity`.
    ///
    /// A `capacity` of zero is raised to one before the channel is created,
    /// since Tokio's `broadcast::channel` panics when given a zero capacity.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        // Only the sender is kept; the receiver returned with it is dropped
        // immediately and listeners attach later through `subscribe`.
        let (sender, _) = broadcast::channel(capacity.max(1));
        Self { sender }
    }

    /// Creates a bus with [`EventBus::DEFAULT_CAPACITY`] as its capacity.
    #[must_use]
    pub fn new_default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }

    /// Returns a new receiver attached to this bus.
    ///
    /// Per Tokio's broadcast semantics, the receiver observes events sent
    /// after this call.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<RunEvent> {
        self.sender.subscribe()
    }

    /// Publishes `event` on the bus.
    ///
    /// Returns `true` when the underlying send succeeds and `false` when it
    /// fails, which is what happens when the bus has no subscribers.
    pub fn emit(&self, event: RunEvent) -> bool {
        let outcome = self.sender.send(event);
        outcome.is_ok()
    }
}
impl EventSink for EventBus {
    /// Forwards `event` to the inherent [`EventBus::emit`].
    ///
    /// The trait method returns nothing, so the "was it delivered" flag from
    /// the inherent method is intentionally discarded.
    fn emit(&self, event: RunEvent) {
        // Fully qualified so the call resolves to the inherent method rather
        // than recursing into this trait method.
        let _delivered: bool = EventBus::emit(self, event);
    }
}
impl Default for EventBus {
    /// Builds a bus with [`EventBus::DEFAULT_CAPACITY`], exactly like
    /// [`EventBus::new_default`].
    fn default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }
}
#[cfg(test)]
mod tests {
    use super::EventBus;
    use fluers_core::{EventSink, RunEvent};
    use std::time::Duration;
    use tokio::time::timeout;
    use uuid::Uuid;

    /// Upper bound on how long a test waits for an event before giving up.
    const TEST_TIMEOUT: Duration = Duration::from_secs(2);

    /// Lets tests propagate timeout, join and similar errors with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Builds the capacity-16 bus and nil session id every test starts from.
    fn fixture() -> (EventBus, Uuid) {
        (EventBus::new(16), Uuid::nil())
    }

    #[tokio::test]
    async fn emit_delivers_to_subscriber() -> TestResult {
        let (bus, session) = fixture();
        let mut rx = bus.subscribe();

        // Receive on a separate task; the subscription itself already exists
        // before the event is emitted.
        let listener = tokio::spawn(async move { rx.recv().await.ok() });

        let delivered = bus.emit(RunEvent::SessionStarted { session });
        assert!(delivered);

        // First `?` handles the timeout, second `?` handles a failed join.
        let outcome = timeout(TEST_TIMEOUT, listener).await??;
        assert!(matches!(
            outcome,
            Some(RunEvent::SessionStarted { session: got }) if got == session
        ));
        Ok(())
    }

    #[tokio::test]
    async fn event_sink_trait_delegates_to_emit() -> TestResult {
        let (bus, session) = fixture();
        let mut rx = bus.subscribe();

        // Call through the trait explicitly so the inherent method is not
        // the one being selected by method resolution.
        EventSink::emit(&bus, RunEvent::SessionStarted { session });

        let outcome = timeout(TEST_TIMEOUT, rx.recv()).await?;
        assert!(matches!(
            outcome,
            Ok(RunEvent::SessionStarted { session: got }) if got == session
        ));
        Ok(())
    }

    #[tokio::test]
    async fn emit_with_no_receivers_returns_false() {
        let (bus, session) = fixture();
        let delivered = bus.emit(RunEvent::SessionStarted { session });
        assert!(!delivered);
    }

    #[tokio::test]
    async fn multiple_subscribers_each_receive() -> TestResult {
        let (bus, session) = fixture();
        let mut rx_a = bus.subscribe();
        let mut rx_b = bus.subscribe();

        let delivered = bus.emit(RunEvent::TurnStarted { session, turn: 1 });
        assert!(delivered);

        // Collect from both subscribers first, then check each in order.
        let event_a = timeout(TEST_TIMEOUT, rx_a.recv()).await?;
        let event_b = timeout(TEST_TIMEOUT, rx_b.recv()).await?;
        for event in [event_a, event_b] {
            assert!(matches!(
                event,
                Ok(RunEvent::TurnStarted { session: got, turn: 1 }) if got == session
            ));
        }
        Ok(())
    }
}