use mako_engine::{
deadline::Deadline,
error::WorkflowError,
ids::DeadlineId,
workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
};
use serde::{Deserialize, Serialize};
pub const WORKFLOW_NAME: &str = "redispatch-stammdaten";
pub const ACK_WINDOW_LABEL: &str = "redispatch-stammdaten-ack-window";
pub const FORWARD_WINDOW_LABEL: &str = "redispatch-stammdaten-forward-window";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum StammdatenEvent {
Received {
mrid: String,
sender: String,
receiver: String,
doc_type: String,
anlagen_count: u32,
received_at: String,
},
Acknowledged {
ack_mrid: String,
},
Forwarded {
upstream_mrid: String,
},
DeadlineExpired {
deadline_id: DeadlineId,
label: Box<str>,
},
}
impl EventPayload for StammdatenEvent {
fn event_type(&self) -> &'static str {
match self {
Self::Received { .. } => "StammdatenReceived",
Self::Acknowledged { .. } => "StammdatenAcknowledged",
Self::Forwarded { .. } => "StammdatenForwarded",
Self::DeadlineExpired { .. } => "StammdatenDeadlineExpired",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReceivedData {
pub mrid: String,
pub sender: String,
pub receiver: String,
pub doc_type: String,
pub anlagen_count: u32,
pub received_at: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum StammdatenState {
#[default]
New,
Received(ReceivedData),
Acknowledged(ReceivedData),
Forwarded(ReceivedData),
DeadlineExpired {
reason: String,
},
}
impl StammdatenState {
#[must_use]
pub fn label(&self) -> &'static str {
match self {
Self::New => "New",
Self::Received(_) => "Received",
Self::Acknowledged(_) => "Acknowledged",
Self::Forwarded(_) => "Forwarded",
Self::DeadlineExpired { .. } => "DeadlineExpired",
}
}
}
#[derive(Clone)]
pub enum StammdatenCommand {
Receive {
mrid: String,
sender: String,
receiver: String,
doc_type: String,
anlagen_count: u32,
received_at: String,
},
SendAcknowledgement {
ack_mrid: String,
},
Forward {
upstream_mrid: String,
},
TimeoutExpired {
deadline_id: DeadlineId,
label: Box<str>,
},
}
impl CommandPayload for StammdatenCommand {}
pub struct StammdatenWorkflow;
impl Workflow for StammdatenWorkflow {
type State = StammdatenState;
type Event = StammdatenEvent;
type Command = StammdatenCommand;
fn on_deadline(deadline: &Deadline, state: &Self::State) -> Option<Self::Command> {
match (deadline.label(), state) {
(ACK_WINDOW_LABEL, StammdatenState::Received(_)) => {
Some(StammdatenCommand::TimeoutExpired {
deadline_id: deadline.deadline_id(),
label: deadline.label().into(),
})
}
_ => None,
}
}
fn apply(state: Self::State, event: &Self::Event) -> Self::State {
match event {
StammdatenEvent::Received {
mrid,
sender,
receiver,
doc_type,
anlagen_count,
received_at,
} => StammdatenState::Received(ReceivedData {
mrid: mrid.clone(),
sender: sender.clone(),
receiver: receiver.clone(),
doc_type: doc_type.clone(),
anlagen_count: *anlagen_count,
received_at: received_at.clone(),
}),
StammdatenEvent::Acknowledged { .. } => match state {
StammdatenState::Received(data) => StammdatenState::Acknowledged(data),
other => other,
},
StammdatenEvent::Forwarded { .. } => match state {
StammdatenState::Acknowledged(data) => StammdatenState::Forwarded(data),
other => other,
},
StammdatenEvent::DeadlineExpired { label, .. } => StammdatenState::DeadlineExpired {
reason: format!("deadline expired: {label}"),
},
}
}
fn handle(
state: &Self::State,
command: Self::Command,
) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
match command {
StammdatenCommand::Receive {
mrid,
sender,
receiver,
doc_type,
anlagen_count,
received_at,
} => {
if !matches!(state, StammdatenState::New) {
return Ok(vec![].into());
}
Ok(vec![StammdatenEvent::Received {
mrid,
sender,
receiver,
doc_type,
anlagen_count,
received_at,
}]
.into())
}
StammdatenCommand::SendAcknowledgement { ack_mrid } => match state {
StammdatenState::Received(_) => {
Ok(vec![StammdatenEvent::Acknowledged { ack_mrid }].into())
}
StammdatenState::Acknowledged(_) | StammdatenState::Forwarded(_) => {
Ok(vec![].into())
}
other => Err(WorkflowError::rejected(format!(
"SendAcknowledgement not valid in state {}",
other.label()
))),
},
StammdatenCommand::Forward { upstream_mrid } => match state {
StammdatenState::Acknowledged(_) => {
Ok(vec![StammdatenEvent::Forwarded { upstream_mrid }].into())
}
StammdatenState::Forwarded(_) => {
Ok(vec![].into())
}
other => Err(WorkflowError::rejected(format!(
"Forward not valid in state {}",
other.label()
))),
},
StammdatenCommand::TimeoutExpired { deadline_id, label } => {
match state {
StammdatenState::Acknowledged(_)
| StammdatenState::Forwarded(_)
| StammdatenState::DeadlineExpired { .. } => Ok(vec![].into()),
_ => Ok(vec![StammdatenEvent::DeadlineExpired { deadline_id, label }].into()),
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use mako_engine::ids::DeadlineId;
fn received_cmd() -> StammdatenCommand {
StammdatenCommand::Receive {
mrid: "mrid-001".into(),
sender: "4012345000001".into(),
receiver: "4012345000002".into(),
doc_type: "Z02".into(),
anlagen_count: 3,
received_at: "2025-10-15T10:00:00Z".into(),
}
}
#[test]
fn receive_transitions_new_to_received() {
let state = StammdatenState::New;
let output = StammdatenWorkflow::handle(&state, received_cmd()).unwrap();
assert_eq!(output.events.len(), 1);
let new_state = StammdatenWorkflow::apply(state, &output.events[0]);
assert!(matches!(new_state, StammdatenState::Received(_)));
}
#[test]
fn acknowledge_transitions_received_to_acknowledged() {
let state = StammdatenState::Received(ReceivedData {
mrid: "m".into(),
sender: "s".into(),
receiver: "r".into(),
doc_type: "Z02".into(),
anlagen_count: 1,
received_at: "2025-10-15T10:00:00Z".into(),
});
let output = StammdatenWorkflow::handle(
&state,
StammdatenCommand::SendAcknowledgement {
ack_mrid: "ack-001".into(),
},
)
.unwrap();
assert_eq!(output.events.len(), 1);
let new_state = StammdatenWorkflow::apply(state, &output.events[0]);
assert!(matches!(new_state, StammdatenState::Acknowledged(_)));
}
#[test]
fn forward_requires_acknowledged_state() {
let state = StammdatenState::Received(ReceivedData {
mrid: "m".into(),
sender: "s".into(),
receiver: "r".into(),
doc_type: "Z03".into(),
anlagen_count: 1,
received_at: "2025-10-15T10:00:00Z".into(),
});
let result = StammdatenWorkflow::handle(
&state,
StammdatenCommand::Forward {
upstream_mrid: "u".into(),
},
);
assert!(result.is_err());
}
#[test]
fn timeout_in_received_state_emits_deadline_expired() {
let state = StammdatenState::Received(ReceivedData {
mrid: "m".into(),
sender: "s".into(),
receiver: "r".into(),
doc_type: "Z02".into(),
anlagen_count: 1,
received_at: "2025-10-15T10:00:00Z".into(),
});
let output = StammdatenWorkflow::handle(
&state,
StammdatenCommand::TimeoutExpired {
deadline_id: DeadlineId::new(),
label: ACK_WINDOW_LABEL.into(),
},
)
.unwrap();
assert!(matches!(
output.events.as_slice(),
[StammdatenEvent::DeadlineExpired { .. }]
));
}
#[test]
fn timeout_in_acknowledged_state_is_noop() {
let state = StammdatenState::Acknowledged(ReceivedData {
mrid: "m".into(),
sender: "s".into(),
receiver: "r".into(),
doc_type: "Z02".into(),
anlagen_count: 1,
received_at: "2025-10-15T10:00:00Z".into(),
});
let output = StammdatenWorkflow::handle(
&state,
StammdatenCommand::TimeoutExpired {
deadline_id: DeadlineId::new(),
label: ACK_WINDOW_LABEL.into(),
},
)
.unwrap();
assert!(output.events.is_empty());
}
}