mako_engine/erp.rs
1//! ERP integration traits and reference implementations.
2//!
3//! ## Role
4//!
5//! `mako-engine` is a protocol processor — it handles EDIFACT parsing, BDEW
6//! process rules, AS4 delivery, and regulatory deadlines. All contract data,
7//! billing logic, and master data live in the operator's ERP.
8//!
9//! This module defines the **stable integration contract** between `mako-engine`
10//! and external ERP or backend systems. The payload contract is **BO4E**, not
11//! raw EDIFACT. ERP adapters never see EDIFACT segment codes or format-version
12//! identifiers — those are absorbed inside `mako-engine`.
13//!
14//! ## Outbound: mako → ERP
15//!
16//! Implement [`ErpAdapter`] and register it at startup. Every domain event
17//! that requires ERP action is delivered as an [`ErpEvent`] with a BO4E-typed
18//! JSON payload.
19//!
20//! ```rust,ignore
21//! struct MyErpAdapter { client: reqwest::Client, base_url: String }
22//!
23//! impl ErpAdapter for MyErpAdapter {
24//! async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
25//! let malo: serde_json::Value = event.payload;
26//! self.client
27//! .post(format!("{}/mako/events", self.base_url))
28//! .header("X-Idempotency-Key", &event.idempotency_key)
29//! .json(&event)
30//! .send()
31//! .await
32//! .map_err(ErpAdapterError::transport)?;
33//! Ok(())
34//! }
35//! }
36//! ```
37//!
38//! ## Inbound: ERP → mako (event-driven)
39//!
40//! For ERP systems with a message bus, implement [`ErpCommandSource`] to feed
41//! BO4E business objects into the engine without a synchronous REST round-trip.
42//!
43//! ```rust,ignore
44//! struct MyKafkaSource { consumer: KafkaConsumer }
45//!
46//! impl ErpCommandSource for MyKafkaSource {
47//! async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
48//! let msg = self.consumer.poll(Duration::from_millis(100)).await;
49//! Ok(msg.map(|m| InboundErpCommand {
50//! idempotency_key: m.offset().to_string(),
51//! tenant_id: TenantId::new(),
52//! payload_schema: "…/Marktlokation.json".into(),
53//! payload: serde_json::from_slice(m.payload()).unwrap(),
54//! }))
55//! }
56//!
57//! async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
58//! self.consumer.commit_offset(id.parse().unwrap()).await
59//! .map_err(ErpAdapterError::transport)
60//! }
61//!
62//! async fn nack(&self, _id: &str, _reason: &str) -> Result<(), ErpAdapterError> {
63//! Ok(()) // Kafka auto-redelivers on next poll
64//! }
65//! }
66//! ```
67//!
68//! ## Reference implementations
69//!
70//! | Type | Feature | Use case |
71//! |------|---------|---------|
72//! | [`NoopErpAdapter`] | `testing` | Unit tests, CI |
73//! | [`LogErpAdapter`] | — | Structured log output; starting point for new integrations |
74//! | [`NoopErpCommandSource`] | `testing` | No-op inbound source for tests |
75//!
76//! For the production `WebhookErpAdapter` and `POST /api/v1/commands` endpoint,
77//! see `makod/src/erp_adapter.rs`.
78
79use std::sync::Arc;
80
81use serde::{Deserialize, Serialize};
82use time::OffsetDateTime;
83
84use crate::ids::{ConversationId, EventId, ProcessId, TenantId};
85
86// ── ErpAdapterError ───────────────────────────────────────────────────────────
87
88/// Errors produced by [`ErpAdapter`] and [`ErpCommandSource`] implementations.
89#[derive(Debug, thiserror::Error)]
90pub enum ErpAdapterError {
91 /// The ERP response payload could not be deserialised or is semantically
92 /// invalid.
93 #[error("ERP payload error: {0}")]
94 Payload(String),
95
96 /// A transient transport error (network timeout, HTTP 5xx, broker
97 /// disconnect). The delivery worker will retry with exponential backoff.
98 #[error("ERP transport error: {0}")]
99 Transport(String),
100
101 /// A permanent, non-retryable error (e.g. invalid configuration,
102 /// authentication failure). The delivery worker will dead-letter the
103 /// message.
104 #[error("ERP permanent error: {0}")]
105 Permanent(String),
106}
107
108impl ErpAdapterError {
109 /// Construct a [`Payload`](ErpAdapterError::Payload) variant.
110 pub fn payload(e: impl std::fmt::Display) -> Self {
111 Self::Payload(e.to_string())
112 }
113
114 /// Construct a [`Transport`](ErpAdapterError::Transport) variant.
115 pub fn transport(e: impl std::fmt::Display) -> Self {
116 Self::Transport(e.to_string())
117 }
118
119 /// Construct a [`Permanent`](ErpAdapterError::Permanent) variant.
120 pub fn permanent(e: impl std::fmt::Display) -> Self {
121 Self::Permanent(e.to_string())
122 }
123
124 /// Returns `true` for transient errors that warrant a retry.
125 #[must_use]
126 pub fn is_retryable(&self) -> bool {
127 matches!(self, Self::Transport(_))
128 }
129}
130
131// ── ErpEventType ─────────────────────────────────────────────────────────────
132
133/// Semantic classification of an outbound ERP process event.
134///
135/// The ERP uses this to decide which action to take — update an order status,
136/// trigger a billing run, open a complaint ticket, etc.
137#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
138#[serde(rename_all = "snake_case")]
139pub enum ErpEventType {
140 /// A new MaKo process was spawned (e.g. inbound UTILMD received).
141 ProcessInitiated,
142 /// The counterparty sent an APERAK accepting our UTILMD.
143 AperakAccepted,
144 /// The counterparty sent an APERAK rejecting our UTILMD.
145 AperakRejected,
146 /// No APERAK received within the regulatory SLA window (deadline expired).
147 AperakTimeout,
148 /// A CONTRL syntax acknowledgement was received.
149 ContrlReceived,
150 /// The process reached its terminal success state
151 /// (e.g. Lieferbeginn/Lieferende confirmed).
152 ProcessCompleted,
153 /// A MaLo identification request was successfully resolved: the MaLo was
154 /// found and the positive callback was delivered to the requesting LF.
155 ///
156 /// The `payload` field of the associated [`ErpEvent`] carries a BO4E
157 /// `Marktlokation` JSON object with the resolved MaLo data.
158 MaloIdentified,
159 /// The process failed permanently (regulatory timeout, data error, …).
160 ProcessFailed {
161 /// Human-readable failure description.
162 reason: Box<str>,
163 },
164}
165
166impl ErpEventType {
167 /// Short label for structured logging and metrics.
168 #[must_use]
169 pub fn label(&self) -> &'static str {
170 match self {
171 Self::ProcessInitiated => "process_initiated",
172 Self::AperakAccepted => "aperak_accepted",
173 Self::AperakRejected => "aperak_rejected",
174 Self::AperakTimeout => "aperak_timeout",
175 Self::ContrlReceived => "contrl_received",
176 Self::ProcessCompleted => "process_completed",
177 Self::MaloIdentified => "malo_identified",
178 Self::ProcessFailed { .. } => "process_failed",
179 }
180 }
181}
182
183// ── ErpEvent ──────────────────────────────────────────────────────────────────
184
185/// A structured process event delivered from `mako-engine` to the ERP.
186///
187/// The payload is always a **BO4E-typed JSON object** — the ERP adapter never
188/// receives raw EDIFACT bytes or EDIFACT format-version identifiers.
189///
190/// ## Idempotency
191///
192/// `idempotency_key` is a stable dedup identifier. The ERP **must** persist
193/// this key and reject duplicate deliveries with `HTTP 200 OK` (not `409`) so
194/// the adapter does not retry indefinitely.
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct ErpEvent {
197 /// Stable dedup key — store in the ERP to reject duplicate deliveries.
198 ///
199 /// Derived from the outbox `message_id`; stable across retries.
200 pub idempotency_key: String,
201
202 /// Semantic classification of this event.
203 pub event_type: ErpEventType,
204
205 /// The mako process that generated this event.
206 pub process_id: ProcessId,
207
208 /// Tenant (operator GLN) that owns this process.
209 pub tenant_id: TenantId,
210
211 /// BDEW business conversation identifier.
212 pub conversation_id: ConversationId,
213
214 /// The mako domain event that directly caused this ERP notification.
215 pub causation_id: EventId,
216
217 /// Prüfidentifikator of the process.
218 pub pid: u32,
219
220 /// BO4E JSON Schema URL that validates [`payload`](ErpEvent::payload).
221 ///
222 /// Examples:
223 /// - `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Marktlokation.json"`
224 /// - `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Messlokation.json"`
225 ///
226 /// `None` for events where no primary BO4E object is applicable
227 /// (e.g. `ContrlReceived`).
228 #[serde(skip_serializing_if = "Option::is_none")]
229 pub payload_schema: Option<String>,
230
231 /// BO4E-typed payload.
232 ///
233 /// Deserialise using the ERP's own BO4E library. Raw EDIFACT structures
234 /// are never exposed here. `null` when no payload is applicable.
235 pub payload: serde_json::Value,
236
237 /// Wall-clock time when the domain event was persisted.
238 pub occurred_at: OffsetDateTime,
239}
240
241// ── ErpAdapter trait ──────────────────────────────────────────────────────────
242
243/// Outbound notification sink — `mako-engine` calls this when a process event
244/// should be reported to the ERP.
245///
246/// The payload is always a BO4E-typed JSON object; the adapter never receives
247/// raw EDIFACT bytes or format-version identifiers.
248///
249/// ## Contract
250///
251/// - Must be **idempotent** on `event.idempotency_key`. Called twice with the
252/// same key must succeed without double-posting.
253/// - Return [`ErpAdapterError::Transport`] for transient failures — the caller
254/// will retry with exponential backoff.
255/// - Return [`ErpAdapterError::Permanent`] for non-retryable failures — the
256/// caller will dead-letter the event.
257#[allow(async_fn_in_trait)]
258pub trait ErpAdapter: Send + Sync + 'static {
259 /// Deliver `event` to the ERP.
260 async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError>;
261}
262
263/// Blanket `Arc` implementation so `ErpAdapter` can be shared across tasks.
264impl<T: ErpAdapter> ErpAdapter for Arc<T> {
265 async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
266 (**self).notify(event).await
267 }
268}
269
270// ── InboundErpCommand ─────────────────────────────────────────────────────────
271
272/// A BO4E business object received from the ERP, intended to trigger a mako
273/// process.
274///
275/// `mako-engine` maps the BO4E payload to an internal `Command` via the
276/// domain crate's command mapper.
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct InboundErpCommand {
279 /// Stable dedup key — forwarded to [`InboxStore::accept`].
280 ///
281 /// The ERP must supply a stable, unique identifier per command so that
282 /// retransmissions do not double-execute the workflow.
283 ///
284 /// [`InboxStore::accept`]: crate::inbox::InboxStore::accept
285 pub idempotency_key: String,
286
287 /// Tenant (operator GLN) that owns the target process.
288 pub tenant_id: TenantId,
289
290 /// BO4E JSON Schema URL — identifies the object type without inspecting
291 /// `payload`.
292 ///
293 /// Example:
294 /// `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Vertrag.json"`
295 pub payload_schema: String,
296
297 /// BO4E-typed JSON payload. `mako-engine` maps this to an internal
298 /// `Command` via the registered domain command mapper.
299 pub payload: serde_json::Value,
300}
301
302// ── ErpCommandSource trait ────────────────────────────────────────────────────
303
304/// Inbound command source — `mako-engine` polls this for new BO4E objects
305/// from the ERP.
306///
307/// Implement this for broker-based inbound flows (Kafka consumer, SFTP poll,
308/// database change feed, …) to make the entire integration fully event-driven
309/// — no synchronous REST round-trip required.
310///
311/// ## Contract
312///
313/// - [`next`](ErpCommandSource::next) must be **non-blocking** when idle —
314/// return `Ok(None)` immediately when no command is available.
315/// - [`ack`](ErpCommandSource::ack) must suppress re-delivery of `id` after
316/// a successful ack (idempotent).
317/// - [`nack`](ErpCommandSource::nack) should allow re-delivery of `id` after
318/// an appropriate backoff.
319#[allow(async_fn_in_trait)]
320pub trait ErpCommandSource: Send + Sync + 'static {
321 /// Return the next pending BO4E command, or `None` when the source is idle.
322 async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError>;
323
324 /// Acknowledge successful processing of `id`.
325 ///
326 /// After a successful ack the source must not re-deliver `id`.
327 async fn ack(&self, id: &str) -> Result<(), ErpAdapterError>;
328
329 /// Negative-acknowledge — allow re-delivery of `id` after backoff.
330 async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError>;
331}
332
333/// Blanket `Arc` implementation so `ErpCommandSource` can be shared across tasks.
334impl<S: ErpCommandSource> ErpCommandSource for Arc<S> {
335 async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
336 (**self).next().await
337 }
338 async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
339 (**self).ack(id).await
340 }
341 async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
342 (**self).nack(id, reason).await
343 }
344}
345
346// ── NoopErpAdapter ────────────────────────────────────────────────────────────
347
348/// An [`ErpAdapter`] that succeeds immediately without notifying anything.
349///
350/// Use in unit tests and CI where no real ERP endpoint is available.
351#[cfg(feature = "testing")]
352#[derive(Debug, Clone, Default)]
353pub struct NoopErpAdapter;
354
355#[cfg(feature = "testing")]
356impl ErpAdapter for NoopErpAdapter {
357 async fn notify(&self, _event: ErpEvent) -> Result<(), ErpAdapterError> {
358 Ok(())
359 }
360}
361
362// ── LogErpAdapter ─────────────────────────────────────────────────────────────
363
364/// An [`ErpAdapter`] that logs every event at `info` level without delivering
365/// it.
366///
367/// Useful as a development starting point — replace it with your concrete ERP
368/// adapter in production.
369#[derive(Debug, Clone, Default)]
370pub struct LogErpAdapter;
371
372impl ErpAdapter for LogErpAdapter {
373 async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
374 tracing::info!(
375 idempotency_key = %event.idempotency_key,
376 event_type = event.event_type.label(),
377 process_id = %event.process_id,
378 tenant_id = %event.tenant_id,
379 pid = event.pid,
380 "ErpAdapter: event logged (no delivery configured)",
381 );
382 Ok(())
383 }
384}
385
386// ── NoopErpCommandSource ──────────────────────────────────────────────────────
387
388/// An [`ErpCommandSource`] that is always idle (returns `Ok(None)`).
389///
390/// Use in tests where no inbound ERP command flow is needed.
391#[cfg(feature = "testing")]
392#[derive(Debug, Clone, Default)]
393pub struct NoopErpCommandSource;
394
395#[cfg(feature = "testing")]
396impl ErpCommandSource for NoopErpCommandSource {
397 async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
398 Ok(None)
399 }
400 async fn ack(&self, _id: &str) -> Result<(), ErpAdapterError> {
401 Ok(())
402 }
403 async fn nack(&self, _id: &str, _reason: &str) -> Result<(), ErpAdapterError> {
404 Ok(())
405 }
406}
407
408// ── ErpAdapterTestHarness ─────────────────────────────────────────────────────
409
410/// A recording [`ErpAdapter`] for use in tests.
411///
412/// Records every [`ErpEvent`] delivered via [`notify`](ErpAdapter::notify) so
413/// tests can assert on event types, ordering, and BO4E payload shapes.
414///
415/// ```rust,ignore
416/// let harness = ErpAdapterTestHarness::new();
417/// my_workflow.run_with_adapter(harness.adapter()).await?;
418///
419/// let events = harness.events();
420/// assert_eq!(events[0].event_type, ErpEventType::ProcessInitiated);
421/// assert_eq!(events[1].event_type, ErpEventType::AperakAccepted);
422/// ```
423#[cfg(feature = "testing")]
424#[derive(Debug, Clone, Default)]
425pub struct ErpAdapterTestHarness {
426 events: Arc<tokio::sync::Mutex<Vec<ErpEvent>>>,
427}
428
429#[cfg(feature = "testing")]
430impl ErpAdapterTestHarness {
431 /// Create a new empty harness.
432 #[must_use]
433 pub fn new() -> Self {
434 Self::default()
435 }
436
437 /// Return a snapshot of all recorded events in delivery order.
438 pub async fn events(&self) -> Vec<ErpEvent> {
439 self.events.lock().await.clone()
440 }
441
442 /// Drain all recorded events, resetting the harness.
443 pub async fn drain(&self) -> Vec<ErpEvent> {
444 std::mem::take(&mut *self.events.lock().await)
445 }
446}
447
448#[cfg(feature = "testing")]
449impl ErpAdapter for ErpAdapterTestHarness {
450 async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
451 self.events.lock().await.push(event);
452 Ok(())
453 }
454}
455
456// ── ErpCommandSourceTestHarness ───────────────────────────────────────────────
457
458/// A controllable [`ErpCommandSource`] for use in tests.
459///
460/// Inject canned [`InboundErpCommand`] payloads and verify that the engine
461/// processes them correctly.
462///
463/// ```text
464/// let source = ErpCommandSourceTestHarness::new();
465/// source.inject(InboundErpCommand {
466/// idempotency_key: "order-42".into(),
467/// tenant_id: TenantId::new(),
468/// payload_schema: ".../Vertrag.json".into(),
469/// payload: serde_json::json!({ "_typ": "VERTRAG", ... }),
470/// }).await;
471///
472/// // The engine picks up the command on the next poll.
473/// ```
474#[cfg(feature = "testing")]
475#[derive(Debug, Clone, Default)]
476pub struct ErpCommandSourceTestHarness {
477 queue: Arc<tokio::sync::Mutex<std::collections::VecDeque<InboundErpCommand>>>,
478 acked: Arc<tokio::sync::Mutex<Vec<String>>>,
479 nacked: Arc<tokio::sync::Mutex<Vec<(String, String)>>>,
480}
481
482#[cfg(feature = "testing")]
483impl ErpCommandSourceTestHarness {
484 /// Create a new empty harness.
485 #[must_use]
486 pub fn new() -> Self {
487 Self::default()
488 }
489
490 /// Enqueue a command to be returned by the next [`next`](ErpCommandSource::next) call.
491 pub async fn inject(&self, cmd: InboundErpCommand) {
492 self.queue.lock().await.push_back(cmd);
493 }
494
495 /// Return all acked command IDs.
496 pub async fn acked(&self) -> Vec<String> {
497 self.acked.lock().await.clone()
498 }
499
500 /// Return all nacked `(id, reason)` pairs.
501 pub async fn nacked(&self) -> Vec<(String, String)> {
502 self.nacked.lock().await.clone()
503 }
504}
505
506#[cfg(feature = "testing")]
507impl ErpCommandSource for ErpCommandSourceTestHarness {
508 async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
509 Ok(self.queue.lock().await.pop_front())
510 }
511
512 async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
513 self.acked.lock().await.push(id.to_owned());
514 Ok(())
515 }
516
517 async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
518 self.nacked
519 .lock()
520 .await
521 .push((id.to_owned(), reason.to_owned()));
522 Ok(())
523 }
524}
525
526// ── BO4E schema URL constants ─────────────────────────────────────────────────
527
528/// BO4E schema URL base for v202501.0.0.
529///
530/// Use `bo4e_schema_url!(Marktlokation)` to construct typed schema URLs at
531/// compile time.
532pub const BO4E_V202501_BASE: &str =
533 "https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas";
534
535/// Construct a BO4E v202501.0.0 JSON Schema URL for a Business Object.
536///
537/// ```rust
538/// use mako_engine::bo4e_schema_url;
539/// assert!(bo4e_schema_url!("bo", "Marktlokation").contains("Marktlokation"));
540/// ```
541#[macro_export]
542macro_rules! bo4e_schema_url {
543 ($category:literal, $name:literal) => {
544 concat!(
545 "https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/",
546 $category,
547 "/",
548 $name,
549 ".json",
550 )
551 };
552}
553
554/// BO4E JSON Schema URL for `Marktlokation`.
555pub const BO4E_SCHEMA_MARKTLOKATION: &str = bo4e_schema_url!("bo", "Marktlokation");
556
557/// BO4E JSON Schema URL for `Messlokation`.
558pub const BO4E_SCHEMA_MESSLOKATION: &str = bo4e_schema_url!("bo", "Messlokation");
559
560/// BO4E JSON Schema URL for `Vertrag`.
561pub const BO4E_SCHEMA_VERTRAG: &str = bo4e_schema_url!("bo", "Vertrag");
562
563/// BO4E JSON Schema URL for `Energiemenge`.
564pub const BO4E_SCHEMA_ENERGIEMENGE: &str = bo4e_schema_url!("bo", "Energiemenge");
565
566/// BO4E JSON Schema URL for `Rechnung`.
567pub const BO4E_SCHEMA_RECHNUNG: &str = bo4e_schema_url!("bo", "Rechnung");
568
569/// BO4E JSON Schema URL for `Zaehler`.
570pub const BO4E_SCHEMA_ZAEHLER: &str = bo4e_schema_url!("bo", "Zaehler");
571
572/// BO4E JSON Schema URL for `Geschaeftspartner`.
573pub const BO4E_SCHEMA_GESCHAEFTSPARTNER: &str = bo4e_schema_url!("bo", "Geschaeftspartner");