ruststream 0.2.3

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Publishing subscribers: a handler whose return value is encoded and published.
//!
//! Generated by `#[subscriber("name", publish("reply-name"))]` and mounted with
//! [`BrokerScope::include_publishing`](super::BrokerScope::include_publishing), which is handed a
//! [`TypedPublisher`] (the broker connection + reply codec). The destination name comes from the
//! macro; the publisher and codec come from wiring.

use std::{future::Future, sync::Arc};

use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;

use crate::codec::Codec;
use crate::{IncomingMessage, Publisher};

use super::context::Context;
use super::handler::{Handler, HandlerResult};
use super::publish::{PublishLayer, PublishMiddleware, TypedPublisher};

/// A subscriber definition that produces a reply to publish.
///
/// The generated type carries the subscribe name, the reply name, and the reply type. *How* the
/// reply is encoded (codec) and *through which* connection it is sent come from the
/// [`TypedPublisher`] passed at wiring time.
pub trait PublishingDef: Send + Sync {
    /// The decoded message type the handler consumes.
    type Input;

    /// The reply type the handler produces, encoded and published.
    type Reply;

    /// The subscription source this handler binds to (see
    /// [`SubscriberDef::Source`](super::SubscriberDef::Source)).
    type Source;

    /// Builds the subscription source (fresh each call).
    fn source(&self) -> Self::Source;

    /// The name (subject / channel) the reply is published to.
    fn reply_name(&self) -> &str;

    /// An optional human description (from the handler's doc comment), for `AsyncAPI`.
    fn description(&self) -> Option<&str> {
        None
    }

    /// The input type's serialized JSON Schema, when it implements [`schemars::JsonSchema`] and the
    /// `asyncapi` feature is on. The macro fills this in; the default omits it.
    fn input_schema(&self) -> Option<String> {
        None
    }

    /// Runs the handler body, producing the reply.
    fn call(&self, input: &Self::Input) -> impl Future<Output = Self::Reply> + Send;
}

/// The [`Handler`] built from a [`PublishingDef`]: decode, run, encode the reply, publish, ack.
///
/// `C` decodes the incoming message; the reply is encoded by the [`TypedPublisher`] (with its static
/// [`PublishLayer`] stack `PL`) and sent to the definition's
/// [`reply_name`](PublishingDef::reply_name).
pub struct PublishingHandler<D, C, P, PC, PL> {
    pub(crate) def: D,
    pub(crate) codec: C,
    pub(crate) publisher: TypedPublisher<P, PC, PL>,
    pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
}

impl<D, C, P, PC, PL> std::fmt::Debug for PublishingHandler<D, C, P, PC, PL> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PublishingHandler")
            .field("publisher", &self.publisher)
            .finish_non_exhaustive()
    }
}

impl<M, D, C, P, PC, PL> Handler<M> for PublishingHandler<D, C, P, PC, PL>
where
    M: IncomingMessage,
    D: PublishingDef,
    D::Input: DeserializeOwned + Send + Sync,
    D::Reply: Serialize + Send + Sync,
    C: Codec,
    P: Publisher,
    PC: Codec,
    PL: PublishLayer,
{
    async fn handle(&self, msg: &M, _ctx: &mut Context<'_>) -> HandlerResult {
        let input = match self.codec.decode::<D::Input>(msg.payload()) {
            Ok(value) => value,
            Err(err) => {
                warn!(target: "ruststream::dispatch", error = %err, "codec decode failed");
                return HandlerResult::drop();
            }
        };
        let reply = self.def.call(&input).await;
        let name = self.def.reply_name();
        if let Err(err) = self.publisher.publish(name, &reply, &self.pipeline).await {
            warn!(target: "ruststream::dispatch", error = %err, "reply publish failed");
        }
        HandlerResult::Ack
    }
}