// ruststream 0.4.0
//
// Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
// Documentation
//! Batch publishing subscribers: consume a batch, publish the derived replies, then ack.
//!
//! Generated by `#[subscriber(batch(..), publish("reply-name"))]` and mounted with
//! `include_batch_publishing`, which is handed a [`ReplyPublisher`]: a plain
//! [`TypedPublisher`](super::TypedPublisher) publishes each reply independently, while a
//! [`Transactional`](super::Transactional) one (built with
//! [`TypedPublisher::transactional`](super::TypedPublisher::transactional)) makes the whole
//! batch's replies visible atomically - the consume-transform-produce pattern.

use std::{future::Future, sync::Arc};

use serde::{Serialize, de::DeserializeOwned};
use tracing::warn;

use crate::IncomingMessage;
use crate::codec::Codec;

use super::batch::{BatchHandler, decode_batch, settle};
use super::context::Context;
use super::dispatch::Workers;
use super::failure::{FailurePolicies, FailurePolicy};
use super::handler::HandlerResult;
use super::metadata::HandlerMetadata;
use super::publish::{PublishMiddleware, ReplyPublisher};

/// A batch subscriber definition that produces replies to publish.
///
/// The batch counterpart of [`PublishingDef`](super::PublishingDef): the handler consumes the
/// whole decoded batch and returns the replies for it, all-or-nothing. Selective per-element
/// outcomes are deliberately unsupported here - there is no coherent transaction commit for a
/// half-nacked batch; handlers needing both publish manually via
/// [`Context::publisher`](super::Context::publisher) from a plain batch handler.
///
/// Implementors must supply the three associated types plus [`source`](Self::source),
/// [`reply_name`](Self::reply_name), and [`call`](Self::call); every other method has a
/// default.
pub trait BatchPublishingDef: Send + Sync {
    /// The decoded element type; the handler consumes `&[Input]`.
    ///
    /// The trait itself places no bound on this type, but [`BatchPublishingHandler`] only
    /// implements `BatchHandler` when it is `DeserializeOwned + Send + Sync`.
    type Input;

    /// The reply element type; each entry of the returned `Vec` is encoded and published.
    ///
    /// The trait itself places no bound on this type, but [`BatchPublishingHandler`] only
    /// implements `BatchHandler` when it is `Serialize + Send + Sync`.
    type Reply;

    /// The subscription source this handler binds to (see
    /// [`SubscriberDef::Source`](super::SubscriberDef::Source)).
    type Source;

    /// Builds the subscription source (fresh each call).
    fn source(&self) -> Self::Source;

    /// The name (subject / channel) the replies are published to.
    fn reply_name(&self) -> &str;

    /// The concurrency policy for this subscriber's dispatch loop (how many batches are in
    /// flight at once). The macro fills this in from the `workers(..)` argument; the default is
    /// sequential dispatch.
    fn workers(&self) -> Workers {
        Workers::sequential()
    }

    /// The failure policy for a batch-handler panic and a per-element decode failure. The macro
    /// fills this in from the `on_failure(panic = .., decode = ..)` argument; the default fails
    /// fast on a panic and drops on a decode failure.
    fn failure_policies(&self) -> FailurePolicies {
        FailurePolicies::default()
    }

    /// An optional human description (from the handler's doc comment), for `AsyncAPI`.
    fn description(&self) -> Option<&str> {
        None
    }

    /// The element type's serialized JSON Schema, when it implements [`schemars::JsonSchema`]
    /// and the `asyncapi` feature is on. The macro fills this in; the default omits it.
    fn input_schema(&self) -> Option<String> {
        None
    }

    /// The element type's [`Message`](crate::Message) name, when it implements that trait. The
    /// macro fills this in; the default omits it.
    fn message_name(&self) -> Option<&'static str> {
        None
    }

    /// The element type's [`Message`](crate::Message) description, when it implements that
    /// trait. The macro fills this in; the default omits it.
    fn message_description(&self) -> Option<&'static str> {
        None
    }

    /// Runs the handler body on one decoded batch.
    ///
    /// `Ok(replies)` publishes every reply to [`reply_name`](Self::reply_name) and acks the
    /// batch; `Err(result)` publishes nothing and settles the whole batch with `result`.
    ///
    /// When driven by [`BatchPublishingHandler`], a failed reply publish settles the batch
    /// with a retry instead of an ack, and this method is not invoked at all when decoding
    /// accepted no message of the batch.
    ///
    /// The returned future must be `Send`.
    fn call(
        &self,
        batch: &[Self::Input],
        ctx: &mut Context<'_>,
    ) -> impl Future<Output = Result<Vec<Self::Reply>, HandlerResult>> + Send;
}

/// Assembles the [`HandlerMetadata`] registered for a batch publishing definition mounted
/// as `name`.
///
/// The entry is typed by the definition's input element, records the reply type's name as
/// its output type, and carries the optional description, input schema, and message name /
/// description reported by `def`.
pub(crate) fn batch_publishing_metadata<D>(name: String, def: &D) -> HandlerMetadata
where
    D: BatchPublishingDef,
{
    let reply_type = std::any::type_name::<D::Reply>();
    let metadata = HandlerMetadata::typed::<D::Input>(name).with_output_type(reply_type);
    metadata.with_def_details(
        def.description(),
        def.input_schema(),
        def.message_name(),
        def.message_description(),
    )
}

/// The `BatchHandler` built from a [`BatchPublishingDef`]: decode the batch, run the handler,
/// publish the replies through the [`ReplyPublisher`], then ack the batch.
///
/// Elements that fail to decode are nacked individually and never reach the handler. A failed
/// reply publish (or a failed transaction) retries the whole batch instead of losing replies;
/// with a plain publisher a mid-batch failure can therefore re-publish the earlier replies on
/// redelivery (at-least-once), while a [`Transactional`](super::Transactional) publisher never
/// leaves them half-visible.
pub struct BatchPublishingHandler<D, C, R> {
    /// The definition whose `call` produces the replies and whose `reply_name` names where
    /// they are published.
    pub(crate) def: D,
    /// The codec handed to `decode_batch` to decode the incoming messages.
    pub(crate) codec: C,
    /// The publisher whose `publish_batch` sends the replies.
    pub(crate) publisher: R,
    /// The publish middleware chain passed along with every `publish_batch` call.
    pub(crate) pipeline: Arc<[Arc<dyn PublishMiddleware>]>,
    /// The failure policy handed to `decode_batch` for elements that fail to decode.
    pub(crate) decode: FailurePolicy,
}

impl<D, C, R> std::fmt::Debug for BatchPublishingHandler<D, C, R> {
    /// Renders an opaque `BatchPublishingHandler { .. }`: no field is printed, which is why
    /// the impl needs no `Debug` bound on `D`, `C`, or `R`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("BatchPublishingHandler");
        opaque.finish_non_exhaustive()
    }
}

impl<M, D, C, R> BatchHandler<M> for BatchPublishingHandler<D, C, R>
where
    M: IncomingMessage,
    D: BatchPublishingDef,
    D::Input: DeserializeOwned + Send + Sync,
    D::Reply: Serialize + Send + Sync,
    C: Codec,
    R: ReplyPublisher,
{
    /// Decodes `batch`, runs the definition on the decoded elements, publishes the replies it
    /// returns, and settles every accepted message with one shared outcome.
    ///
    /// The outcome is an ack after a successful publish, a retry after a failed publish
    /// (logged as a warning), or whatever `HandlerResult` the definition returned as its
    /// error. Nothing further happens when decoding accepted no message.
    async fn handle_batch(&self, batch: Vec<M>, ctx: &mut Context<'_>) {
        // Owned copy of the subscription name, taken up front so it stays usable while `ctx`
        // is lent mutably to the calls below.
        let subscription = ctx.name().to_owned();

        // `decode_batch` hands back the decoded values together with the messages it accepted;
        // only those accepted messages are settled here.
        let (values, accepted) =
            decode_batch::<M, D::Input, C>(batch, &self.codec, self.decode, ctx).await;
        if accepted.is_empty() {
            return;
        }

        let outcome = match self.def.call(&values, ctx).await {
            // The definition declined the batch: publish nothing, settle with its verdict.
            Err(result) => result,
            Ok(replies) => {
                let name = self.def.reply_name();
                let published = self
                    .publisher
                    .publish_batch(name, &replies, &self.pipeline, ctx.extensions())
                    .await;
                match published {
                    Err(err) => {
                        warn!(
                            target: "ruststream::dispatch",
                            subscription = %subscription,
                            reply = %name,
                            reply_type = std::any::type_name::<D::Reply>(),
                            error = %err,
                            "batch reply publish failed",
                        );
                        // Retry the batch rather than ack it without its replies.
                        HandlerResult::retry()
                    }
                    Ok(()) => HandlerResult::Ack,
                }
            }
        };

        // All-or-nothing: every accepted message receives the same outcome, one at a time.
        for message in accepted {
            settle(message, outcome, &subscription).await;
        }
    }
}

// Compiled only for test builds that enable both the `memory` and `json` features.
#[cfg(all(test, feature = "memory", feature = "json"))]
mod tests {
    use futures::StreamExt;

    use super::super::context::State;
    use super::super::dispatch::Delivery;
    use super::super::publish::TypedPublisher;
    use super::*;
    use crate::codec::JsonCodec;
    use crate::memory::{MemoryBroker, MemoryMessage, MemorySubscriber};
    use crate::{BatchSubscriber, Headers, OutgoingMessage, Publisher, Subscriber};

    /// Test definition: replies with every input multiplied by ten, or refuses the batch with
    /// `fail_with` when that is set.
    struct Confirm {
        /// The name returned by `reply_name`.
        reply_to: &'static str,
        /// When `Some`, `call` returns this result as its error instead of producing replies.
        fail_with: Option<HandlerResult>,
    }

    impl BatchPublishingDef for Confirm {
        type Input = u32;
        type Reply = u32;
        type Source = crate::Name;

        fn source(&self) -> Self::Source {
            crate::Name::new("orders")
        }

        fn reply_name(&self) -> &str {
            self.reply_to
        }

        async fn call(
            &self,
            batch: &[u32],
            _ctx: &mut Context<'_>,
        ) -> Result<Vec<u32>, HandlerResult> {
            // A configured failure short-circuits before any reply is computed.
            if let Some(result) = self.fail_with {
                return Err(result);
            }
            Ok(batch.iter().map(|n| n * 10).collect())
        }
    }

    /// Publishes each number as its own JSON-encoded message under `name`, panicking if
    /// encoding or publishing fails.
    async fn publish_numbers(broker: &MemoryBroker, name: &str, numbers: &[u32]) {
        let publisher = broker.publisher();
        for n in numbers {
            publisher
                .publish(OutgoingMessage::new(name, &serde_json::to_vec(n).unwrap()))
                .await
                .unwrap();
        }
    }

    /// Awaits the next item of the subscriber's `batches()` stream and unwraps it, panicking
    /// if the stream ended or yielded an error.
    async fn pull_batch(sub: &mut MemorySubscriber) -> Vec<MemoryMessage> {
        let mut stream = std::pin::pin!(sub.batches());
        stream.next().await.unwrap().unwrap()
    }

    /// Success path with a transactional publisher: both replies arrive in order and the input
    /// batch is acked.
    #[tokio::test]
    async fn transactional_replies_publish_atomically_then_ack() {
        let broker = MemoryBroker::new();
        let mut input = broker.subscribe("orders");
        let mut replies = broker.subscribe("confirmations");

        let handler = BatchPublishingHandler {
            def: Confirm {
                reply_to: "confirmations",
                fail_with: None,
            },
            codec: JsonCodec,
            publisher: TypedPublisher::with_codec(broker.publisher(), JsonCodec).transactional(),
            pipeline: Arc::from([]),
            decode: FailurePolicy::Drop,
        };

        publish_numbers(&broker, "orders", &[1, 2]).await;
        // Build a context by hand for the "orders" subscription, then drive the handler
        // directly with the pulled batch.
        let state = State::default();
        let delivery = Delivery::empty();
        let headers = Headers::new();
        let mut ctx = Context::new("orders", &headers, &state, &delivery);
        let batch = pull_batch(&mut input).await;
        handler.handle_batch(batch, &mut ctx).await;

        // Both replies are visible after the commit, in order.
        let confirmed = pull_batch(&mut replies).await;
        let payloads: Vec<&[u8]> = confirmed.iter().map(IncomingMessage::payload).collect();
        assert_eq!(payloads, [b"10", b"20"]);
        for msg in confirmed {
            msg.ack().await.unwrap();
        }

        // The acked input batch is not redelivered.
        // (A single poll of the input stream finds nothing ready.)
        let mut stream = std::pin::pin!(input.stream());
        assert!(futures::poll!(stream.next()).is_pending());
    }

    /// Failure path: the definition returns a retry result, so no reply is published and both
    /// input messages come back.
    #[tokio::test]
    async fn handler_error_publishes_nothing_and_settles_the_batch() {
        let broker = MemoryBroker::new();
        let mut input = broker.subscribe("orders");
        let mut replies = broker.subscribe("confirmations");

        let handler = BatchPublishingHandler {
            def: Confirm {
                reply_to: "confirmations",
                // Make `call` refuse the batch with a retry outcome.
                fail_with: Some(HandlerResult::retry()),
            },
            codec: JsonCodec,
            publisher: TypedPublisher::with_codec(broker.publisher(), JsonCodec).transactional(),
            pipeline: Arc::from([]),
            decode: FailurePolicy::Drop,
        };

        publish_numbers(&broker, "orders", &[1, 2]).await;
        let state = State::default();
        let delivery = Delivery::empty();
        let headers = Headers::new();
        let mut ctx = Context::new("orders", &headers, &state, &delivery);
        let batch = pull_batch(&mut input).await;
        handler.handle_batch(batch, &mut ctx).await;

        // Nothing was published, and the whole input batch is back for redelivery.
        let mut reply_stream = std::pin::pin!(replies.stream());
        assert!(futures::poll!(reply_stream.next()).is_pending());
        let redelivered = pull_batch(&mut input).await;
        assert_eq!(redelivered.len(), 2);
        // Ack the redelivered messages so the test leaves nothing unsettled.
        for msg in redelivered {
            msg.ack().await.unwrap();
        }
    }
}