mako_engine/builder.rs
1//! [`EngineModule`] trait, [`EngineBuilder`], and [`EngineContext`].
2//!
3//! # Summary
4//!
5//! `EngineBuilder` assembles all engine infrastructure into a single
6//! [`EngineContext`] value. Domain modules (GPKE, WiM, GeLi Gas, …) register
7//! themselves at startup via the [`EngineModule`] trait, making their names
8//! visible in diagnostics and health checks.
9//!
10//! # Type-state guarantee
11//!
12//! [`EngineBuilder::build`] is only available when the event store type
13//! parameter `ES` implements [`EventStore`]. Forgetting to call
14//! [`with_event_store`] is a **compile-time error**, not a runtime panic.
15//!
16//! All other stores default to their respective `Noop` implementations:
17//!
18//! | Store | Default |
19//! |-------|---------|
20//! | Snapshot store | [`NoopSnapshotStore`] |
21//! | Outbox store | [`NoopOutboxStore`] |
22//! | Deadline store | [`NoopDeadlineStore`] |
23//! | Process registry | [`NoopProcessRegistry`] |
24//!
25//! # Assembly example
26//!
27//! ```rust,ignore
28//! use mako_engine::builder::{EngineBuilder, EngineModule};
29//! use mako_engine::event_store::InMemoryEventStore;
30//! use mako_engine::outbox::InMemoryOutboxStore;
31//! use mako_engine::deadline::InMemoryDeadlineStore;
32//! use mako_engine::registry::InMemoryProcessRegistry;
33//! use mako_engine::snapshot::InMemorySnapshotStore;
34//!
35//! struct GpkeModule;
36//! impl EngineModule for GpkeModule { fn name(&self) -> &'static str { "gpke" } }
37//!
38//! let ctx = EngineBuilder::new()
39//! .with_event_store(InMemoryEventStore::new())
40//! .with_snapshot_store(InMemorySnapshotStore::new())
41//! .with_outbox_store(InMemoryOutboxStore::new())
42//! .with_deadline_store(InMemoryDeadlineStore::new())
43//! .with_registry(InMemoryProcessRegistry::new())
44//! .register(Box::new(GpkeModule))
45//! .build();
46//!
47//! // Spawn a fresh process:
48//! let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
49//! p.execute(ReceiveUtilmd { .. }).await?;
50//!
51//! // Resume an existing process from a persisted identity:
//! let identity = ctx.registry().lookup(tenant_id, &conv_id.to_string()).await?.unwrap();
53//! let p = ctx.resume::<SupplierChangeWorkflow>(identity);
54//!
55//! // Access stores for delivery workers / schedulers:
//! let pending = ctx.outbox_store().pending_now(50).await?;
//! let overdue = ctx.deadline_store().due_now(50).await?;
58//! ```
59//!
60//! [`with_event_store`]: EngineBuilder::with_event_store
61
62// Type-state generics can produce long signatures that trip up the
63// `type_complexity` lint; suppress it for this module only.
64#![allow(clippy::type_complexity)]
65
66use crate::{
67 dead_letter::{DeadLetterSink, LogDeadLetterSink},
68 deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
69 error::EngineError,
70 event_store::EventStore,
71 ids::{ProcessIdentity, TenantId},
72 marktrolle::DeploymentRoles,
73 outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
74 pid_router::PidRouter,
75 process::Process,
76 registry::{NoopProcessRegistry, ProcessRegistry},
77 snapshot::{NoopSnapshotStore, SnapshotStore},
78 version::WorkflowId,
79 workflow::Workflow,
80};
81
82use std::sync::Arc;
83
84// ── EngineModule ──────────────────────────────────────────────────────────────
85
/// A self-contained domain module that registers with the engine at startup.
///
/// Domain crates implement this trait to declare their presence in the engine.
/// The module name is surfaced in [`EngineContext::registered_modules`] for
/// diagnostics, health checks, and log output.
///
/// Only [`name`] is mandatory; every other method has a default
/// implementation that does nothing (or returns an empty slice / `Ok(())`),
/// so a module opts in to exactly the hooks it needs.
///
/// ## Startup validation
///
/// Override [`configure`] to perform adapter coverage checks at engine startup
/// time. The engine calls [`configure`] for every registered module during
/// [`EngineBuilder::build`] and panics with an actionable message if any
/// module returns `Err`. This surfaces missing adapter registrations as a
/// startup failure rather than a silent runtime error.
///
/// ## Example
///
/// ```rust,ignore
/// pub struct GpkeModule;
///
/// impl EngineModule for GpkeModule {
///     fn name(&self) -> &'static str { "gpke" }
///
///     fn configure(&self) -> Result<(), String> {
///         // Validate that every known BDEW format version has an adapter:
///         GPKE_ADAPTER_REGISTRY
///             .validate_policy(&GpkeWorkflow::version_policy(), &KNOWN_FVS)
///             .map_err(|uncovered| format!(
///                 "gpke: missing adapters for format versions: {:?}",
///                 uncovered
///             ))
///     }
/// }
///
/// let ctx = EngineBuilder::new()
///     .with_event_store(my_store)
///     .register(Box::new(GpkeModule))
///     .build(); // panics if GpkeModule::configure returns Err
///
/// assert_eq!(ctx.registered_modules(), &["gpke"]);
/// ```
///
/// [`name`]: EngineModule::name
/// [`configure`]: EngineModule::configure
pub trait EngineModule: Send + 'static {
    /// Stable, unique name for this domain module.
    ///
    /// Used in diagnostics, health checks, and structured log output.
    /// Choose a short lowercase identifier (e.g. `"gpke"`, `"wim"`,
    /// `"geli"`).
    fn name(&self) -> &'static str;

    /// Register all PIDs this module handles into the shared [`PidRouter`].
    ///
    /// The default implementation registers nothing.
    ///
    /// # Mutability contract
    ///
    /// This method is called **exactly once** by [`EngineBuilder::build`],
    /// before the resulting [`EngineContext`] is handed to the caller. The
    /// `&mut PidRouter` reference is only available here, at build time.
    /// After `build` returns the router is **sealed** — the engine provides
    /// only a shared `&PidRouter` reference, with no mutation path at runtime.
    ///
    /// Consequence: **all PIDs a module will ever need must be registered
    /// here**. Do not attempt to register PIDs lazily from async handlers or
    /// after the engine has started — there is no API for that by design.
    ///
    /// Duplicate registrations made through `PidRouter::register` (same PID
    /// from two modules) silently overwrite the previous mapping; the last
    /// module to register wins. Use `cargo xtask validate-pruefids` to catch
    /// accidental PID conflicts between modules before they reach production,
    /// or register through [`PidRouter::register_with_module`], which is
    /// described under [`register_pids_with_roles`] as panicking on such
    /// conflicts at build time.
    ///
    /// For role-conditional registration (PIDs that should only be active for
    /// specific BDEW market roles, "Marktrollen"), override
    /// [`register_pids_with_roles`] instead.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// fn register_pids(&self, router: &mut PidRouter) {
    ///     // GPKE supplier change / start of supply
    ///     // (Lieferantenwechsel / Lieferbeginn; BK6-22-024, PIDs 55001, 55002, 55017)
    ///     for &pid in &[55001_u32, 55002, 55017] {
    ///         router.register(pid, "gpke-supplier-change");
    ///     }
    ///     // Ex-MPES feed-in point (Einspeisestelle; adopted via BK6-22-024 LFW24,
    ///     // effective 2025-06-06, PIDs 56001–56004)
    ///     for pid in 56001_u32..=56004 {
    ///         router.register(pid, "gpke-supplier-change");
    ///     }
    /// }
    /// ```
    ///
    /// [`register_pids_with_roles`]: EngineModule::register_pids_with_roles
    fn register_pids(&self, _router: &mut PidRouter) {}

    /// Register PIDs with role-context awareness.
    ///
    /// This is the **preferred override** for modules that have role-conditional
    /// PID registrations — PIDs that should only be active when this `makod`
    /// instance holds a specific [`Marktrolle`].
    ///
    /// The default implementation ignores `_roles` and calls
    /// [`register_pids`] (role-agnostic), so existing modules that override
    /// `register_pids` continue to work without changes.
    ///
    /// Override this method instead of `register_pids` when any PID registration
    /// should be conditional on the deployment role:
    ///
    /// ```rust,ignore
    /// use mako_engine::marktrolle::Marktrolle;
    ///
    /// fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
    ///     // Always register: 55001, 55002 (not role-specific)
    ///     for pid in [55001_u32, 55002] {
    ///         router.register_with_module(pid, "gpke-supplier-change", self.name());
    ///     }
    ///
    ///     // Only when NB role: 19001/19002 inbound ORDRSP from MSB
    ///     if roles.contains(Marktrolle::Nb) {
    ///         for pid in [19001_u32, 19002] {
    ///             router.register_with_module(pid, "gpke-konfiguration", self.name());
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// # Conflict guard
    ///
    /// Use [`PidRouter::register_with_module`] (not `register`) inside this
    /// method. The conflict guard panics at build time if two modules register
    /// the same PID to different workflows — this makes role misconfigurations
    /// visible at startup rather than silently misrouting messages.
    ///
    /// [`Marktrolle`]: crate::marktrolle::Marktrolle
    /// [`register_pids`]: EngineModule::register_pids
    fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
        // Backward-compatible fallback: delegate to the role-agnostic hook.
        self.register_pids(router);
    }

    /// Workflow names this module handles for deadline dispatch.
    ///
    /// Return the same name strings that [`register_pids`] maps PIDs to.
    /// These names are stored in [`EngineContext::registered_workflows`] and
    /// used to validate that every workflow that has deadlines scheduled is
    /// covered by the deadline scheduler dispatch function at runtime.
    ///
    /// The default implementation returns an empty slice. Override it to
    /// declare all workflow names that may fire deadlines:
    ///
    /// ```rust,ignore
    /// fn workflow_names(&self) -> &'static [&'static str] {
    ///     &["gpke-supplier-change", "gpke-abrechnung"]
    /// }
    /// ```
    ///
    /// [`register_pids`]: EngineModule::register_pids
    /// [`EngineContext::registered_workflows`]: crate::builder::EngineContext::registered_workflows
    fn workflow_names(&self) -> &'static [&'static str] {
        &[]
    }

    /// Declare the EDIFACT profile types this module requires at runtime.
    ///
    /// The default implementation returns an empty slice (no requirements).
    ///
    /// Returning a non-empty slice causes [`EngineBuilder::build`] to call the
    /// registered profile validator for each requirement. If no active profile
    /// exists for a required message type, `build` panics with an actionable
    /// error so deployment fails fast rather than silently.
    ///
    /// **This replaces the previous pattern** of calling
    /// `edi_energy::registry::ReleaseRegistry::global()` inside `configure()`.
    /// Domain crates no longer need `edi-energy` in their production
    /// `[dependencies]` — they just declare their requirements here.
    ///
    /// ```rust,ignore
    /// fn profile_requirements(&self) -> &'static [ProfileRequirement] {
    ///     &[
    ///         ProfileRequirement { message_type: "UTILMD", label: "UTILMD Strom (GPKE)" },
    ///         ProfileRequirement { message_type: "INVOIC", label: "INVOIC Abrechnung (GPKE)" },
    ///     ]
    /// }
    /// ```
    ///
    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
    fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
        &[]
    }

    /// Validate adapter coverage and configuration at engine startup.
    ///
    /// Called by [`EngineBuilder::build`] after all modules are registered.
    /// Return `Ok(())` when the module is fully configured. Return `Err(msg)`
    /// with an actionable description when an adapter or configuration is
    /// missing — the engine will panic with that message so the deployment
    /// fails early rather than silently.
    ///
    /// The default implementation is a no-op (always returns `Ok(())`).
    /// Override it in domain crates to call
    /// [`AdapterRegistry::validate_policy`] and emit structured errors.
    ///
    /// Note: if your validation needs access to the edi-energy profile
    /// registry, use [`profile_requirements`] instead — it does not require
    /// importing `edi-energy` in domain crates.
    ///
    /// [`AdapterRegistry::validate_policy`]: crate::message_adapter::AdapterRegistry::validate_policy
    /// [`profile_requirements`]: EngineModule::profile_requirements
    ///
    /// # Errors
    ///
    /// Returns a descriptive error string when the module's configuration is invalid.
    fn configure(&self) -> Result<(), String> {
        Ok(())
    }
}
290
291// ── EngineContext ─────────────────────────────────────────────────────────────
292
/// Assembled engine infrastructure returned by [`EngineBuilder::build`].
///
/// `EngineContext` bundles all stores and the process registry into a single
/// value. It is the root dependency for:
///
/// - Spawning new processes ([`spawn`])
/// - Resuming existing processes ([`resume`])
/// - Running outbox delivery workers (`outbox_store().pending_now(…)`)
/// - Driving the deadline scheduler (`deadline_store().due_now(…)`)
///
/// All fields except `dead_letter_sink` are private; read them through the
/// accessor methods of the same name.
///
/// ## Generic parameters
///
/// | Param | Role | Default |
/// |-------|------|---------|
/// | `ES` | [`EventStore`] backend | — (required) |
/// | `SS` | [`SnapshotStore`] backend | [`NoopSnapshotStore`] |
/// | `OS` | [`OutboxStore`] backend | [`NoopOutboxStore`] |
/// | `DS` | [`DeadlineStore`] backend | [`NoopDeadlineStore`] |
/// | `PR` | [`ProcessRegistry`] backend | [`NoopProcessRegistry`] |
///
/// In most codebases all type parameters are inferred from the builder calls.
///
/// [`spawn`]: EngineContext::spawn
/// [`resume`]: EngineContext::resume
pub struct EngineContext<
    ES,
    SS = NoopSnapshotStore,
    OS = NoopOutboxStore,
    DS = NoopDeadlineStore,
    PR = NoopProcessRegistry,
> {
    /// Event store backend, kept behind an `Arc` so `spawn` / `resume` can
    /// hand each process its own handle with a pointer clone.
    event_store: Arc<ES>,
    /// Snapshot store backend.
    snapshot_store: SS,
    /// Outbox store backend; cloned into every worker created by
    /// `run_outbox_worker`.
    outbox_store: OS,
    /// Deadline store backend.
    deadline_store: DS,
    /// Process routing registry.
    registry: PR,
    /// Dead-letter sink for unroutable or unprocessable inbound messages.
    ///
    /// Stored as `Arc<dyn DeadLetterSink>` so callers can share it across
    /// tasks without an extra type parameter on `EngineContext`.
    pub dead_letter_sink: Arc<dyn DeadLetterSink>,
    /// PID-to-workflow routing table, populated from all registered modules.
    pid_router: PidRouter,
    /// Names of all registered modules, exposed via
    /// [`EngineContext::registered_modules`].
    registered_modules: Vec<&'static str>,
    /// Workflow names declared by all registered modules via
    /// [`EngineModule::workflow_names`]. Used to validate deadline scheduler
    /// coverage at runtime (see [`EngineContext::registered_workflows`]).
    registered_workflows: Vec<&'static str>,
}
342
343// ── Type aliases ──────────────────────────────────────────────────────────────
344
/// An [`EngineContext`] with all optional subsystems disabled.
///
/// The alias only names the event store type `ES`; the four remaining type
/// parameters take the defaults declared on [`EngineContext`]
/// ([`NoopSnapshotStore`], [`NoopOutboxStore`], [`NoopDeadlineStore`],
/// [`NoopProcessRegistry`]). Suitable for tests and minimal deployments where
/// only a durable event store is required.
///
/// Typical use together with [`EngineBuilder`]:
///
/// ```rust,ignore
/// // Only available in test / testing-feature builds:
/// use mako_engine::builder::{EngineBuilder, MinimalEngine};
/// use mako_engine::event_store::InMemoryEventStore;
///
/// let ctx: MinimalEngine<InMemoryEventStore> = EngineBuilder::new()
///     .with_event_store(InMemoryEventStore::new())
///     .build();
/// ```
pub type MinimalEngine<ES> = EngineContext<ES>;
364
365impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
366where
367 ES: EventStore,
368{
369 /// Spawn a new process and return a typed `Process<W, Arc<ES>>` handle.
370 ///
371 /// No `ES: Clone` bound is required — the engine stores the event store
372 /// behind an `Arc` so spawning is always a cheap pointer clone.
373 ///
374 /// ```rust,ignore
375 /// let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
376 /// p.execute(ReceiveUtilmd { .. }).await?;
377 /// ```
378 #[must_use]
379 pub fn spawn<W: Workflow>(
380 &self,
381 tenant_id: TenantId,
382 workflow_id: WorkflowId,
383 ) -> Process<W, Arc<ES>> {
384 Process::new(Arc::clone(&self.event_store), tenant_id, workflow_id)
385 }
386
387 /// Resume an existing process from a [`ProcessIdentity`].
388 ///
389 /// ```rust,ignore
390 /// let identity = ctx.registry()
391 /// .lookup(tenant_id, &conv_id.to_string())
392 /// .await?
393 /// .ok_or(EngineError::Registry("unknown conversation".into()))?;
394 /// let p = ctx.resume::<SupplierChangeWorkflow>(identity);
395 /// p.execute(HandleAperak { .. }).await?;
396 /// ```
397 #[must_use]
398 pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
399 Process::from_identity(Arc::clone(&self.event_store), identity)
400 }
401
402 /// Names of all domain modules registered with the builder, in
403 /// registration order.
404 #[must_use]
405 pub fn registered_modules(&self) -> &[&'static str] {
406 &self.registered_modules
407 }
408
409 /// Workflow names declared by all registered modules, in registration order.
410 ///
411 /// Use this in the deadline scheduler dispatch function to detect unknown
412 /// workflow names at startup. If a deadline fires for a workflow name that
413 /// is not in this list, the scheduler's dispatch function should emit an
414 /// error rather than silently dropping the deadline:
415 ///
416 /// ```rust,ignore
417 /// let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
418 /// let scheduler = ctx.run_deadline_scheduler(
419 /// move |deadline| {
420 /// let wf = deadline.workflow_id().name.as_ref();
421 /// if !known.contains(wf) {
422 /// tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
423 /// return Box::pin(async { Ok(()) });
424 /// }
425 /// // dispatch by workflow name …
426 /// Box::pin(async { Ok(()) })
427 /// },
428 /// 100,
429 /// Duration::from_secs(30),
430 /// );
431 /// ```
432 #[must_use]
433 pub fn registered_workflows(&self) -> &[&'static str] {
434 &self.registered_workflows
435 }
436
437 /// The event store backend (behind an `Arc`).
438 #[must_use]
439 pub fn event_store(&self) -> &Arc<ES> {
440 &self.event_store
441 }
442
443 /// The snapshot store backend.
444 #[must_use]
445 pub fn snapshot_store(&self) -> &SS {
446 &self.snapshot_store
447 }
448
449 /// The outbox store backend.
450 ///
451 /// Poll `outbox_store().pending_now(limit)` in a background task to drain
452 /// the delivery queue.
453 #[must_use]
454 pub fn outbox_store(&self) -> &OS {
455 &self.outbox_store
456 }
457
458 /// The deadline store backend.
459 ///
460 /// Poll `deadline_store().due_now(limit)` in a background scheduler to
461 /// fire overdue process timers.
462 #[must_use]
463 pub fn deadline_store(&self) -> &DS {
464 &self.deadline_store
465 }
466
467 /// The process routing registry.
468 ///
469 /// Register a [`ProcessIdentity`] under a `(tenant_id, key)` pair at
470 /// process creation, then `lookup` it when routing inbound messages.
471 #[must_use]
472 pub fn registry(&self) -> &PR {
473 &self.registry
474 }
475
476 /// The dead-letter sink for unroutable or unprocessable messages.
477 ///
478 /// Call [`DeadLetterSink::reject`] when an inbound message cannot be
479 /// dispatched to any workflow. The default sink emits `tracing::warn!`
480 /// so rejections are always visible in the log output.
481 #[must_use]
482 pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
483 &self.dead_letter_sink
484 }
485
486 /// Assert that no Noop store is active — call this during production startup.
487 ///
488 /// Checks the type names of `OS`, `DS`, and `PR` against the string `"Noop"`.
489 /// Panics with a human-readable message if any match, directing the operator
490 /// to configure a persistent backend.
491 ///
492 /// # When to call
493 ///
494 /// Call this early in `makod`'s startup path (and `--check` mode) to catch
495 /// deployments where a Noop store was accidentally wired — e.g. the
496 /// `[outbox]`, `[deadline]`, or `[registry]` configuration section was
497 /// omitted from `makod.toml`. The check is defence-in-depth: in release
498 /// builds without the `testing` feature, Noop stores cannot implement the
499 /// required traits at all and the compiler would have already rejected them.
500 ///
501 /// # Panics
502 ///
503 /// Panics when any of `OS`, `DS`, or `PR` is a Noop implementation.
504 pub fn assert_production_stores(&self) {
505 let checks: &[(&str, &str)] = &[
506 ("OutboxStore", std::any::type_name::<OS>()),
507 ("DeadlineStore", std::any::type_name::<DS>()),
508 ("ProcessRegistry", std::any::type_name::<PR>()),
509 ];
510 for (trait_name, type_name) in checks {
511 assert!(
512 !type_name.contains("Noop"),
513 "makod: Noop{trait_name} is active — \
514 configure a persistent {trait_name} backend in makod.toml. \
515 Type resolved to: {type_name}"
516 );
517 }
518 }
519
520 /// The PID-to-workflow routing table.
521 ///
522 /// Populated **once** during [`EngineBuilder::build`] by calling
523 /// [`EngineModule::register_pids`] on every registered module in
524 /// registration order. After `build` returns the table is **sealed** —
525 /// it is read-only for the lifetime of the `EngineContext` and may be
526 /// freely shared across async tasks without synchronisation.
527 ///
528 /// # Mutability contract
529 ///
530 /// There is intentionally no `pid_router_mut()` accessor. Adding PIDs
531 /// after the engine is built would create a TOCTOU race between the
532 /// dispatch path (which calls `route(pid)`) and any hypothetical
533 /// concurrent mutator. Instead, register all PIDs during the build phase
534 /// via `EngineModule::register_pids`.
535 ///
536 /// If a new process family needs to be added without restarting the
537 /// binary, rebuild and restart `makod` — hot-swap of PID routing is not
538 /// supported.
539 ///
540 /// # Example — dispatch at the AS4 reception boundary
541 ///
542 /// ```rust,ignore
543 /// let workflow_name = ctx.pid_router().route(pid)
544 /// .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
545 /// format!("no workflow registered for PID {pid}").into()
546 /// )))?;
547 ///
548 /// match workflow_name {
549 /// "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
550 /// "wim-device-change" => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
551 /// other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
552 /// format!("unhandled workflow name: {other}").into()
553 /// ))),
554 /// }
555 /// ```
556 #[must_use]
557 pub fn pid_router(&self) -> &PidRouter {
558 &self.pid_router
559 }
560}
561
562// ── As4Sender ─────────────────────────────────────────────────────────────────
563
/// Sends a single AS4 / EDIINT-over-HTTP outbound message.
///
/// Implement this trait for your AS4 gateway client and pass it to
/// [`EngineContext::run_outbox_worker`].
///
/// # Contract
///
/// Return `Ok(())` only after the message has been **durably accepted** by the
/// receiving MSH. Return `Err(…)` on transient or permanent failure:
///
/// - For most errors the outbox worker calls [`OutboxStore::reschedule`] so
///   the message is retried later.
/// - Errors for which `EngineError::is_partner_unknown()` is true, and
///   `EngineError::Serialization(_)`, are treated as permanent by
///   [`OutboxWorker::run`]: the message is dead-lettered and acknowledged
///   without a retry.
pub trait As4Sender: Send + Sync + 'static {
    /// Transmit `msg` and return when the remote MSH has accepted it.
    ///
    /// The returned future must be `Send` so the worker can run on a
    /// multi-threaded executor.
    fn send(
        &self,
        msg: &OutboxMessage,
    ) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
}
581
582// ── OutboxWorker ──────────────────────────────────────────────────────────────
583
/// A background worker that drains the outbox by polling pending
/// [`OutboxMessage`]s and dispatching them via an [`As4Sender`].
///
/// Obtain via [`EngineContext::run_outbox_worker`] and drive by spawning
/// [`OutboxWorker::run`] in a Tokio task.
///
/// # Polling behaviour
///
/// When the poll returns an empty batch (or the store reports an error) the
/// worker sleeps for `poll_interval` before polling again. Non-empty batches
/// are processed immediately, one message at a time.
///
/// # Error handling
///
/// Successful sends are acknowledged via [`OutboxStore::acknowledge`].
/// Failed sends are rescheduled via [`OutboxStore::reschedule`] using
/// **full-jitter exponential backoff**: `delay = rand(0, min(MAX, BASE * 2^n))`
/// where `n = attempt_count`. This avoids thundering-herd when multiple
/// `makod` instances restart simultaneously after a receiver outage.
///
/// When `attempt_count >= max_attempts`, the message is **acknowledged** (removed
/// from the outbox) and a [`DeadLetterReason::OutboxExhausted`] record is written
/// to the dead-letter sink. This prevents permanently-undeliverable messages
/// from clogging the outbox forever. The same dead-letter path is taken
/// immediately, without retry, for partner-unknown and serialization errors.
///
/// All errors are emitted as structured `tracing` events at `warn` / `error`
/// level rather than `eprintln!`, so they appear in the application's log
/// pipeline with full context (message_id, error).
///
/// # Example
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
/// tokio::spawn(async move { worker.run().await });
/// ```
///
/// [`DeadLetterReason::OutboxExhausted`]: crate::dead_letter::DeadLetterReason::OutboxExhausted
pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
    /// Outbox store polled for pending messages and updated after each send.
    store: OS,
    /// Transport used to deliver each message.
    sender: S,
    /// Maximum number of messages requested per poll.
    batch_size: usize,
    /// Sleep duration after an empty poll or a store error.
    poll_interval: std::time::Duration,
    /// Maximum total delivery attempts before a message is dead-lettered.
    ///
    /// Supplied by the caller of [`EngineContext::run_outbox_worker`]; the
    /// suggested value is 48 (roughly a 4-hour budget at the 300 s backoff
    /// cap). Set to `u32::MAX` to disable the cap (not recommended for
    /// production).
    max_attempts: u32,
    /// Sink for messages that exceed `max_attempts` or fail permanently.
    dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
}
635
/// Compute a full-jitter exponential backoff delay.
///
/// The retry window is `min(MAX_SECS, BASE_SECS * 2^attempt)` and the
/// returned delay is `entropy % window`, i.e. a whole number of seconds in
/// `[0, window)`.
///
/// `attempt` is the number of prior attempts (0 = first retry).
/// `entropy` provides randomness; derive from a stable message identifier
/// (e.g. hash of `message_id`) rather than the current timestamp — a
/// timestamp-derived value is deterministic within a single batch, which
/// defeats jitter when multiple messages fail simultaneously.
///
/// | attempt | window (s) | expected delay (s) |
/// |---------|------------|--------------------|
/// | 0       | 5          | ≈ 2.5              |
/// | 1       | 10         | ≈ 5                |
/// | 2       | 20         | ≈ 10               |
/// | 3       | 40         | ≈ 20               |
/// | 4       | 80         | ≈ 40               |
/// | 5       | 160        | ≈ 80               |
/// | 6+      | 300 (cap)  | ≈ 150              |
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;
    // Smallest exponent at which BASE_SECS * 2^n reaches MAX_SECS
    // (5 * 2^6 = 320 >= 300). Clamping the exponent here keeps the shift far
    // away from overflow while still letting the window hit the cap; a clamp
    // of 5 would stop the window at 160 s and make MAX_SECS unreachable.
    const MAX_SHIFT: u32 = 6;

    // Exponential window: BASE * 2^attempt, capped at MAX.
    let window = (BASE_SECS << attempt.min(MAX_SHIFT)).min(MAX_SECS);

    // Full jitter: pick a value in [0, window). `window` is always at least
    // BASE_SECS, so the modulo can never divide by zero.
    std::time::Duration::from_secs(entropy % window)
}
663
impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
    /// Run the outbox drain loop until the task is cancelled.
    ///
    /// Each iteration polls up to `batch_size` pending messages and handles
    /// them sequentially:
    ///
    /// 1. Messages whose `attempt_count` has reached `max_attempts` are
    ///    dead-lettered and acknowledged without another send.
    /// 2. Otherwise the message is passed to the [`As4Sender`]:
    ///    - success → acknowledge (plus a deadline check for `CONTRL`),
    ///    - partner-unknown / serialization error → dead-letter and
    ///      acknowledge without retry,
    ///    - any other error → reschedule with jittered exponential backoff.
    ///
    /// Store errors while polling and empty batches both cause a sleep of
    /// `poll_interval`. The loop never returns on its own.
    ///
    /// # Panics
    ///
    /// Not expected to panic: the only `unwrap` converts an 8-byte slice into
    /// `[u8; 8]`, which cannot fail, and a failed conversion of the backoff
    /// delay into `time::Duration` falls back to five minutes instead of
    /// panicking.
    #[allow(clippy::too_many_lines)]
    pub async fn run(self) {
        loop {
            let batch = match self.store.pending_now(self.batch_size).await {
                Ok(b) => b,
                Err(e) => {
                    // Polling failed: back off for one poll interval and try again.
                    tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
                    tokio::time::sleep(self.poll_interval).await;
                    continue;
                }
            };

            // Nothing to deliver right now: idle for one poll interval.
            if batch.is_empty() {
                tokio::time::sleep(self.poll_interval).await;
                continue;
            }

            for msg in batch {
                // ── Max-attempt cap ───────────────────────────────────
                // `attempt_count` starts at 0 and is incremented on each
                // `reschedule` call. When it reaches `max_attempts` the
                // message is considered permanently undeliverable: acknowledge
                // it (remove from outbox) and dead-letter it so the regulatory
                // audit trail is preserved.
                if msg.attempt_count >= self.max_attempts {
                    tracing::error!(
                        message_id = %msg.message_id,
                        message_type = %msg.message_type,
                        recipient = %msg.recipient,
                        attempts = msg.attempt_count,
                        max_attempts = self.max_attempts,
                        "outbox worker: max delivery attempts reached; dead-lettering message",
                    );
                    // The dead-letter record is written before the acknowledge,
                    // so a failing acknowledge cannot lose the record.
                    self.dead_letter_sink.reject(
                        &crate::dead_letter::DeadLetterReason::OutboxExhausted {
                            message_id: msg.message_id,
                            message_type: msg.message_type.to_string(),
                            recipient: msg.recipient.to_string(),
                            last_error: format!(
                                "delivery exhausted after {} attempts",
                                msg.attempt_count
                            ),
                            attempts: msg.attempt_count,
                        },
                    );
                    if let Err(e) = self.store.acknowledge(msg.message_id).await {
                        tracing::error!(
                            message_id = %msg.message_id,
                            error = %e,
                            "outbox worker: acknowledge after exhaust failed; message may reappear",
                        );
                    }
                    continue;
                }

                match self.sender.send(&msg).await {
                    Ok(()) => {
                        // NOTE(review): if this acknowledge fails the message
                        // presumably stays pending and is sent again on a later
                        // poll (at-least-once delivery) — confirm store semantics.
                        if let Err(e) = self.store.acknowledge(msg.message_id).await {
                            tracing::warn!(
                                message_id = %msg.message_id,
                                error = %e,
                                "outbox worker: acknowledge failed",
                            );
                        }
                        // CONTRL AHB 1.0 §1.2: the CONTRL must be delivered
                        // within 6 wall-clock hours of interchange receipt.
                        // `msg.created_at` is when the PendingOutbox was
                        // materialised (which should equal the ingest timestamp
                        // for transport-layer CONTRL obligations).
                        // The check only logs; a late delivery is still
                        // treated as a successful send.
                        if msg.message_type.as_ref() == "CONTRL" {
                            let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
                            if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
                                tracing::warn!(
                                    message_id = %msg.message_id,
                                    elapsed_secs = elapsed.whole_seconds(),
                                    max_secs = crate::fristen::CONTRL_FRIST_HOURS * 3600,
                                    "outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
                                     (CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
                                );
                            }
                        }
                    }
                    // Permanent error: dead-letter immediately without retrying.
                    // PartnerUnknown requires operator intervention (add --as4-partner);
                    // Serialization errors will never succeed on retry.
                    Err(ref e)
                        if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
                    {
                        tracing::error!(
                            message_id = %msg.message_id,
                            message_type = %msg.message_type,
                            recipient = %msg.recipient,
                            error = %e,
                            "outbox worker: permanent send failure; dead-lettering without retry",
                        );
                        // Reuses the `OutboxExhausted` reason, carrying the
                        // sender's error text as `last_error`.
                        self.dead_letter_sink.reject(
                            &crate::dead_letter::DeadLetterReason::OutboxExhausted {
                                message_id: msg.message_id,
                                message_type: msg.message_type.to_string(),
                                recipient: msg.recipient.to_string(),
                                last_error: e.to_string(),
                                attempts: msg.attempt_count,
                            },
                        );
                        if let Err(re) = self.store.acknowledge(msg.message_id).await {
                            tracing::error!(
                                message_id = %msg.message_id,
                                error = %re,
                                "outbox worker: acknowledge after permanent failure failed",
                            );
                        }
                    }
                    Err(e) => {
                        // Stable jitter entropy derived from the UUID bytes of
                        // `message_id`. Using the last 8 bytes as a `u64` is
                        // stable across Rust versions — unlike `DefaultHasher`,
                        // whose algorithm is explicitly documented as unstable.
                        // Assumes `message_id` is a random (v4) UUID, in which
                        // case these bytes are random apart from the fixed
                        // variant bits in byte 8 — TODO confirm the ID scheme.
                        let entropy = {
                            let uuid = msg.message_id.as_uuid();
                            let bytes = uuid.as_bytes();
                            // An 8-byte slice always converts to `[u8; 8]`.
                            u64::from_le_bytes(bytes[8..16].try_into().unwrap())
                        };
                        let delay = backoff_delay(msg.attempt_count, entropy);
                        // Fall back to five minutes if the std duration cannot
                        // be represented as a `time::Duration`.
                        let retry_at = time::OffsetDateTime::now_utc()
                            + time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
                        tracing::warn!(
                            message_id = %msg.message_id,
                            attempt = msg.attempt_count,
                            max_attempts = self.max_attempts,
                            retry_in = ?delay,
                            error = %e,
                            "outbox worker: send failed; rescheduling with backoff",
                        );
                        // NOTE(review): if reschedule fails, the message
                        // presumably remains due and the next poll returns it
                        // again immediately (no sleep on non-empty batches) —
                        // confirm this cannot turn into a hot retry loop.
                        if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
                            tracing::error!(
                                message_id = %msg.message_id,
                                error = %re,
                                "outbox worker: reschedule failed; message may be stuck",
                            );
                        }
                    }
                }
            }
        }
    }
}
819
820impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
821where
822 ES: EventStore,
823 OS: OutboxStore + Clone,
824{
825 /// Construct an [`OutboxWorker`] that drains the outbox via `sender`.
826 ///
827 /// `batch_size` — messages fetched per poll cycle.
828 /// `poll_interval` — sleep duration when the batch is empty.
829 ///
830 /// `max_attempts` — maximum total delivery attempts before dead-lettering.
831 /// Pass `48` for a ~4-hour retry budget at the 300 s backoff cap, or
832 /// `u32::MAX` to disable the cap (not recommended for production).
833 ///
834 /// ```rust,ignore
835 /// use std::time::Duration;
836 ///
837 /// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
838 /// tokio::spawn(async move { worker.run().await });
839 /// ```
840 #[must_use]
841 pub fn run_outbox_worker<S: As4Sender>(
842 &self,
843 sender: S,
844 batch_size: usize,
845 poll_interval: std::time::Duration,
846 max_attempts: u32,
847 ) -> OutboxWorker<OS, S> {
848 OutboxWorker {
849 store: self.outbox_store.clone(),
850 sender,
851 batch_size,
852 poll_interval,
853 max_attempts,
854 dead_letter_sink: self.dead_letter_sink.clone(),
855 }
856 }
857}
858
859impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
860where
861 ES: std::fmt::Debug,
862 SS: std::fmt::Debug,
863 OS: std::fmt::Debug,
864 DS: std::fmt::Debug,
865 PR: std::fmt::Debug,
866{
867 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
868 f.debug_struct("EngineContext")
869 .field("registered_modules", &self.registered_modules)
870 .field("registered_workflows", &self.registered_workflows)
871 .field("pid_router_len", &self.pid_router.len())
872 .finish_non_exhaustive()
873 }
874}
875
876// ── NoopAs4Sender / LogAs4Sender ──────────────────────────────────────────────
877
878/// An [`As4Sender`] that succeeds immediately without sending anything.
879///
880/// Use in tests and environments where outbound AS4 delivery is not yet
881/// wired. All outbox messages are acknowledged (removed from the queue)
882/// without being transmitted.
883///
884/// # ⚠️ Data loss warning
885///
886/// Every outbox message is **silently discarded** — no EDIFACT message is
887/// sent to any counterparty. Do not use in production.
888#[derive(Debug, Clone, Copy, Default)]
889#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
890pub struct NoopAs4Sender;
891
892impl As4Sender for NoopAs4Sender {
893 async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
894 Ok(())
895 }
896}
897
898/// An [`As4Sender`] that logs every outbound message at `warn` level and
899/// succeeds without transmitting.
900///
901/// Useful for development and integration-testing environments where the
902/// full AS4 stack is not yet available but message visibility is desired.
903/// All outbox messages are acknowledged (removed from the queue) after logging.
904///
905/// # ⚠️ Data loss warning
906///
907/// No EDIFACT message is sent to any counterparty. Do not use in production.
908#[derive(Debug, Clone, Copy, Default)]
909#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
910pub struct LogAs4Sender;
911
912impl As4Sender for LogAs4Sender {
913 async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
914 tracing::warn!(
915 message_id = %msg.message_id,
916 message_type = %msg.message_type,
917 recipient = %msg.recipient,
918 "LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
919 );
920 Ok(())
921 }
922}
923
924// ── DeadlineScheduler ─────────────────────────────────────────────────────────
925
926/// A background task that polls [`DeadlineStore::due_now`] and dispatches
927/// deadline commands to the owning processes via a caller-supplied function.
928///
929/// Obtain via [`EngineContext::run_deadline_scheduler`] and drive by spawning
930/// [`DeadlineScheduler::run`] in a Tokio task.
931///
932/// # Dispatch function
933///
934/// The `dispatch` function receives a fired [`Deadline`] and returns a future
935/// that dispatches the appropriate timeout command to the process. The function
936/// is responsible for resuming the correct workflow and calling `execute`.
937/// After the future completes, the scheduler cancels the deadline from the
938/// store regardless of the dispatch outcome (to prevent re-firing).
939///
940/// ```rust,ignore
941/// use std::time::Duration;
942///
943/// let scheduler = ctx.run_deadline_scheduler(
944/// |deadline| async move {
945/// tracing::warn!(
946/// deadline_id = %deadline.deadline_id(),
947/// label = %deadline.label(),
948/// "deadline fired",
949/// );
950/// Ok(())
951/// },
952/// 100,
953/// Duration::from_secs(30),
954/// );
955/// tokio::spawn(async move { scheduler.run().await });
956/// ```
957pub struct DeadlineScheduler<DS: DeadlineStore> {
958 store: DS,
959 dispatch: Box<
960 dyn Fn(
961 Deadline,
962 ) -> std::pin::Pin<
963 Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
964 > + Send
965 + Sync,
966 >,
967 batch_size: usize,
968 poll_interval: std::time::Duration,
969}
970
971impl<DS: DeadlineStore> DeadlineScheduler<DS> {
972 /// Run the deadline poll loop until the task is cancelled.
973 pub async fn run(self) {
974 loop {
975 let result = match self.store.due_now(self.batch_size).await {
976 Ok(r) => r,
977 Err(e) => {
978 tracing::warn!(
979 error = %e,
980 "deadline scheduler: store error polling due deadlines (will retry)",
981 );
982 tokio::time::sleep(self.poll_interval).await;
983 continue;
984 }
985 };
986
987 if result.deadlines.is_empty() {
988 tokio::time::sleep(self.poll_interval).await;
989 continue;
990 }
991
992 for deadline in result.deadlines {
993 let id = deadline.deadline_id();
994 let label = deadline.label().to_owned();
995 let should_cancel = match (self.dispatch)(deadline).await {
996 Ok(()) => true,
997 Err(ref e) if e.is_version_conflict() => {
998 // The process was modified concurrently; the timeout
999 // command will be retried on the next poll cycle.
1000 // Do NOT cancel — let the deadline remain due so it
1001 // fires again until a non-conflict dispatch succeeds.
1002 tracing::warn!(
1003 deadline_id = %id,
1004 label = %label,
1005 "deadline scheduler: VersionConflict; will retry on next poll",
1006 );
1007 false
1008 }
1009 Err(e) => {
1010 tracing::warn!(
1011 deadline_id = %id,
1012 label = %label,
1013 error = %e,
1014 "deadline scheduler: dispatch failed (permanent); cancelling",
1015 );
1016 true
1017 }
1018 };
1019 if should_cancel {
1020 if let Err(e) = self.store.cancel(id).await {
1021 tracing::error!(
1022 deadline_id = %id,
1023 error = %e,
1024 "deadline scheduler: cancel failed; deadline may fire again",
1025 );
1026 }
1027 }
1028 }
1029
1030 // If has_more, loop immediately to drain the batch.
1031 }
1032 }
1033}
1034
1035impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
1036where
1037 ES: EventStore,
1038 DS: DeadlineStore + Clone,
1039{
1040 /// Construct a [`DeadlineScheduler`] that polls the deadline store and
1041 /// dispatches fired deadlines via `dispatch`.
1042 ///
1043 /// The `dispatch` function is called for every fired deadline. It should
1044 /// resume the owning process and execute the appropriate timeout command.
1045 ///
1046 /// `batch_size` — deadlines fetched per poll cycle.
1047 /// `poll_interval` — sleep duration when no deadlines are due.
1048 ///
1049 /// ```rust,ignore
1050 /// use std::time::Duration;
1051 ///
1052 /// let scheduler = ctx.run_deadline_scheduler(
1053 /// |d| async move {
1054 /// tracing::info!(label = %d.label(), "firing deadline");
1055 /// Ok(())
1056 /// },
1057 /// 100,
1058 /// Duration::from_secs(30),
1059 /// );
1060 /// tokio::spawn(async move { scheduler.run().await });
1061 /// ```
1062 #[must_use]
1063 pub fn run_deadline_scheduler<F, Fut>(
1064 &self,
1065 dispatch: F,
1066 batch_size: usize,
1067 poll_interval: std::time::Duration,
1068 ) -> DeadlineScheduler<DS>
1069 where
1070 F: Fn(Deadline) -> Fut + Send + Sync + 'static,
1071 Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
1072 {
1073 DeadlineScheduler {
1074 store: self.deadline_store.clone(),
1075 dispatch: Box::new(move |d| Box::pin(dispatch(d))),
1076 batch_size,
1077 poll_interval,
1078 }
1079 }
1080}
1081
1082// ── EngineBuilder ─────────────────────────────────────────────────────────────
1083
1084/// Assembles engine infrastructure and produces an [`EngineContext`].
1085///
1086/// Uses type-state to enforce that an event store is provided before
1087/// [`build`] can be called. All other stores default to `Noop`
1088/// implementations.
1089///
1090/// ## Quick start
1091///
1092/// ```rust,ignore
1093/// // Minimal — event store only, all others are Noop:
1094/// let ctx = EngineBuilder::new()
1095/// .with_event_store(InMemoryEventStore::new())
1096/// .build();
1097///
1098/// // Full infrastructure:
1099/// let ctx = EngineBuilder::new()
1100/// .with_event_store(InMemoryEventStore::new())
1101/// .with_snapshot_store(InMemorySnapshotStore::new())
1102/// .with_outbox_store(InMemoryOutboxStore::new())
1103/// .with_deadline_store(InMemoryDeadlineStore::new())
1104/// .with_registry(InMemoryProcessRegistry::new())
1105/// .register(Box::new(GpkeModule))
1106/// .build();
1107/// ```
1108///
1109/// [`build`]: EngineBuilder::build
1110pub struct EngineBuilder<
1111 ES = (),
1112 SS = NoopSnapshotStore,
1113 OS = NoopOutboxStore,
1114 DS = NoopDeadlineStore,
1115 PR = NoopProcessRegistry,
1116> {
1117 event_store: ES,
1118 snapshot_store: SS,
1119 outbox_store: OS,
1120 deadline_store: DS,
1121 registry: PR,
1122 dead_letter_sink: Arc<dyn DeadLetterSink>,
1123 modules: Vec<Box<dyn EngineModule>>,
1124 /// Active [`DeploymentRoles`] for this engine instance.
1125 ///
1126 /// Controls role-conditional PID registration via
1127 /// [`EngineModule::register_pids_with_roles`]. Defaults to
1128 /// [`DeploymentRoles::all()`] for backward compatibility.
1129 deployment_roles: DeploymentRoles,
1130 /// Optional profile validator injected by `makod` or callers that have
1131 /// access to `edi-energy`. When `Some`, called for each
1132 /// [`ProfileRequirement`] declared by registered modules. When `None`,
1133 /// profile requirements are not validated (safe in unit tests).
1134 ///
1135 /// Signature: `fn(message_type: &str) -> bool`
1136 ///
1137 /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
1138 profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
1139}
#[cfg(any(test, feature = "testing"))]
impl Default for EngineBuilder {
    /// All-`Noop` builder without an event store: no modules, the logging
    /// dead-letter sink, every deployment role and no profile validator.
    fn default() -> Self {
        // Spell out the trait-object type once, next to the concrete sink.
        let dead_letter_sink: Arc<dyn DeadLetterSink> = Arc::new(LogDeadLetterSink);
        Self {
            event_store: (),
            snapshot_store: NoopSnapshotStore,
            outbox_store: NoopOutboxStore,
            deadline_store: NoopDeadlineStore,
            registry: NoopProcessRegistry,
            dead_letter_sink,
            modules: Vec::new(),
            deployment_roles: DeploymentRoles::all(),
            profile_validator: None,
        }
    }
}
1164
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
    /// Start from the all-`Noop` default configuration.
    ///
    /// Compiled only under `#[cfg(test)]` or with the `testing` feature. The
    /// Noop defaults silently discard outbox messages, deadlines and process
    /// registry entries, so production binaries must wire real stores via
    /// the `with_*` builder methods.
    ///
    /// The event store is **required**: call [`with_event_store`] before
    /// [`build`].
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    /// [`build`]: EngineBuilder::build
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
1184
1185impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
1186where
1187 OS: OutboxStore,
1188 DS: DeadlineStore,
1189 PR: ProcessRegistry,
1190{
1191 /// Create a production-ready builder with explicit stores for outbox,
1192 /// deadline, and process registry.
1193 ///
1194 /// This constructor is available in all build configurations including
1195 /// production binaries. It enforces that the three stores that can cause
1196 /// silent data loss (`OutboxStore`, `DeadlineStore`, `ProcessRegistry`)
1197 /// are provided explicitly — there is no Noop fallback.
1198 ///
1199 /// `NoopSnapshotStore` is used as the snapshot default because it is safe
1200 /// for production: skipping snapshots means full replay, but no data loss.
1201 /// Override with [`with_snapshot_store`] to enable snapshot-accelerated
1202 /// replay.
1203 ///
1204 /// Call [`with_event_store`] before [`build`] — the event store is
1205 /// **required**.
1206 ///
1207 /// ```rust,ignore
1208 /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1209 /// .with_event_store(store.clone())
1210 /// .with_snapshot_store(InMemorySnapshotStore::new())
1211 /// .build();
1212 /// ```
1213 ///
1214 /// [`with_snapshot_store`]: EngineBuilder::with_snapshot_store
1215 /// [`with_event_store`]: EngineBuilder::with_event_store
1216 /// [`build`]: EngineBuilder::build
1217 #[must_use]
1218 pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
1219 Self {
1220 event_store: (),
1221 snapshot_store: NoopSnapshotStore,
1222 outbox_store,
1223 deadline_store,
1224 registry,
1225 dead_letter_sink: Arc::new(LogDeadLetterSink),
1226 modules: Vec::new(),
1227 deployment_roles: DeploymentRoles::all(),
1228 profile_validator: None,
1229 }
1230 }
1231}
1232
1233impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
1234 /// Set the event store. **Required** — `build()` is only available once
1235 /// this has been called with a type that implements [`EventStore`].
1236 ///
1237 /// Replaces any previously set event store (type-state transition).
1238 #[must_use]
1239 pub fn with_event_store<ES2: EventStore>(
1240 self,
1241 store: ES2,
1242 ) -> EngineBuilder<ES2, SS, OS, DS, PR> {
1243 EngineBuilder {
1244 event_store: store,
1245 snapshot_store: self.snapshot_store,
1246 outbox_store: self.outbox_store,
1247 deadline_store: self.deadline_store,
1248 registry: self.registry,
1249 dead_letter_sink: self.dead_letter_sink,
1250 modules: self.modules,
1251 deployment_roles: self.deployment_roles,
1252 profile_validator: self.profile_validator,
1253 }
1254 }
1255
1256 /// Set the snapshot store (default: [`NoopSnapshotStore`]).
1257 #[must_use]
1258 pub fn with_snapshot_store<SS2: SnapshotStore>(
1259 self,
1260 store: SS2,
1261 ) -> EngineBuilder<ES, SS2, OS, DS, PR> {
1262 EngineBuilder {
1263 event_store: self.event_store,
1264 snapshot_store: store,
1265 outbox_store: self.outbox_store,
1266 deadline_store: self.deadline_store,
1267 registry: self.registry,
1268 dead_letter_sink: self.dead_letter_sink,
1269 modules: self.modules,
1270 deployment_roles: self.deployment_roles,
1271 profile_validator: self.profile_validator,
1272 }
1273 }
1274
1275 /// Set the outbox store (default: [`NoopOutboxStore`]).
1276 #[must_use]
1277 pub fn with_outbox_store<OS2: OutboxStore>(
1278 self,
1279 store: OS2,
1280 ) -> EngineBuilder<ES, SS, OS2, DS, PR> {
1281 EngineBuilder {
1282 event_store: self.event_store,
1283 snapshot_store: self.snapshot_store,
1284 outbox_store: store,
1285 deadline_store: self.deadline_store,
1286 registry: self.registry,
1287 dead_letter_sink: self.dead_letter_sink,
1288 modules: self.modules,
1289 deployment_roles: self.deployment_roles,
1290 profile_validator: self.profile_validator,
1291 }
1292 }
1293
1294 /// Set the deadline store (default: [`NoopDeadlineStore`]).
1295 #[must_use]
1296 pub fn with_deadline_store<DS2: DeadlineStore>(
1297 self,
1298 store: DS2,
1299 ) -> EngineBuilder<ES, SS, OS, DS2, PR> {
1300 EngineBuilder {
1301 event_store: self.event_store,
1302 snapshot_store: self.snapshot_store,
1303 outbox_store: self.outbox_store,
1304 deadline_store: store,
1305 registry: self.registry,
1306 dead_letter_sink: self.dead_letter_sink,
1307 modules: self.modules,
1308 deployment_roles: self.deployment_roles,
1309 profile_validator: self.profile_validator,
1310 }
1311 }
1312
1313 /// Set the process registry (default: [`NoopProcessRegistry`]).
1314 #[must_use]
1315 pub fn with_registry<PR2: ProcessRegistry>(
1316 self,
1317 registry: PR2,
1318 ) -> EngineBuilder<ES, SS, OS, DS, PR2> {
1319 EngineBuilder {
1320 event_store: self.event_store,
1321 snapshot_store: self.snapshot_store,
1322 outbox_store: self.outbox_store,
1323 deadline_store: self.deadline_store,
1324 registry,
1325 dead_letter_sink: self.dead_letter_sink,
1326 modules: self.modules,
1327 deployment_roles: self.deployment_roles,
1328 profile_validator: self.profile_validator,
1329 }
1330 }
1331
1332 /// Set the dead-letter sink (default: [`LogDeadLetterSink`]).
1333 ///
1334 /// The dead-letter sink receives every message that cannot be routed to a
1335 /// workflow. The default [`LogDeadLetterSink`] emits `tracing::warn!`
1336 /// events, making rejections visible in log output without configuration.
1337 ///
1338 /// Override with a persistent DLQ implementation in production:
1339 ///
1340 /// ```rust,ignore
1341 /// use mako_engine::dead_letter::LogDeadLetterSink;
1342 ///
1343 /// let ctx = EngineBuilder::new()
1344 /// .with_event_store(my_store)
1345 /// .with_dead_letter_sink(MyPersistentDlq::new())
1346 /// .build();
1347 /// ```
1348 ///
1349 /// [`LogDeadLetterSink`]: crate::dead_letter::LogDeadLetterSink
1350 #[must_use]
1351 pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
1352 self.dead_letter_sink = Arc::new(sink);
1353 self
1354 }
1355
1356 /// Register an `edi-energy` profile validator for startup profile checks.
1357 ///
1358 /// The closure receives a message-type string (e.g. `"UTILMD"`) and must
1359 /// return `true` if at least one active profile for that message type is
1360 /// registered for today's date.
1361 ///
1362 /// Wire this in `makod` using the `edi-energy` global registry:
1363 ///
1364 /// ```rust,ignore
1365 /// use edi_energy::registry::ReleaseRegistry;
1366 ///
1367 /// let today = time::OffsetDateTime::now_utc().date();
1368 /// builder.with_profile_validator(move |msg_type| {
1369 /// ReleaseRegistry::global()
1370 /// .profiles_for_str(msg_type)
1371 /// .any(|p| match (p.valid_from(), p.valid_until()) {
1372 /// (Some(f), Some(u)) => f <= today && today <= u,
1373 /// (Some(f), None) => f <= today,
1374 /// (None, _) => true,
1375 /// })
1376 /// })
1377 /// ```
1378 ///
1379 /// Domain crates do **not** need to call this — they only declare
1380 /// [`profile_requirements`].
1381 ///
1382 /// [`profile_requirements`]: EngineModule::profile_requirements
1383 #[must_use]
1384 pub fn with_profile_validator(
1385 mut self,
1386 validator: impl Fn(&str) -> bool + Send + Sync + 'static,
1387 ) -> Self {
1388 self.profile_validator = Some(Box::new(validator));
1389 self
1390 }
1391
1392 /// Register a domain module.
1393 ///
1394 /// The module name becomes visible in
1395 /// [`EngineContext::registered_modules`] after [`build`] is called.
1396 ///
1397 /// [`build`]: EngineBuilder::build
1398 #[must_use]
1399 pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
1400 self.modules.push(module);
1401 self
1402 }
1403
1404 /// Set the active [`DeploymentRoles`] for this engine instance.
1405 ///
1406 /// Controls role-conditional PID registration in [`EngineModule::register_pids_with_roles`].
1407 ///
1408 /// The default is [`DeploymentRoles::all()`], which registers every PID unconditionally
1409 /// — identical to the pre-role-aware behavior. Providing an explicit role set
1410 /// restricts role-conditional blocks to only the declared roles:
1411 ///
1412 /// - **NB-only** (`DeploymentRoles::nb()`): 19001/19002 route to `gpke-konfiguration`;
1413 /// WiM nMSB blocks are skipped.
1414 /// - **nMSB-only** (`DeploymentRoles::nmsb()`): 19001/19002 route to `wim-geraeteubernahme`;
1415 /// GPKE NB blocks are skipped.
1416 /// - **NB + gMSB** (`DeploymentRoles::nb_msb()`): most common Stadtwerke combination.
1417 ///
1418 /// # Conflict guard
1419 ///
1420 /// When two modules would register the same PID to **different** workflows, the
1421 /// engine panics during [`build`]. Set explicit roles to prevent both modules from
1422 /// activating the same PID simultaneously:
1423 ///
1424 /// ```rust,ignore
1425 /// use mako_engine::marktrolle::DeploymentRoles;
1426 ///
1427 /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1428 /// .with_event_store(store)
1429 /// .with_deployment_roles(DeploymentRoles::nb()) // only NB: GPKE gets 19001/19002
1430 /// .register(Box::new(GpkeModule))
1431 /// .register(Box::new(WimModule)) // nMSB block skipped — no conflict
1432 /// .build();
1433 /// ```
1434 ///
1435 /// [`build`]: EngineBuilder::build
1436 #[must_use]
1437 pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
1438 self.deployment_roles = roles;
1439 self
1440 }
1441}
1442
1443impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
1444where
1445 ES: EventStore,
1446 SS: SnapshotStore,
1447 OS: OutboxStore,
1448 DS: DeadlineStore,
1449 PR: ProcessRegistry,
1450{
1451 /// Build the [`EngineContext`].
1452 ///
1453 /// Consumes the builder. All registered modules and configured stores are
1454 /// moved into the returned [`EngineContext`].
1455 ///
1456 /// This method is only available when `ES` implements [`EventStore`].
1457 /// If you have not called [`with_event_store`], this will not compile.
1458 ///
1459 /// # Panics
1460 ///
1461 /// Panics when any registered module returns `Err` from
1462 /// [`EngineModule::configure`]. The panic message includes the module
1463 /// name and the error string so the deployment failure is actionable.
1464 ///
1465 /// [`with_event_store`]: EngineBuilder::with_event_store
1466 #[must_use]
1467 #[allow(clippy::too_many_lines)]
1468 pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
1469 // Warn when Noop stores are active — they are fine for testing but
1470 // silently lose data in production. Checking type names is the only
1471 // approach that works with the generic type-state builder without adding
1472 // a new marker trait or changing the public API.
1473 //
1474 // Gate: always emit in `testing` / `#[cfg(test)]` builds so that test
1475 // harnesses see the warning in their log output. Also emit when the
1476 // `tracing` feature is enabled (explicit instrumentation mode). In
1477 // release production builds without the `testing` feature, Noop stores
1478 // are not constructible at all (the struct definitions are cfg-gated),
1479 // so this branch is dead code and will be optimised away by the compiler.
1480 #[cfg(any(test, feature = "testing", feature = "tracing"))]
1481 {
1482 let ss_name = std::any::type_name::<SS>();
1483 let os_name = std::any::type_name::<OS>();
1484 let ds_name = std::any::type_name::<DS>();
1485 let pr_name = std::any::type_name::<PR>();
1486 if ss_name.contains("NoopSnapshotStore") {
1487 tracing::warn!(
1488 store = ss_name,
1489 "EngineBuilder: NoopSnapshotStore is active — snapshots will not be persisted. \
1490 Use SlateDbStore::as_snapshot_store() in production."
1491 );
1492 }
1493 if os_name.contains("NoopOutboxStore") {
1494 tracing::warn!(
1495 store = os_name,
1496 "EngineBuilder: NoopOutboxStore is active — outbound messages will be silently \
1497 discarded. Use SlateDbStore::as_outbox_store() in production."
1498 );
1499 }
1500 if ds_name.contains("NoopDeadlineStore") {
1501 tracing::warn!(
1502 store = ds_name,
1503 "EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will not fire \
1504 after restart. Use SlateDbStore::as_deadline_store() in production."
1505 );
1506 }
1507 if pr_name.contains("NoopProcessRegistry") {
1508 tracing::warn!(
1509 store = pr_name,
1510 "EngineBuilder: NoopProcessRegistry is active — process routing will be lost on \
1511 restart. Use SlateDbStore::as_process_registry() in production."
1512 );
1513 }
1514 }
1515 // Validate every module before assembling the context.
1516 // A missing adapter or misconfigured module fails at startup (not at
1517 // first inbound message), making deployment failures observable immediately.
1518 for module in &self.modules {
1519 if let Err(msg) = module.configure() {
1520 panic!(
1521 "EngineBuilder::build: module '{}' failed configuration validation: {}",
1522 module.name(),
1523 msg
1524 );
1525 }
1526 // Validate profile requirements via the injected validator.
1527 // Domain crates declare requirements; only the binary crate (makod)
1528 // injects the edi-energy registry — domain crates need no edi-energy
1529 // import for this check.
1530 if let Some(ref validator) = self.profile_validator {
1531 for req in module.profile_requirements() {
1532 assert!(
1533 validator(req.message_type),
1534 "EngineBuilder::build: module '{}' requires an active edi-energy \
1535 profile for '{}' ({}) but none is registered for today's date. \
1536 Run `cargo xtask codegen` to add the missing profile.",
1537 module.name(),
1538 req.message_type,
1539 req.label,
1540 );
1541 }
1542 }
1543 }
1544 // Build the PID router from all registered modules.
1545 // Also assert that no two modules claim the same PID — a PID overlap
1546 // is always a configuration error: one module's messages would be
1547 // silently swallowed by another's workflow, producing missing-process
1548 // errors or incorrect audit trails.
1549 let mut pid_router = PidRouter::new();
1550 let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
1551 for module in &self.modules {
1552 // Temporarily build a scratch router to read this module's PIDs
1553 // for cross-module overlap detection (module-ownership level).
1554 let mut scratch = PidRouter::new();
1555 module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
1556 for pid in scratch.registered_pids() {
1557 if let Some(prev) = pid_owners.insert(pid, module.name()) {
1558 if self.deployment_roles.is_all() {
1559 // With DeploymentRoles::all() (the default), role-conditional PIDs
1560 // are registered by all modules that claim them, producing last-wins
1561 // semantics. This is acceptable for single-role and dev/test deployments.
1562 //
1563 // In production multi-role deployments where both an NB and nMSB role
1564 // are served by the same instance, set explicit roles via
1565 // `EngineBuilder::with_deployment_roles` to prevent silent misrouting.
1566 //
1567 // We emit a debug-level log here (not warn) because the vast majority
1568 // of deployments are single-role and this overlap is expected/harmless.
1569 #[cfg(feature = "tracing")]
1570 tracing::debug!(
1571 pid,
1572 previous_module = prev,
1573 current_module = module.name(),
1574 "PID registered by multiple modules with DeploymentRoles::all(); \
1575 last module wins (use with_deployment_roles for strict routing)",
1576 );
1577 let _ = prev; // suppress unused-variable warning when tracing is off
1578 } else {
1579 panic!(
1580 "EngineBuilder::build: PID {pid} is claimed by both \
1581 '{prev}' and '{}' — overlapping PID registrations are \
1582 not allowed with explicit DeploymentRoles; each PID must be \
1583 owned by exactly one module.\n \
1584 Hint: use EngineBuilder::with_deployment_roles to restrict \
1585 role-conditional PIDs so only one module registers each PID.\n \
1586 Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
1587 GPKE registers ORDRSP 19001/19002, not WiM.",
1588 module.name(),
1589 );
1590 }
1591 }
1592 }
1593 // Register into the real router with conflict detection.
1594 module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
1595 }
1596 let registered_modules = self.modules.iter().map(|m| m.name()).collect();
1597 let registered_workflows = self
1598 .modules
1599 .iter()
1600 .flat_map(|m| m.workflow_names().iter().copied())
1601 .collect();
1602 EngineContext {
1603 event_store: Arc::new(self.event_store),
1604 snapshot_store: self.snapshot_store,
1605 outbox_store: self.outbox_store,
1606 deadline_store: self.deadline_store,
1607 registry: self.registry,
1608 dead_letter_sink: self.dead_letter_sink,
1609 pid_router,
1610 registered_modules,
1611 registered_workflows,
1612 }
1613 }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618 use super::*;
1619 use crate::{
1620 deadline::InMemoryDeadlineStore,
1621 error::WorkflowError,
1622 event_store::InMemoryEventStore,
1623 ids::TenantId,
1624 outbox::InMemoryOutboxStore,
1625 pid_router::PidRouter,
1626 registry::InMemoryProcessRegistry,
1627 snapshot::InMemorySnapshotStore,
1628 version::WorkflowId,
1629 workflow::{CommandPayload, EventPayload, Workflow},
1630 };
1631
1632 // ── Minimal workflow for spawn/resume tests ───────────────────────────────
1633
1634 #[derive(serde::Serialize, serde::Deserialize)]
1635 struct PingEvent;
1636
1637 impl EventPayload for PingEvent {
1638 fn event_type(&self) -> &'static str {
1639 "Ping"
1640 }
1641 }
1642
1643 struct PingCommand;
1644
1645 impl CommandPayload for PingCommand {}
1646
1647 #[derive(Default, Clone)]
1648 struct PingState;
1649
1650 struct PingWorkflow;
1651
1652 impl Workflow for PingWorkflow {
1653 type State = PingState;
1654 type Event = PingEvent;
1655 type Command = PingCommand;
1656
1657 fn apply(state: PingState, _: &PingEvent) -> PingState {
1658 state
1659 }
1660
1661 fn handle(
1662 _: &PingState,
1663 _: PingCommand,
1664 ) -> Result<crate::workflow::WorkflowOutput<PingEvent>, WorkflowError> {
1665 Ok(vec![PingEvent].into())
1666 }
1667 }
1668
1669 struct TestModule;
1670
1671 impl EngineModule for TestModule {
1672 fn name(&self) -> &'static str {
1673 "test-module"
1674 }
1675 }
1676
1677 // ── Tests ─────────────────────────────────────────────────────────────────
1678
1679 #[test]
1680 fn build_with_event_store_only() {
1681 let ctx = EngineBuilder::new()
1682 .with_event_store(InMemoryEventStore::new())
1683 .build();
1684 assert!(ctx.registered_modules().is_empty());
1685 }
1686
1687 #[test]
1688 fn build_with_all_stores_and_module() {
1689 let ctx = EngineBuilder::new()
1690 .with_event_store(InMemoryEventStore::new())
1691 .with_snapshot_store(InMemorySnapshotStore::new())
1692 .with_outbox_store(InMemoryOutboxStore::new())
1693 .with_deadline_store(InMemoryDeadlineStore::new())
1694 .with_registry(InMemoryProcessRegistry::new())
1695 .register(Box::new(TestModule))
1696 .build();
1697 assert_eq!(ctx.registered_modules(), &["test-module"]);
1698 }
1699
1700 #[test]
1701 fn multiple_modules_ordered() {
1702 struct ModA;
1703 impl EngineModule for ModA {
1704 fn name(&self) -> &'static str {
1705 "mod-a"
1706 }
1707 }
1708 struct ModB;
1709 impl EngineModule for ModB {
1710 fn name(&self) -> &'static str {
1711 "mod-b"
1712 }
1713 }
1714
1715 let ctx = EngineBuilder::new()
1716 .with_event_store(InMemoryEventStore::new())
1717 .register(Box::new(ModA))
1718 .register(Box::new(ModB))
1719 .build();
1720 assert_eq!(ctx.registered_modules(), &["mod-a", "mod-b"]);
1721 }
1722
1723 #[tokio::test]
1724 async fn spawn_creates_independent_processes() {
1725 let ctx = EngineBuilder::new()
1726 .with_event_store(InMemoryEventStore::new())
1727 .build();
1728 let wf_id = WorkflowId::new("ping", "FV2024-10-01");
1729
1730 let p1 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id.clone());
1731 let p2 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id);
1732
1733 assert_ne!(p1.process_id(), p2.process_id());
1734 }
1735
1736 #[tokio::test]
1737 async fn resume_sees_previously_appended_events() {
1738 let store = InMemoryEventStore::new();
1739 let ctx = EngineBuilder::new().with_event_store(store).build();
1740
1741 let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
1742 p.execute(PingCommand).await.unwrap();
1743
1744 let identity = p.identity();
1745 let resumed = ctx.resume::<PingWorkflow>(identity);
1746 assert_eq!(resumed.event_count().await.unwrap(), 1);
1747 }
1748
1749 #[tokio::test]
1750 async fn registry_routes_process_via_conversation_key() {
1751 use crate::registry::RegistryKey;
1752 let ctx = EngineBuilder::new()
1753 .with_event_store(InMemoryEventStore::new())
1754 .with_registry(InMemoryProcessRegistry::new())
1755 .build();
1756
1757 let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
1758 let tenant = p.tenant_id();
1759 let conv_key = RegistryKey::parse("conv:test-conversation-123").expect("valid key");
1760 ctx.registry()
1761 .register(tenant, &conv_key, p.identity())
1762 .await
1763 .unwrap();
1764
1765 let found = ctx
1766 .registry()
1767 .lookup(tenant, &conv_key)
1768 .await
1769 .unwrap()
1770 .expect("must be registered");
1771 let resumed = ctx.resume::<PingWorkflow>(found);
1772 assert_eq!(resumed.process_id(), p.process_id());
1773 }
1774
/// PIDs that a module registers in `register_pids` are routable from the built
/// context; PIDs nobody registered are not.
#[test]
fn pid_router_populated_by_module_register_pids() {
    struct TwoPidModule;

    impl EngineModule for TwoPidModule {
        fn name(&self) -> &'static str {
            "pid-module"
        }

        fn register_pids(&self, router: &mut PidRouter) {
            // Both PIDs map onto the same workflow name.
            for pid in [55001, 55002] {
                router.register(pid, "gpke-supplier-change");
            }
        }
    }

    let engine = EngineBuilder::new()
        .with_event_store(InMemoryEventStore::new())
        .register(Box::new(TwoPidModule))
        .build();
    let router = engine.pid_router();

    assert_eq!(router.route(55001), Some("gpke-supplier-change"));
    assert_eq!(router.route(55002), Some("gpke-supplier-change"));
    assert!(router.route(99999).is_none());
    assert_eq!(router.len(), 2);
}
1798
/// Checks that `register_pids_with_roles` lets modules make their PID
/// registrations conditional on the deployment roles.
///
/// Two modules both want PID 19001:
/// - `ModuleA` maps 19001 → "workflow-a" whenever role `Nb` is contained.
/// - `ModuleB` maps 19001 and 19015 → "workflow-b", but only when `Nmsb` is
///   contained in an explicit role set (it skips when `is_all()` is true).
///
/// Expected routing per role set:
/// - `all()`  → 19001 is "workflow-a", 19015 is unrouted.
/// - `nb()`   → 19001 is "workflow-a", 19015 is unrouted.
/// - `nmsb()` → 19001 and 19015 are both "workflow-b".
#[test]
fn register_pids_with_roles_gates_pids_correctly() {
    use crate::marktrolle::{DeploymentRoles, Marktrolle};

    struct ModuleA;
    struct ModuleB;

    impl EngineModule for ModuleA {
        fn name(&self) -> &'static str {
            "module-a"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if !roles.contains(Marktrolle::Nb) {
                return;
            }
            router.register(19_001, "workflow-a");
        }
    }

    impl EngineModule for ModuleB {
        fn name(&self) -> &'static str {
            "module-b"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            // `all()` is the backward-compat sentinel: never register on it,
            // only on an explicit role set that contains Nmsb.
            if roles.is_all() {
                return;
            }
            if roles.contains(Marktrolle::Nmsb) {
                router.register(19_001, "workflow-b");
                router.register(19_015, "workflow-b");
            }
        }
    }

    // Builds a context with both modules registered (A before B) for `roles`.
    let engine_for = |roles: DeploymentRoles| {
        EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_deployment_roles(roles)
            .register(Box::new(ModuleA))
            .register(Box::new(ModuleB))
            .build()
    };

    // all(): ModuleA fires because Nb is contained; ModuleB sees the sentinel and skips.
    let ctx_all = engine_for(DeploymentRoles::all());
    let router_all = ctx_all.pid_router();
    assert_eq!(router_all.route(19_001), Some("workflow-a"));
    assert!(router_all.route(19_015).is_none());

    // nb(): same outcome, since ModuleB's Nmsb condition is not met.
    let ctx_nb = engine_for(DeploymentRoles::nb());
    let router_nb = ctx_nb.pid_router();
    assert_eq!(router_nb.route(19_001), Some("workflow-a"));
    assert!(router_nb.route(19_015).is_none());

    // nmsb(): ModuleA skips (no Nb), so ModuleB owns both PIDs.
    let ctx_nmsb = engine_for(DeploymentRoles::nmsb());
    let router_nmsb = ctx_nmsb.pid_router();
    assert_eq!(router_nmsb.route(19_001), Some("workflow-b"));
    assert_eq!(router_nmsb.route(19_015), Some("workflow-b"));
}
1866
/// When an explicit role set makes two modules register the same PID for
/// different workflows, building the engine is expected to panic with a
/// message containing "overlapping PID registrations".
#[test]
#[should_panic(expected = "overlapping PID registrations")]
fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
    use crate::marktrolle::{DeploymentRoles, Marktrolle};

    struct ConflictA;
    struct ConflictB;

    impl EngineModule for ConflictA {
        fn name(&self) -> &'static str {
            "conflict-a"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if !roles.contains(Marktrolle::Nb) {
                return;
            }
            router.register(19_001, "workflow-a");
        }
    }

    impl EngineModule for ConflictB {
        fn name(&self) -> &'static str {
            "conflict-b"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if roles.is_all() {
                return;
            }
            if roles.contains(Marktrolle::Nmsb) {
                // Same PID as ConflictA, but a different workflow.
                router.register(19_001, "workflow-b");
            }
        }
    }

    // An explicit set holding both Nb and Nmsb satisfies both modules'
    // conditions, so both try to claim PID 19001.
    let both_roles = DeploymentRoles::from_roles([Marktrolle::Nb, Marktrolle::Nmsb]);

    let _ = EngineBuilder::new()
        .with_event_store(InMemoryEventStore::new())
        .with_deployment_roles(both_roles)
        .register(Box::new(ConflictA))
        .register(Box::new(ConflictB))
        .build();
}
1908}