rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::contracts::bus_event::IBusEvent;
use crate::dispatch::registration::MEMORY_HANDLERS;
use crate::error::BusError;
use futures::FutureExt;
use std::panic::AssertUnwindSafe;

#[cfg(not(feature = "context"))]
pub(crate) async fn dispatch_in_memory<'a, TEvent>(
    #[cfg(feature = "_db_any")]
    #[cfg(feature = "sqlx-postgres")]
    txn: &mut sqlx::Transaction<'a, sqlx::Postgres>,
    #[cfg(feature = "sqlx-mysql")] txn: &mut sqlx::Transaction<'a, sqlx::MySql>,
    #[cfg(feature = "_db_sea_orm")] txn: &'a sea_orm::DatabaseTransaction,
    event: &TEvent,
    metadata: &crate::contracts::meta::BusMetadata,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>>
where
    TEvent: IBusEvent,
{
    let event_identity = TEvent::EVENT_IDENTITY;
    let event_ptr = event as *const TEvent as usize;
    let meta_ptr = metadata as *const crate::contracts::meta::BusMetadata as usize;

    #[cfg(feature = "_db_any")]
    let txn_ptr = {
        #[cfg(feature = "_db_sqlx")]
        {
            txn as *mut _ as usize
        }

        #[cfg(feature = "_db_sea_orm")]
        {
            txn as *const _ as usize
        }
    };

    #[cfg(feature = "logging")]
    log::trace!(
        "Dispatching in-memory event (no context): {}",
        event_identity
    );

    let handlers_map = MEMORY_HANDLERS.get().ok_or_else(|| {
        BusError::Configuration(
            "Bus not initialized. Call Bus::init() before dispatching.".to_string(),
        )
    })?;

    let handlers = match handlers_map.get(event_identity) {
        None => return Ok(0),
        Some(h) => h,
    };

    for reg in handlers.iter() {
        #[cfg(feature = "logging")]
        log::trace!(
            "Executing handler: {} for event: {}",
            reg.handler_identity,
            event_identity
        );

        #[cfg(feature = "_db_any")]
        let res = AssertUnwindSafe((reg.execute)(txn_ptr, event_ptr, meta_ptr))
            .catch_unwind()
            .await;
        #[cfg(not(feature = "_db_any"))]
        let res = AssertUnwindSafe((reg.execute)(event_ptr, meta_ptr))
            .catch_unwind()
            .await;

        handle_execution_result(reg.handler_identity, event_identity, res)?;
    }

    Ok(handlers.len())
}

#[cfg(feature = "context")]
pub(crate) async fn dispatch_in_memory<TContext, TEvent>(
    ctx: TContext,
    event: &TEvent,
) -> Result<usize, Box<dyn std::error::Error + Send + Sync>>
where
    TEvent: IBusEvent,
    TContext: crate::contracts::ctx::ToRawContext,
{
    let event_identity = TEvent::EVENT_IDENTITY;
    let event_ptr = event as *const TEvent as usize;

    #[cfg(feature = "logging")]
    log::trace!(
        "Dispatching in-memory event: {} with context (mut: {})",
        event_identity,
        ctx.to_raw().is_mutable
    );

    let handlers_map = MEMORY_HANDLERS.get().ok_or_else(|| {
        BusError::Configuration(
            "Bus not initialized. Call Bus::init() before dispatching.".to_string(),
        )
    })?;

    let handlers = match handlers_map.get(event_identity) {
        None => return Ok(0),
        Some(h) => h,
    };

    for reg in handlers.iter() {
        #[cfg(feature = "logging")]
        log::trace!(
            "Executing handler: {} for event: {}",
            reg.handler_identity,
            event_identity
        );

        let res = AssertUnwindSafe((reg.execute)(ctx.to_raw(), event_ptr))
            .catch_unwind()
            .await;
        handle_execution_result(reg.handler_identity, event_identity, res)?;
    }

    Ok(handlers.len())
}

fn handle_execution_result(
    _handler_id: &str,
    _event_id: &str,
    result: Result<
        Result<(), Box<dyn std::error::Error + Send + Sync>>,
        Box<dyn std::any::Any + Send>,
    >,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    match result {
        Ok(Ok(_)) => Ok(()),
        Ok(Err(e)) => {
            #[cfg(feature = "logging")]
            log::error!(
                "Handler {} failed for event {}: {:?}",
                _handler_id,
                _event_id,
                e
            );
            Err(e)
        }
        Err(panic_payload) => {
            #[cfg(feature = "logging")]
            log::error!("Handler {} PANICKED for event {}", _handler_id, _event_id);

            let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
                s.to_string()
            } else if let Some(s) = panic_payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "Unknown panic message".to_string()
            };
            Err(format!("Handler panicked: {}", msg).into())
        }
    }
}