use std::sync::Arc;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::ids::{ConversationId, EventId, ProcessId, TenantId};
#[derive(Debug, thiserror::Error)]
pub enum ErpAdapterError {
#[error("ERP payload error: {0}")]
Payload(String),
#[error("ERP transport error: {0}")]
Transport(String),
#[error("ERP permanent error: {0}")]
Permanent(String),
}
impl ErpAdapterError {
pub fn payload(e: impl std::fmt::Display) -> Self {
Self::Payload(e.to_string())
}
pub fn transport(e: impl std::fmt::Display) -> Self {
Self::Transport(e.to_string())
}
pub fn permanent(e: impl std::fmt::Display) -> Self {
Self::Permanent(e.to_string())
}
#[must_use]
pub fn is_retryable(&self) -> bool {
matches!(self, Self::Transport(_))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ErpEventType {
ProcessInitiated,
AperakAccepted,
AperakRejected,
AperakTimeout,
ContrlReceived,
ProcessCompleted,
MaloIdentified,
ProcessFailed {
reason: Box<str>,
},
}
impl ErpEventType {
#[must_use]
pub fn label(&self) -> &'static str {
match self {
Self::ProcessInitiated => "process_initiated",
Self::AperakAccepted => "aperak_accepted",
Self::AperakRejected => "aperak_rejected",
Self::AperakTimeout => "aperak_timeout",
Self::ContrlReceived => "contrl_received",
Self::ProcessCompleted => "process_completed",
Self::MaloIdentified => "malo_identified",
Self::ProcessFailed { .. } => "process_failed",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErpEvent {
pub idempotency_key: String,
pub event_type: ErpEventType,
pub process_id: ProcessId,
pub tenant_id: TenantId,
pub conversation_id: ConversationId,
pub causation_id: EventId,
pub pid: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload_schema: Option<String>,
pub payload: serde_json::Value,
pub occurred_at: OffsetDateTime,
}
#[allow(async_fn_in_trait)]
pub trait ErpAdapter: Send + Sync + 'static {
async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError>;
}
impl<T: ErpAdapter> ErpAdapter for Arc<T> {
async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
(**self).notify(event).await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboundErpCommand {
pub idempotency_key: String,
pub tenant_id: TenantId,
pub payload_schema: String,
pub payload: serde_json::Value,
}
#[allow(async_fn_in_trait)]
pub trait ErpCommandSource: Send + Sync + 'static {
async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError>;
async fn ack(&self, id: &str) -> Result<(), ErpAdapterError>;
async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError>;
}
impl<S: ErpCommandSource> ErpCommandSource for Arc<S> {
async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
(**self).next().await
}
async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
(**self).ack(id).await
}
async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
(**self).nack(id, reason).await
}
}
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct NoopErpAdapter;
#[cfg(feature = "testing")]
impl ErpAdapter for NoopErpAdapter {
async fn notify(&self, _event: ErpEvent) -> Result<(), ErpAdapterError> {
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct LogErpAdapter;
impl ErpAdapter for LogErpAdapter {
async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
tracing::info!(
idempotency_key = %event.idempotency_key,
event_type = event.event_type.label(),
process_id = %event.process_id,
tenant_id = %event.tenant_id,
pid = event.pid,
"ErpAdapter: event logged (no delivery configured)",
);
Ok(())
}
}
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct NoopErpCommandSource;
#[cfg(feature = "testing")]
impl ErpCommandSource for NoopErpCommandSource {
async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
Ok(None)
}
async fn ack(&self, _id: &str) -> Result<(), ErpAdapterError> {
Ok(())
}
async fn nack(&self, _id: &str, _reason: &str) -> Result<(), ErpAdapterError> {
Ok(())
}
}
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct ErpAdapterTestHarness {
events: Arc<tokio::sync::Mutex<Vec<ErpEvent>>>,
}
#[cfg(feature = "testing")]
impl ErpAdapterTestHarness {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn events(&self) -> Vec<ErpEvent> {
self.events.lock().await.clone()
}
pub async fn drain(&self) -> Vec<ErpEvent> {
std::mem::take(&mut *self.events.lock().await)
}
}
#[cfg(feature = "testing")]
impl ErpAdapter for ErpAdapterTestHarness {
async fn notify(&self, event: ErpEvent) -> Result<(), ErpAdapterError> {
self.events.lock().await.push(event);
Ok(())
}
}
#[cfg(feature = "testing")]
#[derive(Debug, Clone, Default)]
pub struct ErpCommandSourceTestHarness {
queue: Arc<tokio::sync::Mutex<std::collections::VecDeque<InboundErpCommand>>>,
acked: Arc<tokio::sync::Mutex<Vec<String>>>,
nacked: Arc<tokio::sync::Mutex<Vec<(String, String)>>>,
}
#[cfg(feature = "testing")]
impl ErpCommandSourceTestHarness {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn inject(&self, cmd: InboundErpCommand) {
self.queue.lock().await.push_back(cmd);
}
pub async fn acked(&self) -> Vec<String> {
self.acked.lock().await.clone()
}
pub async fn nacked(&self) -> Vec<(String, String)> {
self.nacked.lock().await.clone()
}
}
#[cfg(feature = "testing")]
impl ErpCommandSource for ErpCommandSourceTestHarness {
async fn next(&self) -> Result<Option<InboundErpCommand>, ErpAdapterError> {
Ok(self.queue.lock().await.pop_front())
}
async fn ack(&self, id: &str) -> Result<(), ErpAdapterError> {
self.acked.lock().await.push(id.to_owned());
Ok(())
}
async fn nack(&self, id: &str, reason: &str) -> Result<(), ErpAdapterError> {
self.nacked
.lock()
.await
.push((id.to_owned(), reason.to_owned()));
Ok(())
}
}
pub const BO4E_V202501_BASE: &str =
"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas";
#[macro_export]
macro_rules! bo4e_schema_url {
($category:literal, $name:literal) => {
concat!(
"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/",
$category,
"/",
$name,
".json",
)
};
}
pub const BO4E_SCHEMA_MARKTLOKATION: &str = bo4e_schema_url!("bo", "Marktlokation");
pub const BO4E_SCHEMA_MESSLOKATION: &str = bo4e_schema_url!("bo", "Messlokation");
pub const BO4E_SCHEMA_VERTRAG: &str = bo4e_schema_url!("bo", "Vertrag");
pub const BO4E_SCHEMA_ENERGIEMENGE: &str = bo4e_schema_url!("bo", "Energiemenge");
pub const BO4E_SCHEMA_RECHNUNG: &str = bo4e_schema_url!("bo", "Rechnung");
pub const BO4E_SCHEMA_ZAEHLER: &str = bo4e_schema_url!("bo", "Zaehler");
pub const BO4E_SCHEMA_GESCHAEFTSPARTNER: &str = bo4e_schema_url!("bo", "Geschaeftspartner");