use std::sync::Arc;
/// Reason a message was handed to a [`DeadLetterSink`] instead of being processed
/// or delivered.
///
/// The enum is `#[non_exhaustive]`: code outside this crate must keep a wildcard
/// arm when matching, so new variants can be added without a breaking change.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DeadLetterReason {
/// The message referenced a PID that is not known; the log sink reports this
/// as "no workflow registered".
UnknownPid(u32),
/// The message referenced a conversation that is not known; the log sink
/// reports this as "no in-flight process found".
UnknownConversation {
/// Identifier of the conversation that could not be found.
conversation_id: String,
},
/// The received format version differs from the expected one.
VersionMismatch {
/// Version that was expected.
expected: String,
/// Version that was actually received.
received: String,
},
/// The message was identified as a duplicate.
DuplicateMessage {
/// Inbox key of the duplicated message.
inbox_key: String,
},
/// The message could not be processed; the log sink describes this as
/// "message routed but could not be processed".
ProcessingError {
/// Free-form description of the failure.
message: String,
},
/// An outbox message was given up on after repeated delivery attempts.
OutboxExhausted {
/// Identifier of the outbox message.
message_id: crate::ids::OutboxMessageId,
/// Type of the message, rendered before the recipient in the `Display` text.
message_type: String,
/// Recipient the message was addressed to.
recipient: String,
/// Last error recorded for the message. It is logged by
/// `LogDeadLetterSink` but is not part of the `Display` output.
last_error: String,
/// Number of attempts, as reported in the `Display` text.
attempts: u32,
},
}
impl DeadLetterReason {
    /// Returns a fixed `snake_case` identifier for the variant.
    ///
    /// The result depends only on which variant `self` is, never on the data it
    /// carries, and it is a `'static` string, so it can be attached to log
    /// events without allocating.
    #[must_use]
    pub fn label(&self) -> &'static str {
        // Payloads are ignored on purpose: only the variant matters.
        match *self {
            Self::OutboxExhausted { .. } => "outbox_exhausted",
            Self::ProcessingError { .. } => "processing_error",
            Self::DuplicateMessage { .. } => "duplicate_message",
            Self::VersionMismatch { .. } => "version_mismatch",
            Self::UnknownConversation { .. } => "unknown_conversation",
            Self::UnknownPid(_) => "unknown_pid",
        }
    }
}
impl std::fmt::Display for DeadLetterReason {
    /// Writes a short, single-line, human-readable description of the reason.
    ///
    /// For `OutboxExhausted` the text contains the attempt count, message type,
    /// recipient and message id; `last_error` is deliberately left out here.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::OutboxExhausted {
                message_id,
                message_type,
                recipient,
                last_error: _,
                attempts,
            } => {
                write!(
                    out,
                    "outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
                )
            }
            Self::ProcessingError { message } => {
                write!(out, "processing error: {message}")
            }
            Self::DuplicateMessage { inbox_key } => {
                write!(out, "duplicate message {inbox_key}")
            }
            Self::VersionMismatch { expected, received } => {
                write!(
                    out,
                    "version mismatch: expected {expected}, received {received}"
                )
            }
            Self::UnknownConversation { conversation_id } => {
                write!(out, "unknown conversation {conversation_id}")
            }
            Self::UnknownPid(pid) => {
                write!(out, "unknown PID {pid}")
            }
        }
    }
}
/// Destination for messages that were rejected with a [`DeadLetterReason`].
///
/// Implementors must be `Send + Sync + 'static`, as required by the supertrait
/// bounds on this trait.
pub trait DeadLetterSink: Send + Sync + 'static {
/// Records that a message was rejected for `reason`.
///
/// The method takes `&self` and returns nothing, so an implementation has no
/// way to report a failure back to the caller.
fn reject(&self, reason: &DeadLetterReason);
}
#[derive(Debug, Clone, Default)]
pub struct LogDeadLetterSink;
impl DeadLetterSink for LogDeadLetterSink {
fn reject(&self, reason: &DeadLetterReason) {
match reason {
DeadLetterReason::UnknownPid(pid) => {
tracing::warn!(
pid,
reason = reason.label(),
"dead letter: unknown PID — no workflow registered; \
send CONTRL negative acknowledgement",
);
}
DeadLetterReason::UnknownConversation { conversation_id } => {
tracing::warn!(
conversation_id,
reason = reason.label(),
"dead letter: unknown conversation — no in-flight process found; \
process may have completed or registry was lost on restart",
);
}
DeadLetterReason::VersionMismatch { expected, received } => {
tracing::warn!(
expected,
received,
reason = reason.label(),
"dead letter: format version mismatch — no adapter registered for received version",
);
}
DeadLetterReason::DuplicateMessage { inbox_key } => {
tracing::warn!(
inbox_key,
reason = reason.label(),
"dead letter: duplicate message — AS4 retry already processed; ignoring",
);
}
DeadLetterReason::ProcessingError { message } => {
tracing::warn!(
message,
reason = reason.label(),
"dead letter: processing error — message routed but could not be processed",
);
}
DeadLetterReason::OutboxExhausted {
message_id,
message_type,
recipient,
last_error,
attempts,
} => {
tracing::error!(
%message_id,
message_type,
recipient,
last_error,
attempts,
reason = reason.label(),
"dead letter: outbox exhausted — message removed after max delivery attempts; \
manual intervention required to deliver this message",
);
}
}
}
}
/// Sink that throws every rejection away.
///
/// Its [`DeadLetterSink`] implementation is only compiled for test builds or
/// when the `testing` feature is enabled.
#[derive(Debug, Clone, Default)]
#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
pub struct NoopDeadLetterSink;

#[cfg(any(test, feature = "testing"))]
impl DeadLetterSink for NoopDeadLetterSink {
    /// Intentionally does nothing with the rejection.
    fn reject(&self, _: &DeadLetterReason) {}
}
/// Lets a shared, reference-counted sink be used wherever a [`DeadLetterSink`]
/// is expected.
///
/// The `?Sized` bound means this also covers trait objects, i.e.
/// `Arc<dyn DeadLetterSink>` is itself a `DeadLetterSink`, not only
/// `Arc<ConcreteSink>`.
impl<T: DeadLetterSink + ?Sized> DeadLetterSink for Arc<T> {
    /// Forwards the rejection to the sink inside the `Arc`.
    fn reject(&self, reason: &DeadLetterReason) {
        // Deref through `&Arc<T>` to `T` explicitly so the call resolves to the
        // inner sink's implementation rather than to this impl again.
        (**self).reject(reason);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One sample reason together with the label and `Display` text it must produce.
    struct Case {
        reason: DeadLetterReason,
        label: &'static str,
        display: &'static str,
    }

    /// Samples for every variant these tests construct.
    ///
    /// `OutboxExhausted` is not included because it needs an `OutboxMessageId`,
    /// and how to build one is not visible from this module.
    fn cases() -> Vec<Case> {
        vec![
            Case {
                reason: DeadLetterReason::UnknownPid(55001),
                label: "unknown_pid",
                display: "unknown PID 55001",
            },
            Case {
                reason: DeadLetterReason::UnknownConversation {
                    conversation_id: "abc".into(),
                },
                label: "unknown_conversation",
                display: "unknown conversation abc",
            },
            Case {
                reason: DeadLetterReason::VersionMismatch {
                    expected: "FV2025-10-01".into(),
                    received: "FV2026-10-01".into(),
                },
                label: "version_mismatch",
                display: "version mismatch: expected FV2025-10-01, received FV2026-10-01",
            },
            Case {
                reason: DeadLetterReason::DuplicateMessage {
                    inbox_key: "msg-1".into(),
                },
                label: "duplicate_message",
                display: "duplicate message msg-1",
            },
            Case {
                reason: DeadLetterReason::ProcessingError {
                    message: "invalid state".into(),
                },
                label: "processing_error",
                display: "processing error: invalid state",
            },
        ]
    }

    #[test]
    fn dead_letter_reason_labels() {
        for case in cases() {
            assert_eq!(case.reason.label(), case.label);
        }
    }

    #[test]
    fn log_sink_does_not_panic() {
        let sink = LogDeadLetterSink;
        for case in cases() {
            sink.reject(&case.reason);
        }
    }

    #[test]
    fn noop_sink_does_not_panic() {
        let sink = NoopDeadLetterSink;
        sink.reject(&DeadLetterReason::UnknownPid(55001));
    }

    #[test]
    fn arc_blanket_impl_works() {
        // Going through a generic bound proves `Arc<_>` itself implements the
        // trait, rather than the call merely auto-dereferencing to the inner sink.
        fn reject_via<S: DeadLetterSink>(sink: &S, reason: &DeadLetterReason) {
            sink.reject(reason);
        }

        let sink: Arc<LogDeadLetterSink> = Arc::new(LogDeadLetterSink);
        sink.reject(&DeadLetterReason::UnknownPid(1));
        reject_via(&sink, &DeadLetterReason::UnknownPid(1));
    }

    #[test]
    fn dead_letter_reason_display() {
        // Exact-match every sample so any wording change is caught, not just
        // the presence of a substring.
        for case in cases() {
            assert_eq!(case.reason.to_string(), case.display);
        }
    }
}