use std::any::{Any, TypeId};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use crate::error::EventBusError;
use crate::registry::{DispatchContext, RegistrySnapshot, TypeSlot, dispatch_slot, dispatch_sync_only_with_snapshot, dispatch_with_snapshot};
use crate::types::Event;
use super::EventBus;
impl EventBus {
/// Dispatches a type-erased event against `snapshot` and then removes the
/// subscriptions the dispatcher reported back.
///
/// Two dispatch routes exist:
/// * when neither `snapshot.global_middlewares` nor the slot's own
///   `middlewares` has any entry, the event goes straight to `dispatch_slot`
///   (or nothing is dispatched at all when `slot` is `None`);
/// * otherwise the event is routed through `dispatch_with_snapshot`.
///
/// Both routes yield a collection of subscription ids. Each id is passed to
/// `remove_once` while the registry lock is held, after which the snapshot is
/// refreshed under that same lock. The ids presumably belong to one-shot
/// listeners that have fired — confirm against the registry module.
///
/// # Errors
///
/// Propagates the error produced by `dispatch_with_snapshot`. In that case the
/// function returns before any subscription is removed.
pub(super) async fn publish_erased(
    &self,
    snapshot: &RegistrySnapshot,
    slot: Option<&Arc<TypeSlot>>,
    event: Arc<dyn Any + Send + Sync>,
    event_name: &'static str,
    dispatch_ctx: &DispatchContext<'_>,
) -> Result<(), EventBusError> {
    let once_removed = if snapshot.global_middlewares.is_empty() && slot.is_none_or(|s| s.middlewares.is_empty()) {
        // No middleware anywhere: skip the middleware-aware dispatcher.
        match slot {
            None => Vec::new(),
            Some(s) => dispatch_slot(s.as_ref(), &event, event_name, dispatch_ctx).await,
        }
    } else {
        dispatch_with_snapshot(snapshot, slot, event, event_name, dispatch_ctx).await?
    };
    // Only take the registry lock when there is something to remove.
    if !once_removed.is_empty() {
        let mut registry = self.inner.registry.lock().await;
        for subscription_id in once_removed {
            registry.remove_once(subscription_id);
        }
        // Rebuild the snapshot while still holding the lock so later publishes
        // no longer see the removed subscriptions.
        self.refresh_snapshot_locked(&registry).await;
    }
    Ok(())
}
/// Dispatches `event` through `dispatch_sync_only_with_snapshot` and then
/// removes the subscriptions that call reported back.
///
/// The event is passed by reference together with `self.inner.notify_tx`; it
/// is not wrapped in an `Arc` on this path. `publish` and `try_publish` pick
/// this path when their snapshot shows no async middleware and no async
/// listeners for the event type.
///
/// Each returned subscription id is passed to `remove_once` while the registry
/// lock is held, after which the snapshot is refreshed under that same lock.
///
/// # Errors
///
/// Propagates the error produced by `dispatch_sync_only_with_snapshot`. In
/// that case the function returns before any subscription is removed.
async fn publish_sync_only<E>(
    &self,
    snapshot: &RegistrySnapshot,
    slot: Option<&Arc<TypeSlot>>,
    event: E,
    event_name: &'static str,
) -> Result<(), EventBusError>
where
    E: Event + Clone,
{
    let once_removed = dispatch_sync_only_with_snapshot(snapshot, slot, &event, event_name, &self.inner.notify_tx).await?;
    // Only take the registry lock when there is something to remove.
    if !once_removed.is_empty() {
        let mut registry = self.inner.registry.lock().await;
        for subscription_id in once_removed {
            registry.remove_once(subscription_id);
        }
        // Rebuild the snapshot while still holding the lock so later publishes
        // no longer see the removed subscriptions.
        self.refresh_snapshot_locked(&registry).await;
    }
    Ok(())
}
/// Publishes `event` and waits until its dispatch has finished.
///
/// The call returns `Ok(())` immediately, without taking a publish permit,
/// when the current snapshot has no slot for `E` and no global middleware.
/// Otherwise a publish permit is held for the whole dispatch. If none is free
/// right away, the call waits for one.
///
/// Dispatch uses `publish_sync_only` when the snapshot reports no async
/// middleware (global or per-slot) and the slot has no async listeners.
/// Otherwise the event is wrapped in an `Arc` and sent through
/// `publish_erased`.
///
/// # Errors
///
/// * `EventBusError::Stopped` when shutdown has been requested or the permit
///   semaphore is closed.
/// * Any error returned by the selected dispatch path.
pub async fn publish<E>(&self, event: E) -> Result<(), EventBusError>
where
    E: Event + Clone,
{
    if self.inner.shutdown_called.load(Ordering::Acquire) {
        return Err(EventBusError::Stopped);
    }

    let snapshot = self.inner.snapshot.load();
    let type_id = TypeId::of::<E>();
    let slot = snapshot.by_type.get(&type_id);

    // Nothing is registered that could observe this event.
    let nothing_to_run = slot.is_none() && snapshot.global_middlewares.is_empty();
    if nothing_to_run {
        return Ok(());
    }

    // Try the non-waiting acquire first; only wait when no permit is free.
    let _permit = match self.inner.publish_permits.try_acquire() {
        Ok(granted) => granted,
        Err(tokio::sync::TryAcquireError::Closed) => return Err(EventBusError::Stopped),
        Err(tokio::sync::TryAcquireError::NoPermits) => {
            self.inner.publish_permits.acquire().await.map_err(|_| EventBusError::Stopped)?
        }
    };

    let event_name = std::any::type_name::<E>();
    let needs_async = snapshot.global_has_async_middleware
        || slot.is_some_and(|s| !s.async_listeners.is_empty() || s.has_async_middleware);

    if needs_async {
        let dispatch_ctx = self.inner.full_dispatch_context();
        return self
            .publish_erased(snapshot.as_ref(), slot, Arc::new(event), event_name, &dispatch_ctx)
            .await;
    }

    self.publish_sync_only(snapshot.as_ref(), slot, event, event_name).await
}
/// Publishes `event` without waiting: the dispatch runs on a spawned Tokio
/// task and this call returns as soon as that task has been spawned.
///
/// The call returns `Ok(())` immediately, without taking a permit or spawning,
/// when the current snapshot has no slot for `E` and no global middleware.
/// Otherwise one publish permit is acquired without waiting and moved into the
/// task, so it is held until the dispatch finishes.
///
/// Dispatch errors inside the task are not returned to the caller. They are
/// logged through `tracing` when the `trace` feature is enabled and otherwise
/// dropped.
///
/// # Errors
///
/// * `EventBusError::Stopped` when shutdown has been requested or the permit
///   semaphore is closed. This matches how `publish` reports a closed
///   semaphore.
/// * `EventBusError::ChannelFull` when no publish permit is currently free.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime. That can only
/// happen here once the early-return checks have passed and a permit has been
/// acquired.
pub fn try_publish<E>(&self, event: E) -> Result<(), EventBusError>
where
    E: Event + Clone,
{
    if self.inner.shutdown_called.load(Ordering::Acquire) {
        return Err(EventBusError::Stopped);
    }
    // An owned snapshot is needed because it is moved into the spawned task.
    let snapshot = self.inner.snapshot.load_full();
    let event_type = TypeId::of::<E>();
    let slot = snapshot.by_type.get(&event_type);
    if slot.is_none() && snapshot.global_middlewares.is_empty() {
        return Ok(());
    }
    let sync_only =
        !snapshot.global_has_async_middleware && slot.is_none_or(|slot| slot.async_listeners.is_empty() && !slot.has_async_middleware);
    // Separate "no capacity right now" from "bus is shutting down" so callers
    // do not retry against a closed semaphore.
    let permit = match Arc::clone(&self.inner.publish_permits).try_acquire_owned() {
        Ok(permit) => permit,
        Err(tokio::sync::TryAcquireError::NoPermits) => return Err(EventBusError::ChannelFull),
        Err(tokio::sync::TryAcquireError::Closed) => return Err(EventBusError::Stopped),
    };
    // Clone the slot handle so the task does not borrow from `snapshot`.
    let slot = slot.cloned();
    let bus = self.clone();
    tokio::spawn(async move {
        // Hold the permit until the dispatch has finished.
        let _keep = permit;
        let slot = slot.as_ref();
        let event_name = std::any::type_name::<E>();
        let outcome = if sync_only {
            bus.publish_sync_only(snapshot.as_ref(), slot, event, event_name).await
        } else {
            let dispatch_ctx = bus.inner.full_dispatch_context();
            bus.publish_erased(snapshot.as_ref(), slot, Arc::new(event), event_name, &dispatch_ctx)
                .await
        };
        if let Err(_err) = outcome {
            #[cfg(feature = "trace")]
            tracing::error!(error = %_err, "event_bus.try_publish.dispatch_failed");
        }
    });
    Ok(())
}
}