use mako_engine::{
deadline::Deadline,
error::WorkflowError,
ids::DeadlineId,
workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
};
use serde::{Deserialize, Serialize};
/// Events recorded by the shared ack-forward state machine.
///
/// Serde writes this enum in adjacently tagged form: the variant name is
/// stored under the `type` key and the variant's fields under the `data` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AckForwardEvent {
/// A document was received. `apply` copies all five fields into a
/// `ReceivedData` value. The strings are stored as given; nothing in this
/// file parses or validates them.
Received {
mrid: String,
doc_type: String,
sender: String,
receiver: String,
received_at: String,
},
/// The received document was acknowledged. `apply` ignores `ack_mrid`; it
/// is only carried in the event.
Acknowledged {
ack_mrid: String,
},
/// The acknowledged document was forwarded. `apply` ignores
/// `upstream_mrid`; it is only carried in the event.
Forwarded {
upstream_mrid: String,
},
/// A deadline fired. `apply` embeds `label` in the resulting state's
/// `reason` text and ignores `deadline_id`.
DeadlineExpired {
deadline_id: DeadlineId,
label: Box<str>,
},
}
/// Commands accepted by the shared ack-forward state machine.
///
/// `Receive`, `Acknowledge` and `Forward` carry the same fields as the
/// `AckForwardEvent` variants `Received`, `Acknowledged` and `Forwarded`;
/// `TimeoutExpired` carries the same fields as `DeadlineExpired`.
///
/// `Debug` is derived so commands can be logged and used in assertion
/// messages, matching the event and state types in this module.
#[derive(Debug, Clone)]
pub enum AckForwardCommand {
    /// Request to record an incoming document. The strings are passed through
    /// to the `Received` event unchanged.
    Receive {
        mrid: String,
        doc_type: String,
        sender: String,
        receiver: String,
        received_at: String,
    },
    /// Request to acknowledge the received document.
    Acknowledge {
        ack_mrid: String,
    },
    /// Request to forward the acknowledged document.
    Forward {
        upstream_mrid: String,
    },
    /// A deadline fired. The `on_deadline` hooks generated by
    /// `ack_forward_workflow!` build this from the deadline's id and label.
    TimeoutExpired {
        deadline_id: DeadlineId,
        label: Box<str>,
    },
}
// Empty impl: `AckForwardCommand` implements `CommandPayload` without
// defining or overriding any items.
impl CommandPayload for AckForwardCommand {}
/// Details of a received document, carried by the `Received`,
/// `Acknowledged` and `Forwarded` states.
///
/// The fields mirror those of `AckForwardEvent::Received`. Because of
/// `deny_unknown_fields`, deserialization fails on any field not listed here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReceivedData {
pub mrid: String,
pub doc_type: String,
pub sender: String,
pub receiver: String,
// Stored as an opaque string; this file never parses it.
pub received_at: String,
}
/// State of an ack-forward workflow instance.
///
/// Serde writes this enum in adjacently tagged form: the variant name is
/// stored under the `status` key and the payload under the `data` key.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum AckForwardState {
/// Initial state; also the `Default` value.
#[default]
New,
/// A document has been received.
Received(ReceivedData),
/// The received document has been acknowledged.
Acknowledged(ReceivedData),
/// The acknowledged document has been forwarded.
Forwarded(ReceivedData),
/// A deadline expired. `apply` sets `reason` to
/// `"deadline expired: <label>"`.
DeadlineExpired {
reason: String,
},
}
impl AckForwardState {
    /// Returns the name of the current variant as a static string.
    ///
    /// Any payload carried by the variant is ignored. `handle` uses this text
    /// in its rejection messages.
    #[must_use]
    pub fn label(&self) -> &'static str {
        let variant_name = match self {
            Self::DeadlineExpired { .. } => "DeadlineExpired",
            Self::Forwarded(_) => "Forwarded",
            Self::Acknowledged(_) => "Acknowledged",
            Self::Received(_) => "Received",
            Self::New => "New",
        };
        variant_name
    }
}
impl EventPayload for AckForwardEvent {
    /// Returns the generic event-type string for this variant: the variant
    /// name prefixed with `AckForward`.
    fn event_type(&self) -> &'static str {
        let type_name = match self {
            Self::DeadlineExpired { .. } => "AckForwardDeadlineExpired",
            Self::Forwarded { .. } => "AckForwardForwarded",
            Self::Acknowledged { .. } => "AckForwardAcknowledged",
            Self::Received { .. } => "AckForwardReceived",
        };
        type_name
    }
}
/// Generates a workflow-specific newtype around `AckForwardEvent`.
///
/// * `$event_type` - name of the generated tuple struct.
/// * `$prefix` - literal placed in front of the variant name by `concat!`
///   to form the string returned from `event_type()`.
///
/// The newtype is `#[serde(transparent)]`, so it serializes exactly like the
/// wrapped `AckForwardEvent`. `From` conversions are generated in both
/// directions.
macro_rules! define_workflow_event {
    ($event_type:ident, $prefix:expr) => {
        /// Workflow-specific wrapper around `AckForwardEvent`.
        #[derive(Debug, Clone, Serialize, Deserialize)]
        #[serde(transparent)]
        pub struct $event_type(pub AckForwardEvent);

        impl From<AckForwardEvent> for $event_type {
            fn from(inner: AckForwardEvent) -> Self {
                $event_type(inner)
            }
        }

        impl From<$event_type> for AckForwardEvent {
            fn from(wrapper: $event_type) -> Self {
                let $event_type(inner) = wrapper;
                inner
            }
        }

        impl EventPayload for $event_type {
            /// Returns the prefix followed by the wrapped event's variant name.
            fn event_type(&self) -> &'static str {
                let Self(inner) = self;
                match inner {
                    AckForwardEvent::DeadlineExpired { .. } => concat!($prefix, "DeadlineExpired"),
                    AckForwardEvent::Forwarded { .. } => concat!($prefix, "Forwarded"),
                    AckForwardEvent::Acknowledged { .. } => concat!($prefix, "Acknowledged"),
                    AckForwardEvent::Received { .. } => concat!($prefix, "Received"),
                }
            }
        }
    };
}
// One event newtype per workflow. The second argument is the prefix that the
// generated `event_type()` puts in front of the variant name, e.g.
// "Kaskade" + "Received" gives "KaskadeReceived".
define_workflow_event!(VerfuegbarkeitEvent, "Verfuegbarkeit");
define_workflow_event!(NetzengpassEvent, "Netzengpass");
define_workflow_event!(KaskadeEvent, "Kaskade");
define_workflow_event!(PlanungsdatenEvent, "Planungsdaten");
define_workflow_event!(StatusanfrageEvent, "Statusanfrage");
define_workflow_event!(KostenblattEvent, "Kostenblatt");
/// Folds one event into the state and returns the resulting state.
///
/// * `Received` always produces `AckForwardState::Received`, built from
///   clones of the event's fields, whatever the previous state was.
/// * `Acknowledged` moves `Received` to `Acknowledged` and keeps the payload;
///   any other state is returned unchanged.
/// * `Forwarded` moves `Acknowledged` to `Forwarded` and keeps the payload;
///   any other state is returned unchanged.
/// * `DeadlineExpired` always produces `AckForwardState::DeadlineExpired`
///   with the reason `"deadline expired: <label>"`.
pub(crate) fn apply(state: AckForwardState, event: &AckForwardEvent) -> AckForwardState {
    match (event, state) {
        (
            AckForwardEvent::Received {
                mrid,
                doc_type,
                sender,
                receiver,
                received_at,
            },
            _,
        ) => {
            let data = ReceivedData {
                mrid: mrid.clone(),
                doc_type: doc_type.clone(),
                sender: sender.clone(),
                receiver: receiver.clone(),
                received_at: received_at.clone(),
            };
            AckForwardState::Received(data)
        }
        (AckForwardEvent::Acknowledged { .. }, AckForwardState::Received(data)) => {
            AckForwardState::Acknowledged(data)
        }
        (AckForwardEvent::Forwarded { .. }, AckForwardState::Acknowledged(data)) => {
            AckForwardState::Forwarded(data)
        }
        (AckForwardEvent::DeadlineExpired { label, .. }, _) => {
            let reason = format!("deadline expired: {label}");
            AckForwardState::DeadlineExpired { reason }
        }
        // Acknowledged / Forwarded arriving in any other state leave it as is.
        (AckForwardEvent::Acknowledged { .. } | AckForwardEvent::Forwarded { .. }, unchanged) => {
            unchanged
        }
    }
}
/// Decides which events, if any, a command produces in the given state.
///
/// * `Receive` - emits `Received` in state `New`; in every other state it
///   returns an output with no events.
/// * `Acknowledge` - emits `Acknowledged` in state `Received`; returns no
///   events in `Acknowledged` or `Forwarded`; is rejected otherwise.
/// * `Forward` - emits `Forwarded` in state `Acknowledged`; returns no events
///   in `Forwarded`; is rejected otherwise.
/// * `TimeoutExpired` - returns no events in `Acknowledged`, `Forwarded` or
///   `DeadlineExpired`; emits `DeadlineExpired` in `New` or `Received`.
///
/// `ack_window_label` is accepted but not consulted when making the decision.
///
/// # Errors
///
/// Returns `WorkflowError::rejected` with the message
/// `"<Command> not valid in state <label>"` when `Acknowledge` or `Forward`
/// arrives in a state listed above as rejected.
pub(crate) fn handle(
    state: &AckForwardState,
    command: AckForwardCommand,
    ack_window_label: &str,
) -> Result<WorkflowOutput<AckForwardEvent>, WorkflowError> {
    // The label plays no part in the decision; discard it explicitly so the
    // parameter does not trigger an unused-variable warning.
    let _ = ack_window_label;

    match (command, state) {
        // Receive
        (
            AckForwardCommand::Receive {
                mrid,
                doc_type,
                sender,
                receiver,
                received_at,
            },
            AckForwardState::New,
        ) => {
            let received = AckForwardEvent::Received {
                mrid,
                doc_type,
                sender,
                receiver,
                received_at,
            };
            Ok(vec![received].into())
        }
        (AckForwardCommand::Receive { .. }, _) => Ok(vec![].into()),

        // Acknowledge
        (AckForwardCommand::Acknowledge { ack_mrid }, AckForwardState::Received(_)) => {
            Ok(vec![AckForwardEvent::Acknowledged { ack_mrid }].into())
        }
        (
            AckForwardCommand::Acknowledge { .. },
            AckForwardState::Acknowledged(_) | AckForwardState::Forwarded(_),
        ) => Ok(vec![].into()),
        (AckForwardCommand::Acknowledge { .. }, other) => {
            let message = format!("Acknowledge not valid in state {}", other.label());
            Err(WorkflowError::rejected(message))
        }

        // Forward
        (AckForwardCommand::Forward { upstream_mrid }, AckForwardState::Acknowledged(_)) => {
            Ok(vec![AckForwardEvent::Forwarded { upstream_mrid }].into())
        }
        (AckForwardCommand::Forward { .. }, AckForwardState::Forwarded(_)) => Ok(vec![].into()),
        (AckForwardCommand::Forward { .. }, other) => {
            let message = format!("Forward not valid in state {}", other.label());
            Err(WorkflowError::rejected(message))
        }

        // TimeoutExpired
        (
            AckForwardCommand::TimeoutExpired { .. },
            AckForwardState::Acknowledged(_)
            | AckForwardState::Forwarded(_)
            | AckForwardState::DeadlineExpired { .. },
        ) => Ok(vec![].into()),
        (
            AckForwardCommand::TimeoutExpired { deadline_id, label },
            AckForwardState::New | AckForwardState::Received(_),
        ) => Ok(vec![AckForwardEvent::DeadlineExpired { deadline_id, label }].into()),
    }
}
/// Generates a unit struct implementing `Workflow` on top of the shared
/// ack-forward state machine (`crate::ack_forward::apply` / `handle`).
///
/// Arguments, in order:
/// * optional attributes, forwarded onto the generated struct;
/// * `$name` - name of the generated workflow struct;
/// * `$event_newtype` - event wrapper used as `Workflow::Event`;
/// * `$workflow_name` - accepted by the pattern but not referenced anywhere
///   in the expansion;
/// * `$ack_label` - deadline label that `on_deadline` reacts to, also passed
///   to `handle` as `ack_window_label`;
/// * `$event_prefix` - value returned by the generated `event_prefix()`.
macro_rules! ack_forward_workflow {
    (
        $(#[$meta:meta])*
        $name:ident,
        $event_newtype:ident,
        $workflow_name:expr,
        $ack_label:expr,
        $event_prefix:expr $(,)?
    ) => {
        $(#[$meta])*
        pub struct $name;

        impl Workflow for $name {
            type State = AckForwardState;
            type Event = $event_newtype;
            type Command = AckForwardCommand;

            /// Returns a `TimeoutExpired` command when the deadline's label
            /// equals the configured label and the state is `Received`;
            /// returns `None` in every other case.
            fn on_deadline(deadline: &Deadline, state: &Self::State) -> Option<Self::Command> {
                let is_ack_window = deadline.label() == $ack_label;
                let awaiting_ack = matches!(state, AckForwardState::Received(_));
                (is_ack_window && awaiting_ack).then(|| AckForwardCommand::TimeoutExpired {
                    deadline_id: deadline.deadline_id(),
                    label: deadline.label().into(),
                })
            }

            /// Unwraps the newtype and delegates to the shared `apply`.
            fn apply(state: Self::State, event: &Self::Event) -> Self::State {
                let $event_newtype(inner) = event;
                crate::ack_forward::apply(state, inner)
            }

            /// Delegates to the shared `handle`, then wraps every produced
            /// event in the workflow's newtype and passes the outbox through
            /// untouched.
            fn handle(
                state: &Self::State,
                command: Self::Command,
            ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
                let shared = crate::ack_forward::handle(state, command, $ack_label)?;
                let wrapped_events = shared.events.into_iter().map($event_newtype).collect();
                Ok(WorkflowOutput::with_outbox(wrapped_events, shared.outbox))
            }
        }

        impl $name {
            /// Returns the event prefix this workflow was generated with.
            #[must_use]
            pub fn event_prefix() -> &'static str {
                $event_prefix
            }
        }
    };
}
// Concrete workflows built on the shared ack-forward state machine.
// Arguments, in order: workflow struct, event newtype, workflow name,
// deadline label matched by `on_deadline`, and the prefix returned by
// `event_prefix()`. The workflow-name literals repeat the constants in the
// `names` module below.
ack_forward_workflow!(
VerfuegbarkeitWorkflow,
VerfuegbarkeitEvent,
"redispatch-verfuegbarkeit",
"redispatch-verfuegbarkeit-ack-window",
"Verfuegbarkeit",
);
ack_forward_workflow!(
NetzengpassWorkflow,
NetzengpassEvent,
"redispatch-netzengpass",
"redispatch-netzengpass-ack-window",
"Netzengpass",
);
ack_forward_workflow!(
KaskadeWorkflow,
KaskadeEvent,
"redispatch-kaskade",
"redispatch-kaskade-ack-window",
"Kaskade",
);
ack_forward_workflow!(
PlanungsdatenWorkflow,
PlanungsdatenEvent,
"redispatch-planungsdaten",
"redispatch-planungsdaten-ack-window",
"Planungsdaten",
);
// Unlike the other workflows, this deadline label ends in `-response-window`
// rather than `-ack-window`.
ack_forward_workflow!(
StatusanfrageWorkflow,
StatusanfrageEvent,
"redispatch-statusanfrage",
"redispatch-statusanfrage-response-window",
"Statusanfrage",
);
ack_forward_workflow!(
KostenblattWorkflow,
KostenblattEvent,
"redispatch-kostenblatt",
"redispatch-kostenblatt-ack-window",
"Kostenblatt",
);
/// Workflow name constants, one per ack-forward workflow.
///
/// The values are the same string literals that are passed as the
/// workflow-name argument of the `ack_forward_workflow!` invocations in this
/// file; keep the two lists in sync when editing either.
pub mod names {
pub const VERFUEGBARKEIT: &str = "redispatch-verfuegbarkeit";
pub const NETZENGPASS: &str = "redispatch-netzengpass";
pub const KASKADE: &str = "redispatch-kaskade";
pub const PLANUNGSDATEN: &str = "redispatch-planungsdaten";
pub const STATUSANFRAGE: &str = "redispatch-statusanfrage";
pub const KOSTENBLATT: &str = "redispatch-kostenblatt";
}
#[cfg(test)]
mod tests {
    use super::*;
    use mako_engine::workflow::EventPayload;

    /// Drives the Verfuegbarkeit workflow from `New` through `Received` to
    /// `Acknowledged` using `handle` followed by `apply` at each step.
    #[test]
    fn verfuegbarkeit_receive_to_acknowledged() {
        let initial = AckForwardState::New;
        let receive = AckForwardCommand::Receive {
            mrid: "m1".into(),
            doc_type: "Unavailability".into(),
            sender: "s".into(),
            receiver: "r".into(),
            received_at: "2025-10-15T10:00:00Z".into(),
        };
        let on_receive = VerfuegbarkeitWorkflow::handle(&initial, receive).unwrap();
        assert_eq!(on_receive.events.len(), 1);

        let received = VerfuegbarkeitWorkflow::apply(initial, &on_receive.events[0]);
        assert!(matches!(received, AckForwardState::Received(_)));

        let acknowledge = AckForwardCommand::Acknowledge {
            ack_mrid: "ack-1".into(),
        };
        let on_ack = VerfuegbarkeitWorkflow::handle(&received, acknowledge).unwrap();
        let acknowledged = VerfuegbarkeitWorkflow::apply(received, &on_ack.events[0]);
        assert!(matches!(acknowledged, AckForwardState::Acknowledged(_)));
    }

    /// `Forward` must be rejected while the document is only `Received`.
    #[test]
    fn kaskade_forward_requires_acknowledged_state() {
        let received = AckForwardState::Received(ReceivedData {
            mrid: "m".into(),
            doc_type: "Kaskade".into(),
            sender: "s".into(),
            receiver: "r".into(),
            received_at: "2025-10-15T10:00:00Z".into(),
        });
        let forward = AckForwardCommand::Forward {
            upstream_mrid: "u".into(),
        };

        let outcome = KaskadeWorkflow::handle(&received, forward);

        assert!(outcome.is_err());
    }

    /// Wrapping the same inner event in each workflow newtype must give six
    /// distinct `event_type()` strings, none using the generic prefix.
    #[test]
    fn event_types_are_unique_per_workflow() {
        let inner = AckForwardEvent::Received {
            mrid: "m".into(),
            doc_type: "X".into(),
            sender: "s".into(),
            receiver: "r".into(),
            received_at: "t".into(),
        };
        let types: Vec<&'static str> = vec![
            VerfuegbarkeitEvent(inner.clone()).event_type(),
            NetzengpassEvent(inner.clone()).event_type(),
            KaskadeEvent(inner.clone()).event_type(),
            PlanungsdatenEvent(inner.clone()).event_type(),
            StatusanfrageEvent(inner.clone()).event_type(),
            KostenblattEvent(inner).event_type(),
        ];

        let distinct: std::collections::HashSet<&str> = types.iter().copied().collect();
        assert_eq!(
            distinct.len(),
            types.len(),
            "event_type() strings must be unique across all ack-forward workflows: {types:?}"
        );

        for t in types.iter().copied() {
            assert!(
                !t.starts_with("AckForward"),
                "event_type '{t}' must not use the generic AckForward prefix"
            );
        }
    }
}