use crate::events::GameEvent;
/// Channel capacity that `EventBus::with_default_capacity` passes to `EventBus::new`.
const DEFAULT_CAPACITY: usize = 256;
/// Publisher side of a fan-out bus that delivers [`GameEvent`]s over a Tokio
/// broadcast channel.
///
/// The type is `Clone`; cloning it clones the wrapped broadcast sender.
#[derive(Clone)]
pub struct EventBus {
// Sending half of the broadcast channel; `subscribe` derives receivers from it.
sender: tokio::sync::broadcast::Sender<GameEvent>,
}
impl EventBus {
    /// Creates a bus backed by a broadcast channel of the requested `capacity`.
    ///
    /// A `capacity` of `0` is raised to `1` before the channel is created.
    pub fn new(capacity: usize) -> Self {
        let effective = std::cmp::max(capacity, 1);
        // The receiver handed back by `channel` is discarded right away;
        // subscribers are attached later through `subscribe`.
        let (sender, _) = tokio::sync::broadcast::channel::<GameEvent>(effective);
        Self { sender }
    }

    /// Creates a bus whose capacity is the module-level `DEFAULT_CAPACITY`.
    pub fn with_default_capacity() -> Self {
        Self::new(DEFAULT_CAPACITY)
    }

    /// Publishes `event` on the bus.
    ///
    /// Returns the count reported by the underlying sender on success. When
    /// the send fails, a debug message is logged, the event is dropped and
    /// `0` is returned.
    pub fn send(&self, event: GameEvent) -> usize {
        self.sender.send(event).unwrap_or_else(|_| {
            ::log::debug!("event bus: no active subscribers, event dropped");
            0
        })
    }

    /// Attaches a new [`Subscriber`] to the bus.
    pub fn subscribe(&self) -> Subscriber {
        Subscriber {
            receiver: self.sender.subscribe(),
        }
    }

    /// Returns the number of receivers currently attached to the channel.
    pub fn subscriber_count(&self) -> usize {
        self.sender.receiver_count()
    }
}
impl std::fmt::Debug for EventBus {
    /// Renders the bus as an `EventBus` struct with a single
    /// `subscriber_count` field holding the current receiver count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscriber_count = self.sender.receiver_count();
        let mut repr = f.debug_struct("EventBus");
        repr.field("subscriber_count", &subscriber_count);
        repr.finish()
    }
}
/// Receiving handle for [`GameEvent`]s, created by `EventBus::subscribe`.
pub struct Subscriber {
// Receiving half of the broadcast channel.
receiver: tokio::sync::broadcast::Receiver<GameEvent>,
}
impl Subscriber {
pub async fn recv(&mut self) -> Option<GameEvent> {
loop {
match self.receiver.recv().await {
Ok(event) => return Some(event),
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
::log::warn!("event bus subscriber lagged: {n} message(s) skipped");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
return None;
}
}
}
}
}
impl std::fmt::Debug for Subscriber {
    /// Renders the subscriber as a non-exhaustive `Subscriber` struct; the
    /// wrapped receiver is intentionally not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Subscriber");
        repr.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for `EventBus` and `Subscriber`: construction, subscriber
    // accounting, delivery order, fan-out, lag handling and shutdown.
    use super::*;
    use crate::events::{EventMetadata, GameStateEvent, SessionEvent};
    use chrono::{TimeZone, Utc};

    /// Return type for tests that propagate errors with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds metadata with a fixed timestamp and `raw` as the raw payload bytes.
    fn make_metadata(raw: &[u8]) -> EventMetadata {
        let timestamp = Utc
            .with_ymd_and_hms(2026, 2, 25, 12, 0, 0)
            .single()
            .unwrap_or_default();
        EventMetadata::new(Some(timestamp), raw.to_vec())
    }

    /// Builds a game-state event whose JSON payload is `{"type": label}`.
    fn make_game_state_event(label: &str) -> GameEvent {
        let meta = make_metadata(label.as_bytes());
        let payload = serde_json::json!({"type": label});
        GameEvent::GameState(GameStateEvent::new(meta, payload))
    }

    /// Builds a session event whose JSON payload is `{"action": label}`.
    fn make_session_event(label: &str) -> GameEvent {
        let meta = make_metadata(label.as_bytes());
        let payload = serde_json::json!({"action": label});
        GameEvent::Session(SessionEvent::new(meta, payload))
    }

    // --- construction -----------------------------------------------------

    #[test]
    fn test_new_creates_bus_with_zero_subscribers() {
        let bus = EventBus::new(16);
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[test]
    fn test_with_default_capacity_creates_bus() {
        let bus = EventBus::with_default_capacity();
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[test]
    fn test_new_clamps_capacity_minimum_to_one() {
        // A zero capacity must neither panic nor produce an unusable bus.
        let bus = EventBus::new(0);
        assert_eq!(bus.subscriber_count(), 0);
        let _sub = bus.subscribe();
        assert_eq!(bus.send(make_game_state_event("clamped")), 1);
    }

    // --- subscriber accounting --------------------------------------------

    #[test]
    fn test_subscribe_increments_subscriber_count() {
        let bus = EventBus::new(16);
        let _sub1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);
        let _sub2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);
    }

    #[test]
    fn test_subscriber_drop_decrements_count() {
        let bus = EventBus::new(16);
        let sub = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);
        drop(sub);
        assert_eq!(bus.subscriber_count(), 0);
    }

    #[test]
    fn test_subscribe_dynamically_after_send() {
        let bus = EventBus::new(16);
        bus.send(make_game_state_event("before-sub"));
        let _sub = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);
    }

    // --- send return value ------------------------------------------------

    #[test]
    fn test_send_no_subscribers_returns_zero() {
        let bus = EventBus::new(16);
        let count = bus.send(make_game_state_event("test"));
        assert_eq!(count, 0);
    }

    #[test]
    fn test_send_with_one_subscriber_returns_one() {
        let bus = EventBus::new(16);
        let _sub = bus.subscribe();
        let count = bus.send(make_game_state_event("test"));
        assert_eq!(count, 1);
    }

    #[test]
    fn test_send_with_multiple_subscribers_returns_count() {
        let bus = EventBus::new(16);
        let _sub1 = bus.subscribe();
        let _sub2 = bus.subscribe();
        let _sub3 = bus.subscribe();
        let count = bus.send(make_game_state_event("test"));
        assert_eq!(count, 3);
    }

    // --- receiving --------------------------------------------------------

    #[tokio::test]
    async fn test_recv_receives_sent_event() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub = bus.subscribe();
        let sent = make_game_state_event("hello");
        bus.send(sent.clone());
        let received = sub.recv().await;
        assert_eq!(received, Some(sent));
        Ok(())
    }

    #[tokio::test]
    async fn test_recv_preserves_event_order() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub = bus.subscribe();
        let events: Vec<GameEvent> = (0..5)
            .map(|i| make_game_state_event(&format!("event-{i}")))
            .collect();
        for event in &events {
            bus.send(event.clone());
        }
        for expected in &events {
            let received = sub.recv().await;
            assert_eq!(received.as_ref(), Some(expected));
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_recv_returns_none_when_bus_dropped() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub = bus.subscribe();
        drop(bus);
        let received = sub.recv().await;
        assert_eq!(received, None);
        Ok(())
    }

    // --- fan-out ----------------------------------------------------------

    #[tokio::test]
    async fn test_fan_out_all_subscribers_receive_same_event() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub1 = bus.subscribe();
        let mut sub2 = bus.subscribe();
        let mut sub3 = bus.subscribe();
        let event = make_game_state_event("fan-out");
        bus.send(event.clone());
        assert_eq!(sub1.recv().await, Some(event.clone()));
        assert_eq!(sub2.recv().await, Some(event.clone()));
        assert_eq!(sub3.recv().await, Some(event));
        Ok(())
    }

    #[tokio::test]
    async fn test_fan_out_multiple_events_to_multiple_subscribers() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub1 = bus.subscribe();
        let mut sub2 = bus.subscribe();
        let event_a = make_game_state_event("alpha");
        let event_b = make_session_event("beta");
        bus.send(event_a.clone());
        bus.send(event_b.clone());
        assert_eq!(sub1.recv().await, Some(event_a.clone()));
        assert_eq!(sub1.recv().await, Some(event_b.clone()));
        assert_eq!(sub2.recv().await, Some(event_a));
        assert_eq!(sub2.recv().await, Some(event_b));
        Ok(())
    }

    #[tokio::test]
    async fn test_fan_out_different_event_types() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub = bus.subscribe();
        let gs_event = make_game_state_event("game-state");
        let sess_event = make_session_event("session");
        bus.send(gs_event.clone());
        bus.send(sess_event.clone());
        let r1 = sub.recv().await;
        let r2 = sub.recv().await;
        assert_eq!(r1, Some(gs_event));
        assert_eq!(r2, Some(sess_event));
        Ok(())
    }

    // --- slow subscribers -------------------------------------------------

    #[tokio::test]
    async fn test_slow_subscriber_skips_lagged_messages() -> TestResult {
        // Six events are pushed through a four-slot channel before the
        // subscriber reads anything, so the two oldest are overwritten.
        let bus = EventBus::new(4);
        let mut sub = bus.subscribe();
        let events: Vec<GameEvent> = (0..6)
            .map(|i| make_game_state_event(&format!("event-{i}")))
            .collect();
        for event in &events {
            bus.send(event.clone());
        }
        let mut received = Vec::new();
        for _ in 0..4 {
            if let Some(event) = sub.recv().await {
                received.push(event);
            }
        }
        // `recv` swallows the lag notification and resumes at the oldest
        // retained event, so exactly the four newest events arrive in order.
        let expected: Vec<GameEvent> = events.iter().skip(2).cloned().collect();
        assert_eq!(
            received, expected,
            "subscriber should skip the overwritten events and receive the newest four in order"
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_slow_subscriber_does_not_block_sender() -> TestResult {
        let bus = EventBus::new(2);
        let _sub = bus.subscribe();
        for i in 0..10 {
            // Every send completes and still reports the idle subscriber,
            // even after the channel has wrapped around several times.
            let delivered = bus.send(make_game_state_event(&format!("event-{i}")));
            assert_eq!(delivered, 1);
        }
        Ok(())
    }

    // --- dynamic subscription ---------------------------------------------

    #[tokio::test]
    async fn test_late_subscriber_only_sees_future_events() -> TestResult {
        let bus = EventBus::new(16);
        bus.send(make_game_state_event("before"));
        let mut sub = bus.subscribe();
        let after = make_game_state_event("after");
        bus.send(after.clone());
        let received = sub.recv().await;
        assert_eq!(received, Some(after));
        Ok(())
    }

    #[tokio::test]
    async fn test_multiple_dynamic_subscribers_at_different_times() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub1 = bus.subscribe();
        let event1 = make_game_state_event("first");
        bus.send(event1.clone());
        let mut sub2 = bus.subscribe();
        let event2 = make_session_event("second");
        bus.send(event2.clone());
        assert_eq!(sub1.recv().await, Some(event1));
        assert_eq!(sub1.recv().await, Some(event2.clone()));
        assert_eq!(sub2.recv().await, Some(event2));
        Ok(())
    }

    // --- Debug output -----------------------------------------------------

    #[test]
    fn test_event_bus_debug_format() {
        let bus = EventBus::new(16);
        let _sub = bus.subscribe();
        let debug = format!("{bus:?}");
        assert!(debug.contains("EventBus"));
        assert!(debug.contains("subscriber_count"));
    }

    #[test]
    fn test_subscriber_debug_format() {
        let bus = EventBus::new(16);
        let sub = bus.subscribe();
        let debug = format!("{sub:?}");
        assert!(debug.contains("Subscriber"));
    }

    // --- waiting and teardown ---------------------------------------------

    #[tokio::test]
    async fn test_recv_waits_for_event() -> TestResult {
        let bus = EventBus::new(16);
        let mut sub = bus.subscribe();
        let delayed = make_game_state_event("delayed");
        // Publish from a clone of the bus through the public API rather than
        // reaching into the private `sender` field.
        let producer = bus.clone();
        let to_send = delayed.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            producer.send(to_send);
        });
        // The timeout turns a missed wake-up into a test failure instead of a hang.
        let received = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await?;
        assert_eq!(received, Some(delayed));
        Ok(())
    }

    #[test]
    fn test_send_returns_zero_after_all_subscribers_dropped() {
        let bus = EventBus::new(16);
        let sub = bus.subscribe();
        drop(sub);
        let count = bus.send(make_game_state_event("test"));
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_subscriber_receives_after_other_subscriber_dropped() -> TestResult {
        let bus = EventBus::new(16);
        let sub1 = bus.subscribe();
        let mut sub2 = bus.subscribe();
        drop(sub1);
        let event = make_game_state_event("after-drop");
        bus.send(event.clone());
        assert_eq!(sub2.recv().await, Some(event));
        Ok(())
    }
}