Skip to main content

mako_engine/
builder.rs

1//! [`EngineModule`] trait, [`EngineBuilder`], and [`EngineContext`].
2//!
3//! # Summary
4//!
5//! `EngineBuilder` assembles all engine infrastructure into a single
6//! [`EngineContext`] value. Domain modules (GPKE, WiM, GeLi Gas, …) register
7//! themselves at startup via the [`EngineModule`] trait, making their names
8//! visible in diagnostics and health checks.
9//!
10//! # Type-state guarantee
11//!
12//! [`EngineBuilder::build`] is only available when the event store type
13//! parameter `ES` implements [`EventStore`]. Forgetting to call
14//! [`with_event_store`] is a **compile-time error**, not a runtime panic.
15//!
16//! All other stores default to their respective `Noop` implementations:
17//!
18//! | Store | Default |
19//! |-------|---------|
20//! | Snapshot store | [`NoopSnapshotStore`] |
21//! | Outbox store | [`NoopOutboxStore`] |
22//! | Deadline store | [`NoopDeadlineStore`] |
23//! | Process registry | [`NoopProcessRegistry`] |
24//!
25//! # Assembly example
26//!
27//! ```rust,ignore
28//! use mako_engine::builder::{EngineBuilder, EngineModule};
29//! use mako_engine::event_store::InMemoryEventStore;
30//! use mako_engine::outbox::InMemoryOutboxStore;
31//! use mako_engine::deadline::InMemoryDeadlineStore;
32//! use mako_engine::registry::InMemoryProcessRegistry;
33//! use mako_engine::snapshot::InMemorySnapshotStore;
34//!
35//! struct GpkeModule;
36//! impl EngineModule for GpkeModule { fn name(&self) -> &'static str { "gpke" } }
37//!
38//! let ctx = EngineBuilder::new()
39//!     .with_event_store(InMemoryEventStore::new())
40//!     .with_snapshot_store(InMemorySnapshotStore::new())
41//!     .with_outbox_store(InMemoryOutboxStore::new())
42//!     .with_deadline_store(InMemoryDeadlineStore::new())
43//!     .with_registry(InMemoryProcessRegistry::new())
44//!     .register(Box::new(GpkeModule))
45//!     .build();
46//!
47//! // Spawn a fresh process:
48//! let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
49//! p.execute(ReceiveUtilmd { .. }).await?;
50//!
51//! // Resume an existing process from a persisted identity:
//! let identity = ctx.registry().lookup(tenant_id, &conv_id.to_string()).await?.unwrap();
53//! let p = ctx.resume::<SupplierChangeWorkflow>(identity);
54//!
55//! // Access stores for delivery workers / schedulers:
//! let pending = ctx.outbox_store().pending_now(50).await?;
//! let overdue = ctx.deadline_store().due_now(50).await?;
58//! ```
59//!
60//! [`with_event_store`]: EngineBuilder::with_event_store
61
62// Type-state generics can produce long signatures that trip up the
63// `type_complexity` lint; suppress it for this module only.
64#![allow(clippy::type_complexity)]
65
66use crate::{
67    dead_letter::{DeadLetterSink, LogDeadLetterSink},
68    deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
69    error::EngineError,
70    event_store::EventStore,
71    ids::{ProcessIdentity, TenantId},
72    marktrolle::DeploymentRoles,
73    outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
74    pid_router::PidRouter,
75    process::Process,
76    registry::{NoopProcessRegistry, ProcessRegistry},
77    snapshot::{NoopSnapshotStore, SnapshotStore},
78    version::WorkflowId,
79    workflow::Workflow,
80};
81
82use std::sync::Arc;
83
84// ── EngineModule ──────────────────────────────────────────────────────────────
85
86/// A self-contained domain module that registers with the engine at startup.
87///
88/// Domain crates implement this trait to declare their presence in the engine.
89/// The module name is surfaced in [`EngineContext::registered_modules`] for
90/// diagnostics, health checks, and log output.
91///
92/// ## Startup validation
93///
94/// Override [`configure`] to perform adapter coverage checks at engine startup
95/// time. The engine calls [`configure`] for every registered module during
96/// [`EngineBuilder::build`] and panics with an actionable message if any
97/// module returns `Err`. This surfaces missing adapter registrations as a
98/// startup failure rather than a silent runtime error.
99///
100/// ## Example
101///
102/// ```rust,ignore
103/// pub struct GpkeModule;
104///
105/// impl EngineModule for GpkeModule {
106///     fn name(&self) -> &'static str { "gpke" }
107///
108///     fn configure(&self) -> Result<(), String> {
109///         // Validate that every known BDEW format version has an adapter:
110///         GPKE_ADAPTER_REGISTRY
111///             .validate_policy(&GpkeWorkflow::version_policy(), &KNOWN_FVS)
112///             .map_err(|uncovered| format!(
113///                 "gpke: missing adapters for format versions: {:?}",
114///                 uncovered
115///             ))
116///     }
117/// }
118///
119/// let ctx = EngineBuilder::new()
120///     .with_event_store(my_store)
121///     .register(Box::new(GpkeModule))
122///     .build(); // panics if GpkeModule::configure returns Err
123///
124/// assert_eq!(ctx.registered_modules(), &["gpke"]);
125/// ```
126///
127/// [`configure`]: EngineModule::configure
pub trait EngineModule: Send + 'static {
    /// Stable, unique name for this domain module.
    ///
    /// Used in diagnostics, health checks, and structured log output.
    /// Choose a short lowercase identifier (e.g. `"gpke"`, `"wim"`,
    /// `"geli"`).
    ///
    /// This is the only method without a default implementation; every
    /// module must provide it.
    fn name(&self) -> &'static str;

    /// Register all PIDs this module handles into the shared [`PidRouter`].
    ///
    /// The default implementation registers nothing. Within this trait the
    /// method is reached through the default
    /// [`register_pids_with_roles`], which forwards to it; a module that
    /// overrides `register_pids_with_roles` decides for itself whether to
    /// call `register_pids`.
    ///
    /// # Mutability contract
    ///
    /// This method is called **exactly once** by [`EngineBuilder::build`],
    /// before the resulting [`EngineContext`] is handed to the caller. The
    /// `&mut PidRouter` reference is only available here, at build time.
    /// After `build` returns the router is **sealed** — the engine provides
    /// only a shared `&PidRouter` reference, with no mutation path at runtime.
    ///
    /// Consequence: **all PIDs a module will ever need must be registered
    /// here**. Do not attempt to register PIDs lazily from async handlers or
    /// after the engine has started — there is no API for that by design.
    ///
    /// Duplicate registrations (same PID from two modules) silently overwrite
    /// the previous mapping; the last module to register wins. Use
    /// `cargo xtask validate-pruefids` to catch accidental PID conflicts
    /// between modules before they reach production.
    ///
    /// For role-conditional registration (PIDs that should only be active for
    /// specific BDEW Marktrollen), override [`register_pids_with_roles`] instead.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// fn register_pids(&self, router: &mut PidRouter) {
    ///     // GPKE Lieferantenwechsel / Lieferbeginn (BK6-22-024, PIDs 55001, 55002, 55017)
    ///     for &pid in &[55001_u32, 55002, 55017] {
    ///         router.register(pid, "gpke-supplier-change");
    ///     }
    /// }
    /// ```
    ///
    /// [`register_pids_with_roles`]: EngineModule::register_pids_with_roles
    fn register_pids(&self, _router: &mut PidRouter) {}

    /// Register PIDs with role-context awareness.
    ///
    /// This is the **preferred override** for modules that have role-conditional
    /// PID registrations — PIDs that should only be active when this `makod`
    /// instance holds a specific [`Marktrolle`].
    ///
    /// The default implementation ignores `roles` and calls [`register_pids`]
    /// (role-agnostic), so existing modules that override `register_pids`
    /// continue to work without changes.
    ///
    /// Override this method instead of `register_pids` when any PID registration
    /// should be conditional on the deployment role:
    ///
    /// ```rust,ignore
    /// use mako_engine::marktrolle::Marktrolle;
    ///
    /// fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
    ///     // Always register: 55001, 55002 (not role-specific)
    ///     for pid in [55001_u32, 55002] { router.register_with_module(pid, "gpke-supplier-change", self.name()); }
    ///
    ///     // Only when NB role: 19001/19002 inbound ORDRSP from MSB
    ///     if roles.contains(Marktrolle::Nb) {
    ///         for pid in [19001_u32, 19002] { router.register_with_module(pid, "gpke-konfiguration", self.name()); }
    ///     }
    /// }
    /// ```
    ///
    /// # Conflict guard
    ///
    /// Use [`PidRouter::register_with_module`] (not `register`) inside this
    /// method. The conflict guard panics at build time if two modules register
    /// the same PID to different workflows — this makes role misconfigurations
    /// visible at startup rather than silently misrouting messages.
    ///
    /// [`Marktrolle`]: crate::marktrolle::Marktrolle
    /// [`register_pids`]: EngineModule::register_pids
    fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
        // Role-agnostic fallback: delegate to the legacy hook unchanged.
        self.register_pids(router);
    }

    /// Workflow names this module handles for deadline dispatch.
    ///
    /// Return the same name strings that [`register_pids`] maps PIDs to.
    /// These names are stored in [`EngineContext::registered_workflows`] and
    /// used to validate that every workflow that has deadlines scheduled is
    /// covered by the deadline scheduler dispatch function at runtime.
    ///
    /// The default implementation returns an empty slice. Override it to
    /// declare all workflow names that may fire deadlines:
    ///
    /// ```rust,ignore
    /// fn workflow_names(&self) -> &'static [&'static str] {
    ///     &["gpke-supplier-change", "gpke-abrechnung"]
    /// }
    /// ```
    ///
    /// [`register_pids`]: EngineModule::register_pids
    /// [`EngineContext::registered_workflows`]: crate::builder::EngineContext::registered_workflows
    fn workflow_names(&self) -> &'static [&'static str] {
        &[]
    }

    /// Declare the EDIFACT profile types this module requires at runtime.
    ///
    /// The default implementation returns an empty slice, i.e. the module
    /// declares no profile requirements.
    ///
    /// Returning a non-empty slice causes [`EngineBuilder::build`] to call the
    /// registered profile validator for each requirement.  If no active profile
    /// exists for a required message type, `build` panics with an actionable
    /// error so deployment fails fast rather than silently.
    ///
    /// **This replaces the previous pattern** of calling
    /// `edi_energy::registry::ReleaseRegistry::global()` inside `configure()`.
    /// Domain crates no longer need `edi-energy` in their production
    /// `[dependencies]` — they just declare their requirements here.
    ///
    /// ```rust,ignore
    /// fn profile_requirements(&self) -> &'static [ProfileRequirement] {
    ///     &[
    ///         ProfileRequirement { message_type: "UTILMD", label: "UTILMD Strom (GPKE)" },
    ///         ProfileRequirement { message_type: "INVOIC", label: "INVOIC Abrechnung (GPKE)" },
    ///     ]
    /// }
    /// ```
    ///
    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
    fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
        &[]
    }

    /// Validate adapter coverage and configuration at engine startup.
    ///
    /// Called by [`EngineBuilder::build`] after all modules are registered.
    /// Return `Ok(())` when the module is fully configured. Return `Err(msg)`
    /// with an actionable description when an adapter or configuration is
    /// missing — the engine will panic with that message so the deployment
    /// fails early rather than silently.
    ///
    /// The default implementation is a no-op (always returns `Ok(())`).
    /// Override it in domain crates to call
    /// [`AdapterRegistry::validate_policy`] and emit structured errors.
    ///
    /// Note: if your validation needs access to the edi-energy profile
    /// registry, use [`profile_requirements`] instead — it does not require
    /// importing `edi-energy` in domain crates.
    ///
    /// # Errors
    ///
    /// Returns a descriptive error string when the module's configuration is invalid.
    ///
    /// [`AdapterRegistry::validate_policy`]: crate::message_adapter::AdapterRegistry::validate_policy
    /// [`profile_requirements`]: EngineModule::profile_requirements
    fn configure(&self) -> Result<(), String> {
        Ok(())
    }
}
286
287// ── EngineContext ─────────────────────────────────────────────────────────────
288
/// Assembled engine infrastructure returned by [`EngineBuilder::build`].
///
/// `EngineContext` bundles all stores and the process registry into a single
/// value. It is the root dependency for:
///
/// - Spawning new processes ([`spawn`])
/// - Resuming existing processes ([`resume`])
/// - Running outbox delivery workers (`outbox_store().pending_now(…)`)
/// - Driving the deadline scheduler (`deadline_store().due_now(…)`)
///
/// All fields except `dead_letter_sink` are private; read them through the
/// accessor methods of the same name.
///
/// ## Generic parameters
///
/// | Param | Role | Default |
/// |-------|------|---------|
/// | `ES`  | [`EventStore`] backend | — (required) |
/// | `SS`  | [`SnapshotStore`] backend | [`NoopSnapshotStore`] |
/// | `OS`  | [`OutboxStore`] backend  | [`NoopOutboxStore`]   |
/// | `DS`  | [`DeadlineStore`] backend | [`NoopDeadlineStore`] |
/// | `PR`  | [`ProcessRegistry`] backend | [`NoopProcessRegistry`] |
///
/// In most codebases all type parameters are inferred from the builder calls.
///
/// [`spawn`]: EngineContext::spawn
/// [`resume`]: EngineContext::resume
pub struct EngineContext<
    ES,
    SS = NoopSnapshotStore,
    OS = NoopOutboxStore,
    DS = NoopDeadlineStore,
    PR = NoopProcessRegistry,
> {
    /// Event store backend, held behind an `Arc` so that `spawn` / `resume`
    /// can hand a cheap clone to every process handle.
    event_store: Arc<ES>,
    /// Snapshot store backend.
    snapshot_store: SS,
    /// Outbox store backend.
    outbox_store: OS,
    /// Deadline store backend.
    deadline_store: DS,
    /// Process routing registry.
    registry: PR,
    /// Dead-letter sink for unroutable or unprocessable inbound messages.
    ///
    /// Stored as `Arc<dyn DeadLetterSink>` so callers can share it across
    /// tasks without an extra type parameter on `EngineContext`.
    pub dead_letter_sink: Arc<dyn DeadLetterSink>,
    /// PID-to-workflow routing table, populated from all registered modules.
    pid_router: PidRouter,
    /// Names of all registered domain modules, in registration order
    /// (see [`EngineContext::registered_modules`]).
    registered_modules: Vec<&'static str>,
    /// Workflow names declared by all registered modules via
    /// [`EngineModule::workflow_names`]. Used to validate deadline scheduler
    /// coverage at runtime (see [`EngineContext::registered_workflows`]).
    registered_workflows: Vec<&'static str>,
}
338
339// ── Type aliases ──────────────────────────────────────────────────────────────
340
/// An [`EngineContext`] with all optional subsystems disabled.
///
/// Uses `NoopSnapshotStore` and, in `testing`-enabled builds, Noop
/// implementations for outbox, deadline, and process registry. Suitable for
/// tests and minimal deployments where only a durable event store is required.
///
/// Only the event store type `ES` is spelled out; the remaining four type
/// parameters take the defaults declared on [`EngineContext`], which is what
/// [`EngineBuilder`] produces when no other store is configured:
///
/// ```rust,ignore
/// // Only available in test / testing-feature builds:
/// use mako_engine::builder::{EngineBuilder, MinimalEngine};
/// use mako_engine::event_store::InMemoryEventStore;
///
/// let ctx: MinimalEngine<InMemoryEventStore> = EngineBuilder::new()
///     .with_event_store(InMemoryEventStore::new())
///     .build();
/// ```
pub type MinimalEngine<ES> = EngineContext<ES>;
360
361impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
362where
363    ES: EventStore,
364{
365    /// Spawn a new process and return a typed `Process<W, Arc<ES>>` handle.
366    ///
367    /// No `ES: Clone` bound is required — the engine stores the event store
368    /// behind an `Arc` so spawning is always a cheap pointer clone.
369    ///
370    /// ```rust,ignore
371    /// let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
372    /// p.execute(ReceiveUtilmd { .. }).await?;
373    /// ```
374    #[must_use]
375    pub fn spawn<W: Workflow>(
376        &self,
377        tenant_id: TenantId,
378        workflow_id: WorkflowId,
379    ) -> Process<W, Arc<ES>> {
380        Process::new(Arc::clone(&self.event_store), tenant_id, workflow_id)
381    }
382
383    /// Resume an existing process from a [`ProcessIdentity`].
384    ///
385    /// ```rust,ignore
386    /// let identity = ctx.registry()
387    ///     .lookup(tenant_id, &conv_id.to_string())
388    ///     .await?
389    ///     .ok_or(EngineError::Registry("unknown conversation".into()))?;
390    /// let p = ctx.resume::<SupplierChangeWorkflow>(identity);
391    /// p.execute(HandleAperak { .. }).await?;
392    /// ```
393    #[must_use]
394    pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
395        Process::from_identity(Arc::clone(&self.event_store), identity)
396    }
397
398    /// Names of all domain modules registered with the builder, in
399    /// registration order.
400    #[must_use]
401    pub fn registered_modules(&self) -> &[&'static str] {
402        &self.registered_modules
403    }
404
405    /// Workflow names declared by all registered modules, in registration order.
406    ///
407    /// Use this in the deadline scheduler dispatch function to detect unknown
408    /// workflow names at startup. If a deadline fires for a workflow name that
409    /// is not in this list, the scheduler's dispatch function should emit an
410    /// error rather than silently dropping the deadline:
411    ///
412    /// ```rust,ignore
413    /// let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
414    /// let scheduler = ctx.run_deadline_scheduler(
415    ///     move |deadline| {
416    ///         let wf = deadline.workflow_id().name.as_ref();
417    ///         if !known.contains(wf) {
418    ///             tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
419    ///             return Box::pin(async { Ok(()) });
420    ///         }
421    ///         // dispatch by workflow name …
422    ///         Box::pin(async { Ok(()) })
423    ///     },
424    ///     100,
425    ///     Duration::from_secs(30),
426    /// );
427    /// ```
428    #[must_use]
429    pub fn registered_workflows(&self) -> &[&'static str] {
430        &self.registered_workflows
431    }
432
433    /// The event store backend (behind an `Arc`).
434    #[must_use]
435    pub fn event_store(&self) -> &Arc<ES> {
436        &self.event_store
437    }
438
439    /// The snapshot store backend.
440    #[must_use]
441    pub fn snapshot_store(&self) -> &SS {
442        &self.snapshot_store
443    }
444
445    /// The outbox store backend.
446    ///
447    /// Poll `outbox_store().pending_now(limit)` in a background task to drain
448    /// the delivery queue.
449    #[must_use]
450    pub fn outbox_store(&self) -> &OS {
451        &self.outbox_store
452    }
453
454    /// The deadline store backend.
455    ///
456    /// Poll `deadline_store().due_now(limit)` in a background scheduler to
457    /// fire overdue process timers.
458    #[must_use]
459    pub fn deadline_store(&self) -> &DS {
460        &self.deadline_store
461    }
462
463    /// The process routing registry.
464    ///
465    /// Register a [`ProcessIdentity`] under a `(tenant_id, key)` pair at
466    /// process creation, then `lookup` it when routing inbound messages.
467    #[must_use]
468    pub fn registry(&self) -> &PR {
469        &self.registry
470    }
471
472    /// The dead-letter sink for unroutable or unprocessable messages.
473    ///
474    /// Call [`DeadLetterSink::reject`] when an inbound message cannot be
475    /// dispatched to any workflow. The default sink emits `tracing::warn!`
476    /// so rejections are always visible in the log output.
477    #[must_use]
478    pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
479        &self.dead_letter_sink
480    }
481
482    /// Assert that no Noop store is active — call this during production startup.
483    ///
484    /// Checks the type names of `OS`, `DS`, and `PR` against the string `"Noop"`.
485    /// Panics with a human-readable message if any match, directing the operator
486    /// to configure a persistent backend.
487    ///
488    /// # When to call
489    ///
490    /// Call this early in `makod`'s startup path (and `--check` mode) to catch
491    /// deployments where a Noop store was accidentally wired — e.g. the
492    /// `[outbox]`, `[deadline]`, or `[registry]` configuration section was
493    /// omitted from `makod.toml`.  The check is defence-in-depth: in release
494    /// builds without the `testing` feature, Noop stores cannot implement the
495    /// required traits at all and the compiler would have already rejected them.
496    ///
497    /// # Panics
498    ///
499    /// Panics when any of `OS`, `DS`, or `PR` is a Noop implementation.
500    pub fn assert_production_stores(&self) {
501        let checks: &[(&str, &str)] = &[
502            ("OutboxStore", std::any::type_name::<OS>()),
503            ("DeadlineStore", std::any::type_name::<DS>()),
504            ("ProcessRegistry", std::any::type_name::<PR>()),
505        ];
506        for (trait_name, type_name) in checks {
507            assert!(
508                !type_name.contains("Noop"),
509                "makod: Noop{trait_name} is active — \
510                 configure a persistent {trait_name} backend in makod.toml. \
511                 Type resolved to: {type_name}"
512            );
513        }
514    }
515
516    /// The PID-to-workflow routing table.
517    ///
518    /// Populated **once** during [`EngineBuilder::build`] by calling
519    /// [`EngineModule::register_pids`] on every registered module in
520    /// registration order. After `build` returns the table is **sealed** —
521    /// it is read-only for the lifetime of the `EngineContext` and may be
522    /// freely shared across async tasks without synchronisation.
523    ///
524    /// # Mutability contract
525    ///
526    /// There is intentionally no `pid_router_mut()` accessor. Adding PIDs
527    /// after the engine is built would create a TOCTOU race between the
528    /// dispatch path (which calls `route(pid)`) and any hypothetical
529    /// concurrent mutator. Instead, register all PIDs during the build phase
530    /// via `EngineModule::register_pids`.
531    ///
532    /// If a new process family needs to be added without restarting the
533    /// binary, rebuild and restart `makod` — hot-swap of PID routing is not
534    /// supported.
535    ///
536    /// # Example — dispatch at the AS4 reception boundary
537    ///
538    /// ```rust,ignore
539    /// let workflow_name = ctx.pid_router().route(pid)
540    ///     .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
541    ///         format!("no workflow registered for PID {pid}").into()
542    ///     )))?;
543    ///
544    /// match workflow_name {
545    ///     "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
546    ///     "wim-device-change"    => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
547    ///     other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
548    ///         format!("unhandled workflow name: {other}").into()
549    ///     ))),
550    /// }
551    /// ```
552    #[must_use]
553    pub fn pid_router(&self) -> &PidRouter {
554        &self.pid_router
555    }
556}
557
558// ── As4Sender ─────────────────────────────────────────────────────────────────
559
/// Sends a single AS4 / EDIINT-over-HTTP outbound message.
///
/// Implement this trait for your AS4 gateway client and pass it to
/// [`EngineContext::run_outbox_worker`].
///
/// # Contract
///
/// Return `Ok(())` only after the message has been **durably accepted** by the
/// receiving MSH.  Return `Err(…)` on transient or permanent failure — the
/// outbox worker calls [`OutboxStore::reschedule`] so the message is retried.
pub trait As4Sender: Send + Sync + 'static {
    /// Transmit `msg` and return when the remote MSH has accepted it.
    ///
    /// The returned future must be `Send`.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] when the message was not accepted, whether
    /// the failure is transient or permanent.
    fn send(
        &self,
        msg: &OutboxMessage,
    ) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
}
577
578// ── OutboxWorker ──────────────────────────────────────────────────────────────
579
/// A background worker that drains the outbox by polling pending
/// [`OutboxMessage`]s and dispatching them via an [`As4Sender`].
///
/// Obtain via [`EngineContext::run_outbox_worker`] and drive by spawning
/// [`OutboxWorker::run`] in a Tokio task.
///
/// # Polling behaviour
///
/// When the poll returns an empty batch the worker sleeps for `poll_interval`
/// before polling again.  Non-empty batches are processed immediately.
///
/// # Error handling
///
/// Successful sends are acknowledged via [`OutboxStore::acknowledge`].
/// Failed sends are rescheduled via [`OutboxStore::reschedule`] using
/// **full-jitter exponential backoff**: `delay = rand(0, min(MAX, BASE * 2^n))`
/// where `n = attempt_count`. This avoids thundering-herd when multiple
/// `makod` instances restart simultaneously after a receiver outage.
///
/// When `attempt_count >= max_attempts`, the message is **acknowledged** (removed
/// from the outbox) and a [`DeadLetterReason::OutboxExhausted`] record is written
/// to the dead-letter sink. This prevents permanently-undeliverable messages
/// from clogging the outbox forever.
///
/// All errors are emitted as structured `tracing` events at `warn` / `error`
/// level rather than `eprintln!`, so they appear in the application's log
/// pipeline with full context (message_id, error).
///
/// # Example
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1));
/// tokio::spawn(async move { worker.run().await });
/// ```
///
/// [`DeadLetterReason::OutboxExhausted`]: crate::dead_letter::DeadLetterReason::OutboxExhausted
pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
    /// Outbox backend that is polled for pending messages and told about
    /// the outcome of each delivery.
    store: OS,
    /// Transport used to send each pending message.
    sender: S,
    /// Maximum number of messages requested from the store per poll.
    batch_size: usize,
    /// Sleep duration after an empty batch or a store error while polling.
    poll_interval: std::time::Duration,
    /// Maximum total delivery attempts before a message is dead-lettered.
    ///
    /// Default: 48 (covers ~4 hours at the 300 s backoff cap).
    /// Set to `u32::MAX` to disable the cap (not recommended for production).
    max_attempts: u32,
    /// Sink for messages that exceed `max_attempts`.
    dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
}
631
/// Compute a full-jitter exponential backoff delay.
///
/// The delay is `entropy % window` whole seconds, where
/// `window = min(MAX_SECS, BASE_SECS * 2^attempt)`.
///
/// `attempt` is the number of prior attempts (0 = first retry).
/// `entropy` provides randomness; derive from a stable message identifier
/// (e.g. hash of `message_id`) rather than the current timestamp — a
/// timestamp-derived value is deterministic within a single batch, which
/// defeats jitter when multiple messages fail simultaneously.
///
/// | attempt | window (s) | expected delay (s) |
/// |---------|------------|-------------------|
/// | 0       | 5          | 2.5               |
/// | 1       | 10         | 5                 |
/// | 2       | 20         | 10                |
/// | 3       | 40         | 20                |
/// | 4       | 80         | 40                |
/// | 5       | 160        | 80                |
/// | 6+      | 300 (cap)  | 150               |
///
/// The result is always strictly less than `MAX_SECS` seconds; any `attempt`
/// value is accepted, including ones whose power of two does not fit in `u64`.
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;
    // Exponential window: BASE * 2^attempt, capped at MAX.
    //
    // The exponent must not be clamped below the point where the product
    // crosses MAX (2^6 * 5 = 320), otherwise the cap is never reached.
    // `checked_shl` returns `None` once the shift is >= 64 bits; the window is
    // then far beyond the cap, so fall back to MAX directly.
    let window = 1u64
        .checked_shl(attempt)
        .map_or(MAX_SECS, |factor| BASE_SECS.saturating_mul(factor).min(MAX_SECS));
    // Full jitter: uniform in [0, window). `window >= BASE_SECS > 0`, so the
    // modulo cannot divide by zero.
    std::time::Duration::from_secs(entropy % window)
}
659
660impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
661    /// Run the outbox drain loop until the task is cancelled.
662    ///
663    /// # Panics
664    ///
665    /// Panics if `time::Duration::try_from(delay)` overflows (unreachable for
666    /// the delay values produced by `backoff_delay`).
667    #[allow(clippy::too_many_lines)]
668    pub async fn run(self) {
669        loop {
670            let batch = match self.store.pending_now(self.batch_size).await {
671                Ok(b) => b,
672                Err(e) => {
673                    tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
674                    tokio::time::sleep(self.poll_interval).await;
675                    continue;
676                }
677            };
678
679            if batch.is_empty() {
680                tokio::time::sleep(self.poll_interval).await;
681                continue;
682            }
683
684            for msg in batch {
685                // ── Max-attempt cap ───────────────────────────────────
686                // `attempt_count` starts at 0 and is incremented on each
687                // `reschedule` call.  When it reaches `max_attempts` the
688                // message is considered permanently undeliverable: acknowledge
689                // it (remove from outbox) and dead-letter it so the regulatory
690                // audit trail is preserved.
691                if msg.attempt_count >= self.max_attempts {
692                    tracing::error!(
693                        message_id   = %msg.message_id,
694                        message_type = %msg.message_type,
695                        recipient    = %msg.recipient,
696                        attempts     = msg.attempt_count,
697                        max_attempts = self.max_attempts,
698                        "outbox worker: max delivery attempts reached; dead-lettering message",
699                    );
700                    self.dead_letter_sink.reject(
701                        &crate::dead_letter::DeadLetterReason::OutboxExhausted {
702                            message_id: msg.message_id,
703                            message_type: msg.message_type.to_string(),
704                            recipient: msg.recipient.to_string(),
705                            last_error: format!(
706                                "delivery exhausted after {} attempts",
707                                msg.attempt_count
708                            ),
709                            attempts: msg.attempt_count,
710                        },
711                    );
712                    if let Err(e) = self.store.acknowledge(msg.message_id).await {
713                        tracing::error!(
714                            message_id = %msg.message_id,
715                            error = %e,
716                            "outbox worker: acknowledge after exhaust failed; message may reappear",
717                        );
718                    }
719                    continue;
720                }
721
722                match self.sender.send(&msg).await {
723                    Ok(()) => {
724                        if let Err(e) = self.store.acknowledge(msg.message_id).await {
725                            tracing::warn!(
726                                message_id = %msg.message_id,
727                                error = %e,
728                                "outbox worker: acknowledge failed",
729                            );
730                        }
731                        // CONTRL AHB 1.0 §1.2: the CONTRL must be delivered
732                        // within 6 wall-clock hours of interchange receipt.
733                        // `msg.created_at` is when the PendingOutbox was
734                        // materialised (which should equal the ingest timestamp
735                        // for transport-layer CONTRL obligations).
736                        if msg.message_type.as_ref() == "CONTRL" {
737                            let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
738                            if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
739                                tracing::warn!(
740                                    message_id   = %msg.message_id,
741                                    elapsed_secs = elapsed.whole_seconds(),
742                                    max_secs     = crate::fristen::CONTRL_FRIST_HOURS * 3600,
743                                    "outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
744                                     (CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
745                                );
746                            }
747                        }
748                    }
749                    // Permanent error: dead-letter immediately without retrying.
750                    // PartnerUnknown requires operator intervention (add --as4-partner);
751                    // Serialization errors will never succeed on retry.
752                    Err(ref e)
753                        if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
754                    {
755                        tracing::error!(
756                            message_id   = %msg.message_id,
757                            message_type = %msg.message_type,
758                            recipient    = %msg.recipient,
759                            error        = %e,
760                            "outbox worker: permanent send failure; dead-lettering without retry",
761                        );
762                        self.dead_letter_sink.reject(
763                            &crate::dead_letter::DeadLetterReason::OutboxExhausted {
764                                message_id: msg.message_id,
765                                message_type: msg.message_type.to_string(),
766                                recipient: msg.recipient.to_string(),
767                                last_error: e.to_string(),
768                                attempts: msg.attempt_count,
769                            },
770                        );
771                        if let Err(re) = self.store.acknowledge(msg.message_id).await {
772                            tracing::error!(
773                                message_id = %msg.message_id,
774                                error = %re,
775                                "outbox worker: acknowledge after permanent failure failed",
776                            );
777                        }
778                    }
779                    Err(e) => {
780                        // Stable jitter entropy derived from the UUID bytes of
781                        // `message_id`.  Using the last 8 bytes as a `u64` gives
782                        // uniform entropy across message IDs (UUIDs are random in
783                        // all 128 bits for v4) and is stable across Rust versions —
784                        // unlike `DefaultHasher`, whose algorithm is explicitly
785                        // documented as unstable.
786                        let entropy = {
787                            let uuid = msg.message_id.as_uuid();
788                            let bytes = uuid.as_bytes();
789                            u64::from_le_bytes(bytes[8..16].try_into().unwrap())
790                        };
791                        let delay = backoff_delay(msg.attempt_count, entropy);
792                        let retry_at = time::OffsetDateTime::now_utc()
793                            + time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
794                        tracing::warn!(
795                            message_id   = %msg.message_id,
796                            attempt      = msg.attempt_count,
797                            max_attempts = self.max_attempts,
798                            retry_in     = ?delay,
799                            error        = %e,
800                            "outbox worker: send failed; rescheduling with backoff",
801                        );
802                        if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
803                            tracing::error!(
804                                message_id = %msg.message_id,
805                                error      = %re,
806                                "outbox worker: reschedule failed; message may be stuck",
807                            );
808                        }
809                    }
810                }
811            }
812        }
813    }
814}
815
816impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
817where
818    ES: EventStore,
819    OS: OutboxStore + Clone,
820{
821    /// Construct an [`OutboxWorker`] that drains the outbox via `sender`.
822    ///
823    /// `batch_size` — messages fetched per poll cycle.
824    /// `poll_interval` — sleep duration when the batch is empty.
825    ///
826    /// `max_attempts` — maximum total delivery attempts before dead-lettering.
827    /// Pass `48` for a ~4-hour retry budget at the 300 s backoff cap, or
828    /// `u32::MAX` to disable the cap (not recommended for production).
829    ///
830    /// ```rust,ignore
831    /// use std::time::Duration;
832    ///
833    /// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
834    /// tokio::spawn(async move { worker.run().await });
835    /// ```
836    #[must_use]
837    pub fn run_outbox_worker<S: As4Sender>(
838        &self,
839        sender: S,
840        batch_size: usize,
841        poll_interval: std::time::Duration,
842        max_attempts: u32,
843    ) -> OutboxWorker<OS, S> {
844        OutboxWorker {
845            store: self.outbox_store.clone(),
846            sender,
847            batch_size,
848            poll_interval,
849            max_attempts,
850            dead_letter_sink: self.dead_letter_sink.clone(),
851        }
852    }
853}
854
855impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
856where
857    ES: std::fmt::Debug,
858    SS: std::fmt::Debug,
859    OS: std::fmt::Debug,
860    DS: std::fmt::Debug,
861    PR: std::fmt::Debug,
862{
863    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
864        f.debug_struct("EngineContext")
865            .field("registered_modules", &self.registered_modules)
866            .field("registered_workflows", &self.registered_workflows)
867            .field("pid_router_len", &self.pid_router.len())
868            .finish_non_exhaustive()
869    }
870}
871
872// ── NoopAs4Sender / LogAs4Sender ──────────────────────────────────────────────
873
874/// An [`As4Sender`] that succeeds immediately without sending anything.
875///
876/// Use in tests and environments where outbound AS4 delivery is not yet
877/// wired. All outbox messages are acknowledged (removed from the queue)
878/// without being transmitted.
879///
880/// # ⚠️ Data loss warning
881///
882/// Every outbox message is **silently discarded** — no EDIFACT message is
883/// sent to any counterparty. Do not use in production.
884#[derive(Debug, Clone, Copy, Default)]
885#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
886pub struct NoopAs4Sender;
887
888impl As4Sender for NoopAs4Sender {
889    async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
890        Ok(())
891    }
892}
893
894/// An [`As4Sender`] that logs every outbound message at `warn` level and
895/// succeeds without transmitting.
896///
897/// Useful for development and integration-testing environments where the
898/// full AS4 stack is not yet available but message visibility is desired.
899/// All outbox messages are acknowledged (removed from the queue) after logging.
900///
901/// # ⚠️ Data loss warning
902///
903/// No EDIFACT message is sent to any counterparty. Do not use in production.
904#[derive(Debug, Clone, Copy, Default)]
905#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
906pub struct LogAs4Sender;
907
908impl As4Sender for LogAs4Sender {
909    async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
910        tracing::warn!(
911            message_id   = %msg.message_id,
912            message_type = %msg.message_type,
913            recipient    = %msg.recipient,
914            "LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
915        );
916        Ok(())
917    }
918}
919
920// ── DeadlineScheduler ─────────────────────────────────────────────────────────
921
922/// A background task that polls [`DeadlineStore::due_now`] and dispatches
923/// deadline commands to the owning processes via a caller-supplied function.
924///
925/// Obtain via [`EngineContext::run_deadline_scheduler`] and drive by spawning
926/// [`DeadlineScheduler::run`] in a Tokio task.
927///
928/// # Dispatch function
929///
930/// The `dispatch` function receives a fired [`Deadline`] and returns a future
931/// that dispatches the appropriate timeout command to the process. The function
932/// is responsible for resuming the correct workflow and calling `execute`.
933/// After the future completes, the scheduler cancels the deadline from the
934/// store regardless of the dispatch outcome (to prevent re-firing).
935///
936/// ```rust,ignore
937/// use std::time::Duration;
938///
939/// let scheduler = ctx.run_deadline_scheduler(
940///     |deadline| async move {
941///         tracing::warn!(
942///             deadline_id = %deadline.deadline_id(),
943///             label = %deadline.label(),
944///             "deadline fired",
945///         );
946///         Ok(())
947///     },
948///     100,
949///     Duration::from_secs(30),
950/// );
951/// tokio::spawn(async move { scheduler.run().await });
952/// ```
953pub struct DeadlineScheduler<DS: DeadlineStore> {
954    store: DS,
955    dispatch: Box<
956        dyn Fn(
957                Deadline,
958            ) -> std::pin::Pin<
959                Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
960            > + Send
961            + Sync,
962    >,
963    batch_size: usize,
964    poll_interval: std::time::Duration,
965}
966
967impl<DS: DeadlineStore> DeadlineScheduler<DS> {
968    /// Run the deadline poll loop until the task is cancelled.
969    pub async fn run(self) {
970        loop {
971            let result = match self.store.due_now(self.batch_size).await {
972                Ok(r) => r,
973                Err(e) => {
974                    tracing::warn!(
975                        error = %e,
976                        "deadline scheduler: store error polling due deadlines (will retry)",
977                    );
978                    tokio::time::sleep(self.poll_interval).await;
979                    continue;
980                }
981            };
982
983            if result.deadlines.is_empty() {
984                tokio::time::sleep(self.poll_interval).await;
985                continue;
986            }
987
988            for deadline in result.deadlines {
989                let id = deadline.deadline_id();
990                let label = deadline.label().to_owned();
991                let should_cancel = match (self.dispatch)(deadline).await {
992                    Ok(()) => true,
993                    Err(ref e) if e.is_version_conflict() => {
994                        // The process was modified concurrently; the timeout
995                        // command will be retried on the next poll cycle.
996                        // Do NOT cancel — let the deadline remain due so it
997                        // fires again until a non-conflict dispatch succeeds.
998                        tracing::warn!(
999                            deadline_id = %id,
1000                            label       = %label,
1001                            "deadline scheduler: VersionConflict; will retry on next poll",
1002                        );
1003                        false
1004                    }
1005                    Err(e) => {
1006                        tracing::warn!(
1007                            deadline_id = %id,
1008                            label       = %label,
1009                            error       = %e,
1010                            "deadline scheduler: dispatch failed (permanent); cancelling",
1011                        );
1012                        true
1013                    }
1014                };
1015                if should_cancel {
1016                    if let Err(e) = self.store.cancel(id).await {
1017                        tracing::error!(
1018                            deadline_id = %id,
1019                            error       = %e,
1020                            "deadline scheduler: cancel failed; deadline may fire again",
1021                        );
1022                    }
1023                }
1024            }
1025
1026            // If has_more, loop immediately to drain the batch.
1027        }
1028    }
1029}
1030
1031impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
1032where
1033    ES: EventStore,
1034    DS: DeadlineStore + Clone,
1035{
1036    /// Construct a [`DeadlineScheduler`] that polls the deadline store and
1037    /// dispatches fired deadlines via `dispatch`.
1038    ///
1039    /// The `dispatch` function is called for every fired deadline. It should
1040    /// resume the owning process and execute the appropriate timeout command.
1041    ///
1042    /// `batch_size` — deadlines fetched per poll cycle.
1043    /// `poll_interval` — sleep duration when no deadlines are due.
1044    ///
1045    /// ```rust,ignore
1046    /// use std::time::Duration;
1047    ///
1048    /// let scheduler = ctx.run_deadline_scheduler(
1049    ///     |d| async move {
1050    ///         tracing::info!(label = %d.label(), "firing deadline");
1051    ///         Ok(())
1052    ///     },
1053    ///     100,
1054    ///     Duration::from_secs(30),
1055    /// );
1056    /// tokio::spawn(async move { scheduler.run().await });
1057    /// ```
1058    #[must_use]
1059    pub fn run_deadline_scheduler<F, Fut>(
1060        &self,
1061        dispatch: F,
1062        batch_size: usize,
1063        poll_interval: std::time::Duration,
1064    ) -> DeadlineScheduler<DS>
1065    where
1066        F: Fn(Deadline) -> Fut + Send + Sync + 'static,
1067        Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
1068    {
1069        DeadlineScheduler {
1070            store: self.deadline_store.clone(),
1071            dispatch: Box::new(move |d| Box::pin(dispatch(d))),
1072            batch_size,
1073            poll_interval,
1074        }
1075    }
1076}
1077
1078// ── EngineBuilder ─────────────────────────────────────────────────────────────
1079
1080/// Assembles engine infrastructure and produces an [`EngineContext`].
1081///
1082/// Uses type-state to enforce that an event store is provided before
1083/// [`build`] can be called. All other stores default to `Noop`
1084/// implementations.
1085///
1086/// ## Quick start
1087///
1088/// ```rust,ignore
1089/// // Minimal — event store only, all others are Noop:
1090/// let ctx = EngineBuilder::new()
1091///     .with_event_store(InMemoryEventStore::new())
1092///     .build();
1093///
1094/// // Full infrastructure:
1095/// let ctx = EngineBuilder::new()
1096///     .with_event_store(InMemoryEventStore::new())
1097///     .with_snapshot_store(InMemorySnapshotStore::new())
1098///     .with_outbox_store(InMemoryOutboxStore::new())
1099///     .with_deadline_store(InMemoryDeadlineStore::new())
1100///     .with_registry(InMemoryProcessRegistry::new())
1101///     .register(Box::new(GpkeModule))
1102///     .build();
1103/// ```
1104///
1105/// [`build`]: EngineBuilder::build
1106pub struct EngineBuilder<
1107    ES = (),
1108    SS = NoopSnapshotStore,
1109    OS = NoopOutboxStore,
1110    DS = NoopDeadlineStore,
1111    PR = NoopProcessRegistry,
1112> {
1113    event_store: ES,
1114    snapshot_store: SS,
1115    outbox_store: OS,
1116    deadline_store: DS,
1117    registry: PR,
1118    dead_letter_sink: Arc<dyn DeadLetterSink>,
1119    modules: Vec<Box<dyn EngineModule>>,
1120    /// Active [`DeploymentRoles`] for this engine instance.
1121    ///
1122    /// Controls role-conditional PID registration via
1123    /// [`EngineModule::register_pids_with_roles`]. Defaults to
1124    /// [`DeploymentRoles::all()`] for backward compatibility.
1125    deployment_roles: DeploymentRoles,
1126    /// Optional profile validator injected by `makod` or callers that have
1127    /// access to `edi-energy`.  When `Some`, called for each
1128    /// [`ProfileRequirement`] declared by registered modules.  When `None`,
1129    /// profile requirements are not validated (safe in unit tests).
1130    ///
1131    /// Signature: `fn(message_type: &str) -> bool`
1132    ///
1133    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
1134    profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
1135}
#[cfg(any(test, feature = "testing"))]
impl Default
    for EngineBuilder<
        (),
        NoopSnapshotStore,
        NoopOutboxStore,
        NoopDeadlineStore,
        NoopProcessRegistry,
    >
{
    /// Returns a builder with no event store, `Noop` implementations for all
    /// other stores, the logging dead-letter sink, no registered modules,
    /// all deployment roles and no profile validator.
    fn default() -> Self {
        // `with_stores` fills every remaining field with exactly these
        // defaults, so only the three Noop stores need to be named here.
        Self::with_stores(NoopOutboxStore, NoopDeadlineStore, NoopProcessRegistry)
    }
}
1160
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
    /// Create a builder whose stores are all `Noop` defaults.
    ///
    /// Compiled only under `#[cfg(test)]` or with the `testing` feature,
    /// because the Noop defaults silently discard outbox messages, deadlines,
    /// and process registry entries. Production binaries must wire real
    /// stores via the `with_*` builder methods.
    ///
    /// The event store is **required**: call [`with_event_store`] before
    /// [`build`].
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    /// [`build`]: EngineBuilder::build
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
1180
1181impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
1182where
1183    OS: OutboxStore,
1184    DS: DeadlineStore,
1185    PR: ProcessRegistry,
1186{
1187    /// Create a production-ready builder with explicit stores for outbox,
1188    /// deadline, and process registry.
1189    ///
1190    /// This constructor is available in all build configurations including
1191    /// production binaries. It enforces that the three stores that can cause
1192    /// silent data loss (`OutboxStore`, `DeadlineStore`, `ProcessRegistry`)
1193    /// are provided explicitly — there is no Noop fallback.
1194    ///
1195    /// `NoopSnapshotStore` is used as the snapshot default because it is safe
1196    /// for production: skipping snapshots means full replay, but no data loss.
1197    /// Override with [`with_snapshot_store`] to enable snapshot-accelerated
1198    /// replay.
1199    ///
1200    /// Call [`with_event_store`] before [`build`] — the event store is
1201    /// **required**.
1202    ///
1203    /// ```rust,ignore
1204    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1205    ///     .with_event_store(store.clone())
1206    ///     .with_snapshot_store(InMemorySnapshotStore::new())
1207    ///     .build();
1208    /// ```
1209    ///
1210    /// [`with_snapshot_store`]: EngineBuilder::with_snapshot_store
1211    /// [`with_event_store`]: EngineBuilder::with_event_store
1212    /// [`build`]: EngineBuilder::build
1213    #[must_use]
1214    pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
1215        Self {
1216            event_store: (),
1217            snapshot_store: NoopSnapshotStore,
1218            outbox_store,
1219            deadline_store,
1220            registry,
1221            dead_letter_sink: Arc::new(LogDeadLetterSink),
1222            modules: Vec::new(),
1223            deployment_roles: DeploymentRoles::all(),
1224            profile_validator: None,
1225        }
1226    }
1227}
1228
1229impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
1230    /// Set the event store. **Required** — `build()` is only available once
1231    /// this has been called with a type that implements [`EventStore`].
1232    ///
1233    /// Replaces any previously set event store (type-state transition).
1234    #[must_use]
1235    pub fn with_event_store<ES2: EventStore>(
1236        self,
1237        store: ES2,
1238    ) -> EngineBuilder<ES2, SS, OS, DS, PR> {
1239        EngineBuilder {
1240            event_store: store,
1241            snapshot_store: self.snapshot_store,
1242            outbox_store: self.outbox_store,
1243            deadline_store: self.deadline_store,
1244            registry: self.registry,
1245            dead_letter_sink: self.dead_letter_sink,
1246            modules: self.modules,
1247            deployment_roles: self.deployment_roles,
1248            profile_validator: self.profile_validator,
1249        }
1250    }
1251
1252    /// Set the snapshot store (default: [`NoopSnapshotStore`]).
1253    ///
1254    /// ## Default: `NoopSnapshotStore`
1255    ///
1256    /// Without calling this method the builder uses [`NoopSnapshotStore`],
1257    /// which silently discards all snapshot writes and returns `None` for
1258    /// every snapshot read.  The engine still functions correctly — every
1259    /// command handling call replays the full event log from the beginning
1260    /// instead of starting from a stored snapshot.  For low-volume processes
1261    /// this is fine; for long-lived processes with many events the replay cost
1262    /// can become significant.
1263    ///
1264    /// Enable snapshotting in production by providing a real [`SnapshotStore`]
1265    /// implementation (e.g. the SlateDB-backed store in `makod`).  In tests,
1266    /// [`InMemorySnapshotStore`][crate::snapshot::InMemorySnapshotStore] is
1267    /// available behind the `testing` feature flag.
1268    ///
1269    /// Note: [`Process::state_with_snapshot`][crate::process::Process::state_with_snapshot]
1270    /// is a compile-time no-op when the snapshot store is `NoopSnapshotStore`
1271    /// — it never calls the store and always returns `None`, so no snapshot is
1272    /// ever saved or loaded.
1273    #[must_use]
1274    pub fn with_snapshot_store<SS2: SnapshotStore>(
1275        self,
1276        store: SS2,
1277    ) -> EngineBuilder<ES, SS2, OS, DS, PR> {
1278        EngineBuilder {
1279            event_store: self.event_store,
1280            snapshot_store: store,
1281            outbox_store: self.outbox_store,
1282            deadline_store: self.deadline_store,
1283            registry: self.registry,
1284            dead_letter_sink: self.dead_letter_sink,
1285            modules: self.modules,
1286            deployment_roles: self.deployment_roles,
1287            profile_validator: self.profile_validator,
1288        }
1289    }
1290
1291    /// Set the outbox store (default: [`NoopOutboxStore`]).
1292    #[must_use]
1293    pub fn with_outbox_store<OS2: OutboxStore>(
1294        self,
1295        store: OS2,
1296    ) -> EngineBuilder<ES, SS, OS2, DS, PR> {
1297        EngineBuilder {
1298            event_store: self.event_store,
1299            snapshot_store: self.snapshot_store,
1300            outbox_store: store,
1301            deadline_store: self.deadline_store,
1302            registry: self.registry,
1303            dead_letter_sink: self.dead_letter_sink,
1304            modules: self.modules,
1305            deployment_roles: self.deployment_roles,
1306            profile_validator: self.profile_validator,
1307        }
1308    }
1309
1310    /// Set the deadline store (default: [`NoopDeadlineStore`]).
1311    #[must_use]
1312    pub fn with_deadline_store<DS2: DeadlineStore>(
1313        self,
1314        store: DS2,
1315    ) -> EngineBuilder<ES, SS, OS, DS2, PR> {
1316        EngineBuilder {
1317            event_store: self.event_store,
1318            snapshot_store: self.snapshot_store,
1319            outbox_store: self.outbox_store,
1320            deadline_store: store,
1321            registry: self.registry,
1322            dead_letter_sink: self.dead_letter_sink,
1323            modules: self.modules,
1324            deployment_roles: self.deployment_roles,
1325            profile_validator: self.profile_validator,
1326        }
1327    }
1328
1329    /// Set the process registry (default: [`NoopProcessRegistry`]).
1330    #[must_use]
1331    pub fn with_registry<PR2: ProcessRegistry>(
1332        self,
1333        registry: PR2,
1334    ) -> EngineBuilder<ES, SS, OS, DS, PR2> {
1335        EngineBuilder {
1336            event_store: self.event_store,
1337            snapshot_store: self.snapshot_store,
1338            outbox_store: self.outbox_store,
1339            deadline_store: self.deadline_store,
1340            registry,
1341            dead_letter_sink: self.dead_letter_sink,
1342            modules: self.modules,
1343            deployment_roles: self.deployment_roles,
1344            profile_validator: self.profile_validator,
1345        }
1346    }
1347
1348    /// Set the dead-letter sink (default: [`LogDeadLetterSink`]).
1349    ///
1350    /// The dead-letter sink receives every message that cannot be routed to a
1351    /// workflow. The default [`LogDeadLetterSink`] emits `tracing::warn!`
1352    /// events, making rejections visible in log output without configuration.
1353    ///
1354    /// Override with a persistent DLQ implementation in production:
1355    ///
1356    /// ```rust,ignore
1357    /// use mako_engine::dead_letter::LogDeadLetterSink;
1358    ///
1359    /// let ctx = EngineBuilder::new()
1360    ///     .with_event_store(my_store)
1361    ///     .with_dead_letter_sink(MyPersistentDlq::new())
1362    ///     .build();
1363    /// ```
1364    ///
1365    /// [`LogDeadLetterSink`]: crate::dead_letter::LogDeadLetterSink
1366    #[must_use]
1367    pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
1368        self.dead_letter_sink = Arc::new(sink);
1369        self
1370    }
1371
1372    /// Register an `edi-energy` profile validator for startup profile checks.
1373    ///
1374    /// The closure receives a message-type string (e.g. `"UTILMD"`) and must
1375    /// return `true` if at least one active profile for that message type is
1376    /// registered for today's date.
1377    ///
1378    /// Wire this in `makod` using the `edi-energy` global registry:
1379    ///
1380    /// ```rust,ignore
1381    /// use edi_energy::registry::ReleaseRegistry;
1382    ///
1383    /// let today = time::OffsetDateTime::now_utc().date();
1384    /// builder.with_profile_validator(move |msg_type| {
1385    ///     ReleaseRegistry::global()
1386    ///         .profiles_for_str(msg_type)
1387    ///         .any(|p| match (p.valid_from(), p.valid_until()) {
1388    ///             (Some(f), Some(u)) => f <= today && today <= u,
1389    ///             (Some(f), None)    => f <= today,
1390    ///             (None, _)          => true,
1391    ///         })
1392    /// })
1393    /// ```
1394    ///
1395    /// Domain crates do **not** need to call this — they only declare
1396    /// [`profile_requirements`].
1397    ///
1398    /// [`profile_requirements`]: EngineModule::profile_requirements
1399    #[must_use]
1400    pub fn with_profile_validator(
1401        mut self,
1402        validator: impl Fn(&str) -> bool + Send + Sync + 'static,
1403    ) -> Self {
1404        self.profile_validator = Some(Box::new(validator));
1405        self
1406    }
1407
1408    /// Register a domain module.
1409    ///
1410    /// The module name becomes visible in
1411    /// [`EngineContext::registered_modules`] after [`build`] is called.
1412    ///
1413    /// [`build`]: EngineBuilder::build
1414    #[must_use]
1415    pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
1416        self.modules.push(module);
1417        self
1418    }
1419
1420    /// Set the active [`DeploymentRoles`] for this engine instance.
1421    ///
1422    /// Controls role-conditional PID registration in [`EngineModule::register_pids_with_roles`].
1423    ///
1424    /// The default is [`DeploymentRoles::all()`], which registers every PID unconditionally
1425    /// — identical to the pre-role-aware behavior. Providing an explicit role set
1426    /// restricts role-conditional blocks to only the declared roles:
1427    ///
1428    /// - **NB-only** (`DeploymentRoles::nb()`): 19001/19002 route to `gpke-konfiguration`;
1429    ///   WiM nMSB blocks are skipped.
1430    /// - **nMSB-only** (`DeploymentRoles::nmsb()`): 19001/19002 route to `wim-geraeteubernahme`;
1431    ///   GPKE NB blocks are skipped.
1432    /// - **NB + gMSB** (`DeploymentRoles::nb_msb()`): most common Stadtwerke combination.
1433    ///
1434    /// # Conflict guard
1435    ///
1436    /// When two modules would register the same PID to **different** workflows, the
1437    /// engine panics during [`build`]. Set explicit roles to prevent both modules from
1438    /// activating the same PID simultaneously:
1439    ///
1440    /// ```rust,ignore
1441    /// use mako_engine::marktrolle::DeploymentRoles;
1442    ///
1443    /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1444    ///     .with_event_store(store)
1445    ///     .with_deployment_roles(DeploymentRoles::nb())  // only NB: GPKE gets 19001/19002
1446    ///     .register(Box::new(GpkeModule))
1447    ///     .register(Box::new(WimModule))  // nMSB block skipped — no conflict
1448    ///     .build();
1449    /// ```
1450    ///
1451    /// [`build`]: EngineBuilder::build
1452    #[must_use]
1453    pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
1454        self.deployment_roles = roles;
1455        self
1456    }
1457}
1458
1459impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
1460where
1461    ES: EventStore,
1462    SS: SnapshotStore,
1463    OS: OutboxStore,
1464    DS: DeadlineStore,
1465    PR: ProcessRegistry,
1466{
1467    /// Build the [`EngineContext`].
1468    ///
1469    /// Consumes the builder. All registered modules and configured stores are
1470    /// moved into the returned [`EngineContext`].
1471    ///
1472    /// This method is only available when `ES` implements [`EventStore`].
1473    /// If you have not called [`with_event_store`], this will not compile.
1474    ///
1475    /// # Panics
1476    ///
1477    /// Panics when any registered module returns `Err` from
1478    /// [`EngineModule::configure`]. The panic message includes the module
1479    /// name and the error string so the deployment failure is actionable.
1480    ///
1481    /// [`with_event_store`]: EngineBuilder::with_event_store
1482    #[must_use]
1483    #[allow(clippy::too_many_lines)]
1484    pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
1485        // ── Noop store safety checks ──────────────────────────────────────────
1486        //
1487        // Noop stores lose data silently: NoopDeadlineStore drops every APERAK
1488        // deadline (BNetzA violation), NoopOutboxStore discards all outbound
1489        // messages, NoopProcessRegistry loses conversation routing on restart.
1490        //
1491        // In production builds (no `testing` feature, not running under
1492        // `#[test]`), the Noop constructors are cfg-gated out so this branch
1493        // is dead code and compiles away. In test/testing/tracing builds we
1494        // emit warnings so test harnesses see the configuration in log output.
1495        //
1496        // IMPORTANT: if you are reading this because a panic fired in production,
1497        // it means the `testing` feature was accidentally enabled in the binary.
1498        // Remove it from the production Cargo.toml feature list immediately.
1499        {
1500            let os_name = std::any::type_name::<OS>();
1501            let ds_name = std::any::type_name::<DS>();
1502            let pr_name = std::any::type_name::<PR>();
1503
1504            // Regulatory-critical stores: panic in any build context if these
1505            // are noop. OutboxStore and DeadlineStore must be durable in
1506            // production; ProcessRegistry must survive restarts.
1507            #[cfg(not(any(test, feature = "testing")))]
1508            {
1509                if ds_name.contains("NoopDeadlineStore") {
1510                    panic!(
1511                        "EngineBuilder::build: NoopDeadlineStore is active in a \
1512                         non-testing build. This silently discards all APERAK deadlines, \
1513                         which is an immediately reportable BNetzA violation \
1514                         (BK6-22-024 §5, BK7-24-01-009). \
1515                         Call .with_deadline_store(SlateDbStore::as_deadline_store()) \
1516                         in your production engine assembly. \
1517                         If this is a test, enable the 'testing' feature."
1518                    );
1519                }
1520                if os_name.contains("NoopOutboxStore") {
1521                    panic!(
1522                        "EngineBuilder::build: NoopOutboxStore is active in a \
1523                         non-testing build. This silently discards all outbound \
1524                         APERAK, CONTRL, and UTILMD messages. \
1525                         Call .with_outbox_store(SlateDbStore::as_outbox_store()) \
1526                         in your production engine assembly. \
1527                         If this is a test, enable the 'testing' feature."
1528                    );
1529                }
1530                if pr_name.contains("NoopProcessRegistry") {
1531                    panic!(
1532                        "EngineBuilder::build: NoopProcessRegistry is active in a \
1533                         non-testing build. This means conversation routing \
1534                         (PID → stream_id lookup) is lost on every restart, \
1535                         breaking all WiM, GeLi Gas, and GPKE in-flight processes. \
1536                         Call .with_registry(SlateDbStore::as_process_registry()) \
1537                         in your production engine assembly. \
1538                         If this is a test, enable the 'testing' feature."
1539                    );
1540                }
1541            }
1542
1543            // In test/testing/tracing builds: emit warnings instead of panicking.
1544            #[cfg(any(test, feature = "testing", feature = "tracing"))]
1545            {
1546                let ss_name = std::any::type_name::<SS>();
1547                if ss_name.contains("NoopSnapshotStore") {
1548                    tracing::warn!(
1549                        store = ss_name,
1550                        "EngineBuilder: NoopSnapshotStore is active — snapshots will not be \
1551                         persisted. Use SlateDbStore::as_snapshot_store() in production."
1552                    );
1553                }
1554                if os_name.contains("NoopOutboxStore") {
1555                    tracing::warn!(
1556                        store = os_name,
1557                        "EngineBuilder: NoopOutboxStore is active — outbound messages will be \
1558                         silently discarded. Use SlateDbStore::as_outbox_store() in production."
1559                    );
1560                }
1561                if ds_name.contains("NoopDeadlineStore") {
1562                    tracing::warn!(
1563                        store = ds_name,
1564                        "EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will \
1565                         not fire after restart. Use SlateDbStore::as_deadline_store() in production."
1566                    );
1567                }
1568                if pr_name.contains("NoopProcessRegistry") {
1569                    tracing::warn!(
1570                        store = pr_name,
1571                        "EngineBuilder: NoopProcessRegistry is active — process routing will be \
1572                         lost on restart. Use SlateDbStore::as_process_registry() in production."
1573                    );
1574                }
1575            }
1576        }
1577        // Validate every module before assembling the context.
1578        // A missing adapter or misconfigured module fails at startup (not at
1579        // first inbound message), making deployment failures observable immediately.
1580        for module in &self.modules {
1581            if let Err(msg) = module.configure() {
1582                panic!(
1583                    "EngineBuilder::build: module '{}' failed configuration validation: {}",
1584                    module.name(),
1585                    msg
1586                );
1587            }
1588            // Validate profile requirements via the injected validator.
1589            // Domain crates declare requirements; only the binary crate (makod)
1590            // injects the edi-energy registry — domain crates need no edi-energy
1591            // import for this check.
1592            if let Some(ref validator) = self.profile_validator {
1593                for req in module.profile_requirements() {
1594                    assert!(
1595                        validator(req.message_type),
1596                        "EngineBuilder::build: module '{}' requires an active edi-energy \
1597                             profile for '{}' ({}) but none is registered for today's date. \
1598                             Run `cargo xtask codegen` to add the missing profile.",
1599                        module.name(),
1600                        req.message_type,
1601                        req.label,
1602                    );
1603                }
1604            }
1605        }
1606        // Build the PID router from all registered modules.
1607        // Also assert that no two modules claim the same PID — a PID overlap
1608        // is always a configuration error: one module's messages would be
1609        // silently swallowed by another's workflow, producing missing-process
1610        // errors or incorrect audit trails.
1611        let mut pid_router = PidRouter::new();
1612        let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
1613        for module in &self.modules {
1614            // Temporarily build a scratch router to read this module's PIDs
1615            // for cross-module overlap detection (module-ownership level).
1616            let mut scratch = PidRouter::new();
1617            module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
1618            for pid in scratch.registered_pids() {
1619                if let Some(prev) = pid_owners.insert(pid, module.name()) {
1620                    if self.deployment_roles.is_all() {
1621                        // With DeploymentRoles::all() (the default), role-conditional PIDs
1622                        // are registered by all modules that claim them, producing last-wins
1623                        // semantics. This is acceptable for single-role and dev/test deployments.
1624                        //
1625                        // In production multi-role deployments where both an NB and nMSB role
1626                        // are served by the same instance, set explicit roles via
1627                        // `EngineBuilder::with_deployment_roles` to prevent silent misrouting.
1628                        //
1629                        // We emit a debug-level log here (not warn) because the vast majority
1630                        // of deployments are single-role and this overlap is expected/harmless.
1631                        #[cfg(feature = "tracing")]
1632                        tracing::debug!(
1633                            pid,
1634                            previous_module = prev,
1635                            current_module = module.name(),
1636                            "PID registered by multiple modules with DeploymentRoles::all(); \
1637                             last module wins (use with_deployment_roles for strict routing)",
1638                        );
1639                        let _ = prev; // suppress unused-variable warning when tracing is off
1640                    } else {
1641                        panic!(
1642                            "EngineBuilder::build: PID {pid} is claimed by both \
1643                             '{prev}' and '{}' — overlapping PID registrations are \
1644                             not allowed with explicit DeploymentRoles; each PID must be \
1645                             owned by exactly one module.\n  \
1646                             Hint: use EngineBuilder::with_deployment_roles to restrict \
1647                             role-conditional PIDs so only one module registers each PID.\n  \
1648                             Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
1649                             GPKE registers ORDRSP 19001/19002, not WiM.",
1650                            module.name(),
1651                        );
1652                    }
1653                }
1654            }
1655            // Register into the real router with conflict detection.
1656            module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
1657        }
1658        let registered_modules = self.modules.iter().map(|m| m.name()).collect();
1659        let registered_workflows = self
1660            .modules
1661            .iter()
1662            .flat_map(|m| m.workflow_names().iter().copied())
1663            .collect();
1664        EngineContext {
1665            event_store: Arc::new(self.event_store),
1666            snapshot_store: self.snapshot_store,
1667            outbox_store: self.outbox_store,
1668            deadline_store: self.deadline_store,
1669            registry: self.registry,
1670            dead_letter_sink: self.dead_letter_sink,
1671            pid_router,
1672            registered_modules,
1673            registered_workflows,
1674        }
1675    }
1676}
1677
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        deadline::InMemoryDeadlineStore,
        error::WorkflowError,
        event_store::InMemoryEventStore,
        ids::TenantId,
        outbox::InMemoryOutboxStore,
        pid_router::PidRouter,
        registry::InMemoryProcessRegistry,
        snapshot::InMemorySnapshotStore,
        version::WorkflowId,
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    // ── Fixture: the smallest workflow that can be spawned and resumed ────────

    /// The single event type emitted by [`PingWorkflow`].
    #[derive(serde::Serialize, serde::Deserialize)]
    struct PingEvent;

    impl EventPayload for PingEvent {
        fn event_type(&self) -> &'static str {
            "Ping"
        }
    }

    /// The single command accepted by [`PingWorkflow`].
    struct PingCommand;

    impl CommandPayload for PingCommand {}

    /// Stateless placeholder; `apply` returns it unchanged.
    #[derive(Default, Clone)]
    struct PingState;

    /// Workflow whose every command yields exactly one [`PingEvent`].
    struct PingWorkflow;

    impl Workflow for PingWorkflow {
        type State = PingState;
        type Event = PingEvent;
        type Command = PingCommand;

        fn apply(current: PingState, _event: &PingEvent) -> PingState {
            current
        }

        fn handle(
            _state: &PingState,
            _command: PingCommand,
        ) -> Result<crate::workflow::WorkflowOutput<PingEvent>, WorkflowError> {
            let emitted = vec![PingEvent];
            Ok(emitted.into())
        }
    }

    /// Module that overrides nothing but its name.
    struct TestModule;

    impl EngineModule for TestModule {
        fn name(&self) -> &'static str {
            "test-module"
        }
    }

    // ── Builder assembly ──────────────────────────────────────────────────────

    /// An event store alone is enough to build; no modules are reported.
    #[test]
    fn build_with_event_store_only() {
        let builder = EngineBuilder::new().with_event_store(InMemoryEventStore::new());
        let engine = builder.build();
        assert!(engine.registered_modules().is_empty());
    }

    /// All in-memory stores plus one module: the module name is reported.
    #[test]
    fn build_with_all_stores_and_module() {
        let builder = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_snapshot_store(InMemorySnapshotStore::new())
            .with_outbox_store(InMemoryOutboxStore::new())
            .with_deadline_store(InMemoryDeadlineStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .register(Box::new(TestModule));
        let engine = builder.build();
        assert_eq!(engine.registered_modules(), &["test-module"]);
    }

    /// Module names come back in registration order.
    #[test]
    fn multiple_modules_ordered() {
        struct ModA;
        impl EngineModule for ModA {
            fn name(&self) -> &'static str {
                "mod-a"
            }
        }
        struct ModB;
        impl EngineModule for ModB {
            fn name(&self) -> &'static str {
                "mod-b"
            }
        }

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(ModA))
            .register(Box::new(ModB))
            .build();
        assert_eq!(engine.registered_modules(), &["mod-a", "mod-b"]);
    }

    // ── Spawn / resume / registry ─────────────────────────────────────────────

    /// Two spawns of the same workflow id yield distinct process ids.
    #[tokio::test]
    async fn spawn_creates_independent_processes() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();
        let workflow_id = WorkflowId::new("ping", "FV2024-10-01");

        let first = engine.spawn::<PingWorkflow>(TenantId::new(), workflow_id.clone());
        let second = engine.spawn::<PingWorkflow>(TenantId::new(), workflow_id);

        assert_ne!(first.process_id(), second.process_id());
    }

    /// A resumed process observes the event appended before the resume.
    #[tokio::test]
    async fn resume_sees_previously_appended_events() {
        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .build();

        let process =
            engine.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        process.execute(PingCommand).await.unwrap();

        let resumed = engine.resume::<PingWorkflow>(process.identity());
        assert_eq!(resumed.event_count().await.unwrap(), 1);
    }

    /// An identity registered under a conversation key can be looked up again
    /// and resumed as the same process.
    #[tokio::test]
    async fn registry_routes_process_via_conversation_key() {
        use crate::registry::RegistryKey;

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_registry(InMemoryProcessRegistry::new())
            .build();

        let process =
            engine.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
        let tenant = process.tenant_id();
        let conversation = RegistryKey::parse("conv:test-conversation-123").expect("valid key");

        engine
            .registry()
            .register(tenant, &conversation, process.identity())
            .await
            .unwrap();

        let looked_up = engine
            .registry()
            .lookup(tenant, &conversation)
            .await
            .unwrap()
            .expect("must be registered");

        let resumed = engine.resume::<PingWorkflow>(looked_up);
        assert_eq!(resumed.process_id(), process.process_id());
    }

    // ── PID routing ───────────────────────────────────────────────────────────

    /// PIDs registered through `register_pids` end up in the built router.
    #[test]
    fn pid_router_populated_by_module_register_pids() {
        struct PidModule;
        impl EngineModule for PidModule {
            fn name(&self) -> &'static str {
                "pid-module"
            }
            fn register_pids(&self, router: &mut PidRouter) {
                for pid in [55001, 55002] {
                    router.register(pid, "gpke-supplier-change");
                }
            }
        }

        let engine = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .register(Box::new(PidModule))
            .build();

        let router = engine.pid_router();
        assert_eq!(router.route(55001), Some("gpke-supplier-change"));
        assert_eq!(router.route(55002), Some("gpke-supplier-change"));
        assert!(router.route(99999).is_none());
        assert_eq!(router.len(), 2);
    }

    /// `register_pids_with_roles` lets modules gate PIDs behind role checks.
    ///
    /// Both fixture modules want PID 19001:
    /// - `ModuleA` maps it to "workflow-a" whenever `Nb` is among the roles.
    /// - `ModuleB` maps it (and 19015) to "workflow-b" only when `Nmsb` is set
    ///   explicitly, i.e. never under `all()`.
    ///
    /// Expected routing per role set:
    /// - `all()`  → only `ModuleA` registers → 19001 is "workflow-a".
    /// - `nb()`   → only `ModuleA` registers → 19001 is "workflow-a".
    /// - `nmsb()` → only `ModuleB` registers → 19001 and 19015 are "workflow-b".
    #[test]
    fn register_pids_with_roles_gates_pids_correctly() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ModuleA;
        impl EngineModule for ModuleA {
            fn name(&self) -> &'static str {
                "module-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if !roles.contains(Marktrolle::Nb) {
                    return;
                }
                router.register(19_001, "workflow-a");
            }
        }

        struct ModuleB;
        impl EngineModule for ModuleB {
            fn name(&self) -> &'static str {
                "module-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                // `all()` is treated as a backward-compat sentinel: stay silent
                // unless Nmsb was requested explicitly.
                if roles.is_all() || !roles.contains(Marktrolle::Nmsb) {
                    return;
                }
                router.register(19_001, "workflow-b");
                router.register(19_015, "workflow-b");
            }
        }

        // Fresh engine with both modules registered under the given roles.
        let engine_for = |roles: DeploymentRoles| {
            EngineBuilder::new()
                .with_event_store(InMemoryEventStore::new())
                .with_deployment_roles(roles)
                .register(Box::new(ModuleA))
                .register(Box::new(ModuleB))
                .build()
        };

        // Default roles: ModuleA claims 19001, ModuleB stays out.
        let engine = engine_for(DeploymentRoles::all());
        assert_eq!(engine.pid_router().route(19_001), Some("workflow-a"));
        assert!(engine.pid_router().route(19_015).is_none());

        // Explicit Nb: identical outcome.
        let engine = engine_for(DeploymentRoles::nb());
        assert_eq!(engine.pid_router().route(19_001), Some("workflow-a"));
        assert!(engine.pid_router().route(19_015).is_none());

        // Explicit Nmsb: ModuleA stays out, ModuleB claims both PIDs.
        let engine = engine_for(DeploymentRoles::nmsb());
        assert_eq!(engine.pid_router().route(19_001), Some("workflow-b"));
        assert_eq!(engine.pid_router().route(19_015), Some("workflow-b"));
    }

    /// With explicit roles that activate both modules, a shared PID aborts the build.
    #[test]
    #[should_panic(expected = "overlapping PID registrations")]
    fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
        use crate::marktrolle::{DeploymentRoles, Marktrolle};

        struct ConflictA;
        impl EngineModule for ConflictA {
            fn name(&self) -> &'static str {
                "conflict-a"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if roles.contains(Marktrolle::Nb) {
                    router.register(19_001, "workflow-a");
                }
            }
        }

        struct ConflictB;
        impl EngineModule for ConflictB {
            fn name(&self) -> &'static str {
                "conflict-b"
            }
            fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
                if !roles.is_all() && roles.contains(Marktrolle::Nmsb) {
                    // Same PID as ConflictA, but a different workflow.
                    router.register(19_001, "workflow-b");
                }
            }
        }

        // Nb + Nmsb explicitly: both modules register 19001, so build must panic.
        let both_roles = DeploymentRoles::from_roles([Marktrolle::Nb, Marktrolle::Nmsb]);
        let _ = EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_deployment_roles(both_roles)
            .register(Box::new(ConflictA))
            .register(Box::new(ConflictB))
            .build();
    }
}