Skip to main content

mako_engine/
message_adapter.rs

1//! [`MessageAdapter`] — cross-format-version message-to-command translation.
2//!
3//! # Problem
4//!
5//! A GPKE process started under `FV2025-10-01` may still be in-flight when
6//! `FV2026-10-01` goes live.  The counterparty begins sending APERAK messages
7//! in the new AHB format before the process completes.  The field that signals
8//! acceptance may have moved (e.g. a new qualifier in BGM DE 1001), or an
9//! additional mandatory DTM has been added.
10//!
11//! Without an explicit adapter, each workflow handles this ad-hoc inside its
12//! command constructor, making the mapping invisible, untested, and easy to
13//! forget when a new release cycle arrives.
14//!
15//! # Solution
16//!
17//! `MessageAdapter<W>` is the type-system home for all format-version-specific
18//! translation logic.  An adapter declares which format versions it can handle
19//! (`accepts_format_version`), receives a parsed `AnyMessage`, and returns the
20//! domain command to dispatch.
21//!
22//! Adapters are registered in an [`AdapterRegistry`] at engine startup.  The
23//! registry validates at registration time that all format versions in the
24//! workflow's [`WorkflowVersionPolicy`] have a registered adapter.
25//!
26//! # Example
27//!
28//! ```rust,ignore
29//! use mako_engine::message_adapter::{AdapterRegistry, MessageAdapter};
30//! use mako_engine::version::FormatVersion;
31//! use mako_engine::error::EngineError;
32//!
33//! struct GpkeAperakAdapter;
34//!
35//! impl MessageAdapter<GpkeWorkflow> for GpkeAperakAdapter {
36//!     fn accepts_format_version(&self, fv: &FormatVersion) -> bool {
37//!         matches!(fv.as_str(), "FV2025-10-01" | "FV2026-10-01")
38//!     }
39//!
40//!     fn adapt(
41//!         &self,
42//!         msg: &dyn std::any::Any,
43//!         fv: &FormatVersion,
44//!     ) -> Result<GpkeCommand, EngineError> {
45//!         // parse `msg` as APERAK and construct the appropriate command
46//!         Ok(GpkeCommand::ReceiveAperak { positive: true })
47//!     }
48//! }
49//!
50//! let mut registry: AdapterRegistry<GpkeWorkflow> = AdapterRegistry::new();
51//! registry.register(GpkeAperakAdapter);
52//! ```
53
54use crate::{
55    error::EngineError,
56    version::{FormatVersion, WorkflowVersionPolicy},
57    workflow::Workflow,
58};
59
60// ── MessageAdapter trait ──────────────────────────────────────────────────────
61
62/// Translates a parsed EDIFACT message into a domain command for workflow `W`.
63///
64/// Implement one `MessageAdapter` per (message type, format version range)
65/// combination that your workflow needs to handle.  An adapter that handles
66/// multiple format versions via internal branching is also valid.
67///
68/// # Thread safety
69///
70/// Adapters must be `Send + Sync + 'static` because they are stored in an
71/// [`AdapterRegistry`] that is shared across async tasks.
72pub trait MessageAdapter<W: Workflow>: Send + Sync + 'static {
73    /// Returns `true` when this adapter can translate messages formatted under
74    /// `fv`.
75    ///
76    /// The [`AdapterRegistry`] calls this during validation to confirm that
77    /// every format version declared by the workflow's
78    /// [`WorkflowVersionPolicy`] is covered.
79    fn accepts_format_version(&self, fv: &FormatVersion) -> bool;
80
81    /// Translate a raw parsed message into a domain command.
82    ///
83    /// `fv` is the format version detected from the wire message (e.g. from
84    /// `EdiEnergyMessage::detect_release`).  Use it to select the correct
85    /// field mapping when the adapter handles multiple format versions.
86    ///
87    /// # Errors
88    ///
89    /// Return [`EngineError::Workflow`] when the message is structurally valid
90    /// but semantically inappropriate for this command (e.g. wrong PID).
91    ///
92    /// Return [`EngineError::Deserialization`] when a required field is absent
93    /// or malformed.
94    fn adapt(&self, raw: &dyn std::any::Any, fv: &FormatVersion)
95    -> Result<W::Command, EngineError>;
96}
97
98// ── AdapterRegistry ───────────────────────────────────────────────────────────
99
100/// Runtime registry of [`MessageAdapter`]s for a single workflow type `W`.
101///
102/// Adapters are registered at startup via [`AdapterRegistry::register`].
103/// After all adapters are registered, call [`AdapterRegistry::validate_policy`]
104/// to confirm that every format version declared in the workflow's
105/// [`WorkflowVersionPolicy`] is covered by at least one adapter.
106///
107/// # Example
108///
109/// ```rust,ignore
110/// use mako_engine::message_adapter::AdapterRegistry;
111///
112/// let mut registry: AdapterRegistry<MyWorkflow> = AdapterRegistry::new();
113/// registry.register(MyFV2025Adapter);
114/// registry.register(MyFV2026Adapter);
115/// registry
116///     .validate_policy(
117///         &MyWorkflow::version_policy(),
118///         &[
119///             FormatVersion::new("FV2025-10-01"),
120///             FormatVersion::new("FV2026-10-01"),
121///         ],
122///     )
123///     .expect("all format versions must have a registered adapter");
124/// ```
125pub struct AdapterRegistry<W: Workflow> {
126    adapters: Vec<Box<dyn MessageAdapter<W>>>,
127}
128
129impl<W: Workflow> Default for AdapterRegistry<W> {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl<W: Workflow> AdapterRegistry<W> {
136    /// Create an empty registry.
137    #[must_use]
138    pub fn new() -> Self {
139        Self {
140            adapters: Vec::new(),
141        }
142    }
143
144    /// Register an adapter.
145    ///
146    /// Multiple adapters can be registered.  When [`AdapterRegistry::dispatch`]
147    /// is called, the first adapter that returns `true` from
148    /// `accepts_format_version` is used.  Register the most specific adapters
149    /// first.
150    pub fn register(&mut self, adapter: impl MessageAdapter<W>) {
151        self.adapters.push(Box::new(adapter));
152    }
153
154    /// Dispatch `raw` to the first adapter that accepts `fv`.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`EngineError::Workflow`] wrapping
159    /// `WorkflowError::other("no adapter registered for format version …")`
160    /// when no registered adapter claims `fv`.
161    ///
162    /// Propagates the adapter's own error otherwise.
163    pub fn dispatch(
164        &self,
165        raw: &dyn std::any::Any,
166        fv: &FormatVersion,
167    ) -> Result<W::Command, EngineError> {
168        for adapter in &self.adapters {
169            if adapter.accepts_format_version(fv) {
170                return adapter.adapt(raw, fv);
171            }
172        }
173        Err(EngineError::Workflow(crate::error::WorkflowError::other(
174            format!("no adapter registered for format version {fv}"),
175        )))
176    }
177
178    /// Validate that every format version in `known_fvs` is covered by at
179    /// least one registered adapter, according to `policy`.
180    ///
181    /// `known_fvs` is typically the set of all registered BDEW profiles for
182    /// the workflow's message type.  In practice, call this at engine startup
183    /// with the format versions returned by `ReleaseRegistry::all_profiles()`.
184    ///
185    /// # Behaviour per policy
186    ///
187    /// | Policy | Validation rule |
188    /// |--------|-----------------|
189    /// | `Pinned` | All `known_fvs` must be covered. A Pinned workflow can be
190    ///   started under any known FV; every one of them must have an adapter. |
191    /// | `ForwardCompatible` | Same — all `known_fvs` must be covered so the
192    ///   workflow can handle messages in every FV it may encounter. |
193    /// | `Explicit(list)` | Only the explicitly listed FVs must be covered. |
194    ///
195    /// Passing an empty `known_fvs` slice skips all coverage checks and
196    /// always returns `Ok(())`.
197    ///
198    /// # Errors
199    ///
200    /// Returns a non-empty list of uncovered format versions.  The engine
201    /// should treat this as a startup error rather than a runtime error.
202    pub fn validate_policy(
203        &self,
204        policy: &WorkflowVersionPolicy,
205        known_fvs: &[FormatVersion],
206    ) -> Result<(), Vec<FormatVersion>> {
207        let must_cover: &[FormatVersion] = match policy {
208            // Pinned and ForwardCompatible both require coverage of every
209            // currently-known FV.  (For Pinned, any of the known FVs may be
210            // used as the process creation FV; for ForwardCompatible, the
211            // workflow accepts all of them.)
212            WorkflowVersionPolicy::Pinned | WorkflowVersionPolicy::ForwardCompatible => known_fvs,
213
214            // Explicit lists the exact FVs that need coverage.
215            WorkflowVersionPolicy::Explicit(required) => required.as_slice(),
216        };
217
218        let uncovered: Vec<FormatVersion> = must_cover
219            .iter()
220            .filter(|fv| !self.adapters.iter().any(|a| a.accepts_format_version(fv)))
221            .cloned()
222            .collect();
223
224        if uncovered.is_empty() {
225            Ok(())
226        } else {
227            Err(uncovered)
228        }
229    }
230
231    /// Returns the number of registered adapters.
232    #[must_use]
233    pub fn len(&self) -> usize {
234        self.adapters.len()
235    }
236
237    /// Returns `true` when no adapters are registered.
238    #[must_use]
239    pub fn is_empty(&self) -> bool {
240        self.adapters.is_empty()
241    }
242
243    /// Returns a list of all format versions for which at least one adapter
244    /// returns `true` from `accepts_format_version`, out of the given
245    /// `candidate_fvs` set.
246    #[must_use]
247    pub fn covered_versions<'a>(
248        &self,
249        candidate_fvs: &'a [FormatVersion],
250    ) -> Vec<&'a FormatVersion> {
251        candidate_fvs
252            .iter()
253            .filter(|fv| self.adapters.iter().any(|a| a.accepts_format_version(fv)))
254            .collect()
255    }
256}
257
258impl<W: Workflow> std::fmt::Debug for AdapterRegistry<W> {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        f.debug_struct("AdapterRegistry")
261            .field("adapters", &self.adapters.len())
262            .finish()
263    }
264}
265
266// ── Blanket impl for closures ─────────────────────────────────────────────────
267
268/// A simple function-based adapter constructed via
269/// [`FnAdapter::new`].
270///
271/// Use this for lightweight adapters that do not need to carry state.
272///
273/// # Example
274///
275/// ```rust,ignore
276/// use mako_engine::message_adapter::{AdapterRegistry, FnAdapter};
277///
278/// let mut registry: AdapterRegistry<MyWorkflow> = AdapterRegistry::new();
279/// registry.register(FnAdapter::new(
280///     |fv| fv.as_str() == "FV2025-10-01",
281///     |raw, _fv| {
282///         // cast raw and construct command
283///         Ok(MyCommand::Received)
284///     },
285/// ));
286/// ```
287pub struct FnAdapter<W: Workflow, A, D>
288where
289    A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
290    D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
291        + Send
292        + Sync
293        + 'static,
294{
295    accepts: A,
296    adapt: D,
297    _phantom: std::marker::PhantomData<W>,
298}
299
300impl<W: Workflow, A, D> FnAdapter<W, A, D>
301where
302    A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
303    D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
304        + Send
305        + Sync
306        + 'static,
307{
308    /// Construct an adapter from two closures.
309    pub fn new(accepts: A, adapt: D) -> Self {
310        Self {
311            accepts,
312            adapt,
313            _phantom: std::marker::PhantomData,
314        }
315    }
316}
317
318impl<W: Workflow, A, D> MessageAdapter<W> for FnAdapter<W, A, D>
319where
320    A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
321    D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
322        + Send
323        + Sync
324        + 'static,
325{
326    fn accepts_format_version(&self, fv: &FormatVersion) -> bool {
327        (self.accepts)(fv)
328    }
329
330    fn adapt(
331        &self,
332        raw: &dyn std::any::Any,
333        fv: &FormatVersion,
334    ) -> Result<W::Command, EngineError> {
335        (self.adapt)(raw, fv)
336    }
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use crate::{
343        error::WorkflowError,
344        version::{FormatVersion, WorkflowVersionPolicy},
345        workflow::{CommandPayload, EventPayload, Workflow},
346    };
347
348    // ── Minimal test workflow ─────────────────────────────────────────────────
349
350    #[derive(Debug, Default, Clone)]
351    struct TestState;
352
353    #[derive(Debug, serde::Serialize, serde::Deserialize)]
354    enum TestEvent {
355        Fired,
356    }
357    impl EventPayload for TestEvent {
358        fn event_type(&self) -> &'static str {
359            "Fired"
360        }
361    }
362
363    #[derive(Debug)]
364    enum TestCommand {
365        Fire,
366    }
367    impl CommandPayload for TestCommand {}
368
369    struct TestWorkflow;
370    impl Workflow for TestWorkflow {
371        type State = TestState;
372        type Event = TestEvent;
373        type Command = TestCommand;
374
375        fn apply(state: Self::State, _event: &Self::Event) -> Self::State {
376            state
377        }
378        fn handle(
379            _state: &Self::State,
380            _cmd: Self::Command,
381        ) -> Result<crate::workflow::WorkflowOutput<Self::Event>, WorkflowError> {
382            Ok(vec![TestEvent::Fired].into())
383        }
384    }
385
386    // ── Tests ─────────────────────────────────────────────────────────────────
387
388    #[test]
389    fn fn_adapter_accepts_correct_fv() {
390        let adapter: FnAdapter<TestWorkflow, _, _> = FnAdapter::new(
391            |fv| fv.as_str() == "FV2025-10-01",
392            |_raw, _fv| Ok(TestCommand::Fire),
393        );
394        let fv25 = FormatVersion::new("FV2025-10-01");
395        let fv26 = FormatVersion::new("FV2026-10-01");
396        assert!(adapter.accepts_format_version(&fv25));
397        assert!(!adapter.accepts_format_version(&fv26));
398    }
399
400    #[test]
401    fn registry_dispatches_to_first_matching_adapter() {
402        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
403        registry.register(FnAdapter::new(
404            |fv| fv.as_str() == "FV2025-10-01",
405            |_raw, _fv| Ok(TestCommand::Fire),
406        ));
407        let fv = FormatVersion::new("FV2025-10-01");
408        // `()` as the "raw" message — the adapter ignores it.
409        let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
410        assert!(result.is_ok(), "dispatch must succeed for registered FV");
411    }
412
413    #[test]
414    fn registry_errors_on_unregistered_fv() {
415        let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
416        let fv = FormatVersion::new("FV2025-10-01");
417        let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
418        assert!(result.is_err(), "must return Err for unregistered FV");
419    }
420
421    #[test]
422    fn validate_policy_explicit_all_covered() {
423        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
424        registry.register(FnAdapter::new(
425            |fv| matches!(fv.as_str(), "FV2025-10-01" | "FV2026-10-01"),
426            |_raw, _fv| Ok(TestCommand::Fire),
427        ));
428        let policy = WorkflowVersionPolicy::Explicit(vec![
429            FormatVersion::new("FV2025-10-01"),
430            FormatVersion::new("FV2026-10-01"),
431        ]);
432        let known = vec![
433            FormatVersion::new("FV2025-10-01"),
434            FormatVersion::new("FV2026-10-01"),
435        ];
436        assert!(registry.validate_policy(&policy, &known).is_ok());
437    }
438
439    #[test]
440    fn validate_policy_explicit_gap_detected() {
441        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
442        // Only FV2025 adapter registered.
443        registry.register(FnAdapter::new(
444            |fv| fv.as_str() == "FV2025-10-01",
445            |_raw, _fv| Ok(TestCommand::Fire),
446        ));
447        let policy = WorkflowVersionPolicy::Explicit(vec![
448            FormatVersion::new("FV2025-10-01"),
449            FormatVersion::new("FV2026-10-01"), // <-- no adapter
450        ]);
451        let known = vec![
452            FormatVersion::new("FV2025-10-01"),
453            FormatVersion::new("FV2026-10-01"),
454        ];
455        let result = registry.validate_policy(&policy, &known);
456        assert!(result.is_err());
457        let gaps = result.unwrap_err();
458        assert_eq!(gaps.len(), 1);
459        assert_eq!(gaps[0].as_str(), "FV2026-10-01");
460    }
461
462    #[test]
463    fn validate_policy_pinned_empty_known_fvs_always_ok() {
464        // When no known FVs are supplied, there is nothing to validate —
465        // even an empty registry passes.  Callers should always provide the
466        // actual registered profile list for meaningful coverage checks.
467        let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
468        assert!(
469            registry
470                .validate_policy(&WorkflowVersionPolicy::Pinned, &[])
471                .is_ok()
472        );
473    }
474
475    #[test]
476    fn validate_policy_pinned_with_known_fvs_detects_gap() {
477        // Pinned policy with known FVs: all must be covered.
478        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
479        registry.register(FnAdapter::new(
480            |fv| fv.as_str() == "FV2025-10-01",
481            |_raw, _fv| Ok(TestCommand::Fire),
482        ));
483        let known = vec![
484            FormatVersion::new("FV2025-10-01"),
485            FormatVersion::new("FV2026-10-01"), // no adapter → gap
486        ];
487        let result = registry.validate_policy(&WorkflowVersionPolicy::Pinned, &known);
488        assert!(result.is_err());
489        assert_eq!(
490            result.unwrap_err(),
491            vec![FormatVersion::new("FV2026-10-01")]
492        );
493    }
494
495    #[test]
496    fn validate_policy_forward_compatible_with_known_fvs_detects_gap() {
497        // ForwardCompatible must cover every known FV.
498        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
499        registry.register(FnAdapter::new(
500            |fv| fv.as_str() == "FV2025-10-01",
501            |_raw, _fv| Ok(TestCommand::Fire),
502        ));
503        let known = vec![
504            FormatVersion::new("FV2025-10-01"),
505            FormatVersion::new("FV2026-10-01"), // no adapter → gap
506        ];
507        let result = registry.validate_policy(&WorkflowVersionPolicy::ForwardCompatible, &known);
508        assert!(result.is_err());
509        assert_eq!(
510            result.unwrap_err(),
511            vec![FormatVersion::new("FV2026-10-01")]
512        );
513    }
514
515    #[test]
516    fn covered_versions_returns_subset() {
517        let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
518        registry.register(FnAdapter::new(
519            |fv| fv.as_str() == "FV2025-10-01",
520            |_raw, _fv| Ok(TestCommand::Fire),
521        ));
522        let candidates = vec![
523            FormatVersion::new("FV2025-10-01"),
524            FormatVersion::new("FV2026-10-01"),
525        ];
526        let covered = registry.covered_versions(&candidates);
527        assert_eq!(covered.len(), 1);
528        assert_eq!(covered[0].as_str(), "FV2025-10-01");
529    }
530}