use std::sync::Arc;
/// Metadata carried alongside a dead-letter rejection.
///
/// Every field except `timestamp` is optional. [`AuditContext::now`] creates a
/// context with all optional fields unset, and the `with_*` builder methods
/// fill them in one at a time.
#[derive(Debug, Clone)]
pub struct AuditContext {
/// Message type, if known.
pub message_type: Option<String>,
/// Release code, if known.
/// NOTE(review): presumably identifies the format release of the message — confirm.
pub release_code: Option<String>,
/// PID of the message, if known.
pub pid: Option<u32>,
/// EIC of the sender, if known.
pub sender_eic: Option<String>,
/// EIC of the receiver, if known.
pub receiver_eic: Option<String>,
/// Message reference, if known.
pub message_ref: Option<String>,
/// Process identifier, if known.
pub process_id: Option<String>,
/// Tenant identifier, if known.
pub tenant_id: Option<String>,
/// Correlation identifier, if known.
pub correlation_id: Option<String>,
/// Timestamp of this context; [`AuditContext::now`] sets it to the current UTC time.
pub timestamp: time::OffsetDateTime,
}
impl AuditContext {
    /// Creates a context stamped with the current UTC time and with every
    /// optional field unset.
    #[must_use]
    pub fn now() -> Self {
        Self {
            timestamp: time::OffsetDateTime::now_utc(),
            message_type: None,
            release_code: None,
            pid: None,
            sender_eic: None,
            receiver_eic: None,
            message_ref: None,
            process_id: None,
            tenant_id: None,
            correlation_id: None,
        }
    }

    /// Returns the context with `message_type` replaced by `mt`.
    #[must_use]
    pub fn with_message_type(self, mt: impl Into<String>) -> Self {
        Self {
            message_type: Some(mt.into()),
            ..self
        }
    }

    /// Returns the context with `release_code` replaced by `rc`.
    #[must_use]
    pub fn with_release_code(self, rc: impl Into<String>) -> Self {
        Self {
            release_code: Some(rc.into()),
            ..self
        }
    }

    /// Returns the context with `pid` replaced by the given value.
    #[must_use]
    pub fn with_pid(self, pid: u32) -> Self {
        Self {
            pid: Some(pid),
            ..self
        }
    }

    /// Returns the context with `sender_eic` replaced by `eic`.
    #[must_use]
    pub fn with_sender_eic(self, eic: impl Into<String>) -> Self {
        Self {
            sender_eic: Some(eic.into()),
            ..self
        }
    }

    /// Returns the context with `receiver_eic` replaced by `eic`.
    #[must_use]
    pub fn with_receiver_eic(self, eic: impl Into<String>) -> Self {
        Self {
            receiver_eic: Some(eic.into()),
            ..self
        }
    }

    /// Returns the context with `message_ref` replaced by `r`.
    #[must_use]
    pub fn with_message_ref(self, r: impl Into<String>) -> Self {
        Self {
            message_ref: Some(r.into()),
            ..self
        }
    }

    /// Returns the context with `process_id` replaced by `id`.
    #[must_use]
    pub fn with_process_id(self, id: impl Into<String>) -> Self {
        Self {
            process_id: Some(id.into()),
            ..self
        }
    }

    /// Returns the context with `tenant_id` replaced by `id`.
    #[must_use]
    pub fn with_tenant_id(self, id: impl Into<String>) -> Self {
        Self {
            tenant_id: Some(id.into()),
            ..self
        }
    }

    /// Returns the context with `correlation_id` replaced by `id`.
    #[must_use]
    pub fn with_correlation_id(self, id: impl Into<String>) -> Self {
        Self {
            correlation_id: Some(id.into()),
            ..self
        }
    }
}
impl Default for AuditContext {
fn default() -> Self {
Self::now()
}
}
/// Describes why something was dead-lettered.
///
/// Every variant except `OutboxExhausted` carries an [`AuditContext`]. The
/// enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DeadLetterReason {
/// The given PID is unknown.
UnknownPid {
pid: u32,
context: AuditContext,
},
/// The given conversation identifier is unknown.
UnknownConversation {
conversation_id: String,
context: AuditContext,
},
/// The received version differs from the expected one.
VersionMismatch {
expected: String,
received: String,
context: AuditContext,
},
/// A message with this inbox key is a duplicate.
DuplicateMessage {
inbox_key: String,
context: AuditContext,
},
/// Processing failed; `message` holds the error text.
ProcessingError {
message: String,
context: AuditContext,
},
/// Delivery of an outbox message was given up after `attempts` attempts.
///
/// This is the only variant without an [`AuditContext`].
OutboxExhausted {
/// Identifier of the outbox message.
message_id: crate::ids::OutboxMessageId,
/// Type of the outbox message.
message_type: String,
/// Intended recipient of the message.
recipient: String,
/// Text of the last error.
/// NOTE(review): presumably from the final delivery attempt — confirm.
last_error: String,
/// Number of attempts made.
attempts: u32,
},
}
impl DeadLetterReason {
#[must_use]
pub fn label(&self) -> &'static str {
match self {
Self::UnknownPid { .. } => "unknown_pid",
Self::UnknownConversation { .. } => "unknown_conversation",
Self::VersionMismatch { .. } => "version_mismatch",
Self::DuplicateMessage { .. } => "duplicate_message",
Self::ProcessingError { .. } => "processing_error",
Self::OutboxExhausted { .. } => "outbox_exhausted",
}
}
#[must_use]
pub fn audit_context(&self) -> Option<&AuditContext> {
match self {
Self::UnknownPid { context, .. }
| Self::UnknownConversation { context, .. }
| Self::VersionMismatch { context, .. }
| Self::DuplicateMessage { context, .. }
| Self::ProcessingError { context, .. } => Some(context),
Self::OutboxExhausted { .. } => None,
}
}
}
impl std::fmt::Display for DeadLetterReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownPid { pid, .. } => write!(f, "unknown PID {pid}"),
Self::UnknownConversation {
conversation_id, ..
} => {
write!(f, "unknown conversation {conversation_id}")
}
Self::VersionMismatch {
expected, received, ..
} => write!(
f,
"version mismatch: expected {expected}, received {received}"
),
Self::DuplicateMessage { inbox_key, .. } => write!(f, "duplicate message {inbox_key}"),
Self::ProcessingError { message, .. } => write!(f, "processing error: {message}"),
Self::OutboxExhausted {
message_id,
message_type,
recipient,
attempts,
..
} => write!(
f,
"outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
),
}
}
}
/// Receiver for dead-letter rejections.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait DeadLetterSink: Send + Sync + 'static {
/// Handles one rejection described by `reason`.
///
/// The method takes `&self` and returns nothing, so the signature gives an
/// implementation no way to report a failure back to the caller.
fn reject(&self, reason: &DeadLetterReason);
}
/// [`DeadLetterSink`] that reports each rejection as a `tracing` event.
///
/// A unit struct with no state.
#[derive(Debug, Clone, Default)]
pub struct LogDeadLetterSink;
impl DeadLetterSink for LogDeadLetterSink {
    /// Emits one `tracing` event for `reason`.
    ///
    /// * Reasons that carry an [`AuditContext`] are logged at WARN level with
    ///   every context field attached; unset string fields are recorded as
    ///   `""` and an unset `pid` as `0`.
    /// * `OutboxExhausted` is logged at ERROR level with its own fields.
    /// * Any other reason without a context is logged at WARN level with only
    ///   its label.
    fn reject(&self, reason: &DeadLetterReason) {
        match (reason.audit_context(), reason) {
            (Some(ctx), _) => {
                tracing::warn!(
                    reason = reason.label(),
                    message_type = ctx.message_type.as_deref().unwrap_or(""),
                    release_code = ctx.release_code.as_deref().unwrap_or(""),
                    pid = ctx.pid.unwrap_or(0),
                    sender_eic = ctx.sender_eic.as_deref().unwrap_or(""),
                    receiver_eic = ctx.receiver_eic.as_deref().unwrap_or(""),
                    message_ref = ctx.message_ref.as_deref().unwrap_or(""),
                    process_id = ctx.process_id.as_deref().unwrap_or(""),
                    tenant_id = ctx.tenant_id.as_deref().unwrap_or(""),
                    correlation_id = ctx.correlation_id.as_deref().unwrap_or(""),
                    %ctx.timestamp,
                    "dead letter: {reason}",
                );
            }
            (
                None,
                DeadLetterReason::OutboxExhausted {
                    message_id,
                    message_type,
                    recipient,
                    last_error,
                    attempts,
                },
            ) => {
                tracing::error!(
                    %message_id,
                    message_type,
                    recipient,
                    last_error,
                    attempts,
                    reason = reason.label(),
                    "dead letter: outbox exhausted — message removed after max delivery \
                     attempts; manual intervention required to deliver this message",
                );
            }
            // Fallback for any context-free reason other than `OutboxExhausted`.
            (None, _) => {
                tracing::warn!(reason = reason.label(), "dead letter: {reason}");
            }
        }
    }
}
/// Sink that drops every rejection without recording it.
///
/// Its [`DeadLetterSink`] implementation in this file is gated behind
/// `cfg(any(test, feature = "testing"))`.
#[derive(Debug, Clone, Default)]
#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
pub struct NoopDeadLetterSink;
/// Test-only sink implementation: compiled for test builds or when the
/// `testing` feature is enabled.
#[cfg(any(test, feature = "testing"))]
impl DeadLetterSink for NoopDeadLetterSink {
    /// Accepts the rejection and does nothing with it.
    fn reject(&self, _: &DeadLetterReason) {
        // Intentionally empty: rejections are discarded.
    }
}
/// Lets a shared sink be used wherever a [`DeadLetterSink`] is expected by
/// forwarding to the sink inside the [`Arc`].
///
/// The `?Sized` bound also covers trait objects, so `Arc<dyn DeadLetterSink>`
/// is itself a sink and can be passed to code that is generic over the trait.
impl<T: DeadLetterSink + ?Sized> DeadLetterSink for Arc<T> {
    fn reject(&self, reason: &DeadLetterReason) {
        // Explicit deref so the call goes to the inner sink rather than
        // resolving back to this impl.
        (**self).reject(reason);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one instance of every context-carrying reason, paired with the
    /// label and the `Display` text it is expected to produce.
    ///
    /// NOTE(review): `OutboxExhausted` is not covered; building one needs a
    /// `crate::ids::OutboxMessageId`, whose constructor is not visible from
    /// this file.
    fn inbound_samples() -> Vec<(DeadLetterReason, &'static str, &'static str)> {
        vec![
            (
                DeadLetterReason::UnknownPid {
                    pid: 55001,
                    context: AuditContext::now(),
                },
                "unknown_pid",
                "unknown PID 55001",
            ),
            (
                DeadLetterReason::UnknownConversation {
                    conversation_id: "abc".into(),
                    context: AuditContext::now(),
                },
                "unknown_conversation",
                "unknown conversation abc",
            ),
            (
                DeadLetterReason::VersionMismatch {
                    expected: "FV2025-10-01".into(),
                    received: "FV2026-10-01".into(),
                    context: AuditContext::now(),
                },
                "version_mismatch",
                "version mismatch: expected FV2025-10-01, received FV2026-10-01",
            ),
            (
                DeadLetterReason::DuplicateMessage {
                    inbox_key: "msg-1".into(),
                    context: AuditContext::now(),
                },
                "duplicate_message",
                "duplicate message msg-1",
            ),
            (
                DeadLetterReason::ProcessingError {
                    message: "invalid state".into(),
                    context: AuditContext::now(),
                },
                "processing_error",
                "processing error: invalid state",
            ),
        ]
    }

    /// Calls `reject` through a generic bound so the trait impl of `S` itself
    /// is the one exercised (not an auto-deref'd inner type).
    fn reject_through<S: DeadLetterSink>(sink: &S, reason: &DeadLetterReason) {
        sink.reject(reason);
    }

    #[test]
    fn dead_letter_reason_labels() {
        for (reason, label, _) in inbound_samples() {
            assert_eq!(reason.label(), label);
        }
    }

    #[test]
    fn log_sink_does_not_panic() {
        let sink = LogDeadLetterSink;
        for (reason, _, _) in inbound_samples() {
            sink.reject(&reason);
        }
    }

    #[test]
    fn noop_sink_does_not_panic() {
        let sink = NoopDeadLetterSink;
        for (reason, _, _) in inbound_samples() {
            sink.reject(&reason);
        }
    }

    #[test]
    fn arc_blanket_impl_works() {
        let sink: Arc<LogDeadLetterSink> = Arc::new(LogDeadLetterSink);
        for (reason, _, _) in inbound_samples() {
            reject_through(&sink, &reason);
        }
    }

    #[test]
    fn dead_letter_reason_display() {
        for (reason, _, text) in inbound_samples() {
            assert_eq!(reason.to_string(), text);
        }
    }

    #[test]
    fn audit_context_builder() {
        let ctx = AuditContext::now()
            .with_message_type("UTILMD")
            .with_release_code("FV2025-10-01")
            .with_pid(55001)
            .with_sender_eic("4012345000023")
            .with_receiver_eic("9900357000004")
            .with_message_ref("00001")
            .with_process_id("proc-1")
            .with_tenant_id("tenant-a")
            .with_correlation_id("conv-xyz");
        assert_eq!(ctx.message_type.as_deref(), Some("UTILMD"));
        assert_eq!(ctx.release_code.as_deref(), Some("FV2025-10-01"));
        assert_eq!(ctx.pid, Some(55001));
        assert_eq!(ctx.sender_eic.as_deref(), Some("4012345000023"));
        assert_eq!(ctx.receiver_eic.as_deref(), Some("9900357000004"));
        assert_eq!(ctx.message_ref.as_deref(), Some("00001"));
        assert_eq!(ctx.process_id.as_deref(), Some("proc-1"));
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
        assert_eq!(ctx.correlation_id.as_deref(), Some("conv-xyz"));
    }

    #[test]
    fn audit_context_starts_empty() {
        for ctx in [AuditContext::now(), AuditContext::default()] {
            assert!(ctx.message_type.is_none());
            assert!(ctx.release_code.is_none());
            assert!(ctx.pid.is_none());
            assert!(ctx.sender_eic.is_none());
            assert!(ctx.receiver_eic.is_none());
            assert!(ctx.message_ref.is_none());
            assert!(ctx.process_id.is_none());
            assert!(ctx.tenant_id.is_none());
            assert!(ctx.correlation_id.is_none());
        }
    }

    #[test]
    fn audit_context_returned_for_inbound_reasons() {
        let r = DeadLetterReason::UnknownPid {
            pid: 99,
            context: AuditContext::now().with_pid(99),
        };
        // The returned context must be the one stored in the variant.
        assert_eq!(r.audit_context().and_then(|ctx| ctx.pid), Some(99));

        for (reason, label, _) in inbound_samples() {
            assert!(
                reason.audit_context().is_some(),
                "missing audit context for {label}"
            );
        }
    }
}