use std::sync::Arc;
use crate::audit::{AuditHandler, Audited};
use crate::metadata::{DeadMessageMetadata, MessageMetadata};
use crate::outcome::Outcome;
use crate::topic::Topic;
pub trait MessageHandler<T: Topic>: Send + Sync + 'static {
type Context: Clone + Send + Sync + 'static;
fn handle(
&self,
message: T::Message,
metadata: MessageMetadata,
ctx: &Self::Context,
) -> impl Future<Output = Outcome> + Send;
fn handle_dead(
&self,
_message: T::Message,
metadata: DeadMessageMetadata,
_ctx: &Self::Context,
) -> impl Future<Output = ()> + Send {
async move {
tracing::warn!(
delivery_id = %metadata.message.delivery_id,
reason = metadata.reason.as_deref().unwrap_or("unknown"),
original_queue = metadata.original_queue.as_deref().unwrap_or("unknown"),
death_count = metadata.death_count,
"Dead-letter message received, no handler implemented"
);
}
}
}
pub trait MessageHandlerExt<T: Topic>: MessageHandler<T> + Sized {
fn audited<A>(self, audit: A) -> Audited<Self, A>
where
A: AuditHandler<T>;
}
impl<T: Topic, H: MessageHandler<T>> MessageHandlerExt<T> for H {
fn audited<A>(self, audit: A) -> Audited<Self, A>
where
A: AuditHandler<T>,
{
Audited::new(self, audit)
}
}
impl<T: Topic, H: MessageHandler<T>> MessageHandler<T> for Arc<H> {
type Context = H::Context;
fn handle(
&self,
message: T::Message,
metadata: MessageMetadata,
ctx: &H::Context,
) -> impl Future<Output = Outcome> + Send {
(**self).handle(message, metadata, ctx)
}
fn handle_dead(
&self,
message: T::Message,
metadata: DeadMessageMetadata,
ctx: &H::Context,
) -> impl Future<Output = ()> + Send {
(**self).handle_dead(message, metadata, ctx)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::*;
use crate::metadata::{DeadMessageMetadata, MessageMetadata};
use crate::outcome::Outcome;
use crate::topology::{QueueTopology, TopologyBuilder};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TestMessage {
value: u32,
}
struct TestTopic;
impl Topic for TestTopic {
type Message = TestMessage;
fn topology() -> &'static QueueTopology {
static TOPOLOGY: std::sync::OnceLock<QueueTopology> = std::sync::OnceLock::new();
TOPOLOGY.get_or_init(|| TopologyBuilder::new("handler-test").build())
}
}
struct FixedOutcomeHandler(Outcome);
impl MessageHandler<TestTopic> for FixedOutcomeHandler {
type Context = ();
async fn handle(&self, _msg: TestMessage, _meta: MessageMetadata, _: &()) -> Outcome {
self.0.clone()
}
}
fn test_metadata() -> MessageMetadata {
MessageMetadata {
retry_count: 0,
delivery_id: "d-1".into(),
redelivered: false,
headers: HashMap::new(),
}
}
fn test_dead_metadata() -> DeadMessageMetadata {
DeadMessageMetadata {
message: test_metadata(),
reason: Some("rejected".into()),
original_queue: Some("handler-test".into()),
death_count: 1,
}
}
fn test_message() -> TestMessage {
TestMessage { value: 42 }
}
#[tokio::test]
async fn default_handle_dead_returns_unit() {
let handler = FixedOutcomeHandler(Outcome::Ack);
handler
.handle_dead(test_message(), test_dead_metadata(), &())
.await;
}
#[tokio::test]
async fn arc_blanket_handle_delegates_correctly() {
let handler = Arc::new(FixedOutcomeHandler(Outcome::Ack));
let outcome = handler.handle(test_message(), test_metadata(), &()).await;
assert!(matches!(outcome, Outcome::Ack));
}
#[tokio::test]
async fn arc_blanket_handle_retry_outcome() {
let handler = Arc::new(FixedOutcomeHandler(Outcome::Retry));
let outcome = handler.handle(test_message(), test_metadata(), &()).await;
assert!(matches!(outcome, Outcome::Retry));
}
#[tokio::test]
async fn arc_blanket_handle_dead_delegates_correctly() {
let handler = Arc::new(FixedOutcomeHandler(Outcome::Ack));
handler
.handle_dead(test_message(), test_dead_metadata(), &())
.await;
}
#[tokio::test]
async fn handle_receives_context_by_reference() {
struct CtxHandler;
impl MessageHandler<TestTopic> for CtxHandler {
type Context = u32;
async fn handle(
&self,
_msg: TestMessage,
_meta: MessageMetadata,
ctx: &u32,
) -> Outcome {
assert_eq!(*ctx, 42);
Outcome::Ack
}
}
let outcome = CtxHandler
.handle(test_message(), test_metadata(), &42)
.await;
assert!(matches!(outcome, Outcome::Ack));
}
#[tokio::test]
async fn audited_extension_wraps_handler() {
use crate::audit::{AuditHandler, AuditRecord};
use crate::error::Result;
struct NullAudit;
impl AuditHandler<TestTopic> for NullAudit {
async fn audit(&self, _rec: &AuditRecord<TestMessage>) -> Result<()> {
Ok(())
}
}
let wrapped = FixedOutcomeHandler(Outcome::Ack).audited(NullAudit);
let outcome = wrapped.handle(test_message(), test_metadata(), &()).await;
assert!(matches!(outcome, Outcome::Ack));
}
#[tokio::test]
async fn arc_blanket_handle_forwards_context() {
struct CtxHandler;
impl MessageHandler<TestTopic> for CtxHandler {
type Context = u32;
async fn handle(
&self,
_msg: TestMessage,
_meta: MessageMetadata,
ctx: &u32,
) -> Outcome {
assert_eq!(*ctx, 7);
Outcome::Ack
}
}
let handler = Arc::new(CtxHandler);
let outcome = handler.handle(test_message(), test_metadata(), &7).await;
assert!(matches!(outcome, Outcome::Ack));
}
}