Skip to main content

mako_engine/
error.rs

1//! Engine-level error types.
2
3/// Errors that can occur during engine operations (command dispatch, event
4/// persistence, state reconstruction).
5#[non_exhaustive]
6#[derive(Debug, thiserror::Error)]
7pub enum EngineError {
8    /// The event store returned an error.
9    #[error("store error: {message}")]
10    Store {
11        /// Human-readable description of the storage failure.
12        message: String,
13        /// `true` when the error is transient (retry may succeed after backoff).
14        transient: bool,
15    },
16
17    /// Optimistic concurrency check failed: the stream was modified by a
18    /// concurrent writer between the read and the append.
19    #[error("version conflict: expected {expected}, found {actual}")]
20    VersionConflict {
21        /// The sequence number the caller expected the stream to be at.
22        expected: u64,
23        /// The actual current sequence number of the stream.
24        actual: u64,
25    },
26
27    /// Could not deserialize a stored event payload into the typed event.
28    ///
29    /// This typically indicates a schema migration is required.
30    #[error("event deserialization failed: {0}")]
31    Deserialization(String),
32
33    /// Could not serialize a domain event into the envelope payload.
34    #[error("event serialization failed: {0}")]
35    Serialization(String),
36
37    /// A snapshot storage operation failed.
38    #[error("snapshot store error: {message}")]
39    Snapshot {
40        /// Human-readable description of the storage failure.
41        message: String,
42        /// `true` when the error is transient (retry may succeed after backoff).
43        transient: bool,
44    },
45
46    /// An outbox storage operation failed.
47    #[error("outbox store error: {message}")]
48    Outbox {
49        /// Human-readable description of the storage failure.
50        message: String,
51        /// `true` when the error is transient (retry may succeed after backoff).
52        transient: bool,
53    },
54
55    /// A deadline storage operation failed.
56    #[error("deadline store error: {message}")]
57    Deadline {
58        /// Human-readable description of the storage failure.
59        message: String,
60        /// `true` when the error is transient (retry may succeed after backoff).
61        transient: bool,
62    },
63
64    /// A process registry operation failed.
65    #[error("process registry error: {message}")]
66    Registry {
67        /// Human-readable description of the storage failure.
68        message: String,
69        /// `true` when the error is transient (retry may succeed after backoff).
70        transient: bool,
71    },
72
73    /// An inbox (AS4 dedup) operation failed.
74    #[error("inbox store error: {message}")]
75    Inbox {
76        /// Human-readable description of the storage failure.
77        message: String,
78        /// `true` when the error is transient (retry may succeed after backoff).
79        transient: bool,
80    },
81
82    /// A partner-store operation failed, or a required partner record is absent.
83    #[error("partner store error: {message}")]
84    Partner {
85        /// Human-readable description of the storage failure.
86        message: String,
87        /// `true` when the error is transient (retry may succeed after backoff).
88        transient: bool,
89    },
90
91    /// A dead-letter query operation failed.
92    ///
93    /// Covers `SlateDbDeadLetterSink::list_recent` and similar read-path
94    /// operations.  Writes are fire-and-forget (logged on error) and do not
95    /// produce this variant.
96    #[error("dead-letter store error: {message}")]
97    DeadLetter {
98        /// Human-readable description of the storage failure.
99        message: String,
100        /// `true` when the error is transient (retry may succeed after backoff).
101        transient: bool,
102    },
103
104    /// The workflow rejected the command or reached an invalid state.
105    #[error("workflow error: {0}")]
106    Workflow(#[from] WorkflowError),
107
108    /// Appending the requested events would exceed the per-stream event count
109    /// limit configured on the store.
110    ///
111    /// This is a hard safety guard against runaway streams. The caller should
112    /// archive or compact the stream before retrying.
113    ///
114    /// The `stream_id`, `limit`, `new_events`, and `actual` fields are
115    /// available for internal structured logging but are intentionally **not**
116    /// included in the `Display` string returned to API callers to avoid
117    /// leaking internal stream topology.
118    #[error("event stream quota exceeded")]
119    StreamQuotaExceeded {
120        /// The stream that hit the limit (for internal logging only).
121        stream_id: crate::ids::StreamId,
122        /// The configured maximum number of events per stream.
123        limit: u64,
124        /// Number of events that would be written by this append.
125        new_events: usize,
126        /// Total event count after the append would complete.
127        actual: u64,
128    },
129
130    /// An AS4 transport send operation failed.
131    ///
132    /// Distinct from [`Store`] so the outbox worker can decide retry strategy
133    /// without string-matching: transport errors are potentially
134    /// transient; serialization errors are permanent.
135    ///
136    /// [`Store`]: EngineError::Store
137    #[error("AS4 transport error sending to {endpoint}: {message}")]
138    Transport {
139        /// The AS4 endpoint URL (or `"unknown"` when not available).
140        endpoint: Box<str>,
141        /// The underlying error description.
142        message: String,
143    },
144
145    /// The outbound message cannot be delivered because no AS4 endpoint is
146    /// registered for the recipient GLN.
147    ///
148    /// This is a **permanent** failure: the operator must add the missing
149    /// `--as4-partner <GLN>=<URL>` entry before delivery can succeed.
150    /// The outbox worker should dead-letter immediately rather than retrying.
151    #[error("no AS4 endpoint configured for recipient {recipient}")]
152    PartnerUnknown {
153        /// The recipient GLN that has no registered endpoint.
154        recipient: Box<str>,
155    },
156
157    /// The outbound message cannot be rendered to EDIFACT wire format because
158    /// no renderer is implemented for its message type.
159    ///
160    /// This is a **permanent** failure: retrying will never succeed until a
161    /// wire-format renderer is implemented for the message type. The outbox
162    /// worker should dead-letter the message immediately and alert the operator.
163    ///
164    /// Use this instead of silently transmitting JSON blobs over AS4, which
165    /// violates BDEW MaKo interoperability requirements.
166    #[error(
167        "no EDIFACT renderer implemented for message type '{message_type}' \
168         (outbox entry {message_id}); implement a wire-format renderer before \
169         enabling this workflow path in production"
170    )]
171    RendererNotImplemented {
172        /// The EDIFACT message type string (e.g. `"MSCONS"`, `"INVOIC"`).
173        message_type: Box<str>,
174        /// The outbox message ID for correlation with the dead-letter store.
175        message_id: Box<str>,
176    },
177
178    /// A string could not be converted into a valid [`StreamId`].
179    ///
180    /// Stream IDs must be non-empty and must not contain NUL bytes.
181    /// This error is produced by [`StreamId::try_new`] and the
182    /// [`TryFrom`] impls. Use the typed constructors
183    /// ([`StreamId::for_process`], [`StreamId::for_partner`]) where possible
184    /// to avoid constructing stream IDs from raw strings.
185    ///
186    /// [`StreamId`]: crate::ids::StreamId
187    /// [`StreamId::try_new`]: crate::ids::StreamId::try_new
188    /// [`StreamId::for_process`]: crate::ids::StreamId::for_process
189    /// [`StreamId::for_partner`]: crate::ids::StreamId::for_partner
190    #[error("invalid stream ID: {reason}")]
191    InvalidStreamId {
192        /// The rejected input (truncated to 200 chars for log safety).
193        input: Box<str>,
194        /// Human-readable explanation of why the ID was rejected.
195        reason: &'static str,
196    },
197}
198
199impl EngineError {
200    // ── Storage error constructors ────────────────────────────────────────────
201
202    /// Construct a **permanent** (non-retriable) event-store error.
203    pub fn store(message: impl Into<String>) -> Self {
204        Self::Store {
205            message: message.into(),
206            transient: false,
207        }
208    }
209
210    /// Construct a **transient** (retriable) event-store error.
211    pub fn transient_store(message: impl Into<String>) -> Self {
212        Self::Store {
213            message: message.into(),
214            transient: true,
215        }
216    }
217
218    /// Construct a **permanent** outbox-store error.
219    pub fn outbox(message: impl Into<String>) -> Self {
220        Self::Outbox {
221            message: message.into(),
222            transient: false,
223        }
224    }
225
226    /// Construct a **transient** outbox-store error.
227    pub fn transient_outbox(message: impl Into<String>) -> Self {
228        Self::Outbox {
229            message: message.into(),
230            transient: true,
231        }
232    }
233
234    /// Construct a **permanent** deadline-store error.
235    pub fn deadline(message: impl Into<String>) -> Self {
236        Self::Deadline {
237            message: message.into(),
238            transient: false,
239        }
240    }
241
242    /// Construct a **transient** deadline-store error.
243    pub fn transient_deadline(message: impl Into<String>) -> Self {
244        Self::Deadline {
245            message: message.into(),
246            transient: true,
247        }
248    }
249
250    /// Construct a **permanent** process-registry error.
251    pub fn registry(message: impl Into<String>) -> Self {
252        Self::Registry {
253            message: message.into(),
254            transient: false,
255        }
256    }
257
258    /// Construct a **transient** process-registry error.
259    pub fn transient_registry(message: impl Into<String>) -> Self {
260        Self::Registry {
261            message: message.into(),
262            transient: true,
263        }
264    }
265
266    /// Construct a **permanent** inbox-store error.
267    pub fn inbox(message: impl Into<String>) -> Self {
268        Self::Inbox {
269            message: message.into(),
270            transient: false,
271        }
272    }
273
274    /// Construct a **transient** inbox-store error.
275    pub fn transient_inbox(message: impl Into<String>) -> Self {
276        Self::Inbox {
277            message: message.into(),
278            transient: true,
279        }
280    }
281
282    /// Construct a **permanent** snapshot-store error.
283    pub fn snapshot(message: impl Into<String>) -> Self {
284        Self::Snapshot {
285            message: message.into(),
286            transient: false,
287        }
288    }
289
290    /// Construct a **transient** snapshot-store error.
291    pub fn transient_snapshot(message: impl Into<String>) -> Self {
292        Self::Snapshot {
293            message: message.into(),
294            transient: true,
295        }
296    }
297
298    /// Construct a **permanent** partner-store error.
299    pub fn partner(message: impl Into<String>) -> Self {
300        Self::Partner {
301            message: message.into(),
302            transient: false,
303        }
304    }
305
306    /// Construct a **transient** partner-store error.
307    pub fn transient_partner(message: impl Into<String>) -> Self {
308        Self::Partner {
309            message: message.into(),
310            transient: true,
311        }
312    }
313
314    /// Construct a **permanent** dead-letter-store error.
315    pub fn dead_letter(message: impl Into<String>) -> Self {
316        Self::DeadLetter {
317            message: message.into(),
318            transient: false,
319        }
320    }
321
322    /// Construct a **transient** dead-letter-store error.
323    pub fn transient_dead_letter(message: impl Into<String>) -> Self {
324        Self::DeadLetter {
325            message: message.into(),
326            transient: true,
327        }
328    }
329
330    // ── Predicate helpers ─────────────────────────────────────────────────────
331
332    /// Return `true` when this is a [`EngineError::VersionConflict`].
333    ///
334    /// Useful for retry logic: on a version conflict the caller should reload
335    /// state and re-issue the command.
336    #[must_use]
337    pub fn is_version_conflict(&self) -> bool {
338        matches!(self, Self::VersionConflict { .. })
339    }
340
341    /// Return `true` when this is a [`EngineError::StreamQuotaExceeded`].
342    #[must_use]
343    pub fn is_stream_quota_exceeded(&self) -> bool {
344        matches!(self, Self::StreamQuotaExceeded { .. })
345    }
346
347    /// Return `true` when this is a [`EngineError::Workflow`].
348    ///
349    /// Useful for distinguishing infrastructure failures (store errors) from
350    /// domain rejections (the workflow refused the command).
351    #[must_use]
352    pub fn is_workflow_error(&self) -> bool {
353        matches!(self, Self::Workflow(_))
354    }
355
356    /// Return `true` when the error is likely transient and the operation
357    /// can be safely retried after a short backoff.
358    ///
359    /// Storage errors carry an explicit `transient` flag set at the point of
360    /// construction by the storage layer, eliminating any reliance on
361    /// string-matching heuristics.
362    ///
363    /// Transport errors (network timeouts, TLS failures) are always transient.
364    /// All other errors (version conflicts, quota exceeded, workflow
365    /// rejections, …) are permanent.
366    ///
367    /// # Usage
368    ///
369    /// ```rust,ignore
370    /// for attempt in 0..MAX_RETRIES {
371    ///     match process.execute(cmd.clone()).await {
372    ///         Ok(result) => return Ok(result),
373    ///         Err(e) if e.is_version_conflict() => { /* reload and retry */ }
374    ///         Err(e) if e.is_transient() => {
375    ///             tokio::time::sleep(backoff(attempt)).await;
376    ///         }
377    ///         Err(e) => return Err(e),
378    ///     }
379    /// }
380    /// ```
381    #[must_use]
382    pub fn is_transient(&self) -> bool {
383        match self {
384            Self::Store { transient, .. }
385            | Self::Outbox { transient, .. }
386            | Self::Deadline { transient, .. }
387            | Self::Registry { transient, .. }
388            | Self::Inbox { transient, .. }
389            | Self::Snapshot { transient, .. }
390            | Self::Partner { transient, .. }
391            | Self::DeadLetter { transient, .. } => *transient,
392            // Transport errors (network timeouts, TLS failures) are transient.
393            Self::Transport { .. } => true,
394            // Everything else (missing partner, version conflict, …) is permanent.
395            _ => false,
396        }
397    }
398
399    /// Return `true` when this is a [`EngineError::PartnerUnknown`].
400    ///
401    /// `PartnerUnknown` is a **permanent** failure: no retry will succeed until
402    /// the operator registers the missing `--as4-partner` entry. The outbox
403    /// worker should dead-letter the message immediately.
404    #[must_use]
405    pub fn is_partner_unknown(&self) -> bool {
406        matches!(self, Self::PartnerUnknown { .. })
407    }
408
409    /// Return `true` when this is a [`EngineError::RendererNotImplemented`].
410    ///
411    /// `RendererNotImplemented` is a **permanent** failure: no retry will
412    /// succeed until a wire-format renderer is implemented for the message type.
413    /// The outbox worker should dead-letter the message immediately.
414    #[must_use]
415    pub fn is_renderer_not_implemented(&self) -> bool {
416        matches!(self, Self::RendererNotImplemented { .. })
417    }
418
419    /// Return `true` when this is a [`EngineError::Transport`].
420    #[must_use]
421    pub fn is_transport_error(&self) -> bool {
422        matches!(self, Self::Transport { .. })
423    }
424
425    /// Return the inner [`WorkflowError`] if this is a workflow rejection,
426    /// or `None` otherwise.
427    #[must_use]
428    pub fn as_workflow_error(&self) -> Option<&WorkflowError> {
429        if let Self::Workflow(e) = self {
430            Some(e)
431        } else {
432            None
433        }
434    }
435
436    /// Return `true` when this is a [`EngineError::Snapshot`].
437    #[must_use]
438    pub fn is_snapshot_error(&self) -> bool {
439        matches!(self, Self::Snapshot { .. })
440    }
441
442    /// Return `true` when this is a [`EngineError::Outbox`].
443    #[must_use]
444    pub fn is_outbox_error(&self) -> bool {
445        matches!(self, Self::Outbox { .. })
446    }
447
448    /// Return `true` when this is a [`EngineError::Deadline`].
449    #[must_use]
450    pub fn is_deadline_error(&self) -> bool {
451        matches!(self, Self::Deadline { .. })
452    }
453
454    /// Return `true` when this is a [`EngineError::Registry`].
455    #[must_use]
456    pub fn is_registry_error(&self) -> bool {
457        matches!(self, Self::Registry { .. })
458    }
459
460    /// Return `true` when this is a [`EngineError::Inbox`].
461    #[must_use]
462    pub fn is_inbox_error(&self) -> bool {
463        matches!(self, Self::Inbox { .. })
464    }
465}
466
467/// Reasons a workflow may refuse a command.
468#[non_exhaustive]
469#[derive(Debug, thiserror::Error)]
470pub enum WorkflowError {
471    /// The command is not valid for the process in its current state.
472    #[error("command rejected: {reason}")]
473    CommandRejected {
474        /// Human-readable rejection reason.
475        reason: String,
476    },
477
478    /// The command arrived when the process was in an unexpected state.
479    #[error("invalid state: expected {expected}, found {actual}")]
480    InvalidState {
481        /// The state the workflow expected.
482        expected: String,
483        /// The actual state the process was in.
484        actual: String,
485    },
486
487    /// Domain validation of the command payload failed.
488    #[error("validation failed: {message}")]
489    ValidationFailed {
490        /// Description of what failed validation.
491        message: String,
492    },
493
494    /// The process identified by `pid` is registered in the PID router but has
495    /// no workflow implementation yet.
496    ///
497    /// Callers should respond to the sender with a CONTRL/APERAK rejection.
498    /// This variant is **never** a transient error — it indicates missing
499    /// implementation and must not be retried.
500    #[error("workflow not implemented for PID {pid}")]
501    NotImplemented {
502        /// The Prüfidentifikator that has no implementation.
503        pid: u32,
504    },
505
506    /// An unexpected error occurred inside the workflow handler.
507    #[error("{message}")]
508    Other {
509        /// Error description.
510        message: String,
511    },
512}
513
514impl WorkflowError {
515    /// Construct a [`WorkflowError::CommandRejected`] with a formatted reason.
516    #[must_use]
517    pub fn rejected(reason: impl Into<String>) -> Self {
518        Self::CommandRejected {
519            reason: reason.into(),
520        }
521    }
522
523    /// Construct a [`WorkflowError::InvalidState`].
524    #[must_use]
525    pub fn invalid_state(expected: impl Into<String>, actual: impl Into<String>) -> Self {
526        Self::InvalidState {
527            expected: expected.into(),
528            actual: actual.into(),
529        }
530    }
531
532    /// Construct a [`WorkflowError::ValidationFailed`].
533    #[must_use]
534    pub fn validation(message: impl Into<String>) -> Self {
535        Self::ValidationFailed {
536            message: message.into(),
537        }
538    }
539
540    /// Construct a [`WorkflowError::NotImplemented`] for a given PID.
541    ///
542    /// Use this to signal that the PID is routed but has no workflow
543    /// implementation. Callers must respond with a CONTRL/APERAK rejection
544    /// to the sender — this variant must never be silently discarded.
545    #[must_use]
546    pub fn not_implemented(pid: u32) -> Self {
547        Self::NotImplemented { pid }
548    }
549
550    /// Construct a [`WorkflowError::Other`].
551    #[must_use]
552    pub fn other(message: impl Into<String>) -> Self {
553        Self::Other {
554            message: message.into(),
555        }
556    }
557
558    /// Return `true` when this is a [`WorkflowError::CommandRejected`].
559    #[must_use]
560    pub fn is_rejected(&self) -> bool {
561        matches!(self, Self::CommandRejected { .. })
562    }
563
564    /// Return `true` when this is a [`WorkflowError::InvalidState`].
565    #[must_use]
566    pub fn is_invalid_state(&self) -> bool {
567        matches!(self, Self::InvalidState { .. })
568    }
569
570    /// Return `true` when this is a [`WorkflowError::ValidationFailed`].
571    #[must_use]
572    pub fn is_validation_failed(&self) -> bool {
573        matches!(self, Self::ValidationFailed { .. })
574    }
575
576    /// Return `true` when this is a [`WorkflowError::NotImplemented`].
577    ///
578    /// This variant is never transient — callers must not retry.
579    #[must_use]
580    pub fn is_not_implemented(&self) -> bool {
581        matches!(self, Self::NotImplemented { .. })
582    }
583}