1use mako_engine::{
26 deadline::Deadline,
27 error::WorkflowError,
28 ids::DeadlineId,
29 workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
30};
31use serde::{Deserialize, Serialize};
32
/// Name under which this workflow is identified: `"redispatch-stammdaten"`.
pub const WORKFLOW_NAME: &str = "redispatch-stammdaten";

/// Deadline label for the acknowledgement window.
///
/// `StammdatenWorkflow::on_deadline` compares an expired deadline's label
/// against this constant; only a match (while the state is still `Received`)
/// produces a `TimeoutExpired` command.
pub const ACK_WINDOW_LABEL: &str = "redispatch-stammdaten-ack-window";

/// Deadline label for the forward window.
///
/// NOTE(review): the workflow logic visible in this module never matches on
/// this label, so a deadline carrying it is ignored by `on_deadline` —
/// confirm whether that is intentional or handled by a caller elsewhere.
pub const FORWARD_WINDOW_LABEL: &str = "redispatch-stammdaten-forward-window";
50
/// Events produced by `StammdatenWorkflow::handle` and folded into
/// [`StammdatenState`] by `StammdatenWorkflow::apply`.
///
/// Serialized by serde in adjacently tagged form: the variant name is stored
/// under `"type"` and the variant's fields under `"data"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum StammdatenEvent {
    /// A document was received. All fields are copied verbatim into
    /// [`ReceivedData`] when the event is applied.
    Received {
        /// Identifier of the received document (presumably its mRID — confirm).
        mrid: String,
        /// Sending party (presumably a market-participant ID — confirm).
        sender: String,
        /// Receiving party (presumably a market-participant ID — confirm).
        receiver: String,
        /// Document type code; the tests in this file use values such as `"Z02"`.
        doc_type: String,
        /// Count of "Anlagen" carried by the document (presumably the number
        /// of installations/assets — confirm).
        anlagen_count: u32,
        /// Receipt timestamp as an unparsed string; the tests in this file use
        /// values like `2025-10-15T10:00:00Z`. The format is not validated here.
        received_at: String,
    },
    /// An acknowledgement was sent for the received document.
    Acknowledged {
        /// Identifier supplied with the `SendAcknowledgement` command.
        ack_mrid: String,
    },
    /// The document was forwarded upstream.
    Forwarded {
        /// Identifier supplied with the `Forward` command.
        upstream_mrid: String,
    },
    /// A deadline expired before the workflow progressed far enough.
    DeadlineExpired {
        /// Identifier of the deadline that fired.
        deadline_id: DeadlineId,
        /// Label of the deadline that fired; embedded in the resulting state's
        /// `reason` string when the event is applied.
        label: Box<str>,
    },
}
90
91impl EventPayload for StammdatenEvent {
92 fn event_type(&self) -> &'static str {
93 match self {
94 Self::Received { .. } => "StammdatenReceived",
95 Self::Acknowledged { .. } => "StammdatenAcknowledged",
96 Self::Forwarded { .. } => "StammdatenForwarded",
97 Self::DeadlineExpired { .. } => "StammdatenDeadlineExpired",
98 }
99 }
100}
101
/// Details of a received document, carried by the `Received`, `Acknowledged`
/// and `Forwarded` variants of [`StammdatenState`].
///
/// The fields mirror those of `StammdatenEvent::Received`, from which they are
/// cloned when that event is applied. `deny_unknown_fields` makes
/// deserialization fail if the input contains any key not listed here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReceivedData {
    /// Identifier of the received document (presumably its mRID — confirm).
    pub mrid: String,
    /// Sending party (presumably a market-participant ID — confirm).
    pub sender: String,
    /// Receiving party (presumably a market-participant ID — confirm).
    pub receiver: String,
    /// Document type code; the tests in this file use values such as `"Z02"`.
    pub doc_type: String,
    /// Count of "Anlagen" carried by the document (presumably the number of
    /// installations/assets — confirm).
    pub anlagen_count: u32,
    /// Receipt timestamp as an unparsed string; not validated in this file.
    pub received_at: String,
}
121
/// State of a single Stammdaten workflow instance.
///
/// Serialized by serde in adjacently tagged form: the variant name is stored
/// under `"status"` and the variant's payload under `"data"`. The `Default`
/// value is [`StammdatenState::New`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum StammdatenState {
    /// Initial state before any event has been applied. This is the only
    /// state in which a `Receive` command emits an event.
    #[default]
    New,
    /// Entered when a `Received` event is applied; holds the document details.
    Received(ReceivedData),
    /// Entered from `Received` when an `Acknowledged` event is applied; the
    /// document details are carried over unchanged.
    Acknowledged(ReceivedData),
    /// Entered from `Acknowledged` when a `Forwarded` event is applied; the
    /// document details are carried over unchanged.
    Forwarded(ReceivedData),
    /// Entered when a `DeadlineExpired` event is applied. The document
    /// details are not retained in this state.
    DeadlineExpired {
        /// Text of the form `deadline expired: <label>`, built from the
        /// expired deadline's label.
        reason: String,
    },
}
150
151impl StammdatenState {
152 #[must_use]
154 pub fn label(&self) -> &'static str {
155 match self {
156 Self::New => "New",
157 Self::Received(_) => "Received",
158 Self::Acknowledged(_) => "Acknowledged",
159 Self::Forwarded(_) => "Forwarded",
160 Self::DeadlineExpired { .. } => "DeadlineExpired",
161 }
162 }
163}
164
165#[derive(Clone)]
172pub enum StammdatenCommand {
173 Receive {
175 mrid: String,
177 sender: String,
179 receiver: String,
181 doc_type: String,
183 anlagen_count: u32,
185 received_at: String,
187 },
188 SendAcknowledgement {
193 ack_mrid: String,
195 },
196 Forward {
200 upstream_mrid: String,
202 },
203 TimeoutExpired {
205 deadline_id: DeadlineId,
207 label: Box<str>,
209 },
210}
211
// Marks `StammdatenCommand` as a command payload for the engine. The impl
// body is empty: no trait items are defined or overridden here.
impl CommandPayload for StammdatenCommand {}
213
/// Field-less unit type that carries the [`Workflow`] implementation for the
/// Stammdaten process; all workflow state lives in [`StammdatenState`].
pub struct StammdatenWorkflow;
229
230impl Workflow for StammdatenWorkflow {
231 type State = StammdatenState;
232 type Event = StammdatenEvent;
233 type Command = StammdatenCommand;
234
235 fn on_deadline(deadline: &Deadline, state: &Self::State) -> Option<Self::Command> {
237 match (deadline.label(), state) {
238 (ACK_WINDOW_LABEL, StammdatenState::Received(_)) => {
239 Some(StammdatenCommand::TimeoutExpired {
240 deadline_id: deadline.deadline_id(),
241 label: deadline.label().into(),
242 })
243 }
244 _ => None,
245 }
246 }
247
248 fn apply(state: Self::State, event: &Self::Event) -> Self::State {
249 match event {
250 StammdatenEvent::Received {
251 mrid,
252 sender,
253 receiver,
254 doc_type,
255 anlagen_count,
256 received_at,
257 } => StammdatenState::Received(ReceivedData {
258 mrid: mrid.clone(),
259 sender: sender.clone(),
260 receiver: receiver.clone(),
261 doc_type: doc_type.clone(),
262 anlagen_count: *anlagen_count,
263 received_at: received_at.clone(),
264 }),
265
266 StammdatenEvent::Acknowledged { .. } => match state {
267 StammdatenState::Received(data) => StammdatenState::Acknowledged(data),
268 other => other,
269 },
270
271 StammdatenEvent::Forwarded { .. } => match state {
272 StammdatenState::Acknowledged(data) => StammdatenState::Forwarded(data),
273 other => other,
274 },
275
276 StammdatenEvent::DeadlineExpired { label, .. } => StammdatenState::DeadlineExpired {
277 reason: format!("deadline expired: {label}"),
278 },
279 }
280 }
281
282 fn handle(
283 state: &Self::State,
284 command: Self::Command,
285 ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
286 match command {
287 StammdatenCommand::Receive {
288 mrid,
289 sender,
290 receiver,
291 doc_type,
292 anlagen_count,
293 received_at,
294 } => {
295 if !matches!(state, StammdatenState::New) {
296 return Ok(vec![].into());
298 }
299 Ok(vec![StammdatenEvent::Received {
300 mrid,
301 sender,
302 receiver,
303 doc_type,
304 anlagen_count,
305 received_at,
306 }]
307 .into())
308 }
309
310 StammdatenCommand::SendAcknowledgement { ack_mrid } => match state {
311 StammdatenState::Received(_) => {
312 Ok(vec![StammdatenEvent::Acknowledged { ack_mrid }].into())
313 }
314 StammdatenState::Acknowledged(_) | StammdatenState::Forwarded(_) => {
315 Ok(vec![].into())
317 }
318 other => Err(WorkflowError::rejected(format!(
319 "SendAcknowledgement not valid in state {}",
320 other.label()
321 ))),
322 },
323
324 StammdatenCommand::Forward { upstream_mrid } => match state {
325 StammdatenState::Acknowledged(_) => {
326 Ok(vec![StammdatenEvent::Forwarded { upstream_mrid }].into())
327 }
328 StammdatenState::Forwarded(_) => {
329 Ok(vec![].into())
331 }
332 other => Err(WorkflowError::rejected(format!(
333 "Forward not valid in state {}",
334 other.label()
335 ))),
336 },
337
338 StammdatenCommand::TimeoutExpired { deadline_id, label } => {
339 match state {
340 StammdatenState::Acknowledged(_)
342 | StammdatenState::Forwarded(_)
343 | StammdatenState::DeadlineExpired { .. } => Ok(vec![].into()),
344 _ => Ok(vec![StammdatenEvent::DeadlineExpired { deadline_id, label }].into()),
345 }
346 }
347 }
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use mako_engine::ids::DeadlineId;

    /// Minimal `ReceivedData` fixture; only the document type varies per test.
    fn received_data(doc_type: &str) -> ReceivedData {
        ReceivedData {
            mrid: "m".into(),
            sender: "s".into(),
            receiver: "r".into(),
            doc_type: doc_type.into(),
            anlagen_count: 1,
            received_at: "2025-10-15T10:00:00Z".into(),
        }
    }

    /// A `Receive` command with realistic-looking field values.
    fn received_cmd() -> StammdatenCommand {
        StammdatenCommand::Receive {
            mrid: "mrid-001".into(),
            sender: "4012345000001".into(),
            receiver: "4012345000002".into(),
            doc_type: "Z02".into(),
            anlagen_count: 3,
            received_at: "2025-10-15T10:00:00Z".into(),
        }
    }

    /// A `TimeoutExpired` command for the acknowledgement window with a fresh id.
    fn ack_timeout_cmd() -> StammdatenCommand {
        StammdatenCommand::TimeoutExpired {
            deadline_id: DeadlineId::new(),
            label: ACK_WINDOW_LABEL.into(),
        }
    }

    #[test]
    fn receive_transitions_new_to_received() {
        let initial = StammdatenState::New;

        let output = StammdatenWorkflow::handle(&initial, received_cmd()).unwrap();
        assert_eq!(output.events.len(), 1);

        let next = StammdatenWorkflow::apply(initial, &output.events[0]);
        assert!(matches!(next, StammdatenState::Received(_)));
    }

    #[test]
    fn acknowledge_transitions_received_to_acknowledged() {
        let initial = StammdatenState::Received(received_data("Z02"));
        let command = StammdatenCommand::SendAcknowledgement {
            ack_mrid: "ack-001".into(),
        };

        let output = StammdatenWorkflow::handle(&initial, command).unwrap();
        assert_eq!(output.events.len(), 1);

        let next = StammdatenWorkflow::apply(initial, &output.events[0]);
        assert!(matches!(next, StammdatenState::Acknowledged(_)));
    }

    #[test]
    fn forward_requires_acknowledged_state() {
        let initial = StammdatenState::Received(received_data("Z03"));
        let command = StammdatenCommand::Forward {
            upstream_mrid: "u".into(),
        };

        let result = StammdatenWorkflow::handle(&initial, command);
        assert!(result.is_err());
    }

    #[test]
    fn timeout_in_received_state_emits_deadline_expired() {
        let initial = StammdatenState::Received(received_data("Z02"));

        let output = StammdatenWorkflow::handle(&initial, ack_timeout_cmd()).unwrap();
        assert!(matches!(
            output.events.as_slice(),
            [StammdatenEvent::DeadlineExpired { .. }]
        ));
    }

    #[test]
    fn timeout_in_acknowledged_state_is_noop() {
        let initial = StammdatenState::Acknowledged(received_data("Z02"));

        let output = StammdatenWorkflow::handle(&initial, ack_timeout_cmd()).unwrap();
        assert!(output.events.is_empty());
    }
}