use stateset_core::CommerceEvent;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use tokio::sync::broadcast;
pub struct EventBus {
sender: broadcast::Sender<CommerceEvent>,
events_published: AtomicU64,
events_publish_failures: AtomicU64,
}
impl std::fmt::Debug for EventBus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventBus")
.field("events_published", &self.events_published.load(Ordering::Relaxed))
.field("receiver_count", &self.sender.receiver_count())
.finish_non_exhaustive()
}
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self {
sender,
events_published: AtomicU64::new(0),
events_publish_failures: AtomicU64::new(0),
}
}
pub fn publish(&self, event: CommerceEvent) -> usize {
self.events_published.fetch_add(1, Ordering::Relaxed);
let event_type = event.event_type().to_string();
let receiver_count = self.sender.receiver_count();
match self.sender.send(event) {
Ok(receivers) => receivers,
Err(error) => {
self.events_publish_failures.fetch_add(1, Ordering::Relaxed);
if receiver_count == 0 {
tracing::debug!(
event_type,
error = %error,
receiver_count,
"Dropped event publish: no active subscribers"
);
} else {
tracing::warn!(
event_type,
error = %error,
receiver_count,
"Failed to publish event to in-process subscribers"
);
}
0
}
}
}
pub fn subscribe(&self) -> EventSubscription {
EventSubscription { receiver: EventReceiver::new(self.sender.subscribe()) }
}
pub fn receiver_count(&self) -> usize {
self.sender.receiver_count()
}
pub fn events_published(&self) -> u64 {
self.events_published.load(Ordering::Relaxed)
}
pub fn events_publish_failures(&self) -> u64 {
self.events_publish_failures.load(Ordering::Relaxed)
}
}
pub struct EventReceiver {
inner: broadcast::Receiver<CommerceEvent>,
}
impl std::fmt::Debug for EventReceiver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventReceiver").finish_non_exhaustive()
}
}
impl EventReceiver {
const fn new(receiver: broadcast::Receiver<CommerceEvent>) -> Self {
Self { inner: receiver }
}
pub async fn recv(&mut self) -> Option<CommerceEvent> {
loop {
match self.inner.recv().await {
Ok(event) => return Some(event),
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(skipped, "Event receiver lagged, skipped events");
continue;
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
pub fn try_recv(&mut self) -> Option<CommerceEvent> {
self.inner.try_recv().ok()
}
fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<CommerceEvent>> {
loop {
let mut recv = std::pin::pin!(self.inner.recv());
match std::future::Future::poll(recv.as_mut(), cx) {
Poll::Ready(Ok(event)) => return Poll::Ready(Some(event)),
Poll::Ready(Err(broadcast::error::RecvError::Lagged(skipped))) => {
tracing::warn!(skipped, "Event receiver lagged, skipped events");
continue;
}
Poll::Ready(Err(broadcast::error::RecvError::Closed)) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}
pub struct EventSubscription {
receiver: EventReceiver,
}
impl std::fmt::Debug for EventSubscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventSubscription").finish_non_exhaustive()
}
}
impl EventSubscription {
pub async fn recv(&mut self) -> Option<CommerceEvent> {
self.receiver.recv().await
}
pub fn try_recv(&mut self) -> Option<CommerceEvent> {
self.receiver.try_recv()
}
}
impl futures::Stream for EventSubscription {
type Item = CommerceEvent;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use rust_decimal_macros::dec;
#[tokio::test]
async fn test_event_bus_publish_subscribe() {
let bus = EventBus::new(16);
let mut sub1 = bus.subscribe();
let mut sub2 = bus.subscribe();
let event = CommerceEvent::OrderCreated {
order_id: stateset_core::OrderId::new(),
customer_id: stateset_core::CustomerId::new(),
total_amount: dec!(100.00),
item_count: 2,
timestamp: Utc::now(),
};
let receivers = bus.publish(event);
assert_eq!(receivers, 2);
let received1 = sub1.try_recv();
let received2 = sub2.try_recv();
assert!(received1.is_some());
assert!(received2.is_some());
}
#[tokio::test]
async fn test_event_bus_no_subscribers() {
let bus = EventBus::new(16);
let event = CommerceEvent::CustomerCreated {
customer_id: stateset_core::CustomerId::new(),
email: "test@example.com".to_string(),
timestamp: Utc::now(),
};
let receivers = bus.publish(event);
assert_eq!(receivers, 0);
}
#[test]
fn test_receiver_count() {
let bus = EventBus::new(16);
assert_eq!(bus.receiver_count(), 0);
let _sub1 = bus.subscribe();
assert_eq!(bus.receiver_count(), 1);
let _sub2 = bus.subscribe();
assert_eq!(bus.receiver_count(), 2);
}
#[test]
fn test_event_bus_publish_failure_tracking() {
let bus = EventBus::new(16);
let event = CommerceEvent::CustomerCreated {
customer_id: stateset_core::CustomerId::new(),
email: "test@example.com".to_string(),
timestamp: Utc::now(),
};
let receivers = bus.publish(event);
assert_eq!(receivers, 0);
assert_eq!(bus.events_published(), 1);
assert_eq!(bus.events_publish_failures(), 1);
}
#[tokio::test]
async fn test_event_subscription_stream_next_wakes_on_publish() {
use futures::StreamExt;
let bus = EventBus::new(16);
let mut sub = bus.subscribe();
let event = CommerceEvent::OrderCreated {
order_id: stateset_core::OrderId::new(),
customer_id: stateset_core::CustomerId::new(),
total_amount: dec!(100.00),
item_count: 2,
timestamp: Utc::now(),
};
let bus_for_publish = bus;
let publish_task = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
bus_for_publish.publish(event.clone());
});
let got = tokio::time::timeout(std::time::Duration::from_millis(500), sub.next())
.await
.expect("timed out while waiting for published event");
publish_task.await.expect("publisher task failed");
assert!(got.is_some());
}
}