tokio-events 0.2.3

A modern, type-safe async event bus for Rust applications
Documentation
//! Event handler traits and implementations.

use crate::{Error, Event, EventEnvelope, Result};
use async_trait::async_trait;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Type alias for a boxed future that handlers return
pub type HandlerFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;

/// Trait for event handlers that can process events asynchronously.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Process an event envelope
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()>;

    /// Get the handler name for debugging
    fn name(&self) -> &str {
        "unnamed"
    }

    /// Called when the handler is being shut down
    async fn on_shutdown(&self) -> Result<()> {
        Ok(())
    }
}

/// A typed event handler that processes specific event types.
pub trait TypedHandler<T: Event>: Send + Sync + 'static {
    /// Handle a specific event type
    fn handle_typed(&self, event: &T) -> HandlerFuture;

    /// Get the handler name
    fn name(&self) -> &str {
        std::any::type_name::<Self>()
    }
}

/// Adapter that converts a TypedHandler into an EventHandler
#[derive(Debug)]
pub struct TypedHandlerAdapter<T: Event, H: TypedHandler<T>> {
    handler: Arc<H>,
    _phantom: std::marker::PhantomData<T>,
}

impl<T: Event, H: TypedHandler<T>> TypedHandlerAdapter<T, H> {
    /// Create a new typed handler adapter
    pub fn new(handler: H) -> Self {
        Self {
            handler: Arc::new(handler),
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<T: Event, H: TypedHandler<T>> EventHandler for TypedHandlerAdapter<T, H> {
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()> {
        match envelope.get_event::<T>() {
            Ok(event) => self.handler.handle_typed(&event).await,
            Err(_) => Err(Error::EventNotRegistered {
                type_name: envelope.event_type().to_string(),
            }),
        }
    }

    fn name(&self) -> &str {
        self.handler.name()
    }
}

/// A function-based event handler using closures.
#[derive(Debug)]
pub struct FunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    function: F,
    name: String,
    _phantom: std::marker::PhantomData<T>,
}

impl<T, F, Fut> FunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    /// Create a new function handler
    pub fn new(function: F) -> Self {
        Self {
            function,
            name: format!("FunctionHandler<{}>", T::event_type()),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Create a new function handler with a custom name
    pub fn with_name(function: F, name: impl Into<String>) -> Self {
        Self {
            function,
            name: name.into(),
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<T, F, Fut> EventHandler for FunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()> {
        match envelope.get_event::<T>() {
            Ok(event) => {
                (self.function)(event).await;
                Ok(())
            }
            Err(_) => Err(Error::EventNotRegistered {
                type_name: envelope.event_type().to_string(),
            }),
        }
    }

    fn name(&self) -> &str {
        &self.name
    }
}

/// A handler that wraps an asynchronous function that can fail
#[allow(missing_debug_implementations)]
pub struct FallibleFunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    function: F,
    name: String,
    _phantom: std::marker::PhantomData<T>,
}

impl<T, F, Fut> FallibleFunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    /// Create a new fallible function handler
    pub fn new(function: F) -> Self {
        Self {
            function,
            name: std::any::type_name::<F>().to_string(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Create a new fallible function handler with a custom name
    pub fn with_name(function: F, name: impl Into<String>) -> Self {
        Self {
            function,
            name: name.into(),
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<T, F, Fut> EventHandler for FallibleFunctionHandler<T, F, Fut>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()> {
        match envelope.get_event::<T>() {
            Ok(event) => {
                (self.function)(event).await
            }
            Err(_) => Err(Error::EventNotRegistered {
                type_name: envelope.event_type().to_string(),
            }),
        }
    }

    fn name(&self) -> &str {
        &self.name
    }
}

/// A handler that can filter events before processing
#[allow(missing_debug_implementations)]
pub struct FilteredHandler<H: EventHandler> {
    inner: H,
    filter: Box<dyn Fn(&EventEnvelope) -> bool + Send + Sync>,
    _filter_name: String,
}

impl<H: EventHandler> FilteredHandler<H> {
    /// Create a new filtered handler
    pub fn new<F>(inner: H, filter: F, filter_name: impl Into<String>) -> Self
    where
        F: Fn(&EventEnvelope) -> bool + Send + Sync + 'static,
    {
        Self {
            inner,
            filter: Box::new(filter),
            _filter_name: filter_name.into(),
        }
    }
}

#[async_trait]
impl<H: EventHandler> EventHandler for FilteredHandler<H> {
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()> {
        if (self.filter)(envelope) {
            self.inner.handle(envelope).await
        } else {
            Ok(())
        }
    }

    fn name(&self) -> &str {
        self.inner.name()
    }
}

/// A handler that wraps errors and continues processing
#[allow(missing_debug_implementations)]
pub struct ErrorWrappingHandler<H: EventHandler> {
    inner: H,
    error_handler: Box<dyn Fn(Error) + Send + Sync>,
}

impl<H: EventHandler> ErrorWrappingHandler<H> {
    /// Create a new error wrapping handler
    pub fn new<E>(inner: H, error_handler: E) -> Self
    where
        E: Fn(Error) + Send + Sync + 'static,
    {
        Self {
            inner,
            error_handler: Box::new(error_handler),
        }
    }
}

#[async_trait]
impl<H: EventHandler> EventHandler for ErrorWrappingHandler<H> {
    async fn handle(&self, envelope: &EventEnvelope) -> Result<()> {
        match self.inner.handle(envelope).await {
            Ok(()) => Ok(()),
            Err(e) => {
                (self.error_handler)(e);
                Ok(())
            }
        }
    }

    fn name(&self) -> &str {
        self.inner.name()
    }
}

/// Handler statistics for monitoring
#[derive(Clone, Default, Debug)]
pub struct HandlerStats {
    /// Number of events successfully processed.
    pub events_processed: u64,
    /// Number of events that failed to process.
    pub events_failed: u64,
    /// Total processing time in milliseconds.
    pub total_processing_time_ms: u64,
    /// The last error encountered, if any.
    pub last_error: Option<String>,
}

impl fmt::Display for HandlerStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Processed: {}, Failed: {}, Avg time: {}ms",
            self.events_processed,
            self.events_failed,
            self.total_processing_time_ms
                .checked_div(self.events_processed)
                .unwrap_or(0)
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestEvent {
        value: i32,
    }

    impl Event for TestEvent {
        fn event_type() -> &'static str {
            "TestEvent"
        }
    }

    #[tokio::test]
    async fn test_function_handler() {
        let handler = FunctionHandler::new(|event: TestEvent| async move {
            assert_eq!(event.value, 42);
        });

        let event = TestEvent { value: 42 };
        let envelope = EventEnvelope::new(event);

        handler.handle(&envelope).await.unwrap();
    }

    #[tokio::test]
    async fn test_filtered_handler() {
        let base_handler = FunctionHandler::new(|_: TestEvent| async move {
            // This should only be called for events with value > 10
        });

        let filtered = FilteredHandler::new(
            base_handler,
            |envelope: &EventEnvelope| {
                envelope
                    .get_event::<TestEvent>()
                    .map(|e| e.value > 10)
                    .unwrap_or(false)
            },
            "value > 10",
        );

        // This should be processed
        let event1 = TestEvent { value: 20 };
        let envelope1 = EventEnvelope::new(event1);
        assert!(filtered.handle(&envelope1).await.is_ok());

        // This should be filtered out
        let event2 = TestEvent { value: 5 };
        let envelope2 = EventEnvelope::new(event2);
        assert!(filtered.handle(&envelope2).await.is_ok());
    }
}