mako_engine/dead_letter.rs
1//! Dead-letter sink for unroutable or unprocessable inbound messages.
2//!
3//! BDEW AS4 requires that every received message is either processed or
4//! explicitly refused (CONTRL negative acknowledgement). Messages that
5//! cannot be routed to a workflow — because the PID is unknown, the
6//! conversation is not in-flight, or the format version has no adapter —
7//! must not be silently dropped.
8//!
9//! # Design
10//!
11//! [`DeadLetterSink`] is a synchronous trait that receives a structured
12//! [`DeadLetterReason`] for every rejected message. The synchronous contract
13//! keeps dispatch-path hot code fast; implementations that need async work
14//! (e.g. persisting to a durable DLQ or sending a CONTRL) can use
15//! `tokio::spawn` internally.
16//!
17//! # Implementations
18//!
19//! | Type | Behaviour |
20//! |------|-----------|
//! | [`LogDeadLetterSink`] | Emits structured `tracing::warn!` events (`tracing::error!` for exhausted outbox deliveries); suitable for all deployments |
22//! | [`NoopDeadLetterSink`] | Silently discards; **only for testing** |
23//!
24//! # Wiring
25//!
26//! Pass an implementation to [`EngineBuilder::with_dead_letter_sink`].
27//! The default is [`LogDeadLetterSink`] so unroutable messages are always
28//! visible in the log output without any configuration.
29//!
30//! ```rust
31//! use mako_engine::dead_letter::{DeadLetterReason, DeadLetterSink, LogDeadLetterSink};
32//!
33//! let sink = LogDeadLetterSink;
34//! sink.reject(&DeadLetterReason::UnknownPid(99999));
35//! ```
36//!
37//! [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
38
39use std::sync::Arc;
40
41// ── DeadLetterReason ──────────────────────────────────────────────────────────
42
/// Structured reason why a message was dead-lettered.
///
/// Most variants describe an inbound message that was rejected;
/// [`DeadLetterReason::OutboxExhausted`] describes an outbound message that
/// could not be delivered.
///
/// The variant gives the dispatch path enough information to emit an
/// actionable CONTRL or log entry. Adding new variants is a non-breaking
/// change thanks to `#[non_exhaustive]`; code outside this crate that matches
/// on the enum must therefore include a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DeadLetterReason {
    /// No workflow is registered for this PID in the [`PidRouter`].
    ///
    /// The PID is either from a future BDEW release not yet deployed or a
    /// malformed message. Respond with a CONTRL negative acknowledgement.
    ///
    /// [`PidRouter`]: crate::pid_router::PidRouter
    UnknownPid(u32),

    /// No in-flight process matched the inbound `conversation_id`.
    ///
    /// This typically means the process completed, was never started, or
    /// the [`ProcessRegistry`] was lost on restart.
    ///
    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
    UnknownConversation {
        /// The `conversation_id` from the inbound EDIFACT interchange.
        conversation_id: String,
    },

    /// The message's format version has no registered [`MessageAdapter`].
    ///
    /// Either the adapter registry is incomplete or the sender is using a
    /// deprecated / future format version.
    ///
    /// [`MessageAdapter`]: crate::message_adapter::MessageAdapter
    VersionMismatch {
        /// The format version string the adapter registry expected.
        expected: String,
        /// The format version string carried in the inbound message.
        received: String,
    },

    /// A message with this inbox key was already accepted (AS4 duplicate).
    ///
    /// The AS4 sender retries for up to 72 hours. The [`InboxStore`]
    /// detected the duplicate and the message must not be processed again.
    ///
    /// [`InboxStore`]: crate::inbox::InboxStore
    DuplicateMessage {
        /// The inbox deduplication key (typically the AS4 `MessageId`).
        inbox_key: String,
    },

    /// A workflow or adapter returned a processing error.
    ///
    /// The message was routed correctly but could not be processed. Use
    /// this variant when the failure is definitive (not retriable).
    ProcessingError {
        /// Short, human-readable description of the failure.
        message: String,
    },

    /// The outbox delivery worker gave up after exhausting all retry attempts.
    ///
    /// The message was re-queued `max_attempts` times and never successfully
    /// delivered to the AS4 endpoint (or ERP webhook). The message is removed
    /// from the outbox and recorded here for regulatory audit.
    OutboxExhausted {
        /// The outbox message ID of the undeliverable message.
        message_id: crate::ids::OutboxMessageId,
        /// The message type (e.g. `"APERAK"`, `"CONTRL"`).
        message_type: String,
        /// The intended recipient GLN.
        recipient: String,
        /// The last error returned by the AS4 sender.
        last_error: String,
        /// How many delivery attempts were made.
        attempts: u32,
    },
}
121
122impl DeadLetterReason {
123 /// Short label identifying the rejection category.
124 ///
125 /// Suitable for structured log fields and metric labels.
126 #[must_use]
127 pub fn label(&self) -> &'static str {
128 match self {
129 Self::UnknownPid(_) => "unknown_pid",
130 Self::UnknownConversation { .. } => "unknown_conversation",
131 Self::VersionMismatch { .. } => "version_mismatch",
132 Self::DuplicateMessage { .. } => "duplicate_message",
133 Self::ProcessingError { .. } => "processing_error",
134 Self::OutboxExhausted { .. } => "outbox_exhausted",
135 }
136 }
137}
138
139impl std::fmt::Display for DeadLetterReason {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 Self::UnknownPid(pid) => write!(f, "unknown PID {pid}"),
143 Self::UnknownConversation { conversation_id } => {
144 write!(f, "unknown conversation {conversation_id}")
145 }
146 Self::VersionMismatch { expected, received } => write!(
147 f,
148 "version mismatch: expected {expected}, received {received}"
149 ),
150 Self::DuplicateMessage { inbox_key } => write!(f, "duplicate message {inbox_key}"),
151 Self::ProcessingError { message } => write!(f, "processing error: {message}"),
152 Self::OutboxExhausted {
153 message_id,
154 message_type,
155 recipient,
156 attempts,
157 ..
158 } => write!(
159 f,
160 "outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
161 ),
162 }
163 }
164}
165
166// ── DeadLetterSink trait ──────────────────────────────────────────────────────
167
/// Receives messages that cannot be routed or processed.
///
/// Implement this trait to:
/// - Emit CONTRL negative acknowledgements for unroutable messages
/// - Persist rejections to a durable dead-letter queue for manual review
/// - Trigger alerts when duplicate-message counts exceed a threshold
///
/// The method is **synchronous**. Implementations that require async work
/// (network calls, database writes) must use `tokio::spawn` internally.
///
/// `reject` returns nothing, so an implementation has no channel for
/// reporting its own failures back to the caller and has to deal with them
/// itself.
///
/// # Default
///
/// The default dead-letter sink is [`LogDeadLetterSink`], which emits
/// `tracing::warn!` events. Override with
/// [`EngineBuilder::with_dead_letter_sink`] to add CONTRL dispatch or
/// persistent DLQ storage.
///
/// [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
pub trait DeadLetterSink: Send + Sync + 'static {
    /// Record a rejected message.
    ///
    /// Called by the dispatch path synchronously, before the inbound
    /// message is acknowledged at the AS4 transport layer. Must not block.
    ///
    /// `reason` is only borrowed for the duration of the call; an
    /// implementation that hands it to a spawned task needs its own copy
    /// ([`DeadLetterReason`] is `Clone`).
    fn reject(&self, reason: &DeadLetterReason);
}
193
194// ── LogDeadLetterSink ─────────────────────────────────────────────────────────
195
/// A [`DeadLetterSink`] that emits a structured `tracing` event for every
/// rejected message.
///
/// Events are logged at `warn` level, except for
/// [`DeadLetterReason::OutboxExhausted`], which is logged at `error` level.
///
/// Suitable for all deployment tiers. In production, combine with a
/// `tracing` subscriber that forwards `warn`-level (and above) events to
/// your alert pipeline (Loki, CloudWatch, etc.).
///
/// This is the **default** dead-letter sink in [`EngineBuilder`].
///
/// [`EngineBuilder`]: crate::builder::EngineBuilder
#[derive(Debug, Clone, Default)]
pub struct LogDeadLetterSink;
208
209impl DeadLetterSink for LogDeadLetterSink {
210 fn reject(&self, reason: &DeadLetterReason) {
211 match reason {
212 DeadLetterReason::UnknownPid(pid) => {
213 tracing::warn!(
214 pid,
215 reason = reason.label(),
216 "dead letter: unknown PID — no workflow registered; \
217 send CONTRL negative acknowledgement",
218 );
219 }
220 DeadLetterReason::UnknownConversation { conversation_id } => {
221 tracing::warn!(
222 conversation_id,
223 reason = reason.label(),
224 "dead letter: unknown conversation — no in-flight process found; \
225 process may have completed or registry was lost on restart",
226 );
227 }
228 DeadLetterReason::VersionMismatch { expected, received } => {
229 tracing::warn!(
230 expected,
231 received,
232 reason = reason.label(),
233 "dead letter: format version mismatch — no adapter registered for received version",
234 );
235 }
236 DeadLetterReason::DuplicateMessage { inbox_key } => {
237 tracing::warn!(
238 inbox_key,
239 reason = reason.label(),
240 "dead letter: duplicate message — AS4 retry already processed; ignoring",
241 );
242 }
243 DeadLetterReason::ProcessingError { message } => {
244 tracing::warn!(
245 message,
246 reason = reason.label(),
247 "dead letter: processing error — message routed but could not be processed",
248 );
249 }
250 DeadLetterReason::OutboxExhausted {
251 message_id,
252 message_type,
253 recipient,
254 last_error,
255 attempts,
256 } => {
257 tracing::error!(
258 %message_id,
259 message_type,
260 recipient,
261 last_error,
262 attempts,
263 reason = reason.label(),
264 "dead letter: outbox exhausted — message removed after max delivery attempts; \
265 manual intervention required to deliver this message",
266 );
267 }
268 }
269 }
270}
271
272// ── NoopDeadLetterSink ────────────────────────────────────────────────────────
273
/// A [`DeadLetterSink`] whose `reject` does nothing.
///
/// **Use only in unit tests** where dead-letter events are not the subject
/// under test. Using this in production means unroutable messages are lost
/// without any diagnostic output, violating BDEW AS4 requirements.
///
/// The [`DeadLetterSink`] implementation is compiled only under `cfg(test)`
/// or when the `testing` feature is enabled.
#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
#[derive(Debug, Clone, Default)]
pub struct NoopDeadLetterSink;

#[cfg(any(test, feature = "testing"))]
impl DeadLetterSink for NoopDeadLetterSink {
    /// Drops the rejection without recording it anywhere.
    fn reject(&self, _discarded: &DeadLetterReason) {}
}
287
288// ── ArcDeadLetterSink ─────────────────────────────────────────────────────────
289
290/// Blanket implementation so `Arc<T>` is a `DeadLetterSink` whenever `T` is.
291///
292/// This allows passing `Arc<LogDeadLetterSink>` or `Arc<dyn DeadLetterSink>`
293/// wherever a `DeadLetterSink` is expected without an extra wrapper.
294impl<T: DeadLetterSink> DeadLetterSink for Arc<T> {
295 fn reject(&self, reason: &DeadLetterReason) {
296 self.as_ref().reject(reason);
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Each listed variant yields its expected label.
    #[test]
    fn dead_letter_reason_labels() {
        let cases = [
            (DeadLetterReason::UnknownPid(55001), "unknown_pid"),
            (
                DeadLetterReason::UnknownConversation {
                    conversation_id: "abc".into(),
                },
                "unknown_conversation",
            ),
            (
                DeadLetterReason::VersionMismatch {
                    expected: "FV2025-10-01".into(),
                    received: "FV2026-10-01".into(),
                },
                "version_mismatch",
            ),
            (
                DeadLetterReason::DuplicateMessage {
                    inbox_key: "msg-1".into(),
                },
                "duplicate_message",
            ),
            (
                DeadLetterReason::ProcessingError {
                    message: "invalid state".into(),
                },
                "processing_error",
            ),
        ];

        for (reason, expected) in cases {
            assert_eq!(reason.label(), expected);
        }
    }

    /// Smoke test: logging each listed variant must not panic.
    #[test]
    fn log_sink_does_not_panic() {
        let reasons = [
            DeadLetterReason::UnknownPid(99999),
            DeadLetterReason::UnknownConversation {
                conversation_id: "conv-123".into(),
            },
            DeadLetterReason::VersionMismatch {
                expected: "FV2025-10-01".into(),
                received: "FV2026-10-01".into(),
            },
            DeadLetterReason::DuplicateMessage {
                inbox_key: "msg-42".into(),
            },
            DeadLetterReason::ProcessingError {
                message: "workflow rejected command".into(),
            },
        ];

        let logger = LogDeadLetterSink;
        for reason in &reasons {
            logger.reject(reason);
        }
    }

    /// Smoke test: the no-op sink accepts a rejection without panicking.
    #[test]
    fn noop_sink_does_not_panic() {
        let noop = NoopDeadLetterSink;
        let reason = DeadLetterReason::UnknownPid(55001);
        noop.reject(&reason);
    }

    /// The blanket impl makes an `Arc`-wrapped sink usable as a sink.
    #[test]
    fn arc_blanket_impl_works() {
        let shared = Arc::new(LogDeadLetterSink);
        let reason = DeadLetterReason::UnknownPid(1);
        <Arc<LogDeadLetterSink> as DeadLetterSink>::reject(&shared, &reason);
    }

    /// `Display` output for a tuple variant and a struct variant.
    #[test]
    fn dead_letter_reason_display() {
        let unknown_pid = DeadLetterReason::UnknownPid(55001);
        assert_eq!(unknown_pid.to_string(), "unknown PID 55001");

        let mismatch = DeadLetterReason::VersionMismatch {
            expected: "FV2025-10-01".into(),
            received: "FV2026-10-01".into(),
        };
        assert!(mismatch.to_string().contains("version mismatch"));
    }
}