Skip to main content

mako_redispatch/
ack_forward.rs

1//! Generic acknowledge-and-forward workflow for Redispatch 2.0.
2//!
3//! Shared state machine used by:
4//! - Verfügbarkeitsmeldung (`redispatch-verfuegbarkeit`)
5//! - Netzengpassinformation (`redispatch-netzengpass`)
6//! - Kaskade §13 Abs. 2 (`redispatch-kaskade`)
7//! - Planungsdaten Abruffahrplan (`redispatch-planungsdaten`)
8//! - Statusanfrage (`redispatch-statusanfrage`)
9//! - Kostenblatt (`redispatch-kostenblatt`)
10//!
11//! Each of these processes follows the same pattern:
12//! 1. Receive an XML document.
//! 2. Send an `AcknowledgementDocument` within the process-specific window: **6 wall-clock hours** (UTC) for most processes; Statusanfrage uses a 24h response window.
14//! 3. Optionally forward to an upstream party.
15//!
16//! A separate workflow struct per process is defined below so that workflow
17//! names, BDEW references, and deadline labels remain distinct.
18
19use mako_engine::{
20    deadline::Deadline,
21    error::WorkflowError,
22    ids::DeadlineId,
23    workflow::{CommandPayload, EventPayload, Workflow, WorkflowOutput},
24};
25use serde::{Deserialize, Serialize};
26
27// ── Generic events ─────────────────────────────────────────────────────────────
28
/// Events shared by all acknowledge-and-forward workflows.
///
/// Serialized with serde's adjacently tagged representation: the variant name
/// is written under `type` and the variant's fields under `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AckForwardEvent {
    /// An inbound XML document was received.
    Received {
        /// MRID (UUID) of the received document.
        mrid: String,
        /// Document type string (e.g. `"Unavailability"`, `"Kaskade"`).
        doc_type: String,
        /// GLN of the sender.
        sender: String,
        /// GLN of the receiver.
        receiver: String,
        /// UTC receipt timestamp (ISO-8601), kept as an unparsed string.
        received_at: String,
    },
    /// An `AcknowledgementDocument` was dispatched for the received document.
    Acknowledged {
        /// MRID of the outbound `AcknowledgementDocument`.
        ack_mrid: String,
    },
    /// The document was forwarded upstream (role-conditional).
    Forwarded {
        /// MRID of the forwarded document.
        upstream_mrid: String,
    },
    /// A registered deadline expired.
    DeadlineExpired {
        /// Unique ID of the expired deadline.
        deadline_id: DeadlineId,
        /// Label identifying the deadline.
        label: Box<str>,
    },
}
64
65/// Commands shared by all acknowledge-and-forward workflows.
66#[derive(Clone)]
67pub enum AckForwardCommand {
68    /// Inbound document received.
69    Receive {
70        /// MRID of the received document.
71        mrid: String,
72        /// Document type string.
73        doc_type: String,
74        /// Sender GLN.
75        sender: String,
76        /// Receiver GLN.
77        receiver: String,
78        /// UTC receipt timestamp.
79        received_at: String,
80    },
81    /// `AcknowledgementDocument` dispatched.
82    Acknowledge {
83        /// MRID of the outbound `AcknowledgementDocument`.
84        ack_mrid: String,
85    },
86    /// Document forwarded upstream.
87    Forward {
88        /// MRID of the forwarded document.
89        upstream_mrid: String,
90    },
91    /// Deadline fired.
92    TimeoutExpired {
93        /// Unique ID of the expired deadline.
94        deadline_id: DeadlineId,
95        /// Label identifying the deadline.
96        label: Box<str>,
97    },
98}
99
100impl CommandPayload for AckForwardCommand {}
101
/// Core data captured on receipt.
///
/// Carried unchanged through the `Received`, `Acknowledged` and `Forwarded`
/// states. Deserialization rejects payloads containing fields other than the
/// ones declared here (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReceivedData {
    /// MRID of the received document.
    pub mrid: String,
    /// Document type string.
    pub doc_type: String,
    /// Sender GLN.
    pub sender: String,
    /// Receiver GLN.
    pub receiver: String,
    /// Receipt timestamp, stored as the string supplied with the event.
    pub received_at: String,
}
117
/// Generic state for acknowledge-and-forward workflows.
///
/// Serialized with serde's adjacently tagged representation: the variant name
/// is written under `status` and its payload under `data`. The default value
/// is [`AckForwardState::New`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum AckForwardState {
    /// No events yet.
    #[default]
    New,
    /// Document received; acknowledgement not yet sent.
    Received(ReceivedData),
    /// `AcknowledgementDocument` dispatched.
    Acknowledged(ReceivedData),
    /// Document forwarded upstream.
    Forwarded(ReceivedData),
    /// A registered deadline expired without acknowledgement.
    ///
    /// The receipt data is not retained in this state.
    DeadlineExpired {
        /// Human-readable reason.
        reason: String,
    },
}
137
138impl AckForwardState {
139    /// Stable string label.
140    #[must_use]
141    pub fn label(&self) -> &'static str {
142        match self {
143            Self::New => "New",
144            Self::Received(_) => "Received",
145            Self::Acknowledged(_) => "Acknowledged",
146            Self::Forwarded(_) => "Forwarded",
147            Self::DeadlineExpired { .. } => "DeadlineExpired",
148        }
149    }
150}
151
152impl EventPayload for AckForwardEvent {
153    fn event_type(&self) -> &'static str {
154        match self {
155            Self::Received { .. } => "AckForwardReceived",
156            Self::Acknowledged { .. } => "AckForwardAcknowledged",
157            Self::Forwarded { .. } => "AckForwardForwarded",
158            Self::DeadlineExpired { .. } => "AckForwardDeadlineExpired",
159        }
160    }
161}
162
163// ── Per-workflow event newtypes ────────────────────────────────────────────────
164//
165// Each workflow needs distinct event_type() strings so that event logs,
166// projections, and observability tools can identify events unambiguously
167// across all six ack-forward process families.
168//
169// The macro below generates a thin newtype `FooEvent(AckForwardEvent)` for each
170// workflow, with `EventPayload::event_type()` returning prefixed names such as
171// `"VerfuegbarkeitReceived"`.  All apply/handle logic delegates to the shared
172// `AckForwardEvent` via `From<FooEvent> for AckForwardEvent`.
173
// Generates a transparent newtype around `AckForwardEvent` together with the
// conversions in both directions and an `EventPayload` impl whose type names
// are `$prefix` followed by the variant name.
macro_rules! define_workflow_event {
    ($event_type:ident, $prefix:expr) => {
        /// Event newtype belonging to a single ack-forward process family.
        ///
        /// It wraps [`AckForwardEvent`] without changing its serialized form
        /// (`#[serde(transparent)]`) and reports a workflow-specific name from
        /// [`EventPayload::event_type`], so that events of different
        /// ack-forward workflows can be told apart in projections and in the
        /// event log.
        #[derive(Debug, Clone, Serialize, Deserialize)]
        #[serde(transparent)]
        pub struct $event_type(pub AckForwardEvent);

        impl From<AckForwardEvent> for $event_type {
            fn from(inner: AckForwardEvent) -> Self {
                $event_type(inner)
            }
        }

        impl From<$event_type> for AckForwardEvent {
            fn from(wrapped: $event_type) -> Self {
                let $event_type(inner) = wrapped;
                inner
            }
        }

        impl EventPayload for $event_type {
            fn event_type(&self) -> &'static str {
                let Self(inner) = self;
                match inner {
                    AckForwardEvent::DeadlineExpired { .. } => {
                        concat!($prefix, "DeadlineExpired")
                    }
                    AckForwardEvent::Forwarded { .. } => concat!($prefix, "Forwarded"),
                    AckForwardEvent::Acknowledged { .. } => concat!($prefix, "Acknowledged"),
                    AckForwardEvent::Received { .. } => concat!($prefix, "Received"),
                }
            }
        }
    };
}
212
// One event newtype per process family. The first argument is the generated
// type name, the second the prefix prepended to each `event_type()` string.
define_workflow_event!(VerfuegbarkeitEvent, "Verfuegbarkeit");
define_workflow_event!(NetzengpassEvent, "Netzengpass");
define_workflow_event!(KaskadeEvent, "Kaskade");
define_workflow_event!(PlanungsdatenEvent, "Planungsdaten");
define_workflow_event!(StatusanfrageEvent, "Statusanfrage");
define_workflow_event!(KostenblattEvent, "Kostenblatt");
219
220// ── Shared apply / handle logic ───────────────────────────────────────────────
221
222/// Apply an `AckForwardEvent` to `AckForwardState`.
223pub(crate) fn apply(state: AckForwardState, event: &AckForwardEvent) -> AckForwardState {
224    match event {
225        AckForwardEvent::Received {
226            mrid,
227            doc_type,
228            sender,
229            receiver,
230            received_at,
231        } => AckForwardState::Received(ReceivedData {
232            mrid: mrid.clone(),
233            doc_type: doc_type.clone(),
234            sender: sender.clone(),
235            receiver: receiver.clone(),
236            received_at: received_at.clone(),
237        }),
238
239        AckForwardEvent::Acknowledged { .. } => match state {
240            AckForwardState::Received(data) => AckForwardState::Acknowledged(data),
241            other => other,
242        },
243
244        AckForwardEvent::Forwarded { .. } => match state {
245            AckForwardState::Acknowledged(data) => AckForwardState::Forwarded(data),
246            other => other,
247        },
248
249        AckForwardEvent::DeadlineExpired { label, .. } => AckForwardState::DeadlineExpired {
250            reason: format!("deadline expired: {label}"),
251        },
252    }
253}
254
255/// Handle an `AckForwardCommand` against `AckForwardState`.
256pub(crate) fn handle(
257    state: &AckForwardState,
258    command: AckForwardCommand,
259    ack_window_label: &str,
260) -> Result<WorkflowOutput<AckForwardEvent>, WorkflowError> {
261    match command {
262        AckForwardCommand::Receive {
263            mrid,
264            doc_type,
265            sender,
266            receiver,
267            received_at,
268        } => {
269            if !matches!(state, AckForwardState::New) {
270                return Ok(vec![].into());
271            }
272            Ok(vec![AckForwardEvent::Received {
273                mrid,
274                doc_type,
275                sender,
276                receiver,
277                received_at,
278            }]
279            .into())
280        }
281
282        AckForwardCommand::Acknowledge { ack_mrid } => match state {
283            AckForwardState::Received(_) => {
284                Ok(vec![AckForwardEvent::Acknowledged { ack_mrid }].into())
285            }
286            AckForwardState::Acknowledged(_) | AckForwardState::Forwarded(_) => Ok(vec![].into()),
287            other => Err(WorkflowError::rejected(format!(
288                "Acknowledge not valid in state {}",
289                other.label()
290            ))),
291        },
292
293        AckForwardCommand::Forward { upstream_mrid } => match state {
294            AckForwardState::Acknowledged(_) => {
295                Ok(vec![AckForwardEvent::Forwarded { upstream_mrid }].into())
296            }
297            AckForwardState::Forwarded(_) => Ok(vec![].into()),
298            other => Err(WorkflowError::rejected(format!(
299                "Forward not valid in state {}",
300                other.label()
301            ))),
302        },
303
304        AckForwardCommand::TimeoutExpired { deadline_id, label } => match state {
305            AckForwardState::Acknowledged(_)
306            | AckForwardState::Forwarded(_)
307            | AckForwardState::DeadlineExpired { .. } => Ok(vec![].into()),
308            _ => {
309                let _ = ack_window_label; // used by caller for label registration
310                Ok(vec![AckForwardEvent::DeadlineExpired { deadline_id, label }].into())
311            }
312        },
313    }
314}
315
316// ── Per-process workflow structs ───────────────────────────────────────────────
317
// Generates a unit struct implementing `Workflow` on top of the shared
// `apply` / `handle` functions.
//
// Arguments, in order: doc attributes, struct name, event newtype, workflow
// name, deadline label that triggers the timeout, event-type prefix.
macro_rules! ack_forward_workflow {
    (
        $(#[$meta:meta])*
        $name:ident,
        $event_newtype:ident,
        $workflow_name:expr,
        $ack_label:expr,
        $event_prefix:expr $(,)?
    ) => {
        $(#[$meta])*
        pub struct $name;

        impl Workflow for $name {
            type State   = AckForwardState;
            type Event   = $event_newtype;
            type Command = AckForwardCommand;

            /// Turn an expired deadline into a `TimeoutExpired` command.
            ///
            /// Only the deadline carrying this workflow's label is considered,
            /// and only while the document is still awaiting acknowledgement
            /// (`Received`); every other combination yields `None`.
            fn on_deadline(
                deadline: &Deadline,
                state: &Self::State,
            ) -> Option<Self::Command> {
                let awaiting_ack = deadline.label() == $ack_label
                    && matches!(state, AckForwardState::Received(_));
                awaiting_ack.then(|| AckForwardCommand::TimeoutExpired {
                    deadline_id: deadline.deadline_id(),
                    label: deadline.label().into(),
                })
            }

            /// Apply the wrapped generic event via the shared `apply`.
            fn apply(state: Self::State, event: &Self::Event) -> Self::State {
                crate::ack_forward::apply(state, &event.0)
            }

            /// Run the shared `handle` and re-wrap every produced event in
            /// this workflow's event newtype, keeping the outbox as returned.
            fn handle(
                state: &Self::State,
                command: Self::Command,
            ) -> Result<WorkflowOutput<Self::Event>, WorkflowError> {
                let output = crate::ack_forward::handle(state, command, $ack_label)?;
                Ok(WorkflowOutput::with_outbox(
                    output.events.into_iter().map($event_newtype).collect(),
                    output.outbox,
                ))
            }
        }

        impl $name {
            /// Return the workflow name passed to the macro for this process.
            #[must_use]
            pub fn workflow_name() -> &'static str {
                $workflow_name
            }

            /// Return the event-type prefix for this workflow's events.
            #[must_use]
            pub fn event_prefix() -> &'static str {
                $event_prefix
            }
        }
    };
}
375
// Argument order for every invocation below: doc comment, workflow struct,
// event newtype, workflow name, deadline label, event-type prefix.

ack_forward_workflow!(
    /// Verfügbarkeitsmeldung workflow — `UnavailabilityMarketDocument`.
    ///
    /// ANB → VNB. Receiver acknowledges within 6h (BK6-20-059 §4.3).
    VerfuegbarkeitWorkflow,
    VerfuegbarkeitEvent,
    "redispatch-verfuegbarkeit",
    "redispatch-verfuegbarkeit-ack-window",
    "Verfuegbarkeit",
);

ack_forward_workflow!(
    /// Netzengpassinformation workflow — `NetworkConstraintDocument`.
    ///
    /// ÜNB ↔ VNB. Receiver acknowledges within 6h (BK6-20-059 §4.3).
    NetzengpassWorkflow,
    NetzengpassEvent,
    "redispatch-netzengpass",
    "redispatch-netzengpass-ack-window",
    "Netzengpass",
);

ack_forward_workflow!(
    /// Kaskade workflow — emergency measures per § 13 Abs. 2 `EnWG`.
    ///
    /// ÜNB → VNB → ANB. Receiver acknowledges within 6h (BK6-20-059 §4.3).
    /// Only active for `Marktrolle::Nb` and `Marktrolle::Unb` deployments.
    KaskadeWorkflow,
    KaskadeEvent,
    "redispatch-kaskade",
    "redispatch-kaskade-ack-window",
    "Kaskade",
);

ack_forward_workflow!(
    /// Planungsdaten (Abruffahrplan) workflow — `PlannedResourceScheduleDocument`.
    ///
    /// ÜNB → VNB → ANB. Receiver acknowledges within 6h (BK6-20-059 §4.3).
    PlanungsdatenWorkflow,
    PlanungsdatenEvent,
    "redispatch-planungsdaten",
    "redispatch-planungsdaten-ack-window",
    "Planungsdaten",
);

ack_forward_workflow!(
    /// Statusanfrage workflow — `StatusRequest_MarketDocument`.
    ///
    /// Addressed party responds within 24h (BK6-20-059 §4.4).
    ///
    /// Unlike the other processes, the deadline label names a response window
    /// rather than an acknowledgement window.
    StatusanfrageWorkflow,
    StatusanfrageEvent,
    "redispatch-statusanfrage",
    "redispatch-statusanfrage-response-window",
    "Statusanfrage",
);

ack_forward_workflow!(
    /// Kostenblatt workflow — monthly cost reconciliation.
    ///
    /// VNB → ÜNB. Receiver acknowledges within 6h (BK6-20-059 §4.3).
    /// VNB submits by the 15th of the following month (BK6-20-061 §7).
    KostenblattWorkflow,
    KostenblattEvent,
    "redispatch-kostenblatt",
    "redispatch-kostenblatt-ack-window",
    "Kostenblatt",
);
443
/// Workflow name constants for each process in the acknowledge-and-forward family.
pub mod names {
    /// Workflow name for `VerfuegbarkeitWorkflow`.
    pub const VERFUEGBARKEIT: &str = "redispatch-verfuegbarkeit";
    /// Workflow name for `NetzengpassWorkflow`.
    pub const NETZENGPASS: &str = "redispatch-netzengpass";
    /// Workflow name for `KaskadeWorkflow`.
    pub const KASKADE: &str = "redispatch-kaskade";
    /// Workflow name for `PlanungsdatenWorkflow`.
    pub const PLANUNGSDATEN: &str = "redispatch-planungsdaten";
    /// Workflow name for `StatusanfrageWorkflow`.
    pub const STATUSANFRAGE: &str = "redispatch-statusanfrage";
    /// Workflow name for `KostenblattWorkflow`.
    pub const KOSTENBLATT: &str = "redispatch-kostenblatt";
}
460
#[cfg(test)]
mod tests {
    use super::*;
    use mako_engine::workflow::EventPayload;

    /// Happy path: `New` → `Received` → `Acknowledged` for one workflow.
    #[test]
    fn verfuegbarkeit_receive_to_acknowledged() {
        let state = AckForwardState::New;
        let output = VerfuegbarkeitWorkflow::handle(
            &state,
            AckForwardCommand::Receive {
                mrid: "m1".into(),
                doc_type: "Unavailability".into(),
                sender: "s".into(),
                receiver: "r".into(),
                received_at: "2025-10-15T10:00:00Z".into(),
            },
        )
        .unwrap();
        assert_eq!(output.events.len(), 1);
        assert_eq!(output.events[0].event_type(), "VerfuegbarkeitReceived");

        let state2 = VerfuegbarkeitWorkflow::apply(state, &output.events[0]);
        assert!(matches!(state2, AckForwardState::Received(_)));

        let output2 = VerfuegbarkeitWorkflow::handle(
            &state2,
            AckForwardCommand::Acknowledge {
                ack_mrid: "ack-1".into(),
            },
        )
        .unwrap();
        // Check the length before indexing so a regression fails with a clear
        // assertion message instead of an out-of-bounds panic.
        assert_eq!(output2.events.len(), 1);
        assert_eq!(output2.events[0].event_type(), "VerfuegbarkeitAcknowledged");

        let state3 = VerfuegbarkeitWorkflow::apply(state2, &output2.events[0]);
        assert!(matches!(state3, AckForwardState::Acknowledged(_)));
    }

    /// `Forward` must be rejected while the document is only `Received`.
    #[test]
    fn kaskade_forward_requires_acknowledged_state() {
        let state = AckForwardState::Received(ReceivedData {
            mrid: "m".into(),
            doc_type: "Kaskade".into(),
            sender: "s".into(),
            receiver: "r".into(),
            received_at: "2025-10-15T10:00:00Z".into(),
        });
        let result = KaskadeWorkflow::handle(
            &state,
            AckForwardCommand::Forward {
                upstream_mrid: "u".into(),
            },
        );
        assert!(result.is_err());
    }

    /// Verify that each workflow's event types are unique and correctly prefixed.
    #[test]
    fn event_types_are_unique_per_workflow() {
        let inner = AckForwardEvent::Received {
            mrid: "m".into(),
            doc_type: "X".into(),
            sender: "s".into(),
            receiver: "r".into(),
            received_at: "t".into(),
        };

        // (event_type, prefix reported by the matching workflow struct)
        let typed: Vec<(&'static str, &'static str)> = vec![
            (
                VerfuegbarkeitEvent(inner.clone()).event_type(),
                VerfuegbarkeitWorkflow::event_prefix(),
            ),
            (
                NetzengpassEvent(inner.clone()).event_type(),
                NetzengpassWorkflow::event_prefix(),
            ),
            (
                KaskadeEvent(inner.clone()).event_type(),
                KaskadeWorkflow::event_prefix(),
            ),
            (
                PlanungsdatenEvent(inner.clone()).event_type(),
                PlanungsdatenWorkflow::event_prefix(),
            ),
            (
                StatusanfrageEvent(inner.clone()).event_type(),
                StatusanfrageWorkflow::event_prefix(),
            ),
            (
                KostenblattEvent(inner.clone()).event_type(),
                KostenblattWorkflow::event_prefix(),
            ),
        ];
        let types: Vec<&'static str> = typed.iter().map(|(t, _)| *t).collect();

        // All event types must be distinct.
        let unique: std::collections::HashSet<_> = types.iter().collect();
        assert_eq!(
            unique.len(),
            types.len(),
            "event_type() strings must be unique across all ack-forward workflows: {types:?}"
        );

        // All event types must be prefixed (not generic "AckForward…" names).
        for t in &types {
            assert!(
                !t.starts_with("AckForward"),
                "event_type '{t}' must not use the generic AckForward prefix"
            );
        }

        // The prefix is spelled out separately for the event newtype and for
        // the workflow struct; make sure the two agree.
        for (t, prefix) in &typed {
            assert!(
                t.starts_with(prefix),
                "event_type '{t}' must start with the workflow's event_prefix '{prefix}'"
            );
        }
    }
}