Skip to main content

mako_engine/
builder.rs

1//! [`EngineModule`] trait, [`EngineBuilder`], and [`EngineContext`].
2//!
3//! # Summary
4//!
5//! `EngineBuilder` assembles all engine infrastructure into a single
6//! [`EngineContext`] value. Domain modules (GPKE, WiM, GeLi Gas, …) register
7//! themselves at startup via the [`EngineModule`] trait, making their names
8//! visible in diagnostics and health checks.
9//!
10//! # Type-state guarantee
11//!
12//! [`EngineBuilder::build`] is only available when the event store type
13//! parameter `ES` implements [`EventStore`]. Forgetting to call
14//! [`with_event_store`] is a **compile-time error**, not a runtime panic.
15//!
16//! All other stores default to their respective `Noop` implementations:
17//!
18//! | Store | Default |
19//! |-------|---------|
20//! | Snapshot store | [`NoopSnapshotStore`] |
21//! | Outbox store | [`NoopOutboxStore`] |
22//! | Deadline store | [`NoopDeadlineStore`] |
23//! | Process registry | [`NoopProcessRegistry`] |
24//!
25//! # Assembly example
26//!
27//! ```rust,ignore
28//! use mako_engine::builder::{EngineBuilder, EngineModule};
29//! use mako_engine::event_store::InMemoryEventStore;
30//! use mako_engine::outbox::InMemoryOutboxStore;
31//! use mako_engine::deadline::InMemoryDeadlineStore;
32//! use mako_engine::registry::InMemoryProcessRegistry;
33//! use mako_engine::snapshot::InMemorySnapshotStore;
34//!
35//! struct GpkeModule;
36//! impl EngineModule for GpkeModule { fn name(&self) -> &'static str { "gpke" } }
37//!
38//! let ctx = EngineBuilder::new()
39//!     .with_event_store(InMemoryEventStore::new())
40//!     .with_snapshot_store(InMemorySnapshotStore::new())
41//!     .with_outbox_store(InMemoryOutboxStore::new())
42//!     .with_deadline_store(InMemoryDeadlineStore::new())
43//!     .with_registry(InMemoryProcessRegistry::new())
44//!     .register(Box::new(GpkeModule))
45//!     .build();
46//!
47//! // Spawn a fresh process:
48//! let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
49//! p.execute(ReceiveUtilmd { .. }).await?;
50//!
51//! // Resume an existing process from a persisted identity:
52//! let identity = ctx.registry().lookup(tenant_id, &conv_id.to_string()).await?.unwrap();
53//! let p = ctx.resume::<SupplierChangeWorkflow>(identity);
54//!
55//! // Access stores for delivery workers / schedulers:
56//! let pending = ctx.outbox_store().pending_now(50).await?;
57//! let overdue = ctx.deadline_store().due_now(50).await?;
58//! ```
59//!
60//! [`with_event_store`]: EngineBuilder::with_event_store
61
62// Type-state generics can produce long signatures that trip up the
63// `type_complexity` lint; suppress it for this module only.
64#![allow(clippy::type_complexity)]
65
66use crate::{
67    dead_letter::{DeadLetterSink, LogDeadLetterSink},
68    deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
69    error::EngineError,
70    event_store::EventStore,
71    ids::{ProcessIdentity, TenantId},
72    marktrolle::DeploymentRoles,
73    outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
74    pid_router::PidRouter,
75    process::Process,
76    registry::{NoopProcessRegistry, ProcessRegistry},
77    snapshot::{NoopSnapshotStore, SnapshotStore},
78    version::WorkflowId,
79    workflow::Workflow,
80};
81
82use std::sync::Arc;
83
84// ── EngineModule ──────────────────────────────────────────────────────────────
85
86/// A self-contained domain module that registers with the engine at startup.
87///
88/// Domain crates implement this trait to declare their presence in the engine.
89/// The module name is surfaced in [`EngineContext::registered_modules`] for
90/// diagnostics, health checks, and log output.
91///
92/// ## Startup validation
93///
94/// Override [`configure`] to perform adapter coverage checks at engine startup
95/// time. The engine calls [`configure`] for every registered module during
96/// [`EngineBuilder::build`] and panics with an actionable message if any
97/// module returns `Err`. This surfaces missing adapter registrations as a
98/// startup failure rather than a silent runtime error.
99///
100/// ## Example
101///
102/// ```rust,ignore
103/// pub struct GpkeModule;
104///
105/// impl EngineModule for GpkeModule {
106///     fn name(&self) -> &'static str { "gpke" }
107///
108///     fn configure(&self) -> Result<(), String> {
109///         // Validate that every known BDEW format version has an adapter:
110///         GPKE_ADAPTER_REGISTRY
111///             .validate_policy(&GpkeWorkflow::version_policy(), &KNOWN_FVS)
112///             .map_err(|uncovered| format!(
113///                 "gpke: missing adapters for format versions: {:?}",
114///                 uncovered
115///             ))
116///     }
117/// }
118///
119/// let ctx = EngineBuilder::new()
120///     .with_event_store(my_store)
121///     .register(Box::new(GpkeModule))
122///     .build(); // panics if GpkeModule::configure returns Err
123///
124/// assert_eq!(ctx.registered_modules(), &["gpke"]);
125/// ```
126///
127/// [`configure`]: EngineModule::configure
pub trait EngineModule: Send + 'static {
    /// Stable, unique name for this domain module.
    ///
    /// Used in diagnostics, health checks, and structured log output.
    /// Choose a short lowercase identifier (e.g. `"gpke"`, `"wim"`,
    /// `"geli"`).
    ///
    /// This is the only required method; every other method on this trait
    /// has a default implementation.
    fn name(&self) -> &'static str;

    /// Register all PIDs this module handles into the shared [`PidRouter`].
    ///
    /// The default implementation registers nothing. Unless a module
    /// overrides [`register_pids_with_roles`], that method's default
    /// implementation forwards to this one.
    ///
    /// # Mutability contract
    ///
    /// This method is called **exactly once** by [`EngineBuilder::build`],
    /// before the resulting [`EngineContext`] is handed to the caller. The
    /// `&mut PidRouter` reference is only available here, at build time.
    /// After `build` returns the router is **sealed** — the engine provides
    /// only a shared `&PidRouter` reference, with no mutation path at runtime.
    ///
    /// Consequence: **all PIDs a module will ever need must be registered
    /// here**. Do not attempt to register PIDs lazily from async handlers or
    /// after the engine has started — there is no API for that by design.
    ///
    /// Duplicate registrations (same PID from two modules) silently overwrite
    /// the previous mapping; the last module to register wins. Use
    /// `cargo xtask validate-pruefids` to catch accidental PID conflicts
    /// between modules before they reach production.
    ///
    /// For role-conditional registration (PIDs that should only be active for
    /// specific BDEW Marktrollen), override [`register_pids_with_roles`] instead.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// fn register_pids(&self, router: &mut PidRouter) {
    ///     // GPKE Lieferantenwechsel / Lieferbeginn (BK6-22-024, PIDs 55001, 55002, 55017)
    ///     for &pid in &[55001_u32, 55002, 55017] {
    ///         router.register(pid, "gpke-supplier-change");
    ///     }
    /// }
    /// ```
    ///
    /// [`register_pids_with_roles`]: EngineModule::register_pids_with_roles
    fn register_pids(&self, _router: &mut PidRouter) {}

    /// Register PIDs with role-context awareness.
    ///
    /// This is the **preferred override** for modules that have role-conditional
    /// PID registrations — PIDs that should only be active when this `makod`
    /// instance holds a specific [`Marktrolle`].
    ///
    /// The default implementation ignores `_roles` and calls [`register_pids`]
    /// (role-agnostic), so existing modules that override `register_pids`
    /// continue to work without changes.
    ///
    /// Override this method instead of `register_pids` when any PID registration
    /// should be conditional on the deployment role:
    ///
    /// ```rust,ignore
    /// use mako_engine::marktrolle::Marktrolle;
    ///
    /// fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
    ///     // Always register: 55001, 55002 (not role-specific)
    ///     for pid in [55001_u32, 55002] { router.register_with_module(pid, "gpke-supplier-change", self.name()); }
    ///
    ///     // Only when NB role: 19001/19002 inbound ORDRSP from MSB
    ///     if roles.contains(Marktrolle::Nb) {
    ///         for pid in [19001_u32, 19002] { router.register_with_module(pid, "gpke-konfiguration", self.name()); }
    ///     }
    /// }
    /// ```
    ///
    /// # Conflict guard
    ///
    /// Use [`PidRouter::register_with_module`] (not `register`) inside this
    /// method. The conflict guard panics at build time if two modules register
    /// the same PID to different workflows — this makes role misconfigurations
    /// visible at startup rather than silently misrouting messages.
    ///
    /// [`Marktrolle`]: crate::marktrolle::Marktrolle
    /// [`register_pids`]: EngineModule::register_pids
    fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
        // Role-agnostic fallback: delegate to the legacy hook.
        self.register_pids(router);
    }

    /// Workflow names this module handles for deadline dispatch.
    ///
    /// Return the same name strings that [`register_pids`] maps PIDs to.
    /// These names are stored in [`EngineContext::registered_workflows`] and
    /// used to validate that every workflow that has deadlines scheduled is
    /// covered by the deadline scheduler dispatch function at runtime.
    ///
    /// The default implementation returns an empty slice. Override it to
    /// declare all workflow names that may fire deadlines:
    ///
    /// ```rust,ignore
    /// fn workflow_names(&self) -> &'static [&'static str] {
    ///     &["gpke-supplier-change", "gpke-abrechnung"]
    /// }
    /// ```
    ///
    /// [`register_pids`]: EngineModule::register_pids
    /// [`EngineContext::registered_workflows`]: crate::builder::EngineContext::registered_workflows
    fn workflow_names(&self) -> &'static [&'static str] {
        &[]
    }

    /// Declare the EDIFACT profile types this module requires at runtime.
    ///
    /// The default implementation returns an empty slice (no requirements).
    ///
    /// Returning a non-empty slice causes [`EngineBuilder::build`] to call the
    /// registered profile validator for each requirement.  If no active profile
    /// exists for a required message type, `build` panics with an actionable
    /// error so deployment fails fast rather than silently.
    ///
    /// **This replaces the previous pattern** of calling
    /// `edi_energy::registry::ReleaseRegistry::global()` inside `configure()`.
    /// Domain crates no longer need `edi-energy` in their production
    /// `[dependencies]` — they just declare their requirements here.
    ///
    /// ```rust,ignore
    /// fn profile_requirements(&self) -> &'static [ProfileRequirement] {
    ///     &[
    ///         ProfileRequirement { message_type: "UTILMD", label: "UTILMD Strom (GPKE)" },
    ///         ProfileRequirement { message_type: "INVOIC", label: "INVOIC Abrechnung (GPKE)" },
    ///     ]
    /// }
    /// ```
    ///
    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
    fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
        &[]
    }

    /// Validate adapter coverage and configuration at engine startup.
    ///
    /// Called by [`EngineBuilder::build`] after all modules are registered.
    /// Return `Ok(())` when the module is fully configured. Return `Err(msg)`
    /// with an actionable description when an adapter or configuration is
    /// missing — the engine will panic with that message so the deployment
    /// fails early rather than silently.
    ///
    /// The default implementation is a no-op (always returns `Ok(())`).
    /// Override it in domain crates to call
    /// [`AdapterRegistry::validate_policy`] and emit structured errors.
    ///
    /// Note: if your validation needs access to the edi-energy profile
    /// registry, use [`profile_requirements`] instead — it does not require
    /// importing `edi-energy` in domain crates.
    ///
    /// [`AdapterRegistry::validate_policy`]: crate::message_adapter::AdapterRegistry::validate_policy
    /// [`profile_requirements`]: EngineModule::profile_requirements
    ///
    /// # Errors
    ///
    /// Returns a descriptive error string when the module's configuration is invalid.
    fn configure(&self) -> Result<(), String> {
        Ok(())
    }
}
286
287// ── EngineContext ─────────────────────────────────────────────────────────────
288
289/// Assembled engine infrastructure returned by [`EngineBuilder::build`].
290///
291/// `EngineContext` bundles all stores and the process registry into a single
292/// value. It is the root dependency for:
293///
294/// - Spawning new processes ([`spawn`])
295/// - Resuming existing processes ([`resume`])
296/// - Running outbox delivery workers (`outbox_store.pending_now(…)`)
297/// - Driving the deadline scheduler (`deadline_store.due_now(…)`)
298///
299/// ## Generic parameters
300///
301/// | Param | Role | Default |
302/// |-------|------|---------|
303/// | `ES`  | [`EventStore`] backend | — (required) |
304/// | `SS`  | [`SnapshotStore`] backend | [`NoopSnapshotStore`] |
305/// | `OS`  | [`OutboxStore`] backend  | [`NoopOutboxStore`]   |
306/// | `DS`  | [`DeadlineStore`] backend | [`NoopDeadlineStore`] |
307/// | `PR`  | [`ProcessRegistry`] backend | [`NoopProcessRegistry`] |
308///
309/// In most codebases all type parameters are inferred from the builder calls.
310///
311/// [`spawn`]: EngineContext::spawn
312/// [`resume`]: EngineContext::resume
pub struct EngineContext<
    ES,
    SS = NoopSnapshotStore,
    OS = NoopOutboxStore,
    DS = NoopDeadlineStore,
    PR = NoopProcessRegistry,
> {
    /// Event store backend, held behind an `Arc` so that `spawn` / `resume`
    /// can hand each process its own cheap pointer clone.
    event_store: Arc<ES>,
    /// Snapshot store backend; exposed read-only via `snapshot_store()`.
    snapshot_store: SS,
    /// Outbox store backend; exposed read-only via `outbox_store()`.
    outbox_store: OS,
    /// Deadline store backend; exposed read-only via `deadline_store()`.
    deadline_store: DS,
    /// Process routing registry; exposed read-only via `registry()`.
    registry: PR,
    /// Dead-letter sink for unroutable or unprocessable inbound messages.
    ///
    /// Stored as `Arc<dyn DeadLetterSink>` so callers can share it across
    /// tasks without an extra type parameter on `EngineContext`.
    ///
    /// This is the only public field; the same value is also returned by the
    /// `dead_letter_sink()` accessor.
    pub dead_letter_sink: Arc<dyn DeadLetterSink>,
    /// PID-to-workflow routing table, populated from all registered modules.
    pid_router: PidRouter,
    /// Names of the registered domain modules, as returned by
    /// [`EngineContext::registered_modules`].
    registered_modules: Vec<&'static str>,
    /// Workflow names declared by all registered modules via
    /// [`EngineModule::workflow_names`]. Used to validate deadline scheduler
    /// coverage at runtime (see [`EngineContext::registered_workflows`]).
    registered_workflows: Vec<&'static str>,
}
338
339// ── Type aliases ──────────────────────────────────────────────────────────────
340
/// An [`EngineContext`] with all optional subsystems disabled.
///
/// Uses `NoopSnapshotStore` and, in `testing`-enabled builds, Noop
/// implementations for outbox, deadline, and process registry. Suitable for
/// tests and minimal deployments where only a durable event store is required.
///
/// The alias names only the event store type `ES`; the remaining four type
/// parameters of [`EngineContext`] take their declared `Noop*` defaults. When
/// used with [`EngineBuilder`], the annotation below is usually all that is
/// needed:
///
/// ```rust,ignore
/// // Only available in test / testing-feature builds:
/// use mako_engine::builder::{EngineBuilder, MinimalEngine};
/// use mako_engine::event_store::InMemoryEventStore;
///
/// let ctx: MinimalEngine<InMemoryEventStore> = EngineBuilder::new()
///     .with_event_store(InMemoryEventStore::new())
///     .build();
/// ```
pub type MinimalEngine<ES> = EngineContext<ES>;
360
// Accessors and process factories. Only `ES` is bounded here; the other
// store parameters are returned by reference and never called in this block.
impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
where
    ES: EventStore,
{
    /// Spawn a new process and return a typed `Process<W, Arc<ES>>` handle.
    ///
    /// No `ES: Clone` bound is required — the engine stores the event store
    /// behind an `Arc` so spawning is always a cheap pointer clone.
    ///
    /// ```rust,ignore
    /// let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
    /// p.execute(ReceiveUtilmd { .. }).await?;
    /// ```
    #[must_use]
    pub fn spawn<W: Workflow>(
        &self,
        tenant_id: TenantId,
        workflow_id: WorkflowId,
    ) -> Process<W, Arc<ES>> {
        Process::new(Arc::clone(&self.event_store), tenant_id, workflow_id)
    }

    /// Resume an existing process from a [`ProcessIdentity`].
    ///
    /// Like [`spawn`](EngineContext::spawn), the returned handle shares the
    /// engine's event store through an `Arc` clone.
    ///
    /// ```rust,ignore
    /// let identity = ctx.registry()
    ///     .lookup(tenant_id, &conv_id.to_string())
    ///     .await?
    ///     .ok_or(EngineError::Registry("unknown conversation".into()))?;
    /// let p = ctx.resume::<SupplierChangeWorkflow>(identity);
    /// p.execute(HandleAperak { .. }).await?;
    /// ```
    #[must_use]
    pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
        Process::from_identity(Arc::clone(&self.event_store), identity)
    }

    /// Names of all domain modules registered with the builder, in
    /// registration order.
    #[must_use]
    pub fn registered_modules(&self) -> &[&'static str] {
        &self.registered_modules
    }

    /// Workflow names declared by all registered modules, in registration order.
    ///
    /// Use this in the deadline scheduler dispatch function to detect
    /// deadlines that fire for a workflow name no registered module declared.
    /// If a deadline fires for a workflow name that is not in this list, the
    /// scheduler's dispatch function should emit an error rather than
    /// silently dropping the deadline:
    ///
    /// ```rust,ignore
    /// let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
    /// let scheduler = ctx.run_deadline_scheduler(
    ///     move |deadline| {
    ///         let wf = deadline.workflow_id().name.as_ref();
    ///         if !known.contains(wf) {
    ///             tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
    ///             return Box::pin(async { Ok(()) });
    ///         }
    ///         // dispatch by workflow name …
    ///         Box::pin(async { Ok(()) })
    ///     },
    ///     100,
    ///     Duration::from_secs(30),
    /// );
    /// ```
    #[must_use]
    pub fn registered_workflows(&self) -> &[&'static str] {
        &self.registered_workflows
    }

    /// The event store backend (behind an `Arc`).
    #[must_use]
    pub fn event_store(&self) -> &Arc<ES> {
        &self.event_store
    }

    /// The snapshot store backend.
    #[must_use]
    pub fn snapshot_store(&self) -> &SS {
        &self.snapshot_store
    }

    /// The outbox store backend.
    ///
    /// Poll `outbox_store().pending_now(limit)` in a background task to drain
    /// the delivery queue.
    #[must_use]
    pub fn outbox_store(&self) -> &OS {
        &self.outbox_store
    }

    /// The deadline store backend.
    ///
    /// Poll `deadline_store().due_now(limit)` in a background scheduler to
    /// fire overdue process timers.
    #[must_use]
    pub fn deadline_store(&self) -> &DS {
        &self.deadline_store
    }

    /// The process routing registry.
    ///
    /// Register a [`ProcessIdentity`] under a `(tenant_id, key)` pair at
    /// process creation, then `lookup` it when routing inbound messages.
    #[must_use]
    pub fn registry(&self) -> &PR {
        &self.registry
    }

    /// The dead-letter sink for unroutable or unprocessable messages.
    ///
    /// Call [`DeadLetterSink::reject`] when an inbound message cannot be
    /// dispatched to any workflow. The default sink emits `tracing::warn!`
    /// so rejections are always visible in the log output.
    #[must_use]
    pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
        &self.dead_letter_sink
    }

    /// Assert that no Noop store is active — call this during production startup.
    ///
    /// Checks the type names of `OS`, `DS`, and `PR` against the string `"Noop"`.
    /// Panics with a human-readable message if any match, directing the operator
    /// to configure a persistent backend. The snapshot store `SS` is not
    /// checked.
    ///
    /// # When to call
    ///
    /// Call this early in `makod`'s startup path (and `--check` mode) to catch
    /// deployments where a Noop store was accidentally wired — e.g. the
    /// `[outbox]`, `[deadline]`, or `[registry]` configuration section was
    /// omitted from `makod.toml`.  The check is defence-in-depth: in release
    /// builds without the `testing` feature, Noop stores cannot implement the
    /// required traits at all and the compiler would have already rejected them.
    ///
    /// # Panics
    ///
    /// Panics when any of `OS`, `DS`, or `PR` is a Noop implementation.
    pub fn assert_production_stores(&self) {
        // NOTE(review): this is a substring match on `std::any::type_name`,
        // whose exact output the standard library leaves unspecified. Any
        // type whose path contains "Noop" (including wrappers around a Noop
        // store) trips the assertion. Treat it as a heuristic, not a proof.
        let checks: &[(&str, &str)] = &[
            ("OutboxStore", std::any::type_name::<OS>()),
            ("DeadlineStore", std::any::type_name::<DS>()),
            ("ProcessRegistry", std::any::type_name::<PR>()),
        ];
        for (trait_name, type_name) in checks {
            assert!(
                !type_name.contains("Noop"),
                "makod: Noop{trait_name} is active — \
                 configure a persistent {trait_name} backend in makod.toml. \
                 Type resolved to: {type_name}"
            );
        }
    }

    /// The PID-to-workflow routing table.
    ///
    /// Populated **once** during [`EngineBuilder::build`] by calling
    /// [`EngineModule::register_pids`] on every registered module in
    /// registration order. After `build` returns the table is **sealed** —
    /// it is read-only for the lifetime of the `EngineContext` and may be
    /// freely shared across async tasks without synchronisation.
    ///
    /// # Mutability contract
    ///
    /// There is intentionally no `pid_router_mut()` accessor. Adding PIDs
    /// after the engine is built would create a TOCTOU race between the
    /// dispatch path (which calls `route(pid)`) and any hypothetical
    /// concurrent mutator. Instead, register all PIDs during the build phase
    /// via `EngineModule::register_pids`.
    ///
    /// If a new process family needs to be added without restarting the
    /// binary, rebuild and restart `makod` — hot-swap of PID routing is not
    /// supported.
    ///
    /// # Example — dispatch at the AS4 reception boundary
    ///
    /// ```rust,ignore
    /// let workflow_name = ctx.pid_router().route(pid)
    ///     .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
    ///         format!("no workflow registered for PID {pid}").into()
    ///     )))?;
    ///
    /// match workflow_name {
    ///     "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
    ///     "wim-device-change"    => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
    ///     other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
    ///         format!("unhandled workflow name: {other}").into()
    ///     ))),
    /// }
    /// ```
    #[must_use]
    pub fn pid_router(&self) -> &PidRouter {
        &self.pid_router
    }
}
557
558// ── As4Sender ─────────────────────────────────────────────────────────────────
559
560/// Sends a single AS4 / EDIINT-over-HTTP outbound message.
561///
562/// Implement this trait for your AS4 gateway client and pass it to
563/// [`EngineContext::run_outbox_worker`].
564///
565/// # Contract
566///
567/// Return `Ok(())` only after the message has been **durably accepted** by the
568/// receiving MSH.  Return `Err(…)` on transient or permanent failure — the
569/// outbox worker calls [`OutboxStore::reschedule`] so the message is retried.
pub trait As4Sender: Send + Sync + 'static {
    /// Transmit `msg` and return when the remote MSH has accepted it.
    ///
    /// The returned future must be `Send`, so an implementation can be
    /// awaited from a task that may move between threads.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] when the message could not be delivered.
    fn send(
        &self,
        msg: &OutboxMessage,
    ) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
}
577
578// ── OutboxWorker ──────────────────────────────────────────────────────────────
579
580/// A background worker that drains the outbox by polling pending
581/// [`OutboxMessage`]s and dispatching them via an [`As4Sender`].
582///
583/// Obtain via [`EngineContext::run_outbox_worker`] and drive by spawning
584/// [`OutboxWorker::run`] in a Tokio task.
585///
586/// # Polling behaviour
587///
588/// When the poll returns an empty batch the worker sleeps for `poll_interval`
589/// before polling again.  Non-empty batches are processed immediately.
590///
591/// # Error handling
592///
593/// Successful sends are acknowledged via [`OutboxStore::acknowledge`].
594/// Failed sends are rescheduled via [`OutboxStore::reschedule`] using
595/// **full-jitter exponential backoff**: `delay = rand(0, min(MAX, BASE * 2^n))`
596/// where `n = attempt_count`. This avoids thundering-herd when multiple
597/// `makod` instances restart simultaneously after a receiver outage.
598///
599/// When `attempt_count >= max_attempts`, the message is **acknowledged** (removed
600/// from the outbox) and a [`DeadLetterReason::OutboxExhausted`] record is written
601/// to the dead-letter sink. This prevents permanently-undeliverable messages
602/// from clogging the outbox forever.
603///
604/// All errors are emitted as structured `tracing` events at `warn` / `error`
605/// level rather than `eprintln!`, so they appear in the application's log
606/// pipeline with full context (message_id, error).
607///
608/// # Example
609///
610/// ```rust,ignore
611/// use std::time::Duration;
612///
613/// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1));
614/// tokio::spawn(async move { worker.run().await });
615/// ```
616///
617/// [`DeadLetterReason::OutboxExhausted`]: crate::dead_letter::DeadLetterReason::OutboxExhausted
pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
    /// Outbox backend that is polled for pending messages and told about
    /// acknowledgements.
    store: OS,
    /// Transport used to deliver each pending message.
    sender: S,
    /// Maximum number of messages requested from the store per poll.
    batch_size: usize,
    /// Sleep duration after an empty poll or a store error before polling again.
    poll_interval: std::time::Duration,
    /// Maximum total delivery attempts before a message is dead-lettered.
    ///
    /// A message whose `attempt_count` has reached this value is acknowledged
    /// and handed to `dead_letter_sink` instead of being sent again.
    ///
    /// Default: 48 (covers ~4 hours at the 300 s backoff cap).
    /// Set to `u32::MAX` to disable the cap (not recommended for production).
    max_attempts: u32,
    /// Sink for messages that exceed `max_attempts`.
    dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
}
631
/// Compute a full-jitter exponential backoff delay.
///
/// `attempt` is the number of prior attempts (0 = first retry).
/// `entropy` provides randomness; derive from a stable message identifier
/// (e.g. hash of `message_id`) rather than the current timestamp — a
/// timestamp-derived value is deterministic within a single batch, which
/// defeats jitter when multiple messages fail simultaneously.
///
/// The retry window is `BASE * 2^attempt` seconds, capped at `MAX` (300 s).
/// The returned delay is `entropy % window`, i.e. a whole number of seconds
/// in `[0, window)`.
///
/// | attempt | window (s) | expected delay (s) |
/// |---------|------------|-------------------|
/// | 0       | 5          | 2.5               |
/// | 1       | 10         | 5                 |
/// | 2       | 20         | 10                |
/// | 3       | 40         | 20                |
/// | 4       | 80         | 40                |
/// | 5       | 160        | 80                |
/// | 6+      | 300 (cap)  | 150               |
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;
    // Exponential window: BASE * 2^attempt, capped at MAX.
    //
    // The exponent must not be clamped below the point where the product
    // crosses MAX_SECS, otherwise the cap is never reached. `checked_shl`
    // returns `None` once `attempt` is at least the bit width of `u64`, and
    // `saturating_mul` absorbs overflow for large shifts; both cases end up
    // at the cap.
    let window = 1u64
        .checked_shl(attempt)
        .map_or(MAX_SECS, |factor| BASE_SECS.saturating_mul(factor))
        .min(MAX_SECS);
    // Full jitter: uniform random in [0, window). `checked_rem` keeps the
    // function panic-free even if the constants are ever changed to zero.
    let jitter_secs = entropy.checked_rem(window).unwrap_or(0);
    std::time::Duration::from_secs(jitter_secs)
}
659
660impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
661    /// Run the outbox drain loop until the task is cancelled.
662    ///
663    /// # Panics
664    ///
665    /// Panics if `time::Duration::try_from(delay)` overflows (unreachable for
666    /// the delay values produced by `backoff_delay`).
667    #[allow(clippy::too_many_lines)]
668    pub async fn run(self) {
669        loop {
670            let batch = match self.store.pending_now(self.batch_size).await {
671                Ok(b) => b,
672                Err(e) => {
673                    tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
674                    tokio::time::sleep(self.poll_interval).await;
675                    continue;
676                }
677            };
678
679            if batch.is_empty() {
680                tokio::time::sleep(self.poll_interval).await;
681                continue;
682            }
683
684            for msg in batch {
685                // ── Max-attempt cap ───────────────────────────────────
686                // `attempt_count` starts at 0 and is incremented on each
687                // `reschedule` call.  When it reaches `max_attempts` the
688                // message is considered permanently undeliverable: acknowledge
689                // it (remove from outbox) and dead-letter it so the regulatory
690                // audit trail is preserved.
691                if msg.attempt_count >= self.max_attempts {
692                    tracing::error!(
693                        message_id   = %msg.message_id,
694                        message_type = %msg.message_type,
695                        recipient    = %msg.recipient,
696                        attempts     = msg.attempt_count,
697                        max_attempts = self.max_attempts,
698                        "outbox worker: max delivery attempts reached; dead-lettering message",
699                    );
700                    self.dead_letter_sink.reject(
701                        &crate::dead_letter::DeadLetterReason::OutboxExhausted {
702                            message_id: msg.message_id,
703                            message_type: msg.message_type.to_string(),
704                            recipient: msg.recipient.to_string(),
705                            last_error: format!(
706                                "delivery exhausted after {} attempts",
707                                msg.attempt_count
708                            ),
709                            attempts: msg.attempt_count,
710                        },
711                    );
712                    if let Err(e) = self.store.acknowledge(msg.message_id).await {
713                        tracing::error!(
714                            message_id = %msg.message_id,
715                            error = %e,
716                            "outbox worker: acknowledge after exhaust failed; message may reappear",
717                        );
718                    }
719                    continue;
720                }
721
722                match self.sender.send(&msg).await {
723                    Ok(()) => {
724                        if let Err(e) = self.store.acknowledge(msg.message_id).await {
725                            tracing::warn!(
726                                message_id = %msg.message_id,
727                                error = %e,
728                                "outbox worker: acknowledge failed",
729                            );
730                        }
731                        // CONTRL AHB 1.0 §1.2: the CONTRL must be delivered
732                        // within 6 wall-clock hours of interchange receipt.
733                        // `msg.created_at` is when the PendingOutbox was
734                        // materialised (which should equal the ingest timestamp
735                        // for transport-layer CONTRL obligations).
736                        if msg.message_type.as_ref() == "CONTRL" {
737                            let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
738                            if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
739                                tracing::warn!(
740                                    message_id   = %msg.message_id,
741                                    elapsed_secs = elapsed.whole_seconds(),
742                                    max_secs     = crate::fristen::CONTRL_FRIST_HOURS * 3600,
743                                    "outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
744                                     (CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
745                                );
746                            }
747                        }
748                    }
749                    // Permanent error: dead-letter immediately without retrying.
750                    // PartnerUnknown requires operator intervention (add --as4-partner);
751                    // Serialization errors will never succeed on retry.
752                    Err(ref e)
753                        if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
754                    {
755                        tracing::error!(
756                            message_id   = %msg.message_id,
757                            message_type = %msg.message_type,
758                            recipient    = %msg.recipient,
759                            error        = %e,
760                            "outbox worker: permanent send failure; dead-lettering without retry",
761                        );
762                        self.dead_letter_sink.reject(
763                            &crate::dead_letter::DeadLetterReason::OutboxExhausted {
764                                message_id: msg.message_id,
765                                message_type: msg.message_type.to_string(),
766                                recipient: msg.recipient.to_string(),
767                                last_error: e.to_string(),
768                                attempts: msg.attempt_count,
769                            },
770                        );
771                        if let Err(re) = self.store.acknowledge(msg.message_id).await {
772                            tracing::error!(
773                                message_id = %msg.message_id,
774                                error = %re,
775                                "outbox worker: acknowledge after permanent failure failed",
776                            );
777                        }
778                    }
779                    Err(e) => {
780                        // Stable jitter entropy derived from the UUID bytes of
781                        // `message_id`.  Using the last 8 bytes as a `u64` gives
782                        // uniform entropy across message IDs (UUIDs are random in
783                        // all 128 bits for v4) and is stable across Rust versions —
784                        // unlike `DefaultHasher`, whose algorithm is explicitly
785                        // documented as unstable.
786                        let entropy = {
787                            let uuid = msg.message_id.as_uuid();
788                            let bytes = uuid.as_bytes();
789                            u64::from_le_bytes(bytes[8..16].try_into().unwrap())
790                        };
791                        let delay = backoff_delay(msg.attempt_count, entropy);
792                        let retry_at = time::OffsetDateTime::now_utc()
793                            + time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
794                        tracing::warn!(
795                            message_id   = %msg.message_id,
796                            attempt      = msg.attempt_count,
797                            max_attempts = self.max_attempts,
798                            retry_in     = ?delay,
799                            error        = %e,
800                            "outbox worker: send failed; rescheduling with backoff",
801                        );
802                        if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
803                            tracing::error!(
804                                message_id = %msg.message_id,
805                                error      = %re,
806                                "outbox worker: reschedule failed; message may be stuck",
807                            );
808                        }
809                    }
810                }
811            }
812        }
813    }
814}
815
816impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
817where
818    ES: EventStore,
819    OS: OutboxStore + Clone,
820{
821    /// Construct an [`OutboxWorker`] that drains the outbox via `sender`.
822    ///
823    /// `batch_size` — messages fetched per poll cycle.
824    /// `poll_interval` — sleep duration when the batch is empty.
825    ///
826    /// `max_attempts` — maximum total delivery attempts before dead-lettering.
827    /// Pass `48` for a ~4-hour retry budget at the 300 s backoff cap, or
828    /// `u32::MAX` to disable the cap (not recommended for production).
829    ///
830    /// ```rust,ignore
831    /// use std::time::Duration;
832    ///
833    /// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
834    /// tokio::spawn(async move { worker.run().await });
835    /// ```
836    #[must_use]
837    pub fn run_outbox_worker<S: As4Sender>(
838        &self,
839        sender: S,
840        batch_size: usize,
841        poll_interval: std::time::Duration,
842        max_attempts: u32,
843    ) -> OutboxWorker<OS, S> {
844        OutboxWorker {
845            store: self.outbox_store.clone(),
846            sender,
847            batch_size,
848            poll_interval,
849            max_attempts,
850            dead_letter_sink: self.dead_letter_sink.clone(),
851        }
852    }
853}
854
855impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
856where
857    ES: std::fmt::Debug,
858    SS: std::fmt::Debug,
859    OS: std::fmt::Debug,
860    DS: std::fmt::Debug,
861    PR: std::fmt::Debug,
862{
863    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
864        f.debug_struct("EngineContext")
865            .field("registered_modules", &self.registered_modules)
866            .field("registered_workflows", &self.registered_workflows)
867            .field("pid_router_len", &self.pid_router.len())
868            .finish_non_exhaustive()
869    }
870}
871
872// ── NoopAs4Sender / LogAs4Sender ──────────────────────────────────────────────
873
874/// An [`As4Sender`] that succeeds immediately without sending anything.
875///
876/// Use in tests and environments where outbound AS4 delivery is not yet
877/// wired. All outbox messages are acknowledged (removed from the queue)
878/// without being transmitted.
879///
880/// # ⚠️ Data loss warning
881///
882/// Every outbox message is **silently discarded** — no EDIFACT message is
883/// sent to any counterparty. Do not use in production.
884#[derive(Debug, Clone, Copy, Default)]
885#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
886pub struct NoopAs4Sender;
887
888impl As4Sender for NoopAs4Sender {
889    async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
890        Ok(())
891    }
892}
893
894/// An [`As4Sender`] that logs every outbound message at `warn` level and
895/// succeeds without transmitting.
896///
897/// Useful for development and integration-testing environments where the
898/// full AS4 stack is not yet available but message visibility is desired.
899/// All outbox messages are acknowledged (removed from the queue) after logging.
900///
901/// # ⚠️ Data loss warning
902///
903/// No EDIFACT message is sent to any counterparty. Do not use in production.
904#[derive(Debug, Clone, Copy, Default)]
905#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
906pub struct LogAs4Sender;
907
908impl As4Sender for LogAs4Sender {
909    async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
910        tracing::warn!(
911            message_id   = %msg.message_id,
912            message_type = %msg.message_type,
913            recipient    = %msg.recipient,
914            "LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
915        );
916        Ok(())
917    }
918}
919
920// ── DeadlineScheduler ─────────────────────────────────────────────────────────
921
922/// A background task that polls [`DeadlineStore::due_now`] and dispatches
923/// deadline commands to the owning processes via a caller-supplied function.
924///
925/// Obtain via [`EngineContext::run_deadline_scheduler`] and drive by spawning
926/// [`DeadlineScheduler::run`] in a Tokio task.
927///
928/// # Dispatch function
929///
930/// The `dispatch` function receives a fired [`Deadline`] and returns a future
931/// that dispatches the appropriate timeout command to the process. The function
932/// is responsible for resuming the correct workflow and calling `execute`.
933/// After the future completes, the scheduler cancels the deadline from the
934/// store regardless of the dispatch outcome (to prevent re-firing).
935///
936/// ```rust,ignore
937/// use std::time::Duration;
938///
939/// let scheduler = ctx.run_deadline_scheduler(
940///     |deadline| async move {
941///         tracing::warn!(
942///             deadline_id = %deadline.deadline_id(),
943///             label = %deadline.label(),
944///             "deadline fired",
945///         );
946///         Ok(())
947///     },
948///     100,
949///     Duration::from_secs(30),
950/// );
951/// tokio::spawn(async move { scheduler.run().await });
952/// ```
953pub struct DeadlineScheduler<DS: DeadlineStore> {
954    store: DS,
955    dispatch: Box<
956        dyn Fn(
957                Deadline,
958            ) -> std::pin::Pin<
959                Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
960            > + Send
961            + Sync,
962    >,
963    batch_size: usize,
964    poll_interval: std::time::Duration,
965}
966
967impl<DS: DeadlineStore> DeadlineScheduler<DS> {
968    /// Run the deadline poll loop until the task is cancelled.
969    pub async fn run(self) {
970        loop {
971            let result = match self.store.due_now(self.batch_size).await {
972                Ok(r) => r,
973                Err(e) => {
974                    tracing::warn!(
975                        error = %e,
976                        "deadline scheduler: store error polling due deadlines (will retry)",
977                    );
978                    tokio::time::sleep(self.poll_interval).await;
979                    continue;
980                }
981            };
982
983            if result.deadlines.is_empty() {
984                tokio::time::sleep(self.poll_interval).await;
985                continue;
986            }
987
988            for deadline in result.deadlines {
989                let id = deadline.deadline_id();
990                let label = deadline.label().to_owned();
991                let should_cancel = match (self.dispatch)(deadline).await {
992                    Ok(()) => true,
993                    Err(ref e) if e.is_version_conflict() => {
994                        // The process was modified concurrently; the timeout
995                        // command will be retried on the next poll cycle.
996                        // Do NOT cancel — let the deadline remain due so it
997                        // fires again until a non-conflict dispatch succeeds.
998                        tracing::warn!(
999                            deadline_id = %id,
1000                            label       = %label,
1001                            "deadline scheduler: VersionConflict; will retry on next poll",
1002                        );
1003                        false
1004                    }
1005                    Err(e) => {
1006                        tracing::warn!(
1007                            deadline_id = %id,
1008                            label       = %label,
1009                            error       = %e,
1010                            "deadline scheduler: dispatch failed (permanent); cancelling",
1011                        );
1012                        true
1013                    }
1014                };
1015                if should_cancel && let Err(e) = self.store.cancel(id).await {
1016                    tracing::error!(
1017                        deadline_id = %id,
1018                        error       = %e,
1019                        "deadline scheduler: cancel failed; deadline may fire again",
1020                    );
1021                }
1022            }
1023
1024            // If has_more, loop immediately to drain the batch.
1025        }
1026    }
1027}
1028
1029impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
1030where
1031    ES: EventStore,
1032    DS: DeadlineStore + Clone,
1033{
1034    /// Construct a [`DeadlineScheduler`] that polls the deadline store and
1035    /// dispatches fired deadlines via `dispatch`.
1036    ///
1037    /// The `dispatch` function is called for every fired deadline. It should
1038    /// resume the owning process and execute the appropriate timeout command.
1039    ///
1040    /// `batch_size` — deadlines fetched per poll cycle.
1041    /// `poll_interval` — sleep duration when no deadlines are due.
1042    ///
1043    /// ```rust,ignore
1044    /// use std::time::Duration;
1045    ///
1046    /// let scheduler = ctx.run_deadline_scheduler(
1047    ///     |d| async move {
1048    ///         tracing::info!(label = %d.label(), "firing deadline");
1049    ///         Ok(())
1050    ///     },
1051    ///     100,
1052    ///     Duration::from_secs(30),
1053    /// );
1054    /// tokio::spawn(async move { scheduler.run().await });
1055    /// ```
1056    #[must_use]
1057    pub fn run_deadline_scheduler<F, Fut>(
1058        &self,
1059        dispatch: F,
1060        batch_size: usize,
1061        poll_interval: std::time::Duration,
1062    ) -> DeadlineScheduler<DS>
1063    where
1064        F: Fn(Deadline) -> Fut + Send + Sync + 'static,
1065        Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
1066    {
1067        DeadlineScheduler {
1068            store: self.deadline_store.clone(),
1069            dispatch: Box::new(move |d| Box::pin(dispatch(d))),
1070            batch_size,
1071            poll_interval,
1072        }
1073    }
1074}
1075
1076// ── EngineBuilder ─────────────────────────────────────────────────────────────
1077
1078/// Assembles engine infrastructure and produces an [`EngineContext`].
1079///
1080/// Uses type-state to enforce that an event store is provided before
1081/// [`build`] can be called. All other stores default to `Noop`
1082/// implementations.
1083///
1084/// ## Quick start
1085///
1086/// ```rust,ignore
1087/// // Minimal — event store only, all others are Noop:
1088/// let ctx = EngineBuilder::new()
1089///     .with_event_store(InMemoryEventStore::new())
1090///     .build();
1091///
1092/// // Full infrastructure:
1093/// let ctx = EngineBuilder::new()
1094///     .with_event_store(InMemoryEventStore::new())
1095///     .with_snapshot_store(InMemorySnapshotStore::new())
1096///     .with_outbox_store(InMemoryOutboxStore::new())
1097///     .with_deadline_store(InMemoryDeadlineStore::new())
1098///     .with_registry(InMemoryProcessRegistry::new())
1099///     .register(Box::new(GpkeModule))
1100///     .build();
1101/// ```
1102///
1103/// [`build`]: EngineBuilder::build
1104pub struct EngineBuilder<
1105    ES = (),
1106    SS = NoopSnapshotStore,
1107    OS = NoopOutboxStore,
1108    DS = NoopDeadlineStore,
1109    PR = NoopProcessRegistry,
1110> {
1111    event_store: ES,
1112    snapshot_store: SS,
1113    outbox_store: OS,
1114    deadline_store: DS,
1115    registry: PR,
1116    dead_letter_sink: Arc<dyn DeadLetterSink>,
1117    modules: Vec<Box<dyn EngineModule>>,
1118    /// Active [`DeploymentRoles`] for this engine instance.
1119    ///
1120    /// Controls role-conditional PID registration via
1121    /// [`EngineModule::register_pids_with_roles`]. Defaults to
1122    /// [`DeploymentRoles::all()`] for backward compatibility.
1123    deployment_roles: DeploymentRoles,
1124    /// Optional profile validator injected by `makod` or callers that have
1125    /// access to `edi-energy`.  When `Some`, called for each
1126    /// [`ProfileRequirement`] declared by registered modules.  When `None`,
1127    /// profile requirements are not validated (safe in unit tests).
1128    ///
1129    /// Signature: `fn(message_type: &str) -> bool`
1130    ///
1131    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
1132    profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
1133}
#[cfg(any(test, feature = "testing"))]
impl Default for EngineBuilder {
    /// Returns a builder in its initial state: no event store yet (`()`),
    /// `Noop` stores everywhere else, a [`LogDeadLetterSink`], no modules,
    /// all deployment roles enabled and no profile validator.
    fn default() -> Self {
        let dead_letter_sink: Arc<dyn DeadLetterSink> = Arc::new(LogDeadLetterSink);
        let modules = Vec::new();
        let deployment_roles = DeploymentRoles::all();

        Self {
            event_store: (),
            snapshot_store: NoopSnapshotStore,
            outbox_store: NoopOutboxStore,
            deadline_store: NoopDeadlineStore,
            registry: NoopProcessRegistry,
            dead_letter_sink,
            modules,
            deployment_roles,
            profile_validator: None,
        }
    }
}
1158
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
    /// Start a builder in which every store is a `Noop` default.
    ///
    /// This constructor exists only under `#[cfg(test)]` or with the
    /// `testing` feature enabled: the Noop defaults silently discard outbox
    /// messages, deadlines, and process registry entries, so production
    /// binaries must wire real stores through the `with_*` builder methods.
    ///
    /// The event store is **required** — call [`with_event_store`] before
    /// [`build`].
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    /// [`build`]: EngineBuilder::build
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
1178
1179impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
1180where
1181    OS: OutboxStore,
1182    DS: DeadlineStore,
1183    PR: ProcessRegistry,
1184{
1185    /// Create a production-ready builder with explicit stores for outbox,
1186    /// deadline, and process registry.
1187    ///
1188    /// This constructor is available in all build configurations including
1189    /// production binaries. It enforces that the three stores that can cause
1190    /// silent data loss (`OutboxStore`, `DeadlineStore`, `ProcessRegistry`)
1191    /// are provided explicitly — there is no Noop fallback.
1192    ///
1193    /// `NoopSnapshotStore` is used as the snapshot default because it is safe
1194    /// for production: skipping snapshots means full replay, but no data loss.
1195    /// Override with [`with_snapshot_store`] to enable snapshot-accelerated
1196    /// replay.
1197    ///
1198    /// Call [`with_event_store`] before [`build`] — the event store is
1199    /// **required**.
1200    ///
1201    /// ```rust,ignore
1202    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1203    ///     .with_event_store(store.clone())
1204    ///     .with_snapshot_store(InMemorySnapshotStore::new())
1205    ///     .build();
1206    /// ```
1207    ///
1208    /// [`with_snapshot_store`]: EngineBuilder::with_snapshot_store
1209    /// [`with_event_store`]: EngineBuilder::with_event_store
1210    /// [`build`]: EngineBuilder::build
1211    #[must_use]
1212    pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
1213        Self {
1214            event_store: (),
1215            snapshot_store: NoopSnapshotStore,
1216            outbox_store,
1217            deadline_store,
1218            registry,
1219            dead_letter_sink: Arc::new(LogDeadLetterSink),
1220            modules: Vec::new(),
1221            deployment_roles: DeploymentRoles::all(),
1222            profile_validator: None,
1223        }
1224    }
1225}
1226
1227impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
1228    /// Set the event store. **Required** — `build()` is only available once
1229    /// this has been called with a type that implements [`EventStore`].
1230    ///
1231    /// Replaces any previously set event store (type-state transition).
1232    #[must_use]
1233    pub fn with_event_store<ES2: EventStore>(
1234        self,
1235        store: ES2,
1236    ) -> EngineBuilder<ES2, SS, OS, DS, PR> {
1237        EngineBuilder {
1238            event_store: store,
1239            snapshot_store: self.snapshot_store,
1240            outbox_store: self.outbox_store,
1241            deadline_store: self.deadline_store,
1242            registry: self.registry,
1243            dead_letter_sink: self.dead_letter_sink,
1244            modules: self.modules,
1245            deployment_roles: self.deployment_roles,
1246            profile_validator: self.profile_validator,
1247        }
1248    }
1249
1250    /// Set the snapshot store (default: [`NoopSnapshotStore`]).
1251    ///
1252    /// ## Default: `NoopSnapshotStore`
1253    ///
1254    /// Without calling this method the builder uses [`NoopSnapshotStore`],
1255    /// which silently discards all snapshot writes and returns `None` for
1256    /// every snapshot read.  The engine still functions correctly — every
1257    /// command handling call replays the full event log from the beginning
1258    /// instead of starting from a stored snapshot.  For low-volume processes
1259    /// this is fine; for long-lived processes with many events the replay cost
1260    /// can become significant.
1261    ///
1262    /// Enable snapshotting in production by providing a real [`SnapshotStore`]
1263    /// implementation (e.g. the SlateDB-backed store in `makod`).  In tests,
1264    /// [`InMemorySnapshotStore`][crate::snapshot::InMemorySnapshotStore] is
1265    /// available behind the `testing` feature flag.
1266    ///
1267    /// Note: [`Process::state_with_snapshot`][crate::process::Process::state_with_snapshot]
1268    /// is a compile-time no-op when the snapshot store is `NoopSnapshotStore`
1269    /// — it never calls the store and always returns `None`, so no snapshot is
1270    /// ever saved or loaded.
1271    #[must_use]
1272    pub fn with_snapshot_store<SS2: SnapshotStore>(
1273        self,
1274        store: SS2,
1275    ) -> EngineBuilder<ES, SS2, OS, DS, PR> {
1276        EngineBuilder {
1277            event_store: self.event_store,
1278            snapshot_store: store,
1279            outbox_store: self.outbox_store,
1280            deadline_store: self.deadline_store,
1281            registry: self.registry,
1282            dead_letter_sink: self.dead_letter_sink,
1283            modules: self.modules,
1284            deployment_roles: self.deployment_roles,
1285            profile_validator: self.profile_validator,
1286        }
1287    }
1288
1289    /// Set the outbox store (default: [`NoopOutboxStore`]).
1290    #[must_use]
1291    pub fn with_outbox_store<OS2: OutboxStore>(
1292        self,
1293        store: OS2,
1294    ) -> EngineBuilder<ES, SS, OS2, DS, PR> {
1295        EngineBuilder {
1296            event_store: self.event_store,
1297            snapshot_store: self.snapshot_store,
1298            outbox_store: store,
1299            deadline_store: self.deadline_store,
1300            registry: self.registry,
1301            dead_letter_sink: self.dead_letter_sink,
1302            modules: self.modules,
1303            deployment_roles: self.deployment_roles,
1304            profile_validator: self.profile_validator,
1305        }
1306    }
1307
1308    /// Set the deadline store (default: [`NoopDeadlineStore`]).
1309    #[must_use]
1310    pub fn with_deadline_store<DS2: DeadlineStore>(
1311        self,
1312        store: DS2,
1313    ) -> EngineBuilder<ES, SS, OS, DS2, PR> {
1314        EngineBuilder {
1315            event_store: self.event_store,
1316            snapshot_store: self.snapshot_store,
1317            outbox_store: self.outbox_store,
1318            deadline_store: store,
1319            registry: self.registry,
1320            dead_letter_sink: self.dead_letter_sink,
1321            modules: self.modules,
1322            deployment_roles: self.deployment_roles,
1323            profile_validator: self.profile_validator,
1324        }
1325    }
1326
1327    /// Set the process registry (default: [`NoopProcessRegistry`]).
1328    #[must_use]
1329    pub fn with_registry<PR2: ProcessRegistry>(
1330        self,
1331        registry: PR2,
1332    ) -> EngineBuilder<ES, SS, OS, DS, PR2> {
1333        EngineBuilder {
1334            event_store: self.event_store,
1335            snapshot_store: self.snapshot_store,
1336            outbox_store: self.outbox_store,
1337            deadline_store: self.deadline_store,
1338            registry,
1339            dead_letter_sink: self.dead_letter_sink,
1340            modules: self.modules,
1341            deployment_roles: self.deployment_roles,
1342            profile_validator: self.profile_validator,
1343        }
1344    }
1345
1346    /// Set the dead-letter sink (default: [`LogDeadLetterSink`]).
1347    ///
1348    /// The dead-letter sink receives every message that cannot be routed to a
1349    /// workflow. The default [`LogDeadLetterSink`] emits `tracing::warn!`
1350    /// events, making rejections visible in log output without configuration.
1351    ///
1352    /// Override with a persistent DLQ implementation in production:
1353    ///
1354    /// ```rust,ignore
1355    /// use mako_engine::dead_letter::LogDeadLetterSink;
1356    ///
1357    /// let ctx = EngineBuilder::new()
1358    ///     .with_event_store(my_store)
1359    ///     .with_dead_letter_sink(MyPersistentDlq::new())
1360    ///     .build();
1361    /// ```
1362    ///
1363    /// [`LogDeadLetterSink`]: crate::dead_letter::LogDeadLetterSink
1364    #[must_use]
1365    pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
1366        self.dead_letter_sink = Arc::new(sink);
1367        self
1368    }
1369
1370    /// Register an `edi-energy` profile validator for startup profile checks.
1371    ///
1372    /// The closure receives a message-type string (e.g. `"UTILMD"`) and must
1373    /// return `true` if at least one active profile for that message type is
1374    /// registered for today's date.
1375    ///
1376    /// Wire this in `makod` using the `edi-energy` global registry:
1377    ///
1378    /// ```rust,ignore
1379    /// use edi_energy::registry::ReleaseRegistry;
1380    ///
1381    /// let today = time::OffsetDateTime::now_utc().date();
1382    /// builder.with_profile_validator(move |msg_type| {
1383    ///     ReleaseRegistry::global()
1384    ///         .profiles_for_str(msg_type)
1385    ///         .any(|p| match (p.valid_from(), p.valid_until()) {
1386    ///             (Some(f), Some(u)) => f <= today && today <= u,
1387    ///             (Some(f), None)    => f <= today,
1388    ///             (None, _)          => true,
1389    ///         })
1390    /// })
1391    /// ```
1392    ///
1393    /// Domain crates do **not** need to call this — they only declare
1394    /// [`profile_requirements`].
1395    ///
1396    /// [`profile_requirements`]: EngineModule::profile_requirements
1397    #[must_use]
1398    pub fn with_profile_validator(
1399        mut self,
1400        validator: impl Fn(&str) -> bool + Send + Sync + 'static,
1401    ) -> Self {
1402        self.profile_validator = Some(Box::new(validator));
1403        self
1404    }
1405
1406    /// Register a domain module.
1407    ///
1408    /// The module name becomes visible in
1409    /// [`EngineContext::registered_modules`] after [`build`] is called.
1410    ///
1411    /// [`build`]: EngineBuilder::build
1412    #[must_use]
1413    pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
1414        self.modules.push(module);
1415        self
1416    }
1417
1418    /// Set the active [`DeploymentRoles`] for this engine instance.
1419    ///
1420    /// Controls role-conditional PID registration in [`EngineModule::register_pids_with_roles`].
1421    ///
1422    /// The default is [`DeploymentRoles::all()`], which registers every PID unconditionally
1423    /// — identical to the pre-role-aware behavior. Providing an explicit role set
1424    /// restricts role-conditional blocks to only the declared roles:
1425    ///
1426    /// - **NB-only** (`DeploymentRoles::nb()`): 19001/19002 route to `gpke-konfiguration`;
1427    ///   WiM nMSB blocks are skipped.
1428    /// - **nMSB-only** (`DeploymentRoles::nmsb()`): 19001/19002 route to `wim-geraeteubernahme`;
1429    ///   GPKE NB blocks are skipped.
1430    /// - **NB + gMSB** (`DeploymentRoles::nb_msb()`): most common Stadtwerke combination.
1431    ///
1432    /// # Conflict guard
1433    ///
1434    /// When two modules would register the same PID to **different** workflows, the
1435    /// engine panics during [`build`]. Set explicit roles to prevent both modules from
1436    /// activating the same PID simultaneously:
1437    ///
1438    /// ```rust,ignore
1439    /// use mako_engine::marktrolle::DeploymentRoles;
1440    ///
1441    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1442    ///     .with_event_store(store)
1443    ///     .with_deployment_roles(DeploymentRoles::nb())  // only NB: GPKE gets 19001/19002
1444    ///     .register(Box::new(GpkeModule))
1445    ///     .register(Box::new(WimModule))  // nMSB block skipped — no conflict
1446    ///     .build();
1447    /// ```
1448    ///
1449    /// [`build`]: EngineBuilder::build
1450    #[must_use]
1451    pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
1452        self.deployment_roles = roles;
1453        self
1454    }
1455}
1456
1457impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
1458where
1459    ES: EventStore,
1460    SS: SnapshotStore,
1461    OS: OutboxStore,
1462    DS: DeadlineStore,
1463    PR: ProcessRegistry,
1464{
1465    /// Build the [`EngineContext`].
1466    ///
1467    /// Consumes the builder. All registered modules and configured stores are
1468    /// moved into the returned [`EngineContext`].
1469    ///
1470    /// This method is only available when `ES` implements [`EventStore`].
1471    /// If you have not called [`with_event_store`], this will not compile.
1472    ///
1473    /// # Panics
1474    ///
1475    /// Panics when any registered module returns `Err` from
1476    /// [`EngineModule::configure`]. The panic message includes the module
1477    /// name and the error string so the deployment failure is actionable.
1478    ///
1479    /// [`with_event_store`]: EngineBuilder::with_event_store
1480    #[must_use]
1481    #[allow(clippy::too_many_lines)]
1482    pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
1483        // ── Noop store safety checks ──────────────────────────────────────────
1484        //
1485        // Noop stores lose data silently: NoopDeadlineStore drops every APERAK
1486        // deadline (BNetzA violation), NoopOutboxStore discards all outbound
1487        // messages, NoopProcessRegistry loses conversation routing on restart.
1488        //
1489        // In production builds (no `testing` feature, not running under
1490        // `#[test]`), the Noop constructors are cfg-gated out so this branch
1491        // is dead code and compiles away. In test/testing/tracing builds we
1492        // emit warnings so test harnesses see the configuration in log output.
1493        //
1494        // IMPORTANT: if you are reading this because a panic fired in production,
1495        // it means the `testing` feature was accidentally enabled in the binary.
1496        // Remove it from the production Cargo.toml feature list immediately.
1497        {
1498            let os_name = std::any::type_name::<OS>();
1499            let ds_name = std::any::type_name::<DS>();
1500            let pr_name = std::any::type_name::<PR>();
1501
1502            // Regulatory-critical stores: panic in any build context if these
1503            // are noop. OutboxStore and DeadlineStore must be durable in
1504            // production; ProcessRegistry must survive restarts.
1505            #[cfg(not(any(test, feature = "testing")))]
1506            {
1507                if ds_name.contains("NoopDeadlineStore") {
1508                    panic!(
1509                        "EngineBuilder::build: NoopDeadlineStore is active in a \
1510                         non-testing build. This silently discards all APERAK deadlines, \
1511                         which is an immediately reportable BNetzA violation \
1512                         (BK6-22-024 §5, BK7-24-01-009). \
1513                         Call .with_deadline_store(SlateDbStore::as_deadline_store()) \
1514                         in your production engine assembly. \
1515                         If this is a test, enable the 'testing' feature."
1516                    );
1517                }
1518                if os_name.contains("NoopOutboxStore") {
1519                    panic!(
1520                        "EngineBuilder::build: NoopOutboxStore is active in a \
1521                         non-testing build. This silently discards all outbound \
1522                         APERAK, CONTRL, and UTILMD messages. \
1523                         Call .with_outbox_store(SlateDbStore::as_outbox_store()) \
1524                         in your production engine assembly. \
1525                         If this is a test, enable the 'testing' feature."
1526                    );
1527                }
1528                if pr_name.contains("NoopProcessRegistry") {
1529                    panic!(
1530                        "EngineBuilder::build: NoopProcessRegistry is active in a \
1531                         non-testing build. This means conversation routing \
1532                         (PID → stream_id lookup) is lost on every restart, \
1533                         breaking all WiM, GeLi Gas, and GPKE in-flight processes. \
1534                         Call .with_registry(SlateDbStore::as_process_registry()) \
1535                         in your production engine assembly. \
1536                         If this is a test, enable the 'testing' feature."
1537                    );
1538                }
1539            }
1540
1541            // In test/testing/tracing builds: emit warnings instead of panicking.
1542            #[cfg(any(test, feature = "testing", feature = "tracing"))]
1543            {
1544                let ss_name = std::any::type_name::<SS>();
1545                if ss_name.contains("NoopSnapshotStore") {
1546                    tracing::warn!(
1547                        store = ss_name,
1548                        "EngineBuilder: NoopSnapshotStore is active — snapshots will not be \
1549                         persisted. Use SlateDbStore::as_snapshot_store() in production."
1550                    );
1551                }
1552                if os_name.contains("NoopOutboxStore") {
1553                    tracing::warn!(
1554                        store = os_name,
1555                        "EngineBuilder: NoopOutboxStore is active — outbound messages will be \
1556                         silently discarded. Use SlateDbStore::as_outbox_store() in production."
1557                    );
1558                }
1559                if ds_name.contains("NoopDeadlineStore") {
1560                    tracing::warn!(
1561                        store = ds_name,
1562                        "EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will \
1563                         not fire after restart. Use SlateDbStore::as_deadline_store() in production."
1564                    );
1565                }
1566                if pr_name.contains("NoopProcessRegistry") {
1567                    tracing::warn!(
1568                        store = pr_name,
1569                        "EngineBuilder: NoopProcessRegistry is active — process routing will be \
1570                         lost on restart. Use SlateDbStore::as_process_registry() in production."
1571                    );
1572                }
1573            }
1574        }
1575        // Validate every module before assembling the context.
1576        // A missing adapter or misconfigured module fails at startup (not at
1577        // first inbound message), making deployment failures observable immediately.
1578        for module in &self.modules {
1579            if let Err(msg) = module.configure() {
1580                panic!(
1581                    "EngineBuilder::build: module '{}' failed configuration validation: {}",
1582                    module.name(),
1583                    msg
1584                );
1585            }
1586            // Validate profile requirements via the injected validator.
1587            // Domain crates declare requirements; only the binary crate (makod)
1588            // injects the edi-energy registry — domain crates need no edi-energy
1589            // import for this check.
1590            if let Some(ref validator) = self.profile_validator {
1591                for req in module.profile_requirements() {
1592                    assert!(
1593                        validator(req.message_type),
1594                        "EngineBuilder::build: module '{}' requires an active edi-energy \
1595                             profile for '{}' ({}) but none is registered for today's date. \
1596                             Run `cargo xtask codegen` to add the missing profile.",
1597                        module.name(),
1598                        req.message_type,
1599                        req.label,
1600                    );
1601                }
1602            }
1603        }
1604        // Build the PID router from all registered modules.
1605        // Also assert that no two modules claim the same PID — a PID overlap
1606        // is always a configuration error: one module's messages would be
1607        // silently swallowed by another's workflow, producing missing-process
1608        // errors or incorrect audit trails.
1609        let mut pid_router = PidRouter::new();
1610        let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
1611        for module in &self.modules {
1612            // Temporarily build a scratch router to read this module's PIDs
1613            // for cross-module overlap detection (module-ownership level).
1614            let mut scratch = PidRouter::new();
1615            module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
1616            for pid in scratch.registered_pids() {
1617                if let Some(prev) = pid_owners.insert(pid, module.name()) {
1618                    if self.deployment_roles.is_all() {
1619                        // With DeploymentRoles::all() (the default), role-conditional PIDs
1620                        // are registered by all modules that claim them, producing last-wins
1621                        // semantics. This is acceptable for single-role and dev/test deployments.
1622                        //
1623                        // In production multi-role deployments where both an NB and nMSB role
1624                        // are served by the same instance, set explicit roles via
1625                        // `EngineBuilder::with_deployment_roles` to prevent silent misrouting.
1626                        //
1627                        // We emit a debug-level log here (not warn) because the vast majority
1628                        // of deployments are single-role and this overlap is expected/harmless.
1629                        #[cfg(feature = "tracing")]
1630                        tracing::debug!(
1631                            pid,
1632                            previous_module = prev,
1633                            current_module = module.name(),
1634                            "PID registered by multiple modules with DeploymentRoles::all(); \
1635                             last module wins (use with_deployment_roles for strict routing)",
1636                        );
1637                        let _ = prev; // suppress unused-variable warning when tracing is off
1638                    } else {
1639                        panic!(
1640                            "EngineBuilder::build: PID {pid} is claimed by both \
1641                             '{prev}' and '{}' — overlapping PID registrations are \
1642                             not allowed with explicit DeploymentRoles; each PID must be \
1643                             owned by exactly one module.\n  \
1644                             Hint: use EngineBuilder::with_deployment_roles to restrict \
1645                             role-conditional PIDs so only one module registers each PID.\n  \
1646                             Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
1647                             GPKE registers ORDRSP 19001/19002, not WiM.",
1648                            module.name(),
1649                        );
1650                    }
1651                }
1652            }
1653            // Register into the real router with conflict detection.
1654            module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
1655        }
1656        let registered_modules = self.modules.iter().map(|m| m.name()).collect();
1657        let registered_workflows = self
1658            .modules
1659            .iter()
1660            .flat_map(|m| m.workflow_names().iter().copied())
1661            .collect();
1662        EngineContext {
1663            event_store: Arc::new(self.event_store),
1664            snapshot_store: self.snapshot_store,
1665            outbox_store: self.outbox_store,
1666            deadline_store: self.deadline_store,
1667            registry: self.registry,
1668            dead_letter_sink: self.dead_letter_sink,
1669            pid_router,
1670            registered_modules,
1671            registered_workflows,
1672        }
1673    }
1674}
1675
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        deadline::InMemoryDeadlineStore,
        error::WorkflowError,
        event_store::InMemoryEventStore,
        ids::TenantId,
        outbox::InMemoryOutboxStore,
        pid_router::PidRouter,
        registry::InMemoryProcessRegistry,
        snapshot::InMemorySnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
    };

    // ── Fixtures: the smallest workflow that can be spawned and resumed ──────

    /// The only event the fixture workflow ever emits.
    #[derive(serde::Serialize, serde::Deserialize)]
    struct PingEvent;

    impl EventPayload for PingEvent {
        fn event_type(&self) -> &'static str {
            "Ping"
        }
    }

    /// The only command the fixture workflow accepts.
    struct PingCommand;

    impl CommandPayload for PingCommand {}

    /// Stateless placeholder for the workflow state.
    #[derive(Default, Clone)]
    struct PingState;

    /// Workflow that answers every command with exactly one [`PingEvent`].
    struct PingWorkflow;

    impl Workflow for PingWorkflow {
        type State = PingState;
        type Event = PingEvent;
        type Command = PingCommand;

        fn apply(state: Self::State, _event: &Self::Event) -> Self::State {
            state
        }

        fn handle(
            _state: &Self::State,
            _command: Self::Command,
        ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
            Ok(vec![PingEvent].into())
        }
    }

    /// Module that only overrides `name`; everything else uses trait defaults.
    struct TestModule;

    impl EngineModule for TestModule {
        fn name(&self) -> &'static str {
            "test-module"
        }
    }

    // ── Builder assembly ──────────────────────────────────────────────────────

    /// An event store alone is enough to build; no modules are listed.
    #[test]
    fn build_with_event_store_only() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();

        assert!(engine.registered_modules().is_empty());
    }

    /// Every store replaced by its in-memory variant, plus one module.
    #[test]
    fn build_with_all_stores_and_module() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_snapshot_store(InMemorySnapshotStore::new())
            .with_outbox_store(InMemoryOutboxStore::new())
            .with_deadline_store(InMemoryDeadlineStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .register(Box::new(TestModule))
            .build();

        assert_eq!(engine.registered_modules(), &["test-module"]);
    }

    /// Module names are reported in registration order.
    #[test]
    fn multiple_modules_ordered() {
        struct ModA;
        struct ModB;

        impl EngineModule for ModA {
            fn name(&self) -> &'static str {
                "mod-a"
            }
        }
        impl EngineModule for ModB {
            fn name(&self) -> &'static str {
                "mod-b"
            }
        }

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(ModA))
            .register(Box::new(ModB))
            .build();

        assert_eq!(engine.registered_modules(), &["mod-a", "mod-b"]);
    }

    // ── Spawn / resume ────────────────────────────────────────────────────────

    /// Two spawns of the same workflow id yield distinct process ids.
    #[tokio::test]
    async fn spawn_creates_independent_processes() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();
        let workflow_id = WorkflowId::new("ping", "FV2024-10-01");

        let first = engine.spawn::<PingWorkflow>(TenantId::new(), workflow_id.clone());
        let second = engine.spawn::<PingWorkflow>(TenantId::new(), workflow_id);

        assert_ne!(first.process_id(), second.process_id());
    }

    /// A process resumed from its identity sees the event appended earlier.
    #[tokio::test]
    async fn resume_sees_previously_appended_events() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();

        let process =
            engine.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        process.execute(PingCommand).await.unwrap();

        let resumed = engine.resume::<PingWorkflow>(process.identity());
        let appended = resumed.event_count().await.unwrap();
        assert_eq!(appended, 1);
    }

    /// Register a process under a conversation key, look it up, resume it.
    #[tokio::test]
    async fn registry_routes_process_via_conversation_key() {
        use crate::registry::RegistryKey;

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .build();

        let process =
            engine.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        let tenant = process.tenant_id();
        let key = RegistryKey::parse("conv:test-conversation-123").expect("valid key");

        engine
            .registry()
            .register(tenant, &key, process.identity())
            .await
            .unwrap();

        let identity = engine
            .registry()
            .lookup(tenant, &key)
            .await
            .unwrap()
            .expect("must be registered");

        let resumed = engine.resume::<PingWorkflow>(identity);
        assert_eq!(resumed.process_id(), process.process_id());
    }

    // ── PID routing ───────────────────────────────────────────────────────────

    /// PIDs registered through `register_pids` end up in the context's router.
    #[test]
    fn pid_router_populated_by_module_register_pids() {
        struct PidModule;
        impl EngineModule for PidModule {
            fn name(&self) -> &'static str {
                "pid-module"
            }
            fn register_pids(&self, router: &mut PidRouter) {
                router.register(55001, "gpke-supplier-change");
                router.register(55002, "gpke-supplier-change");
            }
        }

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(PidModule))
            .build();
        let router = engine.pid_router();

        assert_eq!(router.route(55001), Some("gpke-supplier-change"));
        assert_eq!(router.route(55002), Some("gpke-supplier-change"));
        assert!(router.route(99999).is_none());
        assert_eq!(router.len(), 2);
    }

    /// Role-gated registration: which module owns PID 19001 depends on the
    /// configured deployment roles.
    ///
    /// Two modules both want PID 19001:
    /// - `ModuleA` registers 19001 → "workflow-a" whenever the roles contain `Nb`.
    /// - `ModuleB` registers 19001 and 19015 → "workflow-b" only when `Nmsb` is
    ///   set explicitly; it stays silent when the roles report `is_all()`.
    ///
    /// Expected routing:
    /// - `all()` and `nb()`: only `ModuleA` registers, so 19001 → "workflow-a"
    ///   and 19015 is unrouted.
    /// - `nmsb()`: only `ModuleB` registers, so 19001 and 19015 → "workflow-b".
    #[test]
    fn register_pids_with_roles_gates_pids_correctly() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ModuleA;
        impl EngineModule for ModuleA {
            fn name(&self) -> &'static str {
                "module-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if !roles.contains(Marktrolle::Nb) {
                    return;
                }
                router.register(19_001, "workflow-a");
            }
        }

        struct ModuleB;
        impl EngineModule for ModuleB {
            fn name(&self) -> &'static str {
                "module-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                // Stay out of the way on all() (backward-compat sentinel);
                // register only when Nmsb was requested explicitly.
                if roles.is_all() || !roles.contains(Marktrolle::Nmsb) {
                    return;
                }
                router.register(19_001, "workflow-b");
                router.register(19_015, "workflow-b");
            }
        }

        let engine_for = |roles: DeploymentRoles| {
            EngineBuilder::new()
                .with_event_store(InMemoryEventStore::new())
                .with_deployment_roles(roles)
                .register(Box::new(ModuleA))
                .register(Box::new(ModuleB))
                .build()
        };

        // all() and explicit Nb behave the same: ModuleA registers 19001,
        // ModuleB registers nothing.
        for roles in [DeploymentRoles::all(), DeploymentRoles::nb()] {
            let engine = engine_for(roles);
            let router = engine.pid_router();
            assert_eq!(router.route(19_001), Some("workflow-a"));
            assert!(router.route(19_015).is_none());
        }

        // Explicit Nmsb: ModuleA registers nothing, ModuleB takes both PIDs.
        let engine = engine_for(DeploymentRoles::nmsb());
        let router = engine.pid_router();
        assert_eq!(router.route(19_001), Some("workflow-b"));
        assert_eq!(router.route(19_015), Some("workflow-b"));
    }

    /// With explicit roles that activate both modules, the shared PID makes
    /// `build` panic.
    #[test]
    #[should_panic(expected = "overlapping PID registrations")]
    fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ConflictA;
        impl EngineModule for ConflictA {
            fn name(&self) -> &'static str {
                "conflict-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if !roles.contains(Marktrolle::Nb) {
                    return;
                }
                router.register(19_001, "workflow-a");
            }
        }

        struct ConflictB;
        impl EngineModule for ConflictB {
            fn name(&self) -> &'static str {
                "conflict-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if roles.is_all() || !roles.contains(Marktrolle::Nmsb) {
                    return;
                }
                // Same PID as ConflictA, different workflow.
                router.register(19_001, "workflow-b");
            }
        }

        // Nb and Nmsb together: both modules register 19001 → build panics.
        let both_roles = DeploymentRoles::from_roles([Marktrolle::Nb, Marktrolle::Nmsb]);
        let _ = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_deployment_roles(both_roles)
            .register(Box::new(ConflictA))
            .register(Box::new(ConflictB))
            .build();
    }
}