use std::sync::Arc;
#[cfg(any(test, feature = "testing"))]
use std::collections::HashMap;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::{error::EngineError, ids::TenantId, types::MarktpartnerCode};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommunicationChannel {
pub qualifier: Box<str>,
pub address: Box<str>,
}
impl CommunicationChannel {
#[must_use]
pub fn new(qualifier: impl Into<Box<str>>, address: impl Into<Box<str>>) -> Self {
Self {
qualifier: qualifier.into(),
address: address.into(),
}
}
#[must_use]
pub fn as4(endpoint_url: impl Into<Box<str>>) -> Self {
Self::new("AK", endpoint_url)
}
#[must_use]
pub fn email(address: impl Into<Box<str>>) -> Self {
Self::new("EM", address)
}
#[must_use]
pub fn api_webdienste(base_url: impl Into<Box<str>>) -> Self {
Self::new("AW", base_url)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContactPerson {
pub name: Box<str>,
pub channels: Vec<CommunicationChannel>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MarketRole {
LfStrom,
NbStrom,
MsbStrom,
Bkv,
Biko,
Uenb,
Esa,
LfGas,
NbGas,
MsbGas,
Mgv,
CrossCommodity,
}
impl MarketRole {
#[must_use]
pub fn from_pid(pid: u32) -> Option<Self> {
match pid {
37000 => Some(Self::LfStrom),
37001 => Some(Self::NbStrom),
37002 => Some(Self::MsbStrom),
37003 => Some(Self::Bkv),
37004 => Some(Self::Biko),
37005 => Some(Self::Uenb),
37006 => Some(Self::Esa),
37008 => Some(Self::LfGas),
37009 => Some(Self::NbGas),
37010 => Some(Self::MsbGas),
37011 => Some(Self::Mgv),
37012..=37014 => Some(Self::CrossCommodity),
_ => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PartnerRecord {
pub gln: MarktpartnerCode,
pub display_name: Option<Box<str>>,
pub channels: Vec<CommunicationChannel>,
pub roles: Vec<MarketRole>,
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "time::serde::rfc3339::option"
)]
pub valid_from: Option<OffsetDateTime>,
pub contacts: Vec<ContactPerson>,
pub country_code: Option<Box<str>>,
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
}
impl PartnerRecord {
#[must_use]
pub fn minimal(gln: impl Into<MarktpartnerCode>, as4_url: impl Into<Box<str>>) -> Self {
Self {
gln: gln.into(),
display_name: None,
channels: vec![CommunicationChannel::as4(as4_url)],
roles: Vec::new(),
valid_from: None,
contacts: Vec::new(),
country_code: None,
updated_at: OffsetDateTime::now_utc(),
}
}
pub fn from_cli_pairs(pairs: &[impl AsRef<str>]) -> Result<Vec<Self>, EngineError> {
pairs
.iter()
.map(|entry| {
let pair = entry.as_ref();
let (gln, url) = pair.split_once('=').ok_or_else(|| {
EngineError::partner(format!(
"invalid partner entry {pair:?} — expected <GLN>=<HTTPS-URL>"
))
})?;
let gln = gln.trim();
let url = url.trim();
if gln.is_empty() {
return Err(EngineError::partner(format!(
"invalid partner entry {pair:?} — GLN must not be empty"
)));
}
if !url.starts_with("https://") {
return Err(EngineError::partner(format!(
"invalid partner entry {pair:?} — endpoint URL must use HTTPS (got {url:?})"
)));
}
Ok(Self::minimal(gln, url))
})
.collect()
}
#[must_use]
pub fn as4_endpoint(&self) -> Option<&str> {
self.channels
.iter()
.find(|c| c.qualifier.as_ref() == "AK" || c.qualifier.as_ref() == "AS4")
.map(|c| c.address.as_ref())
}
#[must_use]
pub fn email(&self) -> Option<&str> {
self.channels
.iter()
.find(|c| c.qualifier.as_ref() == "EM")
.map(|c| c.address.as_ref())
}
#[must_use]
pub fn api_webdienste_endpoint(&self) -> Option<&str> {
self.channels
.iter()
.find(|c| c.qualifier.as_ref() == "AW")
.map(|c| c.address.as_ref())
}
pub fn merge_from_partin(&mut self, incoming: PartnerRecord) {
if incoming.gln != self.gln {
return;
}
let should_update = match (self.valid_from, incoming.valid_from) {
(None, _) => true,
(Some(_), None) => false, (Some(a), Some(b)) => b >= a,
};
if !should_update {
return;
}
self.display_name = incoming.display_name.or(self.display_name.take());
self.channels = incoming.channels;
self.roles = incoming.roles;
self.valid_from = incoming.valid_from;
self.contacts = incoming.contacts;
self.country_code = incoming.country_code.or(self.country_code.take());
self.updated_at = incoming.updated_at;
}
}
#[allow(async_fn_in_trait)]
pub trait PartnerStore: Send + Sync {
async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError>;
async fn get(
&self,
tenant_id: TenantId,
gln: &MarktpartnerCode,
) -> Result<Option<PartnerRecord>, EngineError>;
async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError>;
async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError>;
async fn as4_endpoint(
&self,
tenant_id: TenantId,
gln: &MarktpartnerCode,
) -> Result<Option<Box<str>>, EngineError> {
Ok(self
.get(tenant_id, gln)
.await?
.and_then(|r| r.as4_endpoint().map(std::convert::Into::into)))
}
async fn api_webdienste_endpoint(
&self,
tenant_id: TenantId,
gln: &MarktpartnerCode,
) -> Result<Option<Box<str>>, EngineError> {
Ok(self
.get(tenant_id, gln)
.await?
.and_then(|r| r.api_webdienste_endpoint().map(std::convert::Into::into)))
}
}
impl<S: PartnerStore> PartnerStore for Arc<S> {
async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
self.as_ref().upsert(tenant_id, record).await
}
async fn get(
&self,
tenant_id: TenantId,
gln: &MarktpartnerCode,
) -> Result<Option<PartnerRecord>, EngineError> {
self.as_ref().get(tenant_id, gln).await
}
async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
self.as_ref().remove(tenant_id, gln).await
}
async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
self.as_ref().list(tenant_id).await
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopPartnerStore;
impl PartnerStore for NoopPartnerStore {
async fn upsert(
&self,
_tenant_id: TenantId,
_record: &PartnerRecord,
) -> Result<(), EngineError> {
Ok(())
}
async fn get(
&self,
_tenant_id: TenantId,
_gln: &MarktpartnerCode,
) -> Result<Option<PartnerRecord>, EngineError> {
Ok(None)
}
async fn remove(
&self,
_tenant_id: TenantId,
_gln: &MarktpartnerCode,
) -> Result<(), EngineError> {
Ok(())
}
async fn list(&self, _tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
Ok(vec![])
}
}
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Clone, Default)]
pub struct InMemoryPartnerStore {
inner: Arc<RwLock<HashMap<(TenantId, MarktpartnerCode), PartnerRecord>>>,
}
#[cfg(any(test, feature = "testing"))]
impl InMemoryPartnerStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
#[cfg(any(test, feature = "testing"))]
impl PartnerStore for InMemoryPartnerStore {
async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
let mut guard = self.inner.write().await;
let key = (tenant_id, record.gln.clone());
match guard.get_mut(&key) {
Some(existing) => existing.merge_from_partin(record.clone()),
None => {
guard.insert(key, record.clone());
}
}
Ok(())
}
async fn get(
&self,
tenant_id: TenantId,
gln: &MarktpartnerCode,
) -> Result<Option<PartnerRecord>, EngineError> {
Ok(self
.inner
.read()
.await
.get(&(tenant_id, gln.clone()))
.cloned())
}
async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
self.inner.write().await.remove(&(tenant_id, gln.clone()));
Ok(())
}
async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
Ok(self
.inner
.read()
.await
.iter()
.filter(|((tid, _), _)| *tid == tenant_id)
.map(|(_, record)| record.clone())
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn gln(s: &str) -> MarktpartnerCode {
MarktpartnerCode::new(s)
}
fn tid() -> TenantId {
TenantId::new()
}
fn minimal_record(gln_str: &str, url: &str) -> PartnerRecord {
PartnerRecord::minimal(gln(gln_str), url)
}
#[test]
fn from_cli_pairs_parses_valid_entries() {
let pairs = vec![
"9900000000002=https://partner-a.example/as4/inbox",
"9900000000003=https://partner-b.example/as4/inbox",
];
let records = PartnerRecord::from_cli_pairs(&pairs).unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].gln.as_str(), "9900000000002");
assert_eq!(
records[0].as4_endpoint(),
Some("https://partner-a.example/as4/inbox")
);
assert_eq!(records[1].gln.as_str(), "9900000000003");
}
#[test]
fn from_cli_pairs_rejects_missing_equals() {
let pairs = vec!["9900000000002https://no-equals.example"];
assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
}
#[test]
fn from_cli_pairs_rejects_http_url() {
let pairs = vec!["9900000000002=http://insecure.example/as4"];
assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
}
#[test]
fn from_cli_pairs_rejects_empty_gln() {
let pairs = vec!["=https://no-gln.example/as4"];
assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
}
#[test]
fn as4_endpoint_returns_ak_channel() {
let r = minimal_record("9900000000002", "https://a.example/as4");
assert_eq!(r.as4_endpoint(), Some("https://a.example/as4"));
}
#[test]
fn as4_endpoint_returns_none_when_absent() {
let r = PartnerRecord {
gln: gln("9900000000002"),
display_name: None,
channels: vec![CommunicationChannel::email("info@example.de")],
roles: vec![],
valid_from: None,
contacts: vec![],
country_code: None,
updated_at: OffsetDateTime::now_utc(),
};
assert!(r.as4_endpoint().is_none());
}
#[test]
fn merge_overwrites_config_record_with_partin_data() {
let mut base = minimal_record("9900000000002", "https://old.example/as4");
let newer = PartnerRecord {
gln: gln("9900000000002"),
display_name: Some("Stadtwerke AG".into()),
channels: vec![
CommunicationChannel::as4("https://new.example/as4"),
CommunicationChannel::email("edifact@sw.example"),
],
roles: vec![MarketRole::NbStrom],
valid_from: Some(OffsetDateTime::now_utc()),
contacts: vec![],
country_code: Some("DE".into()),
updated_at: OffsetDateTime::now_utc(),
};
base.merge_from_partin(newer.clone());
assert_eq!(base.as4_endpoint(), Some("https://new.example/as4"));
assert_eq!(base.display_name.as_deref(), Some("Stadtwerke AG"));
assert_eq!(base.roles, vec![MarketRole::NbStrom]);
}
#[test]
fn merge_ignores_older_partin() {
use time::Duration;
let old_ts = OffsetDateTime::now_utc() - Duration::days(30);
let new_ts = OffsetDateTime::now_utc();
let mut current = PartnerRecord {
gln: gln("9900000000002"),
display_name: Some("Current Name".into()),
channels: vec![CommunicationChannel::as4("https://current.example/as4")],
roles: vec![MarketRole::NbStrom],
valid_from: Some(new_ts),
contacts: vec![],
country_code: Some("DE".into()),
updated_at: OffsetDateTime::now_utc(),
};
let stale = PartnerRecord {
gln: gln("9900000000002"),
display_name: Some("Stale Name".into()),
channels: vec![CommunicationChannel::as4("https://stale.example/as4")],
roles: vec![],
valid_from: Some(old_ts),
contacts: vec![],
country_code: None,
updated_at: OffsetDateTime::now_utc(),
};
current.merge_from_partin(stale);
assert_eq!(current.display_name.as_deref(), Some("Current Name"));
assert_eq!(current.as4_endpoint(), Some("https://current.example/as4"));
}
#[test]
fn merge_ignores_wrong_gln() {
let mut r = minimal_record("9900000000002", "https://a.example/as4");
let other = minimal_record("9900000000003", "https://b.example/as4");
r.merge_from_partin(other);
assert_eq!(r.as4_endpoint(), Some("https://a.example/as4"));
}
#[test]
fn market_role_from_pid_covers_all_partin_pids() {
for pid in [
37000u32, 37001, 37002, 37003, 37004, 37005, 37006, 37008, 37009, 37010, 37011, 37012,
37013, 37014,
] {
assert!(
MarketRole::from_pid(pid).is_some(),
"MarketRole::from_pid({pid}) should return Some"
);
}
assert!(MarketRole::from_pid(37007).is_none());
assert!(MarketRole::from_pid(0).is_none());
}
#[tokio::test]
async fn in_memory_upsert_and_get() {
let store = InMemoryPartnerStore::new();
let tenant = tid();
let record = minimal_record("9900000000001", "https://a.example/as4");
store.upsert(tenant, &record).await.unwrap();
let found = store
.get(tenant, &gln("9900000000001"))
.await
.unwrap()
.unwrap();
assert_eq!(found.as4_endpoint(), Some("https://a.example/as4"));
}
#[tokio::test]
async fn in_memory_get_returns_none_for_unknown() {
let store = InMemoryPartnerStore::new();
assert!(
store
.get(tid(), &gln("9900000000099"))
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn in_memory_upsert_merges_into_existing() {
let store = InMemoryPartnerStore::new();
let tenant = tid();
let base = minimal_record("9900000000001", "https://old.example/as4");
store.upsert(tenant, &base).await.unwrap();
let newer = PartnerRecord {
gln: gln("9900000000001"),
display_name: Some("Partner AG".into()),
channels: vec![CommunicationChannel::as4("https://new.example/as4")],
roles: vec![MarketRole::LfStrom],
valid_from: Some(OffsetDateTime::now_utc()),
contacts: vec![],
country_code: Some("DE".into()),
updated_at: OffsetDateTime::now_utc(),
};
store.upsert(tenant, &newer).await.unwrap();
let found = store
.get(tenant, &gln("9900000000001"))
.await
.unwrap()
.unwrap();
assert_eq!(found.as4_endpoint(), Some("https://new.example/as4"));
assert_eq!(found.display_name.as_deref(), Some("Partner AG"));
}
#[tokio::test]
async fn in_memory_remove_clears_record() {
let store = InMemoryPartnerStore::new();
let tenant = tid();
let record = minimal_record("9900000000001", "https://a.example/as4");
store.upsert(tenant, &record).await.unwrap();
store.remove(tenant, &gln("9900000000001")).await.unwrap();
assert!(
store
.get(tenant, &gln("9900000000001"))
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn in_memory_list_is_tenant_scoped() {
let store = InMemoryPartnerStore::new();
let t1 = tid();
let t2 = tid();
store
.upsert(
t1,
&minimal_record("9900000000001", "https://a.example/as4"),
)
.await
.unwrap();
store
.upsert(
t2,
&minimal_record("9900000000002", "https://b.example/as4"),
)
.await
.unwrap();
let t1_list = store.list(t1).await.unwrap();
assert_eq!(t1_list.len(), 1);
assert_eq!(t1_list[0].gln.as_str(), "9900000000001");
let t2_list = store.list(t2).await.unwrap();
assert_eq!(t2_list.len(), 1);
assert_eq!(t2_list[0].gln.as_str(), "9900000000002");
}
#[tokio::test]
async fn as4_endpoint_convenience_method() {
let store = InMemoryPartnerStore::new();
let tenant = tid();
let record = minimal_record("9900000000001", "https://a.example/as4");
store.upsert(tenant, &record).await.unwrap();
let url = store
.as4_endpoint(tenant, &gln("9900000000001"))
.await
.unwrap();
assert_eq!(url.as_deref(), Some("https://a.example/as4"));
let none = store
.as4_endpoint(tenant, &gln("9900000000099"))
.await
.unwrap();
assert!(none.is_none());
}
}