shove 0.7.0

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
use std::sync::Arc;

use crate::metadata::{DeadMessageMetadata, MessageMetadata};
use crate::outcome::Outcome;
use crate::topic::Topic;

/// Handler for processing messages from a topic's queues.
///
/// Parameterized on the `Topic`, not the message type directly.
/// This ensures the handler is bound to a specific topic and prevents
/// accidentally reusing a handler across topics that share a message type.
pub trait MessageHandler<T: Topic>: Send + Sync + 'static {
    /// Process a message from the main queue.
    fn handle(
        &self,
        message: T::Message,
        metadata: MessageMetadata,
    ) -> impl Future<Output = Outcome> + Send;

    /// Process a message from the dead-letter queue.
    ///
    /// The message is always acked (removed from DLQ) after this returns.
    /// Override for logging, alerting, or investigation.
    fn handle_dead(
        &self,
        _message: T::Message,
        metadata: DeadMessageMetadata,
    ) -> impl Future<Output = ()> + Send {
        async move {
            tracing::warn!(
                delivery_id = %metadata.message.delivery_id,
                reason = metadata.reason.as_deref().unwrap_or("unknown"),
                original_queue = metadata.original_queue.as_deref().unwrap_or("unknown"),
                death_count = metadata.death_count,
                "Dead-letter message received, no handler implemented"
            );
        }
    }
}

// Blanket impl: Arc<H> delegates to H. This allows sharing handlers across tasks.
impl<T: Topic, H: MessageHandler<T>> MessageHandler<T> for Arc<H> {
    fn handle(
        &self,
        message: T::Message,
        metadata: MessageMetadata,
    ) -> impl Future<Output = Outcome> + Send {
        (**self).handle(message, metadata)
    }

    fn handle_dead(
        &self,
        message: T::Message,
        metadata: DeadMessageMetadata,
    ) -> impl Future<Output = ()> + Send {
        (**self).handle_dead(message, metadata)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::*;
    use crate::metadata::{DeadMessageMetadata, MessageMetadata};
    use crate::outcome::Outcome;
    use crate::topology::{QueueTopology, TopologyBuilder};

    // -- test Topic --

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestMessage {
        value: u32,
    }

    struct TestTopic;
    impl crate::topic::Topic for TestTopic {
        type Message = TestMessage;
        fn topology() -> &'static QueueTopology {
            static TOPOLOGY: std::sync::OnceLock<QueueTopology> = std::sync::OnceLock::new();
            TOPOLOGY.get_or_init(|| TopologyBuilder::new("handler-test").build())
        }
    }

    // -- test handlers --

    struct FixedOutcomeHandler(Outcome);
    impl MessageHandler<TestTopic> for FixedOutcomeHandler {
        async fn handle(&self, _msg: TestMessage, _meta: MessageMetadata) -> Outcome {
            self.0.clone()
        }
    }

    // -- helpers --

    fn test_metadata() -> MessageMetadata {
        MessageMetadata {
            retry_count: 0,
            delivery_id: "d-1".into(),
            redelivered: false,
            headers: HashMap::new(),
        }
    }

    fn test_dead_metadata() -> DeadMessageMetadata {
        DeadMessageMetadata {
            message: test_metadata(),
            reason: Some("rejected".into()),
            original_queue: Some("handler-test".into()),
            death_count: 1,
        }
    }

    fn test_message() -> TestMessage {
        TestMessage { value: 42 }
    }

    // -- tests --

    /// Default `handle_dead` returns `()` without panicking.
    #[tokio::test]
    async fn default_handle_dead_returns_unit() {
        let handler = FixedOutcomeHandler(Outcome::Ack);
        // The default impl just logs; calling it must not panic and must return ().
        handler
            .handle_dead(test_message(), test_dead_metadata())
            .await;
    }

    /// `Arc<H>` delegates `handle` to the inner handler and returns the correct outcome.
    #[tokio::test]
    async fn arc_blanket_handle_delegates_correctly() {
        let handler = Arc::new(FixedOutcomeHandler(Outcome::Ack));
        let outcome = handler.handle(test_message(), test_metadata()).await;
        assert!(matches!(outcome, Outcome::Ack));
    }

    /// `Arc<H>` delegates `handle` for all outcome variants.
    #[tokio::test]
    async fn arc_blanket_handle_retry_outcome() {
        let handler = Arc::new(FixedOutcomeHandler(Outcome::Retry));
        let outcome = handler.handle(test_message(), test_metadata()).await;
        assert!(matches!(outcome, Outcome::Retry));
    }

    /// `Arc<H>` delegates `handle_dead` to the inner handler's default impl without panicking.
    #[tokio::test]
    async fn arc_blanket_handle_dead_delegates_correctly() {
        let handler = Arc::new(FixedOutcomeHandler(Outcome::Ack));
        // Calling through Arc must reach the same default impl and return ().
        handler
            .handle_dead(test_message(), test_dead_metadata())
            .await;
    }
}