mako_engine/error.rs
1//! Engine-level error types.
2
3/// Errors that can occur during engine operations (command dispatch, event
4/// persistence, state reconstruction).
5#[non_exhaustive]
6#[derive(Debug, thiserror::Error)]
7pub enum EngineError {
8 /// The event store returned an error.
9 #[error("store error: {message}")]
10 Store {
11 /// Human-readable description of the storage failure.
12 message: String,
13 /// `true` when the error is transient (retry may succeed after backoff).
14 transient: bool,
15 },
16
17 /// Optimistic concurrency check failed: the stream was modified by a
18 /// concurrent writer between the read and the append.
19 #[error("version conflict: expected {expected}, found {actual}")]
20 VersionConflict {
21 /// The sequence number the caller expected the stream to be at.
22 expected: u64,
23 /// The actual current sequence number of the stream.
24 actual: u64,
25 },
26
27 /// Could not deserialize a stored event payload into the typed event.
28 ///
29 /// This typically indicates a schema migration is required.
30 #[error("event deserialization failed: {0}")]
31 Deserialization(String),
32
33 /// Could not serialize a domain event into the envelope payload.
34 #[error("event serialization failed: {0}")]
35 Serialization(String),
36
37 /// A snapshot storage operation failed.
38 #[error("snapshot store error: {message}")]
39 Snapshot {
40 /// Human-readable description of the storage failure.
41 message: String,
42 /// `true` when the error is transient (retry may succeed after backoff).
43 transient: bool,
44 },
45
46 /// An outbox storage operation failed.
47 #[error("outbox store error: {message}")]
48 Outbox {
49 /// Human-readable description of the storage failure.
50 message: String,
51 /// `true` when the error is transient (retry may succeed after backoff).
52 transient: bool,
53 },
54
55 /// A deadline storage operation failed.
56 #[error("deadline store error: {message}")]
57 Deadline {
58 /// Human-readable description of the storage failure.
59 message: String,
60 /// `true` when the error is transient (retry may succeed after backoff).
61 transient: bool,
62 },
63
64 /// A process registry operation failed.
65 #[error("process registry error: {message}")]
66 Registry {
67 /// Human-readable description of the storage failure.
68 message: String,
69 /// `true` when the error is transient (retry may succeed after backoff).
70 transient: bool,
71 },
72
73 /// An inbox (AS4 dedup) operation failed.
74 #[error("inbox store error: {message}")]
75 Inbox {
76 /// Human-readable description of the storage failure.
77 message: String,
78 /// `true` when the error is transient (retry may succeed after backoff).
79 transient: bool,
80 },
81
82 /// A partner-store operation failed, or a required partner record is absent.
83 #[error("partner store error: {message}")]
84 Partner {
85 /// Human-readable description of the storage failure.
86 message: String,
87 /// `true` when the error is transient (retry may succeed after backoff).
88 transient: bool,
89 },
90
91 /// A dead-letter query operation failed.
92 ///
93 /// Covers `SlateDbDeadLetterSink::list_recent` and similar read-path
94 /// operations. Writes are fire-and-forget (logged on error) and do not
95 /// produce this variant.
96 #[error("dead-letter store error: {message}")]
97 DeadLetter {
98 /// Human-readable description of the storage failure.
99 message: String,
100 /// `true` when the error is transient (retry may succeed after backoff).
101 transient: bool,
102 },
103
104 /// The workflow rejected the command or reached an invalid state.
105 #[error("workflow error: {0}")]
106 Workflow(#[from] WorkflowError),
107
108 /// Appending the requested events would exceed the per-stream event count
109 /// limit configured on the store.
110 ///
111 /// This is a hard safety guard against runaway streams. The caller should
112 /// archive or compact the stream before retrying.
113 ///
114 /// The `stream_id`, `limit`, `new_events`, and `actual` fields are
115 /// available for internal structured logging but are intentionally **not**
116 /// included in the `Display` string returned to API callers to avoid
117 /// leaking internal stream topology.
118 #[error("event stream quota exceeded")]
119 StreamQuotaExceeded {
120 /// The stream that hit the limit (for internal logging only).
121 stream_id: crate::ids::StreamId,
122 /// The configured maximum number of events per stream.
123 limit: u64,
124 /// Number of events that would be written by this append.
125 new_events: usize,
126 /// Total event count after the append would complete.
127 actual: u64,
128 },
129
130 /// An AS4 transport send operation failed.
131 ///
132 /// Distinct from [`Store`] so the outbox worker can decide retry strategy
133 /// without string-matching: transport errors are potentially
134 /// transient; serialization errors are permanent.
135 ///
136 /// [`Store`]: EngineError::Store
137 #[error("AS4 transport error sending to {endpoint}: {message}")]
138 Transport {
139 /// The AS4 endpoint URL (or `"unknown"` when not available).
140 endpoint: Box<str>,
141 /// The underlying error description.
142 message: String,
143 },
144
145 /// The outbound message cannot be delivered because no AS4 endpoint is
146 /// registered for the recipient GLN.
147 ///
148 /// This is a **permanent** failure: the operator must add the missing
149 /// `--as4-partner <GLN>=<URL>` entry before delivery can succeed.
150 /// The outbox worker should dead-letter immediately rather than retrying.
151 #[error("no AS4 endpoint configured for recipient {recipient}")]
152 PartnerUnknown {
153 /// The recipient GLN that has no registered endpoint.
154 recipient: Box<str>,
155 },
156
157 /// The outbound message cannot be rendered to EDIFACT wire format because
158 /// no renderer is implemented for its message type.
159 ///
160 /// This is a **permanent** failure: retrying will never succeed until a
161 /// wire-format renderer is implemented for the message type. The outbox
162 /// worker should dead-letter the message immediately and alert the operator.
163 ///
164 /// Use this instead of silently transmitting JSON blobs over AS4, which
165 /// violates BDEW MaKo interoperability requirements.
166 #[error(
167 "no EDIFACT renderer implemented for message type '{message_type}' \
168 (outbox entry {message_id}); implement a wire-format renderer before \
169 enabling this workflow path in production"
170 )]
171 RendererNotImplemented {
172 /// The EDIFACT message type string (e.g. `"MSCONS"`, `"INVOIC"`).
173 message_type: Box<str>,
174 /// The outbox message ID for correlation with the dead-letter store.
175 message_id: Box<str>,
176 },
177
178 /// A string could not be converted into a valid [`StreamId`].
179 ///
180 /// Stream IDs must be non-empty and must not contain NUL bytes.
181 /// This error is produced by [`StreamId::try_new`] and the
182 /// [`TryFrom`] impls. Use the typed constructors
183 /// ([`StreamId::for_process`], [`StreamId::for_partner`]) where possible
184 /// to avoid constructing stream IDs from raw strings.
185 ///
186 /// [`StreamId`]: crate::ids::StreamId
187 /// [`StreamId::try_new`]: crate::ids::StreamId::try_new
188 /// [`StreamId::for_process`]: crate::ids::StreamId::for_process
189 /// [`StreamId::for_partner`]: crate::ids::StreamId::for_partner
190 #[error("invalid stream ID: {reason}")]
191 InvalidStreamId {
192 /// The rejected input (truncated to 200 chars for log safety).
193 input: Box<str>,
194 /// Human-readable explanation of why the ID was rejected.
195 reason: &'static str,
196 },
197}
198
199impl EngineError {
200 // ── Storage error constructors ────────────────────────────────────────────
201
202 /// Construct a **permanent** (non-retriable) event-store error.
203 pub fn store(message: impl Into<String>) -> Self {
204 Self::Store {
205 message: message.into(),
206 transient: false,
207 }
208 }
209
210 /// Construct a **transient** (retriable) event-store error.
211 pub fn transient_store(message: impl Into<String>) -> Self {
212 Self::Store {
213 message: message.into(),
214 transient: true,
215 }
216 }
217
218 /// Construct a **permanent** outbox-store error.
219 pub fn outbox(message: impl Into<String>) -> Self {
220 Self::Outbox {
221 message: message.into(),
222 transient: false,
223 }
224 }
225
226 /// Construct a **transient** outbox-store error.
227 pub fn transient_outbox(message: impl Into<String>) -> Self {
228 Self::Outbox {
229 message: message.into(),
230 transient: true,
231 }
232 }
233
234 /// Construct a **permanent** deadline-store error.
235 pub fn deadline(message: impl Into<String>) -> Self {
236 Self::Deadline {
237 message: message.into(),
238 transient: false,
239 }
240 }
241
242 /// Construct a **transient** deadline-store error.
243 pub fn transient_deadline(message: impl Into<String>) -> Self {
244 Self::Deadline {
245 message: message.into(),
246 transient: true,
247 }
248 }
249
250 /// Construct a **permanent** process-registry error.
251 pub fn registry(message: impl Into<String>) -> Self {
252 Self::Registry {
253 message: message.into(),
254 transient: false,
255 }
256 }
257
258 /// Construct a **transient** process-registry error.
259 pub fn transient_registry(message: impl Into<String>) -> Self {
260 Self::Registry {
261 message: message.into(),
262 transient: true,
263 }
264 }
265
266 /// Construct a **permanent** inbox-store error.
267 pub fn inbox(message: impl Into<String>) -> Self {
268 Self::Inbox {
269 message: message.into(),
270 transient: false,
271 }
272 }
273
274 /// Construct a **transient** inbox-store error.
275 pub fn transient_inbox(message: impl Into<String>) -> Self {
276 Self::Inbox {
277 message: message.into(),
278 transient: true,
279 }
280 }
281
282 /// Construct a **permanent** snapshot-store error.
283 pub fn snapshot(message: impl Into<String>) -> Self {
284 Self::Snapshot {
285 message: message.into(),
286 transient: false,
287 }
288 }
289
290 /// Construct a **transient** snapshot-store error.
291 pub fn transient_snapshot(message: impl Into<String>) -> Self {
292 Self::Snapshot {
293 message: message.into(),
294 transient: true,
295 }
296 }
297
298 /// Construct a **permanent** partner-store error.
299 pub fn partner(message: impl Into<String>) -> Self {
300 Self::Partner {
301 message: message.into(),
302 transient: false,
303 }
304 }
305
306 /// Construct a **transient** partner-store error.
307 pub fn transient_partner(message: impl Into<String>) -> Self {
308 Self::Partner {
309 message: message.into(),
310 transient: true,
311 }
312 }
313
314 /// Construct a **permanent** dead-letter-store error.
315 pub fn dead_letter(message: impl Into<String>) -> Self {
316 Self::DeadLetter {
317 message: message.into(),
318 transient: false,
319 }
320 }
321
322 /// Construct a **transient** dead-letter-store error.
323 pub fn transient_dead_letter(message: impl Into<String>) -> Self {
324 Self::DeadLetter {
325 message: message.into(),
326 transient: true,
327 }
328 }
329
330 // ── Predicate helpers ─────────────────────────────────────────────────────
331
332 /// Return `true` when this is a [`EngineError::VersionConflict`].
333 ///
334 /// Useful for retry logic: on a version conflict the caller should reload
335 /// state and re-issue the command.
336 #[must_use]
337 pub fn is_version_conflict(&self) -> bool {
338 matches!(self, Self::VersionConflict { .. })
339 }
340
341 /// Return `true` when this is a [`EngineError::StreamQuotaExceeded`].
342 #[must_use]
343 pub fn is_stream_quota_exceeded(&self) -> bool {
344 matches!(self, Self::StreamQuotaExceeded { .. })
345 }
346
347 /// Return `true` when this is a [`EngineError::Workflow`].
348 ///
349 /// Useful for distinguishing infrastructure failures (store errors) from
350 /// domain rejections (the workflow refused the command).
351 #[must_use]
352 pub fn is_workflow_error(&self) -> bool {
353 matches!(self, Self::Workflow(_))
354 }
355
356 /// Return `true` when the error is likely transient and the operation
357 /// can be safely retried after a short backoff.
358 ///
359 /// Storage errors carry an explicit `transient` flag set at the point of
360 /// construction by the storage layer, eliminating any reliance on
361 /// string-matching heuristics.
362 ///
363 /// Transport errors (network timeouts, TLS failures) are always transient.
364 /// All other errors (version conflicts, quota exceeded, workflow
365 /// rejections, …) are permanent.
366 ///
367 /// # Usage
368 ///
369 /// ```rust,ignore
370 /// for attempt in 0..MAX_RETRIES {
371 /// match process.execute(cmd.clone()).await {
372 /// Ok(result) => return Ok(result),
373 /// Err(e) if e.is_version_conflict() => { /* reload and retry */ }
374 /// Err(e) if e.is_transient() => {
375 /// tokio::time::sleep(backoff(attempt)).await;
376 /// }
377 /// Err(e) => return Err(e),
378 /// }
379 /// }
380 /// ```
381 #[must_use]
382 pub fn is_transient(&self) -> bool {
383 match self {
384 Self::Store { transient, .. }
385 | Self::Outbox { transient, .. }
386 | Self::Deadline { transient, .. }
387 | Self::Registry { transient, .. }
388 | Self::Inbox { transient, .. }
389 | Self::Snapshot { transient, .. }
390 | Self::Partner { transient, .. }
391 | Self::DeadLetter { transient, .. } => *transient,
392 // Transport errors (network timeouts, TLS failures) are transient.
393 Self::Transport { .. } => true,
394 // Everything else (missing partner, version conflict, …) is permanent.
395 _ => false,
396 }
397 }
398
399 /// Return `true` when this is a [`EngineError::PartnerUnknown`].
400 ///
401 /// `PartnerUnknown` is a **permanent** failure: no retry will succeed until
402 /// the operator registers the missing `--as4-partner` entry. The outbox
403 /// worker should dead-letter the message immediately.
404 #[must_use]
405 pub fn is_partner_unknown(&self) -> bool {
406 matches!(self, Self::PartnerUnknown { .. })
407 }
408
409 /// Return `true` when this is a [`EngineError::RendererNotImplemented`].
410 ///
411 /// `RendererNotImplemented` is a **permanent** failure: no retry will
412 /// succeed until a wire-format renderer is implemented for the message type.
413 /// The outbox worker should dead-letter the message immediately.
414 #[must_use]
415 pub fn is_renderer_not_implemented(&self) -> bool {
416 matches!(self, Self::RendererNotImplemented { .. })
417 }
418
419 /// Return `true` when this is a [`EngineError::Transport`].
420 #[must_use]
421 pub fn is_transport_error(&self) -> bool {
422 matches!(self, Self::Transport { .. })
423 }
424
425 /// Return the inner [`WorkflowError`] if this is a workflow rejection,
426 /// or `None` otherwise.
427 #[must_use]
428 pub fn as_workflow_error(&self) -> Option<&WorkflowError> {
429 if let Self::Workflow(e) = self {
430 Some(e)
431 } else {
432 None
433 }
434 }
435
436 /// Return `true` when this is a [`EngineError::Snapshot`].
437 #[must_use]
438 pub fn is_snapshot_error(&self) -> bool {
439 matches!(self, Self::Snapshot { .. })
440 }
441
442 /// Return `true` when this is a [`EngineError::Outbox`].
443 #[must_use]
444 pub fn is_outbox_error(&self) -> bool {
445 matches!(self, Self::Outbox { .. })
446 }
447
448 /// Return `true` when this is a [`EngineError::Deadline`].
449 #[must_use]
450 pub fn is_deadline_error(&self) -> bool {
451 matches!(self, Self::Deadline { .. })
452 }
453
454 /// Return `true` when this is a [`EngineError::Registry`].
455 #[must_use]
456 pub fn is_registry_error(&self) -> bool {
457 matches!(self, Self::Registry { .. })
458 }
459
460 /// Return `true` when this is a [`EngineError::Inbox`].
461 #[must_use]
462 pub fn is_inbox_error(&self) -> bool {
463 matches!(self, Self::Inbox { .. })
464 }
465}
466
/// Reasons a workflow may refuse a command.
///
/// A `WorkflowError` converts into `EngineError::Workflow` through the
/// `#[from]` attribute on that variant, so `?` can propagate it from engine
/// code. Prefer the constructor helpers on `WorkflowError` (`rejected`,
/// `invalid_state`, `validation`, `not_implemented`, `other`) over building
/// variants by hand.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum WorkflowError {
    /// The command is not valid for the process in its current state.
    ///
    /// Displayed as `command rejected: <reason>`.
    #[error("command rejected: {reason}")]
    CommandRejected {
        /// Human-readable rejection reason.
        reason: String,
    },

    /// The command arrived when the process was in an unexpected state.
    ///
    /// Displayed as `invalid state: expected <expected>, found <actual>`.
    #[error("invalid state: expected {expected}, found {actual}")]
    InvalidState {
        /// The state the workflow expected.
        expected: String,
        /// The actual state the process was in.
        actual: String,
    },

    /// Domain validation of the command payload failed.
    ///
    /// Displayed as `validation failed: <message>`.
    #[error("validation failed: {message}")]
    ValidationFailed {
        /// Description of what failed validation.
        message: String,
    },

    /// The process identified by `pid` is registered in the PID router but has
    /// no workflow implementation yet.
    ///
    /// Callers should respond to the sender with a CONTRL/APERAK rejection.
    /// This variant is **never** a transient error — it indicates missing
    /// implementation and must not be retried.
    #[error("workflow not implemented for PID {pid}")]
    NotImplemented {
        /// The Prüfidentifikator that has no implementation.
        pid: u32,
    },

    /// An unexpected error occurred inside the workflow handler.
    ///
    /// Displayed as the bare `message`, with no prefix.
    #[error("{message}")]
    Other {
        /// Error description.
        message: String,
    },
}
513
514impl WorkflowError {
515 /// Construct a [`WorkflowError::CommandRejected`] with a formatted reason.
516 #[must_use]
517 pub fn rejected(reason: impl Into<String>) -> Self {
518 Self::CommandRejected {
519 reason: reason.into(),
520 }
521 }
522
523 /// Construct a [`WorkflowError::InvalidState`].
524 #[must_use]
525 pub fn invalid_state(expected: impl Into<String>, actual: impl Into<String>) -> Self {
526 Self::InvalidState {
527 expected: expected.into(),
528 actual: actual.into(),
529 }
530 }
531
532 /// Construct a [`WorkflowError::ValidationFailed`].
533 #[must_use]
534 pub fn validation(message: impl Into<String>) -> Self {
535 Self::ValidationFailed {
536 message: message.into(),
537 }
538 }
539
540 /// Construct a [`WorkflowError::NotImplemented`] for a given PID.
541 ///
542 /// Use this to signal that the PID is routed but has no workflow
543 /// implementation. Callers must respond with a CONTRL/APERAK rejection
544 /// to the sender — this variant must never be silently discarded.
545 #[must_use]
546 pub fn not_implemented(pid: u32) -> Self {
547 Self::NotImplemented { pid }
548 }
549
550 /// Construct a [`WorkflowError::Other`].
551 #[must_use]
552 pub fn other(message: impl Into<String>) -> Self {
553 Self::Other {
554 message: message.into(),
555 }
556 }
557
558 /// Return `true` when this is a [`WorkflowError::CommandRejected`].
559 #[must_use]
560 pub fn is_rejected(&self) -> bool {
561 matches!(self, Self::CommandRejected { .. })
562 }
563
564 /// Return `true` when this is a [`WorkflowError::InvalidState`].
565 #[must_use]
566 pub fn is_invalid_state(&self) -> bool {
567 matches!(self, Self::InvalidState { .. })
568 }
569
570 /// Return `true` when this is a [`WorkflowError::ValidationFailed`].
571 #[must_use]
572 pub fn is_validation_failed(&self) -> bool {
573 matches!(self, Self::ValidationFailed { .. })
574 }
575
576 /// Return `true` when this is a [`WorkflowError::NotImplemented`].
577 ///
578 /// This variant is never transient — callers must not retry.
579 #[must_use]
580 pub fn is_not_implemented(&self) -> bool {
581 matches!(self, Self::NotImplemented { .. })
582 }
583}