use crate::{
error::EngineError,
version::{FormatVersion, WorkflowVersionPolicy},
workflow::Workflow,
};
pub trait MessageAdapter<W: Workflow>: Send + Sync + 'static {
fn accepts_format_version(&self, fv: &FormatVersion) -> bool;
fn adapt(&self, raw: &dyn std::any::Any, fv: &FormatVersion)
-> Result<W::Command, EngineError>;
}
pub struct AdapterRegistry<W: Workflow> {
adapters: Vec<Box<dyn MessageAdapter<W>>>,
}
impl<W: Workflow> Default for AdapterRegistry<W> {
fn default() -> Self {
Self::new()
}
}
impl<W: Workflow> AdapterRegistry<W> {
#[must_use]
pub fn new() -> Self {
Self {
adapters: Vec::new(),
}
}
pub fn register(&mut self, adapter: impl MessageAdapter<W>) {
self.adapters.push(Box::new(adapter));
}
pub fn dispatch(
&self,
raw: &dyn std::any::Any,
fv: &FormatVersion,
) -> Result<W::Command, EngineError> {
for adapter in &self.adapters {
if adapter.accepts_format_version(fv) {
return adapter.adapt(raw, fv);
}
}
Err(EngineError::Workflow(crate::error::WorkflowError::other(
format!("no adapter registered for format version {fv}"),
)))
}
pub fn validate_policy(
&self,
policy: &WorkflowVersionPolicy,
known_fvs: &[FormatVersion],
) -> Result<(), Vec<FormatVersion>> {
let must_cover: &[FormatVersion] = match policy {
WorkflowVersionPolicy::Pinned | WorkflowVersionPolicy::ForwardCompatible => known_fvs,
WorkflowVersionPolicy::Explicit(required) => required.as_slice(),
};
let uncovered: Vec<FormatVersion> = must_cover
.iter()
.filter(|fv| !self.adapters.iter().any(|a| a.accepts_format_version(fv)))
.cloned()
.collect();
if uncovered.is_empty() {
Ok(())
} else {
Err(uncovered)
}
}
#[must_use]
pub fn len(&self) -> usize {
self.adapters.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.adapters.is_empty()
}
#[must_use]
pub fn covered_versions<'a>(
&self,
candidate_fvs: &'a [FormatVersion],
) -> Vec<&'a FormatVersion> {
candidate_fvs
.iter()
.filter(|fv| self.adapters.iter().any(|a| a.accepts_format_version(fv)))
.collect()
}
}
impl<W: Workflow> std::fmt::Debug for AdapterRegistry<W> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AdapterRegistry")
.field("adapters", &self.adapters.len())
.finish()
}
}
pub struct FnAdapter<W: Workflow, A, D>
where
A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
+ Send
+ Sync
+ 'static,
{
accepts: A,
adapt: D,
_phantom: std::marker::PhantomData<W>,
}
impl<W: Workflow, A, D> FnAdapter<W, A, D>
where
A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
+ Send
+ Sync
+ 'static,
{
pub fn new(accepts: A, adapt: D) -> Self {
Self {
accepts,
adapt,
_phantom: std::marker::PhantomData,
}
}
}
impl<W: Workflow, A, D> MessageAdapter<W> for FnAdapter<W, A, D>
where
A: Fn(&FormatVersion) -> bool + Send + Sync + 'static,
D: Fn(&dyn std::any::Any, &FormatVersion) -> Result<W::Command, EngineError>
+ Send
+ Sync
+ 'static,
{
fn accepts_format_version(&self, fv: &FormatVersion) -> bool {
(self.accepts)(fv)
}
fn adapt(
&self,
raw: &dyn std::any::Any,
fv: &FormatVersion,
) -> Result<W::Command, EngineError> {
(self.adapt)(raw, fv)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
error::WorkflowError,
version::{FormatVersion, WorkflowVersionPolicy},
workflow::{CommandPayload, EventPayload, Workflow},
};
#[derive(Debug, Default, Clone)]
struct TestState;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum TestEvent {
Fired,
}
impl EventPayload for TestEvent {
fn event_type(&self) -> &'static str {
"Fired"
}
}
#[derive(Debug)]
enum TestCommand {
Fire,
}
impl CommandPayload for TestCommand {}
struct TestWorkflow;
impl Workflow for TestWorkflow {
type State = TestState;
type Event = TestEvent;
type Command = TestCommand;
fn apply(state: Self::State, _event: &Self::Event) -> Self::State {
state
}
fn handle(
_state: &Self::State,
_cmd: Self::Command,
) -> Result<crate::workflow::WorkflowOutput<Self::Event>, WorkflowError> {
Ok(vec![TestEvent::Fired].into())
}
}
#[test]
fn fn_adapter_accepts_correct_fv() {
let adapter: FnAdapter<TestWorkflow, _, _> = FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
);
let fv25 = FormatVersion::new("FV2025-10-01");
let fv26 = FormatVersion::new("FV2026-10-01");
assert!(adapter.accepts_format_version(&fv25));
assert!(!adapter.accepts_format_version(&fv26));
}
#[test]
fn registry_dispatches_to_first_matching_adapter() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
));
let fv = FormatVersion::new("FV2025-10-01");
let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
assert!(result.is_ok(), "dispatch must succeed for registered FV");
}
#[test]
fn registry_errors_on_unregistered_fv() {
let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
let fv = FormatVersion::new("FV2025-10-01");
let result = registry.dispatch(&() as &dyn std::any::Any, &fv);
assert!(result.is_err(), "must return Err for unregistered FV");
}
#[test]
fn validate_policy_explicit_all_covered() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| matches!(fv.as_str(), "FV2025-10-01" | "FV2026-10-01"),
|_raw, _fv| Ok(TestCommand::Fire),
));
let policy = WorkflowVersionPolicy::Explicit(vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"),
]);
let known = vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"),
];
assert!(registry.validate_policy(&policy, &known).is_ok());
}
#[test]
fn validate_policy_explicit_gap_detected() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
));
let policy = WorkflowVersionPolicy::Explicit(vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"), ]);
let known = vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"),
];
let result = registry.validate_policy(&policy, &known);
assert!(result.is_err());
let gaps = result.unwrap_err();
assert_eq!(gaps.len(), 1);
assert_eq!(gaps[0].as_str(), "FV2026-10-01");
}
#[test]
fn validate_policy_pinned_empty_known_fvs_always_ok() {
let registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
assert!(
registry
.validate_policy(&WorkflowVersionPolicy::Pinned, &[])
.is_ok()
);
}
#[test]
fn validate_policy_pinned_with_known_fvs_detects_gap() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
));
let known = vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"), ];
let result = registry.validate_policy(&WorkflowVersionPolicy::Pinned, &known);
assert!(result.is_err());
assert_eq!(
result.unwrap_err(),
vec![FormatVersion::new("FV2026-10-01")]
);
}
#[test]
fn validate_policy_forward_compatible_with_known_fvs_detects_gap() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
));
let known = vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"), ];
let result = registry.validate_policy(&WorkflowVersionPolicy::ForwardCompatible, &known);
assert!(result.is_err());
assert_eq!(
result.unwrap_err(),
vec![FormatVersion::new("FV2026-10-01")]
);
}
#[test]
fn covered_versions_returns_subset() {
let mut registry: AdapterRegistry<TestWorkflow> = AdapterRegistry::new();
registry.register(FnAdapter::new(
|fv| fv.as_str() == "FV2025-10-01",
|_raw, _fv| Ok(TestCommand::Fire),
));
let candidates = vec![
FormatVersion::new("FV2025-10-01"),
FormatVersion::new("FV2026-10-01"),
];
let covered = registry.covered_versions(&candidates);
assert_eq!(covered.len(), 1);
assert_eq!(covered[0].as_str(), "FV2025-10-01");
}
}