use super::types::{Event, EventReceiver, EventSender, SubscriptionHandle};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
#[derive(Debug, Clone)]
pub struct EventBrokerConfig {
pub channel_capacity: usize,
pub log_dropped_events: bool,
}
impl Default for EventBrokerConfig {
fn default() -> Self {
Self {
channel_capacity: DEFAULT_CHANNEL_CAPACITY,
log_dropped_events: true,
}
}
}
impl EventBrokerConfig {
pub fn with_capacity(capacity: usize) -> Self {
Self {
channel_capacity: capacity.max(1),
..Default::default()
}
}
}
struct SubscriberEntry<T>
where
T: Clone + Send + Sync,
{
sender: EventSender<T>,
handle: Arc<SubscriptionHandle>,
}
pub struct EventBroker<T>
where
T: Clone + Send + Sync + 'static,
{
subscribers: RwLock<HashMap<String, SubscriberEntry<T>>>,
config: EventBrokerConfig,
events_published: std::sync::atomic::AtomicU64,
events_dropped: std::sync::atomic::AtomicU64,
}
impl<T> EventBroker<T>
where
T: Clone + Send + Sync + 'static,
{
pub fn new() -> Self {
Self::with_config(EventBrokerConfig::default())
}
pub fn with_capacity(capacity: usize) -> Self {
Self::with_config(EventBrokerConfig::with_capacity(capacity))
}
pub fn with_config(config: EventBrokerConfig) -> Self {
debug!(
"Creating EventBroker with capacity: {}",
config.channel_capacity
);
Self {
subscribers: RwLock::new(HashMap::new()),
config,
events_published: std::sync::atomic::AtomicU64::new(0),
events_dropped: std::sync::atomic::AtomicU64::new(0),
}
}
pub fn subscribe(&self) -> (Arc<SubscriptionHandle>, EventReceiver<T>) {
let capacity = self.config.channel_capacity.max(1);
let (tx, rx) = mpsc::channel(capacity);
let handle = Arc::new(SubscriptionHandle::new());
let entry = SubscriberEntry {
sender: tx,
handle: handle.clone(),
};
{
let mut subscribers = self.subscribers.write();
subscribers.insert(handle.id.clone(), entry);
}
debug!("New subscriber registered: {}", handle.id);
(handle, rx)
}
pub fn subscribe_with_capacity(
&self,
capacity: usize,
) -> (Arc<SubscriptionHandle>, EventReceiver<T>) {
let capacity = capacity.max(1);
let (tx, rx) = mpsc::channel(capacity);
let handle = Arc::new(SubscriptionHandle::new());
let entry = SubscriberEntry {
sender: tx,
handle: handle.clone(),
};
{
let mut subscribers = self.subscribers.write();
subscribers.insert(handle.id.clone(), entry);
}
debug!(
"New subscriber registered with custom capacity {}: {}",
capacity, handle.id
);
(handle, rx)
}
pub fn unsubscribe(&self, handle: &SubscriptionHandle) -> bool {
handle.cancel();
let mut subscribers = self.subscribers.write();
let removed = subscribers.remove(&handle.id).is_some();
if removed {
debug!("Subscriber unsubscribed: {}", handle.id);
} else {
trace!("Attempted to unsubscribe unknown subscriber: {}", handle.id);
}
removed
}
pub fn unsubscribe_by_id(&self, id: &str) -> bool {
let mut subscribers = self.subscribers.write();
if let Some(entry) = subscribers.remove(id) {
entry.handle.cancel();
debug!("Subscriber unsubscribed by ID: {}", id);
true
} else {
trace!("Attempted to unsubscribe unknown subscriber ID: {}", id);
false
}
}
pub async fn publish(&self, event: Event<T>) -> usize {
self.events_published
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let subscribers_snapshot: Vec<(String, EventSender<T>, Arc<SubscriptionHandle>)> = {
let subscribers = self.subscribers.read();
subscribers
.iter()
.filter(|(_, entry)| entry.handle.is_active())
.map(|(id, entry)| (id.clone(), entry.sender.clone(), entry.handle.clone()))
.collect()
};
let mut delivered = 0;
let mut to_remove = Vec::new();
for (id, sender, handle) in subscribers_snapshot {
if !handle.is_active() {
to_remove.push(id);
continue;
}
match sender.try_send(event.clone()) {
Ok(()) => {
delivered += 1;
trace!("Event {} delivered to subscriber {}", event.id, id);
}
Err(mpsc::error::TrySendError::Full(_)) => {
self.events_dropped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if self.config.log_dropped_events {
warn!(
"Event {} dropped for slow subscriber {}: channel full",
event.id, id
);
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
to_remove.push(id.clone());
debug!("Subscriber {} channel closed, marking for removal", id);
}
}
}
if !to_remove.is_empty() {
let mut subscribers = self.subscribers.write();
for id in to_remove {
if let Some(entry) = subscribers.remove(&id) {
entry.handle.cancel();
}
}
}
delivered
}
pub async fn publish_blocking(&self, event: Event<T>) -> usize {
self.events_published
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let subscribers_snapshot: Vec<(String, EventSender<T>, Arc<SubscriptionHandle>)> = {
let subscribers = self.subscribers.read();
subscribers
.iter()
.filter(|(_, entry)| entry.handle.is_active())
.map(|(id, entry)| (id.clone(), entry.sender.clone(), entry.handle.clone()))
.collect()
};
let mut delivered = 0;
let mut to_remove = Vec::new();
for (id, sender, handle) in subscribers_snapshot {
if !handle.is_active() {
to_remove.push(id);
continue;
}
match sender.send(event.clone()).await {
Ok(()) => {
delivered += 1;
trace!("Event {} delivered to subscriber {}", event.id, id);
}
Err(_) => {
to_remove.push(id.clone());
debug!("Subscriber {} channel closed, marking for removal", id);
}
}
}
if !to_remove.is_empty() {
let mut subscribers = self.subscribers.write();
for id in to_remove {
if let Some(entry) = subscribers.remove(&id) {
entry.handle.cancel();
}
}
}
delivered
}
pub fn subscriber_count(&self) -> usize {
let subscribers = self.subscribers.read();
subscribers
.values()
.filter(|entry| entry.handle.is_active())
.count()
}
pub fn events_published(&self) -> u64 {
self.events_published
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn events_dropped(&self) -> u64 {
self.events_dropped
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn has_subscribers(&self) -> bool {
self.subscriber_count() > 0
}
pub fn clear(&self) {
let mut subscribers = self.subscribers.write();
for (_, entry) in subscribers.drain() {
entry.handle.cancel();
}
debug!("All subscribers cleared");
}
pub fn stats(&self) -> EventBrokerStats {
EventBrokerStats {
subscriber_count: self.subscriber_count(),
events_published: self.events_published(),
events_dropped: self.events_dropped(),
channel_capacity: self.config.channel_capacity,
}
}
}
impl<T> Default for EventBroker<T>
where
T: Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<T> std::fmt::Debug for EventBroker<T>
where
T: Clone + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventBroker")
.field("subscriber_count", &self.subscriber_count())
.field("events_published", &self.events_published())
.field("events_dropped", &self.events_dropped())
.field("channel_capacity", &self.config.channel_capacity)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct EventBrokerStats {
pub subscriber_count: usize,
pub events_published: u64,
pub events_dropped: u64,
pub channel_capacity: usize,
}