ruststream 0.3.0

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Publishing subscribers: a handler whose return value is encoded and published.
//!
//! Generated by `#[subscriber("name", publish("reply-name"))]` and mounted with
//! [`BrokerScope::include_publishing`](super::BrokerScope::include_publishing), which is handed a
//! [`TypedPublisher`] (the broker connection + reply codec). The destination name comes from the
//! macro; the publisher and codec come from wiring.

use std::{future::Future, sync::Arc};

use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;

use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};

use super::context::Context;
use super::handler::{Handler, HandlerResult};
use super::metadata::HandlerMetadata;
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};

/// A subscriber definition that produces a reply to publish.
///
/// The generated type carries the subscribe name, the reply name, and the reply type. *How* the
/// reply is encoded (codec) and *through which* connection it is sent come from the
/// [`TypedPublisher`] passed at wiring time.
pub trait PublishingDef: Send + Sync {
    /// The decoded message type the handler consumes.
    type Input;

    /// The reply type the handler produces, encoded and published.
    type Reply;

    /// The subscription source this handler binds to (see
    /// [`SubscriberDef::Source`](super::SubscriberDef::Source)).
    type Source;

    /// Builds the subscription source (fresh each call).
    fn source(&self) -> Self::Source;

    /// The name (subject / channel) the reply is published to.
    fn reply_name(&self) -> &str;

    /// An optional human description (from the handler's doc comment), for `AsyncAPI`.
    fn description(&self) -> Option<&str> {
        None
    }

    /// The input type's serialized JSON Schema, when it implements [`schemars::JsonSchema`] and the
    /// `asyncapi` feature is on. The macro fills this in; the default omits it.
    fn input_schema(&self) -> Option<String> {
        None
    }

    /// The input type's [`Message`](crate::Message) name, when it implements that trait. The macro
    /// fills this in; the default omits it.
    fn message_name(&self) -> Option<&'static str> {
        None
    }

    /// The input type's [`Message`](crate::Message) description, when it implements that trait.
    /// The macro fills this in; the default omits it.
    fn message_description(&self) -> Option<&'static str> {
        None
    }

    /// Runs the handler body.
    ///
    /// `Ok(reply)` is encoded and published to [`reply_name`](Self::reply_name), then the incoming
    /// message is acked. `Err(result)` skips publishing and the dispatcher acts on the returned
    /// [`HandlerResult`] (for example [`HandlerResult::retry`] to ask for redelivery).
    fn call(
        &self,
        input: &Self::Input,
        ctx: &mut Context<'_>,
    ) -> impl Future<Output = Result<Self::Reply, HandlerResult>> + Send;
}

/// Builds the registration metadata for a publishing definition mounted under `name`.
pub(crate) fn publishing_metadata<D: PublishingDef>(name: String, def: &D) -> HandlerMetadata {
    let mut meta = HandlerMetadata::typed::<D::Input>(name)
        .with_output_type(std::any::type_name::<D::Reply>());
    if let Some(description) = def.description() {
        meta = meta.with_description(description.to_owned());
    }
    if let Some(schema) = def.input_schema() {
        meta = meta.with_payload_schema(schema);
    }
    if let Some(message_name) = def.message_name() {
        meta = meta.with_message_name(message_name);
    }
    if let Some(message_description) = def.message_description() {
        meta = meta.with_message_description(message_description);
    }
    meta
}

/// The [`Handler`] built from a [`PublishingDef`]: decode, run, encode the reply, publish, ack.
///
/// `C` decodes the incoming message; the reply is encoded by the [`TypedPublisher`] (with its static
/// [`PublishLayer`] stack `PL`) and sent to the definition's
/// [`reply_name`](PublishingDef::reply_name). A handler returning `Err(result)` skips the publish;
/// a failed reply publish nacks the incoming message with `requeue = true`, so the broker
/// redelivers it instead of silently losing the reply.
pub struct PublishingHandler<D, C, P, PC, PL> {
    pub(crate) def: D,
    pub(crate) codec: C,
    pub(crate) publisher: TypedPublisher<P, PC, PL>,
    pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
}

impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublishingHandler")
            .field("publisher", &self.publisher)
            .finish_non_exhaustive()
    }
}

impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
    M: IncomingMessage,
    D: PublishingDef,
    D::Input: DeserializeOwned + Send + Sync,
    D::Reply: Serialize + Send + Sync,
    C: Codec,
    P: Publisher,
    PC: Codec,
    PL: PublishLayer,
{
    async fn handle(&self, msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
        let input = match self.codec.decode::<D::Input>(msg.payload()) {
            Ok(value) => value,
            Err(err) => {
                warn!(target: "ruststream::dispatch", error = %err, "codec decode failed");
                return HandlerResult::drop();
            }
        };
        let reply = match self.def.call(&input, ctx).await {
            Ok(reply) => reply,
            Err(result) => return result,
        };
        let name = self.def.reply_name();
        if let Err(err) = self.publisher.publish(name, &reply, &self.pipeline).await {
            warn!(target: "ruststream::dispatch", error = %err, "reply publish failed");
            return HandlerResult::retry();
        }
        HandlerResult::Ack
    }
}