Skip to main content

mako_redispatch/
stammdaten.rs

1//! Stammdatenübermittlung workflow for Redispatch 2.0.
2//!
3//! **Direction:** ANB → VNB → ÜNB\
4//! **Document:** `redispatch_xml::Stammdaten` (Z02 reduced, Z03 enriched,
5//! Z04 NB aggregate, Z14 BKV)
6//!
7//! # Process description
8//!
9//! 1. ANB sends `Stammdaten` to VNB (initial + updates on change).
10//! 2. Receiver sends `AcknowledgementDocument` within **6 wall-clock hours**
11//!    (UTC — see note below).
12//! 3. VNB optionally forwards enriched `Stammdaten` to ÜNB within **1 Werktag**
13//!    of the master-data change (BK6-20-060 §3.2).
14//!
15//! # Clock semantics
16//!
17//! All Redispatch 2.0 fristen use **UTC wall-clock hours**, not German local
18//! time (CET/CEST). The `UtcDateTime` fields in XSD carry explicit `Z` offsets.
19//! This differs from GPKE/WiM deadlines, which use German local time.
20//!
21//! # Regulatory basis
22//!
23//! `BNetzA` BK6-20-059 §4.3 (6h ACK), BK6-20-060 §3.2 (Stammdaten update).
24
25use mako_engine::{
26    deadline::Deadline,
27    error::WorkflowError,
28    ids::DeadlineId,
29    workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
30};
31use serde::{Deserialize, Serialize};
32
33// ── Workflow name ─────────────────────────────────────────────────────────────
34
35/// Stable workflow name — used in `ProcessRegistry` lookups and log output.
36pub const WORKFLOW_NAME: &str = "redispatch-stammdaten";
37
38// ── Deadline labels ───────────────────────────────────────────────────────────
39
40/// 6h UTC window for dispatching `AcknowledgementDocument` (BK6-20-059 §4.3).
41///
42/// Register immediately after [`StammdatenEvent::Received`] is applied.
43pub const ACK_WINDOW_LABEL: &str = "redispatch-stammdaten-ack-window";
44
45/// 1 Werktag forwarding window for VNB→ÜNB enrichment (BK6-20-060 §3.2).
46///
47/// Register after [`StammdatenEvent::Acknowledged`] is applied, when the
48/// deployment role is VNB.
49pub const FORWARD_WINDOW_LABEL: &str = "redispatch-stammdaten-forward-window";
50
51// ── Events ────────────────────────────────────────────────────────────────────
52
53/// Events emitted by the Stammdaten workflow.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(tag = "type", content = "data")]
56pub enum StammdatenEvent {
57    /// `Stammdaten` document received from ANB or VNB.
58    Received {
59        /// MRID (UUID) of the received `Stammdaten` document.
60        mrid: String,
61        /// GLN of the sender (ANB or VNB).
62        sender: String,
63        /// GLN of the receiver (VNB or ÜNB).
64        receiver: String,
65        /// Document type code (Z02/Z03/Z04/Z14).
66        doc_type: String,
67        /// Number of resource objects (`Anlagen`) included.
68        anlagen_count: u32,
69        /// UTC receipt timestamp in ISO-8601 format.
70        received_at: String,
71    },
72    /// `AcknowledgementDocument` dispatched within the 6h window.
73    Acknowledged {
74        /// MRID of the outbound `AcknowledgementDocument`.
75        ack_mrid: String,
76    },
77    /// Enriched `Stammdaten` forwarded upstream (VNB→ÜNB, role-conditional).
78    Forwarded {
79        /// MRID of the upstream `Stammdaten` sent to ÜNB.
80        upstream_mrid: String,
81    },
82    /// The 6h acknowledgement window expired without a response.
83    DeadlineExpired {
84        /// Unique ID of the expired deadline.
85        deadline_id: DeadlineId,
86        /// Label identifying the deadline type.
87        label: Box<str>,
88    },
89}
90
91impl EventPayload for StammdatenEvent {
92    fn event_type(&self) -> &'static str {
93        match self {
94            Self::Received { .. } => "StammdatenReceived",
95            Self::Acknowledged { .. } => "StammdatenAcknowledged",
96            Self::Forwarded { .. } => "StammdatenForwarded",
97            Self::DeadlineExpired { .. } => "StammdatenDeadlineExpired",
98        }
99    }
100}
101
102// ── Domain data ───────────────────────────────────────────────────────────────
103
104/// Business data captured when the `Stammdaten` document is first received.
105#[derive(Debug, Clone, Serialize, Deserialize)]
106#[serde(deny_unknown_fields)]
107pub struct ReceivedData {
108    /// MRID (UUID) of the received `Stammdaten` document.
109    pub mrid: String,
110    /// GLN of the sender.
111    pub sender: String,
112    /// GLN of the receiver.
113    pub receiver: String,
114    /// Document type code.
115    pub doc_type: String,
116    /// Number of resource objects.
117    pub anlagen_count: u32,
118    /// UTC receipt timestamp.
119    pub received_at: String,
120}
121
122// ── State ─────────────────────────────────────────────────────────────────────
123
124/// Current state of a Stammdaten process stream.
125///
126/// # Lifecycle
127///
128/// ```text
129/// New → Received → Acknowledged → [Forwarded →] Done
130///                ↘ DeadlineExpired (6h window lapsed)
131/// ```
132#[derive(Debug, Clone, Default, Serialize, Deserialize)]
133#[serde(tag = "status", content = "data")]
134pub enum StammdatenState {
135    /// No events yet.
136    #[default]
137    New,
138    /// Document received; `AcknowledgementDocument` not yet sent.
139    Received(ReceivedData),
140    /// `AcknowledgementDocument` sent; forwarding to ÜNB not yet done.
141    Acknowledged(ReceivedData),
142    /// Enriched document forwarded to ÜNB (VNB role only).
143    Forwarded(ReceivedData),
144    /// Process terminated due to a missed deadline.
145    DeadlineExpired {
146        /// Human-readable description of the expired deadline.
147        reason: String,
148    },
149}
150
151impl StammdatenState {
152    /// Stable string label for the current variant.
153    #[must_use]
154    pub fn label(&self) -> &'static str {
155        match self {
156            Self::New => "New",
157            Self::Received(_) => "Received",
158            Self::Acknowledged(_) => "Acknowledged",
159            Self::Forwarded(_) => "Forwarded",
160            Self::DeadlineExpired { .. } => "DeadlineExpired",
161        }
162    }
163}
164
165// ── Commands ──────────────────────────────────────────────────────────────────
166
167/// Commands for the Stammdaten workflow.
168///
169/// All domain values are pre-extracted by the transport layer before
170/// construction. `Workflow::handle` is pure — no I/O.
171#[derive(Clone)]
172pub enum StammdatenCommand {
173    /// Inbound `Stammdaten` document received and parsed by the transport layer.
174    Receive {
175        /// MRID (UUID) of the received document.
176        mrid: String,
177        /// GLN of the sender.
178        sender: String,
179        /// GLN of the receiver.
180        receiver: String,
181        /// Document type code (Z02/Z03/Z04/Z14).
182        doc_type: String,
183        /// Number of resource objects in the document.
184        anlagen_count: u32,
185        /// UTC receipt timestamp (ISO-8601 string).
186        received_at: String,
187    },
188    /// `AcknowledgementDocument` dispatched to the sender.
189    ///
190    /// The caller is responsible for building and enqueuing the outbound XML
191    /// via the outbox before issuing this command.
192    SendAcknowledgement {
193        /// MRID assigned to the outbound `AcknowledgementDocument`.
194        ack_mrid: String,
195    },
196    /// Enriched `Stammdaten` forwarded to ÜNB (VNB role only).
197    ///
198    /// The caller is responsible for building and enqueuing the upstream XML.
199    Forward {
200        /// MRID assigned to the upstream `Stammdaten` document.
201        upstream_mrid: String,
202    },
203    /// A registered deadline fired.
204    TimeoutExpired {
205        /// Unique ID of the expired deadline.
206        deadline_id: DeadlineId,
207        /// Label identifying the deadline type.
208        label: Box<str>,
209    },
210}
211
212impl CommandPayload for StammdatenCommand {}
213
214// ── Workflow ──────────────────────────────────────────────────────────────────
215
216/// Stammdatenübermittlung workflow for Redispatch 2.0.
217///
218/// Handles the reception, acknowledgement, and optional forwarding of
219/// `Stammdaten` documents exchanged between ANB, VNB, and ÜNB.
220///
221/// Spawn via [`mako_engine::process::Process`]:
222/// ```rust,ignore
223/// let process = ctx.spawn::<StammdatenWorkflow>(
224///     tenant_id,
225///     WorkflowId::new(WORKFLOW_NAME, "FV2025-10-01"),
226/// );
227/// ```
228pub struct StammdatenWorkflow;
229
230impl Workflow for StammdatenWorkflow {
231    type State = StammdatenState;
232    type Event = StammdatenEvent;
233    type Command = StammdatenCommand;
234
235    /// Fire deadline commands when the ACK or forward windows expire.
236    fn on_deadline(deadline: &Deadline, state: &Self::State) -> Option<Self::Command> {
237        match (deadline.label(), state) {
238            (ACK_WINDOW_LABEL, StammdatenState::Received(_)) => {
239                Some(StammdatenCommand::TimeoutExpired {
240                    deadline_id: deadline.deadline_id(),
241                    label: deadline.label().into(),
242                })
243            }
244            _ => None,
245        }
246    }
247
248    fn apply(state: Self::State, event: &Self::Event) -> Self::State {
249        match event {
250            StammdatenEvent::Received {
251                mrid,
252                sender,
253                receiver,
254                doc_type,
255                anlagen_count,
256                received_at,
257            } => StammdatenState::Received(ReceivedData {
258                mrid: mrid.clone(),
259                sender: sender.clone(),
260                receiver: receiver.clone(),
261                doc_type: doc_type.clone(),
262                anlagen_count: *anlagen_count,
263                received_at: received_at.clone(),
264            }),
265
266            StammdatenEvent::Acknowledged { .. } => match state {
267                StammdatenState::Received(data) => StammdatenState::Acknowledged(data),
268                other => other,
269            },
270
271            StammdatenEvent::Forwarded { .. } => match state {
272                StammdatenState::Acknowledged(data) => StammdatenState::Forwarded(data),
273                other => other,
274            },
275
276            StammdatenEvent::DeadlineExpired { label, .. } => StammdatenState::DeadlineExpired {
277                reason: format!("deadline expired: {label}"),
278            },
279        }
280    }
281
282    fn handle(
283        state: &Self::State,
284        command: Self::Command,
285    ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
286        match command {
287            StammdatenCommand::Receive {
288                mrid,
289                sender,
290                receiver,
291                doc_type,
292                anlagen_count,
293                received_at,
294            } => {
295                if !matches!(state, StammdatenState::New) {
296                    // Idempotent: document already received — this is a retry.
297                    return Ok(vec![].into());
298                }
299                Ok(vec![StammdatenEvent::Received {
300                    mrid,
301                    sender,
302                    receiver,
303                    doc_type,
304                    anlagen_count,
305                    received_at,
306                }]
307                .into())
308            }
309
310            StammdatenCommand::SendAcknowledgement { ack_mrid } => match state {
311                StammdatenState::Received(_) => {
312                    Ok(vec![StammdatenEvent::Acknowledged { ack_mrid }].into())
313                }
314                StammdatenState::Acknowledged(_) | StammdatenState::Forwarded(_) => {
315                    // Idempotent — acknowledgement already sent.
316                    Ok(vec![].into())
317                }
318                other => Err(WorkflowError::rejected(format!(
319                    "SendAcknowledgement not valid in state {}",
320                    other.label()
321                ))),
322            },
323
324            StammdatenCommand::Forward { upstream_mrid } => match state {
325                StammdatenState::Acknowledged(_) => {
326                    Ok(vec![StammdatenEvent::Forwarded { upstream_mrid }].into())
327                }
328                StammdatenState::Forwarded(_) => {
329                    // Idempotent.
330                    Ok(vec![].into())
331                }
332                other => Err(WorkflowError::rejected(format!(
333                    "Forward not valid in state {}",
334                    other.label()
335                ))),
336            },
337
338            StammdatenCommand::TimeoutExpired { deadline_id, label } => {
339                match state {
340                    // Terminal states — deadline is a no-op.
341                    StammdatenState::Acknowledged(_)
342                    | StammdatenState::Forwarded(_)
343                    | StammdatenState::DeadlineExpired { .. } => Ok(vec![].into()),
344                    _ => Ok(vec![StammdatenEvent::DeadlineExpired { deadline_id, label }].into()),
345                }
346            }
347        }
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354    use mako_engine::ids::DeadlineId;
355
356    fn received_cmd() -> StammdatenCommand {
357        StammdatenCommand::Receive {
358            mrid: "mrid-001".into(),
359            sender: "4012345000001".into(),
360            receiver: "4012345000002".into(),
361            doc_type: "Z02".into(),
362            anlagen_count: 3,
363            received_at: "2025-10-15T10:00:00Z".into(),
364        }
365    }
366
367    #[test]
368    fn receive_transitions_new_to_received() {
369        let state = StammdatenState::New;
370        let output = StammdatenWorkflow::handle(&state, received_cmd()).unwrap();
371        assert_eq!(output.events.len(), 1);
372        let new_state = StammdatenWorkflow::apply(state, &output.events[0]);
373        assert!(matches!(new_state, StammdatenState::Received(_)));
374    }
375
376    #[test]
377    fn acknowledge_transitions_received_to_acknowledged() {
378        let state = StammdatenState::Received(ReceivedData {
379            mrid: "m".into(),
380            sender: "s".into(),
381            receiver: "r".into(),
382            doc_type: "Z02".into(),
383            anlagen_count: 1,
384            received_at: "2025-10-15T10:00:00Z".into(),
385        });
386        let output = StammdatenWorkflow::handle(
387            &state,
388            StammdatenCommand::SendAcknowledgement {
389                ack_mrid: "ack-001".into(),
390            },
391        )
392        .unwrap();
393        assert_eq!(output.events.len(), 1);
394        let new_state = StammdatenWorkflow::apply(state, &output.events[0]);
395        assert!(matches!(new_state, StammdatenState::Acknowledged(_)));
396    }
397
398    #[test]
399    fn forward_requires_acknowledged_state() {
400        let state = StammdatenState::Received(ReceivedData {
401            mrid: "m".into(),
402            sender: "s".into(),
403            receiver: "r".into(),
404            doc_type: "Z03".into(),
405            anlagen_count: 1,
406            received_at: "2025-10-15T10:00:00Z".into(),
407        });
408        let result = StammdatenWorkflow::handle(
409            &state,
410            StammdatenCommand::Forward {
411                upstream_mrid: "u".into(),
412            },
413        );
414        assert!(result.is_err());
415    }
416
417    #[test]
418    fn timeout_in_received_state_emits_deadline_expired() {
419        let state = StammdatenState::Received(ReceivedData {
420            mrid: "m".into(),
421            sender: "s".into(),
422            receiver: "r".into(),
423            doc_type: "Z02".into(),
424            anlagen_count: 1,
425            received_at: "2025-10-15T10:00:00Z".into(),
426        });
427        let output = StammdatenWorkflow::handle(
428            &state,
429            StammdatenCommand::TimeoutExpired {
430                deadline_id: DeadlineId::new(),
431                label: ACK_WINDOW_LABEL.into(),
432            },
433        )
434        .unwrap();
435        assert!(matches!(
436            output.events.as_slice(),
437            [StammdatenEvent::DeadlineExpired { .. }]
438        ));
439    }
440
441    #[test]
442    fn timeout_in_acknowledged_state_is_noop() {
443        let state = StammdatenState::Acknowledged(ReceivedData {
444            mrid: "m".into(),
445            sender: "s".into(),
446            receiver: "r".into(),
447            doc_type: "Z02".into(),
448            anlagen_count: 1,
449            received_at: "2025-10-15T10:00:00Z".into(),
450        });
451        let output = StammdatenWorkflow::handle(
452            &state,
453            StammdatenCommand::TimeoutExpired {
454                deadline_id: DeadlineId::new(),
455                label: ACK_WINDOW_LABEL.into(),
456            },
457        )
458        .unwrap();
459        assert!(output.events.is_empty());
460    }
461}