use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_stream::Stream;
use tokio_stream::wrappers::BroadcastStream;
use super::types::DownloadEvent;
/// Handle for publishing [`DownloadEvent`]s to any number of subscribers.
///
/// The bus wraps the sending half of a `tokio::sync::broadcast` channel.
/// Events travel as `Arc<DownloadEvent>`, so delivering one event to several
/// receivers clones the pointer rather than the event itself. Cloning the bus
/// clones the sender, which keeps every clone attached to the same channel.
#[derive(Clone)]
pub struct EventBus {
    /// Sending half of the broadcast channel; receivers are created from it
    /// on demand.
    tx: broadcast::Sender<Arc<DownloadEvent>>,
}
impl EventBus {
    /// Capacity used by [`EventBus::with_default_capacity`].
    const DEFAULT_CAPACITY: usize = 1024;

    /// Creates a bus backed by a broadcast channel of the given `capacity`.
    ///
    /// `tokio::sync::broadcast::channel` panics when asked for a capacity of
    /// `0`, so a zero capacity is raised to `1` (and a warning is logged)
    /// instead of taking the caller down. All other values are forwarded
    /// unchanged.
    ///
    /// # Panics
    ///
    /// Still panics inside tokio if `capacity` exceeds the maximum that
    /// `broadcast::channel` accepts (documented as `usize::MAX / 2`).
    pub fn new(capacity: usize) -> Self {
        let effective_capacity = capacity.max(1);
        if effective_capacity != capacity {
            tracing::warn!(
                requested = capacity,
                effective = effective_capacity,
                "EventBus capacity of 0 is not supported by the broadcast channel; using 1 instead"
            );
        }
        tracing::debug!(capacity = effective_capacity, "⚙️ Creating new EventBus");
        // The initial receiver is dropped on purpose: subscribers attach
        // later through `subscribe` / `stream`.
        let (tx, _) = broadcast::channel(effective_capacity);
        Self { tx }
    }

    /// Creates a bus with the default channel capacity (1024).
    pub fn with_default_capacity() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }

    /// Publishes `event` to every current subscriber.
    ///
    /// Accepts either an owned `DownloadEvent` or an existing
    /// `Arc<DownloadEvent>`. Returns the receiver count reported by the
    /// channel for this send, or `0` when the send failed.
    pub fn emit(&self, event: impl Into<Arc<DownloadEvent>>) -> usize {
        let shared = event.into();
        // Read the log fields up front: `send` takes ownership of the event.
        let event_type = shared.event_type();
        let download_id = shared.download_id();
        tracing::debug!(
            event_type = event_type,
            download_id = download_id,
            subscriber_count = self.subscriber_count(),
            "🔔 Emitting event"
        );
        let receivers_notified = match self.tx.send(shared) {
            Ok(count) => count,
            // A broadcast send only fails when no receiver is alive, which
            // for a fire-and-forget bus simply means nobody was listening.
            Err(_) => 0,
        };
        tracing::debug!(
            event_type = event_type,
            download_id = download_id,
            receivers_notified = receivers_notified,
            "✅ Event emitted"
        );
        receivers_notified
    }

    /// Publishes `event` only when at least one subscriber exists.
    ///
    /// Returns `true` when the event was handed to one or more receivers and
    /// `false` when it was skipped or nobody received it.
    pub fn emit_if_subscribed(&self, event: DownloadEvent) -> bool {
        self.has_subscribers() && self.emit(event) > 0
    }

    /// Registers a new subscriber and returns its receiving half.
    ///
    /// Per tokio's broadcast semantics the receiver observes events sent
    /// after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<Arc<DownloadEvent>> {
        tracing::debug!(
            subscriber_count_before = self.subscriber_count(),
            "🔔 Creating new subscriber"
        );
        let rx = self.tx.subscribe();
        tracing::debug!(
            subscriber_count_after = self.subscriber_count(),
            "✅ Subscriber created"
        );
        rx
    }

    /// Registers a new subscriber and exposes it as a [`Stream`].
    ///
    /// Each item is either an event or a `BroadcastStreamRecvError` produced
    /// by the underlying `BroadcastStream` wrapper.
    pub fn stream(
        &self,
    ) -> impl Stream<
        Item = Result<Arc<DownloadEvent>, tokio_stream::wrappers::errors::BroadcastStreamRecvError>,
    > {
        let rx = self.subscribe();
        BroadcastStream::new(rx)
    }

    /// Returns the number of receivers currently attached to the channel.
    pub fn subscriber_count(&self) -> usize {
        self.tx.receiver_count()
    }

    /// Returns `true` when at least one receiver is attached.
    pub fn has_subscribers(&self) -> bool {
        self.subscriber_count() > 0
    }
}
impl Default for EventBus {
fn default() -> Self {
Self::with_default_capacity()
}
}
/// Diagnostic formatting: shows only the current subscriber count, since the
/// wrapped sender has no other state worth printing here.
impl std::fmt::Debug for EventBus {
    /// Writes `EventBus { subscriber_count: N }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscribers = self.subscriber_count();
        let mut builder = f.debug_struct("EventBus");
        builder.field("subscriber_count", &subscribers);
        builder.finish()
    }
}
/// Compact one-line rendering of the bus.
impl std::fmt::Display for EventBus {
    /// Writes `EventBus(subscribers=N)` where `N` is the current subscriber count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscribers = self.subscriber_count();
        f.write_fmt(format_args!("EventBus(subscribers={})", subscribers))
    }
}