Skip to main content

mako_engine/
partner.rs

1//! Trading-partner master data — the [`PartnerStore`] trait and supporting
2//! types.
3//!
4//! # Why not just a `HashMap<GLN, URL>` in config?
5//!
6//! The `partners = ["GLN=URL", …]` field in `makod.toml` works for
7//! development but falls short in production:
8//!
9//! | Requirement | Config-only | `PartnerStore` |
10//! |---|---|---|
11//! | Survives restarts without re-deployment | ❌ | ✅ |
12//! | Carries PARTIN-derived metadata (validity, contacts, bank) | ❌ | ✅ |
13//! | Updatable from inbound PARTIN messages at runtime | ❌ | ✅ |
14//! | Tenant-scoped isolation | ❌ | ✅ |
15//! | Multiple communication channels per partner | ❌ | ✅ |
16//! | Validity windows (Gültig Ab) for future-dated updates | ❌ | ✅ |
17//!
18//! # PARTIN data model
19//!
20//! The German energy market uses EDIFACT **PARTIN** messages (PIDs 37000–37014)
21//! to distribute market-participant master data. Each PARTIN carries:
22//!
23//! - `NAD` → GLN, company name, country code
24//! - `COM` → communication channels: AS4 endpoint URL, email, fax (up to 5)
25//! - `CCI/CAV` → availability windows (*Erreichbarkeit*)
26//! - `FII` → bank account (IBAN, BIC)
27//! - `RFF` → tax number, VAT ID
28//! - `CTA/NAD` → contact persons (*Ansprechpartner*)
29//! - `DTM` → valid-from date (*Gültig Ab*)
30//! - `CCI` → associated Bilanzkreis
31//!
32//! [`PartnerRecord`] captures all of these fields in a form that is both
33//! serializable to SlateDB and constructible from static config.
34//!
35//! # Bootstrap pattern
36//!
37//! ```rust,ignore
38//! // At startup — seed from makod.toml `[as4] partners` list:
39//! for record in PartnerRecord::from_cli_pairs(&config.as4.partners)? {
40//!     store.upsert(tenant_id, &record).await?;
41//! }
42//!
43//! // Later — update from inbound PARTIN message:
44//! let record = parse_partin_37001(&edifact_interchange)?;
45//! store.upsert(tenant_id, &record).await?;
46//!
47//! // Outbound AS4 dispatch:
48//! let partner = store.get(tenant_id, &gln).await?
49//!     .ok_or(EngineError::partner(format!("no endpoint for {gln}")))?;
50//! let endpoint = partner.as4_endpoint
51//!     .ok_or(EngineError::partner(format!("{gln} has no AS4 endpoint")))?;
52//! ```
53//!
54//! # Key schema (SlateDB)
55//!
56//! `pt/{tenant_id}/{gln}` → `JSON(PartnerRecord)`
57//!
58//! Both `TenantId` and GLN are fixed-width strings, giving a
59//! `pt/{36-chars}/{13-chars}` prefix that bounds efficient per-tenant scans.
60
61use std::sync::Arc;
62
63#[cfg(any(test, feature = "testing"))]
64use std::collections::HashMap;
65#[cfg(any(test, feature = "testing"))]
66use tokio::sync::RwLock;
67
68use serde::{Deserialize, Serialize};
69use time::OffsetDateTime;
70
71use crate::{error::EngineError, ids::TenantId, types::MarktpartnerCode};
72
73// ── CommunicationChannel ──────────────────────────────────────────────────────
74
75/// A single communication channel extracted from a PARTIN `COM` segment.
76///
77/// PARTIN allows up to 5 `COM` segments per party. The `qualifier` uses the
78/// UN/EDIFACT DE 3155 code list:
79///
80/// | Qualifier | Meaning |
81/// |---|---|
82/// | `EM` | Electronic mail (primary) |
83/// | `AK` | Electronic mail (alternative) |
84/// | `TE` | Telephone |
85/// | `FX` | Fax |
86/// | `AS4` | BDEW AS4 endpoint URL (non-standard extension) |
87/// | `AW` | BDEW API-Webdienste Strom endpoint URL (Verzeichnisdienst-discovered) |
88///
89/// > **Note**: BDEW uses qualifier `AK` for the AS4 endpoint URL in PARTIN
90/// > AHB 1.0f. The `AS4` literal is used here as an explicit semantic label
91/// > for channels that have already been identified as AS4 endpoints.
92/// >
93/// > `AW` is a project-internal qualifier used to store the API-Webdienste
94/// > Strom base URL discovered from the BDEW Verzeichnisdienst.
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct CommunicationChannel {
97    /// DE 3155 communication qualifier (`EM`, `TE`, `FX`, `AK`, …).
98    pub qualifier: Box<str>,
99    /// The communication address (URL, email address, phone number).
100    pub address: Box<str>,
101}
102
103impl CommunicationChannel {
104    /// Construct a new channel.
105    #[must_use]
106    pub fn new(qualifier: impl Into<Box<str>>, address: impl Into<Box<str>>) -> Self {
107        Self {
108            qualifier: qualifier.into(),
109            address: address.into(),
110        }
111    }
112
113    /// Convenience: construct an AS4 endpoint channel.
114    ///
115    /// Uses qualifier `"AK"` per PARTIN AHB 1.0f DE 3155 convention.
116    #[must_use]
117    pub fn as4(endpoint_url: impl Into<Box<str>>) -> Self {
118        Self::new("AK", endpoint_url)
119    }
120
121    /// Convenience: construct an email channel.
122    #[must_use]
123    pub fn email(address: impl Into<Box<str>>) -> Self {
124        Self::new("EM", address)
125    }
126
127    /// Convenience: construct an API-Webdienste Strom endpoint channel.
128    ///
129    /// Uses qualifier `"AW"` (project-internal) to store the base URL
130    /// discovered from the BDEW Verzeichnisdienst for a given partner.
131    #[must_use]
132    pub fn api_webdienste(base_url: impl Into<Box<str>>) -> Self {
133        Self::new("AW", base_url)
134    }
135}
136
137// ── ContactPerson ─────────────────────────────────────────────────────────────
138
139/// A contact person extracted from a PARTIN `CTA`/`NAD`/`COM` group.
140///
141/// Corresponds to the *Ansprechpartner* group in PARTIN AHB 1.0f.
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
143pub struct ContactPerson {
144    /// Full name or department name.
145    pub name: Box<str>,
146    /// Contact channels (phone, email, …).
147    pub channels: Vec<CommunicationChannel>,
148}
149
150// ── MarketRole ────────────────────────────────────────────────────────────────
151
152/// The market role of a trading partner as declared in their PARTIN message.
153///
154/// Matches BDEW PARTIN Prüfidentifikator prefixes:
155///
156/// | PID | Role |
157/// |---|---|
158/// | 37000 | Lieferant Strom (`LfStrom`) |
159/// | 37001 | Netzbetreiber Strom (`NbStrom`) |
160/// | 37002 | Messstellenbetreiber Strom (`MsbStrom`) |
161/// | 37003 | Bilanzkreisverantwortlicher Strom (`Bkv`) |
162/// | 37004 | Bilanzkoordinator Strom (`Biko`) |
163/// | 37005 | Übertragungsnetzbetreiber Strom (`Uenb`) |
164/// | 37006 | Energiedienstleister/Serviceanbieter Strom (`Esa`) |
165/// | 37008 | Lieferant Gas (`LfGas`) |
166/// | 37009 | Netzbetreiber Gas (`NbGas`) |
167/// | 37010 | Messstellenbetreiber Gas (`MsbGas`) |
168/// | 37011 | Marktgebietsverantwortlicher Gas (`Mgv`) |
169/// | 37012–37014 | Cross-commodity roles |
170#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
171#[non_exhaustive]
172pub enum MarketRole {
173    /// Lieferant Strom (PID 37000)
174    LfStrom,
175    /// Netzbetreiber Strom (PID 37001)
176    NbStrom,
177    /// Messstellenbetreiber Strom (PID 37002)
178    MsbStrom,
179    /// Bilanzkreisverantwortlicher Strom (PID 37003)
180    Bkv,
181    /// Bilanzkoordinator Strom (PID 37004)
182    Biko,
183    /// Übertragungsnetzbetreiber Strom (PID 37005)
184    Uenb,
185    /// Energiedienstleister / Serviceanbieter Strom (PID 37006)
186    Esa,
187    /// Lieferant Gas (PID 37008)
188    LfGas,
189    /// Netzbetreiber Gas (PID 37009)
190    NbGas,
191    /// Messstellenbetreiber Gas (PID 37010)
192    MsbGas,
193    /// Marktgebietsverantwortlicher Gas (PID 37011)
194    Mgv,
195    /// Cross-commodity (PIDs 37012–37014)
196    CrossCommodity,
197}
198
199impl MarketRole {
200    /// Map a PARTIN Prüfidentifikator code to the corresponding `MarketRole`.
201    ///
202    /// Returns `None` for unrecognised codes.
203    #[must_use]
204    pub fn from_pid(pid: u32) -> Option<Self> {
205        match pid {
206            37000 => Some(Self::LfStrom),
207            37001 => Some(Self::NbStrom),
208            37002 => Some(Self::MsbStrom),
209            37003 => Some(Self::Bkv),
210            37004 => Some(Self::Biko),
211            37005 => Some(Self::Uenb),
212            37006 => Some(Self::Esa),
213            37008 => Some(Self::LfGas),
214            37009 => Some(Self::NbGas),
215            37010 => Some(Self::MsbGas),
216            37011 => Some(Self::Mgv),
217            37012..=37014 => Some(Self::CrossCommodity),
218            _ => None,
219        }
220    }
221}
222
223// ── PartnerRecord ─────────────────────────────────────────────────────────────
224
225/// Full trading-partner master record as stored in the [`PartnerStore`].
226///
227/// Populated either from static `makod.toml` config (minimal — GLN + AS4 URL
228/// only) or from an inbound PARTIN EDIFACT message (complete). Records from
229/// different sources coexist: a bootstrapped config record is upgraded in-place
230/// when the same partner later sends a PARTIN.
231///
232/// ## Constructors
233///
234/// - [`PartnerRecord::minimal`] — for bootstrapping from `GLN=URL` config pairs
235/// - [`PartnerRecord::from_cli_pairs`] — parse `[as4] partners` list from config
236///
237/// ## Merging
238///
239/// Use [`PartnerRecord::merge_from_partin`] to update an existing record with
240/// fields from a newer inbound PARTIN (respects validity dates).
241#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
242pub struct PartnerRecord {
243    /// The partner's 13-digit Global Location Number.
244    pub gln: MarktpartnerCode,
245
246    /// Company name from the PARTIN `NAD` segment.
247    pub display_name: Option<Box<str>>,
248
249    /// All communication channels from PARTIN `COM` segments.
250    ///
251    /// The AS4 endpoint is the entry with qualifier `"AK"` (PARTIN AHB 1.0f
252    /// DE 3155 convention).  Use [`as4_endpoint`] for direct access.
253    ///
254    /// [`as4_endpoint`]: PartnerRecord::as4_endpoint
255    pub channels: Vec<CommunicationChannel>,
256
257    /// Market roles this partner has declared via PARTIN.
258    pub roles: Vec<MarketRole>,
259
260    /// Date from which this record version is valid (`DTM/137`).
261    ///
262    /// `None` when bootstrapped from static config (no validity date known).
263    #[serde(
264        default,
265        skip_serializing_if = "Option::is_none",
266        with = "time::serde::rfc3339::option"
267    )]
268    pub valid_from: Option<OffsetDateTime>,
269
270    /// Contact persons from the PARTIN *Ansprechpartner* group.
271    pub contacts: Vec<ContactPerson>,
272
273    /// ISO 3166-1 alpha-2 country code from `NAD+MS+++...+DE` (usually `DE`).
274    pub country_code: Option<Box<str>>,
275
276    /// Wall-clock time when this record was last written to the store.
277    #[serde(with = "time::serde::rfc3339")]
278    pub updated_at: OffsetDateTime,
279}
280
281impl PartnerRecord {
282    /// Create a minimal record from a GLN and an AS4 endpoint URL.
283    ///
284    /// Used when bootstrapping from `[as4] partners = ["GLN=URL", …]` in
285    /// `makod.toml`. The record has no PARTIN-derived metadata — only the
286    /// GLN and a single AS4 channel.
287    #[must_use]
288    pub fn minimal(gln: impl Into<MarktpartnerCode>, as4_url: impl Into<Box<str>>) -> Self {
289        Self {
290            gln: gln.into(),
291            display_name: None,
292            channels: vec![CommunicationChannel::as4(as4_url)],
293            roles: Vec::new(),
294            valid_from: None,
295            contacts: Vec::new(),
296            country_code: None,
297            updated_at: OffsetDateTime::now_utc(),
298        }
299    }
300
301    /// Parse `["GLN=HTTPS-URL", …]` configuration entries into minimal records.
302    ///
303    /// Returns an error on the first malformed or non-HTTPS entry.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`EngineError::Partner`] when an entry lacks `=`, has an empty
308    /// GLN, or uses a non-HTTPS URL.
309    pub fn from_cli_pairs(pairs: &[impl AsRef<str>]) -> Result<Vec<Self>, EngineError> {
310        pairs
311            .iter()
312            .map(|entry| {
313                let pair = entry.as_ref();
314                let (gln, url) = pair.split_once('=').ok_or_else(|| {
315                    EngineError::partner(format!(
316                        "invalid partner entry {pair:?} — expected <GLN>=<HTTPS-URL>"
317                    ))
318                })?;
319                let gln = gln.trim();
320                let url = url.trim();
321                if gln.is_empty() {
322                    return Err(EngineError::partner(format!(
323                        "invalid partner entry {pair:?} — GLN must not be empty"
324                    )));
325                }
326                if !url.starts_with("https://") {
327                    return Err(EngineError::partner(format!(
328                        "invalid partner entry {pair:?} — endpoint URL must use HTTPS (got {url:?})"
329                    )));
330                }
331                Ok(Self::minimal(gln, url))
332            })
333            .collect()
334    }
335
336    /// Return the AS4 endpoint URL if one has been registered.
337    ///
338    /// Looks for a channel with qualifier `"AK"` (PARTIN AHB 1.0f
339    /// convention for the AS4 endpoint). Falls back to `"AS4"` for records
340    /// that were imported with a non-standard qualifier.
341    #[must_use]
342    pub fn as4_endpoint(&self) -> Option<&str> {
343        self.channels
344            .iter()
345            .find(|c| c.qualifier.as_ref() == "AK" || c.qualifier.as_ref() == "AS4")
346            .map(|c| c.address.as_ref())
347    }
348
349    /// Return the primary email address if one has been registered.
350    ///
351    /// Looks for a channel with qualifier `"EM"`.
352    #[must_use]
353    pub fn email(&self) -> Option<&str> {
354        self.channels
355            .iter()
356            .find(|c| c.qualifier.as_ref() == "EM")
357            .map(|c| c.address.as_ref())
358    }
359
360    /// Return the API-Webdienste Strom base URL if one has been registered.
361    ///
362    /// Looks for a channel with qualifier `"AW"`.  This URL is typically
363    /// populated by the Verzeichnisdienst discovery worker and is
364    /// used by `MaloIdentSender` to reach the LF's callback endpoint.
365    #[must_use]
366    pub fn api_webdienste_endpoint(&self) -> Option<&str> {
367        self.channels
368            .iter()
369            .find(|c| c.qualifier.as_ref() == "AW")
370            .map(|c| c.address.as_ref())
371    }
372
373    /// Merge fields from a newer PARTIN-derived record into `self`.
374    ///
375    /// Only updates `self` when `incoming.valid_from` is newer than
376    /// `self.valid_from` (or when `self.valid_from` is `None`). Config-
377    /// bootstrapped records (no `valid_from`) are always overwritten.
378    ///
379    /// The GLN must match — mismatches are silently ignored (the caller is
380    /// responsible for routing PARTIN messages to the correct record).
381    pub fn merge_from_partin(&mut self, incoming: PartnerRecord) {
382        if incoming.gln != self.gln {
383            return;
384        }
385        let should_update = match (self.valid_from, incoming.valid_from) {
386            (None, _) => true,
387            (Some(_), None) => false, // keep the dated record
388            (Some(a), Some(b)) => b >= a,
389        };
390        if !should_update {
391            return;
392        }
393        self.display_name = incoming.display_name.or(self.display_name.take());
394        self.channels = incoming.channels;
395        self.roles = incoming.roles;
396        self.valid_from = incoming.valid_from;
397        self.contacts = incoming.contacts;
398        self.country_code = incoming.country_code.or(self.country_code.take());
399        self.updated_at = incoming.updated_at;
400    }
401}
402
403// ── PartnerStore ──────────────────────────────────────────────────────────────
404
405/// Durable store for trading-partner master records.
406///
407/// Provides tenant-scoped access to [`PartnerRecord`]s. Records are upserted
408/// when a new PARTIN message arrives or when `makod` bootstraps from static
409/// config.
410///
411/// All three operations are idempotent — reinserting the same record is safe.
412///
413/// ## Blanket `Arc` implementation
414///
415/// `Arc<S>` implements `PartnerStore` whenever `S: PartnerStore`.
416#[allow(async_fn_in_trait)]
417pub trait PartnerStore: Send + Sync {
418    /// Insert or update the record for `(tenant_id, record.gln)`.
419    ///
420    /// If a record already exists for this GLN, it is **merged** via
421    /// [`PartnerRecord::merge_from_partin`] — i.e. the newer PARTIN-derived
422    /// record wins, but a config-only bootstrap is always overwritten.
423    ///
424    /// # Errors
425    ///
426    /// Returns [`EngineError::Partner`] on storage failure.
427    async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError>;
428
429    /// Return the record for `(tenant_id, gln)`, or `None` if not registered.
430    ///
431    /// # Errors
432    ///
433    /// Returns [`EngineError::Partner`] on storage failure.
434    async fn get(
435        &self,
436        tenant_id: TenantId,
437        gln: &MarktpartnerCode,
438    ) -> Result<Option<PartnerRecord>, EngineError>;
439
440    /// Remove the record for `(tenant_id, gln)`.
441    ///
442    /// No-op when the record does not exist.
443    ///
444    /// # Errors
445    ///
446    /// Returns [`EngineError::Partner`] on storage failure.
447    async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError>;
448
449    /// Return all records registered for `tenant_id`.
450    ///
451    /// # Errors
452    ///
453    /// Returns [`EngineError::Partner`] on storage failure.
454    async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError>;
455
456    /// Return the AS4 endpoint URL for `gln`, if known.
457    ///
458    /// Convenience wrapper over `get` + `as4_endpoint`.
459    ///
460    /// # Errors
461    ///
462    /// Returns [`EngineError::Partner`] on storage failure.
463    async fn as4_endpoint(
464        &self,
465        tenant_id: TenantId,
466        gln: &MarktpartnerCode,
467    ) -> Result<Option<Box<str>>, EngineError> {
468        Ok(self
469            .get(tenant_id, gln)
470            .await?
471            .and_then(|r| r.as4_endpoint().map(std::convert::Into::into)))
472    }
473
474    /// Return the API-Webdienste Strom base URL for `gln`, if known.
475    ///
476    /// Looks for a channel with qualifier `"AW"` (populated by the
477    /// Verzeichnisdienst discovery path.
478    ///
479    /// Convenience wrapper over `get` + `api_webdienste_endpoint`.
480    ///
481    /// # Errors
482    ///
483    /// Returns [`EngineError::Partner`] on storage failure.
484    async fn api_webdienste_endpoint(
485        &self,
486        tenant_id: TenantId,
487        gln: &MarktpartnerCode,
488    ) -> Result<Option<Box<str>>, EngineError> {
489        Ok(self
490            .get(tenant_id, gln)
491            .await?
492            .and_then(|r| r.api_webdienste_endpoint().map(std::convert::Into::into)))
493    }
494}
495
496// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
497
498impl<S: PartnerStore> PartnerStore for Arc<S> {
499    async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
500        self.as_ref().upsert(tenant_id, record).await
501    }
502
503    async fn get(
504        &self,
505        tenant_id: TenantId,
506        gln: &MarktpartnerCode,
507    ) -> Result<Option<PartnerRecord>, EngineError> {
508        self.as_ref().get(tenant_id, gln).await
509    }
510
511    async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
512        self.as_ref().remove(tenant_id, gln).await
513    }
514
515    async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
516        self.as_ref().list(tenant_id).await
517    }
518}
519
520// ── NoopPartnerStore ──────────────────────────────────────────────────────────
521
522/// A [`PartnerStore`] that never persists anything.
523///
524/// Every `get` returns `None`. Use as the default in deployments that rely
525/// exclusively on static config-based partner lookup (i.e. when
526/// `PartnerDirectory::from_cli_pairs` is sufficient).
527///
528/// ⚠️ **Data loss**: All upserts are silently discarded. PARTIN-derived
529/// updates received at runtime will not be retained across restarts.
530#[derive(Debug, Clone, Copy, Default)]
531pub struct NoopPartnerStore;
532
533impl PartnerStore for NoopPartnerStore {
534    async fn upsert(
535        &self,
536        _tenant_id: TenantId,
537        _record: &PartnerRecord,
538    ) -> Result<(), EngineError> {
539        Ok(())
540    }
541
542    async fn get(
543        &self,
544        _tenant_id: TenantId,
545        _gln: &MarktpartnerCode,
546    ) -> Result<Option<PartnerRecord>, EngineError> {
547        Ok(None)
548    }
549
550    async fn remove(
551        &self,
552        _tenant_id: TenantId,
553        _gln: &MarktpartnerCode,
554    ) -> Result<(), EngineError> {
555        Ok(())
556    }
557
558    async fn list(&self, _tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
559        Ok(vec![])
560    }
561}
562
563// ── InMemoryPartnerStore ──────────────────────────────────────────────────────
564
565/// An in-memory [`PartnerStore`] for tests and development.
566///
567/// Backed by a `HashMap<(TenantId, MarktpartnerCode), PartnerRecord>` protected by an
568/// `Arc<RwLock<…>>`. Clones share the underlying data — all clones see the
569/// same records. Upsert calls `merge_from_partin` for existing records.
570///
571/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
572#[cfg(any(test, feature = "testing"))]
573#[derive(Debug, Clone, Default)]
574pub struct InMemoryPartnerStore {
575    inner: Arc<RwLock<HashMap<(TenantId, MarktpartnerCode), PartnerRecord>>>,
576}
577
578#[cfg(any(test, feature = "testing"))]
579impl InMemoryPartnerStore {
580    /// Create a new empty store.
581    #[must_use]
582    pub fn new() -> Self {
583        Self::default()
584    }
585}
586
587#[cfg(any(test, feature = "testing"))]
588impl PartnerStore for InMemoryPartnerStore {
589    async fn upsert(&self, tenant_id: TenantId, record: &PartnerRecord) -> Result<(), EngineError> {
590        let mut guard = self.inner.write().await;
591        let key = (tenant_id, record.gln.clone());
592        match guard.get_mut(&key) {
593            Some(existing) => existing.merge_from_partin(record.clone()),
594            None => {
595                guard.insert(key, record.clone());
596            }
597        }
598        Ok(())
599    }
600
601    async fn get(
602        &self,
603        tenant_id: TenantId,
604        gln: &MarktpartnerCode,
605    ) -> Result<Option<PartnerRecord>, EngineError> {
606        Ok(self
607            .inner
608            .read()
609            .await
610            .get(&(tenant_id, gln.clone()))
611            .cloned())
612    }
613
614    async fn remove(&self, tenant_id: TenantId, gln: &MarktpartnerCode) -> Result<(), EngineError> {
615        self.inner.write().await.remove(&(tenant_id, gln.clone()));
616        Ok(())
617    }
618
619    async fn list(&self, tenant_id: TenantId) -> Result<Vec<PartnerRecord>, EngineError> {
620        Ok(self
621            .inner
622            .read()
623            .await
624            .iter()
625            .filter(|((tid, _), _)| *tid == tenant_id)
626            .map(|(_, record)| record.clone())
627            .collect())
628    }
629}
630
631// ── Tests ─────────────────────────────────────────────────────────────────────
632
633#[cfg(test)]
634mod tests {
635    use super::*;
636
637    fn gln(s: &str) -> MarktpartnerCode {
638        MarktpartnerCode::new(s)
639    }
640    fn tid() -> TenantId {
641        TenantId::new()
642    }
643
644    fn minimal_record(gln_str: &str, url: &str) -> PartnerRecord {
645        PartnerRecord::minimal(gln(gln_str), url)
646    }
647
648    // ── from_cli_pairs ────────────────────────────────────────────────────────
649
650    #[test]
651    fn from_cli_pairs_parses_valid_entries() {
652        let pairs = vec![
653            "9900000000002=https://partner-a.example/as4/inbox",
654            "9900000000003=https://partner-b.example/as4/inbox",
655        ];
656        let records = PartnerRecord::from_cli_pairs(&pairs).unwrap();
657        assert_eq!(records.len(), 2);
658        assert_eq!(records[0].gln.as_str(), "9900000000002");
659        assert_eq!(
660            records[0].as4_endpoint(),
661            Some("https://partner-a.example/as4/inbox")
662        );
663        assert_eq!(records[1].gln.as_str(), "9900000000003");
664    }
665
666    #[test]
667    fn from_cli_pairs_rejects_missing_equals() {
668        let pairs = vec!["9900000000002https://no-equals.example"];
669        assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
670    }
671
672    #[test]
673    fn from_cli_pairs_rejects_http_url() {
674        let pairs = vec!["9900000000002=http://insecure.example/as4"];
675        assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
676    }
677
678    #[test]
679    fn from_cli_pairs_rejects_empty_gln() {
680        let pairs = vec!["=https://no-gln.example/as4"];
681        assert!(PartnerRecord::from_cli_pairs(&pairs).is_err());
682    }
683
684    // ── as4_endpoint ──────────────────────────────────────────────────────────
685
686    #[test]
687    fn as4_endpoint_returns_ak_channel() {
688        let r = minimal_record("9900000000002", "https://a.example/as4");
689        assert_eq!(r.as4_endpoint(), Some("https://a.example/as4"));
690    }
691
692    #[test]
693    fn as4_endpoint_returns_none_when_absent() {
694        let r = PartnerRecord {
695            gln: gln("9900000000002"),
696            display_name: None,
697            channels: vec![CommunicationChannel::email("info@example.de")],
698            roles: vec![],
699            valid_from: None,
700            contacts: vec![],
701            country_code: None,
702            updated_at: OffsetDateTime::now_utc(),
703        };
704        assert!(r.as4_endpoint().is_none());
705    }
706
707    // ── merge_from_partin ─────────────────────────────────────────────────────
708
709    #[test]
710    fn merge_overwrites_config_record_with_partin_data() {
711        let mut base = minimal_record("9900000000002", "https://old.example/as4");
712        let newer = PartnerRecord {
713            gln: gln("9900000000002"),
714            display_name: Some("Stadtwerke AG".into()),
715            channels: vec![
716                CommunicationChannel::as4("https://new.example/as4"),
717                CommunicationChannel::email("edifact@sw.example"),
718            ],
719            roles: vec![MarketRole::NbStrom],
720            valid_from: Some(OffsetDateTime::now_utc()),
721            contacts: vec![],
722            country_code: Some("DE".into()),
723            updated_at: OffsetDateTime::now_utc(),
724        };
725        base.merge_from_partin(newer.clone());
726        assert_eq!(base.as4_endpoint(), Some("https://new.example/as4"));
727        assert_eq!(base.display_name.as_deref(), Some("Stadtwerke AG"));
728        assert_eq!(base.roles, vec![MarketRole::NbStrom]);
729    }
730
731    #[test]
732    fn merge_ignores_older_partin() {
733        use time::Duration;
734        let old_ts = OffsetDateTime::now_utc() - Duration::days(30);
735        let new_ts = OffsetDateTime::now_utc();
736
737        let mut current = PartnerRecord {
738            gln: gln("9900000000002"),
739            display_name: Some("Current Name".into()),
740            channels: vec![CommunicationChannel::as4("https://current.example/as4")],
741            roles: vec![MarketRole::NbStrom],
742            valid_from: Some(new_ts),
743            contacts: vec![],
744            country_code: Some("DE".into()),
745            updated_at: OffsetDateTime::now_utc(),
746        };
747
748        let stale = PartnerRecord {
749            gln: gln("9900000000002"),
750            display_name: Some("Stale Name".into()),
751            channels: vec![CommunicationChannel::as4("https://stale.example/as4")],
752            roles: vec![],
753            valid_from: Some(old_ts),
754            contacts: vec![],
755            country_code: None,
756            updated_at: OffsetDateTime::now_utc(),
757        };
758
759        current.merge_from_partin(stale);
760        // Should not be overwritten
761        assert_eq!(current.display_name.as_deref(), Some("Current Name"));
762        assert_eq!(current.as4_endpoint(), Some("https://current.example/as4"));
763    }
764
765    #[test]
766    fn merge_ignores_wrong_gln() {
767        let mut r = minimal_record("9900000000002", "https://a.example/as4");
768        let other = minimal_record("9900000000003", "https://b.example/as4");
769        r.merge_from_partin(other);
770        assert_eq!(r.as4_endpoint(), Some("https://a.example/as4"));
771    }
772
773    // ── MarketRole::from_pid ──────────────────────────────────────────────────
774
775    #[test]
776    fn market_role_from_pid_covers_all_partin_pids() {
777        for pid in [
778            37000u32, 37001, 37002, 37003, 37004, 37005, 37006, 37008, 37009, 37010, 37011, 37012,
779            37013, 37014,
780        ] {
781            assert!(
782                MarketRole::from_pid(pid).is_some(),
783                "MarketRole::from_pid({pid}) should return Some"
784            );
785        }
786        // PID 37007 is not in the AHB (gap)
787        assert!(MarketRole::from_pid(37007).is_none());
788        assert!(MarketRole::from_pid(0).is_none());
789    }
790
791    // ── InMemoryPartnerStore ──────────────────────────────────────────────────
792
793    #[tokio::test]
794    async fn in_memory_upsert_and_get() {
795        let store = InMemoryPartnerStore::new();
796        let tenant = tid();
797        let record = minimal_record("9900000000001", "https://a.example/as4");
798
799        store.upsert(tenant, &record).await.unwrap();
800        let found = store
801            .get(tenant, &gln("9900000000001"))
802            .await
803            .unwrap()
804            .unwrap();
805        assert_eq!(found.as4_endpoint(), Some("https://a.example/as4"));
806    }
807
808    #[tokio::test]
809    async fn in_memory_get_returns_none_for_unknown() {
810        let store = InMemoryPartnerStore::new();
811        assert!(
812            store
813                .get(tid(), &gln("9900000000099"))
814                .await
815                .unwrap()
816                .is_none()
817        );
818    }
819
820    #[tokio::test]
821    async fn in_memory_upsert_merges_into_existing() {
822        let store = InMemoryPartnerStore::new();
823        let tenant = tid();
824        let base = minimal_record("9900000000001", "https://old.example/as4");
825        store.upsert(tenant, &base).await.unwrap();
826
827        let newer = PartnerRecord {
828            gln: gln("9900000000001"),
829            display_name: Some("Partner AG".into()),
830            channels: vec![CommunicationChannel::as4("https://new.example/as4")],
831            roles: vec![MarketRole::LfStrom],
832            valid_from: Some(OffsetDateTime::now_utc()),
833            contacts: vec![],
834            country_code: Some("DE".into()),
835            updated_at: OffsetDateTime::now_utc(),
836        };
837        store.upsert(tenant, &newer).await.unwrap();
838
839        let found = store
840            .get(tenant, &gln("9900000000001"))
841            .await
842            .unwrap()
843            .unwrap();
844        assert_eq!(found.as4_endpoint(), Some("https://new.example/as4"));
845        assert_eq!(found.display_name.as_deref(), Some("Partner AG"));
846    }
847
848    #[tokio::test]
849    async fn in_memory_remove_clears_record() {
850        let store = InMemoryPartnerStore::new();
851        let tenant = tid();
852        let record = minimal_record("9900000000001", "https://a.example/as4");
853
854        store.upsert(tenant, &record).await.unwrap();
855        store.remove(tenant, &gln("9900000000001")).await.unwrap();
856        assert!(
857            store
858                .get(tenant, &gln("9900000000001"))
859                .await
860                .unwrap()
861                .is_none()
862        );
863    }
864
865    #[tokio::test]
866    async fn in_memory_list_is_tenant_scoped() {
867        let store = InMemoryPartnerStore::new();
868        let t1 = tid();
869        let t2 = tid();
870
871        store
872            .upsert(
873                t1,
874                &minimal_record("9900000000001", "https://a.example/as4"),
875            )
876            .await
877            .unwrap();
878        store
879            .upsert(
880                t2,
881                &minimal_record("9900000000002", "https://b.example/as4"),
882            )
883            .await
884            .unwrap();
885
886        let t1_list = store.list(t1).await.unwrap();
887        assert_eq!(t1_list.len(), 1);
888        assert_eq!(t1_list[0].gln.as_str(), "9900000000001");
889
890        let t2_list = store.list(t2).await.unwrap();
891        assert_eq!(t2_list.len(), 1);
892        assert_eq!(t2_list[0].gln.as_str(), "9900000000002");
893    }
894
895    #[tokio::test]
896    async fn as4_endpoint_convenience_method() {
897        let store = InMemoryPartnerStore::new();
898        let tenant = tid();
899        let record = minimal_record("9900000000001", "https://a.example/as4");
900
901        store.upsert(tenant, &record).await.unwrap();
902        let url = store
903            .as4_endpoint(tenant, &gln("9900000000001"))
904            .await
905            .unwrap();
906        assert_eq!(url.as_deref(), Some("https://a.example/as4"));
907
908        let none = store
909            .as4_endpoint(tenant, &gln("9900000000099"))
910            .await
911            .unwrap();
912        assert!(none.is_none());
913    }
914}