Skip to main content

mako_engine/
dead_letter.rs

1//! Dead-letter sink for unroutable or unprocessable inbound messages.
2//!
3//! BDEW AS4 requires that every received message is either processed or
4//! explicitly refused (CONTRL negative acknowledgement). Messages that
5//! cannot be routed to a workflow — because the PID is unknown, the
6//! conversation is not in-flight, or the format version has no adapter —
7//! must not be silently dropped.
8//!
9//! # Design
10//!
11//! [`DeadLetterSink`] is a synchronous trait that receives a structured
12//! [`DeadLetterReason`] for every rejected message. The synchronous contract
13//! keeps dispatch-path hot code fast; implementations that need async work
14//! (e.g. persisting to a durable DLQ or sending a CONTRL) can use
15//! `tokio::spawn` internally.
16//!
17//! # Implementations
18//!
19//! | Type | Behaviour |
20//! |------|-----------|
21//! | [`LogDeadLetterSink`] | Emits structured `tracing::warn!`; suitable for all deployments |
22//! | [`NoopDeadLetterSink`] | Silently discards; **only for testing** |
23//!
24//! # Wiring
25//!
26//! Pass an implementation to [`EngineBuilder::with_dead_letter_sink`].
27//! The default is [`LogDeadLetterSink`] so unroutable messages are always
28//! visible in the log output without any configuration.
29//!
30//! ```rust
31//! use mako_engine::dead_letter::{DeadLetterReason, DeadLetterSink, LogDeadLetterSink};
32//!
33//! let sink = LogDeadLetterSink;
34//! sink.reject(&DeadLetterReason::UnknownPid(99999));
35//! ```
36//!
37//! [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
38
39use std::sync::Arc;
40
41// ── DeadLetterReason ──────────────────────────────────────────────────────────
42
43/// Structured reason why an inbound message was rejected.
44///
45/// The variant gives the dispatch path enough information to emit an
46/// actionable CONTRL or log entry. Adding new variants is a non-breaking
47/// change thanks to `#[non_exhaustive]`.
48#[derive(Debug, Clone)]
49#[non_exhaustive]
50pub enum DeadLetterReason {
51    /// No workflow is registered for this PID in the [`PidRouter`].
52    ///
53    /// The PID is either from a future BDEW release not yet deployed or a
54    /// malformed message. Respond with a CONTRL negative acknowledgement.
55    ///
56    /// [`PidRouter`]: crate::pid_router::PidRouter
57    UnknownPid(u32),
58
59    /// No in-flight process matched the inbound `conversation_id`.
60    ///
61    /// This typically means the process completed, was never started, or
62    /// the [`ProcessRegistry`] was lost on restart (see.
63    ///
64    /// [`ProcessRegistry`]: crate::registry::ProcessRegistry
65    UnknownConversation {
66        /// The `conversation_id` from the inbound EDIFACT interchange.
67        conversation_id: String,
68    },
69
70    /// The message's format version has no registered [`MessageAdapter`].
71    ///
72    /// Either the adapter registry is incomplete (see or the sender
73    /// is using a deprecated / future format version.
74    ///
75    /// [`MessageAdapter`]: crate::message_adapter::MessageAdapter
76    VersionMismatch {
77        /// The format version string the adapter registry expected.
78        expected: String,
79        /// The format version string carried in the inbound message.
80        received: String,
81    },
82
83    /// A message with this inbox key was already accepted (AS4 duplicate).
84    ///
85    /// The AS4 sender retries for up to 72 hours. The [`InboxStore`]
86    /// detected the duplicate and the message must not be processed again.
87    ///
88    /// [`InboxStore`]: crate::inbox::InboxStore
89    DuplicateMessage {
90        /// The inbox deduplication key (typically the AS4 `MessageId`).
91        inbox_key: String,
92    },
93
94    /// A workflow or adapter returned a processing error.
95    ///
96    /// The message was routed correctly but could not be processed. Use
97    /// this variant when the failure is definitive (not retriable).
98    ProcessingError {
99        /// Short, human-readable description of the failure.
100        message: String,
101    },
102
103    /// The outbox delivery worker gave up after exhausting all retry attempts.
104    ///
105    /// The message was re-queued `max_attempts` times and never successfully
106    /// delivered to the AS4 endpoint (or ERP webhook). The message is removed
107    /// from the outbox and recorded here for regulatory audit.
108    OutboxExhausted {
109        /// The outbox message ID of the undeliverable message.
110        message_id: crate::ids::OutboxMessageId,
111        /// The message type (e.g. `"APERAK"`, `"CONTRL"`).
112        message_type: String,
113        /// The intended recipient GLN.
114        recipient: String,
115        /// The last error returned by the AS4 sender.
116        last_error: String,
117        /// How many delivery attempts were made.
118        attempts: u32,
119    },
120}
121
122impl DeadLetterReason {
123    /// Short label identifying the rejection category.
124    ///
125    /// Suitable for structured log fields and metric labels.
126    #[must_use]
127    pub fn label(&self) -> &'static str {
128        match self {
129            Self::UnknownPid(_) => "unknown_pid",
130            Self::UnknownConversation { .. } => "unknown_conversation",
131            Self::VersionMismatch { .. } => "version_mismatch",
132            Self::DuplicateMessage { .. } => "duplicate_message",
133            Self::ProcessingError { .. } => "processing_error",
134            Self::OutboxExhausted { .. } => "outbox_exhausted",
135        }
136    }
137}
138
139impl std::fmt::Display for DeadLetterReason {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        match self {
142            Self::UnknownPid(pid) => write!(f, "unknown PID {pid}"),
143            Self::UnknownConversation { conversation_id } => {
144                write!(f, "unknown conversation {conversation_id}")
145            }
146            Self::VersionMismatch { expected, received } => write!(
147                f,
148                "version mismatch: expected {expected}, received {received}"
149            ),
150            Self::DuplicateMessage { inbox_key } => write!(f, "duplicate message {inbox_key}"),
151            Self::ProcessingError { message } => write!(f, "processing error: {message}"),
152            Self::OutboxExhausted {
153                message_id,
154                message_type,
155                recipient,
156                attempts,
157                ..
158            } => write!(
159                f,
160                "outbox exhausted after {attempts} attempts: {message_type} → {recipient} (id={message_id})"
161            ),
162        }
163    }
164}
165
166// ── DeadLetterSink trait ──────────────────────────────────────────────────────
167
168/// Receives messages that cannot be routed or processed.
169///
170/// Implement this trait to:
171/// - Emit CONTRL negative acknowledgements for unroutable messages
172/// - Persist rejections to a durable dead-letter queue for manual review
173/// - Trigger alerts when duplicate-message counts exceed a threshold
174///
175/// The method is **synchronous**. Implementations that require async work
176/// (network calls, database writes) must use `tokio::spawn` internally.
177///
178/// # Default
179///
180/// The default dead-letter sink is [`LogDeadLetterSink`], which emits
181/// `tracing::warn!` events. Override with
182/// [`EngineBuilder::with_dead_letter_sink`] to add CONTRL dispatch or
183/// persistent DLQ storage.
184///
185/// [`EngineBuilder::with_dead_letter_sink`]: crate::builder::EngineBuilder::with_dead_letter_sink
186pub trait DeadLetterSink: Send + Sync + 'static {
187    /// Record a rejected message.
188    ///
189    /// Called by the dispatch path synchronously, before the inbound
190    /// message is acknowledged at the AS4 transport layer. Must not block.
191    fn reject(&self, reason: &DeadLetterReason);
192}
193
194// ── LogDeadLetterSink ─────────────────────────────────────────────────────────
195
196/// A [`DeadLetterSink`] that emits a structured `tracing::warn!` event for
197/// every rejected message.
198///
199/// Suitable for all deployment tiers. In production, combine with a
200/// `tracing` subscriber that forwards `warn`-level events to your alert
201/// pipeline (Loki, CloudWatch, etc.).
202///
203/// This is the **default** dead-letter sink in [`EngineBuilder`].
204///
205/// [`EngineBuilder`]: crate::builder::EngineBuilder
206#[derive(Debug, Clone, Default)]
207pub struct LogDeadLetterSink;
208
209impl DeadLetterSink for LogDeadLetterSink {
210    fn reject(&self, reason: &DeadLetterReason) {
211        match reason {
212            DeadLetterReason::UnknownPid(pid) => {
213                tracing::warn!(
214                    pid,
215                    reason = reason.label(),
216                    "dead letter: unknown PID — no workflow registered; \
217                     send CONTRL negative acknowledgement",
218                );
219            }
220            DeadLetterReason::UnknownConversation { conversation_id } => {
221                tracing::warn!(
222                    conversation_id,
223                    reason = reason.label(),
224                    "dead letter: unknown conversation — no in-flight process found; \
225                     process may have completed or registry was lost on restart",
226                );
227            }
228            DeadLetterReason::VersionMismatch { expected, received } => {
229                tracing::warn!(
230                    expected,
231                    received,
232                    reason = reason.label(),
233                    "dead letter: format version mismatch — no adapter registered for received version",
234                );
235            }
236            DeadLetterReason::DuplicateMessage { inbox_key } => {
237                tracing::warn!(
238                    inbox_key,
239                    reason = reason.label(),
240                    "dead letter: duplicate message — AS4 retry already processed; ignoring",
241                );
242            }
243            DeadLetterReason::ProcessingError { message } => {
244                tracing::warn!(
245                    message,
246                    reason = reason.label(),
247                    "dead letter: processing error — message routed but could not be processed",
248                );
249            }
250            DeadLetterReason::OutboxExhausted {
251                message_id,
252                message_type,
253                recipient,
254                last_error,
255                attempts,
256            } => {
257                tracing::error!(
258                    %message_id,
259                    message_type,
260                    recipient,
261                    last_error,
262                    attempts,
263                    reason = reason.label(),
264                    "dead letter: outbox exhausted — message removed after max delivery attempts; \
265                     manual intervention required to deliver this message",
266                );
267            }
268        }
269    }
270}
271
272// ── NoopDeadLetterSink ────────────────────────────────────────────────────────
273
274/// A [`DeadLetterSink`] that silently discards all rejected messages.
275///
276/// **Use only in unit tests** where dead-letter events are not the subject
277/// under test. Using this in production means unroutable messages are lost
278/// without any diagnostic output, violating BDEW AS4 requirements.
279#[derive(Debug, Clone, Default)]
280#[must_use = "NoopDeadLetterSink discards all rejections; use LogDeadLetterSink in production"]
281pub struct NoopDeadLetterSink;
282
283#[cfg(any(test, feature = "testing"))]
284impl DeadLetterSink for NoopDeadLetterSink {
285    fn reject(&self, _reason: &DeadLetterReason) {}
286}
287
288// ── ArcDeadLetterSink ─────────────────────────────────────────────────────────
289
290/// Blanket implementation so `Arc<T>` is a `DeadLetterSink` whenever `T` is.
291///
292/// This allows passing `Arc<LogDeadLetterSink>` or `Arc<dyn DeadLetterSink>`
293/// wherever a `DeadLetterSink` is expected without an extra wrapper.
294impl<T: DeadLetterSink> DeadLetterSink for Arc<T> {
295    fn reject(&self, reason: &DeadLetterReason) {
296        self.as_ref().reject(reason);
297    }
298}
299
300#[cfg(test)]
301mod tests {
302    use super::*;
303
304    #[test]
305    fn dead_letter_reason_labels() {
306        assert_eq!(DeadLetterReason::UnknownPid(55001).label(), "unknown_pid");
307        assert_eq!(
308            DeadLetterReason::UnknownConversation {
309                conversation_id: "abc".into()
310            }
311            .label(),
312            "unknown_conversation"
313        );
314        assert_eq!(
315            DeadLetterReason::VersionMismatch {
316                expected: "FV2025-10-01".into(),
317                received: "FV2026-10-01".into(),
318            }
319            .label(),
320            "version_mismatch"
321        );
322        assert_eq!(
323            DeadLetterReason::DuplicateMessage {
324                inbox_key: "msg-1".into(),
325            }
326            .label(),
327            "duplicate_message"
328        );
329        assert_eq!(
330            DeadLetterReason::ProcessingError {
331                message: "invalid state".into(),
332            }
333            .label(),
334            "processing_error"
335        );
336    }
337
338    #[test]
339    fn log_sink_does_not_panic() {
340        let sink = LogDeadLetterSink;
341        sink.reject(&DeadLetterReason::UnknownPid(99999));
342        sink.reject(&DeadLetterReason::UnknownConversation {
343            conversation_id: "conv-123".into(),
344        });
345        sink.reject(&DeadLetterReason::VersionMismatch {
346            expected: "FV2025-10-01".into(),
347            received: "FV2026-10-01".into(),
348        });
349        sink.reject(&DeadLetterReason::DuplicateMessage {
350            inbox_key: "msg-42".into(),
351        });
352        sink.reject(&DeadLetterReason::ProcessingError {
353            message: "workflow rejected command".into(),
354        });
355    }
356
357    #[test]
358    fn noop_sink_does_not_panic() {
359        let sink = NoopDeadLetterSink;
360        sink.reject(&DeadLetterReason::UnknownPid(55001));
361    }
362
363    #[test]
364    fn arc_blanket_impl_works() {
365        let sink: Arc<LogDeadLetterSink> = Arc::new(LogDeadLetterSink);
366        sink.reject(&DeadLetterReason::UnknownPid(1));
367    }
368
369    #[test]
370    fn dead_letter_reason_display() {
371        assert_eq!(
372            DeadLetterReason::UnknownPid(55001).to_string(),
373            "unknown PID 55001"
374        );
375        assert!(
376            DeadLetterReason::VersionMismatch {
377                expected: "FV2025-10-01".into(),
378                received: "FV2026-10-01".into(),
379            }
380            .to_string()
381            .contains("version mismatch")
382        );
383    }
384}