mako_engine/builder.rs
1//! [`EngineModule`] trait, [`EngineBuilder`], and [`EngineContext`].
2//!
3//! # Summary
4//!
5//! `EngineBuilder` assembles all engine infrastructure into a single
6//! [`EngineContext`] value. Domain modules (GPKE, WiM, GeLi Gas, …) register
7//! themselves at startup via the [`EngineModule`] trait, making their names
8//! visible in diagnostics and health checks.
9//!
10//! # Type-state guarantee
11//!
12//! [`EngineBuilder::build`] is only available when the event store type
13//! parameter `ES` implements [`EventStore`]. Forgetting to call
14//! [`with_event_store`] is a **compile-time error**, not a runtime panic.
15//!
16//! All other stores default to their respective `Noop` implementations:
17//!
18//! | Store | Default |
19//! |-------|---------|
20//! | Snapshot store | [`NoopSnapshotStore`] |
21//! | Outbox store | [`NoopOutboxStore`] |
22//! | Deadline store | [`NoopDeadlineStore`] |
23//! | Process registry | [`NoopProcessRegistry`] |
24//!
25//! # Assembly example
26//!
27//! ```rust,ignore
28//! use mako_engine::builder::{EngineBuilder, EngineModule};
29//! use mako_engine::event_store::InMemoryEventStore;
30//! use mako_engine::outbox::InMemoryOutboxStore;
31//! use mako_engine::deadline::InMemoryDeadlineStore;
32//! use mako_engine::registry::InMemoryProcessRegistry;
33//! use mako_engine::snapshot::InMemorySnapshotStore;
34//!
35//! struct GpkeModule;
36//! impl EngineModule for GpkeModule { fn name(&self) -> &'static str { "gpke" } }
37//!
38//! let ctx = EngineBuilder::new()
39//! .with_event_store(InMemoryEventStore::new())
40//! .with_snapshot_store(InMemorySnapshotStore::new())
41//! .with_outbox_store(InMemoryOutboxStore::new())
42//! .with_deadline_store(InMemoryDeadlineStore::new())
43//! .with_registry(InMemoryProcessRegistry::new())
44//! .register(Box::new(GpkeModule))
45//! .build();
46//!
47//! // Spawn a fresh process:
48//! let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
49//! p.execute(ReceiveUtilmd { .. }).await?;
50//!
51//! // Resume an existing process from a persisted identity:
//! let identity = ctx.registry().lookup(tenant_id, &conv_id.to_string()).await?.unwrap();
53//! let p = ctx.resume::<SupplierChangeWorkflow>(identity);
54//!
55//! // Access stores for delivery workers / schedulers:
//! let pending = ctx.outbox_store().pending_now(50).await?;
//! let overdue = ctx.deadline_store().due_now(50).await?;
58//! ```
59//!
60//! [`with_event_store`]: EngineBuilder::with_event_store
61
62// Type-state generics can produce long signatures that trip up the
63// `type_complexity` lint; suppress it for this module only.
64#![allow(clippy::type_complexity)]
65
66use crate::{
67 dead_letter::{DeadLetterSink, LogDeadLetterSink},
68 deadline::{Deadline, DeadlineStore, NoopDeadlineStore},
69 error::EngineError,
70 event_store::EventStore,
71 ids::{ProcessIdentity, TenantId},
72 marktrolle::DeploymentRoles,
73 outbox::{NoopOutboxStore, OutboxMessage, OutboxStore},
74 pid_router::PidRouter,
75 process::Process,
76 registry::{NoopProcessRegistry, ProcessRegistry},
77 snapshot::{NoopSnapshotStore, SnapshotStore},
78 version::WorkflowId,
79 workflow::Workflow,
80};
81
82use std::sync::Arc;
83
84// ── EngineModule ──────────────────────────────────────────────────────────────
85
86/// A self-contained domain module that registers with the engine at startup.
87///
88/// Domain crates implement this trait to declare their presence in the engine.
89/// The module name is surfaced in [`EngineContext::registered_modules`] for
90/// diagnostics, health checks, and log output.
91///
92/// ## Startup validation
93///
94/// Override [`configure`] to perform adapter coverage checks at engine startup
95/// time. The engine calls [`configure`] for every registered module during
96/// [`EngineBuilder::build`] and panics with an actionable message if any
97/// module returns `Err`. This surfaces missing adapter registrations as a
98/// startup failure rather than a silent runtime error.
99///
100/// ## Example
101///
102/// ```rust,ignore
103/// pub struct GpkeModule;
104///
105/// impl EngineModule for GpkeModule {
106/// fn name(&self) -> &'static str { "gpke" }
107///
108/// fn configure(&self) -> Result<(), String> {
109/// // Validate that every known BDEW format version has an adapter:
110/// GPKE_ADAPTER_REGISTRY
111/// .validate_policy(&GpkeWorkflow::version_policy(), &KNOWN_FVS)
112/// .map_err(|uncovered| format!(
113/// "gpke: missing adapters for format versions: {:?}",
114/// uncovered
115/// ))
116/// }
117/// }
118///
119/// let ctx = EngineBuilder::new()
120/// .with_event_store(my_store)
121/// .register(Box::new(GpkeModule))
122/// .build(); // panics if GpkeModule::configure returns Err
123///
124/// assert_eq!(ctx.registered_modules(), &["gpke"]);
125/// ```
126///
127/// [`configure`]: EngineModule::configure
128pub trait EngineModule: Send + 'static {
129 /// Stable, unique name for this domain module.
130 ///
131 /// Used in diagnostics, health checks, and structured log output.
132 /// Choose a short lowercase identifier (e.g. `"gpke"`, `"wim"`,
133 /// `"geli"`).
134 fn name(&self) -> &'static str;
135
136 /// Register all PIDs this module handles into the shared [`PidRouter`].
137 ///
138 /// # Mutability contract
139 ///
140 /// This method is called **exactly once** by [`EngineBuilder::build`],
141 /// before the resulting [`EngineContext`] is handed to the caller. The
142 /// `&mut PidRouter` reference is only available here, at build time.
143 /// After `build` returns the router is **sealed** — the engine provides
144 /// only a shared `&PidRouter` reference, with no mutation path at runtime.
145 ///
146 /// Consequence: **all PIDs a module will ever need must be registered
147 /// here**. Do not attempt to register PIDs lazily from async handlers or
148 /// after the engine has started — there is no API for that by design.
149 ///
150 /// Duplicate registrations (same PID from two modules) silently overwrite
151 /// the previous mapping; the last module to register wins. Use
152 /// `cargo xtask validate-pruefids` to catch accidental PID conflicts
153 /// between modules before they reach production.
154 ///
155 /// For role-conditional registration (PIDs that should only be active for
156 /// specific BDEW Marktrollen), override [`register_pids_with_roles`] instead.
157 ///
158 /// # Example
159 ///
160 /// ```rust,ignore
161 /// fn register_pids(&self, router: &mut PidRouter) {
162 /// // GPKE Lieferantenwechsel / Lieferbeginn (BK6-22-024, PIDs 55001, 55002, 55017)
163 /// for &pid in &[55001_u32, 55002, 55017] {
164 /// router.register(pid, "gpke-supplier-change");
165 /// }
166 /// }
167 /// ```
168 ///
169 /// [`register_pids_with_roles`]: EngineModule::register_pids_with_roles
170 fn register_pids(&self, _router: &mut PidRouter) {}
171
172 /// Register PIDs with role-context awareness.
173 ///
174 /// This is the **preferred override** for modules that have role-conditional
175 /// PID registrations — PIDs that should only be active when this `makod`
176 /// instance holds a specific [`Marktrolle`].
177 ///
178 /// The default implementation calls [`register_pids`] (role-agnostic) so
179 /// existing modules that override `register_pids` continue to work without
180 /// changes.
181 ///
182 /// Override this method instead of `register_pids` when any PID registration
183 /// should be conditional on the deployment role:
184 ///
185 /// ```rust,ignore
186 /// use mako_engine::marktrolle::Marktrolle;
187 ///
188 /// fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
189 /// // Always register: 55001, 55002 (not role-specific)
190 /// for pid in [55001_u32, 55002] { router.register_with_module(pid, "gpke-supplier-change", self.name()); }
191 ///
192 /// // Only when NB role: 19001/19002 inbound ORDRSP from MSB
193 /// if roles.contains(Marktrolle::Nb) {
194 /// for pid in [19001_u32, 19002] { router.register_with_module(pid, "gpke-konfiguration", self.name()); }
195 /// }
196 /// }
197 /// ```
198 ///
199 /// # Conflict guard
200 ///
201 /// Use [`PidRouter::register_with_module`] (not `register`) inside this
202 /// method. The conflict guard panics at build time if two modules register
203 /// the same PID to different workflows — this makes role misconfigurations
204 /// visible at startup rather than silently misrouting messages.
205 ///
206 /// [`Marktrolle`]: crate::marktrolle::Marktrolle
207 /// [`register_pids`]: EngineModule::register_pids
208 fn register_pids_with_roles(&self, router: &mut PidRouter, _roles: &DeploymentRoles) {
209 self.register_pids(router);
210 }
211
212 /// Workflow names this module handles for deadline dispatch.
213 ///
214 /// Return the same name strings that [`register_pids`] maps PIDs to.
215 /// These names are stored in [`EngineContext::registered_workflows`] and
216 /// used to validate that every workflow that has deadlines scheduled is
217 /// covered by the deadline scheduler dispatch function at runtime.
218 ///
219 /// The default implementation returns an empty slice. Override it to
220 /// declare all workflow names that may fire deadlines:
221 ///
222 /// ```rust,ignore
223 /// fn workflow_names(&self) -> &'static [&'static str] {
224 /// &["gpke-supplier-change", "gpke-abrechnung"]
225 /// }
226 /// ```
227 ///
228 /// [`register_pids`]: EngineModule::register_pids
229 /// [`EngineContext::registered_workflows`]: crate::builder::EngineContext::registered_workflows
230 fn workflow_names(&self) -> &'static [&'static str] {
231 &[]
232 }
233
234 /// Declare the EDIFACT profile types this module requires at runtime.
235 ///
236 /// Returning a non-empty slice causes [`EngineBuilder::build`] to call the
237 /// registered profile validator for each requirement. If no active profile
238 /// exists for a required message type, `build` panics with an actionable
239 /// error so deployment fails fast rather than silently.
240 ///
241 /// **This replaces the previous pattern** of calling
242 /// `edi_energy::registry::ReleaseRegistry::global()` inside `configure()`.
243 /// Domain crates no longer need `edi-energy` in their production
244 /// `[dependencies]` — they just declare their requirements here.
245 ///
246 /// ```rust,ignore
247 /// fn profile_requirements(&self) -> &'static [ProfileRequirement] {
248 /// &[
249 /// ProfileRequirement { message_type: "UTILMD", label: "UTILMD Strom (GPKE)" },
250 /// ProfileRequirement { message_type: "INVOIC", label: "INVOIC Abrechnung (GPKE)" },
251 /// ]
252 /// }
253 /// ```
254 ///
255 /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
256 fn profile_requirements(&self) -> &'static [crate::profile::ProfileRequirement] {
257 &[]
258 }
259
260 /// Validate adapter coverage and configuration at engine startup.
261 ///
262 /// Called by [`EngineBuilder::build`] after all modules are registered.
263 /// Return `Ok(())` when the module is fully configured. Return `Err(msg)`
264 /// with an actionable description when an adapter or configuration is
265 /// missing — the engine will panic with that message so the deployment
266 /// fails early rather than silently.
267 ///
268 /// The default implementation is a no-op (always returns `Ok(())`).
269 /// Override it in domain crates to call
270 /// [`AdapterRegistry::validate_policy`] and emit structured errors.
271 ///
272 /// Note: if your validation needs access to the edi-energy profile
273 /// registry, use [`profile_requirements`] instead — it does not require
274 /// importing `edi-energy` in domain crates.
275 ///
276 /// [`AdapterRegistry::validate_policy`]: crate::message_adapter::AdapterRegistry::validate_policy
277 /// [`profile_requirements`]: EngineModule::profile_requirements
278 ///
279 /// # Errors
280 ///
281 /// Returns a descriptive error string when the module's configuration is invalid.
282 fn configure(&self) -> Result<(), String> {
283 Ok(())
284 }
285}
286
287// ── EngineContext ─────────────────────────────────────────────────────────────
288
289/// Assembled engine infrastructure returned by [`EngineBuilder::build`].
290///
291/// `EngineContext` bundles all stores and the process registry into a single
292/// value. It is the root dependency for:
293///
294/// - Spawning new processes ([`spawn`])
295/// - Resuming existing processes ([`resume`])
296/// - Running outbox delivery workers (`outbox_store.pending_now(…)`)
297/// - Driving the deadline scheduler (`deadline_store.due_now(…)`)
298///
299/// ## Generic parameters
300///
301/// | Param | Role | Default |
302/// |-------|------|---------|
303/// | `ES` | [`EventStore`] backend | — (required) |
304/// | `SS` | [`SnapshotStore`] backend | [`NoopSnapshotStore`] |
305/// | `OS` | [`OutboxStore`] backend | [`NoopOutboxStore`] |
306/// | `DS` | [`DeadlineStore`] backend | [`NoopDeadlineStore`] |
307/// | `PR` | [`ProcessRegistry`] backend | [`NoopProcessRegistry`] |
308///
309/// In most codebases all type parameters are inferred from the builder calls.
310///
311/// [`spawn`]: EngineContext::spawn
312/// [`resume`]: EngineContext::resume
313pub struct EngineContext<
314 ES,
315 SS = NoopSnapshotStore,
316 OS = NoopOutboxStore,
317 DS = NoopDeadlineStore,
318 PR = NoopProcessRegistry,
319> {
320 event_store: Arc<ES>,
321 snapshot_store: SS,
322 outbox_store: OS,
323 deadline_store: DS,
324 registry: PR,
325 /// Dead-letter sink for unroutable or unprocessable inbound messages.
326 ///
327 /// Stored as `Arc<dyn DeadLetterSink>` so callers can share it across
328 /// tasks without an extra type parameter on `EngineContext`.
329 pub dead_letter_sink: Arc<dyn DeadLetterSink>,
330 /// PID-to-workflow routing table, populated from all registered modules.
331 pid_router: PidRouter,
332 registered_modules: Vec<&'static str>,
333 /// Workflow names declared by all registered modules via
334 /// [`EngineModule::workflow_names`]. Used to validate deadline scheduler
335 /// coverage at runtime (see [`EngineContext::registered_workflows`]).
336 registered_workflows: Vec<&'static str>,
337}
338
339// ── Type aliases ──────────────────────────────────────────────────────────────
340
341/// An [`EngineContext`] with all optional subsystems disabled.
342///
343/// Uses `NoopSnapshotStore` and, in `testing`-enabled builds, Noop
344/// implementations for outbox, deadline, and process registry. Suitable for
345/// tests and minimal deployments where only a durable event store is required.
346///
347/// All five type parameters are inferred from context when used with
348/// [`EngineBuilder`]:
349///
350/// ```rust,ignore
351/// // Only available in test / testing-feature builds:
352/// use mako_engine::builder::{EngineBuilder, MinimalEngine};
353/// use mako_engine::event_store::InMemoryEventStore;
354///
355/// let ctx: MinimalEngine<InMemoryEventStore> = EngineBuilder::new()
356/// .with_event_store(InMemoryEventStore::new())
357/// .build();
358/// ```
359pub type MinimalEngine<ES> = EngineContext<ES>;
360
361impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
362where
363 ES: EventStore,
364{
365 /// Spawn a new process and return a typed `Process<W, Arc<ES>>` handle.
366 ///
367 /// No `ES: Clone` bound is required — the engine stores the event store
368 /// behind an `Arc` so spawning is always a cheap pointer clone.
369 ///
370 /// ```rust,ignore
371 /// let p = ctx.spawn::<SupplierChangeWorkflow>(tenant_id, workflow_id);
372 /// p.execute(ReceiveUtilmd { .. }).await?;
373 /// ```
374 #[must_use]
375 pub fn spawn<W: Workflow>(
376 &self,
377 tenant_id: TenantId,
378 workflow_id: WorkflowId,
379 ) -> Process<W, Arc<ES>> {
380 Process::new(Arc::clone(&self.event_store), tenant_id, workflow_id)
381 }
382
383 /// Resume an existing process from a [`ProcessIdentity`].
384 ///
385 /// ```rust,ignore
386 /// let identity = ctx.registry()
387 /// .lookup(tenant_id, &conv_id.to_string())
388 /// .await?
389 /// .ok_or(EngineError::Registry("unknown conversation".into()))?;
390 /// let p = ctx.resume::<SupplierChangeWorkflow>(identity);
391 /// p.execute(HandleAperak { .. }).await?;
392 /// ```
393 #[must_use]
394 pub fn resume<W: Workflow>(&self, identity: ProcessIdentity) -> Process<W, Arc<ES>> {
395 Process::from_identity(Arc::clone(&self.event_store), identity)
396 }
397
398 /// Names of all domain modules registered with the builder, in
399 /// registration order.
400 #[must_use]
401 pub fn registered_modules(&self) -> &[&'static str] {
402 &self.registered_modules
403 }
404
405 /// Workflow names declared by all registered modules, in registration order.
406 ///
407 /// Use this in the deadline scheduler dispatch function to detect unknown
408 /// workflow names at startup. If a deadline fires for a workflow name that
409 /// is not in this list, the scheduler's dispatch function should emit an
410 /// error rather than silently dropping the deadline:
411 ///
412 /// ```rust,ignore
413 /// let known = ctx.registered_workflows().iter().copied().collect::<HashSet<_>>();
414 /// let scheduler = ctx.run_deadline_scheduler(
415 /// move |deadline| {
416 /// let wf = deadline.workflow_id().name.as_ref();
417 /// if !known.contains(wf) {
418 /// tracing::error!(workflow = %wf, "deadline fired for unregistered workflow");
419 /// return Box::pin(async { Ok(()) });
420 /// }
421 /// // dispatch by workflow name …
422 /// Box::pin(async { Ok(()) })
423 /// },
424 /// 100,
425 /// Duration::from_secs(30),
426 /// );
427 /// ```
428 #[must_use]
429 pub fn registered_workflows(&self) -> &[&'static str] {
430 &self.registered_workflows
431 }
432
433 /// The event store backend (behind an `Arc`).
434 #[must_use]
435 pub fn event_store(&self) -> &Arc<ES> {
436 &self.event_store
437 }
438
439 /// The snapshot store backend.
440 #[must_use]
441 pub fn snapshot_store(&self) -> &SS {
442 &self.snapshot_store
443 }
444
445 /// The outbox store backend.
446 ///
447 /// Poll `outbox_store().pending_now(limit)` in a background task to drain
448 /// the delivery queue.
449 #[must_use]
450 pub fn outbox_store(&self) -> &OS {
451 &self.outbox_store
452 }
453
454 /// The deadline store backend.
455 ///
456 /// Poll `deadline_store().due_now(limit)` in a background scheduler to
457 /// fire overdue process timers.
458 #[must_use]
459 pub fn deadline_store(&self) -> &DS {
460 &self.deadline_store
461 }
462
463 /// The process routing registry.
464 ///
465 /// Register a [`ProcessIdentity`] under a `(tenant_id, key)` pair at
466 /// process creation, then `lookup` it when routing inbound messages.
467 #[must_use]
468 pub fn registry(&self) -> &PR {
469 &self.registry
470 }
471
472 /// The dead-letter sink for unroutable or unprocessable messages.
473 ///
474 /// Call [`DeadLetterSink::reject`] when an inbound message cannot be
475 /// dispatched to any workflow. The default sink emits `tracing::warn!`
476 /// so rejections are always visible in the log output.
477 #[must_use]
478 pub fn dead_letter_sink(&self) -> &Arc<dyn DeadLetterSink> {
479 &self.dead_letter_sink
480 }
481
482 /// Assert that no Noop store is active — call this during production startup.
483 ///
484 /// Checks the type names of `OS`, `DS`, and `PR` against the string `"Noop"`.
485 /// Panics with a human-readable message if any match, directing the operator
486 /// to configure a persistent backend.
487 ///
488 /// # When to call
489 ///
490 /// Call this early in `makod`'s startup path (and `--check` mode) to catch
491 /// deployments where a Noop store was accidentally wired — e.g. the
492 /// `[outbox]`, `[deadline]`, or `[registry]` configuration section was
493 /// omitted from `makod.toml`. The check is defence-in-depth: in release
494 /// builds without the `testing` feature, Noop stores cannot implement the
495 /// required traits at all and the compiler would have already rejected them.
496 ///
497 /// # Panics
498 ///
499 /// Panics when any of `OS`, `DS`, or `PR` is a Noop implementation.
500 pub fn assert_production_stores(&self) {
501 let checks: &[(&str, &str)] = &[
502 ("OutboxStore", std::any::type_name::<OS>()),
503 ("DeadlineStore", std::any::type_name::<DS>()),
504 ("ProcessRegistry", std::any::type_name::<PR>()),
505 ];
506 for (trait_name, type_name) in checks {
507 assert!(
508 !type_name.contains("Noop"),
509 "makod: Noop{trait_name} is active — \
510 configure a persistent {trait_name} backend in makod.toml. \
511 Type resolved to: {type_name}"
512 );
513 }
514 }
515
516 /// The PID-to-workflow routing table.
517 ///
518 /// Populated **once** during [`EngineBuilder::build`] by calling
519 /// [`EngineModule::register_pids`] on every registered module in
520 /// registration order. After `build` returns the table is **sealed** —
521 /// it is read-only for the lifetime of the `EngineContext` and may be
522 /// freely shared across async tasks without synchronisation.
523 ///
524 /// # Mutability contract
525 ///
526 /// There is intentionally no `pid_router_mut()` accessor. Adding PIDs
527 /// after the engine is built would create a TOCTOU race between the
528 /// dispatch path (which calls `route(pid)`) and any hypothetical
529 /// concurrent mutator. Instead, register all PIDs during the build phase
530 /// via `EngineModule::register_pids`.
531 ///
532 /// If a new process family needs to be added without restarting the
533 /// binary, rebuild and restart `makod` — hot-swap of PID routing is not
534 /// supported.
535 ///
536 /// # Example — dispatch at the AS4 reception boundary
537 ///
538 /// ```rust,ignore
539 /// let workflow_name = ctx.pid_router().route(pid)
540 /// .ok_or_else(|| EngineError::Workflow(WorkflowError::InvalidCommand(
541 /// format!("no workflow registered for PID {pid}").into()
542 /// )))?;
543 ///
544 /// match workflow_name {
545 /// "gpke-supplier-change" => dispatch::<GpkeSupplierChangeWorkflow>(&ctx, pid, payload).await,
546 /// "wim-device-change" => dispatch::<WimDeviceChangeWorkflow>(&ctx, pid, payload).await,
547 /// other => Err(EngineError::Workflow(WorkflowError::InvalidCommand(
548 /// format!("unhandled workflow name: {other}").into()
549 /// ))),
550 /// }
551 /// ```
552 #[must_use]
553 pub fn pid_router(&self) -> &PidRouter {
554 &self.pid_router
555 }
556}
557
558// ── As4Sender ─────────────────────────────────────────────────────────────────
559
560/// Sends a single AS4 / EDIINT-over-HTTP outbound message.
561///
562/// Implement this trait for your AS4 gateway client and pass it to
563/// [`EngineContext::run_outbox_worker`].
564///
565/// # Contract
566///
567/// Return `Ok(())` only after the message has been **durably accepted** by the
568/// receiving MSH. Return `Err(…)` on transient or permanent failure — the
569/// outbox worker calls [`OutboxStore::reschedule`] so the message is retried.
570pub trait As4Sender: Send + Sync + 'static {
571 /// Transmit `msg` and return when the remote MSH has accepted it.
572 fn send(
573 &self,
574 msg: &OutboxMessage,
575 ) -> impl std::future::Future<Output = Result<(), EngineError>> + Send;
576}
577
578// ── OutboxWorker ──────────────────────────────────────────────────────────────
579
580/// A background worker that drains the outbox by polling pending
581/// [`OutboxMessage`]s and dispatching them via an [`As4Sender`].
582///
583/// Obtain via [`EngineContext::run_outbox_worker`] and drive by spawning
584/// [`OutboxWorker::run`] in a Tokio task.
585///
586/// # Polling behaviour
587///
588/// When the poll returns an empty batch the worker sleeps for `poll_interval`
589/// before polling again. Non-empty batches are processed immediately.
590///
591/// # Error handling
592///
593/// Successful sends are acknowledged via [`OutboxStore::acknowledge`].
594/// Failed sends are rescheduled via [`OutboxStore::reschedule`] using
595/// **full-jitter exponential backoff**: `delay = rand(0, min(MAX, BASE * 2^n))`
596/// where `n = attempt_count`. This avoids thundering-herd when multiple
597/// `makod` instances restart simultaneously after a receiver outage.
598///
599/// When `attempt_count >= max_attempts`, the message is **acknowledged** (removed
600/// from the outbox) and a [`DeadLetterReason::OutboxExhausted`] record is written
601/// to the dead-letter sink. This prevents permanently-undeliverable messages
602/// from clogging the outbox forever.
603///
604/// All errors are emitted as structured `tracing` events at `warn` / `error`
605/// level rather than `eprintln!`, so they appear in the application's log
606/// pipeline with full context (message_id, error).
607///
608/// # Example
609///
610/// ```rust,ignore
611/// use std::time::Duration;
612///
/// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
614/// tokio::spawn(async move { worker.run().await });
615/// ```
616///
617/// [`DeadLetterReason::OutboxExhausted`]: crate::dead_letter::DeadLetterReason::OutboxExhausted
618pub struct OutboxWorker<OS: OutboxStore, S: As4Sender> {
619 store: OS,
620 sender: S,
621 batch_size: usize,
622 poll_interval: std::time::Duration,
623 /// Maximum total delivery attempts before a message is dead-lettered.
624 ///
625 /// Default: 48 (covers ~4 hours at the 300 s backoff cap).
626 /// Set to `u32::MAX` to disable the cap (not recommended for production).
627 max_attempts: u32,
628 /// Sink for messages that exceed `max_attempts`.
629 dead_letter_sink: std::sync::Arc<dyn crate::dead_letter::DeadLetterSink>,
630}
631
/// Compute a full-jitter exponential backoff delay.
///
/// The retry window is `min(MAX_SECS, BASE_SECS * 2^attempt)` and the
/// returned delay is `entropy % window`, i.e. a whole number of seconds in
/// `[0, window)`.
///
/// `attempt` is the number of prior attempts (0 = first retry). Any `u32`
/// is accepted: the exponent is computed with overflow-safe arithmetic, so
/// large values simply land on the cap.
///
/// `entropy` provides randomness; derive it from a stable message identifier
/// (e.g. bytes of `message_id`) rather than the current timestamp — a
/// timestamp-derived value is deterministic within a single batch, which
/// defeats jitter when multiple messages fail simultaneously.
///
/// | attempt | window (s) | expected delay (s) |
/// |---------|------------|--------------------|
/// | 0       | 5          | 2.5                |
/// | 1       | 10         | 5                  |
/// | 2       | 20         | 10                 |
/// | 3       | 40         | 20                 |
/// | 4       | 80         | 40                 |
/// | 5       | 160        | 80                 |
/// | 6+      | 300 (cap)  | 150                |
fn backoff_delay(attempt: u32, entropy: u64) -> std::time::Duration {
    const BASE_SECS: u64 = 5;
    const MAX_SECS: u64 = 300;

    // 2^attempt; `checked_shl` yields `None` once the shift amount reaches
    // the bit width, in which case the multiplier saturates. The exponent is
    // intentionally NOT clamped to 5: 5 * 2^5 is only 160, which would make
    // the documented 300 s cap unreachable.
    let multiplier = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);

    // Exponential window: BASE * 2^attempt, capped at MAX.
    let window = BASE_SECS.saturating_mul(multiplier).min(MAX_SECS);

    // Full jitter: pick a value in [0, window). `window` is always at least
    // BASE_SECS (> 0), so the modulo cannot divide by zero.
    std::time::Duration::from_secs(entropy % window)
}
659
660impl<OS: OutboxStore, S: As4Sender> OutboxWorker<OS, S> {
661 /// Run the outbox drain loop until the task is cancelled.
662 ///
663 /// # Panics
664 ///
665 /// Panics if `time::Duration::try_from(delay)` overflows (unreachable for
666 /// the delay values produced by `backoff_delay`).
667 #[allow(clippy::too_many_lines)]
668 pub async fn run(self) {
669 loop {
670 let batch = match self.store.pending_now(self.batch_size).await {
671 Ok(b) => b,
672 Err(e) => {
673 tracing::warn!(error = %e, "outbox worker: store error polling pending messages (will retry)");
674 tokio::time::sleep(self.poll_interval).await;
675 continue;
676 }
677 };
678
679 if batch.is_empty() {
680 tokio::time::sleep(self.poll_interval).await;
681 continue;
682 }
683
684 for msg in batch {
685 // ── Max-attempt cap ───────────────────────────────────
686 // `attempt_count` starts at 0 and is incremented on each
687 // `reschedule` call. When it reaches `max_attempts` the
688 // message is considered permanently undeliverable: acknowledge
689 // it (remove from outbox) and dead-letter it so the regulatory
690 // audit trail is preserved.
691 if msg.attempt_count >= self.max_attempts {
692 tracing::error!(
693 message_id = %msg.message_id,
694 message_type = %msg.message_type,
695 recipient = %msg.recipient,
696 attempts = msg.attempt_count,
697 max_attempts = self.max_attempts,
698 "outbox worker: max delivery attempts reached; dead-lettering message",
699 );
700 self.dead_letter_sink.reject(
701 &crate::dead_letter::DeadLetterReason::OutboxExhausted {
702 message_id: msg.message_id,
703 message_type: msg.message_type.to_string(),
704 recipient: msg.recipient.to_string(),
705 last_error: format!(
706 "delivery exhausted after {} attempts",
707 msg.attempt_count
708 ),
709 attempts: msg.attempt_count,
710 },
711 );
712 if let Err(e) = self.store.acknowledge(msg.message_id).await {
713 tracing::error!(
714 message_id = %msg.message_id,
715 error = %e,
716 "outbox worker: acknowledge after exhaust failed; message may reappear",
717 );
718 }
719 continue;
720 }
721
722 match self.sender.send(&msg).await {
723 Ok(()) => {
724 if let Err(e) = self.store.acknowledge(msg.message_id).await {
725 tracing::warn!(
726 message_id = %msg.message_id,
727 error = %e,
728 "outbox worker: acknowledge failed",
729 );
730 }
731 // CONTRL AHB 1.0 §1.2: the CONTRL must be delivered
732 // within 6 wall-clock hours of interchange receipt.
733 // `msg.created_at` is when the PendingOutbox was
734 // materialised (which should equal the ingest timestamp
735 // for transport-layer CONTRL obligations).
736 if msg.message_type.as_ref() == "CONTRL" {
737 let elapsed = time::OffsetDateTime::now_utc() - msg.created_at;
738 if elapsed > time::Duration::hours(crate::fristen::CONTRL_FRIST_HOURS) {
739 tracing::warn!(
740 message_id = %msg.message_id,
741 elapsed_secs = elapsed.whole_seconds(),
742 max_secs = crate::fristen::CONTRL_FRIST_HOURS * 3600,
743 "outbox worker: CONTRL delivered OUTSIDE the 6h Übertragungsfrist \
744 (CONTRL AHB 1.0 §1.2) — this is a BNetzA compliance violation"
745 );
746 }
747 }
748 }
749 // Permanent error: dead-letter immediately without retrying.
750 // PartnerUnknown requires operator intervention (add --as4-partner);
751 // Serialization errors will never succeed on retry.
752 Err(ref e)
753 if e.is_partner_unknown() || matches!(e, EngineError::Serialization(_)) =>
754 {
755 tracing::error!(
756 message_id = %msg.message_id,
757 message_type = %msg.message_type,
758 recipient = %msg.recipient,
759 error = %e,
760 "outbox worker: permanent send failure; dead-lettering without retry",
761 );
762 self.dead_letter_sink.reject(
763 &crate::dead_letter::DeadLetterReason::OutboxExhausted {
764 message_id: msg.message_id,
765 message_type: msg.message_type.to_string(),
766 recipient: msg.recipient.to_string(),
767 last_error: e.to_string(),
768 attempts: msg.attempt_count,
769 },
770 );
771 if let Err(re) = self.store.acknowledge(msg.message_id).await {
772 tracing::error!(
773 message_id = %msg.message_id,
774 error = %re,
775 "outbox worker: acknowledge after permanent failure failed",
776 );
777 }
778 }
779 Err(e) => {
780 // Stable jitter entropy derived from the UUID bytes of
781 // `message_id`. Using the last 8 bytes as a `u64` gives
782 // uniform entropy across message IDs (UUIDs are random in
783 // all 128 bits for v4) and is stable across Rust versions —
784 // unlike `DefaultHasher`, whose algorithm is explicitly
785 // documented as unstable.
786 let entropy = {
787 let uuid = msg.message_id.as_uuid();
788 let bytes = uuid.as_bytes();
789 u64::from_le_bytes(bytes[8..16].try_into().unwrap())
790 };
791 let delay = backoff_delay(msg.attempt_count, entropy);
792 let retry_at = time::OffsetDateTime::now_utc()
793 + time::Duration::try_from(delay).unwrap_or(time::Duration::minutes(5));
794 tracing::warn!(
795 message_id = %msg.message_id,
796 attempt = msg.attempt_count,
797 max_attempts = self.max_attempts,
798 retry_in = ?delay,
799 error = %e,
800 "outbox worker: send failed; rescheduling with backoff",
801 );
802 if let Err(re) = self.store.reschedule(msg.message_id, retry_at).await {
803 tracing::error!(
804 message_id = %msg.message_id,
805 error = %re,
806 "outbox worker: reschedule failed; message may be stuck",
807 );
808 }
809 }
810 }
811 }
812 }
813 }
814}
815
816impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
817where
818 ES: EventStore,
819 OS: OutboxStore + Clone,
820{
821 /// Construct an [`OutboxWorker`] that drains the outbox via `sender`.
822 ///
823 /// `batch_size` — messages fetched per poll cycle.
824 /// `poll_interval` — sleep duration when the batch is empty.
825 ///
826 /// `max_attempts` — maximum total delivery attempts before dead-lettering.
827 /// Pass `48` for a ~4-hour retry budget at the 300 s backoff cap, or
828 /// `u32::MAX` to disable the cap (not recommended for production).
829 ///
830 /// ```rust,ignore
831 /// use std::time::Duration;
832 ///
833 /// let worker = ctx.run_outbox_worker(my_sender, 50, Duration::from_secs(1), 48);
834 /// tokio::spawn(async move { worker.run().await });
835 /// ```
836 #[must_use]
837 pub fn run_outbox_worker<S: As4Sender>(
838 &self,
839 sender: S,
840 batch_size: usize,
841 poll_interval: std::time::Duration,
842 max_attempts: u32,
843 ) -> OutboxWorker<OS, S> {
844 OutboxWorker {
845 store: self.outbox_store.clone(),
846 sender,
847 batch_size,
848 poll_interval,
849 max_attempts,
850 dead_letter_sink: self.dead_letter_sink.clone(),
851 }
852 }
853}
854
855impl<ES, SS, OS, DS, PR> std::fmt::Debug for EngineContext<ES, SS, OS, DS, PR>
856where
857 ES: std::fmt::Debug,
858 SS: std::fmt::Debug,
859 OS: std::fmt::Debug,
860 DS: std::fmt::Debug,
861 PR: std::fmt::Debug,
862{
863 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
864 f.debug_struct("EngineContext")
865 .field("registered_modules", &self.registered_modules)
866 .field("registered_workflows", &self.registered_workflows)
867 .field("pid_router_len", &self.pid_router.len())
868 .finish_non_exhaustive()
869 }
870}
871
872// ── NoopAs4Sender / LogAs4Sender ──────────────────────────────────────────────
873
874/// An [`As4Sender`] that succeeds immediately without sending anything.
875///
876/// Use in tests and environments where outbound AS4 delivery is not yet
877/// wired. All outbox messages are acknowledged (removed from the queue)
878/// without being transmitted.
879///
880/// # ⚠️ Data loss warning
881///
882/// Every outbox message is **silently discarded** — no EDIFACT message is
883/// sent to any counterparty. Do not use in production.
884#[derive(Debug, Clone, Copy, Default)]
885#[must_use = "NoopAs4Sender discards all outbound messages silently — use a real AS4 gateway in production"]
886pub struct NoopAs4Sender;
887
888impl As4Sender for NoopAs4Sender {
889 async fn send(&self, _msg: &OutboxMessage) -> Result<(), EngineError> {
890 Ok(())
891 }
892}
893
894/// An [`As4Sender`] that logs every outbound message at `warn` level and
895/// succeeds without transmitting.
896///
897/// Useful for development and integration-testing environments where the
898/// full AS4 stack is not yet available but message visibility is desired.
899/// All outbox messages are acknowledged (removed from the queue) after logging.
900///
901/// # ⚠️ Data loss warning
902///
903/// No EDIFACT message is sent to any counterparty. Do not use in production.
904#[derive(Debug, Clone, Copy, Default)]
905#[must_use = "LogAs4Sender discards all outbound messages — use a real AS4 gateway in production"]
906pub struct LogAs4Sender;
907
908impl As4Sender for LogAs4Sender {
909 async fn send(&self, msg: &OutboxMessage) -> Result<(), EngineError> {
910 tracing::warn!(
911 message_id = %msg.message_id,
912 message_type = %msg.message_type,
913 recipient = %msg.recipient,
914 "LogAs4Sender: outbox message dropped — configure a real AS4 gateway for production",
915 );
916 Ok(())
917 }
918}
919
920// ── DeadlineScheduler ─────────────────────────────────────────────────────────
921
/// A background task that polls [`DeadlineStore::due_now`] and dispatches
/// deadline commands to the owning processes via a caller-supplied function.
///
/// Obtain via [`EngineContext::run_deadline_scheduler`] and drive by spawning
/// [`DeadlineScheduler::run`] in a Tokio task.
///
/// # Dispatch function
///
/// The `dispatch` function receives a fired [`Deadline`] and returns a future
/// that dispatches the appropriate timeout command to the process. The function
/// is responsible for resuming the correct workflow and calling `execute`.
///
/// After the future completes, the scheduler decides whether to cancel the
/// deadline in the store:
///
/// - `Ok(())` — the deadline is cancelled (to prevent re-firing).
/// - `Err(e)` where `e.is_version_conflict()` is `true` — the deadline is
///   **not** cancelled; it stays due and is dispatched again by a later poll.
/// - any other `Err` — treated as permanent; the deadline is cancelled.
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let scheduler = ctx.run_deadline_scheduler(
///     |deadline| async move {
///         tracing::warn!(
///             deadline_id = %deadline.deadline_id(),
///             label = %deadline.label(),
///             "deadline fired",
///         );
///         Ok(())
///     },
///     100,
///     Duration::from_secs(30),
/// );
/// tokio::spawn(async move { scheduler.run().await });
/// ```
pub struct DeadlineScheduler<DS: DeadlineStore> {
    /// Deadline store that is polled for due deadlines and asked to cancel them.
    store: DS,
    /// Type-erased dispatch callback: takes the fired [`Deadline`] and returns
    /// a boxed, `Send` future resolving to the dispatch outcome.
    dispatch: Box<
        dyn Fn(
                Deadline,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = Result<(), EngineError>> + Send>,
            > + Send
            + Sync,
    >,
    /// Limit passed to [`DeadlineStore::due_now`] on every poll.
    batch_size: usize,
    /// Sleep duration used when the store reports an error or no deadline is due.
    poll_interval: std::time::Duration,
}
966
967impl<DS: DeadlineStore> DeadlineScheduler<DS> {
968 /// Run the deadline poll loop until the task is cancelled.
969 pub async fn run(self) {
970 loop {
971 let result = match self.store.due_now(self.batch_size).await {
972 Ok(r) => r,
973 Err(e) => {
974 tracing::warn!(
975 error = %e,
976 "deadline scheduler: store error polling due deadlines (will retry)",
977 );
978 tokio::time::sleep(self.poll_interval).await;
979 continue;
980 }
981 };
982
983 if result.deadlines.is_empty() {
984 tokio::time::sleep(self.poll_interval).await;
985 continue;
986 }
987
988 for deadline in result.deadlines {
989 let id = deadline.deadline_id();
990 let label = deadline.label().to_owned();
991 let should_cancel = match (self.dispatch)(deadline).await {
992 Ok(()) => true,
993 Err(ref e) if e.is_version_conflict() => {
994 // The process was modified concurrently; the timeout
995 // command will be retried on the next poll cycle.
996 // Do NOT cancel — let the deadline remain due so it
997 // fires again until a non-conflict dispatch succeeds.
998 tracing::warn!(
999 deadline_id = %id,
1000 label = %label,
1001 "deadline scheduler: VersionConflict; will retry on next poll",
1002 );
1003 false
1004 }
1005 Err(e) => {
1006 tracing::warn!(
1007 deadline_id = %id,
1008 label = %label,
1009 error = %e,
1010 "deadline scheduler: dispatch failed (permanent); cancelling",
1011 );
1012 true
1013 }
1014 };
1015 if should_cancel {
1016 if let Err(e) = self.store.cancel(id).await {
1017 tracing::error!(
1018 deadline_id = %id,
1019 error = %e,
1020 "deadline scheduler: cancel failed; deadline may fire again",
1021 );
1022 }
1023 }
1024 }
1025
1026 // If has_more, loop immediately to drain the batch.
1027 }
1028 }
1029}
1030
1031impl<ES, SS, OS, DS, PR> EngineContext<ES, SS, OS, DS, PR>
1032where
1033 ES: EventStore,
1034 DS: DeadlineStore + Clone,
1035{
1036 /// Construct a [`DeadlineScheduler`] that polls the deadline store and
1037 /// dispatches fired deadlines via `dispatch`.
1038 ///
1039 /// The `dispatch` function is called for every fired deadline. It should
1040 /// resume the owning process and execute the appropriate timeout command.
1041 ///
1042 /// `batch_size` — deadlines fetched per poll cycle.
1043 /// `poll_interval` — sleep duration when no deadlines are due.
1044 ///
1045 /// ```rust,ignore
1046 /// use std::time::Duration;
1047 ///
1048 /// let scheduler = ctx.run_deadline_scheduler(
1049 /// |d| async move {
1050 /// tracing::info!(label = %d.label(), "firing deadline");
1051 /// Ok(())
1052 /// },
1053 /// 100,
1054 /// Duration::from_secs(30),
1055 /// );
1056 /// tokio::spawn(async move { scheduler.run().await });
1057 /// ```
1058 #[must_use]
1059 pub fn run_deadline_scheduler<F, Fut>(
1060 &self,
1061 dispatch: F,
1062 batch_size: usize,
1063 poll_interval: std::time::Duration,
1064 ) -> DeadlineScheduler<DS>
1065 where
1066 F: Fn(Deadline) -> Fut + Send + Sync + 'static,
1067 Fut: std::future::Future<Output = Result<(), EngineError>> + Send + 'static,
1068 {
1069 DeadlineScheduler {
1070 store: self.deadline_store.clone(),
1071 dispatch: Box::new(move |d| Box::pin(dispatch(d))),
1072 batch_size,
1073 poll_interval,
1074 }
1075 }
1076}
1077
1078// ── EngineBuilder ─────────────────────────────────────────────────────────────
1079
/// Assembles engine infrastructure and produces an [`EngineContext`].
///
/// Uses type-state to enforce that an event store is provided before
/// [`build`] can be called: the event-store parameter `ES` defaults to `()`
/// and `build` is only implemented when `ES` implements `EventStore`.
/// All other store parameters default to `Noop` implementations.
///
/// ## Quick start
///
/// ```rust,ignore
/// // Minimal — event store only, all others are Noop:
/// let ctx = EngineBuilder::new()
///     .with_event_store(InMemoryEventStore::new())
///     .build();
///
/// // Full infrastructure:
/// let ctx = EngineBuilder::new()
///     .with_event_store(InMemoryEventStore::new())
///     .with_snapshot_store(InMemorySnapshotStore::new())
///     .with_outbox_store(InMemoryOutboxStore::new())
///     .with_deadline_store(InMemoryDeadlineStore::new())
///     .with_registry(InMemoryProcessRegistry::new())
///     .register(Box::new(GpkeModule))
///     .build();
/// ```
///
/// [`build`]: EngineBuilder::build
pub struct EngineBuilder<
    ES = (),
    SS = NoopSnapshotStore,
    OS = NoopOutboxStore,
    DS = NoopDeadlineStore,
    PR = NoopProcessRegistry,
> {
    /// Event store; `()` until `with_event_store` has been called.
    event_store: ES,
    /// Snapshot store; replaced via `with_snapshot_store`.
    snapshot_store: SS,
    /// Outbox store; replaced via `with_outbox_store`.
    outbox_store: OS,
    /// Deadline store; replaced via `with_deadline_store`.
    deadline_store: DS,
    /// Process registry; replaced via `with_registry`.
    registry: PR,
    /// Shared dead-letter sink; the constructors install `LogDeadLetterSink`
    /// and `with_dead_letter_sink` replaces it.
    dead_letter_sink: Arc<dyn DeadLetterSink>,
    /// Domain modules in the order they were passed to `register`.
    modules: Vec<Box<dyn EngineModule>>,
    /// Active [`DeploymentRoles`] for this engine instance.
    ///
    /// Controls role-conditional PID registration via
    /// [`EngineModule::register_pids_with_roles`]. Defaults to
    /// [`DeploymentRoles::all()`] for backward compatibility.
    deployment_roles: DeploymentRoles,
    /// Optional profile validator injected by `makod` or callers that have
    /// access to `edi-energy`. When `Some`, called for each
    /// [`ProfileRequirement`] declared by registered modules. When `None`,
    /// profile requirements are not validated (safe in unit tests).
    ///
    /// Signature: `fn(message_type: &str) -> bool`
    ///
    /// [`ProfileRequirement`]: crate::profile::ProfileRequirement
    profile_validator: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
}
#[cfg(any(test, feature = "testing"))]
impl Default
    for EngineBuilder<
        (),
        NoopSnapshotStore,
        NoopOutboxStore,
        NoopDeadlineStore,
        NoopProcessRegistry,
    >
{
    /// Builder with no event store and every other store set to its `Noop`
    /// implementation.
    ///
    /// Delegates to `with_stores`, which fills in the remaining defaults: the
    /// `Noop` snapshot store, a `LogDeadLetterSink`, an empty module list,
    /// `DeploymentRoles::all()` and no profile validator.
    fn default() -> Self {
        Self::with_stores(NoopOutboxStore, NoopDeadlineStore, NoopProcessRegistry)
    }
}
1160
#[cfg(any(test, feature = "testing"))]
impl EngineBuilder {
    /// Start a builder in which every store except the event store is a `Noop`.
    ///
    /// Compiled only under `#[cfg(test)]` or with the `testing` feature
    /// enabled, because the Noop defaults silently discard outbox messages,
    /// deadlines, and process registry entries. Production binaries must wire
    /// real stores via the `with_*` builder methods.
    ///
    /// The event store is **required**: call [`with_event_store`] before
    /// [`build`].
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    /// [`build`]: EngineBuilder::build
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
1180
1181impl<OS, DS, PR> EngineBuilder<(), NoopSnapshotStore, OS, DS, PR>
1182where
1183 OS: OutboxStore,
1184 DS: DeadlineStore,
1185 PR: ProcessRegistry,
1186{
1187 /// Create a production-ready builder with explicit stores for outbox,
1188 /// deadline, and process registry.
1189 ///
1190 /// This constructor is available in all build configurations including
1191 /// production binaries. It enforces that the three stores that can cause
1192 /// silent data loss (`OutboxStore`, `DeadlineStore`, `ProcessRegistry`)
1193 /// are provided explicitly — there is no Noop fallback.
1194 ///
1195 /// `NoopSnapshotStore` is used as the snapshot default because it is safe
1196 /// for production: skipping snapshots means full replay, but no data loss.
1197 /// Override with [`with_snapshot_store`] to enable snapshot-accelerated
1198 /// replay.
1199 ///
1200 /// Call [`with_event_store`] before [`build`] — the event store is
1201 /// **required**.
1202 ///
1203 /// ```rust,ignore
1204 /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1205 /// .with_event_store(store.clone())
1206 /// .with_snapshot_store(InMemorySnapshotStore::new())
1207 /// .build();
1208 /// ```
1209 ///
1210 /// [`with_snapshot_store`]: EngineBuilder::with_snapshot_store
1211 /// [`with_event_store`]: EngineBuilder::with_event_store
1212 /// [`build`]: EngineBuilder::build
1213 #[must_use]
1214 pub fn with_stores(outbox_store: OS, deadline_store: DS, registry: PR) -> Self {
1215 Self {
1216 event_store: (),
1217 snapshot_store: NoopSnapshotStore,
1218 outbox_store,
1219 deadline_store,
1220 registry,
1221 dead_letter_sink: Arc::new(LogDeadLetterSink),
1222 modules: Vec::new(),
1223 deployment_roles: DeploymentRoles::all(),
1224 profile_validator: None,
1225 }
1226 }
1227}
1228
1229impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR> {
1230 /// Set the event store. **Required** — `build()` is only available once
1231 /// this has been called with a type that implements [`EventStore`].
1232 ///
1233 /// Replaces any previously set event store (type-state transition).
1234 #[must_use]
1235 pub fn with_event_store<ES2: EventStore>(
1236 self,
1237 store: ES2,
1238 ) -> EngineBuilder<ES2, SS, OS, DS, PR> {
1239 EngineBuilder {
1240 event_store: store,
1241 snapshot_store: self.snapshot_store,
1242 outbox_store: self.outbox_store,
1243 deadline_store: self.deadline_store,
1244 registry: self.registry,
1245 dead_letter_sink: self.dead_letter_sink,
1246 modules: self.modules,
1247 deployment_roles: self.deployment_roles,
1248 profile_validator: self.profile_validator,
1249 }
1250 }
1251
1252 /// Set the snapshot store (default: [`NoopSnapshotStore`]).
1253 ///
1254 /// ## Default: `NoopSnapshotStore`
1255 ///
1256 /// Without calling this method the builder uses [`NoopSnapshotStore`],
1257 /// which silently discards all snapshot writes and returns `None` for
1258 /// every snapshot read. The engine still functions correctly — every
1259 /// command handling call replays the full event log from the beginning
1260 /// instead of starting from a stored snapshot. For low-volume processes
1261 /// this is fine; for long-lived processes with many events the replay cost
1262 /// can become significant.
1263 ///
1264 /// Enable snapshotting in production by providing a real [`SnapshotStore`]
1265 /// implementation (e.g. the SlateDB-backed store in `makod`). In tests,
1266 /// [`InMemorySnapshotStore`][crate::snapshot::InMemorySnapshotStore] is
1267 /// available behind the `testing` feature flag.
1268 ///
1269 /// Note: [`Process::state_with_snapshot`][crate::process::Process::state_with_snapshot]
1270 /// is a compile-time no-op when the snapshot store is `NoopSnapshotStore`
1271 /// — it never calls the store and always returns `None`, so no snapshot is
1272 /// ever saved or loaded.
1273 #[must_use]
1274 pub fn with_snapshot_store<SS2: SnapshotStore>(
1275 self,
1276 store: SS2,
1277 ) -> EngineBuilder<ES, SS2, OS, DS, PR> {
1278 EngineBuilder {
1279 event_store: self.event_store,
1280 snapshot_store: store,
1281 outbox_store: self.outbox_store,
1282 deadline_store: self.deadline_store,
1283 registry: self.registry,
1284 dead_letter_sink: self.dead_letter_sink,
1285 modules: self.modules,
1286 deployment_roles: self.deployment_roles,
1287 profile_validator: self.profile_validator,
1288 }
1289 }
1290
1291 /// Set the outbox store (default: [`NoopOutboxStore`]).
1292 #[must_use]
1293 pub fn with_outbox_store<OS2: OutboxStore>(
1294 self,
1295 store: OS2,
1296 ) -> EngineBuilder<ES, SS, OS2, DS, PR> {
1297 EngineBuilder {
1298 event_store: self.event_store,
1299 snapshot_store: self.snapshot_store,
1300 outbox_store: store,
1301 deadline_store: self.deadline_store,
1302 registry: self.registry,
1303 dead_letter_sink: self.dead_letter_sink,
1304 modules: self.modules,
1305 deployment_roles: self.deployment_roles,
1306 profile_validator: self.profile_validator,
1307 }
1308 }
1309
1310 /// Set the deadline store (default: [`NoopDeadlineStore`]).
1311 #[must_use]
1312 pub fn with_deadline_store<DS2: DeadlineStore>(
1313 self,
1314 store: DS2,
1315 ) -> EngineBuilder<ES, SS, OS, DS2, PR> {
1316 EngineBuilder {
1317 event_store: self.event_store,
1318 snapshot_store: self.snapshot_store,
1319 outbox_store: self.outbox_store,
1320 deadline_store: store,
1321 registry: self.registry,
1322 dead_letter_sink: self.dead_letter_sink,
1323 modules: self.modules,
1324 deployment_roles: self.deployment_roles,
1325 profile_validator: self.profile_validator,
1326 }
1327 }
1328
1329 /// Set the process registry (default: [`NoopProcessRegistry`]).
1330 #[must_use]
1331 pub fn with_registry<PR2: ProcessRegistry>(
1332 self,
1333 registry: PR2,
1334 ) -> EngineBuilder<ES, SS, OS, DS, PR2> {
1335 EngineBuilder {
1336 event_store: self.event_store,
1337 snapshot_store: self.snapshot_store,
1338 outbox_store: self.outbox_store,
1339 deadline_store: self.deadline_store,
1340 registry,
1341 dead_letter_sink: self.dead_letter_sink,
1342 modules: self.modules,
1343 deployment_roles: self.deployment_roles,
1344 profile_validator: self.profile_validator,
1345 }
1346 }
1347
1348 /// Set the dead-letter sink (default: [`LogDeadLetterSink`]).
1349 ///
1350 /// The dead-letter sink receives every message that cannot be routed to a
1351 /// workflow. The default [`LogDeadLetterSink`] emits `tracing::warn!`
1352 /// events, making rejections visible in log output without configuration.
1353 ///
1354 /// Override with a persistent DLQ implementation in production:
1355 ///
1356 /// ```rust,ignore
1357 /// use mako_engine::dead_letter::LogDeadLetterSink;
1358 ///
1359 /// let ctx = EngineBuilder::new()
1360 /// .with_event_store(my_store)
1361 /// .with_dead_letter_sink(MyPersistentDlq::new())
1362 /// .build();
1363 /// ```
1364 ///
1365 /// [`LogDeadLetterSink`]: crate::dead_letter::LogDeadLetterSink
1366 #[must_use]
1367 pub fn with_dead_letter_sink(mut self, sink: impl DeadLetterSink) -> Self {
1368 self.dead_letter_sink = Arc::new(sink);
1369 self
1370 }
1371
1372 /// Register an `edi-energy` profile validator for startup profile checks.
1373 ///
1374 /// The closure receives a message-type string (e.g. `"UTILMD"`) and must
1375 /// return `true` if at least one active profile for that message type is
1376 /// registered for today's date.
1377 ///
1378 /// Wire this in `makod` using the `edi-energy` global registry:
1379 ///
1380 /// ```rust,ignore
1381 /// use edi_energy::registry::ReleaseRegistry;
1382 ///
1383 /// let today = time::OffsetDateTime::now_utc().date();
1384 /// builder.with_profile_validator(move |msg_type| {
1385 /// ReleaseRegistry::global()
1386 /// .profiles_for_str(msg_type)
1387 /// .any(|p| match (p.valid_from(), p.valid_until()) {
1388 /// (Some(f), Some(u)) => f <= today && today <= u,
1389 /// (Some(f), None) => f <= today,
1390 /// (None, _) => true,
1391 /// })
1392 /// })
1393 /// ```
1394 ///
1395 /// Domain crates do **not** need to call this — they only declare
1396 /// [`profile_requirements`].
1397 ///
1398 /// [`profile_requirements`]: EngineModule::profile_requirements
1399 #[must_use]
1400 pub fn with_profile_validator(
1401 mut self,
1402 validator: impl Fn(&str) -> bool + Send + Sync + 'static,
1403 ) -> Self {
1404 self.profile_validator = Some(Box::new(validator));
1405 self
1406 }
1407
1408 /// Register a domain module.
1409 ///
1410 /// The module name becomes visible in
1411 /// [`EngineContext::registered_modules`] after [`build`] is called.
1412 ///
1413 /// [`build`]: EngineBuilder::build
1414 #[must_use]
1415 pub fn register(mut self, module: Box<dyn EngineModule>) -> Self {
1416 self.modules.push(module);
1417 self
1418 }
1419
1420 /// Set the active [`DeploymentRoles`] for this engine instance.
1421 ///
1422 /// Controls role-conditional PID registration in [`EngineModule::register_pids_with_roles`].
1423 ///
1424 /// The default is [`DeploymentRoles::all()`], which registers every PID unconditionally
1425 /// — identical to the pre-role-aware behavior. Providing an explicit role set
1426 /// restricts role-conditional blocks to only the declared roles:
1427 ///
1428 /// - **NB-only** (`DeploymentRoles::nb()`): 19001/19002 route to `gpke-konfiguration`;
1429 /// WiM nMSB blocks are skipped.
1430 /// - **nMSB-only** (`DeploymentRoles::nmsb()`): 19001/19002 route to `wim-geraeteubernahme`;
1431 /// GPKE NB blocks are skipped.
1432 /// - **NB + gMSB** (`DeploymentRoles::nb_msb()`): most common Stadtwerke combination.
1433 ///
1434 /// # Conflict guard
1435 ///
1436 /// When two modules would register the same PID to **different** workflows, the
1437 /// engine panics during [`build`]. Set explicit roles to prevent both modules from
1438 /// activating the same PID simultaneously:
1439 ///
1440 /// ```rust,ignore
1441 /// use mako_engine::marktrolle::DeploymentRoles;
1442 ///
1443 /// let ctx = EngineBuilder::with_stores(outbox, deadline, registry)
1444 /// .with_event_store(store)
1445 /// .with_deployment_roles(DeploymentRoles::nb()) // only NB: GPKE gets 19001/19002
1446 /// .register(Box::new(GpkeModule))
1447 /// .register(Box::new(WimModule)) // nMSB block skipped — no conflict
1448 /// .build();
1449 /// ```
1450 ///
1451 /// [`build`]: EngineBuilder::build
1452 #[must_use]
1453 pub fn with_deployment_roles(mut self, roles: DeploymentRoles) -> Self {
1454 self.deployment_roles = roles;
1455 self
1456 }
1457}
1458
impl<ES, SS, OS, DS, PR> EngineBuilder<ES, SS, OS, DS, PR>
where
    ES: EventStore,
    SS: SnapshotStore,
    OS: OutboxStore,
    DS: DeadlineStore,
    PR: ProcessRegistry,
{
    /// Build the [`EngineContext`].
    ///
    /// Consumes the builder. All registered modules and configured stores are
    /// moved into the returned [`EngineContext`].
    ///
    /// This method is only available when `ES` implements [`EventStore`].
    /// If you have not called [`with_event_store`], this will not compile.
    ///
    /// Steps, in order:
    ///
    /// 1. Noop-store safety check (panic or warning, depending on build cfg).
    /// 2. Per module: [`EngineModule::configure`], then profile requirements
    ///    (only when a profile validator was injected).
    /// 3. PID router assembly with cross-module overlap detection.
    /// 4. Collection of module names and workflow names.
    ///
    /// # Panics
    ///
    /// - In builds without `cfg(test)` and without the `testing` feature:
    ///   when the deadline store, outbox store, or process registry type name
    ///   contains `NoopDeadlineStore`, `NoopOutboxStore`, or
    ///   `NoopProcessRegistry` respectively.
    /// - When any registered module returns `Err` from
    ///   [`EngineModule::configure`]. The panic message includes the module
    ///   name and the error string so the deployment failure is actionable.
    /// - When a profile validator is set and it returns `false` for a
    ///   message type listed in a module's profile requirements.
    /// - When the deployment roles are not `DeploymentRoles::all()` and two
    ///   registered modules claim the same PID.
    ///
    /// [`with_event_store`]: EngineBuilder::with_event_store
    #[must_use]
    #[allow(clippy::too_many_lines)]
    pub fn build(self) -> EngineContext<ES, SS, OS, DS, PR> {
        // ── Noop store safety checks ──────────────────────────────────────────
        //
        // Noop stores lose data silently: NoopDeadlineStore drops every APERAK
        // deadline (BNetzA violation), NoopOutboxStore discards all outbound
        // messages, NoopProcessRegistry loses conversation routing on restart.
        //
        // Two cfg-gated blocks follow:
        //   * without `cfg(test)` and without the `testing` feature, a Noop
        //     deadline/outbox/registry store makes `build` panic;
        //   * under `cfg(test)`, the `testing` feature, or the `tracing`
        //     feature, warnings are emitted so the configuration is visible
        //     in log output.
        //
        // IMPORTANT: if you are reading this because a panic fired in production,
        // a Noop store type was wired into the builder explicitly (the panic
        // block is only compiled when the `testing` feature is NOT enabled).
        // Replace it with a durable store as described in the panic message.
        {
            // The checks are string matches on the store type names.
            let os_name = std::any::type_name::<OS>();
            let ds_name = std::any::type_name::<DS>();
            let pr_name = std::any::type_name::<PR>();

            // Regulatory-critical stores: panic in every non-test, non-`testing`
            // build if these are noop. OutboxStore and DeadlineStore must be
            // durable in production; ProcessRegistry must survive restarts.
            #[cfg(not(any(test, feature = "testing")))]
            {
                if ds_name.contains("NoopDeadlineStore") {
                    panic!(
                        "EngineBuilder::build: NoopDeadlineStore is active in a \
                         non-testing build. This silently discards all APERAK deadlines, \
                         which is an immediately reportable BNetzA violation \
                         (BK6-22-024 §5, BK7-24-01-009). \
                         Call .with_deadline_store(SlateDbStore::as_deadline_store()) \
                         in your production engine assembly. \
                         If this is a test, enable the 'testing' feature."
                    );
                }
                if os_name.contains("NoopOutboxStore") {
                    panic!(
                        "EngineBuilder::build: NoopOutboxStore is active in a \
                         non-testing build. This silently discards all outbound \
                         APERAK, CONTRL, and UTILMD messages. \
                         Call .with_outbox_store(SlateDbStore::as_outbox_store()) \
                         in your production engine assembly. \
                         If this is a test, enable the 'testing' feature."
                    );
                }
                if pr_name.contains("NoopProcessRegistry") {
                    panic!(
                        "EngineBuilder::build: NoopProcessRegistry is active in a \
                         non-testing build. This means conversation routing \
                         (PID → stream_id lookup) is lost on every restart, \
                         breaking all WiM, GeLi Gas, and GPKE in-flight processes. \
                         Call .with_registry(SlateDbStore::as_process_registry()) \
                         in your production engine assembly. \
                         If this is a test, enable the 'testing' feature."
                    );
                }
            }

            // In test/testing/tracing builds: emit a warning for every Noop
            // store, including the (non-fatal) Noop snapshot store.
            #[cfg(any(test, feature = "testing", feature = "tracing"))]
            {
                let ss_name = std::any::type_name::<SS>();
                if ss_name.contains("NoopSnapshotStore") {
                    tracing::warn!(
                        store = ss_name,
                        "EngineBuilder: NoopSnapshotStore is active — snapshots will not be \
                         persisted. Use SlateDbStore::as_snapshot_store() in production."
                    );
                }
                if os_name.contains("NoopOutboxStore") {
                    tracing::warn!(
                        store = os_name,
                        "EngineBuilder: NoopOutboxStore is active — outbound messages will be \
                         silently discarded. Use SlateDbStore::as_outbox_store() in production."
                    );
                }
                if ds_name.contains("NoopDeadlineStore") {
                    tracing::warn!(
                        store = ds_name,
                        "EngineBuilder: NoopDeadlineStore is active — scheduled deadlines will \
                         not fire after restart. Use SlateDbStore::as_deadline_store() in production."
                    );
                }
                if pr_name.contains("NoopProcessRegistry") {
                    tracing::warn!(
                        store = pr_name,
                        "EngineBuilder: NoopProcessRegistry is active — process routing will be \
                         lost on restart. Use SlateDbStore::as_process_registry() in production."
                    );
                }
            }
        }
        // Validate every module before assembling the context.
        // A missing adapter or misconfigured module fails at startup (not at
        // first inbound message), making deployment failures observable immediately.
        for module in &self.modules {
            if let Err(msg) = module.configure() {
                panic!(
                    "EngineBuilder::build: module '{}' failed configuration validation: {}",
                    module.name(),
                    msg
                );
            }
            // Validate profile requirements via the injected validator.
            // Domain crates declare requirements; only the binary crate (makod)
            // injects the edi-energy registry — domain crates need no edi-energy
            // import for this check. Without a validator the requirements are
            // not checked at all.
            if let Some(ref validator) = self.profile_validator {
                for req in module.profile_requirements() {
                    assert!(
                        validator(req.message_type),
                        "EngineBuilder::build: module '{}' requires an active edi-energy \
                         profile for '{}' ({}) but none is registered for today's date. \
                         Run `cargo xtask codegen` to add the missing profile.",
                        module.name(),
                        req.message_type,
                        req.label,
                    );
                }
            }
        }
        // Build the PID router from all registered modules.
        // Also check whether two modules claim the same PID. With explicit
        // deployment roles an overlap is a configuration error and panics:
        // one module's messages would be silently swallowed by another's
        // workflow, producing missing-process errors or incorrect audit trails.
        // With DeploymentRoles::all() the overlap is tolerated (see below).
        let mut pid_router = PidRouter::new();
        // PID → name of the module that most recently claimed it.
        let mut pid_owners: std::collections::HashMap<u32, &str> = std::collections::HashMap::new();
        for module in &self.modules {
            // Temporarily build a scratch router to read this module's PIDs
            // for cross-module overlap detection (module-ownership level).
            let mut scratch = PidRouter::new();
            module.register_pids_with_roles(&mut scratch, &self.deployment_roles);
            for pid in scratch.registered_pids() {
                // `insert` returns the previous owner when the PID was
                // already claimed by an earlier module.
                if let Some(prev) = pid_owners.insert(pid, module.name()) {
                    if self.deployment_roles.is_all() {
                        // With DeploymentRoles::all() (the default), role-conditional PIDs
                        // are registered by all modules that claim them, producing last-wins
                        // semantics. This is acceptable for single-role and dev/test deployments.
                        //
                        // In production multi-role deployments where both an NB and nMSB role
                        // are served by the same instance, set explicit roles via
                        // `EngineBuilder::with_deployment_roles` to prevent silent misrouting.
                        //
                        // We emit a debug-level log here (not warn) because the vast majority
                        // of deployments are single-role and this overlap is expected/harmless.
                        #[cfg(feature = "tracing")]
                        tracing::debug!(
                            pid,
                            previous_module = prev,
                            current_module = module.name(),
                            "PID registered by multiple modules with DeploymentRoles::all(); \
                             last module wins (use with_deployment_roles for strict routing)",
                        );
                        let _ = prev; // suppress unused-variable warning when tracing is off
                    } else {
                        panic!(
                            "EngineBuilder::build: PID {pid} is claimed by both \
                             '{prev}' and '{}' — overlapping PID registrations are \
                             not allowed with explicit DeploymentRoles; each PID must be \
                             owned by exactly one module.\n \
                             Hint: use EngineBuilder::with_deployment_roles to restrict \
                             role-conditional PIDs so only one module registers each PID.\n \
                             Example: with_deployment_roles(DeploymentRoles::nb()) ensures \
                             GPKE registers ORDRSP 19001/19002, not WiM.",
                            module.name(),
                        );
                    }
                }
            }
            // Register into the real router (second registration pass for this
            // module; the scratch router above was only used to read its PIDs).
            module.register_pids_with_roles(&mut pid_router, &self.deployment_roles);
        }
        // Module names and workflow names, both in registration order.
        let registered_modules = self.modules.iter().map(|m| m.name()).collect();
        let registered_workflows = self
            .modules
            .iter()
            .flat_map(|m| m.workflow_names().iter().copied())
            .collect();
        EngineContext {
            // The event store is the only store wrapped in an `Arc` here.
            event_store: Arc::new(self.event_store),
            snapshot_store: self.snapshot_store,
            outbox_store: self.outbox_store,
            deadline_store: self.deadline_store,
            registry: self.registry,
            dead_letter_sink: self.dead_letter_sink,
            pid_router,
            registered_modules,
            registered_workflows,
        }
    }
}
1677
1678#[cfg(test)]
1679mod tests {
1680 use super::*;
1681 use crate::{
1682 deadline::InMemoryDeadlineStore,
1683 error::WorkflowError,
1684 event_store::InMemoryEventStore,
1685 ids::TenantId,
1686 outbox::InMemoryOutboxStore,
1687 pid_router::PidRouter,
1688 registry::InMemoryProcessRegistry,
1689 snapshot::InMemorySnapshotStore,
1690 version::WorkflowId,
1691 workflow::{CommandPayload, EventPayload, Workflow},
1692 };
1693
1694 // ── Minimal workflow for spawn/resume tests ───────────────────────────────
1695
    /// Payload-free event emitted by the `PingWorkflow` test fixture.
    #[derive(serde::Serialize, serde::Deserialize)]
    struct PingEvent;

    impl EventPayload for PingEvent {
        /// Event type name reported for every `PingEvent`.
        fn event_type(&self) -> &'static str {
            "Ping"
        }
    }
1704
1705 struct PingCommand;
1706
1707 impl CommandPayload for PingCommand {}
1708
1709 #[derive(Default, Clone)]
1710 struct PingState;
1711
1712 struct PingWorkflow;
1713
1714 impl Workflow for PingWorkflow {
1715 type State = PingState;
1716 type Event = PingEvent;
1717 type Command = PingCommand;
1718
1719 fn apply(state: PingState, _: &PingEvent) -> PingState {
1720 state
1721 }
1722
1723 fn handle(
1724 _: &PingState,
1725 _: PingCommand,
1726 ) -> Result<crate::workflow::WorkflowOutput<PingEvent>, WorkflowError> {
1727 Ok(vec![PingEvent].into())
1728 }
1729 }
1730
1731 struct TestModule;
1732
1733 impl EngineModule for TestModule {
1734 fn name(&self) -> &'static str {
1735 "test-module"
1736 }
1737 }
1738
1739 // ── Tests ─────────────────────────────────────────────────────────────────
1740
/// A builder given nothing but an event store must still build, and the
/// resulting context reports no registered modules.
#[test]
fn build_with_event_store_only() {
    let builder = EngineBuilder::new().with_event_store(InMemoryEventStore::new());
    let engine = builder.build();

    assert!(engine.registered_modules().is_empty());
}
1748
/// Wiring every in-memory store plus one module must list exactly that
/// module in the built context.
#[test]
fn build_with_all_stores_and_module() {
    let events = InMemoryEventStore::new();
    let snapshots = InMemorySnapshotStore::new();
    let outbox = InMemoryOutboxStore::new();
    let deadlines = InMemoryDeadlineStore::new();
    let processes = InMemoryProcessRegistry::new();

    let engine = EngineBuilder::new()
        .with_event_store(events)
        .with_snapshot_store(snapshots)
        .with_outbox_store(outbox)
        .with_deadline_store(deadlines)
        .with_registry(processes)
        .register(Box::new(TestModule))
        .build();

    assert_eq!(engine.registered_modules(), &["test-module"]);
}
1761
/// Modules must be reported in the order they were passed to `register`.
#[test]
fn multiple_modules_ordered() {
    struct First;
    struct Second;

    impl EngineModule for First {
        fn name(&self) -> &'static str {
            "mod-a"
        }
    }

    impl EngineModule for Second {
        fn name(&self) -> &'static str {
            "mod-b"
        }
    }

    let builder = EngineBuilder::new()
        .with_event_store(InMemoryEventStore::new())
        .register(Box::new(First))
        .register(Box::new(Second));
    let engine = builder.build();

    // Registration order, not alphabetical or any other order.
    let expected = ["mod-a", "mod-b"];
    assert_eq!(engine.registered_modules(), &expected);
}
1784
1785 #[tokio::test]
1786 async fn spawn_creates_independent_processes() {
1787 let ctx = EngineBuilder::new()
1788 .with_event_store(InMemoryEventStore::new())
1789 .build();
1790 let wf_id = WorkflowId::new("ping", "FV2024-10-01");
1791
1792 let p1 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id.clone());
1793 let p2 = ctx.spawn::<PingWorkflow>(TenantId::new(), wf_id);
1794
1795 assert_ne!(p1.process_id(), p2.process_id());
1796 }
1797
1798 #[tokio::test]
1799 async fn resume_sees_previously_appended_events() {
1800 let store = InMemoryEventStore::new();
1801 let ctx = EngineBuilder::new().with_event_store(store).build();
1802
1803 let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
1804 p.execute(PingCommand).await.unwrap();
1805
1806 let identity = p.identity();
1807 let resumed = ctx.resume::<PingWorkflow>(identity);
1808 assert_eq!(resumed.event_count().await.unwrap(), 1);
1809 }
1810
1811 #[tokio::test]
1812 async fn registry_routes_process_via_conversation_key() {
1813 use crate::registry::RegistryKey;
1814 let ctx = EngineBuilder::new()
1815 .with_event_store(InMemoryEventStore::new())
1816 .with_registry(InMemoryProcessRegistry::new())
1817 .build();
1818
1819 let p = ctx.spawn::<PingWorkflow>(TenantId::new(), WorkflowId::new("ping", "FV2024-10-01"));
1820 let tenant = p.tenant_id();
1821 let conv_key = RegistryKey::parse("conv:test-conversation-123").expect("valid key");
1822 ctx.registry()
1823 .register(tenant, &conv_key, p.identity())
1824 .await
1825 .unwrap();
1826
1827 let found = ctx
1828 .registry()
1829 .lookup(tenant, &conv_key)
1830 .await
1831 .unwrap()
1832 .expect("must be registered");
1833 let resumed = ctx.resume::<PingWorkflow>(found);
1834 assert_eq!(resumed.process_id(), p.process_id());
1835 }
1836
/// PIDs a module adds in `register_pids` must be routable from the built
/// context, and nothing else may appear in the router.
#[test]
fn pid_router_populated_by_module_register_pids() {
    const WORKFLOW: &str = "gpke-supplier-change";

    struct PidModule;

    impl EngineModule for PidModule {
        fn name(&self) -> &'static str {
            "pid-module"
        }

        fn register_pids(&self, router: &mut PidRouter) {
            for &pid in &[55001, 55002] {
                router.register(pid, WORKFLOW);
            }
        }
    }

    let engine = EngineBuilder::new()
        .with_event_store(InMemoryEventStore::new())
        .register(Box::new(PidModule))
        .build();
    let routes = engine.pid_router();

    // Both registered PIDs resolve to the module's workflow.
    assert_eq!(routes.route(55001), Some(WORKFLOW));
    assert_eq!(routes.route(55002), Some(WORKFLOW));
    // An unknown PID has no route, and no extra entries were created.
    assert!(routes.route(99999).is_none());
    assert_eq!(routes.len(), 2);
}
1860
/// Checks that `register_pids_with_roles` lets a module gate its PIDs on the
/// configured deployment roles.
///
/// Fixture: both modules want PID 19001.
/// - `ModuleA` maps 19001 → "workflow-a" whenever the roles contain `Nb`.
/// - `ModuleB` maps 19001 and 19015 → "workflow-b", but only when the roles
///   are not `all()` and contain `Nmsb`.
///
/// Expected routing per configuration:
/// - `DeploymentRoles::all()`: only `ModuleA` registers
///   → 19001 is "workflow-a", 19015 is unrouted.
/// - `DeploymentRoles::nb()`: only `ModuleA` registers
///   → 19001 is "workflow-a", 19015 is unrouted.
/// - `DeploymentRoles::nmsb()`: only `ModuleB` registers
///   → 19001 and 19015 are both "workflow-b".
#[test]
fn register_pids_with_roles_gates_pids_correctly() {
    use crate::marktrolle::{DeploymentRoles, Marktrolle};

    struct ModuleA;

    impl EngineModule for ModuleA {
        fn name(&self) -> &'static str {
            "module-a"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if !roles.contains(Marktrolle::Nb) {
                return;
            }
            router.register(19_001, "workflow-a");
        }
    }

    struct ModuleB;

    impl EngineModule for ModuleB {
        fn name(&self) -> &'static str {
            "module-b"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            // `all()` is the backward-compat sentinel: stay out of the way
            // unless Nmsb was requested explicitly.
            if roles.is_all() || !roles.contains(Marktrolle::Nmsb) {
                return;
            }
            router.register(19_001, "workflow-b");
            router.register(19_015, "workflow-b");
        }
    }

    // Builds a context with both modules (A first, then B) for the given roles.
    let engine_for = |roles: DeploymentRoles| {
        EngineBuilder::new()
            .with_event_store(InMemoryEventStore::new())
            .with_deployment_roles(roles)
            .register(Box::new(ModuleA))
            .register(Box::new(ModuleB))
            .build()
    };

    // all(): ModuleA claims 19001, ModuleB stays silent.
    let all_roles = engine_for(DeploymentRoles::all());
    let routes = all_roles.pid_router();
    assert_eq!(routes.route(19_001), Some("workflow-a"));
    assert!(routes.route(19_015).is_none());

    // Explicit Nb: identical outcome, ModuleB still does not register.
    let nb_only = engine_for(DeploymentRoles::nb());
    let routes = nb_only.pid_router();
    assert_eq!(routes.route(19_001), Some("workflow-a"));
    assert!(routes.route(19_015).is_none());

    // Explicit Nmsb: ModuleA steps aside and ModuleB owns both PIDs.
    let nmsb_only = engine_for(DeploymentRoles::nmsb());
    let routes = nmsb_only.pid_router();
    assert_eq!(routes.route(19_001), Some("workflow-b"));
    assert_eq!(routes.route(19_015), Some("workflow-b"));
}
1928
/// With explicitly configured roles, two modules that register the same PID
/// must make `build` panic instead of silently picking a winner.
#[test]
#[should_panic(expected = "overlapping PID registrations")]
fn register_pids_with_roles_conflict_panics_with_explicit_roles() {
    use crate::marktrolle::{DeploymentRoles, Marktrolle};

    struct ConflictA;

    impl EngineModule for ConflictA {
        fn name(&self) -> &'static str {
            "conflict-a"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if !roles.contains(Marktrolle::Nb) {
                return;
            }
            router.register(19_001, "workflow-a");
        }
    }

    struct ConflictB;

    impl EngineModule for ConflictB {
        fn name(&self) -> &'static str {
            "conflict-b"
        }

        fn register_pids_with_roles(&self, router: &mut PidRouter, roles: &DeploymentRoles) {
            if roles.is_all() || !roles.contains(Marktrolle::Nmsb) {
                return;
            }
            // Same PID as ConflictA, but pointing at a different workflow.
            router.register(19_001, "workflow-b");
        }
    }

    // Nb and Nmsb together satisfy both modules' conditions, so both try to
    // claim PID 19001 and the build is expected to panic.
    let both_roles = DeploymentRoles::from_roles([Marktrolle::Nb, Marktrolle::Nmsb]);
    let builder = EngineBuilder::new()
        .with_event_store(InMemoryEventStore::new())
        .with_deployment_roles(both_roles)
        .register(Box::new(ConflictA))
        .register(Box::new(ConflictB));
    let _ = builder.build();
}
1970}