Skip to main content

mako_engine/
dead_letter.rs

1//! Dead-letter sink for unroutable or unprocessable inbound messages.
2//!
3//! BDEW AS4 requires that every received message is either processed or
4//! explicitly refused (CONTRL negative acknowledgement). Messages that
5//! cannot be routed to a workflow — because the PID is unknown, the
6//! conversation is not in-flight, or the format version has no adapter —
7//! must not be silently dropped.
8//!
9//! # Design
10//!
11//! [`DeadLetterSink`] is a synchronous trait that receives a structured
12//! [`DeadLetterReason`] for every rejected message. The synchronous contract
13//! keeps dispatch-path hot code fast; implementations that need async work
14//! (e.g. persisting to a durable DLQ or sending a CONTRL) can use
15//! `tokio::spawn` internally.
16//!
17//! # Implementations
18//!
19//! | Type | Behaviour |
20//! |------|-----------|
//! | [`LogDeadLetterSink`] | Emits structured `tracing` events (`warn!`, or `error!` for exhausted outbox deliveries); suitable for all deployments |
22//! | [`NoopDeadLetterSink`] | Silently discards; **only for testing** |
23//!
24//! # Wiring
25//!
26//! Pass an implementation to [`EngineBuilder::with_dead_letter_sink`].
27//! The default is [`LogDeadLetterSink`] so unroutable messages are always
28//! visible in the log output without any configuration.
29//!
30//! ```rust
31//! use mako_engine::dead_letter::{AuditContext, DeadLetterReason, DeadLetterSink, LogDeadLetterSink};
32//!
33//! let sink = LogDeadLetterSink;
34//! sink.reject(&DeadLetterReason::UnknownPid { pid: 99999, context: AuditContext::now() });
35//! ```
36//!
37//! [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
38
39use std::sync::Arc;
40
41// ── AuditContext ──────────────────────────────────────────────────────────────
42
43/// Structured audit context attached to every dead-letter event.
44///
45/// All fields are `Option` because they are only partially known at rejection
46/// time (e.g. `pid` is not available for a parse failure before the PID is
47/// decoded). Callers fill in as many fields as they have.
48///
49/// These fields map to §22 MessZV audit-log requirements for AS4 message
50/// rejection events:
51///
52/// | Field | §22 MessZV requirement |
53/// |---|---|
54/// | `message_type` | Nachrichtentyp (UTILMD, MSCONS, APERAK, …) |
55/// | `release_code` | Releasekennung (S2.1, G1.1, 2.4c, …) |
56/// | `pid` | Prüfidentifikator |
57/// | `sender_eic` | GLN des Absenders |
58/// | `receiver_eic` | GLN des Empfängers |
59/// | `message_ref` | UNH-Referenz |
60/// | `process_id` | Geschäftsvorfallkennung |
61/// | `tenant_id` | Mandant |
62/// | `correlation_id` | AS4 `ConversationId` or similar |
63/// | `timestamp` | Zeitstempel des Eingangs (German local time) |
64#[derive(Debug, Clone)]
65pub struct AuditContext {
66    /// EDIFACT message type (e.g. `"UTILMD"`, `"MSCONS"`, `"APERAK"`).
67    pub message_type: Option<String>,
68    /// BDEW release code (e.g. `"S2.1"`, `"G1.1"`, `"2.4c"`).
69    pub release_code: Option<String>,
70    /// BDEW Prüfidentifikator numeric code.
71    pub pid: Option<u32>,
72    /// GLN of the AS4 sender.
73    pub sender_eic: Option<String>,
74    /// GLN of the AS4 receiver.
75    pub receiver_eic: Option<String>,
76    /// UNH message reference (interchange + message ref).
77    pub message_ref: Option<String>,
78    /// Internal process / workflow stream ID.
79    pub process_id: Option<String>,
80    /// Tenant identifier (Mandant).
81    pub tenant_id: Option<String>,
82    /// AS4 ConversationId or engine correlation key.
83    pub correlation_id: Option<String>,
84    /// Timestamp of message receipt, in German local time (CET/CEST).
85    pub timestamp: time::OffsetDateTime,
86}
87
88impl AuditContext {
89    /// Create an `AuditContext` with only a timestamp, all other fields `None`.
90    ///
91    /// Use builder-style setters to fill in known fields:
92    /// ```rust
93    /// use mako_engine::dead_letter::AuditContext;
94    ///
95    /// let ctx = AuditContext::now()
96    ///     .with_message_type("UTILMD")
97    ///     .with_pid(55001)
98    ///     .with_sender_eic("4012345000023");
99    /// ```
100    #[must_use]
101    pub fn now() -> Self {
102        Self {
103            message_type: None,
104            release_code: None,
105            pid: None,
106            sender_eic: None,
107            receiver_eic: None,
108            message_ref: None,
109            process_id: None,
110            tenant_id: None,
111            correlation_id: None,
112            timestamp: time::OffsetDateTime::now_utc(),
113        }
114    }
115
116    /// Set the message type.
117    #[must_use]
118    pub fn with_message_type(mut self, mt: impl Into<String>) -> Self {
119        self.message_type = Some(mt.into());
120        self
121    }
122
123    /// Set the BDEW release code.
124    #[must_use]
125    pub fn with_release_code(mut self, rc: impl Into<String>) -> Self {
126        self.release_code = Some(rc.into());
127        self
128    }
129
130    /// Set the Prüfidentifikator.
131    #[must_use]
132    pub fn with_pid(mut self, pid: u32) -> Self {
133        self.pid = Some(pid);
134        self
135    }
136
137    /// Set the sender GLN.
138    #[must_use]
139    pub fn with_sender_eic(mut self, eic: impl Into<String>) -> Self {
140        self.sender_eic = Some(eic.into());
141        self
142    }
143
144    /// Set the receiver GLN.
145    #[must_use]
146    pub fn with_receiver_eic(mut self, eic: impl Into<String>) -> Self {
147        self.receiver_eic = Some(eic.into());
148        self
149    }
150
151    /// Set the UNH message reference.
152    #[must_use]
153    pub fn with_message_ref(mut self, r: impl Into<String>) -> Self {
154        self.message_ref = Some(r.into());
155        self
156    }
157
158    /// Set the internal process / stream ID.
159    #[must_use]
160    pub fn with_process_id(mut self, id: impl Into<String>) -> Self {
161        self.process_id = Some(id.into());
162        self
163    }
164
165    /// Set the tenant identifier.
166    #[must_use]
167    pub fn with_tenant_id(mut self, id: impl Into<String>) -> Self {
168        self.tenant_id = Some(id.into());
169        self
170    }
171
172    /// Set the AS4 correlation / conversation ID.
173    #[must_use]
174    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
175        self.correlation_id = Some(id.into());
176        self
177    }
178}
179
180impl Default for AuditContext {
181    fn default() -> Self {
182        Self::now()
183    }
184}
185
186// ── DeadLetterReason ──────────────────────────────────────────────────────────
187
188/// Structured reason why an inbound message was rejected.
189///
190/// The variant gives the dispatch path enough information to emit an
191/// actionable CONTRL or log entry. Each variant carries an [`AuditContext`]
192/// with the §22 MessZV fields required for regulatory audit logging.
193///
194/// Adding new variants is a non-breaking change thanks to `#[non_exhaustive]`.
195#[derive(Debug, Clone)]
196#[non_exhaustive]
197pub enum DeadLetterReason {
198    /// No workflow is registered for this PID in the [`PidRouter`].
199    ///
200    /// The PID is either from a future BDEW release not yet deployed or a
201    /// malformed message. Respond with a CONTRL negative acknowledgement.
202    ///
203    /// [`PidRouter`]: crate::pid_router::PidRouter
204    UnknownPid {
205        /// The numeric Prüfidentifikator that had no registered workflow.
206        pid: u32,
207        /// §22 MessZV structured audit context.
208        context: AuditContext,
209    },
210
211    /// No in-flight process matched the inbound `conversation_id`.
212    ///
213    /// This typically means the process completed, was never started, or
214    /// the [`ProcessRegistry`] was lost on restart (see.
215    ///
216    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
217    UnknownConversation {
218        /// The `conversation_id` from the inbound EDIFACT interchange.
219        conversation_id: String,
220        /// §22 MessZV structured audit context.
221        context: AuditContext,
222    },
223
224    /// The message's format version has no registered [`MessageAdapter`].
225    ///
226    /// Either the adapter registry is incomplete (see or the sender
227    /// is using a deprecated / future format version.
228    ///
229    /// [`MessageAdapter`]: crate::message_adapter::MessageAdapter
230    VersionMismatch {
231        /// The format version string the adapter registry expected.
232        expected: String,
233        /// The format version string carried in the inbound message.
234        received: String,
235        /// §22 MessZV structured audit context.
236        context: AuditContext,
237    },
238
239    /// A message with this inbox key was already accepted (AS4 duplicate).
240    ///
241    /// The AS4 sender retries for up to 72 hours. The [`InboxStore`]
242    /// detected the duplicate and the message must not be processed again.
243    ///
244    /// [`InboxStore`]: crate::inbox::InboxStore
245    DuplicateMessage {
246        /// The inbox deduplication key (typically the AS4 `MessageId`).
247        inbox_key: String,
248        /// §22 MessZV structured audit context.
249        context: AuditContext,
250    },
251
252    /// A workflow or adapter returned a processing error.
253    ///
254    /// The message was routed correctly but could not be processed. Use
255    /// this variant when the failure is definitive (not retriable).
256    ProcessingError {
257        /// Short, human-readable description of the failure.
258        message: String,
259        /// §22 MessZV structured audit context.
260        context: AuditContext,
261    },
262
263    /// The outbox delivery worker gave up after exhausting all retry attempts.
264    ///
265    /// The message was re-queued `max_attempts` times and never successfully
266    /// delivered to the AS4 endpoint (or ERP webhook). The message is removed
267    /// from the outbox and recorded here for regulatory audit.
268    OutboxExhausted {
269        /// The outbox message ID of the undeliverable message.
270        message_id: crate::ids::OutboxMessageId,
271        /// The message type (e.g. `"APERAK"`, `"CONTRL"`).
272        message_type: String,
273        /// The intended recipient GLN.
274        recipient: String,
275        /// The last error returned by the AS4 sender.
276        last_error: String,
277        /// How many delivery attempts were made.
278        attempts: u32,
279    },
280}
281
282impl DeadLetterReason {
283    /// Short label identifying the rejection category.
284    ///
285    /// Suitable for structured log fields and metric labels.
286    #[must_use]
287    pub fn label(&self) -> &'static str {
288        match self {
289            Self::UnknownPid { .. } => "unknown_pid",
290            Self::UnknownConversation { .. } => "unknown_conversation",
291            Self::VersionMismatch { .. } => "version_mismatch",
292            Self::DuplicateMessage { .. } => "duplicate_message",
293            Self::ProcessingError { .. } => "processing_error",
294            Self::OutboxExhausted { .. } => "outbox_exhausted",
295        }
296    }
297
298    /// Return the [`AuditContext`] embedded in this reason, if present.
299    ///
300    /// `OutboxExhausted` does not carry an `AuditContext` because it refers
301    /// to an outbound message (not an inbound AS4 message).
302    #[must_use]
303    pub fn audit_context(&self) -> Option<&AuditContext> {
304        match self {
305            Self::UnknownPid { context, .. }
306            | Self::UnknownConversation { context, .. }
307            | Self::VersionMismatch { context, .. }
308            | Self::DuplicateMessage { context, .. }
309            | Self::ProcessingError { context, .. } => Some(context),
310            Self::OutboxExhausted { .. } => None,
311        }
312    }
313}
314
315impl std::fmt::Display for DeadLetterReason {
316    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317        match self {
318            Self::UnknownPid { pid, .. } => write!(f, "unknown PID {pid}"),
319            Self::UnknownConversation {
320                conversation_id, ..
321            } => {
322                write!(f, "unknown conversation {conversation_id}")
323            }
324            Self::VersionMismatch {
325                expected, received, ..
326            } => write!(
327                f,
328                "version mismatch: expected {expected}, received {received}"
329            ),
330            Self::DuplicateMessage { inbox_key, .. } => write!(f, "duplicate message {inbox_key}"),
331            Self::ProcessingError { message, .. } => write!(f, "processing error: {message}"),
332            Self::OutboxExhausted {
333                message_id,
334                message_type,
335                recipient,
336                attempts,
337                ..
338            } => write!(
339                f,
340                "outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
341            ),
342        }
343    }
344}
345
346// ── DeadLetterSink trait ──────────────────────────────────────────────────────
347
348/// Receives messages that cannot be routed or processed.
349///
350/// Implement this trait to:
351/// - Emit CONTRL negative acknowledgements for unroutable messages
352/// - Persist rejections to a durable dead-letter queue for manual review
353/// - Trigger alerts when duplicate-message counts exceed a threshold
354///
355/// The method is **synchronous**. Implementations that require async work
356/// (network calls, database writes) must use `tokio::spawn` internally.
357///
358/// # Default
359///
360/// The default dead-letter sink is [`LogDeadLetterSink`], which emits
361/// `tracing::warn!` events. Override with
362/// [`EngineBuilder::with_dead_letter_sink`] to add CONTRL dispatch or
363/// persistent DLQ storage.
364///
365/// [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
366pub trait DeadLetterSink: Send + Sync + 'static {
367    /// Record a rejected message.
368    ///
369    /// Called by the dispatch path synchronously, before the inbound
370    /// message is acknowledged at the AS4 transport layer. Must not block.
371    fn reject(&self, reason: &DeadLetterReason);
372}
373
374// ── LogDeadLetterSink ─────────────────────────────────────────────────────────
375
376/// A [`DeadLetterSink`] that emits a structured `tracing::warn!` event for
377/// every rejected message.
378///
379/// Suitable for all deployment tiers. In production, combine with a
380/// `tracing` subscriber that forwards `warn`-level events to your alert
381/// pipeline (Loki, CloudWatch, etc.).
382///
383/// This is the **default** dead-letter sink in [`EngineBuilder`].
384///
385/// [`EngineBuilder`]: crate::builder::EngineBuilder
386#[derive(Debug, Clone, Default)]
387pub struct LogDeadLetterSink;
388
389impl DeadLetterSink for LogDeadLetterSink {
390    fn reject(&self, reason: &DeadLetterReason) {
391        // Emit all §22 MessZV structured audit fields when available.
392        if let Some(ctx) = reason.audit_context() {
393            tracing::warn!(
394                reason = reason.label(),
395                message_type = ctx.message_type.as_deref().unwrap_or(""),
396                release_code = ctx.release_code.as_deref().unwrap_or(""),
397                pid = ctx.pid.unwrap_or(0),
398                sender_eic = ctx.sender_eic.as_deref().unwrap_or(""),
399                receiver_eic = ctx.receiver_eic.as_deref().unwrap_or(""),
400                message_ref = ctx.message_ref.as_deref().unwrap_or(""),
401                process_id = ctx.process_id.as_deref().unwrap_or(""),
402                tenant_id = ctx.tenant_id.as_deref().unwrap_or(""),
403                correlation_id = ctx.correlation_id.as_deref().unwrap_or(""),
404                %ctx.timestamp,
405                "dead letter: {reason}",
406            );
407        } else {
408            // OutboxExhausted has no inbound audit context; log its own fields.
409            match reason {
410                DeadLetterReason::OutboxExhausted {
411                    message_id,
412                    message_type,
413                    recipient,
414                    last_error,
415                    attempts,
416                } => {
417                    tracing::error!(
418                        %message_id,
419                        message_type,
420                        recipient,
421                        last_error,
422                        attempts,
423                        reason = reason.label(),
424                        "dead letter: outbox exhausted — message removed after max delivery \
425                         attempts; manual intervention required to deliver this message",
426                    );
427                }
428                _ => {
429                    tracing::warn!(reason = reason.label(), "dead letter: {reason}");
430                }
431            }
432        }
433    }
434}
435
436// ── NoopDeadLetterSink ────────────────────────────────────────────────────────
437
/// A [`DeadLetterSink`] that silently discards all rejected messages.
///
/// **Use only in unit tests** where dead-letter events are not the subject
/// under test. Discarding rejections in production would lose unroutable
/// messages without any diagnostic output, violating BDEW AS4 requirements.
///
/// To enforce this, the [`DeadLetterSink`] implementation is only compiled
/// under `cfg(test)` or with the `testing` cargo feature. In other builds the
/// type still exists but cannot be used where a sink is expected.
#[derive(Debug, Clone, Default)]
#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
pub struct NoopDeadLetterSink;

#[cfg(any(test, feature = "testing"))]
impl DeadLetterSink for NoopDeadLetterSink {
    /// Intentionally does nothing with the rejection.
    fn reject(&self, _reason: &DeadLetterReason) {}
}
451
452// ── ArcDeadLetterSink ─────────────────────────────────────────────────────────
453
454/// Blanket implementation so `Arc<T>` is a `DeadLetterSink` whenever `T` is.
455///
456/// This allows passing `Arc<LogDeadLetterSink>` or `Arc<dyn DeadLetterSink>`
457/// wherever a `DeadLetterSink` is expected without an extra wrapper.
458impl<T: DeadLetterSink> DeadLetterSink for Arc<T> {
459    fn reject(&self, reason: &DeadLetterReason) {
460        self.as_ref().reject(reason);
461    }
462}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// One instance of every variant that carries an inbound [`AuditContext`].
    fn inbound_reasons() -> Vec<DeadLetterReason> {
        vec![
            DeadLetterReason::UnknownPid {
                pid: 55001,
                context: AuditContext::now(),
            },
            DeadLetterReason::UnknownConversation {
                conversation_id: "abc".into(),
                context: AuditContext::now(),
            },
            DeadLetterReason::VersionMismatch {
                expected: "FV2025-10-01".into(),
                received: "FV2026-10-01".into(),
                context: AuditContext::now(),
            },
            DeadLetterReason::DuplicateMessage {
                inbox_key: "msg-1".into(),
                context: AuditContext::now(),
            },
            DeadLetterReason::ProcessingError {
                message: "invalid state".into(),
                context: AuditContext::now(),
            },
        ]
    }

    #[test]
    fn dead_letter_reason_labels() {
        let labels: Vec<&str> = inbound_reasons().iter().map(DeadLetterReason::label).collect();
        assert_eq!(
            labels,
            [
                "unknown_pid",
                "unknown_conversation",
                "version_mismatch",
                "duplicate_message",
                "processing_error",
            ]
        );
    }

    #[test]
    fn log_sink_does_not_panic() {
        let sink = LogDeadLetterSink;
        for reason in inbound_reasons() {
            sink.reject(&reason);
        }
        // A fully populated context exercises every structured log field.
        sink.reject(&DeadLetterReason::ProcessingError {
            message: "workflow rejected command".into(),
            context: AuditContext::now()
                .with_message_type("UTILMD")
                .with_release_code("S2.1")
                .with_pid(55001)
                .with_sender_eic("4012345000023")
                .with_receiver_eic("9900357000004")
                .with_message_ref("00001")
                .with_process_id("stream-1")
                .with_tenant_id("tenant-a")
                .with_correlation_id("conv-xyz"),
        });
    }

    #[test]
    fn noop_sink_does_not_panic() {
        let sink = NoopDeadLetterSink;
        sink.reject(&DeadLetterReason::UnknownPid {
            pid: 55001,
            context: AuditContext::now(),
        });
    }

    #[test]
    fn arc_blanket_impl_works() {
        let sink: Arc<LogDeadLetterSink> = Arc::new(LogDeadLetterSink);
        sink.reject(&DeadLetterReason::UnknownPid {
            pid: 1,
            context: AuditContext::now(),
        });
    }

    #[test]
    fn dead_letter_reason_display() {
        let rendered: Vec<String> = inbound_reasons().iter().map(ToString::to_string).collect();
        assert_eq!(
            rendered,
            [
                "unknown PID 55001",
                "unknown conversation abc",
                "version mismatch: expected FV2025-10-01, received FV2026-10-01",
                "duplicate message msg-1",
                "processing error: invalid state",
            ]
        );
    }

    #[test]
    fn audit_context_builder() {
        let ctx = AuditContext::now()
            .with_message_type("UTILMD")
            .with_release_code("S2.1")
            .with_pid(55001)
            .with_sender_eic("4012345000023")
            .with_receiver_eic("9900357000004")
            .with_message_ref("00001")
            .with_process_id("stream-1")
            .with_tenant_id("tenant-a")
            .with_correlation_id("conv-xyz");

        assert_eq!(ctx.message_type.as_deref(), Some("UTILMD"));
        assert_eq!(ctx.release_code.as_deref(), Some("S2.1"));
        assert_eq!(ctx.pid, Some(55001));
        assert_eq!(ctx.sender_eic.as_deref(), Some("4012345000023"));
        assert_eq!(ctx.receiver_eic.as_deref(), Some("9900357000004"));
        assert_eq!(ctx.message_ref.as_deref(), Some("00001"));
        assert_eq!(ctx.process_id.as_deref(), Some("stream-1"));
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-a"));
        assert_eq!(ctx.correlation_id.as_deref(), Some("conv-xyz"));
    }

    #[test]
    fn audit_context_default_is_empty() {
        let ctx = AuditContext::default();
        assert!(ctx.message_type.is_none());
        assert!(ctx.release_code.is_none());
        assert!(ctx.pid.is_none());
        assert!(ctx.sender_eic.is_none());
        assert!(ctx.receiver_eic.is_none());
        assert!(ctx.message_ref.is_none());
        assert!(ctx.process_id.is_none());
        assert!(ctx.tenant_id.is_none());
        assert!(ctx.correlation_id.is_none());
    }

    #[test]
    fn audit_context_returned_for_inbound_reasons() {
        for reason in inbound_reasons() {
            assert!(
                reason.audit_context().is_some(),
                "{} should carry an audit context",
                reason.label()
            );
        }

        let r = DeadLetterReason::UnknownPid {
            pid: 99,
            context: AuditContext::now().with_pid(99),
        };
        assert_eq!(r.audit_context().and_then(|c| c.pid), Some(99));
    }
}