Skip to main content

mako_engine/
builder.rs

1//! [`EngineModule`] trait, [`EngineBuilder`], and [`EngineContext`].
2//!
3//! # Summary
4//!
5//! `EngineBuilder` assembles all engine infrastructure into a single
6//! [`EngineContext`] value. Domain modules (GPKE, WiM, GeLi Gas, …) register
7//! themselves at startup via the [`EngineModule`] trait, making their names
8//! visible in diagnostics and health checks.
9//!
10//! # Type-state guarantee
11//!
12//! [`EngineBuilder::build`] is only available when the event store type
13//! parameter `ES` implements [`EventStore`]. Forgetting to call
14//! [`with_event_store`] is a **compile-time error**, not a runtime panic.
15//!
16//! All other stores default to their respective `Noop` implementations:
17//!
18//! | Store | Default |
19//! |-------|---------|
20//! | Snapshot store | [`NoopSnapshotStore`] |
21//! | Outbox store | [`NoopOutboxStore`] |
22//! | Deadline store | [`NoopDeadlineStore`] |
23//! | Process registry | [`NoopProcessRegistry`] |
24//!
25//! # Assembly example
26//!
27//! ```rust,ignore
28//! use mako_engine::builder::{EngineBuilder, EngineModule};
29//! use mako_engine::event_store::InMemoryEventStore;
30//! use mako_engine::outbox::InMemoryOutboxStore;
31//! use mako_engine::deadline::InMemoryDeadlineStore;
32//! use mako_engine::registry::InMemoryProcessRegistry;
33//! use mako_engine::snapshot::InMemorySnapshotStore;
34//!
35//! struct GpkeModule;
36//! impl EngineModule for GpkeModule { fn name(&self) -> &'static str { "gpke" } }
37//!
38//! let ctx = EngineBuilder::new()
39//!     .with_event_store(InMemoryEventStore::new())
40//!     .with_snapshot_store(InMemorySnapshotStore::new())
41//!     .with_outbox_store(InMemoryOutboxStore::new())
42//!     .with_deadline_store(InMemoryDeadlineStore::new())
43//!     .with_registry(InMemoryProcessRegistry::new())
44//!     .register(Box::new(GpkeModule))
45//!     .build();
46//!
47//! // Spawn a fresh process:
48//! let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
49//! p.execute(ReceiveUtilmd { .. }).await?;
50//!
51//! // Resume an existing process from a persisted identity:
//! let identity = ctx.registry().lookup(tenant_id, &conv_id.to_string()).await?.unwrap();
//! let p = ctx.resume::<SupplierChangeWorkflow>(identity);
//!
//! // Access stores for delivery workers / schedulers (the store fields are
//! // private; use the accessor methods):
//! let pending = ctx.outbox_store().pending_now(50).await?;
//! let overdue = ctx.deadline_store().due_now(50).await?;
58//! ```
59//!
60//! [`with_event_store`]: EngineBuilder::with_event_store
61
62// Type-state generics can produce long signatures that trip up the
63// `type_complexity` lint; suppress it for this module only.
64#![allow(clippy::type_complexity)]
65
66use crate::{
67    dead_letter::{DeadLetterSink, LogDeadLetterSink},
68    deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
69    error::EngineError,
70    event_store::EventStore,
71    ids::{ProcessIdentity, TenantId},
72    marktrolle::DeploymentRoles,
73    outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
74    pid_router::PidRouter,
75    process::Process,
76    registry::{NoopProcessRegistry, ProcessRegistry},
77    snapshot::{NoopSnapshotStore, SnapshotStore},
78    version::WorkflowId,
79    workflow::Workflow,
80};
81
82use std::sync::Arc;
83
84// ── EngineModule ──────────────────────────────────────────────────────────────
85
86/// A self-contained domain module that registers with the engine at startup.
87///
88/// Domain crates implement this trait to declare their presence in the engine.
89/// The module name is surfaced in [`EngineContext::registered_modules`] for
90/// diagnostics, health checks, and log output.
91///
92/// ## Startup validation
93///
94/// Override [`configure`] to perform adapter coverage checks at engine startup
95/// time. The engine calls [`configure`] for every registered module during
96/// [`EngineBuilder::build`] and panics with an actionable message if any
97/// module returns `Err`. This surfaces missing adapter registrations as a
98/// startup failure rather than a silent runtime error.
99///
100/// ## Example
101///
102/// ```rust,ignore
103/// pub struct GpkeModule;
104///
105/// impl EngineModule for GpkeModule {
106///     fn name(&self) -> &'static str { "gpke" }
107///
108///     fn configure(&self) -> Result<(), String> {
109///         // Validate that every known BDEW format version has an adapter:
110///         GPKE_ADAPTER_REGISTRY
111///             .validate_policy(&GpkeWorkflow::version_policy(), &KNOWN_FVS)
112///             .map_err(|uncovered| format!(
113///                 "gpke: missing adapters for format versions: {:?}",
114///                 uncovered
115///             ))
116///     }
117/// }
118///
119/// let ctx = EngineBuilder::new()
120///     .with_event_store(my_store)
121///     .register(Box::new(GpkeModule))
122///     .build(); // panics if GpkeModule::configure returns Err
123///
124/// assert_eq!(ctx.registered_modules(), &["gpke"]);
125/// ```
126///
127/// [`configure`]: EngineModule::configure
128pub trait EngineModule: Send + 'static {
129    /// Stable, unique name for this domain module.
130    ///
131    /// Used in diagnostics, health checks, and structured log output.
132    /// Choose a short lowercase identifier (e.g. `"gpke"`, `"wim"`,
133    /// `"geli"`).
134    fn name(&self) -> &'static str;
135
136    /// Register all PIDs this module handles into the shared [`PidRouter`].
137    ///
138    /// # Mutability contract
139    ///
140    /// This method is called **exactly once** by [`EngineBuilder::build`],
141    /// before the resulting [`EngineContext`] is handed to the caller. The
142    /// `&mut PidRouter` reference is only available here, at build time.
143    /// After `build` returns the router is **sealed** — the engine provides
144    /// only a shared `&PidRouter` reference, with no mutation path at runtime.
145    ///
146    /// Consequence: **all PIDs a module will ever need must be registered
147    /// here**. Do not attempt to register PIDs lazily from async handlers or
148    /// after the engine has started — there is no API for that by design.
149    ///
150    /// Duplicate registrations (same PID from two modules) silently overwrite
151    /// the previous mapping; the last module to register wins. Use
152    /// `cargo xtask validate-pruefids` to catch accidental PID conflicts
153    /// between modules before they reach production.
154    ///
155    /// For role-conditional registration (PIDs that should only be active for
156    /// specific BDEW Marktrollen), override [`register_pids_with_roles`] instead.
157    ///
158    /// # Example
159    ///
160    /// ```rust,ignore
161    /// fn register_pids(&self, router: &mut PidRouter) {
162    ///     // GPKE Lieferantenwechsel / Lieferbeginn (BK6-22-024, PIDs 55001, 55002, 55017)
163    ///     for &pid in &[55001_u32, 55002, 55017] {
164    ///         router.register(pid, "gpke-supplier-change");
165    ///     }
166    ///     // Ex-MPES Einspeisestelle (übernommen per BK6-22-024 LFW24, ab 2025-06-06, PIDs 56001–56004)
167    ///     for pid in 56001_u32..=56004 {
168    ///         router.register(pid, "gpke-supplier-change");
169    ///     }
170    /// }
171    /// ```
172    ///
173    /// [`register_pids_with_roles`]: EngineModule::register_pids_with_roles
174    fn register_pids(&self, _router: &mut PidRouter) {}
175
176    /// Register PIDs with role-context awareness.
177    ///
178    /// This is the **preferred override** for modules that have role-conditional
179    /// PID registrations — PIDs that should only be active when this `makod`
180    /// instance holds a specific [`Marktrolle`].
181    ///
182    /// The default implementation calls [`register_pids`] (role-agnostic) so
183    /// existing modules that override `register_pids` continue to work without
184    /// changes.
185    ///
186    /// Override this method instead of `register_pids` when any PID registration
187    /// should be conditional on the deployment role:
188    ///
189    /// ```rust,ignore
190    /// use mako_engine::marktrolle::Marktrolle;
191    ///
192    /// fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
193    ///     // Always register: 55001, 55002 (not role-specific)
194    ///     for pid in [55001_u32, 55002] { router.register_with_module(pid, "gpke-supplier-change", self.name()); }
195    ///
196    ///     // Only when NB role: 19001/19002 inbound ORDRSP from MSB
197    ///     if roles.contains(Marktrolle::Nb) {
198    ///         for pid in [19001_u32, 19002] { router.register_with_module(pid, "gpke-konfiguration", self.name()); }
199    ///     }
200    /// }
201    /// ```
202    ///
203    /// # Conflict guard
204    ///
205    /// Use [`PidRouter::register_with_module`] (not `register`) inside this
206    /// method. The conflict guard panics at build time if two modules register
207    /// the same PID to different workflows — this makes role misconfigurations
208    /// visible at startup rather than silently misrouting messages.
209    ///
210    /// [`Marktrolle`]: crate::marktrolle::Marktrolle
211    /// [`register_pids`]: EngineModule::register_pids
212    fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
213        self.register_pids(router);
214    }
215
216    /// Workflow names this module handles for deadline dispatch.
217    ///
218    /// Return the same name strings that [`register_pids`] maps PIDs to.
219    /// These names are stored in [`EngineContext::registered_workflows`] and
220    /// used to validate that every workflow that has deadlines scheduled is
221    /// covered by the deadline scheduler dispatch function at runtime.
222    ///
223    /// The default implementation returns an empty slice. Override it to
224    /// declare all workflow names that may fire deadlines:
225    ///
226    /// ```rust,ignore
227    /// fn workflow_names(&self) -> &'static [&'static str] {
228    ///     &["gpke-supplier-change", "gpke-abrechnung"]
229    /// }
230    /// ```
231    ///
232    /// [`register_pids`]: EngineModule::register_pids
233    /// [`EngineContext::registered_workflows`]: crate::builder::EngineContext::registered_workflows
234    fn workflow_names(&self) -> &'static [&'static str] {
235        &[]
236    }
237
238    /// Declare the EDIFACT profile types this module requires at runtime.
239    ///
240    /// Returning a non-empty slice causes [`EngineBuilder::build`] to call the
241    /// registered profile validator for each requirement.  If no active profile
242    /// exists for a required message type, `build` panics with an actionable
243    /// error so deployment fails fast rather than silently.
244    ///
245    /// **This replaces the previous pattern** of calling
246    /// `edi_energy::registry::ReleaseRegistry::global()` inside `configure()`.
247    /// Domain crates no longer need `edi-energy` in their production
248    /// `[dependencies]` — they just declare their requirements here.
249    ///
250    /// ```rust,ignore
251    /// fn profile_requirements(&self) -> &'static [ProfileRequirement] {
252    ///     &[
253    ///         ProfileRequirement { message_type: "UTILMD", label: "UTILMD Strom (GPKE)" },
254    ///         ProfileRequirement { message_type: "INVOIC", label: "INVOIC Abrechnung (GPKE)" },
255    ///     ]
256    /// }
257    /// ```
258    ///
259    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
260    fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
261        &[]
262    }
263
264    /// Validate adapter coverage and configuration at engine startup.
265    ///
266    /// Called by [`EngineBuilder::build`] after all modules are registered.
267    /// Return `Ok(())` when the module is fully configured. Return `Err(msg)`
268    /// with an actionable description when an adapter or configuration is
269    /// missing — the engine will panic with that message so the deployment
270    /// fails early rather than silently.
271    ///
272    /// The default implementation is a no-op (always returns `Ok(())`).
273    /// Override it in domain crates to call
274    /// [`AdapterRegistry::validate_policy`] and emit structured errors.
275    ///
276    /// Note: if your validation needs access to the edi-energy profile
277    /// registry, use [`profile_requirements`] instead — it does not require
278    /// importing `edi-energy` in domain crates.
279    ///
280    /// [`AdapterRegistry::validate_policy`]: crate::message_adapter::AdapterRegistry::validate_policy
281    /// [`profile_requirements`]: EngineModule::profile_requirements
282    ///
283    /// # Errors
284    ///
285    /// Returns a descriptive error string when the module's configuration is invalid.
286    fn configure(&self) -> Result<(), String> {
287        Ok(())
288    }
289}
290
291// ── EngineContext ─────────────────────────────────────────────────────────────
292
293/// Assembled engine infrastructure returned by [`EngineBuilder::build`].
294///
295/// `EngineContext` bundles all stores and the process registry into a single
296/// value. It is the root dependency for:
297///
298/// - Spawning new processes ([`spawn`])
299/// - Resuming existing processes ([`resume`])
300/// - Running outbox delivery workers (`outbox_store.pending_now(…)`)
301/// - Driving the deadline scheduler (`deadline_store.due_now(…)`)
302///
303/// ## Generic parameters
304///
305/// | Param | Role | Default |
306/// |-------|------|---------|
307/// | `ES`  | [`EventStore`] backend | — (required) |
308/// | `SS`  | [`SnapshotStore`] backend | [`NoopSnapshotStore`] |
309/// | `OS`  | [`OutboxStore`] backend  | [`NoopOutboxStore`]   |
310/// | `DS`  | [`DeadlineStore`] backend | [`NoopDeadlineStore`] |
311/// | `PR`  | [`ProcessRegistry`] backend | [`NoopProcessRegistry`] |
312///
313/// In most codebases all type parameters are inferred from the builder calls.
314///
315/// [`spawn`]: EngineContext::spawn
316/// [`resume`]: EngineContext::resume
317pub struct EngineContext<
318    ES,
319    SS = NoopSnapshotStore,
320    OS = NoopOutboxStore,
321    DS = NoopDeadlineStore,
322    PR = NoopProcessRegistry,
323> {
324    event_store: Arc<ES>,
325    snapshot_store: SS,
326    outbox_store: OS,
327    deadline_store: DS,
328    registry: PR,
329    /// Dead-letter sink for unroutable or unprocessable inbound messages.
330    ///
331    /// Stored as `Arc<dyn DeadLetterSink>` so callers can share it across
332    /// tasks without an extra type parameter on `EngineContext`.
333    pub dead_letter_sink: Arc<dyn DeadLetterSink>,
334    /// PID-to-workflow routing table, populated from all registered modules.
335    pid_router: PidRouter,
336    registered_modules: Vec<&'static str>,
337    /// Workflow names declared by all registered modules via
338    /// [`EngineModule::workflow_names`]. Used to validate deadline scheduler
339    /// coverage at runtime (see [`EngineContext::registered_workflows`]).
340    registered_workflows: Vec<&'static str>,
341}
342
343// ── Type aliases ──────────────────────────────────────────────────────────────
344
345/// An [`EngineContext`] with all optional subsystems disabled.
346///
347/// Uses `NoopSnapshotStore` and, in `testing`-enabled builds, Noop
348/// implementations for outbox, deadline, and process registry. Suitable for
349/// tests and minimal deployments where only a durable event store is required.
350///
351/// All five type parameters are inferred from context when used with
352/// [`EngineBuilder`]:
353///
354/// ```rust,ignore
355/// // Only available in test / testing-feature builds:
356/// use mako_engine::builder::{EngineBuilder, MinimalEngine};
357/// use mako_engine::event_store::InMemoryEventStore;
358///
359/// let ctx: MinimalEngine<InMemoryEventStore> = EngineBuilder::new()
360///     .with_event_store(InMemoryEventStore::new())
361///     .build();
362/// ```
363pub type MinimalEngine<ES> = EngineContext<ES>;
364
365impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
366where
367    ES: EventStore,
368{
369    /// Spawn a new process and return a typed `Process<W, Arc<ES>>` handle.
370    ///
371    /// No `ES: Clone` bound is required — the engine stores the event store
372    /// behind an `Arc` so spawning is always a cheap pointer clone.
373    ///
374    /// ```rust,ignore
375    /// let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
376    /// p.execute(ReceiveUtilmd { .. }).await?;
377    /// ```
378    #[must_use]
379    pub fn spawn<W: Workflow>(
380        &self,
381        tenant_id: TenantId,
382        workflow_id: WorkflowId,
383    ) -> Process<W, Arc<ES>> {
384        Process::new(Arc::clone(&self.event_store), tenant_id, workflow_id)
385    }
386
387    /// Resume an existing process from a [`ProcessIdentity`].
388    ///
389    /// ```rust,ignore
390    /// let identity = ctx.registry()
391    ///     .lookup(tenant_id, &conv_id.to_string())
392    ///     .await?
393    ///     .ok_or(EngineError::Registry("unknown conversation".into()))?;
394    /// let p = ctx.resume::<SupplierChangeWorkflow>(identity);
395    /// p.execute(HandleAperak { .. }).await?;
396    /// ```
397    #[must_use]
398    pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
399        Process::from_identity(Arc::clone(&self.event_store), identity)
400    }
401
402    /// Names of all domain modules registered with the builder, in
403    /// registration order.
404    #[must_use]
405    pub fn registered_modules(&self) -> &[&'static str] {
406        &self.registered_modules
407    }
408
409    /// Workflow names declared by all registered modules, in registration order.
410    ///
411    /// Use this in the deadline scheduler dispatch function to detect unknown
412    /// workflow names at startup. If a deadline fires for a workflow name that
413    /// is not in this list, the scheduler's dispatch function should emit an
414    /// error rather than silently dropping the deadline:
415    ///
416    /// ```rust,ignore
417    /// let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
418    /// let scheduler = ctx.run_deadline_scheduler(
419    ///     move |deadline| {
420    ///         let wf = deadline.workflow_id().name.as_ref();
421    ///         if !known.contains(wf) {
422    ///             tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
423    ///             return Box::pin(async { Ok(()) });
424    ///         }
425    ///         // dispatch by workflow name …
426    ///         Box::pin(async { Ok(()) })
427    ///     },
428    ///     100,
429    ///     Duration::from_secs(30),
430    /// );
431    /// ```
432    #[must_use]
433    pub fn registered_workflows(&self) -> &[&'static str] {
434        &self.registered_workflows
435    }
436
437    /// The event store backend (behind an `Arc`).
438    #[must_use]
439    pub fn event_store(&self) -> &Arc<ES> {
440        &self.event_store
441    }
442
443    /// The snapshot store backend.
444    #[must_use]
445    pub fn snapshot_store(&self) -> &SS {
446        &self.snapshot_store
447    }
448
449    /// The outbox store backend.
450    ///
451    /// Poll `outbox_store().pending_now(limit)` in a background task to drain
452    /// the delivery queue.
453    #[must_use]
454    pub fn outbox_store(&self) -> &OS {
455        &self.outbox_store
456    }
457
458    /// The deadline store backend.
459    ///
460    /// Poll `deadline_store().due_now(limit)` in a background scheduler to
461    /// fire overdue process timers.
462    #[must_use]
463    pub fn deadline_store(&self) -> &DS {
464        &self.deadline_store
465    }
466
467    /// The process routing registry.
468    ///
469    /// Register a [`ProcessIdentity`] under a `(tenant_id, key)` pair at
470    /// process creation, then `lookup` it when routing inbound messages.
471    #[must_use]
472    pub fn registry(&self) -> &PR {
473        &self.registry
474    }
475
476    /// The dead-letter sink for unroutable or unprocessable messages.
477    ///
478    /// Call [`DeadLetterSink::reject`] when an inbound message cannot be
479    /// dispatched to any workflow. The default sink emits `tracing::warn!`
480    /// so rejections are always visible in the log output.
481    #[must_use]
482    pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
483        &self.dead_letter_sink
484    }
485
486    /// Assert that no Noop store is active — call this during production startup.
487    ///
488    /// Checks the type names of `OS`, `DS`, and `PR` against the string `"Noop"`.
489    /// Panics with a human-readable message if any match, directing the operator
490    /// to configure a persistent backend.
491    ///
492    /// # When to call
493    ///
494    /// Call this early in `makod`'s startup path (and `--check` mode) to catch
495    /// deployments where a Noop store was accidentally wired — e.g. the
496    /// `[outbox]`, `[deadline]`, or `[registry]` configuration section was
497    /// omitted from `makod.toml`.  The check is defence-in-depth: in release
498    /// builds without the `testing` feature, Noop stores cannot implement the
499    /// required traits at all and the compiler would have already rejected them.
500    ///
501    /// # Panics
502    ///
503    /// Panics when any of `OS`, `DS`, or `PR` is a Noop implementation.
504    pub fn assert_production_stores(&self) {
505        let checks: &[(&str, &str)] = &[
506            ("OutboxStore", std::any::type_name::<OS>()),
507            ("DeadlineStore", std::any::type_name::<DS>()),
508            ("ProcessRegistry", std::any::type_name::<PR>()),
509        ];
510        for (trait_name, type_name) in checks {
511            assert!(
512                !type_name.contains("Noop"),
513                "makod: Noop{trait_name} is active — \
514                 configure a persistent {trait_name} backend in makod.toml. \
515                 Type resolved to: {type_name}"
516            );
517        }
518    }
519
520    /// The PID-to-workflow routing table.
521    ///
522    /// Populated **once** during [`EngineBuilder::build`] by calling
523    /// [`EngineModule::register_pids`] on every registered module in
524    /// registration order. After `build` returns the table is **sealed** —
525    /// it is read-only for the lifetime of the `EngineContext` and may be
526    /// freely shared across async tasks without synchronisation.
527    ///
528    /// # Mutability contract
529    ///
530    /// There is intentionally no `pid_router_mut()` accessor. Adding PIDs
531    /// after the engine is built would create a TOCTOU race between the
532    /// dispatch path (which calls `route(pid)`) and any hypothetical
533    /// concurrent mutator. Instead, register all PIDs during the build phase
534    /// via `EngineModule::register_pids`.
535    ///
536    /// If a new process family needs to be added without restarting the
537    /// binary, rebuild and restart `makod` — hot-swap of PID routing is not
538    /// supported.
539    ///
540    /// # Example — dispatch at the AS4 reception boundary
541    ///
542    /// ```rust,ignore
543    /// let workflow_name = ctx.pid_router().route(pid)
544    ///     .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
545    ///         format!("no workflow registered for PID {pid}").into()
546    ///     )))?;
547    ///
548    /// match workflow_name {
549    ///     "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
550    ///     "wim-device-change"    => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
551    ///     other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
552    ///         format!("unhandled workflow name: {other}").into()
553    ///     ))),
554    /// }
555    /// ```
556    #[must_use]
557    pub fn pid_router(&self) -> &PidRouter {
558        &self.pid_router
559    }
560}
561
562// ── As4Sender ─────────────────────────────────────────────────────────────────
563
564/// Sends a single AS4 / EDIINT-over-HTTP outbound message.
565///
566/// Implement this trait for your AS4 gateway client and pass it to
567/// [`EngineContext::run_outbox_worker`].
568///
569/// # Contract
570///
571/// Return `Ok(())` only after the message has been **durably accepted** by the
572/// receiving MSH.  Return `Err(…)` on transient or permanent failure — the
573/// outbox worker calls [`OutboxStore::reschedule`] so the message is retried.
574pub trait As4Sender: Send + Sync + 'static {
575    /// Transmit `msg` and return when the remote MSH has accepted it.
576    fn send(
577        &self,
578        msg: &OutboxMessage,
579    ) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
580}
581
582// ── OutboxWorker ──────────────────────────────────────────────────────────────
583
584/// A background worker that drains the outbox by polling pending
585/// [`OutboxMessage`]s and dispatching them via an [`As4Sender`].
586///
587/// Obtain via [`EngineContext::run_outbox_worker`] and drive by spawning
588/// [`OutboxWorker::run`] in a Tokio task.
589///
590/// # Polling behaviour
591///
592/// When the poll returns an empty batch the worker sleeps for `poll_interval`
593/// before polling again.  Non-empty batches are processed immediately.
594///
595/// # Error handling
596///
597/// Successful sends are acknowledged via [`OutboxStore::acknowledge`].
598/// Failed sends are rescheduled via [`OutboxStore::reschedule`] using
599/// **full-jitter exponential backoff**: `delay = rand(0, min(MAX, BASE * 2^n))`
600/// where `n = attempt_count`. This avoids thundering-herd when multiple
601/// `makod` instances restart simultaneously after a receiver outage.
602///
603/// When `attempt_count >= max_attempts`, the message is **acknowledged** (removed
604/// from the outbox) and a [`DeadLetterReason::OutboxExhausted`] record is written
605/// to the dead-letter sink. This prevents permanently-undeliverable messages
606/// from clogging the outbox forever.
607///
608/// All errors are emitted as structured `tracing` events at `warn` / `error`
609/// level rather than `eprintln!`, so they appear in the application's log
610/// pipeline with full context (message_id, error).
611///
612/// # Example
613///
614/// ```rust,ignore
615/// use std::time::Duration;
616///
617/// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1));
618/// tokio::spawn(async move { worker.run().await });
619/// ```
620///
621/// [`DeadLetterReason::OutboxExhausted`]: crate::dead_letter::DeadLetterReason::OutboxExhausted
622pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
623    store: OS,
624    sender: S,
625    batch_size: usize,
626    poll_interval: std::time::Duration,
627    /// Maximum total delivery attempts before a message is dead-lettered.
628    ///
629    /// Default: 48 (covers ~4 hours at the 300 s backoff cap).
630    /// Set to `u32::MAX` to disable the cap (not recommended for production).
631    max_attempts: u32,
632    /// Sink for messages that exceed `max_attempts`.
633    dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
634}
635
/// Compute a full-jitter exponential backoff delay.
///
/// The delay is drawn from `[0, window)`, where
/// `window = min(MAX_SECS, BASE_SECS * 2^attempt)` with `BASE_SECS = 5` and
/// `MAX_SECS = 300`.
///
/// - `attempt` is the number of prior attempts (0 = first retry). Arbitrarily
///   large values are fine: the exponential term saturates instead of
///   overflowing, so the window stays at the cap.
/// - `entropy` provides randomness. Derive it from a stable message identifier
///   (e.g. a hash of `message_id`) rather than the current timestamp. A
///   timestamp-derived value is identical for every message in a batch, which
///   defeats jitter when multiple messages fail simultaneously.
///
/// | attempt | window (s) | expected delay (s) |
/// |---------|------------|--------------------|
/// | 0       | 5          | ~2.5               |
/// | 1       | 10         | ~5                 |
/// | 2       | 20         | ~10                |
/// | 3       | 40         | ~20                |
/// | 4       | 80         | ~40                |
/// | 5       | 160        | ~80                |
/// | 6+      | 300 (cap)  | ~150               |
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;
    // 2^attempt, saturating to u64::MAX once the shift would overflow
    // (attempt >= 64). Clamping the exponent below the point where
    // BASE * 2^n exceeds MAX would make the cap unreachable.
    let growth = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    // Exponential window: BASE * 2^attempt, capped at MAX.
    let window = BASE_SECS.saturating_mul(growth).min(MAX_SECS);
    // Full jitter: uniform-ish value in [0, window).
    let jitter_secs = if window == 0 { 0 } else { entropy % window };
    std::time::Duration::from_secs(jitter_secs)
}
663
664impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
665    /// Run the outbox drain loop until the task is cancelled.
666    ///
667    /// # Panics
668    ///
669    /// Panics if `time::Duration::try_from(delay)` overflows (unreachable for
670    /// the delay values produced by `backoff_delay`).
671    #[allow(clippy::too_many_lines)]
672    pub async fn run(self) {
673        loop {
674            let batch = match self.store.pending_now(self.batch_size).await {
675                Ok(b) => b,
676                Err(e) => {
677                    tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
678                    tokio::time::sleep(self.poll_interval).await;
679                    continue;
680                }
681            };
682
683            if batch.is_empty() {
684                tokio::time::sleep(self.poll_interval).await;
685                continue;
686            }
687
688            for msg in batch {
689                // ── Max-attempt cap ───────────────────────────────────
690                // `attempt_count` starts at 0 and is incremented on each
691                // `reschedule` call.  When it reaches `max_attempts` the
692                // message is considered permanently undeliverable: acknowledge
693                // it (remove from outbox) and dead-letter it so the regulatory
694                // audit trail is preserved.
695                if msg.attempt_count >= self.max_attempts {
696                    tracing::error!(
697                        message_id   = %msg.message_id,
698                        message_type = %msg.message_type,
699                        recipient    = %msg.recipient,
700                        attempts     = msg.attempt_count,
701                        max_attempts = self.max_attempts,
702                        "outbox worker: max delivery attempts reached; dead-lettering message",
703                    );
704                    self.dead_letter_sink.reject(
705                        &crate::dead_letter::DeadLetterReason::OutboxExhausted {
706                            message_id: msg.message_id,
707                            message_type: msg.message_type.to_string(),
708                            recipient: msg.recipient.to_string(),
709                            last_error: format!(
710                                "delivery exhausted after {} attempts",
711                                msg.attempt_count
712                            ),
713                            attempts: msg.attempt_count,
714                        },
715                    );
716                    if let Err(e) = self.store.acknowledge(msg.message_id).await {
717                        tracing::error!(
718                            message_id = %msg.message_id,
719                            error = %e,
720                            "outbox worker: acknowledge after exhaust failed; message may reappear",
721                        );
722                    }
723                    continue;
724                }
725
726                match self.sender.send(&msg).await {
727                    Ok(()) => {
728                        if let Err(e) = self.store.acknowledge(msg.message_id).await {
729                            tracing::warn!(
730                                message_id = %msg.message_id,
731                                error = %e,
732                                "outbox worker: acknowledge failed",
733                            );
734                        }
735                        // CONTRL AHB 1.0 §1.2: the CONTRL must be delivered
736                        // within 6 wall-clock hours of interchange receipt.
737                        // `msg.created_at` is when the PendingOutbox was
738                        // materialised (which should equal the ingest timestamp
739                        // for transport-layer CONTRL obligations).
740                        if msg.message_type.as_ref() == "CONTRL" {
741                            let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
742                            if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
743                                tracing::warn!(
744                                    message_id   = %msg.message_id,
745                                    elapsed_secs = elapsed.whole_seconds(),
746                                    max_secs     = crate::fristen::CONTRL_FRIST_HOURS * 3600,
747                                    "outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
748                                     (CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
749                                );
750                            }
751                        }
752                    }
753                    // Permanent error: dead-letter immediately without retrying.
754                    // PartnerUnknown requires operator intervention (add --as4-partner);
755                    // Serialization errors will never succeed on retry.
756                    Err(ref e)
757                        if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
758                    {
759                        tracing::error!(
760                            message_id   = %msg.message_id,
761                            message_type = %msg.message_type,
762                            recipient    = %msg.recipient,
763                            error        = %e,
764                            "outbox worker: permanent send failure; dead-lettering without retry",
765                        );
766                        self.dead_letter_sink.reject(
767                            &crate::dead_letter::DeadLetterReason::OutboxExhausted {
768                                message_id: msg.message_id,
769                                message_type: msg.message_type.to_string(),
770                                recipient: msg.recipient.to_string(),
771                                last_error: e.to_string(),
772                                attempts: msg.attempt_count,
773                            },
774                        );
775                        if let Err(re) = self.store.acknowledge(msg.message_id).await {
776                            tracing::error!(
777                                message_id = %msg.message_id,
778                                error = %re,
779                                "outbox worker: acknowledge after permanent failure failed",
780                            );
781                        }
782                    }
783                    Err(e) => {
784                        // Stable jitter entropy derived from the UUID bytes of
785                        // `message_id`.  Using the last 8 bytes as a `u64` gives
786                        // uniform entropy across message IDs (UUIDs are random in
787                        // all 128 bits for v4) and is stable across Rust versions —
788                        // unlike `DefaultHasher`, whose algorithm is explicitly
789                        // documented as unstable.
790                        let entropy = {
791                            let uuid = msg.message_id.as_uuid();
792                            let bytes = uuid.as_bytes();
793                            u64::from_le_bytes(bytes[8..16].try_into().unwrap())
794                        };
795                        let delay = backoff_delay(msg.attempt_count, entropy);
796                        let retry_at = time::OffsetDateTime::now_utc()
797                            + time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
798                        tracing::warn!(
799                            message_id   = %msg.message_id,
800                            attempt      = msg.attempt_count,
801                            max_attempts = self.max_attempts,
802                            retry_in     = ?delay,
803                            error        = %e,
804                            "outbox worker: send failed; rescheduling with backoff",
805                        );
806                        if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
807                            tracing::error!(
808                                message_id = %msg.message_id,
809                                error      = %re,
810                                "outbox worker: reschedule failed; message may be stuck",
811                            );
812                        }
813                    }
814                }
815            }
816        }
817    }
818}
819
820impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
821where
822    ES: EventStore,
823    OS: OutboxStore + Clone,
824{
825    /// Construct an [`OutboxWorker`] that drains the outbox via `sender`.
826    ///
827    /// `batch_size` — messages fetched per poll cycle.
828    /// `poll_interval` — sleep duration when the batch is empty.
829    ///
830    /// `max_attempts` — maximum total delivery attempts before dead-lettering.
831    /// Pass `48` for a ~4-hour retry budget at the 300 s backoff cap, or
832    /// `u32::MAX` to disable the cap (not recommended for production).
833    ///
834    /// ```rust,ignore
835    /// use std::time::Duration;
836    ///
837    /// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
838    /// tokio::spawn(async move { worker.run().await });
839    /// ```
840    #[must_use]
841    pub fn run_outbox_worker<S: As4Sender>(
842        &self,
843        sender: S,
844        batch_size: usize,
845        poll_interval: std::time::Duration,
846        max_attempts: u32,
847    ) -> OutboxWorker<OS, S> {
848        OutboxWorker {
849            store: self.outbox_store.clone(),
850            sender,
851            batch_size,
852            poll_interval,
853            max_attempts,
854            dead_letter_sink: self.dead_letter_sink.clone(),
855        }
856    }
857}
858
859impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
860where
861    ES: std::fmt::Debug,
862    SS: std::fmt::Debug,
863    OS: std::fmt::Debug,
864    DS: std::fmt::Debug,
865    PR: std::fmt::Debug,
866{
867    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
868        f.debug_struct("EngineContext")
869            .field("registered_modules", &self.registered_modules)
870            .field("registered_workflows", &self.registered_workflows)
871            .field("pid_router_len", &self.pid_router.len())
872            .finish_non_exhaustive()
873    }
874}
875
876// ── NoopAs4Sender / LogAs4Sender ──────────────────────────────────────────────
877
878/// An [`As4Sender`] that succeeds immediately without sending anything.
879///
880/// Use in tests and environments where outbound AS4 delivery is not yet
881/// wired. All outbox messages are acknowledged (removed from the queue)
882/// without being transmitted.
883///
884/// # ⚠️ Data loss warning
885///
886/// Every outbox message is **silently discarded** — no EDIFACT message is
887/// sent to any counterparty. Do not use in production.
888#[derive(Debug, Clone, Copy, Default)]
889#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
890pub struct NoopAs4Sender;
891
892impl As4Sender for NoopAs4Sender {
893    async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
894        Ok(())
895    }
896}
897
898/// An [`As4Sender`] that logs every outbound message at `warn` level and
899/// succeeds without transmitting.
900///
901/// Useful for development and integration-testing environments where the
902/// full AS4 stack is not yet available but message visibility is desired.
903/// All outbox messages are acknowledged (removed from the queue) after logging.
904///
905/// # ⚠️ Data loss warning
906///
907/// No EDIFACT message is sent to any counterparty. Do not use in production.
908#[derive(Debug, Clone, Copy, Default)]
909#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
910pub struct LogAs4Sender;
911
912impl As4Sender for LogAs4Sender {
913    async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
914        tracing::warn!(
915            message_id   = %msg.message_id,
916            message_type = %msg.message_type,
917            recipient    = %msg.recipient,
918            "LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
919        );
920        Ok(())
921    }
922}
923
924// ── DeadlineScheduler ─────────────────────────────────────────────────────────
925
926/// A background task that polls [`DeadlineStore::due_now`] and dispatches
927/// deadline commands to the owning processes via a caller-supplied function.
928///
929/// Obtain via [`EngineContext::run_deadline_scheduler`] and drive by spawning
930/// [`DeadlineScheduler::run`] in a Tokio task.
931///
932/// # Dispatch function
933///
934/// The `dispatch` function receives a fired [`Deadline`] and returns a future
935/// that dispatches the appropriate timeout command to the process. The function
936/// is responsible for resuming the correct workflow and calling `execute`.
937/// After the future completes, the scheduler cancels the deadline from the
938/// store regardless of the dispatch outcome (to prevent re-firing).
939///
940/// ```rust,ignore
941/// use std::time::Duration;
942///
943/// let scheduler = ctx.run_deadline_scheduler(
944///     |deadline| async move {
945///         tracing::warn!(
946///             deadline_id = %deadline.deadline_id(),
947///             label = %deadline.label(),
948///             "deadline fired",
949///         );
950///         Ok(())
951///     },
952///     100,
953///     Duration::from_secs(30),
954/// );
955/// tokio::spawn(async move { scheduler.run().await });
956/// ```
957pub struct DeadlineScheduler<DS: DeadlineStore> {
958    store: DS,
959    dispatch: Box<
960        dyn Fn(
961                Deadline,
962            ) -> std::pin::Pin<
963                Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
964            > + Send
965            + Sync,
966    >,
967    batch_size: usize,
968    poll_interval: std::time::Duration,
969}
970
971impl<DS: DeadlineStore> DeadlineScheduler<DS> {
972    /// Run the deadline poll loop until the task is cancelled.
973    pub async fn run(self) {
974        loop {
975            let result = match self.store.due_now(self.batch_size).await {
976                Ok(r) => r,
977                Err(e) => {
978                    tracing::warn!(
979                        error = %e,
980                        "deadline scheduler: store error polling due deadlines (will retry)",
981                    );
982                    tokio::time::sleep(self.poll_interval).await;
983                    continue;
984                }
985            };
986
987            if result.deadlines.is_empty() {
988                tokio::time::sleep(self.poll_interval).await;
989                continue;
990            }
991
992            for deadline in result.deadlines {
993                let id = deadline.deadline_id();
994                let label = deadline.label().to_owned();
995                let should_cancel = match (self.dispatch)(deadline).await {
996                    Ok(()) => true,
997                    Err(ref e) if e.is_version_conflict() => {
998                        // The process was modified concurrently; the timeout
999                        // command will be retried on the next poll cycle.
1000                        // Do NOT cancel — let the deadline remain due so it
1001                        // fires again until a non-conflict dispatch succeeds.
1002                        tracing::warn!(
1003                            deadline_id = %id,
1004                            label       = %label,
1005                            "deadline scheduler: VersionConflict; will retry on next poll",
1006                        );
1007                        false
1008                    }
1009                    Err(e) => {
1010                        tracing::warn!(
1011                            deadline_id = %id,
1012                            label       = %label,
1013                            error       = %e,
1014                            "deadline scheduler: dispatch failed (permanent); cancelling",
1015                        );
1016                        true
1017                    }
1018                };
1019                if should_cancel {
1020                    if let Err(e) = self.store.cancel(id).await {
1021                        tracing::error!(
1022                            deadline_id = %id,
1023                            error       = %e,
1024                            "deadline scheduler: cancel failed; deadline may fire again",
1025                        );
1026                    }
1027                }
1028            }
1029
1030            // If has_more, loop immediately to drain the batch.
1031        }
1032    }
1033}
1034
1035impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
1036where
1037    ES: EventStore,
1038    DS: DeadlineStore + Clone,
1039{
1040    /// Construct a [`DeadlineScheduler`] that polls the deadline store and
1041    /// dispatches fired deadlines via `dispatch`.
1042    ///
1043    /// The `dispatch` function is called for every fired deadline. It should
1044    /// resume the owning process and execute the appropriate timeout command.
1045    ///
1046    /// `batch_size` — deadlines fetched per poll cycle.
1047    /// `poll_interval` — sleep duration when no deadlines are due.
1048    ///
1049    /// ```rust,ignore
1050    /// use std::time::Duration;
1051    ///
1052    /// let scheduler = ctx.run_deadline_scheduler(
1053    ///     |d| async move {
1054    ///         tracing::info!(label = %d.label(), "firing deadline");
1055    ///         Ok(())
1056    ///     },
1057    ///     100,
1058    ///     Duration::from_secs(30),
1059    /// );
1060    /// tokio::spawn(async move { scheduler.run().await });
1061    /// ```
1062    #[must_use]
1063    pub fn run_deadline_scheduler<F, Fut>(
1064        &self,
1065        dispatch: F,
1066        batch_size: usize,
1067        poll_interval: std::time::Duration,
1068    ) -> DeadlineScheduler<DS>
1069    where
1070        F: Fn(Deadline) -> Fut + Send + Sync + 'static,
1071        Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
1072    {
1073        DeadlineScheduler {
1074            store: self.deadline_store.clone(),
1075            dispatch: Box::new(move |d| Box::pin(dispatch(d))),
1076            batch_size,
1077            poll_interval,
1078        }
1079    }
1080}
1081
1082// ── EngineBuilder ─────────────────────────────────────────────────────────────
1083
1084/// Assembles engine infrastructure and produces an [`EngineContext`].
1085///
1086/// Uses type-state to enforce that an event store is provided before
1087/// [`build`] can be called. All other stores default to `Noop`
1088/// implementations.
1089///
1090/// ## Quick start
1091///
1092/// ```rust,ignore
1093/// // Minimal — event store only, all others are Noop:
1094/// let ctx = EngineBuilder::new()
1095///     .with_event_store(InMemoryEventStore::new())
1096///     .build();
1097///
1098/// // Full infrastructure:
1099/// let ctx = EngineBuilder::new()
1100///     .with_event_store(InMemoryEventStore::new())
1101///     .with_snapshot_store(InMemorySnapshotStore::new())
1102///     .with_outbox_store(InMemoryOutboxStore::new())
1103///     .with_deadline_store(InMemoryDeadlineStore::new())
1104///     .with_registry(InMemoryProcessRegistry::new())
1105///     .register(Box::new(GpkeModule))
1106///     .build();
1107/// ```
1108///
1109/// [`build`]: EngineBuilder::build
1110pub struct EngineBuilder<
1111    ES = (),
1112    SS = NoopSnapshotStore,
1113    OS = NoopOutboxStore,
1114    DS = NoopDeadlineStore,
1115    PR = NoopProcessRegistry,
1116> {
1117    event_store: ES,
1118    snapshot_store: SS,
1119    outbox_store: OS,
1120    deadline_store: DS,
1121    registry: PR,
1122    dead_letter_sink: Arc<dyn DeadLetterSink>,
1123    modules: Vec<Box<dyn EngineModule>>,
1124    /// Active [`DeploymentRoles`] for this engine instance.
1125    ///
1126    /// Controls role-conditional PID registration via
1127    /// [`EngineModule::register_pids_with_roles`]. Defaults to
1128    /// [`DeploymentRoles::all()`] for backward compatibility.
1129    deployment_roles: DeploymentRoles,
1130    /// Optional profile validator injected by `makod` or callers that have
1131    /// access to `edi-energy`.  When `Some`, called for each
1132    /// [`ProfileRequirement`] declared by registered modules.  When `None`,
1133    /// profile requirements are not validated (safe in unit tests).
1134    ///
1135    /// Signature: `fn(message_type: &str) -> bool`
1136    ///
1137    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
1138    profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
1139}
#[cfg(any(test, feature = "testing"))]
/// All-`Noop` builder: no event store yet, `Noop` snapshot/outbox/deadline
/// stores and registry, a logging dead-letter sink, no modules, every
/// deployment role active and no profile validator.
///
/// This is exactly what [`EngineBuilder::with_stores`] produces when handed
/// the three `Noop` stores, so it delegates there rather than repeating the
/// field list.
impl Default
    for EngineBuilder<
        (),
        NoopSnapshotStore,
        NoopOutboxStore,
        NoopDeadlineStore,
        NoopProcessRegistry,
    >
{
    fn default() -> Self {
        Self::with_stores(NoopOutboxStore, NoopDeadlineStore, NoopProcessRegistry)
    }
}
1164
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
    /// Start a builder whose every store is a `Noop` implementation.
    ///
    /// Compiled only for `#[cfg(test)]` or the `testing` feature. The `Noop`
    /// stores silently drop outbox messages, deadlines and registry entries,
    /// so production code must use the `with_*` methods (or
    /// `EngineBuilder::with_stores`) to provide real stores instead.
    ///
    /// The event store is still **required**: call [`with_event_store`]
    /// before [`build`].
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    /// [`build`]: EngineBuilder::build
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
1184
1185impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
1186where
1187    OS: OutboxStore,
1188    DS: DeadlineStore,
1189    PR: ProcessRegistry,
1190{
1191    /// Create a production-ready builder with explicit stores for outbox,
1192    /// deadline, and process registry.
1193    ///
1194    /// This constructor is available in all build configurations including
1195    /// production binaries. It enforces that the three stores that can cause
1196    /// silent data loss (`OutboxStore`, `DeadlineStore`, `ProcessRegistry`)
1197    /// are provided explicitly — there is no Noop fallback.
1198    ///
1199    /// `NoopSnapshotStore` is used as the snapshot default because it is safe
1200    /// for production: skipping snapshots means full replay, but no data loss.
1201    /// Override with [`with_snapshot_store`] to enable snapshot-accelerated
1202    /// replay.
1203    ///
1204    /// Call [`with_event_store`] before [`build`] — the event store is
1205    /// **required**.
1206    ///
1207    /// ```rust,ignore
1208    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1209    ///     .with_event_store(store.clone())
1210    ///     .with_snapshot_store(InMemorySnapshotStore::new())
1211    ///     .build();
1212    /// ```
1213    ///
1214    /// [`with_snapshot_store`]: EngineBuilder::with_snapshot_store
1215    /// [`with_event_store`]: EngineBuilder::with_event_store
1216    /// [`build`]: EngineBuilder::build
1217    #[must_use]
1218    pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
1219        Self {
1220            event_store: (),
1221            snapshot_store: NoopSnapshotStore,
1222            outbox_store,
1223            deadline_store,
1224            registry,
1225            dead_letter_sink: Arc::new(LogDeadLetterSink),
1226            modules: Vec::new(),
1227            deployment_roles: DeploymentRoles::all(),
1228            profile_validator: None,
1229        }
1230    }
1231}
1232
1233impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
1234    /// Set the event store. **Required** — `build()` is only available once
1235    /// this has been called with a type that implements [`EventStore`].
1236    ///
1237    /// Replaces any previously set event store (type-state transition).
1238    #[must_use]
1239    pub fn with_event_store<ES2: EventStore>(
1240        self,
1241        store: ES2,
1242    ) -> EngineBuilder<ES2, SS, OS, DS, PR> {
1243        EngineBuilder {
1244            event_store: store,
1245            snapshot_store: self.snapshot_store,
1246            outbox_store: self.outbox_store,
1247            deadline_store: self.deadline_store,
1248            registry: self.registry,
1249            dead_letter_sink: self.dead_letter_sink,
1250            modules: self.modules,
1251            deployment_roles: self.deployment_roles,
1252            profile_validator: self.profile_validator,
1253        }
1254    }
1255
1256    /// Set the snapshot store (default: [`NoopSnapshotStore`]).
1257    #[must_use]
1258    pub fn with_snapshot_store<SS2: SnapshotStore>(
1259        self,
1260        store: SS2,
1261    ) -> EngineBuilder<ES, SS2, OS, DS, PR> {
1262        EngineBuilder {
1263            event_store: self.event_store,
1264            snapshot_store: store,
1265            outbox_store: self.outbox_store,
1266            deadline_store: self.deadline_store,
1267            registry: self.registry,
1268            dead_letter_sink: self.dead_letter_sink,
1269            modules: self.modules,
1270            deployment_roles: self.deployment_roles,
1271            profile_validator: self.profile_validator,
1272        }
1273    }
1274
1275    /// Set the outbox store (default: [`NoopOutboxStore`]).
1276    #[must_use]
1277    pub fn with_outbox_store<OS2: OutboxStore>(
1278        self,
1279        store: OS2,
1280    ) -> EngineBuilder<ES, SS, OS2, DS, PR> {
1281        EngineBuilder {
1282            event_store: self.event_store,
1283            snapshot_store: self.snapshot_store,
1284            outbox_store: store,
1285            deadline_store: self.deadline_store,
1286            registry: self.registry,
1287            dead_letter_sink: self.dead_letter_sink,
1288            modules: self.modules,
1289            deployment_roles: self.deployment_roles,
1290            profile_validator: self.profile_validator,
1291        }
1292    }
1293
1294    /// Set the deadline store (default: [`NoopDeadlineStore`]).
1295    #[must_use]
1296    pub fn with_deadline_store<DS2: DeadlineStore>(
1297        self,
1298        store: DS2,
1299    ) -> EngineBuilder<ES, SS, OS, DS2, PR> {
1300        EngineBuilder {
1301            event_store: self.event_store,
1302            snapshot_store: self.snapshot_store,
1303            outbox_store: self.outbox_store,
1304            deadline_store: store,
1305            registry: self.registry,
1306            dead_letter_sink: self.dead_letter_sink,
1307            modules: self.modules,
1308            deployment_roles: self.deployment_roles,
1309            profile_validator: self.profile_validator,
1310        }
1311    }
1312
1313    /// Set the process registry (default: [`NoopProcessRegistry`]).
1314    #[must_use]
1315    pub fn with_registry<PR2: ProcessRegistry>(
1316        self,
1317        registry: PR2,
1318    ) -> EngineBuilder<ES, SS, OS, DS, PR2> {
1319        EngineBuilder {
1320            event_store: self.event_store,
1321            snapshot_store: self.snapshot_store,
1322            outbox_store: self.outbox_store,
1323            deadline_store: self.deadline_store,
1324            registry,
1325            dead_letter_sink: self.dead_letter_sink,
1326            modules: self.modules,
1327            deployment_roles: self.deployment_roles,
1328            profile_validator: self.profile_validator,
1329        }
1330    }
1331
1332    /// Set the dead-letter sink (default: [`LogDeadLetterSink`]).
1333    ///
1334    /// The dead-letter sink receives every message that cannot be routed to a
1335    /// workflow. The default [`LogDeadLetterSink`] emits `tracing::warn!`
1336    /// events, making rejections visible in log output without configuration.
1337    ///
1338    /// Override with a persistent DLQ implementation in production:
1339    ///
1340    /// ```rust,ignore
1341    /// use mako_engine::dead_letter::LogDeadLetterSink;
1342    ///
1343    /// let ctx = EngineBuilder::new()
1344    ///     .with_event_store(my_store)
1345    ///     .with_dead_letter_sink(MyPersistentDlq::new())
1346    ///     .build();
1347    /// ```
1348    ///
1349    /// [`LogDeadLetterSink`]: crate::dead_letter::LogDeadLetterSink
1350    #[must_use]
1351    pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
1352        self.dead_letter_sink = Arc::new(sink);
1353        self
1354    }
1355
1356    /// Register an `edi-energy` profile validator for startup profile checks.
1357    ///
1358    /// The closure receives a message-type string (e.g. `"UTILMD"`) and must
1359    /// return `true` if at least one active profile for that message type is
1360    /// registered for today's date.
1361    ///
1362    /// Wire this in `makod` using the `edi-energy` global registry:
1363    ///
1364    /// ```rust,ignore
1365    /// use edi_energy::registry::ReleaseRegistry;
1366    ///
1367    /// let today = time::OffsetDateTime::now_utc().date();
1368    /// builder.with_profile_validator(move |msg_type| {
1369    ///     ReleaseRegistry::global()
1370    ///         .profiles_for_str(msg_type)
1371    ///         .any(|p| match (p.valid_from(), p.valid_until()) {
1372    ///             (Some(f), Some(u)) => f <= today && today <= u,
1373    ///             (Some(f), None)    => f <= today,
1374    ///             (None, _)          => true,
1375    ///         })
1376    /// })
1377    /// ```
1378    ///
1379    /// Domain crates do **not** need to call this — they only declare
1380    /// [`profile_requirements`].
1381    ///
1382    /// [`profile_requirements`]: EngineModule::profile_requirements
1383    #[must_use]
1384    pub fn with_profile_validator(
1385        mut self,
1386        validator: impl Fn(&str) -> bool + Send + Sync + 'static,
1387    ) -> Self {
1388        self.profile_validator = Some(Box::new(validator));
1389        self
1390    }
1391
1392    /// Register a domain module.
1393    ///
1394    /// The module name becomes visible in
1395    /// [`EngineContext::registered_modules`] after [`build`] is called.
1396    ///
1397    /// [`build`]: EngineBuilder::build
1398    #[must_use]
1399    pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
1400        self.modules.push(module);
1401        self
1402    }
1403
1404    /// Set the active [`DeploymentRoles`] for this engine instance.
1405    ///
1406    /// Controls role-conditional PID registration in [`EngineModule::register_pids_with_roles`].
1407    ///
1408    /// The default is [`DeploymentRoles::all()`], which registers every PID unconditionally
1409    /// — identical to the pre-role-aware behavior. Providing an explicit role set
1410    /// restricts role-conditional blocks to only the declared roles:
1411    ///
1412    /// - **NB-only** (`DeploymentRoles::nb()`): 19001/19002 route to `gpke-konfiguration`;
1413    ///   WiM nMSB blocks are skipped.
1414    /// - **nMSB-only** (`DeploymentRoles::nmsb()`): 19001/19002 route to `wim-geraeteubernahme`;
1415    ///   GPKE NB blocks are skipped.
1416    /// - **NB + gMSB** (`DeploymentRoles::nb_msb()`): most common Stadtwerke combination.
1417    ///
1418    /// # Conflict guard
1419    ///
1420    /// When two modules would register the same PID to **different** workflows, the
1421    /// engine panics during [`build`]. Set explicit roles to prevent both modules from
1422    /// activating the same PID simultaneously:
1423    ///
1424    /// ```rust,ignore
1425    /// use mako_engine::marktrolle::DeploymentRoles;
1426    ///
1427    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1428    ///     .with_event_store(store)
1429    ///     .with_deployment_roles(DeploymentRoles::nb())  // only NB: GPKE gets 19001/19002
1430    ///     .register(Box::new(GpkeModule))
1431    ///     .register(Box::new(WimModule))  // nMSB block skipped — no conflict
1432    ///     .build();
1433    /// ```
1434    ///
1435    /// [`build`]: EngineBuilder::build
1436    #[must_use]
1437    pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
1438        self.deployment_roles = roles;
1439        self
1440    }
1441}
1442
1443impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
1444where
1445    ES: EventStore,
1446    SS: SnapshotStore,
1447    OS: OutboxStore,
1448    DS: DeadlineStore,
1449    PR: ProcessRegistry,
1450{
1451    /// Build the [`EngineContext`].
1452    ///
1453    /// Consumes the builder. All registered modules and configured stores are
1454    /// moved into the returned [`EngineContext`].
1455    ///
1456    /// This method is only available when `ES` implements [`EventStore`].
1457    /// If you have not called [`with_event_store`], this will not compile.
1458    ///
1459    /// # Panics
1460    ///
1461    /// Panics when any registered module returns `Err` from
1462    /// [`EngineModule::configure`]. The panic message includes the module
1463    /// name and the error string so the deployment failure is actionable.
1464    ///
1465    /// [`with_event_store`]: EngineBuilder::with_event_store
1466    #[must_use]
1467    #[allow(clippy::too_many_lines)]
1468    pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
1469        // Warn when Noop stores are active — they are fine for testing but
1470        // silently lose data in production. Checking type names is the only
1471        // approach that works with the generic type-state builder without adding
1472        // a new marker trait or changing the public API.
1473        //
1474        // Gate: always emit in `testing` / `#[cfg(test)]` builds so that test
1475        // harnesses see the warning in their log output. Also emit when the
1476        // `tracing` feature is enabled (explicit instrumentation mode). In
1477        // release production builds without the `testing` feature, Noop stores
1478        // are not constructible at all (the struct definitions are cfg-gated),
1479        // so this branch is dead code and will be optimised away by the compiler.
1480        #[cfg(any(test, feature = "testing", feature = "tracing"))]
1481        {
1482            let ss_name = std::any::type_name::<SS>();
1483            let os_name = std::any::type_name::<OS>();
1484            let ds_name = std::any::type_name::<DS>();
1485            let pr_name = std::any::type_name::<PR>();
1486            if ss_name.contains("NoopSnapshotStore") {
1487                tracing::warn!(
1488                    store = ss_name,
1489                    "EngineBuilder: NoopSnapshotStore is active — snapshots will not be persisted. \
1490                     Use SlateDbStore::as_snapshot_store() in production."
1491                );
1492            }
1493            if os_name.contains("NoopOutboxStore") {
1494                tracing::warn!(
1495                    store = os_name,
1496                    "EngineBuilder: NoopOutboxStore is active — outbound messages will be silently \
1497                     discarded. Use SlateDbStore::as_outbox_store() in production."
1498                );
1499            }
1500            if ds_name.contains("NoopDeadlineStore") {
1501                tracing::warn!(
1502                    store = ds_name,
1503                    "EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will not fire \
1504                     after restart. Use SlateDbStore::as_deadline_store() in production."
1505                );
1506            }
1507            if pr_name.contains("NoopProcessRegistry") {
1508                tracing::warn!(
1509                    store = pr_name,
1510                    "EngineBuilder: NoopProcessRegistry is active — process routing will be lost on \
1511                     restart. Use SlateDbStore::as_process_registry() in production."
1512                );
1513            }
1514        }
1515        // Validate every module before assembling the context.
1516        // A missing adapter or misconfigured module fails at startup (not at
1517        // first inbound message), making deployment failures observable immediately.
1518        for module in &self.modules {
1519            if let Err(msg) = module.configure() {
1520                panic!(
1521                    "EngineBuilder::build: module '{}' failed configuration validation: {}",
1522                    module.name(),
1523                    msg
1524                );
1525            }
1526            // Validate profile requirements via the injected validator.
1527            // Domain crates declare requirements; only the binary crate (makod)
1528            // injects the edi-energy registry — domain crates need no edi-energy
1529            // import for this check.
1530            if let Some(ref validator) = self.profile_validator {
1531                for req in module.profile_requirements() {
1532                    assert!(
1533                        validator(req.message_type),
1534                        "EngineBuilder::build: module '{}' requires an active edi-energy \
1535                             profile for '{}' ({}) but none is registered for today's date. \
1536                             Run `cargo xtask codegen` to add the missing profile.",
1537                        module.name(),
1538                        req.message_type,
1539                        req.label,
1540                    );
1541                }
1542            }
1543        }
1544        // Build the PID router from all registered modules.
1545        // Also assert that no two modules claim the same PID — a PID overlap
1546        // is always a configuration error: one module's messages would be
1547        // silently swallowed by another's workflow, producing missing-process
1548        // errors or incorrect audit trails.
1549        let mut pid_router = PidRouter::new();
1550        let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
1551        for module in &self.modules {
1552            // Temporarily build a scratch router to read this module's PIDs
1553            // for cross-module overlap detection (module-ownership level).
1554            let mut scratch = PidRouter::new();
1555            module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
1556            for pid in scratch.registered_pids() {
1557                if let Some(prev) = pid_owners.insert(pid, module.name()) {
1558                    if self.deployment_roles.is_all() {
1559                        // With DeploymentRoles::all() (the default), role-conditional PIDs
1560                        // are registered by all modules that claim them, producing last-wins
1561                        // semantics. This is acceptable for single-role and dev/test deployments.
1562                        //
1563                        // In production multi-role deployments where both an NB and nMSB role
1564                        // are served by the same instance, set explicit roles via
1565                        // `EngineBuilder::with_deployment_roles` to prevent silent misrouting.
1566                        //
1567                        // We emit a debug-level log here (not warn) because the vast majority
1568                        // of deployments are single-role and this overlap is expected/harmless.
1569                        #[cfg(feature = "tracing")]
1570                        tracing::debug!(
1571                            pid,
1572                            previous_module = prev,
1573                            current_module = module.name(),
1574                            "PID registered by multiple modules with DeploymentRoles::all(); \
1575                             last module wins (use with_deployment_roles for strict routing)",
1576                        );
1577                        let _ = prev; // suppress unused-variable warning when tracing is off
1578                    } else {
1579                        panic!(
1580                            "EngineBuilder::build: PID {pid} is claimed by both \
1581                             '{prev}' and '{}' — overlapping PID registrations are \
1582                             not allowed with explicit DeploymentRoles; each PID must be \
1583                             owned by exactly one module.\n  \
1584                             Hint: use EngineBuilder::with_deployment_roles to restrict \
1585                             role-conditional PIDs so only one module registers each PID.\n  \
1586                             Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
1587                             GPKE registers ORDRSP 19001/19002, not WiM.",
1588                            module.name(),
1589                        );
1590                    }
1591                }
1592            }
1593            // Register into the real router with conflict detection.
1594            module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
1595        }
1596        let registered_modules = self.modules.iter().map(|m| m.name()).collect();
1597        let registered_workflows = self
1598            .modules
1599            .iter()
1600            .flat_map(|m| m.workflow_names().iter().copied())
1601            .collect();
1602        EngineContext {
1603            event_store: Arc::new(self.event_store),
1604            snapshot_store: self.snapshot_store,
1605            outbox_store: self.outbox_store,
1606            deadline_store: self.deadline_store,
1607            registry: self.registry,
1608            dead_letter_sink: self.dead_letter_sink,
1609            pid_router,
1610            registered_modules,
1611            registered_workflows,
1612        }
1613    }
1614}
1615
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        deadline::InMemoryDeadlineStore,
        error::WorkflowError,
        event_store::InMemoryEventStore,
        ids::TenantId,
        outbox::InMemoryOutboxStore,
        pid_router::PidRouter,
        registry::InMemoryProcessRegistry,
        snapshot::InMemorySnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Minimal workflow for spawn/resume tests ───────────────────────────────
    //
    // `PingWorkflow` is stateless: every `PingCommand` yields exactly one
    // `PingEvent`, and applying an event leaves the state unchanged.

    #[derive(serde::Serialize, serde::Deserialize)]
    struct PingEvent;

    impl EventPayload for PingEvent {
        fn event_type(&self) -> &'static str {
            "Ping"
        }
    }

    struct PingCommand;

    impl CommandPayload for PingCommand {}

    #[derive(Default, Clone)]
    struct PingState;

    struct PingWorkflow;

    impl Workflow for PingWorkflow {
        type State = PingState;
        type Event = PingEvent;
        type Command = PingCommand;

        fn apply(state: PingState, _: &PingEvent) -> PingState {
            state
        }

        fn handle(
            _: &PingState,
            _: PingCommand,
        ) -> Result<crate::workflow::WorkflowOutput<PingEvent>, WorkflowError> {
            Ok(vec![PingEvent].into())
        }
    }

    /// Module that only provides a name; all other trait methods use defaults.
    struct TestModule;

    impl EngineModule for TestModule {
        fn name(&self) -> &'static str {
            "test-module"
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    /// `build` compiles and succeeds with only an event store; the other
    /// stores fall back to their defaults and no modules are registered.
    #[test]
    fn build_with_event_store_only() {
        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();
        assert!(ctx.registered_modules().is_empty());
    }

    #[test]
    fn build_with_all_stores_and_module() {
        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_snapshot_store(InMemorySnapshotStore::new())
            .with_outbox_store(InMemoryOutboxStore::new())
            .with_deadline_store(InMemoryDeadlineStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .register(Box::new(TestModule))
            .build();
        assert_eq!(ctx.registered_modules(), &["test-module"]);
    }

    /// Module names are reported in registration order.
    #[test]
    fn multiple_modules_ordered() {
        struct ModA;
        impl EngineModule for ModA {
            fn name(&self) -> &'static str {
                "mod-a"
            }
        }
        struct ModB;
        impl EngineModule for ModB {
            fn name(&self) -> &'static str {
                "mod-b"
            }
        }

        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(ModA))
            .register(Box::new(ModB))
            .build();
        assert_eq!(ctx.registered_modules(), &["mod-a", "mod-b"]);
    }

    #[tokio::test]
    async fn spawn_creates_independent_processes() {
        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();
        let wf_id = WorkflowId::new("ping", "FV2024-10-01");

        let p1 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id.clone());
        let p2 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id);

        assert_ne!(p1.process_id(), p2.process_id());
    }

    /// A resumed process observes the event appended by the original handle.
    #[tokio::test]
    async fn resume_sees_previously_appended_events() {
        let store = InMemoryEventStore::new();
        let ctx = EngineBuilder::new().with_event_store(store).build();

        let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        p.execute(PingCommand).await.unwrap();

        let identity = p.identity();
        let resumed = ctx.resume::<PingWorkflow>(identity);
        assert_eq!(resumed.event_count().await.unwrap(), 1);
    }

    /// Registering a process under a conversation key and looking it up again
    /// yields an identity that resumes the same process.
    #[tokio::test]
    async fn registry_routes_process_via_conversation_key() {
        use crate::registry::RegistryKey;
        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .build();

        let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        let tenant = p.tenant_id();
        let conv_key = RegistryKey::parse("conv:test-conversation-123").expect("valid key");
        ctx.registry()
            .register(tenant, &conv_key, p.identity())
            .await
            .unwrap();

        let found = ctx
            .registry()
            .lookup(tenant, &conv_key)
            .await
            .unwrap()
            .expect("must be registered");
        let resumed = ctx.resume::<PingWorkflow>(found);
        assert_eq!(resumed.process_id(), p.process_id());
    }

    /// PIDs registered by a module end up in the context's router.
    ///
    /// The module overrides only `register_pids`, while `build` calls
    /// `register_pids_with_roles`. This therefore also exercises the trait's
    /// default forwarding from the latter to the former.
    #[test]
    fn pid_router_populated_by_module_register_pids() {
        struct PidModule;
        impl EngineModule for PidModule {
            fn name(&self) -> &'static str {
                "pid-module"
            }
            fn register_pids(&self, router: &mut PidRouter) {
                router.register(55001, "gpke-supplier-change");
                router.register(55002, "gpke-supplier-change");
            }
        }

        let ctx = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(PidModule))
            .build();

        assert_eq!(ctx.pid_router().route(55001), Some("gpke-supplier-change"));
        assert_eq!(ctx.pid_router().route(55002), Some("gpke-supplier-change"));
        assert!(ctx.pid_router().route(99999).is_none());
        assert_eq!(ctx.pid_router().len(), 2);
    }

    /// Verify that `register_pids_with_roles` gates PIDs behind role checks.
    ///
    /// Scenario: two modules share PID 19001.
    /// - ModuleA registers 19001 → "workflow-a" when role `Nb` is present.
    /// - ModuleB registers 19001 and 19015 → "workflow-b" when role `Nmsb` is
    ///   explicitly set (not on `all()`).
    ///
    /// - `DeploymentRoles::all()`: ModuleA fires (Nb ∈ all), ModuleB does NOT
    ///   (is_all → skip).
    ///   → 19001 routes to "workflow-a"; 19015 is unrouted.
    /// - `DeploymentRoles::nb()`: ModuleA fires, ModuleB skips.
    ///   → 19001 routes to "workflow-a"; 19015 is unrouted.
    /// - `DeploymentRoles::nmsb()`: ModuleA skips, ModuleB fires.
    ///   → 19001 and 19015 route to "workflow-b".
    #[test]
    fn register_pids_with_roles_gates_pids_correctly() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ModuleA;
        impl EngineModule for ModuleA {
            fn name(&self) -> &'static str {
                "module-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if roles.contains(Marktrolle::Nb) {
                    router.register(19_001, "workflow-a");
                }
            }
        }

        struct ModuleB;
        impl EngineModule for ModuleB {
            fn name(&self) -> &'static str {
                "module-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                // Only fires on explicit Nmsb, not on all() (backward-compat sentinel).
                if !roles.is_all() && roles.contains(Marktrolle::Nmsb) {
                    router.register(19_001, "workflow-b");
                    router.register(19_015, "workflow-b");
                }
            }
        }

        let build = |roles: DeploymentRoles| {
            EngineBuilder::new()
                .with_event_store(InMemoryEventStore::new())
                .with_deployment_roles(roles)
                .register(Box::new(ModuleA))
                .register(Box::new(ModuleB))
                .build()
        };

        // all() → backward compat: ModuleA registers 19001 (Nb ∈ all), ModuleB skips.
        let ctx = build(DeploymentRoles::all());
        assert_eq!(ctx.pid_router().route(19_001), Some("workflow-a"));
        assert!(ctx.pid_router().route(19_015).is_none());

        // Explicit Nb → same result: ModuleA registers, ModuleB (nMSB) skips.
        let ctx = build(DeploymentRoles::nb());
        assert_eq!(ctx.pid_router().route(19_001), Some("workflow-a"));
        assert!(ctx.pid_router().route(19_015).is_none());

        // Explicit Nmsb → ModuleA skips (Nb ∉ roles), ModuleB registers.
        let ctx = build(DeploymentRoles::nmsb());
        assert_eq!(ctx.pid_router().route(19_001), Some("workflow-b"));
        assert_eq!(ctx.pid_router().route(19_015), Some("workflow-b"));
    }

    /// Verify that explicit roles with two conflicting modules panic at build time.
    #[test]
    #[should_panic(expected = "overlapping PID registrations")]
    fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ConflictA;
        impl EngineModule for ConflictA {
            fn name(&self) -> &'static str {
                "conflict-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if roles.contains(Marktrolle::Nb) {
                    router.register(19_001, "workflow-a");
                }
            }
        }

        struct ConflictB;
        impl EngineModule for ConflictB {
            fn name(&self) -> &'static str {
                "conflict-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if !roles.is_all() && roles.contains(Marktrolle::Nmsb) {
                    router.register(19_001, "workflow-b"); // same PID, different workflow
                }
            }
        }

        // from_roles([Nb, Nmsb]): both modules fire → panic (overlapping PIDs).
        let _ = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_deployment_roles(DeploymentRoles::from_roles([
                Marktrolle::Nb,
                Marktrolle::Nmsb,
            ]))
            .register(Box::new(ConflictA))
            .register(Box::new(ConflictB))
            .build();
    }
}