use std::sync::Arc;
use crate::metadata::{DeadMessageMetadata, MessageMetadata};
use crate::outcome::Outcome;
use crate::topic::Topic;
/// Asynchronous handler for messages belonging to the topic `T`.
///
/// Implementors must be `Send + Sync + 'static`, and every future returned
/// by the trait's methods must be `Send`.
pub trait MessageHandler<T: Topic>: Send + Sync + 'static {
    /// Processes one `message` together with its delivery `metadata`.
    ///
    /// The returned future resolves to the [`Outcome`] chosen by the handler.
    fn handle(
        &self,
        message: T::Message,
        metadata: MessageMetadata,
    ) -> impl Future<Output = Outcome> + Send;

    /// Processes a dead-lettered message.
    ///
    /// The provided implementation ignores the payload and only emits a
    /// `tracing` warning carrying the delivery id, the dead-letter reason,
    /// the original queue and the death count. A missing reason or queue is
    /// reported as `"unknown"`. Override this method to act on dead messages.
    fn handle_dead(
        &self,
        _message: T::Message,
        metadata: DeadMessageMetadata,
    ) -> impl Future<Output = ()> + Send {
        async move {
            // Resolve the optional fields up front so the log call stays flat.
            let why = metadata.reason.as_deref().unwrap_or("unknown");
            let source_queue = metadata.original_queue.as_deref().unwrap_or("unknown");

            tracing::warn!(
                delivery_id = %metadata.message.delivery_id,
                reason = why,
                original_queue = source_queue,
                death_count = metadata.death_count,
                "Dead-letter message received, no handler implemented"
            );
        }
    }
}
/// Forwarding implementation: an `Arc<H>` handles messages by delegating
/// both methods to the handler `H` it points to.
impl<T: Topic, H: MessageHandler<T>> MessageHandler<T> for Arc<H> {
    /// Delegates to `H::handle` on the pointee.
    fn handle(
        &self,
        message: T::Message,
        metadata: MessageMetadata,
    ) -> impl Future<Output = Outcome> + Send {
        // `&Arc<H>` deref-coerces to the `&H` receiver expected by `H`'s impl.
        <H as MessageHandler<T>>::handle(self, message, metadata)
    }

    /// Delegates to `H::handle_dead` on the pointee, so any override on `H`
    /// (rather than the trait's default body) is what runs.
    fn handle_dead(
        &self,
        message: T::Message,
        metadata: DeadMessageMetadata,
    ) -> impl Future<Output = ()> + Send {
        <H as MessageHandler<T>>::handle_dead(self, message, metadata)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::OnceLock;

    use super::*;
    use crate::metadata::{DeadMessageMetadata, MessageMetadata};
    use crate::outcome::Outcome;
    use crate::topology::{QueueTopology, TopologyBuilder};

    /// Payload type carried by [`TestTopic`].
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestMessage {
        value: u32,
    }

    /// Topic used only by these tests; its topology is built once and cached.
    struct TestTopic;

    impl Topic for TestTopic {
        type Message = TestMessage;

        fn topology() -> &'static QueueTopology {
            static CACHED: OnceLock<QueueTopology> = OnceLock::new();
            CACHED.get_or_init(|| TopologyBuilder::new("handler-test").build())
        }
    }

    /// Handler whose `handle` always yields a clone of the wrapped outcome and
    /// which keeps the trait's default `handle_dead`.
    struct FixedOutcomeHandler(Outcome);

    impl MessageHandler<TestTopic> for FixedOutcomeHandler {
        async fn handle(&self, _msg: TestMessage, _meta: MessageMetadata) -> Outcome {
            let Self(fixed) = self;
            fixed.clone()
        }
    }

    /// Shorthand for the shared handler type exercised by the `Arc` tests.
    type SharedHandler = Arc<FixedOutcomeHandler>;

    /// Wraps a fixed-outcome handler in an `Arc`.
    fn shared(outcome: Outcome) -> SharedHandler {
        Arc::new(FixedOutcomeHandler(outcome))
    }

    /// Sample payload.
    fn sample_message() -> TestMessage {
        TestMessage { value: 42 }
    }

    /// Metadata for a first, non-redelivered delivery without headers.
    fn sample_metadata() -> MessageMetadata {
        MessageMetadata {
            retry_count: 0,
            delivery_id: "d-1".into(),
            redelivered: false,
            headers: HashMap::new(),
        }
    }

    /// Dead-letter metadata wrapping [`sample_metadata`].
    fn sample_dead_metadata() -> DeadMessageMetadata {
        DeadMessageMetadata {
            message: sample_metadata(),
            reason: Some("rejected".into()),
            original_queue: Some("handler-test".into()),
            death_count: 1,
        }
    }

    #[tokio::test]
    async fn default_handle_dead_returns_unit() {
        let plain = FixedOutcomeHandler(Outcome::Ack);
        // Only checks that the default implementation runs to completion.
        plain
            .handle_dead(sample_message(), sample_dead_metadata())
            .await;
    }

    #[tokio::test]
    async fn arc_blanket_handle_delegates_correctly() {
        let handler = shared(Outcome::Ack);
        let result = <SharedHandler as MessageHandler<TestTopic>>::handle(
            &handler,
            sample_message(),
            sample_metadata(),
        )
        .await;
        assert!(matches!(result, Outcome::Ack));
    }

    #[tokio::test]
    async fn arc_blanket_handle_retry_outcome() {
        let handler = shared(Outcome::Retry);
        let result = <SharedHandler as MessageHandler<TestTopic>>::handle(
            &handler,
            sample_message(),
            sample_metadata(),
        )
        .await;
        assert!(matches!(result, Outcome::Retry));
    }

    #[tokio::test]
    async fn arc_blanket_handle_dead_delegates_correctly() {
        let handler = shared(Outcome::Ack);
        <SharedHandler as MessageHandler<TestTopic>>::handle_dead(
            &handler,
            sample_message(),
            sample_dead_metadata(),
        )
        .await;
    }
}