1use mako_engine::{
20 deadline::Deadline,
21 error::WorkflowError,
22 ids::DeadlineId,
23 workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
24};
25use serde::{Deserialize, Serialize};
26
/// Events produced by the shared ack-forward state machine.
///
/// Serde writes this enum in adjacently tagged form: the variant name is
/// stored under `"type"` and the variant's fields under `"data"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AckForwardEvent {
    /// Emitted by `handle` for a `Receive` command while the state is `New`;
    /// every field is moved over unchanged from that command.
    Received {
        /// `mrid` value of the `Receive` command.
        mrid: String,
        /// `doc_type` value of the `Receive` command.
        doc_type: String,
        /// `sender` value of the `Receive` command.
        sender: String,
        /// `receiver` value of the `Receive` command.
        receiver: String,
        /// `received_at` value of the `Receive` command; kept as an opaque
        /// string (this file never parses or validates it).
        received_at: String,
    },
    /// Emitted for an `Acknowledge` command while the state is `Received`.
    Acknowledged {
        /// `ack_mrid` value of the `Acknowledge` command.
        ack_mrid: String,
    },
    /// Emitted for a `Forward` command while the state is `Acknowledged`.
    Forwarded {
        /// `upstream_mrid` value of the `Forward` command.
        upstream_mrid: String,
    },
    /// Emitted for a `TimeoutExpired` command while the state is `New` or
    /// `Received`.
    DeadlineExpired {
        /// `deadline_id` value of the `TimeoutExpired` command.
        deadline_id: DeadlineId,
        /// `label` value of the `TimeoutExpired` command; `apply` embeds it
        /// in the resulting state's `reason` text.
        label: Box<str>,
    },
}
64
65#[derive(Clone)]
67pub enum AckForwardCommand {
68 Receive {
70 mrid: String,
72 doc_type: String,
74 sender: String,
76 receiver: String,
78 received_at: String,
80 },
81 Acknowledge {
83 ack_mrid: String,
85 },
86 Forward {
88 upstream_mrid: String,
90 },
91 TimeoutExpired {
93 deadline_id: DeadlineId,
95 label: Box<str>,
97 },
98}
99
100impl CommandPayload for AckForwardCommand {}
101
/// Snapshot of the fields of a `Received` event, carried along by the
/// `Received`, `Acknowledged` and `Forwarded` states.
///
/// `deny_unknown_fields` makes deserialization fail when the input contains
/// a field that is not declared here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReceivedData {
    /// Cloned from `AckForwardEvent::Received::mrid`.
    pub mrid: String,
    /// Cloned from `AckForwardEvent::Received::doc_type`.
    pub doc_type: String,
    /// Cloned from `AckForwardEvent::Received::sender`.
    pub sender: String,
    /// Cloned from `AckForwardEvent::Received::receiver`.
    pub receiver: String,
    /// Cloned from `AckForwardEvent::Received::received_at`; an opaque string
    /// as far as this file is concerned.
    pub received_at: String,
}
117
/// State of an ack-forward workflow instance.
///
/// Serde writes this enum in adjacently tagged form: the variant name is
/// stored under `"status"` and its payload under `"data"`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum AckForwardState {
    /// Initial state (the `Default`); no document has been received yet.
    #[default]
    New,
    /// A `Received` event has been applied.
    Received(ReceivedData),
    /// An `Acknowledged` event has been applied on top of `Received`.
    Acknowledged(ReceivedData),
    /// A `Forwarded` event has been applied on top of `Acknowledged`.
    Forwarded(ReceivedData),
    /// A `DeadlineExpired` event has been applied.
    DeadlineExpired {
        /// Text built by `apply` as `"deadline expired: <label>"`.
        reason: String,
    },
}
137
138impl AckForwardState {
139 #[must_use]
141 pub fn label(&self) -> &'static str {
142 match self {
143 Self::New => "New",
144 Self::Received(_) => "Received",
145 Self::Acknowledged(_) => "Acknowledged",
146 Self::Forwarded(_) => "Forwarded",
147 Self::DeadlineExpired { .. } => "DeadlineExpired",
148 }
149 }
150}
151
152impl EventPayload for AckForwardEvent {
153 fn event_type(&self) -> &'static str {
154 match self {
155 Self::Received { .. } => "AckForwardReceived",
156 Self::Acknowledged { .. } => "AckForwardAcknowledged",
157 Self::Forwarded { .. } => "AckForwardForwarded",
158 Self::DeadlineExpired { .. } => "AckForwardDeadlineExpired",
159 }
160 }
161}
162
// Generates a serde-transparent newtype around `AckForwardEvent` whose
// `event_type()` strings start with a workflow-specific prefix.
//
// Arguments:
//   1. identifier of the newtype to generate,
//   2. prefix expression handed to `concat!` (so it must be a literal).
macro_rules! define_workflow_event {
    ($wrapper:ident, $tag:expr) => {
        /// Workflow-specific wrapper around [`AckForwardEvent`]; serializes
        /// exactly like the wrapped event.
        #[derive(Debug, Clone, Serialize, Deserialize)]
        #[serde(transparent)]
        pub struct $wrapper(pub AckForwardEvent);

        impl From<AckForwardEvent> for $wrapper {
            fn from(inner: AckForwardEvent) -> Self {
                $wrapper(inner)
            }
        }

        impl From<$wrapper> for AckForwardEvent {
            fn from(wrapped: $wrapper) -> Self {
                let $wrapper(inner) = wrapped;
                inner
            }
        }

        impl EventPayload for $wrapper {
            /// Prefix of this workflow followed by the wrapped variant's name.
            fn event_type(&self) -> &'static str {
                let $wrapper(inner) = self;
                match inner {
                    AckForwardEvent::Received { .. } => concat!($tag, "Received"),
                    AckForwardEvent::Acknowledged { .. } => concat!($tag, "Acknowledged"),
                    AckForwardEvent::Forwarded { .. } => concat!($tag, "Forwarded"),
                    AckForwardEvent::DeadlineExpired { .. } => concat!($tag, "DeadlineExpired"),
                }
            }
        }
    };
}
212
// One event newtype per workflow. The second argument is the prefix that the
// generated `event_type()` puts in front of the variant name, e.g.
// `KaskadeEvent(AckForwardEvent::Received { .. })` reports "KaskadeReceived".
define_workflow_event!(VerfuegbarkeitEvent, "Verfuegbarkeit");
define_workflow_event!(NetzengpassEvent, "Netzengpass");
define_workflow_event!(KaskadeEvent, "Kaskade");
define_workflow_event!(PlanungsdatenEvent, "Planungsdaten");
define_workflow_event!(StatusanfrageEvent, "Statusanfrage");
define_workflow_event!(KostenblattEvent, "Kostenblatt");
219
220pub(crate) fn apply(state: AckForwardState, event: &AckForwardEvent) -> AckForwardState {
224 match event {
225 AckForwardEvent::Received {
226 mrid,
227 doc_type,
228 sender,
229 receiver,
230 received_at,
231 } => AckForwardState::Received(ReceivedData {
232 mrid: mrid.clone(),
233 doc_type: doc_type.clone(),
234 sender: sender.clone(),
235 receiver: receiver.clone(),
236 received_at: received_at.clone(),
237 }),
238
239 AckForwardEvent::Acknowledged { .. } => match state {
240 AckForwardState::Received(data) => AckForwardState::Acknowledged(data),
241 other => other,
242 },
243
244 AckForwardEvent::Forwarded { .. } => match state {
245 AckForwardState::Acknowledged(data) => AckForwardState::Forwarded(data),
246 other => other,
247 },
248
249 AckForwardEvent::DeadlineExpired { label, .. } => AckForwardState::DeadlineExpired {
250 reason: format!("deadline expired: {label}"),
251 },
252 }
253}
254
255pub(crate) fn handle(
257 state: &AckForwardState,
258 command: AckForwardCommand,
259 ack_window_label: &str,
260) -> Result<WorkflowOutput<AckForwardEvent>, WorkflowError> {
261 match command {
262 AckForwardCommand::Receive {
263 mrid,
264 doc_type,
265 sender,
266 receiver,
267 received_at,
268 } => {
269 if !matches!(state, AckForwardState::New) {
270 return Ok(vec![].into());
271 }
272 Ok(vec![AckForwardEvent::Received {
273 mrid,
274 doc_type,
275 sender,
276 receiver,
277 received_at,
278 }]
279 .into())
280 }
281
282 AckForwardCommand::Acknowledge { ack_mrid } => match state {
283 AckForwardState::Received(_) => {
284 Ok(vec![AckForwardEvent::Acknowledged { ack_mrid }].into())
285 }
286 AckForwardState::Acknowledged(_) | AckForwardState::Forwarded(_) => Ok(vec![].into()),
287 other => Err(WorkflowError::rejected(format!(
288 "Acknowledge not valid in state {}",
289 other.label()
290 ))),
291 },
292
293 AckForwardCommand::Forward { upstream_mrid } => match state {
294 AckForwardState::Acknowledged(_) => {
295 Ok(vec![AckForwardEvent::Forwarded { upstream_mrid }].into())
296 }
297 AckForwardState::Forwarded(_) => Ok(vec![].into()),
298 other => Err(WorkflowError::rejected(format!(
299 "Forward not valid in state {}",
300 other.label()
301 ))),
302 },
303
304 AckForwardCommand::TimeoutExpired { deadline_id, label } => match state {
305 AckForwardState::Acknowledged(_)
306 | AckForwardState::Forwarded(_)
307 | AckForwardState::DeadlineExpired { .. } => Ok(vec![].into()),
308 _ => {
309 let _ = ack_window_label; Ok(vec![AckForwardEvent::DeadlineExpired { deadline_id, label }].into())
311 }
312 },
313 }
314}
315
// Generates a unit struct plus its `Workflow` impl on top of the shared
// `apply`/`handle` functions of this module.
//
// Arguments, in order:
//   - optional attributes (including doc comments) for the generated struct,
//   - identifier of the workflow struct,
//   - identifier of the event newtype used as `Workflow::Event`,
//   - workflow name expression (accepted, but not referenced anywhere in the
//     expansion),
//   - deadline label that `on_deadline` reacts to and that is passed on to
//     `handle`,
//   - expression returned by the generated `event_prefix()`;
//   a trailing comma is allowed.
macro_rules! ack_forward_workflow {
    (
        $(#[$attr:meta])*
        $workflow:ident,
        $event:ident,
        $workflow_name:expr,
        $window_label:expr,
        $prefix:expr $(,)?
    ) => {
        $(#[$attr])*
        pub struct $workflow;

        impl Workflow for $workflow {
            type State = AckForwardState;
            type Event = $event;
            type Command = AckForwardCommand;

            fn on_deadline(deadline: &Deadline, state: &Self::State) -> Option<Self::Command> {
                // Only this workflow's own deadline, firing while still in
                // `Received`, turns into a command.
                let fires = deadline.label() == $window_label
                    && matches!(state, AckForwardState::Received(_));
                fires.then(|| AckForwardCommand::TimeoutExpired {
                    deadline_id: deadline.deadline_id(),
                    label: deadline.label().into(),
                })
            }

            fn apply(state: Self::State, event: &Self::Event) -> Self::State {
                let $event(inner) = event;
                crate::ack_forward::apply(state, inner)
            }

            fn handle(
                state: &Self::State,
                command: Self::Command,
            ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
                let shared = crate::ack_forward::handle(state, command, $window_label)?;
                // Re-wrap the generic events in this workflow's newtype and
                // pass the outbox through untouched.
                let events = shared.events.into_iter().map($event).collect();
                Ok(WorkflowOutput::with_outbox(events, shared.outbox))
            }
        }

        impl $workflow {
            /// Returns the event prefix this workflow was generated with.
            #[must_use]
            pub fn event_prefix() -> &'static str {
                $prefix
            }
        }
    };
}
375
// The six concrete workflows. Each invocation passes, in order: the workflow
// struct name, its event newtype, the workflow name, the deadline label that
// `on_deadline` reacts to, and the event prefix returned by `event_prefix()`.

// Verfuegbarkeit workflow.
ack_forward_workflow!(
    VerfuegbarkeitWorkflow,
    VerfuegbarkeitEvent,
    "redispatch-verfuegbarkeit",
    "redispatch-verfuegbarkeit-ack-window",
    "Verfuegbarkeit",
);

// Netzengpass workflow.
ack_forward_workflow!(
    NetzengpassWorkflow,
    NetzengpassEvent,
    "redispatch-netzengpass",
    "redispatch-netzengpass-ack-window",
    "Netzengpass",
);

// Kaskade workflow.
ack_forward_workflow!(
    KaskadeWorkflow,
    KaskadeEvent,
    "redispatch-kaskade",
    "redispatch-kaskade-ack-window",
    "Kaskade",
);

// Planungsdaten workflow.
ack_forward_workflow!(
    PlanungsdatenWorkflow,
    PlanungsdatenEvent,
    "redispatch-planungsdaten",
    "redispatch-planungsdaten-ack-window",
    "Planungsdaten",
);

// Statusanfrage workflow. Note that its deadline label ends in
// "-response-window", unlike the "-ack-window" suffix of the other five.
ack_forward_workflow!(
    StatusanfrageWorkflow,
    StatusanfrageEvent,
    "redispatch-statusanfrage",
    "redispatch-statusanfrage-response-window",
    "Statusanfrage",
);

// Kostenblatt workflow.
ack_forward_workflow!(
    KostenblattWorkflow,
    KostenblattEvent,
    "redispatch-kostenblatt",
    "redispatch-kostenblatt-ack-window",
    "Kostenblatt",
);
443
/// Workflow name constants.
///
/// Each value is the same string literal that is passed as the workflow-name
/// argument of the corresponding `ack_forward_workflow!` invocation; the two
/// are written out separately, so they must be kept in sync by hand.
pub mod names {
    /// Name of the Verfuegbarkeit workflow.
    pub const VERFUEGBARKEIT: &str = "redispatch-verfuegbarkeit";
    /// Name of the Netzengpass workflow.
    pub const NETZENGPASS: &str = "redispatch-netzengpass";
    /// Name of the Kaskade workflow.
    pub const KASKADE: &str = "redispatch-kaskade";
    /// Name of the Planungsdaten workflow.
    pub const PLANUNGSDATEN: &str = "redispatch-planungsdaten";
    /// Name of the Statusanfrage workflow.
    pub const STATUSANFRAGE: &str = "redispatch-statusanfrage";
    /// Name of the Kostenblatt workflow.
    pub const KOSTENBLATT: &str = "redispatch-kostenblatt";
}
460
#[cfg(test)]
mod tests {
    use super::*;
    use mako_engine::workflow::EventPayload;

    /// Happy path: `New` -> `Received` -> `Acknowledged` through the
    /// Verfuegbarkeit workflow.
    #[test]
    fn verfuegbarkeit_receive_to_acknowledged() {
        let initial = AckForwardState::New;
        let receive = AckForwardCommand::Receive {
            mrid: String::from("m1"),
            doc_type: String::from("Unavailability"),
            sender: String::from("s"),
            receiver: String::from("r"),
            received_at: String::from("2025-10-15T10:00:00Z"),
        };

        let receive_out = VerfuegbarkeitWorkflow::handle(&initial, receive).unwrap();
        assert_eq!(receive_out.events.len(), 1);

        let after_receive = VerfuegbarkeitWorkflow::apply(initial, &receive_out.events[0]);
        assert!(matches!(after_receive, AckForwardState::Received(_)));

        let acknowledge = AckForwardCommand::Acknowledge {
            ack_mrid: String::from("ack-1"),
        };
        let ack_out = VerfuegbarkeitWorkflow::handle(&after_receive, acknowledge).unwrap();

        let after_ack = VerfuegbarkeitWorkflow::apply(after_receive, &ack_out.events[0]);
        assert!(matches!(after_ack, AckForwardState::Acknowledged(_)));
    }

    /// `Forward` is rejected while the document is only `Received`.
    #[test]
    fn kaskade_forward_requires_acknowledged_state() {
        let received = ReceivedData {
            mrid: String::from("m"),
            doc_type: String::from("Kaskade"),
            sender: String::from("s"),
            receiver: String::from("r"),
            received_at: String::from("2025-10-15T10:00:00Z"),
        };
        let state = AckForwardState::Received(received);
        let forward = AckForwardCommand::Forward {
            upstream_mrid: String::from("u"),
        };

        let outcome = KaskadeWorkflow::handle(&state, forward);
        assert!(outcome.is_err());
    }

    /// Every workflow newtype reports its own event-type string, and none of
    /// them falls back to the generic `AckForward` prefix.
    #[test]
    fn event_types_are_unique_per_workflow() {
        use std::collections::HashSet;

        let inner = AckForwardEvent::Received {
            mrid: String::from("m"),
            doc_type: String::from("X"),
            sender: String::from("s"),
            receiver: String::from("r"),
            received_at: String::from("t"),
        };

        let types: [&'static str; 6] = [
            VerfuegbarkeitEvent(inner.clone()).event_type(),
            NetzengpassEvent(inner.clone()).event_type(),
            KaskadeEvent(inner.clone()).event_type(),
            PlanungsdatenEvent(inner.clone()).event_type(),
            StatusanfrageEvent(inner.clone()).event_type(),
            KostenblattEvent(inner.clone()).event_type(),
        ];

        let distinct = types.iter().copied().collect::<HashSet<&str>>().len();
        assert_eq!(
            distinct,
            types.len(),
            "event_type() strings must be unique across all ack-forward workflows: {types:?}"
        );

        for t in types {
            assert!(
                !t.starts_with("AckForward"),
                "event_type '{t}' must not use the generic AckForward prefix"
            );
        }
    }
}