mako_engine/message_adapter.rs
1//! [`MessageAdapter`] — cross-format-version message-to-command translation.
2//!
3//! # Problem
4//!
5//! A GPKE process started under `FV2025-10-01` may still be in-flight when
6//! `FV2026-10-01` goes live. The counterparty begins sending APERAK messages
7//! in the new AHB format before the process completes. The field that signals
8//! acceptance may have moved (e.g. a new qualifier in BGM DE 1001), or an
9//! additional mandatory DTM has been added.
10//!
11//! Without an explicit adapter, each workflow handles this ad-hoc inside its
12//! command constructor, making the mapping invisible, untested, and easy to
13//! forget when a new release cycle arrives.
14//!
15//! # Solution
16//!
//! `MessageAdapter<W>` is the type-system home for all format-version-specific
//! translation logic. An adapter declares which format versions it can handle
//! (`accepts_format_version`), receives the parsed message as a type-erased
//! `&dyn Any`, and returns the domain command to dispatch.
21//!
//! Adapters are registered in an [`AdapterRegistry`] at engine startup. Once
//! registration is complete, [`AdapterRegistry::validate_policy`] checks that
//! every format version required by the workflow's [`WorkflowVersionPolicy`]
//! has a registered adapter.
25//!
26//! # Example
27//!
28//! ```rust,ignore
29//! use mako_engine::message_adapter::{AdapterRegistry, MessageAdapter};
30//! use mako_engine::version::FormatVersion;
31//! use mako_engine::error::EngineError;
32//!
33//! struct GpkeAperakAdapter;
34//!
35//! impl MessageAdapter<GpkeWorkflow> for GpkeAperakAdapter {
36//! fn accepts_format_version(&self, fv: &FormatVersion) -> bool {
37//! matches!(fv.as_str(), "FV2025-10-01" | "FV2026-10-01")
38//! }
39//!
40//! fn adapt(
41//! &self,
42//! msg: &dyn std::any::Any,
43//! fv: &FormatVersion,
44//! ) -> Result<GpkeCommand, EngineError> {
45//! // parse `msg` as APERAK and construct the appropriate command
46//! Ok(GpkeCommand::ReceiveAperak { positive: true })
47//! }
48//! }
49//!
50//! let mut registry: AdapterRegistry<GpkeWorkflow> = AdapterRegistry::new();
51//! registry.register(GpkeAperakAdapter);
52//! ```
53
54use crate::{
55 error::EngineError,
56 version::{FormatVersion, WorkflowVersionPolicy},
57 workflow::Workflow,
58};
59
60// ── MessageAdapter trait ──────────────────────────────────────────────────────
61
62/// Translates a parsed EDIFACT message into a domain command for workflow `W`.
63///
64/// Implement one `MessageAdapter` per (message type, format version range)
65/// combination that your workflow needs to handle. An adapter that handles
66/// multiple format versions via internal branching is also valid.
67///
68/// # Thread safety
69///
70/// Adapters must be `Send + Sync + 'static` because they are stored in an
71/// [`AdapterRegistry`] that is shared across async tasks.
72pub trait MessageAdapter<W: Workflow>: Send + Sync + 'static {
73 /// Returns `true` when this adapter can translate messages formatted under
74 /// `fv`.
75 ///
76 /// The [`AdapterRegistry`] calls this during validation to confirm that
77 /// every format version declared by the workflow's
78 /// [`WorkflowVersionPolicy`] is covered.
79 fn accepts_format_version(&self, fv: &FormatVersion) -> bool;
80
81 /// Translate a raw parsed message into a domain command.
82 ///
83 /// `fv` is the format version detected from the wire message (e.g. from
84 /// `EdiEnergyMessage::detect_release`). Use it to select the correct
85 /// field mapping when the adapter handles multiple format versions.
86 ///
87 /// # Errors
88 ///
89 /// Return [`EngineError::Workflow`] when the message is structurally valid
90 /// but semantically inappropriate for this command (e.g. wrong PID).
91 ///
92 /// Return [`EngineError::Deserialization`] when a required field is absent
93 /// or malformed.
94 fn adapt(&self, raw: &dyn std::any::Any, fv: &FormatVersion)
95 -> Result<W::Command, EngineError>;
96}
97
98// ── AdapterRegistry ───────────────────────────────────────────────────────────
99
100/// Runtime registry of [`MessageAdapter`]s for a single workflow type `W`.
101///
102/// Adapters are registered at startup via [`AdapterRegistry::register`].
103/// After all adapters are registered, call [`AdapterRegistry::validate_policy`]
104/// to confirm that every format version declared in the workflow's
105/// [`WorkflowVersionPolicy`] is covered by at least one adapter.
106///
107/// # Example
108///
109/// ```rust,ignore
110/// use mako_engine::message_adapter::AdapterRegistry;
111///
112/// let mut registry: AdapterRegistry<MyWorkflow> = AdapterRegistry::new();
113/// registry.register(MyFV2025Adapter);
114/// registry.register(MyFV2026Adapter);
115/// registry
116/// .validate_policy(
117/// &MyWorkflow::version_policy(),
118/// &[
119/// FormatVersion::new("FV2025-10-01"),
120/// FormatVersion::new("FV2026-10-01"),
121/// ],
122/// )
123/// .expect("all format versions must have a registered adapter");
124/// ```
125pub struct AdapterRegistry<W: Workflow> {
126 adapters: Vec<Box<dyn MessageAdapter<W>>>,
127}
128
129impl<W: Workflow> Default for AdapterRegistry<W> {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl<W: Workflow> AdapterRegistry<W> {
136 /// Create an empty registry.
137 #[must_use]
138 pub fn new() -> Self {
139 Self {
140 adapters: Vec::new(),
141 }
142 }
143
144 /// Register an adapter.
145 ///
146 /// Multiple adapters can be registered. When [`AdapterRegistry::dispatch`]
147 /// is called, the first adapter that returns `true` from
148 /// `accepts_format_version` is used. Register the most specific adapters
149 /// first.
150 pub fn register(&mut self, adapter: impl MessageAdapter<W>) {
151 self.adapters.push(Box::new(adapter));
152 }
153
154 /// Dispatch `raw` to the first adapter that accepts `fv`.
155 ///
156 /// # Errors
157 ///
158 /// Returns [`EngineError::Workflow`] wrapping
159 /// `WorkflowError::other("no adapter registered for format version …")`
160 /// when no registered adapter claims `fv`.
161 ///
162 /// Propagates the adapter's own error otherwise.
163 pub fn dispatch(
164 &self,
165 raw: &dyn std::any::Any,
166 fv: &FormatVersion,
167 ) -> Result<W::Command, EngineError> {
168 for adapter in &self.adapters {
169 if adapter.accepts_format_version(fv) {
170 return adapter.adapt(raw, fv);
171 }
172 }
173 Err(EngineError::Workflow(crate::error::WorkflowError::other(
174 format!("no adapter registered for format version {fv}"),
175 )))
176 }
177
178 /// Validate that every format version in `known_fvs` is covered by at
179 /// least one registered adapter, according to `policy`.
180 ///
181 /// `known_fvs` is typically the set of all registered BDEW profiles for
182 /// the workflow's message type. In practice, call this at engine startup
183 /// with the format versions returned by `ReleaseRegistry::all_profiles()`.
184 ///
185 /// # Behaviour per policy
186 ///
187 /// | Policy | Validation rule |
188 /// |--------|-----------------|
189 /// | `Pinned` | All `known_fvs` must be covered. A Pinned workflow can be
190 /// started under any known FV; every one of them must have an adapter. |
191 /// | `ForwardCompatible` | Same — all `known_fvs` must be covered so the
192 /// workflow can handle messages in every FV it may encounter. |
193 /// | `Explicit(list)` | Only the explicitly listed FVs must be covered. |
194 ///
195 /// Passing an empty `known_fvs` slice skips all coverage checks and
196 /// always returns `Ok(())`.
197 ///
198 /// # Errors
199 ///
200 /// Returns a non-empty list of uncovered format versions. The engine
201 /// should treat this as a startup error rather than a runtime error.
202 pub fn validate_policy(
203 &self,
204 policy: &WorkflowVersionPolicy,
205 known_fvs: &[FormatVersion],
206 ) -> Result<(), Vec<FormatVersion>> {
207 let must_cover: &[FormatVersion] = match policy {
208 // Pinned and ForwardCompatible both require coverage of every
209 // currently-known FV. (For Pinned, any of the known FVs may be
210 // used as the process creation FV; for ForwardCompatible, the
211 // workflow accepts all of them.)
212 WorkflowVersionPolicy::Pinned | WorkflowVersionPolicy::ForwardCompatible => known_fvs,
213
214 // Explicit lists the exact FVs that need coverage.
215 WorkflowVersionPolicy::Explicit(required) => required.as_slice(),
216 };
217
218 let uncovered: Vec<FormatVersion> = must_cover
219 .iter()
220 .filter(|fv| !self.adapters.iter().any(|a| a.accepts_format_version(fv)))
221 .cloned()
222 .collect();
223
224 if uncovered.is_empty() {
225 Ok(())
226 } else {
227 Err(uncovered)
228 }
229 }
230
231 /// Returns the number of registered adapters.
232 #[must_use]
233 pub fn len(&self) -> usize {
234 self.adapters.len()
235 }
236
237 /// Returns `true` when no adapters are registered.
238 #[must_use]
239 pub fn is_empty(&self) -> bool {
240 self.adapters.is_empty()
241 }
242
243 /// Returns a list of all format versions for which at least one adapter
244 /// returns `true` from `accepts_format_version`, out of the given
245 /// `candidate_fvs` set.
246 #[must_use]
247 pub fn covered_versions<'a>(
248 &self,
249 candidate_fvs: &'a [FormatVersion],
250 ) -> Vec<&'a FormatVersion> {
251 candidate_fvs
252 .iter()
253 .filter(|fv| self.adapters.iter().any(|a| a.accepts_format_version(fv)))
254 .collect()
255 }
256}
257
258impl<W: Workflow> std::fmt::Debug for AdapterRegistry<W> {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 f.debug_struct("AdapterRegistry")
261 .field("adapters", &self.adapters.len())
262 .finish()
263 }
264}
265
// ── Closure-based adapter ─────────────────────────────────────────────────────
267
268/// A simple function-based adapter constructed via
269/// [`FnAdapter::new`].
270///
271/// Use this for lightweight adapters that do not need to carry state.
272///
273/// # Example
274///
275/// ```rust,ignore
276/// use mako_engine::message_adapter::{AdapterRegistry, FnAdapter};
277///
278/// let mut registry: AdapterRegistry<MyWorkflow> = AdapterRegistry::new();
279/// registry.register(FnAdapter::new(
280/// |fv| fv.as_str() == "FV2025-10-01",
281/// |raw, _fv| {
282/// // cast raw and construct command
283/// Ok(MyCommand::Received)
284/// },
285/// ));
286/// ```
287pub struct FnAdapter<W: Workflow, A, D>
288where
289 A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
290 D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
291 + Send
292 + Sync
293 + 'static,
294{
295 accepts: A,
296 adapt: D,
297 _phantom: std::marker::PhantomData<W>,
298}
299
300impl<W: Workflow, A, D> FnAdapter<W, A, D>
301where
302 A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
303 D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
304 + Send
305 + Sync
306 + 'static,
307{
308 /// Construct an adapter from two closures.
309 pub fn new(accepts: A, adapt: D) -> Self {
310 Self {
311 accepts,
312 adapt,
313 _phantom: std::marker::PhantomData,
314 }
315 }
316}
317
318impl<W: Workflow, A, D> MessageAdapter<W> for FnAdapter<W, A, D>
319where
320 A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
321 D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
322 + Send
323 + Sync
324 + 'static,
325{
326 fn accepts_format_version(&self, fv: &FormatVersion) -> bool {
327 (self.accepts)(fv)
328 }
329
330 fn adapt(
331 &self,
332 raw: &dyn std::any::Any,
333 fv: &FormatVersion,
334 ) -> Result<W::Command, EngineError> {
335 (self.adapt)(raw, fv)
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        error::{EngineError, WorkflowError},
        version::{FormatVersion, WorkflowVersionPolicy},
        workflow::{CommandPayload, EventPayload, Workflow},
    };

    const FV_2025: &str = "FV2025-10-01";
    const FV_2026: &str = "FV2026-10-01";

    // ── Minimal test workflow ─────────────────────────────────────────────────

    #[derive(Debug, Default, Clone)]
    struct TestState;

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    enum TestEvent {
        Fired,
    }
    impl EventPayload for TestEvent {
        fn event_type(&self) -> &'static str {
            "Fired"
        }
    }

    #[derive(Debug)]
    enum TestCommand {
        Fire,
    }
    impl CommandPayload for TestCommand {}

    struct TestWorkflow;
    impl Workflow for TestWorkflow {
        type State = TestState;
        type Event = TestEvent;
        type Command = TestCommand;

        fn apply(state: Self::State, _event: &Self::Event) -> Self::State {
            state
        }
        fn handle(
            _state: &Self::State,
            _cmd: Self::Command,
        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, WorkflowError> {
            Ok(vec![TestEvent::Fired].into())
        }
    }

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Registers an adapter that accepts exactly the listed format versions
    /// and always yields `TestCommand::Fire`.
    fn register_firing(
        registry: &mut AdapterRegistry<TestWorkflow>,
        accepted: &'static [&'static str],
    ) {
        let firing: FnAdapter<TestWorkflow, _, _> = FnAdapter::new(
            move |fv: &FormatVersion| accepted.iter().any(|known| *known == fv.as_str()),
            |_raw, _fv| Ok(TestCommand::Fire),
        );
        registry.register(firing);
    }

    /// Registers an adapter that accepts FV2025 but always fails in `adapt`.
    /// Used to observe which adapter `dispatch` actually selected.
    fn register_failing(registry: &mut AdapterRegistry<TestWorkflow>) {
        let failing: FnAdapter<TestWorkflow, _, _> = FnAdapter::new(
            |fv| fv.as_str() == FV_2025,
            |_raw, _fv| {
                Err(EngineError::Workflow(WorkflowError::other(
                    "failing test adapter was invoked".to_string(),
                )))
            },
        );
        registry.register(failing);
    }

    /// Registry holding a single firing adapter for the listed versions.
    fn registry_accepting(accepted: &'static [&'static str]) -> AdapterRegistry<TestWorkflow> {
        let mut registry = AdapterRegistry::new();
        register_firing(&mut registry, accepted);
        registry
    }

    /// Both format versions used throughout these tests, oldest first.
    fn both_versions() -> Vec<FormatVersion> {
        vec![FormatVersion::new(FV_2025), FormatVersion::new(FV_2026)]
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    #[test]
    fn fn_adapter_accepts_correct_fv() {
        let adapter: FnAdapter<TestWorkflow, _, _> = FnAdapter::new(
            |fv| fv.as_str() == FV_2025,
            |_raw, _fv| Ok(TestCommand::Fire),
        );
        assert!(adapter.accepts_format_version(&FormatVersion::new(FV_2025)));
        assert!(!adapter.accepts_format_version(&FormatVersion::new(FV_2026)));
    }

    #[test]
    fn registry_dispatches_to_first_matching_adapter() {
        // Two adapters claim FV2025; only the first one may be consulted.
        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
        register_firing(&mut registry, &[FV_2025]);
        register_failing(&mut registry);

        let fv = FormatVersion::new(FV_2025);
        // `()` as the "raw" message — the adapters ignore it.
        let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
        assert!(
            matches!(result, Ok(TestCommand::Fire)),
            "dispatch must use the first adapter registered for the FV"
        );
    }

    #[test]
    fn registry_propagates_error_of_selected_adapter() {
        // Reverse registration order: the failing adapter now comes first,
        // so its error must surface even though a working adapter follows.
        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
        register_failing(&mut registry);
        register_firing(&mut registry, &[FV_2025]);

        let fv = FormatVersion::new(FV_2025);
        let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
        assert!(
            result.is_err(),
            "the selected adapter's error must be returned"
        );
    }

    #[test]
    fn registry_errors_on_unregistered_fv() {
        let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
        let fv = FormatVersion::new(FV_2025);
        let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
        assert!(result.is_err(), "must return Err for unregistered FV");
    }

    #[test]
    fn validate_policy_explicit_all_covered() {
        let registry = registry_accepting(&[FV_2025, FV_2026]);
        let policy = WorkflowVersionPolicy::Explicit(both_versions());
        assert!(registry.validate_policy(&policy, &both_versions()).is_ok());
    }

    #[test]
    fn validate_policy_explicit_gap_detected() {
        // Only FV2025 has an adapter; the policy additionally demands FV2026.
        let registry = registry_accepting(&[FV_2025]);
        let policy = WorkflowVersionPolicy::Explicit(both_versions());

        let result = registry.validate_policy(&policy, &both_versions());
        assert!(result.is_err());
        let gaps = result.unwrap_err();
        assert_eq!(gaps.len(), 1);
        assert_eq!(gaps[0].as_str(), FV_2026);
    }

    #[test]
    fn validate_policy_explicit_ignores_known_fvs() {
        // An Explicit policy is checked against its own list: an empty
        // `known_fvs` slice must not hide the missing FV2026 adapter.
        let registry = registry_accepting(&[FV_2025]);
        let policy = WorkflowVersionPolicy::Explicit(vec![FormatVersion::new(FV_2026)]);

        let result = registry.validate_policy(&policy, &[]);
        assert_eq!(result, Err(vec![FormatVersion::new(FV_2026)]));
    }

    #[test]
    fn validate_policy_pinned_empty_known_fvs_always_ok() {
        // When no known FVs are supplied, there is nothing to validate —
        // even an empty registry passes. Callers should always provide the
        // actual registered profile list for meaningful coverage checks.
        let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
        assert!(
            registry
                .validate_policy(&WorkflowVersionPolicy::Pinned, &[])
                .is_ok()
        );
    }

    #[test]
    fn validate_policy_pinned_with_known_fvs_detects_gap() {
        // Pinned policy with known FVs: all must be covered.
        let registry = registry_accepting(&[FV_2025]);
        let known = both_versions(); // FV2026 has no adapter → gap

        let result = registry.validate_policy(&WorkflowVersionPolicy::Pinned, &known);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), vec![FormatVersion::new(FV_2026)]);
    }

    #[test]
    fn validate_policy_forward_compatible_with_known_fvs_detects_gap() {
        // ForwardCompatible must cover every known FV.
        let registry = registry_accepting(&[FV_2025]);
        let known = both_versions(); // FV2026 has no adapter → gap

        let result = registry.validate_policy(&WorkflowVersionPolicy::ForwardCompatible, &known);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), vec![FormatVersion::new(FV_2026)]);
    }

    #[test]
    fn covered_versions_returns_subset() {
        let registry = registry_accepting(&[FV_2025]);
        let candidates = both_versions();

        let covered = registry.covered_versions(&candidates);
        assert_eq!(covered.len(), 1);
        assert_eq!(covered[0].as_str(), FV_2025);
    }
}