jaeb 0.5.0

simple snapshot-driven event bus
Documentation
#[cfg(feature = "metrics")]
use metrics::counter;
use std::any::{Any, TypeId};
use std::sync::Arc;

use crate::error::EventBusError;
use crate::registry::{DispatchContext, RegistrySnapshot, TypeSlot, dispatch_slot, dispatch_sync_only_with_snapshot, dispatch_with_snapshot};
use crate::types::Event;

use super::EventBus;

impl EventBus {
    pub(crate) async fn publish_erased(
        &self,
        snapshot: &RegistrySnapshot,
        slot: Option<&Arc<TypeSlot>>,
        event: Arc<dyn Any + Send + Sync>,
        event_name: &'static str,
        dispatch_ctx: &DispatchContext<'_>,
    ) -> Result<(), EventBusError> {
        // Fast path: no middleware at all — skip dispatch_with_snapshot indirection.
        let once_removed = if snapshot.global_middlewares.is_empty() && slot.is_none_or(|s| s.middlewares.is_empty()) {
            #[cfg(feature = "metrics")]
            counter!("eventbus.publish", "event" => event_name).increment(1);

            match slot {
                None => Vec::new(),
                Some(s) => dispatch_slot(s.as_ref(), &event, event_name, dispatch_ctx).await,
            }
        } else {
            dispatch_with_snapshot(snapshot, slot, event, event_name, dispatch_ctx).await?
        };

        if !once_removed.is_empty() {
            let mut registry = self.inner.registry.lock().await;
            for subscription_id in once_removed {
                registry.remove_once(subscription_id);
            }
            self.refresh_snapshot_locked(&registry).await;
        }

        Ok(())
    }

    pub(crate) async fn publish_sync_only(
        &self,
        snapshot: &RegistrySnapshot,
        slot: Option<&Arc<TypeSlot>>,
        event: &Arc<dyn std::any::Any + Send + Sync>,
        event_name: &'static str,
    ) -> Result<(), EventBusError> {
        let dispatch_ctx = self.inner.sync_only_dispatch_context(self);
        let once_removed = dispatch_sync_only_with_snapshot(snapshot, slot, event, event_name, &dispatch_ctx).await?;

        if !once_removed.is_empty() {
            let mut registry = self.inner.registry.lock().await;
            for subscription_id in once_removed {
                registry.remove_once(subscription_id);
            }
            self.refresh_snapshot_locked(&registry).await;
        }

        Ok(())
    }

    /// Publish an event to all registered listeners.
    ///
    /// Dispatch behaviour depends on handler type:
    /// - **Sync handlers** run inline; `publish` waits for each one to return.
    /// - **Async handlers** are spawned as tracked async tasks; `publish`
    ///   returns once all matching async listeners have been *spawned*, not
    ///   necessarily completed.
    ///
    /// Events with no registered listeners (and no global middleware) are
    /// silently dropped without allocating.
    ///
    /// # Errors
    ///
    /// - [`EventBusError::Stopped`] — the bus has been shut down.
    /// - [`EventBusError::MiddlewareRejected`] — a middleware rejected the event.
    #[must_use = "publish returns a Future that must be awaited, and a Result that should not be discarded"]
    pub async fn publish<E>(&self, event: E) -> Result<(), EventBusError>
    where
        E: Event,
    {
        let _dispatch = self.begin_dispatch()?;

        let snapshot = self.inner.snapshot.load();
        let event_type = TypeId::of::<E>();
        let slot = snapshot.by_type.get(&event_type);
        if slot.is_none() && snapshot.global_middlewares.is_empty() {
            return Ok(());
        }

        let sync_only =
            !snapshot.global_has_async_middleware && slot.is_none_or(|slot| slot.async_listeners.is_empty() && !slot.has_async_middleware);

        let event: Arc<dyn Any + Send + Sync> = Arc::new(event);

        if sync_only {
            self.publish_sync_only(snapshot.as_ref(), slot, &event, std::any::type_name::<E>()).await
        } else {
            let dispatch_ctx = self.inner.full_dispatch_context(self);
            self.publish_erased(snapshot.as_ref(), slot, event, std::any::type_name::<E>(), &dispatch_ctx)
                .await
        }
    }
}