mako_engine/partner.rs
1//! Trading-partner master data — the [`PartnerStore`] trait and supporting
2//! types.
3//!
4//! # Why not just a `HashMap<GLN, URL>` in config?
5//!
6//! The `partners = ["GLN=URL", …]` field in `makod.toml` works for
7//! development but falls short in production:
8//!
9//! | Requirement | Config-only | `PartnerStore` |
10//! |---|---|---|
11//! | Survives restarts without re-deployment | ❌ | ✅ |
12//! | Carries PARTIN-derived metadata (validity, contacts, bank) | ❌ | ✅ |
13//! | Updatable from inbound PARTIN messages at runtime | ❌ | ✅ |
14//! | Tenant-scoped isolation | ❌ | ✅ |
15//! | Multiple communication channels per partner | ❌ | ✅ |
16//! | Validity windows (Gültig Ab) for future-dated updates | ❌ | ✅ |
17//!
18//! # PARTIN data model
19//!
20//! The German energy market uses EDIFACT **PARTIN** messages (PIDs 37000–37014)
21//! to distribute market-participant master data. Each PARTIN carries:
22//!
23//! - `NAD` → GLN, company name, country code
24//! - `COM` → communication channels: AS4 endpoint URL, email, fax (up to 5)
25//! - `CCI/CAV` → availability windows (*Erreichbarkeit*)
26//! - `FII` → bank account (IBAN, BIC)
27//! - `RFF` → tax number, VAT ID
28//! - `CTA/NAD` → contact persons (*Ansprechpartner*)
29//! - `DTM` → valid-from date (*Gültig Ab*)
30//! - `CCI` → associated Bilanzkreis
31//!
32//! [`PartnerRecord`] captures all of these fields in a form that is both
33//! serializable to SlateDB and constructible from static config.
34//!
35//! # Bootstrap pattern
36//!
37//! ```rust,ignore
38//! // At startup — seed from makod.toml `[as4] partners` list:
39//! for record in PartnerRecord::from_cli_pairs(&config.as4.partners)? {
40//! store.upsert(tenant_id, &record).await?;
41//! }
42//!
43//! // Later — update from inbound PARTIN message:
44//! let record = parse_partin_37001(&edifact_interchange)?;
45//! store.upsert(tenant_id, &record).await?;
46//!
47//! // Outbound AS4 dispatch:
//! let partner = store.get(tenant_id, &gln).await?
//!     .ok_or_else(|| EngineError::partner(format!("no endpoint for {gln}")))?;
//! let endpoint = partner.as4_endpoint()
//!     .ok_or_else(|| EngineError::partner(format!("{gln} has no AS4 endpoint")))?;
52//! ```
53//!
54//! # Key schema (SlateDB)
55//!
56//! `pt/{tenant_id}/{gln}` → `JSON(PartnerRecord)`
57//!
58//! Both `TenantId` and GLN are fixed-width strings, giving a
59//! `pt/{36-chars}/{13-chars}` prefix that bounds efficient per-tenant scans.
60
61use std::sync::Arc;
62
63#[cfg(any(test, feature = "testing"))]
64use std::collections::HashMap;
65#[cfg(any(test, feature = "testing"))]
66use tokio::sync::RwLock;
67
68use serde::{Deserialize, Serialize};
69use time::OffsetDateTime;
70
71use crate::{error::EngineError, ids::TenantId, types::MarktpartnerCode};
72
73// ── CommunicationChannel ──────────────────────────────────────────────────────
74
75/// A single communication channel extracted from a PARTIN `COM` segment.
76///
77/// PARTIN allows up to 5 `COM` segments per party. The `qualifier` uses the
78/// UN/EDIFACT DE 3155 code list:
79///
80/// | Qualifier | Meaning |
81/// |---|---|
82/// | `EM` | Electronic mail (primary) |
83/// | `AK` | Electronic mail (alternative) |
84/// | `TE` | Telephone |
85/// | `FX` | Fax |
86/// | `AS4` | BDEW AS4 endpoint URL (non-standard extension) |
87/// | `AW` | BDEW API-Webdienste Strom endpoint URL (Verzeichnisdienst-discovered) |
88///
89/// > **Note**: BDEW uses qualifier `AK` for the AS4 endpoint URL in PARTIN
90/// > AHB 1.0f. The `AS4` literal is used here as an explicit semantic label
91/// > for channels that have already been identified as AS4 endpoints.
92/// >
93/// > `AW` is a project-internal qualifier used to store the API-Webdienste
94/// > Strom base URL discovered from the BDEW Verzeichnisdienst.
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct CommunicationChannel {
97 /// DE 3155 communication qualifier (`EM`, `TE`, `FX`, `AK`, …).
98 pub qualifier: Box<str>,
99 /// The communication address (URL, email address, phone number).
100 pub address: Box<str>,
101}
102
103impl CommunicationChannel {
104 /// Construct a new channel.
105 #[must_use]
106 pub fn new(qualifier: impl Into<Box<str>>, address: impl Into<Box<str>>) -> Self {
107 Self {
108 qualifier: qualifier.into(),
109 address: address.into(),
110 }
111 }
112
113 /// Convenience: construct an AS4 endpoint channel.
114 ///
115 /// Uses qualifier `"AK"` per PARTIN AHB 1.0f DE 3155 convention.
116 #[must_use]
117 pub fn as4(endpoint_url: impl Into<Box<str>>) -> Self {
118 Self::new("AK", endpoint_url)
119 }
120
121 /// Convenience: construct an email channel.
122 #[must_use]
123 pub fn email(address: impl Into<Box<str>>) -> Self {
124 Self::new("EM", address)
125 }
126
127 /// Convenience: construct an API-Webdienste Strom endpoint channel.
128 ///
129 /// Uses qualifier `"AW"` (project-internal) to store the base URL
130 /// discovered from the BDEW Verzeichnisdienst for a given partner.
131 #[must_use]
132 pub fn api_webdienste(base_url: impl Into<Box<str>>) -> Self {
133 Self::new("AW", base_url)
134 }
135}
136
137// ── ContactPerson ─────────────────────────────────────────────────────────────
138
139/// A contact person extracted from a PARTIN `CTA`/`NAD`/`COM` group.
140///
141/// Corresponds to the *Ansprechpartner* group in PARTIN AHB 1.0f.
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
143pub struct ContactPerson {
144 /// Full name or department name.
145 pub name: Box<str>,
146 /// Contact channels (phone, email, …).
147 pub channels: Vec<CommunicationChannel>,
148}
149
150// ── MarketRole ────────────────────────────────────────────────────────────────
151
152/// The market role of a trading partner as declared in their PARTIN message.
153///
154/// Matches BDEW PARTIN Prüfidentifikator prefixes:
155///
156/// | PID | Role |
157/// |---|---|
158/// | 37000 | Lieferant Strom (`LfStrom`) |
159/// | 37001 | Netzbetreiber Strom (`NbStrom`) |
160/// | 37002 | Messstellenbetreiber Strom (`MsbStrom`) |
161/// | 37003 | Bilanzkreisverantwortlicher Strom (`Bkv`) |
162/// | 37004 | Bilanzkoordinator Strom (`Biko`) |
163/// | 37005 | Übertragungsnetzbetreiber Strom (`Uenb`) |
164/// | 37006 | Energiedienstleister/Serviceanbieter Strom (`Esa`) |
165/// | 37008 | Lieferant Gas (`LfGas`) |
166/// | 37009 | Netzbetreiber Gas (`NbGas`) |
167/// | 37010 | Messstellenbetreiber Gas (`MsbGas`) |
168/// | 37011 | Marktgebietsverantwortlicher Gas (`Mgv`) |
169/// | 37012–37014 | Cross-commodity roles |
170#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
171#[non_exhaustive]
172pub enum MarketRole {
173 /// Lieferant Strom (PID 37000)
174 LfStrom,
175 /// Netzbetreiber Strom (PID 37001)
176 NbStrom,
177 /// Messstellenbetreiber Strom (PID 37002)
178 MsbStrom,
179 /// Bilanzkreisverantwortlicher Strom (PID 37003)
180 Bkv,
181 /// Bilanzkoordinator Strom (PID 37004)
182 Biko,
183 /// Übertragungsnetzbetreiber Strom (PID 37005)
184 Uenb,
185 /// Energiedienstleister / Serviceanbieter Strom (PID 37006)
186 Esa,
187 /// Lieferant Gas (PID 37008)
188 LfGas,
189 /// Netzbetreiber Gas (PID 37009)
190 NbGas,
191 /// Messstellenbetreiber Gas (PID 37010)
192 MsbGas,
193 /// Marktgebietsverantwortlicher Gas (PID 37011)
194 Mgv,
195 /// Cross-commodity (PIDs 37012–37014)
196 CrossCommodity,
197}
198
199impl MarketRole {
200 /// Map a PARTIN Prüfidentifikator code to the corresponding `MarketRole`.
201 ///
202 /// Returns `None` for unrecognised codes.
203 #[must_use]
204 pub fn from_pid(pid: u32) -> Option<Self> {
205 match pid {
206 37000 => Some(Self::LfStrom),
207 37001 => Some(Self::NbStrom),
208 37002 => Some(Self::MsbStrom),
209 37003 => Some(Self::Bkv),
210 37004 => Some(Self::Biko),
211 37005 => Some(Self::Uenb),
212 37006 => Some(Self::Esa),
213 37008 => Some(Self::LfGas),
214 37009 => Some(Self::NbGas),
215 37010 => Some(Self::MsbGas),
216 37011 => Some(Self::Mgv),
217 37012..=37014 => Some(Self::CrossCommodity),
218 _ => None,
219 }
220 }
221}
222
223// ── PartnerRecord ─────────────────────────────────────────────────────────────
224
225/// Full trading-partner master record as stored in the [`PartnerStore`].
226///
227/// Populated either from static `makod.toml` config (minimal — GLN + AS4 URL
228/// only) or from an inbound PARTIN EDIFACT message (complete). Records from
229/// different sources coexist: a bootstrapped config record is upgraded in-place
230/// when the same partner later sends a PARTIN.
231///
232/// ## Constructors
233///
234/// - [`PartnerRecord::minimal`] — for bootstrapping from `GLN=URL` config pairs
235/// - [`PartnerRecord::from_cli_pairs`] — parse `[as4] partners` list from config
236///
237/// ## Merging
238///
239/// Use [`PartnerRecord::merge_from_partin`] to update an existing record with
240/// fields from a newer inbound PARTIN (respects validity dates).
241#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
242pub struct PartnerRecord {
243 /// The partner's 13-digit Global Location Number.
244 pub gln: MarktpartnerCode,
245
246 /// Company name from the PARTIN `NAD` segment.
247 pub display_name: Option<Box<str>>,
248
249 /// All communication channels from PARTIN `COM` segments.
250 ///
251 /// The AS4 endpoint is the entry with qualifier `"AK"` (PARTIN AHB 1.0f
252 /// DE 3155 convention). Use [`as4_endpoint`] for direct access.
253 ///
254 /// [`as4_endpoint`]: PartnerRecord::as4_endpoint
255 pub channels: Vec<CommunicationChannel>,
256
257 /// Market roles this partner has declared via PARTIN.
258 pub roles: Vec<MarketRole>,
259
260 /// Date from which this record version is valid (`DTM/137`).
261 ///
262 /// `None` when bootstrapped from static config (no validity date known).
263 #[serde(
264 default,
265 skip_serializing_if = "Option::is_none",
266 with = "time::serde::rfc3339::option"
267 )]
268 pub valid_from: Option<OffsetDateTime>,
269
270 /// Contact persons from the PARTIN *Ansprechpartner* group.
271 pub contacts: Vec<ContactPerson>,
272
273 /// ISO 3166-1 alpha-2 country code from `NAD+MS+++...+DE` (usually `DE`).
274 pub country_code: Option<Box<str>>,
275
276 /// Wall-clock time when this record was last written to the store.
277 #[serde(with = "time::serde::rfc3339")]
278 pub updated_at: OffsetDateTime,
279}
280
281impl PartnerRecord {
282 /// Create a minimal record from a GLN and an AS4 endpoint URL.
283 ///
284 /// Used when bootstrapping from `[as4] partners = ["GLN=URL", …]` in
285 /// `makod.toml`. The record has no PARTIN-derived metadata — only the
286 /// GLN and a single AS4 channel.
287 #[must_use]
288 pub fn minimal(gln: impl Into<MarktpartnerCode>, as4_url: impl Into<Box<str>>) -> Self {
289 Self {
290 gln: gln.into(),
291 display_name: None,
292 channels: vec![CommunicationChannel::as4(as4_url)],
293 roles: Vec::new(),
294 valid_from: None,
295 contacts: Vec::new(),
296 country_code: None,
297 updated_at: OffsetDateTime::now_utc(),
298 }
299 }
300
301 /// Parse `["GLN=HTTPS-URL", …]` configuration entries into minimal records.
302 ///
303 /// Returns an error on the first malformed or non-HTTPS entry.
304 ///
305 /// # Errors
306 ///
307 /// Returns [`EngineError::Partner`] when an entry lacks `=`, has an empty
308 /// GLN, or uses a non-HTTPS URL.
309 pub fn from_cli_pairs(pairs: &[impl AsRef<str>]) -> Result<Vec<Self>, EngineError> {
310 pairs
311 .iter()
312 .map(|entry| {
313 let pair = entry.as_ref();
314 let (gln, url) = pair.split_once('=').ok_or_else(|| {
315 EngineError::partner(format!(
316 "invalid partner entry {pair:?} — expected <GLN>=<HTTPS-URL>"
317 ))
318 })?;
319 let gln = gln.trim();
320 let url = url.trim();
321 if gln.is_empty() {
322 return Err(EngineError::partner(format!(
323 "invalid partner entry {pair:?} — GLN must not be empty"
324 )));
325 }
326 if !url.starts_with("https://") {
327 return Err(EngineError::partner(format!(
328 "invalid partner entry {pair:?} — endpoint URL must use HTTPS (got {url:?})"
329 )));
330 }
331 Ok(Self::minimal(gln, url))
332 })
333 .collect()
334 }
335
336 /// Return the AS4 endpoint URL if one has been registered.
337 ///
338 /// Looks for a channel with qualifier `"AK"` (PARTIN AHB 1.0f
339 /// convention for the AS4 endpoint). Falls back to `"AS4"` for records
340 /// that were imported with a non-standard qualifier.
341 #[must_use]
342 pub fn as4_endpoint(&self) -> Option<&str> {
343 self.channels
344 .iter()
345 .find(|c| c.qualifier.as_ref() == "AK" || c.qualifier.as_ref() == "AS4")
346 .map(|c| c.address.as_ref())
347 }
348
349 /// Return the primary email address if one has been registered.
350 ///
351 /// Looks for a channel with qualifier `"EM"`.
352 #[must_use]
353 pub fn email(&self) -> Option<&str> {
354 self.channels
355 .iter()
356 .find(|c| c.qualifier.as_ref() == "EM")
357 .map(|c| c.address.as_ref())
358 }
359
360 /// Return the API-Webdienste Strom base URL if one has been registered.
361 ///
362 /// Looks for a channel with qualifier `"AW"`. This URL is typically
363 /// populated by the Verzeichnisdienst discovery worker and is
364 /// used by `MaloIdentSender` to reach the LF's callback endpoint.
365 #[must_use]
366 pub fn api_webdienste_endpoint(&self) -> Option<&str> {
367 self.channels
368 .iter()
369 .find(|c| c.qualifier.as_ref() == "AW")
370 .map(|c| c.address.as_ref())
371 }
372
373 /// Merge fields from a newer PARTIN-derived record into `self`.
374 ///
375 /// Only updates `self` when `incoming.valid_from` is newer than
376 /// `self.valid_from` (or when `self.valid_from` is `None`). Config-
377 /// bootstrapped records (no `valid_from`) are always overwritten.
378 ///
379 /// The GLN must match — mismatches are silently ignored (the caller is
380 /// responsible for routing PARTIN messages to the correct record).
381 pub fn merge_from_partin(&mut self, incoming: PartnerRecord) {
382 if incoming.gln != self.gln {
383 return;
384 }
385 let should_update = match (self.valid_from, incoming.valid_from) {
386 (None, _) => true,
387 (Some(_), None) => false, // keep the dated record
388 (Some(a), Some(b)) => b >= a,
389 };
390 if !should_update {
391 return;
392 }
393 self.display_name = incoming.display_name.or(self.display_name.take());
394 self.channels = incoming.channels;
395 self.roles = incoming.roles;
396 self.valid_from = incoming.valid_from;
397 self.contacts = incoming.contacts;
398 self.country_code = incoming.country_code.or(self.country_code.take());
399 self.updated_at = incoming.updated_at;
400 }
401}
402
403// ── PartnerStore ──────────────────────────────────────────────────────────────
404
405/// Durable store for trading-partner master records.
406///
407/// Provides tenant-scoped access to [`PartnerRecord`]s. Records are upserted
408/// when a new PARTIN message arrives or when `makod` bootstraps from static
409/// config.
410///
411/// All three operations are idempotent — reinserting the same record is safe.
412///
413/// ## Blanket `Arc` implementation
414///
415/// `Arc<S>` implements `PartnerStore` whenever `S: PartnerStore`.
416#[allow(async_fn_in_trait)]
417pub trait PartnerStore: Send + Sync {
418 /// Insert or update the record for `(tenant_id, record.gln)`.
419 ///
420 /// If a record already exists for this GLN, it is **merged** via
421 /// [`PartnerRecord::merge_from_partin`] — i.e. the newer PARTIN-derived
422 /// record wins, but a config-only bootstrap is always overwritten.
423 ///
424 /// # Errors
425 ///
426 /// Returns [`EngineError::Partner`] on storage failure.
427 async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError>;
428
429 /// Return the record for `(tenant_id, gln)`, or `None` if not registered.
430 ///
431 /// # Errors
432 ///
433 /// Returns [`EngineError::Partner`] on storage failure.
434 async fn get(
435 &self,
436 tenant_id: TenantId,
437 gln: &MarktpartnerCode,
438 ) -> Result<Option<PartnerRecord>, EngineError>;
439
440 /// Remove the record for `(tenant_id, gln)`.
441 ///
442 /// No-op when the record does not exist.
443 ///
444 /// # Errors
445 ///
446 /// Returns [`EngineError::Partner`] on storage failure.
447 async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError>;
448
449 /// Return all records registered for `tenant_id`.
450 ///
451 /// # Errors
452 ///
453 /// Returns [`EngineError::Partner`] on storage failure.
454 async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError>;
455
456 /// Return the AS4 endpoint URL for `gln`, if known.
457 ///
458 /// Convenience wrapper over `get` + `as4_endpoint`.
459 ///
460 /// # Errors
461 ///
462 /// Returns [`EngineError::Partner`] on storage failure.
463 async fn as4_endpoint(
464 &self,
465 tenant_id: TenantId,
466 gln: &MarktpartnerCode,
467 ) -> Result<Option<Box<str>>, EngineError> {
468 Ok(self
469 .get(tenant_id, gln)
470 .await?
471 .and_then(|r| r.as4_endpoint().map(std::convert::Into::into)))
472 }
473
474 /// Return the API-Webdienste Strom base URL for `gln`, if known.
475 ///
476 /// Looks for a channel with qualifier `"AW"` (populated by the
477 /// Verzeichnisdienst discovery path.
478 ///
479 /// Convenience wrapper over `get` + `api_webdienste_endpoint`.
480 ///
481 /// # Errors
482 ///
483 /// Returns [`EngineError::Partner`] on storage failure.
484 async fn api_webdienste_endpoint(
485 &self,
486 tenant_id: TenantId,
487 gln: &MarktpartnerCode,
488 ) -> Result<Option<Box<str>>, EngineError> {
489 Ok(self
490 .get(tenant_id, gln)
491 .await?
492 .and_then(|r| r.api_webdienste_endpoint().map(std::convert::Into::into)))
493 }
494}
495
496// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
497
498impl<S: PartnerStore> PartnerStore for Arc<S> {
499 async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
500 self.as_ref().upsert(tenant_id, record).await
501 }
502
503 async fn get(
504 &self,
505 tenant_id: TenantId,
506 gln: &MarktpartnerCode,
507 ) -> Result<Option<PartnerRecord>, EngineError> {
508 self.as_ref().get(tenant_id, gln).await
509 }
510
511 async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
512 self.as_ref().remove(tenant_id, gln).await
513 }
514
515 async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
516 self.as_ref().list(tenant_id).await
517 }
518}
519
520// ── NoopPartnerStore ──────────────────────────────────────────────────────────
521
522/// A [`PartnerStore`] that never persists anything.
523///
524/// Every `get` returns `None`. Use as the default in deployments that rely
525/// exclusively on static config-based partner lookup (i.e. when
526/// `PartnerDirectory::from_cli_pairs` is sufficient).
527///
528/// ⚠️ **Data loss**: All upserts are silently discarded. PARTIN-derived
529/// updates received at runtime will not be retained across restarts.
530#[derive(Debug, Clone, Copy, Default)]
531pub struct NoopPartnerStore;
532
533impl PartnerStore for NoopPartnerStore {
534 async fn upsert(
535 &self,
536 _tenant_id: TenantId,
537 _record: &PartnerRecord,
538 ) -> Result<(), EngineError> {
539 Ok(())
540 }
541
542 async fn get(
543 &self,
544 _tenant_id: TenantId,
545 _gln: &MarktpartnerCode,
546 ) -> Result<Option<PartnerRecord>, EngineError> {
547 Ok(None)
548 }
549
550 async fn remove(
551 &self,
552 _tenant_id: TenantId,
553 _gln: &MarktpartnerCode,
554 ) -> Result<(), EngineError> {
555 Ok(())
556 }
557
558 async fn list(&self, _tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
559 Ok(vec![])
560 }
561}
562
563// ── InMemoryPartnerStore ──────────────────────────────────────────────────────
564
/// A [`PartnerStore`] that lives entirely in memory, for tests and development.
///
/// The data is a `HashMap<(TenantId, MarktpartnerCode), PartnerRecord>` behind
/// an `Arc<RwLock<…>>`. Cloning the store clones the `Arc`, so all clones
/// observe and modify the same records. Upserting over an existing entry goes
/// through `merge_from_partin`.
///
/// Compiled only under `#[cfg(test)]` or when the `testing` feature is on.
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct InMemoryPartnerStore {
    inner: Arc<RwLock<HashMap<(TenantId, MarktpartnerCode), PartnerRecord>>>,
}

#[cfg(any(test, feature = "testing"))]
impl InMemoryPartnerStore {
    /// Construct a store that holds no records yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }
}
586
#[cfg(any(test, feature = "testing"))]
impl PartnerStore for InMemoryPartnerStore {
    /// Merge `record` into the entry for `(tenant_id, record.gln)`, or insert
    /// a copy of it when no entry exists yet.
    async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
        let mut records = self.inner.write().await;
        let key = (tenant_id, record.gln.clone());
        if let Some(current) = records.get_mut(&key) {
            // Existing entry: let the record decide whether the update applies.
            current.merge_from_partin(record.clone());
        } else {
            records.insert(key, record.clone());
        }
        Ok(())
    }

    /// Return a copy of the entry for `(tenant_id, gln)`, if present.
    async fn get(
        &self,
        tenant_id: TenantId,
        gln: &MarktpartnerCode,
    ) -> Result<Option<PartnerRecord>, EngineError> {
        let records = self.inner.read().await;
        let key = (tenant_id, gln.clone());
        Ok(records.get(&key).cloned())
    }

    /// Drop the entry for `(tenant_id, gln)`; absent entries are ignored.
    async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
        let mut records = self.inner.write().await;
        let key = (tenant_id, gln.clone());
        records.remove(&key);
        Ok(())
    }

    /// Return copies of all entries whose key carries `tenant_id`.
    ///
    /// The result follows the map's iteration order, i.e. it is unordered.
    async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
        let records = self.inner.read().await;
        let mut owned = Vec::new();
        for ((owner, _), record) in records.iter() {
            if *owner == tenant_id {
                owned.push(record.clone());
            }
        }
        Ok(owned)
    }
}
630
631// ── Tests ─────────────────────────────────────────────────────────────────────
632
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Wrap a GLN literal in a [`MarktpartnerCode`].
    fn gln(s: &str) -> MarktpartnerCode {
        MarktpartnerCode::new(s)
    }

    /// A new tenant id; the tenant-scoping test relies on two calls yielding
    /// different ids.
    fn tid() -> TenantId {
        TenantId::new()
    }

    /// Config-style record: GLN plus one AS4 channel, no validity date.
    fn minimal_record(gln_str: &str, url: &str) -> PartnerRecord {
        PartnerRecord::minimal(gln(gln_str), url)
    }

    /// PARTIN-style record: named, dated, with a single AS4 channel and no
    /// contacts.
    fn partin_record(
        gln_str: &str,
        name: &str,
        as4_url: &str,
        roles: Vec<MarketRole>,
        valid_from: OffsetDateTime,
        country_code: Option<&str>,
    ) -> PartnerRecord {
        PartnerRecord {
            gln: gln(gln_str),
            display_name: Some(name.into()),
            channels: vec![CommunicationChannel::as4(as4_url)],
            roles,
            valid_from: Some(valid_from),
            contacts: Vec::new(),
            country_code: country_code.map(Box::<str>::from),
            updated_at: OffsetDateTime::now_utc(),
        }
    }

    // ── from_cli_pairs ────────────────────────────────────────────────────────

    #[test]
    fn from_cli_pairs_parses_valid_entries() {
        let pairs = vec![
            "9900000000002=https://partner-a.example/as4/inbox",
            "9900000000003=https://partner-b.example/as4/inbox",
        ];
        let parsed = PartnerRecord::from_cli_pairs(&pairs).unwrap();
        assert_eq!(parsed.len(), 2);

        let (first, second) = (&parsed[0], &parsed[1]);
        assert_eq!(first.gln.as_str(), "9900000000002");
        assert_eq!(
            first.as4_endpoint(),
            Some("https://partner-a.example/as4/inbox")
        );
        assert_eq!(second.gln.as_str(), "9900000000003");
    }

    #[test]
    fn from_cli_pairs_rejects_missing_equals() {
        let pairs = vec!["9900000000002https://no-equals.example"];
        let outcome = PartnerRecord::from_cli_pairs(&pairs);
        assert!(outcome.is_err());
    }

    #[test]
    fn from_cli_pairs_rejects_http_url() {
        let pairs = vec!["9900000000002=http://insecure.example/as4"];
        let outcome = PartnerRecord::from_cli_pairs(&pairs);
        assert!(outcome.is_err());
    }

    #[test]
    fn from_cli_pairs_rejects_empty_gln() {
        let pairs = vec!["=https://no-gln.example/as4"];
        let outcome = PartnerRecord::from_cli_pairs(&pairs);
        assert!(outcome.is_err());
    }

    // ── as4_endpoint ──────────────────────────────────────────────────────────

    #[test]
    fn as4_endpoint_returns_ak_channel() {
        let record = minimal_record("9900000000002", "https://a.example/as4");
        assert_eq!(record.as4_endpoint(), Some("https://a.example/as4"));
    }

    #[test]
    fn as4_endpoint_returns_none_when_absent() {
        // Email is the only channel, so there is no AS4 endpoint to find.
        let record = PartnerRecord {
            gln: gln("9900000000002"),
            display_name: None,
            channels: vec![CommunicationChannel::email("info@example.de")],
            roles: Vec::new(),
            valid_from: None,
            contacts: Vec::new(),
            country_code: None,
            updated_at: OffsetDateTime::now_utc(),
        };
        assert!(record.as4_endpoint().is_none());
    }

    // ── merge_from_partin ─────────────────────────────────────────────────────

    #[test]
    fn merge_overwrites_config_record_with_partin_data() {
        let mut base = minimal_record("9900000000002", "https://old.example/as4");

        let mut newer = partin_record(
            "9900000000002",
            "Stadtwerke AG",
            "https://new.example/as4",
            vec![MarketRole::NbStrom],
            OffsetDateTime::now_utc(),
            Some("DE"),
        );
        newer
            .channels
            .push(CommunicationChannel::email("edifact@sw.example"));

        base.merge_from_partin(newer);
        assert_eq!(base.as4_endpoint(), Some("https://new.example/as4"));
        assert_eq!(base.display_name.as_deref(), Some("Stadtwerke AG"));
        assert_eq!(base.roles, vec![MarketRole::NbStrom]);
    }

    #[test]
    fn merge_ignores_older_partin() {
        use time::Duration;

        let old_ts = OffsetDateTime::now_utc() - Duration::days(30);
        let new_ts = OffsetDateTime::now_utc();

        let mut current = partin_record(
            "9900000000002",
            "Current Name",
            "https://current.example/as4",
            vec![MarketRole::NbStrom],
            new_ts,
            Some("DE"),
        );
        let stale = partin_record(
            "9900000000002",
            "Stale Name",
            "https://stale.example/as4",
            Vec::new(),
            old_ts,
            None,
        );

        current.merge_from_partin(stale);

        // The record dated 30 days earlier must leave the current one untouched.
        assert_eq!(current.display_name.as_deref(), Some("Current Name"));
        assert_eq!(current.as4_endpoint(), Some("https://current.example/as4"));
    }

    #[test]
    fn merge_ignores_wrong_gln() {
        let mut target = minimal_record("9900000000002", "https://a.example/as4");
        let foreign = minimal_record("9900000000003", "https://b.example/as4");
        target.merge_from_partin(foreign);
        assert_eq!(target.as4_endpoint(), Some("https://a.example/as4"));
    }

    // ── MarketRole::from_pid ──────────────────────────────────────────────────

    #[test]
    fn market_role_from_pid_covers_all_partin_pids() {
        let known_pids = [
            37000u32, 37001, 37002, 37003, 37004, 37005, 37006, 37008, 37009, 37010, 37011, 37012,
            37013, 37014,
        ];
        for pid in known_pids {
            assert!(
                MarketRole::from_pid(pid).is_some(),
                "MarketRole::from_pid({pid}) should return Some"
            );
        }
        // PID 37007 is not in the AHB (gap)
        assert!(MarketRole::from_pid(37007).is_none());
        assert!(MarketRole::from_pid(0).is_none());
    }

    // ── InMemoryPartnerStore ──────────────────────────────────────────────────

    #[tokio::test]
    async fn in_memory_upsert_and_get() {
        let store = InMemoryPartnerStore::new();
        let tenant = tid();
        let record = minimal_record("9900000000001", "https://a.example/as4");
        store.upsert(tenant, &record).await.unwrap();

        let lookup = store.get(tenant, &gln("9900000000001")).await.unwrap();
        let found = lookup.unwrap();
        assert_eq!(found.as4_endpoint(), Some("https://a.example/as4"));
    }

    #[tokio::test]
    async fn in_memory_get_returns_none_for_unknown() {
        let store = InMemoryPartnerStore::new();
        let lookup = store.get(tid(), &gln("9900000000099")).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn in_memory_upsert_merges_into_existing() {
        let store = InMemoryPartnerStore::new();
        let tenant = tid();

        let base = minimal_record("9900000000001", "https://old.example/as4");
        store.upsert(tenant, &base).await.unwrap();

        let newer = partin_record(
            "9900000000001",
            "Partner AG",
            "https://new.example/as4",
            vec![MarketRole::LfStrom],
            OffsetDateTime::now_utc(),
            Some("DE"),
        );
        store.upsert(tenant, &newer).await.unwrap();

        let lookup = store.get(tenant, &gln("9900000000001")).await.unwrap();
        let found = lookup.unwrap();
        assert_eq!(found.as4_endpoint(), Some("https://new.example/as4"));
        assert_eq!(found.display_name.as_deref(), Some("Partner AG"));
    }

    #[tokio::test]
    async fn in_memory_remove_clears_record() {
        let store = InMemoryPartnerStore::new();
        let tenant = tid();
        let record = minimal_record("9900000000001", "https://a.example/as4");

        store.upsert(tenant, &record).await.unwrap();
        store.remove(tenant, &gln("9900000000001")).await.unwrap();

        let lookup = store.get(tenant, &gln("9900000000001")).await.unwrap();
        assert!(lookup.is_none());
    }

    #[tokio::test]
    async fn in_memory_list_is_tenant_scoped() {
        let store = InMemoryPartnerStore::new();
        let t1 = tid();
        let t2 = tid();

        let for_t1 = minimal_record("9900000000001", "https://a.example/as4");
        store.upsert(t1, &for_t1).await.unwrap();
        let for_t2 = minimal_record("9900000000002", "https://b.example/as4");
        store.upsert(t2, &for_t2).await.unwrap();

        let t1_list = store.list(t1).await.unwrap();
        assert_eq!(t1_list.len(), 1);
        assert_eq!(t1_list[0].gln.as_str(), "9900000000001");

        let t2_list = store.list(t2).await.unwrap();
        assert_eq!(t2_list.len(), 1);
        assert_eq!(t2_list[0].gln.as_str(), "9900000000002");
    }

    #[tokio::test]
    async fn as4_endpoint_convenience_method() {
        let store = InMemoryPartnerStore::new();
        let tenant = tid();
        let record = minimal_record("9900000000001", "https://a.example/as4");
        store.upsert(tenant, &record).await.unwrap();

        let url = store
            .as4_endpoint(tenant, &gln("9900000000001"))
            .await
            .unwrap();
        assert_eq!(url.as_deref(), Some("https://a.example/as4"));

        let none = store
            .as4_endpoint(tenant, &gln("9900000000099"))
            .await
            .unwrap();
        assert!(none.is_none());
    }
}
914}