ruststream 0.4.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Publishing subscribers: a handler whose return value is encoded and published.
//!
//! Generated by `#[subscriber("name", publish("reply-name"))]` and mounted with
//! [`BrokerScope::include_publishing`](super::BrokerScope::include_publishing), which is handed a
//! [`TypedPublisher`] (the broker connection + reply codec). The destination name comes from the
//! macro; the publisher and codec come from wiring.

use std::{future::Future, sync::Arc};

use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;

use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};

use super::context::Context;
use super::dispatch::Workers;
use super::failure::{FailurePolicies, FailurePolicy};
use super::handler::{Handler, HandlerResult, Settle};
use super::metadata::HandlerMetadata;
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};

/// A subscriber definition that produces a reply to publish.
///
/// The generated type carries the subscribe name, the reply name, and the reply type. *How* the
/// reply is encoded (codec) and *through which* connection it is sent come from the
/// [`TypedPublisher`] passed at wiring time.
pub trait PublishingDef: Send + Sync {
    /// The decoded message type the handler consumes.
    type Input;

    /// The reply type the handler produces, encoded and published.
    type Reply;

    /// The subscription source this handler binds to (see
    /// [`SubscriberDef::Source`](super::SubscriberDef::Source)).
    type Source;

    /// Builds the subscription source (fresh each call).
    fn source(&self) -> Self::Source;

    /// The name (subject / channel) the reply is published to.
    fn reply_name(&self) -> &str;

    /// The concurrency policy for this subscriber's dispatch loop. The macro fills this in from
    /// the `workers(..)` argument; the default is sequential dispatch.
    fn workers(&self) -> Workers {
        Workers::sequential()
    }

    /// The failure policy for a handler panic and a decode failure. The macro fills this in from
    /// the `on_failure(panic = .., decode = ..)` argument; the default fails fast on a panic and
    /// drops on a decode failure.
    fn failure_policies(&self) -> FailurePolicies {
        FailurePolicies::default()
    }

    /// An optional human description (from the handler's doc comment), for `AsyncAPI`.
    fn description(&self) -> Option<&str> {
        None
    }

    /// The input type's serialized JSON Schema, when it implements [`schemars::JsonSchema`] and the
    /// `asyncapi` feature is on. The macro fills this in; the default omits it.
    fn input_schema(&self) -> Option<String> {
        None
    }

    /// The input type's [`Message`](crate::Message) name, when it implements that trait. The macro
    /// fills this in; the default omits it.
    fn message_name(&self) -> Option<&'static str> {
        None
    }

    /// The input type's [`Message`](crate::Message) description, when it implements that trait.
    /// The macro fills this in; the default omits it.
    fn message_description(&self) -> Option<&'static str> {
        None
    }

    /// Runs the handler body.
    ///
    /// `Ok(reply)` is encoded and published to [`reply_name`](Self::reply_name), then the incoming
    /// message is acked. `Err(result)` skips publishing and the dispatcher acts on the returned
    /// [`HandlerResult`] (for example [`HandlerResult::retry`] to ask for redelivery).
    fn call(
        &self,
        input: &Self::Input,
        ctx: &mut Context<'_>,
    ) -> impl Future<Output = Result<Self::Reply, HandlerResult>> + Send;
}

/// Builds the registration metadata for a publishing definition mounted under `name`.
pub(crate) fn publishing_metadata<D: PublishingDef>(name: String, def: &D) -> HandlerMetadata {
    HandlerMetadata::typed::<D::Input>(name)
        .with_output_type(std::any::type_name::<D::Reply>())
        .with_def_details(
            def.description(),
            def.input_schema(),
            def.message_name(),
            def.message_description(),
        )
}

/// The [`Handler`] built from a [`PublishingDef`]: decode, run, encode the reply, publish, ack.
///
/// `C` decodes the incoming message; the reply is encoded by the [`TypedPublisher`] (with its static
/// [`PublishLayer`] stack `PL`) and sent to the definition's
/// [`reply_name`](PublishingDef::reply_name). A handler returning `Err(result)` skips the publish;
/// a failed reply publish nacks the incoming message with `requeue = true`, so the broker
/// redelivers it instead of silently losing the reply.
pub struct PublishingHandler<D, C, P, PC, PL> {
    pub(crate) def: D,
    pub(crate) codec: C,
    pub(crate) publisher: TypedPublisher<P, PC, PL>,
    pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
    pub(crate) decode: FailurePolicy,
}

impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublishingHandler")
            .field("publisher", &self.publisher)
            .finish_non_exhaustive()
    }
}

impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
    M: IncomingMessage,
    D: PublishingDef,
    D::Input: DeserializeOwned + Send + Sync,
    D::Reply: Serialize + Send + Sync,
    C: Codec,
    P: Publisher,
    PC: Codec,
    PL: PublishLayer,
{
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> Settle {
        // The publishing path settles by a bare outcome (no per-element continuation): decode,
        // run, publish the reply, then ack. It converts to `Settle` with no `and_after`.
        let input = match self.codec.decode::<D::Input>(msg.payload()) {
            Ok(value) => value,
            Err(err) => {
                warn!(
                    target: "ruststream::dispatch",
                    subscription = %ctx.name(),
                    message_type = std::any::type_name::<D::Input>(),
                    error = %err,
                    "codec decode failed",
                );
                return match self.decode {
                    FailurePolicy::FailFast => {
                        ctx.fail_fast(&format!("decode failed: {err}"));
                        HandlerResult::drop().into()
                    }
                    other => other
                        .settlement()
                        .unwrap_or_else(HandlerResult::drop)
                        .into(),
                };
            }
        };
        let reply = match self.def.call(&input, ctx).await {
            Ok(reply) => reply,
            Err(result) => return result.into(),
        };
        let name = self.def.reply_name();
        let publish = self
            .publisher
            .publish(name, &reply, &self.pipeline, ctx.extensions());
        if let Err(err) = publish.await {
            warn!(
                target: "ruststream::dispatch",
                subscription = %ctx.name(),
                reply = %name,
                reply_type = std::any::type_name::<D::Reply>(),
                error = %err,
                "reply publish failed",
            );
            return HandlerResult::retry().into();
        }
        HandlerResult::Ack.into()
    }
}

#[cfg(test)]
mod tests {
    use super::{PublishingDef, publishing_metadata};
    use crate::Headers;
    use crate::Name;
    use crate::runtime::context::{Context, State};
    use crate::runtime::dispatch::{Delivery, Workers};
    use crate::runtime::handler::HandlerResult;

    /// A hand-written publishing def overriding nothing optional, pinning the trait defaults that
    /// the macro always fills in.
    struct ManualPub;

    impl PublishingDef for ManualPub {
        type Input = u32;
        type Reply = u32;
        type Source = Name;

        fn source(&self) -> Name {
            Name::new("in")
        }

        // The trait signature returns `&str` (tied to `&self`); the macro-generated impls do the
        // same, so this hand-written one cannot narrow to `&'static str` without diverging.
        #[allow(clippy::unnecessary_literal_bound)]
        fn reply_name(&self) -> &str {
            "out"
        }

        async fn call(&self, input: &u32, _ctx: &mut Context<'_>) -> Result<u32, HandlerResult> {
            Ok(*input)
        }
    }

    #[tokio::test]
    async fn defaults_metadata_and_call() {
        let def = ManualPub;
        assert_eq!(def.workers(), Workers::sequential());
        assert!(def.description().is_none());
        assert!(def.input_schema().is_none());
        assert!(def.message_name().is_none());
        assert!(def.message_description().is_none());
        assert_eq!(def.reply_name(), "out");
        let _source = def.source();

        let meta = publishing_metadata("in".to_owned(), &def);
        assert_eq!(meta.name, "in");

        let state = State::default();
        let delivery = Delivery::empty();
        let headers = Headers::new();
        let mut ctx = Context::new("in", &headers, &state, &delivery);
        assert_eq!(def.call(&5, &mut ctx).await.unwrap(), 5);
    }
}