ruststream 0.2.3

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Handler abstraction and the [`HandlerResult`] enum returned to the router.

use std::{future::Future, sync::Arc};

use super::context::Context;

/// What the router should do with the message after the handler returns.
///
/// The enum is `#[non_exhaustive]`: further outcomes may be added later, so a `match` written
/// outside this crate needs a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum HandlerResult {
    /// Acknowledge the message; the broker will remove it from the queue.
    Ack,
    /// Negatively acknowledge the message; `requeue = true` asks the broker to redeliver.
    ///
    /// [`HandlerResult::retry`] and [`HandlerResult::drop`] are shorthand constructors for the
    /// two possible values of `requeue`.
    Nack {
        /// Whether the broker should redeliver the message.
        requeue: bool,
    },
}

impl HandlerResult {
    /// Builds a negative acknowledgement that asks for redelivery.
    ///
    /// Equivalent to writing `HandlerResult::Nack { requeue: true }` by hand.
    #[must_use]
    pub const fn retry() -> Self {
        let requeue = true;
        Self::Nack { requeue }
    }

    /// Builds a negative acknowledgement that does not ask for redelivery.
    ///
    /// Equivalent to writing `HandlerResult::Nack { requeue: false }` by hand.
    #[must_use]
    pub const fn drop() -> Self {
        let requeue = false;
        Self::Nack { requeue }
    }
}

/// Conversion into a [`HandlerResult`], so `#[subscriber]` handlers can return a plain value
/// instead of always constructing one.
///
/// Implemented in this module for:
///
/// - [`HandlerResult`]: returned unchanged.
/// - `()`: always [`Ack`](HandlerResult::Ack).
/// - `Result<(), E>`: `Ok(())` becomes [`Ack`](HandlerResult::Ack).
/// - `Result<HandlerResult, E>`: the `Ok` value is passed through unchanged.
///
/// For both `Result` forms, any `Err` becomes [`HandlerResult::drop`] (a nack without requeue)
/// and the error value itself is discarded.
pub trait IntoHandlerResult {
    /// Converts `self` into the outcome the dispatcher acts on.
    ///
    /// Consumes `self`.
    fn into_handler_result(self) -> HandlerResult;
}

/// Identity conversion: a value that already is a [`HandlerResult`] is handed back untouched.
impl IntoHandlerResult for HandlerResult {
    /// Returns `self` as is.
    fn into_handler_result(self) -> HandlerResult {
        self
    }
}

/// Unit conversion: a handler that returns nothing acknowledges the message.
impl IntoHandlerResult for () {
    /// Always yields [`HandlerResult::Ack`].
    fn into_handler_result(self) -> HandlerResult {
        HandlerResult::Ack
    }
}

/// Fallible handlers with no explicit outcome: success acknowledges, failure drops.
impl<E> IntoHandlerResult for Result<(), E> {
    /// Maps `Ok(())` to [`HandlerResult::Ack`] and any `Err` to [`HandlerResult::drop`].
    ///
    /// The error value is not inspected; it is simply discarded.
    fn into_handler_result(self) -> HandlerResult {
        if self.is_ok() {
            HandlerResult::Ack
        } else {
            HandlerResult::drop()
        }
    }
}

/// Fallible handlers that pick their own outcome on success; failure drops.
impl<E> IntoHandlerResult for Result<HandlerResult, E> {
    /// Passes an `Ok` outcome through unchanged and maps any `Err` to [`HandlerResult::drop`].
    ///
    /// The error value is not inspected; it is simply discarded.
    fn into_handler_result(self) -> HandlerResult {
        match self {
            Ok(outcome) => outcome,
            Err(_) => HandlerResult::drop(),
        }
    }
}

/// A handler invoked on each input it is given.
///
/// The same trait serves both pipeline levels: a raw delivery (`Handler<M>` where
/// `M: IncomingMessage`) and a decoded value (`Handler<T>`). Implementations are `Send + Sync` so a
/// single handler can be shared across many concurrent inputs.
///
/// This module implements the trait for closures and functions of the shape
/// `Fn(&M, &mut Context) -> Fut` (where `Fut` is a `Send` future resolving to a
/// [`HandlerResult`]) and for `Arc<H>` where `H` is itself a handler.
///
/// # Examples
///
/// Closures implement `Handler` automatically:
///
/// ```
/// use ruststream::IncomingMessage;
/// use ruststream::runtime::{Context, Handler, HandlerResult};
///
/// fn assert_handler<M, H>(_: H)
/// where
///     M: IncomingMessage,
///     H: Handler<M>,
/// {
/// }
///
/// fn use_closure<M: IncomingMessage + 'static>() {
///     assert_handler::<M, _>(|_msg: &M, _ctx: &mut Context| async { HandlerResult::Ack });
/// }
/// ```
pub trait Handler<M>: Send + Sync {
    /// Handle one input, with the per-delivery [`Context`].
    ///
    /// The input is borrowed, the context is borrowed mutably, and the returned future must be
    /// `Send`. It resolves to the [`HandlerResult`] for this input.
    fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = HandlerResult> + Send;
}

/// Blanket implementation: any `Send + Sync` closure or function taking `(&M, &mut Context)` and
/// returning a `Send` future of [`HandlerResult`] is a handler.
///
/// `Fut` is a single type parameter, independent of the borrows passed in, so the returned
/// future cannot hold on to `msg` or `ctx`.
impl<M, F, Fut> Handler<M> for F
where
    Fut: Future<Output = HandlerResult> + Send,
    F: Fn(&M, &mut Context) -> Fut + Send + Sync,
{
    /// Calls the closure with the input and context and returns its future unchanged.
    fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = HandlerResult> + Send {
        self(msg, ctx)
    }
}

/// Forwarding implementation: an `Arc` around a handler is itself a handler.
impl<M, H> Handler<M> for Arc<H>
where
    H: Handler<M>,
{
    /// Delegates to the wrapped handler's `handle`.
    fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = HandlerResult> + Send {
        // `&Arc<H>` deref-coerces to `&H`, so this calls the inner handler directly.
        <H as Handler<M>>::handle(self, msg, ctx)
    }
}