#[cfg(feature = "metrics")]
use metrics::counter;
use std::any::{Any, TypeId};
use std::sync::Arc;
use crate::error::EventBusError;
use crate::registry::{DispatchContext, RegistrySnapshot, TypeSlot, dispatch_slot, dispatch_sync_only_with_snapshot, dispatch_with_snapshot};
use crate::types::Event;
use super::EventBus;
impl EventBus {
pub(crate) async fn publish_erased(
&self,
snapshot: &RegistrySnapshot,
slot: Option<&Arc<TypeSlot>>,
event: Arc<dyn Any + Send + Sync>,
event_name: &'static str,
dispatch_ctx: &DispatchContext<'_>,
) -> Result<(), EventBusError> {
let once_removed = if snapshot.global_middlewares.is_empty() && slot.is_none_or(|s| s.middlewares.is_empty()) {
#[cfg(feature = "metrics")]
counter!("eventbus.publish", "event" => event_name).increment(1);
match slot {
None => Vec::new(),
Some(s) => dispatch_slot(s.as_ref(), &event, event_name, dispatch_ctx).await,
}
} else {
dispatch_with_snapshot(snapshot, slot, event, event_name, dispatch_ctx).await?
};
if !once_removed.is_empty() {
let mut registry = self.inner.registry.lock().await;
for subscription_id in once_removed {
registry.remove_once(subscription_id);
}
self.refresh_snapshot_locked(®istry).await;
}
Ok(())
}
pub(crate) async fn publish_sync_only(
&self,
snapshot: &RegistrySnapshot,
slot: Option<&Arc<TypeSlot>>,
event: &Arc<dyn std::any::Any + Send + Sync>,
event_name: &'static str,
) -> Result<(), EventBusError> {
let dispatch_ctx = self.inner.sync_only_dispatch_context(self);
let once_removed = dispatch_sync_only_with_snapshot(snapshot, slot, event, event_name, &dispatch_ctx).await?;
if !once_removed.is_empty() {
let mut registry = self.inner.registry.lock().await;
for subscription_id in once_removed {
registry.remove_once(subscription_id);
}
self.refresh_snapshot_locked(®istry).await;
}
Ok(())
}
#[must_use = "publish returns a Future that must be awaited, and a Result that should not be discarded"]
pub async fn publish<E>(&self, event: E) -> Result<(), EventBusError>
where
E: Event,
{
let _dispatch = self.begin_dispatch()?;
let snapshot = self.inner.snapshot.load();
let event_type = TypeId::of::<E>();
let slot = snapshot.by_type.get(&event_type);
if slot.is_none() && snapshot.global_middlewares.is_empty() {
return Ok(());
}
let sync_only =
!snapshot.global_has_async_middleware && slot.is_none_or(|slot| slot.async_listeners.is_empty() && !slot.has_async_middleware);
let event: Arc<dyn Any + Send + Sync> = Arc::new(event);
if sync_only {
self.publish_sync_only(snapshot.as_ref(), slot, &event, std::any::type_name::<E>()).await
} else {
let dispatch_ctx = self.inner.full_dispatch_context(self);
self.publish_erased(snapshot.as_ref(), slot, event, std::any::type_name::<E>(), &dispatch_ctx)
.await
}
}
}