mako_engine/dead_letter.rs
1//! Dead-letter sink for unroutable or unprocessable inbound messages.
2//!
3//! BDEW AS4 requires that every received message is either processed or
4//! explicitly refused (CONTRL negative acknowledgement). Messages that
5//! cannot be routed to a workflow — because the PID is unknown, the
6//! conversation is not in-flight, or the format version has no adapter —
7//! must not be silently dropped.
8//!
9//! # Design
10//!
11//! [`DeadLetterSink`] is a synchronous trait that receives a structured
12//! [`DeadLetterReason`] for every rejected message. The synchronous contract
13//! keeps dispatch-path hot code fast; implementations that need async work
14//! (e.g. persisting to a durable DLQ or sending a CONTRL) can use
15//! `tokio::spawn` internally.
16//!
17//! # Implementations
18//!
19//! | Type | Behaviour |
20//! |------|-----------|
//! | [`LogDeadLetterSink`] | Emits structured `tracing::warn!` events (`tracing::error!` for exhausted outbox deliveries); suitable for all deployments |
22//! | [`NoopDeadLetterSink`] | Silently discards; **only for testing** |
23//!
24//! # Wiring
25//!
26//! Pass an implementation to [`EngineBuilder::with_dead_letter_sink`].
27//! The default is [`LogDeadLetterSink`] so unroutable messages are always
28//! visible in the log output without any configuration.
29//!
30//! ```rust
31//! use mako_engine::dead_letter::{AuditContext, DeadLetterReason, DeadLetterSink, LogDeadLetterSink};
32//!
33//! let sink = LogDeadLetterSink;
34//! sink.reject(&DeadLetterReason::UnknownPid { pid: 99999, context: AuditContext::now() });
35//! ```
36//!
37//! [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
38
39use std::sync::Arc;
40
41// ── AuditContext ──────────────────────────────────────────────────────────────
42
/// Structured audit context attached to every dead-letter event.
///
/// Every field except `timestamp` is an `Option`, because the details are
/// only partially known at rejection time (e.g. `pid` is not available for a
/// parse failure before the PID is decoded). Callers fill in as many fields
/// as they have.
///
/// These fields map to §22 MessZV audit-log requirements for AS4 message
/// rejection events:
///
/// | Field | §22 MessZV requirement |
/// |---|---|
/// | `message_type` | Message type / Nachrichtentyp (UTILMD, MSCONS, APERAK, …) |
/// | `release_code` | Release identifier / Releasekennung (S2.1, G1.1, 2.4c, …) |
/// | `pid` | Check identifier / Prüfidentifikator |
/// | `sender_eic` | GLN of the sender |
/// | `receiver_eic` | GLN of the receiver |
/// | `message_ref` | UNH reference |
/// | `process_id` | Business transaction identifier / Geschäftsvorfallkennung |
/// | `tenant_id` | Tenant / Mandant |
/// | `correlation_id` | AS4 `ConversationId` or similar |
/// | `timestamp` | Time of receipt (German local time) |
#[derive(Debug, Clone)]
pub struct AuditContext {
    /// EDIFACT message type (e.g. `"UTILMD"`, `"MSCONS"`, `"APERAK"`).
    pub message_type: Option<String>,
    /// BDEW release code (e.g. `"S2.1"`, `"G1.1"`, `"2.4c"`).
    pub release_code: Option<String>,
    /// BDEW Prüfidentifikator numeric code.
    pub pid: Option<u32>,
    /// GLN of the AS4 sender.
    ///
    /// NOTE(review): the field name says "EIC" while the documentation says
    /// GLN — confirm which identifier callers actually supply.
    pub sender_eic: Option<String>,
    /// GLN of the AS4 receiver.
    ///
    /// NOTE(review): same EIC-vs-GLN naming question as `sender_eic`.
    pub receiver_eic: Option<String>,
    /// UNH message reference (interchange + message ref).
    pub message_ref: Option<String>,
    /// Internal process / workflow stream ID.
    pub process_id: Option<String>,
    /// Tenant identifier (Mandant).
    pub tenant_id: Option<String>,
    /// AS4 ConversationId or engine correlation key.
    pub correlation_id: Option<String>,
    /// Timestamp of message receipt.
    ///
    /// The audit requirement is stated in German local time (CET/CEST), but
    /// [`AuditContext::now`] fills this field from
    /// `time::OffsetDateTime::now_utc()`, so the stored value carries a UTC
    /// offset unless the caller overwrites it.
    ///
    /// NOTE(review): confirm whether conversion to CET/CEST is expected to
    /// happen when the value is rendered, or whether it should be stored in
    /// local time.
    pub timestamp: time::OffsetDateTime,
}
87
88impl AuditContext {
89 /// Create an `AuditContext` with only a timestamp, all other fields `None`.
90 ///
91 /// Use builder-style setters to fill in known fields:
92 /// ```rust
93 /// use mako_engine::dead_letter::AuditContext;
94 ///
95 /// let ctx = AuditContext::now()
96 /// .with_message_type("UTILMD")
97 /// .with_pid(55001)
98 /// .with_sender_eic("4012345000023");
99 /// ```
100 #[must_use]
101 pub fn now() -> Self {
102 Self {
103 message_type: None,
104 release_code: None,
105 pid: None,
106 sender_eic: None,
107 receiver_eic: None,
108 message_ref: None,
109 process_id: None,
110 tenant_id: None,
111 correlation_id: None,
112 timestamp: time::OffsetDateTime::now_utc(),
113 }
114 }
115
116 /// Set the message type.
117 #[must_use]
118 pub fn with_message_type(mut self, mt: impl Into<String>) -> Self {
119 self.message_type = Some(mt.into());
120 self
121 }
122
123 /// Set the BDEW release code.
124 #[must_use]
125 pub fn with_release_code(mut self, rc: impl Into<String>) -> Self {
126 self.release_code = Some(rc.into());
127 self
128 }
129
130 /// Set the Prüfidentifikator.
131 #[must_use]
132 pub fn with_pid(mut self, pid: u32) -> Self {
133 self.pid = Some(pid);
134 self
135 }
136
137 /// Set the sender GLN.
138 #[must_use]
139 pub fn with_sender_eic(mut self, eic: impl Into<String>) -> Self {
140 self.sender_eic = Some(eic.into());
141 self
142 }
143
144 /// Set the receiver GLN.
145 #[must_use]
146 pub fn with_receiver_eic(mut self, eic: impl Into<String>) -> Self {
147 self.receiver_eic = Some(eic.into());
148 self
149 }
150
151 /// Set the UNH message reference.
152 #[must_use]
153 pub fn with_message_ref(mut self, r: impl Into<String>) -> Self {
154 self.message_ref = Some(r.into());
155 self
156 }
157
158 /// Set the internal process / stream ID.
159 #[must_use]
160 pub fn with_process_id(mut self, id: impl Into<String>) -> Self {
161 self.process_id = Some(id.into());
162 self
163 }
164
165 /// Set the tenant identifier.
166 #[must_use]
167 pub fn with_tenant_id(mut self, id: impl Into<String>) -> Self {
168 self.tenant_id = Some(id.into());
169 self
170 }
171
172 /// Set the AS4 correlation / conversation ID.
173 #[must_use]
174 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
175 self.correlation_id = Some(id.into());
176 self
177 }
178}
179
180impl Default for AuditContext {
181 fn default() -> Self {
182 Self::now()
183 }
184}
185
186// ── DeadLetterReason ──────────────────────────────────────────────────────────
187
/// Structured reason why a message was rejected.
///
/// The variant gives the dispatch path enough information to emit an
/// actionable CONTRL or log entry. Every variant that describes an inbound
/// message carries an [`AuditContext`] with the §22 MessZV fields required
/// for regulatory audit logging; `OutboxExhausted` concerns an outbound
/// message and carries its own fields instead.
///
/// Adding new variants is a non-breaking change thanks to `#[non_exhaustive]`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DeadLetterReason {
    /// No workflow is registered for this PID in the [`PidRouter`].
    ///
    /// The PID is either from a future BDEW release not yet deployed or a
    /// malformed message. Respond with a CONTRL negative acknowledgement.
    ///
    /// [`PidRouter`]: crate::pid_router::PidRouter
    UnknownPid {
        /// The numeric Prüfidentifikator that had no registered workflow.
        pid: u32,
        /// §22 MessZV structured audit context.
        context: AuditContext,
    },

    /// No in-flight process matched the inbound `conversation_id`.
    ///
    /// This typically means the process completed, was never started, or
    /// the [`ProcessRegistry`] was lost on restart.
    ///
    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
    UnknownConversation {
        /// The `conversation_id` from the inbound EDIFACT interchange.
        conversation_id: String,
        /// §22 MessZV structured audit context.
        context: AuditContext,
    },

    /// The message's format version has no registered [`MessageAdapter`].
    ///
    /// Either the adapter registry is incomplete or the sender is using a
    /// deprecated / future format version.
    ///
    /// [`MessageAdapter`]: crate::message_adapter::MessageAdapter
    VersionMismatch {
        /// The format version string the adapter registry expected.
        expected: String,
        /// The format version string carried in the inbound message.
        received: String,
        /// §22 MessZV structured audit context.
        context: AuditContext,
    },

    /// A message with this inbox key was already accepted (AS4 duplicate).
    ///
    /// The AS4 sender retries for up to 72 hours. The [`InboxStore`]
    /// detected the duplicate and the message must not be processed again.
    ///
    /// [`InboxStore`]: crate::inbox::InboxStore
    DuplicateMessage {
        /// The inbox deduplication key (typically the AS4 `MessageId`).
        inbox_key: String,
        /// §22 MessZV structured audit context.
        context: AuditContext,
    },

    /// A workflow or adapter returned a processing error.
    ///
    /// The message was routed correctly but could not be processed. Use
    /// this variant when the failure is definitive (not retriable).
    ProcessingError {
        /// Short, human-readable description of the failure.
        message: String,
        /// §22 MessZV structured audit context.
        context: AuditContext,
    },

    /// The outbox delivery worker gave up after exhausting all retry attempts.
    ///
    /// The message was re-queued `max_attempts` times and never successfully
    /// delivered to the AS4 endpoint (or ERP webhook). The message is removed
    /// from the outbox and recorded here for regulatory audit.
    ///
    /// Unlike the other variants, this one has no [`AuditContext`] field.
    OutboxExhausted {
        /// The outbox message ID of the undeliverable message.
        message_id: crate::ids::OutboxMessageId,
        /// The message type (e.g. `"APERAK"`, `"CONTRL"`).
        message_type: String,
        /// The intended recipient GLN.
        recipient: String,
        /// The last error returned by the AS4 sender.
        last_error: String,
        /// How many delivery attempts were made.
        attempts: u32,
    },
}
281
282impl DeadLetterReason {
283 /// Short label identifying the rejection category.
284 ///
285 /// Suitable for structured log fields and metric labels.
286 #[must_use]
287 pub fn label(&self) -> &'static str {
288 match self {
289 Self::UnknownPid { .. } => "unknown_pid",
290 Self::UnknownConversation { .. } => "unknown_conversation",
291 Self::VersionMismatch { .. } => "version_mismatch",
292 Self::DuplicateMessage { .. } => "duplicate_message",
293 Self::ProcessingError { .. } => "processing_error",
294 Self::OutboxExhausted { .. } => "outbox_exhausted",
295 }
296 }
297
298 /// Return the [`AuditContext`] embedded in this reason, if present.
299 ///
300 /// `OutboxExhausted` does not carry an `AuditContext` because it refers
301 /// to an outbound message (not an inbound AS4 message).
302 #[must_use]
303 pub fn audit_context(&self) -> Option<&AuditContext> {
304 match self {
305 Self::UnknownPid { context, .. }
306 | Self::UnknownConversation { context, .. }
307 | Self::VersionMismatch { context, .. }
308 | Self::DuplicateMessage { context, .. }
309 | Self::ProcessingError { context, .. } => Some(context),
310 Self::OutboxExhausted { .. } => None,
311 }
312 }
313}
314
315impl std::fmt::Display for DeadLetterReason {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 match self {
318 Self::UnknownPid { pid, .. } => write!(f, "unknown PID {pid}"),
319 Self::UnknownConversation {
320 conversation_id, ..
321 } => {
322 write!(f, "unknown conversation {conversation_id}")
323 }
324 Self::VersionMismatch {
325 expected, received, ..
326 } => write!(
327 f,
328 "version mismatch: expected {expected}, received {received}"
329 ),
330 Self::DuplicateMessage { inbox_key, .. } => write!(f, "duplicate message {inbox_key}"),
331 Self::ProcessingError { message, .. } => write!(f, "processing error: {message}"),
332 Self::OutboxExhausted {
333 message_id,
334 message_type,
335 recipient,
336 attempts,
337 ..
338 } => write!(
339 f,
340 "outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
341 ),
342 }
343 }
344}
345
346// ── DeadLetterSink trait ──────────────────────────────────────────────────────
347
/// Receives messages that cannot be routed or processed.
///
/// Implement this trait to:
/// - Emit CONTRL negative acknowledgements for unroutable messages
/// - Persist rejections to a durable dead-letter queue for manual review
/// - Trigger alerts when duplicate-message counts exceed a threshold
///
/// The method is **synchronous**. Implementations that require async work
/// (network calls, database writes) must use `tokio::spawn` internally.
///
/// The `Send + Sync + 'static` supertraits mean every implementation can be
/// shared across threads and stored without borrowed data.
///
/// # Default
///
/// The default dead-letter sink is [`LogDeadLetterSink`], which emits
/// `tracing` events. Override with
/// [`EngineBuilder::with_dead_letter_sink`] to add CONTRL dispatch or
/// persistent DLQ storage.
///
/// [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
pub trait DeadLetterSink: Send + Sync + 'static {
    /// Record a rejected message.
    ///
    /// Called by the dispatch path synchronously, before the inbound
    /// message is acknowledged at the AS4 transport layer. Must not block.
    ///
    /// `reason` is only borrowed; an implementation that needs to keep it
    /// beyond the call (e.g. to hand it to a spawned task) has to clone it.
    fn reject(&self, reason: &DeadLetterReason);
}
373
374// ── LogDeadLetterSink ─────────────────────────────────────────────────────────
375
/// A [`DeadLetterSink`] that emits a structured `tracing` event for every
/// rejected message.
///
/// Rejections are logged at `warn` level, except
/// [`DeadLetterReason::OutboxExhausted`], which is logged at `error` level.
///
/// Suitable for all deployment tiers. In production, combine with a
/// `tracing` subscriber that forwards `warn`-level (and above) events to
/// your alert pipeline (Loki, CloudWatch, etc.).
///
/// This is the **default** dead-letter sink in [`EngineBuilder`].
///
/// [`EngineBuilder`]: crate::builder::EngineBuilder
#[derive(Debug, Clone, Default)]
pub struct LogDeadLetterSink;
388
389impl DeadLetterSink for LogDeadLetterSink {
390 fn reject(&self, reason: &DeadLetterReason) {
391 // Emit all §22 MessZV structured audit fields when available.
392 if let Some(ctx) = reason.audit_context() {
393 tracing::warn!(
394 reason = reason.label(),
395 message_type = ctx.message_type.as_deref().unwrap_or(""),
396 release_code = ctx.release_code.as_deref().unwrap_or(""),
397 pid = ctx.pid.unwrap_or(0),
398 sender_eic = ctx.sender_eic.as_deref().unwrap_or(""),
399 receiver_eic = ctx.receiver_eic.as_deref().unwrap_or(""),
400 message_ref = ctx.message_ref.as_deref().unwrap_or(""),
401 process_id = ctx.process_id.as_deref().unwrap_or(""),
402 tenant_id = ctx.tenant_id.as_deref().unwrap_or(""),
403 correlation_id = ctx.correlation_id.as_deref().unwrap_or(""),
404 %ctx.timestamp,
405 "dead letter: {reason}",
406 );
407 } else {
408 // OutboxExhausted has no inbound audit context; log its own fields.
409 match reason {
410 DeadLetterReason::OutboxExhausted {
411 message_id,
412 message_type,
413 recipient,
414 last_error,
415 attempts,
416 } => {
417 tracing::error!(
418 %message_id,
419 message_type,
420 recipient,
421 last_error,
422 attempts,
423 reason = reason.label(),
424 "dead letter: outbox exhausted — message removed after max delivery \
425 attempts; manual intervention required to deliver this message",
426 );
427 }
428 _ => {
429 tracing::warn!(reason = reason.label(), "dead letter: {reason}");
430 }
431 }
432 }
433 }
434}
435
436// ── NoopDeadLetterSink ────────────────────────────────────────────────────────
437
/// A [`DeadLetterSink`] that silently discards all rejected messages.
///
/// **Use only in unit tests** where dead-letter events are not the subject
/// under test. Using this in production means unroutable messages are lost
/// without any diagnostic output, violating BDEW AS4 requirements.
///
/// The type itself is always compiled, but its [`DeadLetterSink`]
/// implementation exists only under `cfg(test)` or with the `testing`
/// feature enabled.
#[derive(Debug, Clone, Default)]
#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
pub struct NoopDeadLetterSink;
446
// Compiled only for test builds or when the `testing` feature is enabled, so
// a build without either cannot use `NoopDeadLetterSink` as a sink.
#[cfg(any(test, feature = "testing"))]
impl DeadLetterSink for NoopDeadLetterSink {
    /// Intentionally does nothing: the rejection is dropped without a trace.
    fn reject(&self, _reason: &DeadLetterReason) {}
}
451
452// ── ArcDeadLetterSink ─────────────────────────────────────────────────────────
453
454/// Blanket implementation so `Arc<T>` is a `DeadLetterSink` whenever `T` is.
455///
456/// This allows passing `Arc<LogDeadLetterSink>` or `Arc<dyn DeadLetterSink>`
457/// wherever a `DeadLetterSink` is expected without an extra wrapper.
458impl<T: DeadLetterSink> DeadLetterSink for Arc<T> {
459 fn reject(&self, reason: &DeadLetterReason) {
460 self.as_ref().reject(reason);
461 }
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Every inbound variant maps to its documented snake_case label.
    #[test]
    fn dead_letter_reason_labels() {
        let cases = [
            (
                DeadLetterReason::UnknownPid {
                    pid: 55001,
                    context: AuditContext::now(),
                },
                "unknown_pid",
            ),
            (
                DeadLetterReason::UnknownConversation {
                    conversation_id: "abc".into(),
                    context: AuditContext::now(),
                },
                "unknown_conversation",
            ),
            (
                DeadLetterReason::VersionMismatch {
                    expected: "FV2025-10-01".into(),
                    received: "FV2026-10-01".into(),
                    context: AuditContext::now(),
                },
                "version_mismatch",
            ),
            (
                DeadLetterReason::DuplicateMessage {
                    inbox_key: "msg-1".into(),
                    context: AuditContext::now(),
                },
                "duplicate_message",
            ),
            (
                DeadLetterReason::ProcessingError {
                    message: "invalid state".into(),
                    context: AuditContext::now(),
                },
                "processing_error",
            ),
        ];

        for (reason, expected) in &cases {
            assert_eq!(reason.label(), *expected);
        }
    }

    /// Smoke test: logging each inbound variant must not panic.
    #[test]
    fn log_sink_does_not_panic() {
        let sink = LogDeadLetterSink;
        sink.reject(&DeadLetterReason::UnknownPid {
            pid: 99999,
            context: AuditContext::now(),
        });
        sink.reject(&DeadLetterReason::UnknownConversation {
            conversation_id: "conv-123".into(),
            context: AuditContext::now(),
        });
        sink.reject(&DeadLetterReason::VersionMismatch {
            expected: "FV2025-10-01".into(),
            received: "FV2026-10-01".into(),
            context: AuditContext::now(),
        });
        sink.reject(&DeadLetterReason::DuplicateMessage {
            inbox_key: "msg-42".into(),
            context: AuditContext::now(),
        });
        sink.reject(&DeadLetterReason::ProcessingError {
            message: "workflow rejected command".into(),
            context: AuditContext::now(),
        });
    }

    /// Smoke test: the no-op sink accepts a rejection without panicking.
    #[test]
    fn noop_sink_does_not_panic() {
        let sink = NoopDeadLetterSink;
        sink.reject(&DeadLetterReason::UnknownPid {
            pid: 55001,
            context: AuditContext::now(),
        });
    }

    /// An `Arc`-wrapped sink can be used like the sink itself.
    #[test]
    fn arc_blanket_impl_works() {
        let sink: Arc<LogDeadLetterSink> = Arc::new(LogDeadLetterSink);
        sink.reject(&DeadLetterReason::UnknownPid {
            pid: 1,
            context: AuditContext::now(),
        });
    }

    /// `Display` renders the variant payload of every inbound reason.
    #[test]
    fn dead_letter_reason_display() {
        assert_eq!(
            DeadLetterReason::UnknownPid {
                pid: 55001,
                context: AuditContext::now()
            }
            .to_string(),
            "unknown PID 55001"
        );
        assert_eq!(
            DeadLetterReason::UnknownConversation {
                conversation_id: "conv-123".into(),
                context: AuditContext::now(),
            }
            .to_string(),
            "unknown conversation conv-123"
        );
        assert_eq!(
            DeadLetterReason::VersionMismatch {
                expected: "FV2025-10-01".into(),
                received: "FV2026-10-01".into(),
                context: AuditContext::now(),
            }
            .to_string(),
            "version mismatch: expected FV2025-10-01, received FV2026-10-01"
        );
        assert_eq!(
            DeadLetterReason::DuplicateMessage {
                inbox_key: "msg-42".into(),
                context: AuditContext::now(),
            }
            .to_string(),
            "duplicate message msg-42"
        );
        assert_eq!(
            DeadLetterReason::ProcessingError {
                message: "invalid state".into(),
                context: AuditContext::now(),
            }
            .to_string(),
            "processing error: invalid state"
        );
    }

    /// Each setter fills its own field and leaves untouched fields as `None`.
    #[test]
    fn audit_context_builder() {
        let ctx = AuditContext::now()
            .with_message_type("UTILMD")
            .with_pid(55001)
            .with_sender_eic("4012345000023")
            .with_receiver_eic("9900357000004")
            .with_message_ref("00001")
            .with_tenant_id("tenant-a")
            .with_correlation_id("conv-xyz");

        assert_eq!(ctx.message_type.as_deref(), Some("UTILMD"));
        assert_eq!(ctx.pid, Some(55001));
        assert_eq!(ctx.sender_eic.as_deref(), Some("4012345000023"));
        assert_eq!(ctx.receiver_eic.as_deref(), Some("9900357000004"));
        assert_eq!(ctx.message_ref.as_deref(), Some("00001"));
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
        assert_eq!(ctx.correlation_id.as_deref(), Some("conv-xyz"));

        // Fields that were never set must stay empty.
        assert!(ctx.release_code.is_none());
        assert!(ctx.process_id.is_none());
    }

    /// Inbound reasons expose the exact context they were built with.
    #[test]
    fn audit_context_returned_for_inbound_reasons() {
        let r = DeadLetterReason::UnknownPid {
            pid: 99,
            context: AuditContext::now().with_pid(99),
        };
        let ctx = r.audit_context();
        assert!(ctx.is_some());
        assert_eq!(ctx.and_then(|c| c.pid), Some(99));
    }
}