Skip to main content

mako_engine/
erp.rs

//! ERP integration traits and reference implementations.
//!
//! ## Role
//!
//! `mako-engine` is a protocol processor — it handles EDIFACT parsing, BDEW
//! process rules, AS4 delivery, and regulatory deadlines. All contract data,
//! billing logic, and master data live in the operator's ERP.
//!
//! This module defines the **stable integration contract** between `mako-engine`
//! and external ERP or backend systems.  The payload contract is **BO4E**, not
//! raw EDIFACT.  ERP adapters never see EDIFACT segment codes or format-version
//! identifiers — those are absorbed inside `mako-engine`.
//!
//! ## Outbound: mako → ERP
//!
//! Implement [`ErpAdapter`] and register it at startup.  Every domain event
//! that requires ERP action is delivered as an [`ErpEvent`] with a BO4E-typed
//! JSON payload.
//!
//! ```rust,ignore
//! struct MyErpAdapter { client: reqwest::Client, base_url: String }
//!
//! impl ErpAdapter for MyErpAdapter {
//!     async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
//!         // Borrow the BO4E payload; moving it out would leave `event`
//!         // partially moved and unusable in `.json(&event)` below.
//!         let _malo: &serde_json::Value = &event.payload;
//!         self.client
//!             .post(format!("{}/mako/events", self.base_url))
//!             .header("X-Idempotency-Key", &event.idempotency_key)
//!             .json(&event)
//!             .send()
//!             .await
//!             .map_err(ErpAdapterError::transport)?;
//!         Ok(())
//!     }
//! }
//! ```
//!
//! ## Inbound: ERP → mako (event-driven)
//!
//! For ERP systems with a message bus, implement [`ErpCommandSource`] to feed
//! BO4E business objects into the engine without a synchronous REST round-trip.
//!
//! ```rust,ignore
//! struct MyKafkaSource { consumer: KafkaConsumer }
//!
//! impl ErpCommandSource for MyKafkaSource {
//!     async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
//!         let msg = self.consumer.poll(Duration::from_millis(100)).await;
//!         Ok(msg.map(|m| InboundErpCommand {
//!             idempotency_key: m.offset().to_string(),
//!             tenant_id: TenantId::new(),
//!             payload_schema: "…/Marktlokation.json".into(),
//!             payload: serde_json::from_slice(m.payload()).unwrap(),
//!         }))
//!     }
//!
//!     async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
//!         self.consumer.commit_offset(id.parse().unwrap()).await
//!             .map_err(ErpAdapterError::transport)
//!     }
//!
//!     async fn nack(&self, _id: &str, _reason: &str) -> Result<(), ErpAdapterError> {
//!         Ok(()) // Kafka auto-redelivers on next poll
//!     }
//! }
//! ```
//!
//! ## Reference implementations
//!
//! | Type | Feature | Use case |
//! |------|---------|---------|
//! | [`NoopErpAdapter`] | `testing` | Unit tests, CI |
//! | [`LogErpAdapter`] | — | Structured log output; starting point for new integrations |
//! | [`NoopErpCommandSource`] | `testing` | No-op inbound source for tests |
//!
//! For the production `WebhookErpAdapter` and `POST /api/v1/commands` endpoint,
//! see `makod/src/erp_adapter.rs`.
78
79use std::sync::Arc;
80
81use serde::{Deserialize, Serialize};
82use time::OffsetDateTime;
83
84use crate::ids::{ConversationId, EventId, ProcessId, TenantId};
85
86// ── ErpAdapterError ───────────────────────────────────────────────────────────
87
88/// Errors produced by [`ErpAdapter`] and [`ErpCommandSource`] implementations.
89#[derive(Debug, thiserror::Error)]
90pub enum ErpAdapterError {
91    /// The ERP response payload could not be deserialised or is semantically
92    /// invalid.
93    #[error("ERP payload error: {0}")]
94    Payload(String),
95
96    /// A transient transport error (network timeout, HTTP 5xx, broker
97    /// disconnect).  The delivery worker will retry with exponential backoff.
98    #[error("ERP transport error: {0}")]
99    Transport(String),
100
101    /// A permanent, non-retryable error (e.g. invalid configuration,
102    /// authentication failure).  The delivery worker will dead-letter the
103    /// message.
104    #[error("ERP permanent error: {0}")]
105    Permanent(String),
106}
107
108impl ErpAdapterError {
109    /// Construct a [`Payload`](ErpAdapterError::Payload) variant.
110    pub fn payload(e: impl std::fmt::Display) -> Self {
111        Self::Payload(e.to_string())
112    }
113
114    /// Construct a [`Transport`](ErpAdapterError::Transport) variant.
115    pub fn transport(e: impl std::fmt::Display) -> Self {
116        Self::Transport(e.to_string())
117    }
118
119    /// Construct a [`Permanent`](ErpAdapterError::Permanent) variant.
120    pub fn permanent(e: impl std::fmt::Display) -> Self {
121        Self::Permanent(e.to_string())
122    }
123
124    /// Returns `true` for transient errors that warrant a retry.
125    #[must_use]
126    pub fn is_retryable(&self) -> bool {
127        matches!(self, Self::Transport(_))
128    }
129}
130
131// ── ErpEventType ─────────────────────────────────────────────────────────────
132
133/// Semantic classification of an outbound ERP process event.
134///
135/// The ERP uses this to decide which action to take — update an order status,
136/// trigger a billing run, open a complaint ticket, etc.
137#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
138#[serde(rename_all = "snake_case")]
139pub enum ErpEventType {
140    /// A new MaKo process was spawned (e.g. inbound UTILMD received).
141    ProcessInitiated,
142    /// The counterparty sent an APERAK accepting our UTILMD.
143    AperakAccepted,
144    /// The counterparty sent an APERAK rejecting our UTILMD.
145    AperakRejected,
146    /// No APERAK received within the regulatory SLA window (deadline expired).
147    AperakTimeout,
148    /// A CONTRL syntax acknowledgement was received.
149    ContrlReceived,
150    /// The process reached its terminal success state
151    /// (e.g. Lieferbeginn/Lieferende confirmed).
152    ProcessCompleted,
153    /// A MaLo identification request was successfully resolved: the MaLo was
154    /// found and the positive callback was delivered to the requesting LF.
155    ///
156    /// The `payload` field of the associated [`ErpEvent`] carries a BO4E
157    /// `Marktlokation` JSON object with the resolved MaLo data.
158    MaloIdentified,
159    /// The process failed permanently (regulatory timeout, data error, …).
160    ProcessFailed {
161        /// Human-readable failure description.
162        reason: Box<str>,
163    },
164}
165
166impl ErpEventType {
167    /// Short label for structured logging and metrics.
168    #[must_use]
169    pub fn label(&self) -> &'static str {
170        match self {
171            Self::ProcessInitiated => "process_initiated",
172            Self::AperakAccepted => "aperak_accepted",
173            Self::AperakRejected => "aperak_rejected",
174            Self::AperakTimeout => "aperak_timeout",
175            Self::ContrlReceived => "contrl_received",
176            Self::ProcessCompleted => "process_completed",
177            Self::MaloIdentified => "malo_identified",
178            Self::ProcessFailed { .. } => "process_failed",
179        }
180    }
181}
182
183// ── ErpEvent ──────────────────────────────────────────────────────────────────
184
185/// A structured process event delivered from `mako-engine` to the ERP.
186///
187/// The payload is always a **BO4E-typed JSON object** — the ERP adapter never
188/// receives raw EDIFACT bytes or EDIFACT format-version identifiers.
189///
190/// ## Idempotency
191///
192/// `idempotency_key` is a stable dedup identifier.  The ERP **must** persist
193/// this key and reject duplicate deliveries with `HTTP 200 OK` (not `409`) so
194/// the adapter does not retry indefinitely.
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct ErpEvent {
197    /// Stable dedup key — store in the ERP to reject duplicate deliveries.
198    ///
199    /// Derived from the outbox `message_id`; stable across retries.
200    pub idempotency_key: String,
201
202    /// Semantic classification of this event.
203    pub event_type: ErpEventType,
204
205    /// The mako process that generated this event.
206    pub process_id: ProcessId,
207
208    /// Tenant (operator GLN) that owns this process.
209    pub tenant_id: TenantId,
210
211    /// BDEW business conversation identifier.
212    pub conversation_id: ConversationId,
213
214    /// The mako domain event that directly caused this ERP notification.
215    pub causation_id: EventId,
216
217    /// Prüfidentifikator of the process.
218    pub pid: u32,
219
220    /// BO4E JSON Schema URL that validates [`payload`](ErpEvent::payload).
221    ///
222    /// Examples:
223    /// - `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Marktlokation.json"`
224    /// - `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Messlokation.json"`
225    ///
226    /// `None` for events where no primary BO4E object is applicable
227    /// (e.g. `ContrlReceived`).
228    #[serde(skip_serializing_if = "Option::is_none")]
229    pub payload_schema: Option<String>,
230
231    /// BO4E-typed payload.
232    ///
233    /// Deserialise using the ERP's own BO4E library.  Raw EDIFACT structures
234    /// are never exposed here.  `null` when no payload is applicable.
235    pub payload: serde_json::Value,
236
237    /// Wall-clock time when the domain event was persisted.
238    pub occurred_at: OffsetDateTime,
239}
240
241// ── ErpAdapter trait ──────────────────────────────────────────────────────────
242
243/// Outbound notification sink — `mako-engine` calls this when a process event
244/// should be reported to the ERP.
245///
246/// The payload is always a BO4E-typed JSON object; the adapter never receives
247/// raw EDIFACT bytes or format-version identifiers.
248///
249/// ## Contract
250///
251/// - Must be **idempotent** on `event.idempotency_key`.  Called twice with the
252///   same key must succeed without double-posting.
253/// - Return [`ErpAdapterError::Transport`] for transient failures — the caller
254///   will retry with exponential backoff.
255/// - Return [`ErpAdapterError::Permanent`] for non-retryable failures — the
256///   caller will dead-letter the event.
257#[allow(async_fn_in_trait)]
258pub trait ErpAdapter: Send + Sync + 'static {
259    /// Deliver `event` to the ERP.
260    async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError>;
261}
262
263/// Blanket `Arc` implementation so `ErpAdapter` can be shared across tasks.
264impl<T: ErpAdapter> ErpAdapter for Arc<T> {
265    async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
266        (**self).notify(event).await
267    }
268}
269
270// ── InboundErpCommand ─────────────────────────────────────────────────────────
271
272/// A BO4E business object received from the ERP, intended to trigger a mako
273/// process.
274///
275/// `mako-engine` maps the BO4E payload to an internal `Command` via the
276/// domain crate's command mapper.
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct InboundErpCommand {
279    /// Stable dedup key — forwarded to [`InboxStore::accept`].
280    ///
281    /// The ERP must supply a stable, unique identifier per command so that
282    /// retransmissions do not double-execute the workflow.
283    ///
284    /// [`InboxStore::accept`]: crate::inbox::InboxStore::accept
285    pub idempotency_key: String,
286
287    /// Tenant (operator GLN) that owns the target process.
288    pub tenant_id: TenantId,
289
290    /// BO4E JSON Schema URL — identifies the object type without inspecting
291    /// `payload`.
292    ///
293    /// Example:
294    /// `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Vertrag.json"`
295    pub payload_schema: String,
296
297    /// BO4E-typed JSON payload.  `mako-engine` maps this to an internal
298    /// `Command` via the registered domain command mapper.
299    pub payload: serde_json::Value,
300}
301
302// ── ErpCommandSource trait ────────────────────────────────────────────────────
303
304/// Inbound command source — `mako-engine` polls this for new BO4E objects
305/// from the ERP.
306///
307/// Implement this for broker-based inbound flows (Kafka consumer, SFTP poll,
308/// database change feed, …) to make the entire integration fully event-driven
309/// — no synchronous REST round-trip required.
310///
311/// ## Contract
312///
313/// - [`next`](ErpCommandSource::next) must be **non-blocking** when idle —
314///   return `Ok(None)` immediately when no command is available.
315/// - [`ack`](ErpCommandSource::ack) must suppress re-delivery of `id` after
316///   a successful ack (idempotent).
317/// - [`nack`](ErpCommandSource::nack) should allow re-delivery of `id` after
318///   an appropriate backoff.
319#[allow(async_fn_in_trait)]
320pub trait ErpCommandSource: Send + Sync + 'static {
321    /// Return the next pending BO4E command, or `None` when the source is idle.
322    async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError>;
323
324    /// Acknowledge successful processing of `id`.
325    ///
326    /// After a successful ack the source must not re-deliver `id`.
327    async fn ack(&self, id: &str) -> Result<(), ErpAdapterError>;
328
329    /// Negative-acknowledge — allow re-delivery of `id` after backoff.
330    async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError>;
331}
332
333/// Blanket `Arc` implementation so `ErpCommandSource` can be shared across tasks.
334impl<S: ErpCommandSource> ErpCommandSource for Arc<S> {
335    async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
336        (**self).next().await
337    }
338    async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
339        (**self).ack(id).await
340    }
341    async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
342        (**self).nack(id, reason).await
343    }
344}
345
346// ── NoopErpAdapter ────────────────────────────────────────────────────────────
347
/// [`ErpAdapter`] that discards every event and reports success right away.
///
/// Intended for unit tests and CI, where no real ERP endpoint exists.
/// Only compiled with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct NoopErpAdapter;

#[cfg(feature = "testing")]
impl ErpAdapter for NoopErpAdapter {
    async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
        // Nothing is delivered; the event is simply dropped.
        drop(event);
        Ok(())
    }
}
361
362// ── LogErpAdapter ─────────────────────────────────────────────────────────────
363
364/// An [`ErpAdapter`] that logs every event at `info` level without delivering
365/// it.
366///
367/// Useful as a development starting point — replace it with your concrete ERP
368/// adapter in production.
369#[derive(Debug, Clone, Default)]
370pub struct LogErpAdapter;
371
372impl ErpAdapter for LogErpAdapter {
373    async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
374        tracing::info!(
375            idempotency_key = %event.idempotency_key,
376            event_type      = event.event_type.label(),
377            process_id      = %event.process_id,
378            tenant_id       = %event.tenant_id,
379            pid             = event.pid,
380            "ErpAdapter: event logged (no delivery configured)",
381        );
382        Ok(())
383    }
384}
385
386// ── NoopErpCommandSource ──────────────────────────────────────────────────────
387
/// [`ErpCommandSource`] that never has anything to deliver: `next` always
/// yields `Ok(None)`, and `ack` / `nack` succeed without doing anything.
///
/// For tests that do not exercise the inbound ERP command flow.
/// Only compiled with the `testing` feature.
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct NoopErpCommandSource;

#[cfg(feature = "testing")]
impl ErpCommandSource for NoopErpCommandSource {
    async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
        let idle: Option<InboundErpCommand> = None;
        Ok(idle)
    }

    async fn ack(&self, _id: &str) -> Result<(), ErpAdapterError> {
        Ok(())
    }

    async fn nack(&self, _id: &str, _reason: &str) -> Result<(), ErpAdapterError> {
        Ok(())
    }
}
407
408// ── ErpAdapterTestHarness ─────────────────────────────────────────────────────
409
/// A recording [`ErpAdapter`] for use in tests.
///
/// Every [`ErpEvent`] passed to [`notify`](ErpAdapter::notify) is appended to
/// an internal buffer, so tests can assert on event types, ordering, and BO4E
/// payload shapes.
///
/// The buffer lives behind an [`Arc`], so clones of the harness record into
/// the same buffer: hand a clone to the code under test and keep the original
/// for assertions.
///
/// ```rust,ignore
/// let harness = ErpAdapterTestHarness::new();
/// my_workflow.run_with_adapter(harness.clone()).await?;
///
/// // `events()` is async — it has to be awaited.
/// let events = harness.events().await;
/// assert_eq!(events[0].event_type, ErpEventType::ProcessInitiated);
/// assert_eq!(events[1].event_type, ErpEventType::AperakAccepted);
/// ```
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct ErpAdapterTestHarness {
    /// Recorded events in delivery order; shared between clones.
    events: Arc<tokio::sync::Mutex<Vec<ErpEvent>>>,
}
428
#[cfg(feature = "testing")]
impl ErpAdapterTestHarness {
    /// Build a harness with no recorded events.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Copy out everything recorded so far, in delivery order.
    ///
    /// The recorded events stay in the harness.
    pub async fn events(&self) -> Vec<ErpEvent> {
        let recorded = self.events.lock().await;
        recorded.to_vec()
    }

    /// Move out everything recorded so far, leaving the harness empty.
    pub async fn drain(&self) -> Vec<ErpEvent> {
        let mut recorded = self.events.lock().await;
        std::mem::take(&mut *recorded)
    }
}
447
#[cfg(feature = "testing")]
impl ErpAdapter for ErpAdapterTestHarness {
    /// Append `event` to the recording buffer; always succeeds.
    async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
        let mut recorded = self.events.lock().await;
        recorded.push(event);
        Ok(())
    }
}
455
456// ── ErpCommandSourceTestHarness ───────────────────────────────────────────────
457
/// A scriptable [`ErpCommandSource`] for use in tests.
///
/// Tests inject canned [`InboundErpCommand`] values and afterwards inspect
/// which ids the engine acked or nacked.
///
/// All state lives behind [`Arc`]s, so clones of the harness share the same
/// queue and the same ack/nack records.
///
/// ```text
/// let source = ErpCommandSourceTestHarness::new();
/// source.inject(InboundErpCommand {
///     idempotency_key: "order-42".into(),
///     tenant_id: TenantId::new(),
///     payload_schema: ".../Vertrag.json".into(),
///     payload: serde_json::json!({ "_typ": "VERTRAG", ... }),
/// }).await;
///
/// // The engine picks up the command on the next poll.
/// ```
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct ErpCommandSourceTestHarness {
    /// Commands waiting to be handed out, oldest first.
    queue: Arc<tokio::sync::Mutex<std::collections::VecDeque<InboundErpCommand>>>,
    /// Ids passed to `ack`, in call order.
    acked: Arc<tokio::sync::Mutex<Vec<String>>>,
    /// `(id, reason)` pairs passed to `nack`, in call order.
    nacked: Arc<tokio::sync::Mutex<Vec<(String, String)>>>,
}
481
#[cfg(feature = "testing")]
impl ErpCommandSourceTestHarness {
    /// Build a harness with an empty queue and no ack/nack records.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Append `cmd` to the back of the queue.
    ///
    /// [`next`](ErpCommandSource::next) hands commands out in injection order.
    pub async fn inject(&self, cmd: InboundErpCommand) {
        let mut pending = self.queue.lock().await;
        pending.push_back(cmd);
    }

    /// Copy out every id acked so far, in call order.
    pub async fn acked(&self) -> Vec<String> {
        let ids = self.acked.lock().await;
        ids.to_vec()
    }

    /// Copy out every nacked `(id, reason)` pair so far, in call order.
    pub async fn nacked(&self) -> Vec<(String, String)> {
        let pairs = self.nacked.lock().await;
        pairs.to_vec()
    }
}
505
#[cfg(feature = "testing")]
impl ErpCommandSource for ErpCommandSourceTestHarness {
    /// Pop the oldest injected command, or `None` when the queue is empty.
    async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
        let mut pending = self.queue.lock().await;
        Ok(pending.pop_front())
    }

    /// Record `id` as acked; always succeeds.
    async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
        let mut ids = self.acked.lock().await;
        ids.push(id.to_owned());
        Ok(())
    }

    /// Record `(id, reason)` as nacked; always succeeds.
    ///
    /// The command is not put back on the queue.
    async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
        let entry = (id.to_owned(), reason.to_owned());
        let mut pairs = self.nacked.lock().await;
        pairs.push(entry);
        Ok(())
    }
}
525
526// ── BO4E schema URL constants ─────────────────────────────────────────────────
527
/// Base URL of the BO4E v202501.0.0 JSON Schema tree (no trailing slash).
///
/// Use `bo4e_schema_url!("bo", "Marktlokation")` to build a complete schema
/// URL at compile time.  The macro takes the schema category and the object
/// name as two string literals.
pub const BO4E_V202501_BASE: &str =
    "https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas";
534
/// Construct a BO4E v202501.0.0 JSON Schema URL at compile time.
///
/// Takes two string literals: the schema category directory (for example
/// `"bo"`) and the object name without the `.json` extension.  A trailing
/// comma is accepted.  The expansion is a `&'static str`, so it can be used
/// to initialise a `const`.
///
/// ```rust
/// use mako_engine::bo4e_schema_url;
/// assert!(bo4e_schema_url!("bo", "Marktlokation").contains("Marktlokation"));
/// ```
#[macro_export]
macro_rules! bo4e_schema_url {
    ($category:literal, $name:literal $(,)?) => {
        // `concat!` only accepts literals, so the base URL is spelled out here
        // instead of reusing `BO4E_V202501_BASE`; keep the two in sync.
        concat!(
            "https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/",
            $category,
            "/",
            $name,
            ".json",
        )
    };
}
553
554/// BO4E JSON Schema URL for `Marktlokation`.
555pub const BO4E_SCHEMA_MARKTLOKATION: &str = bo4e_schema_url!("bo", "Marktlokation");
556
557/// BO4E JSON Schema URL for `Messlokation`.
558pub const BO4E_SCHEMA_MESSLOKATION: &str = bo4e_schema_url!("bo", "Messlokation");
559
560/// BO4E JSON Schema URL for `Vertrag`.
561pub const BO4E_SCHEMA_VERTRAG: &str = bo4e_schema_url!("bo", "Vertrag");
562
563/// BO4E JSON Schema URL for `Energiemenge`.
564pub const BO4E_SCHEMA_ENERGIEMENGE: &str = bo4e_schema_url!("bo", "Energiemenge");
565
566/// BO4E JSON Schema URL for `Rechnung`.
567pub const BO4E_SCHEMA_RECHNUNG: &str = bo4e_schema_url!("bo", "Rechnung");
568
569/// BO4E JSON Schema URL for `Zaehler`.
570pub const BO4E_SCHEMA_ZAEHLER: &str = bo4e_schema_url!("bo", "Zaehler");
571
572/// BO4E JSON Schema URL for `Geschaeftspartner`.
573pub const BO4E_SCHEMA_GESCHAEFTSPARTNER: &str = bo4e_schema_url!("bo", "Geschaeftspartner");