Skip to main content

acdp_client/
cross_registry.rs

1//! Cross-registry resolution per RFC-ACDP-0006 (feature = "client").
2//!
3//! Resolves a `ctx_id` whose authority differs from the registry the
4//! consumer is currently talking to. Walks the lineage of `derived_from`
5//! references with cycle detection, configurable depth / node / fanout
6//! caps, and per-authority caching of the `RegistryClient` and
7//! capabilities document.
8//!
9//! See RFC-ACDP-0006 §4.1 for the seven-step algorithm:
10//!   1. Parse URI → authority
11//!   2. Fetch the foreign registry's capabilities
12//!   3. Verify the registry DID matches `did:web:<authority>`
13//!   4. Retrieve the full context
14//!   5. Verify content_hash
15//!   6. Verify signature via DID resolution
16//!   7. Walk `derived_from` references (with cycle/depth/node/fanout/timeout limits)
17
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::Mutex;
20use std::time::{Duration, Instant};
21
22use crate::{ReceiptPolicy, RegistryClient, VerificationPolicy, VerifiedContext};
23use acdp_did::WebResolver;
24use acdp_primitives::error::AcdpError;
25use acdp_safe_http::SsrfPolicy;
26use acdp_types::body::Body;
27use acdp_types::primitives::CtxId;
28use acdp_types::CapabilitiesDocument;
29
/// Safety limits applied to a single resolve and to a whole
/// `derived_from` walk.
///
/// The defaults follow RFC-ACDP-0006 §7.4 / §7.5. They keep a walk bounded
/// even when a producer fabricates `derived_from` lists that point into a
/// pathological lineage graph hosted by a foreign registry.
#[derive(Debug, Clone)]
pub struct ResolverOptions {
    /// Maximum number of `derived_from` hops away from the root context
    /// that the walk may reach; a direct parent sits at depth 1
    /// (default 10).
    pub max_depth: usize,
    /// Hard ceiling on how many contexts a single walk may verify
    /// (default 100), independent of how deep each branch goes.
    pub max_nodes: usize,
    /// Largest `derived_from` list tolerated on any context the walk
    /// touches, the root included (default 32). A longer list is treated
    /// as malformed or hostile and aborts the walk before it fans out.
    pub max_fanout: usize,
    /// Wall-clock budget for a whole walk (default 30 s), enforced by
    /// running [`CrossRegistryResolver::walk_derived_from`] under
    /// `tokio::time::timeout`.
    pub total_timeout: Duration,
    /// Upper bound on how long a foreign registry's capabilities document
    /// is reused before `/.well-known/acdp.json` is fetched again
    /// (default 5 min), so a walk does not re-request it on every hop.
    pub capabilities_ttl: Duration,
}
54
55impl Default for ResolverOptions {
56    fn default() -> Self {
57        Self {
58            max_depth: 10,
59            max_nodes: 100,
60            max_fanout: 32,
61            total_timeout: Duration::from_secs(30),
62            capabilities_ttl: Duration::from_secs(300),
63        }
64    }
65}
66
67/// Resolver for cross-registry references.
68///
69/// Holds a [`WebResolver`] for DID lookups and caches a [`RegistryClient`]
70/// + capabilities document per authority for the lifetime of the resolver.
71///
72/// The [`SsrfPolicy`] is consulted on every URL the resolver constructs
73/// (RFC-ACDP-0006 §7.1, §7.2).
74pub struct CrossRegistryResolver {
75    did_resolver: WebResolver,
76    options: ResolverOptions,
77    allowlist: Option<HashSet<String>>,
78    ssrf_policy: SsrfPolicy,
79    // Per-authority caches. Mutex-guarded for interior mutability across
80    // the immutable `&self` API surface; contention is low since
81    // authorities are few per walk.
82    client_cache: Mutex<HashMap<String, RegistryClient>>,
83    /// Per-authority capabilities cache. The `Duration` is the
84    /// per-response TTL parsed from `Cache-Control: max-age=N` (capped
85    /// at 3600s per RFC-ACDP-0006 §4.2). Replaces an earlier shape
86    /// that used the resolver-wide `capabilities_ttl` for every entry,
87    /// ignoring the registry's own cache hint (BUG-09).
88    caps_cache: Mutex<HashMap<String, (CapabilitiesDocument, Instant, Duration)>>,
89}
90
91impl Default for CrossRegistryResolver {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl CrossRegistryResolver {
98    /// Build a resolver with default settings: no allowlist, depth 10,
99    /// HTTPS-only / no IP literals SSRF policy.
100    pub fn new() -> Self {
101        Self {
102            did_resolver: WebResolver::new(),
103            options: ResolverOptions::default(),
104            allowlist: None,
105            ssrf_policy: SsrfPolicy::default(),
106            client_cache: Mutex::new(HashMap::new()),
107            caps_cache: Mutex::new(HashMap::new()),
108        }
109    }
110
111    /// Override the [`SsrfPolicy`] applied to outbound URLs.
112    ///
113    /// Useful for test environments that need to allow `http://` or
114    /// IP-literal hosts. Production deployments SHOULD keep the default.
115    pub fn with_ssrf_policy(mut self, policy: SsrfPolicy) -> Self {
116        self.ssrf_policy = policy;
117        self
118    }
119
120    /// Cap the number of `derived_from` hops walked in a single
121    /// [`Self::walk_derived_from`] call.
122    pub fn with_max_depth(mut self, depth: usize) -> Self {
123        self.options.max_depth = depth;
124        self
125    }
126
127    /// Replace the complete options struct (overrides every individual
128    /// `with_*` setter that wasn't already applied).
129    pub fn with_options(mut self, options: ResolverOptions) -> Self {
130        self.options = options;
131        self
132    }
133
134    /// Borrow the active options. Useful for tests + telemetry.
135    pub fn options(&self) -> &ResolverOptions {
136        &self.options
137    }
138
139    /// Override the [`WebResolver`] used for DID document lookups.
140    ///
141    /// Primary use is supplying a `WebResolver::with_root_cert_pem`
142    /// instance in tests so a self-signed mock can answer DID-document
143    /// requests for `did:web:localhost%3A<port>`. Production callers do
144    /// not need this — the default resolver trusts the system CA bundle.
145    pub fn with_did_resolver(mut self, resolver: WebResolver) -> Self {
146        self.did_resolver = resolver;
147        self
148    }
149
150    /// Pre-populate the per-authority [`RegistryClient`] cache.
151    ///
152    /// Primary use is the conformance harness: tests supply a client
153    /// whose HTTP layer trusts the in-process TLS server's self-signed
154    /// root certificate (via [`RegistryClient::with_root_cert_pem`]), so
155    /// the resolver hits the mock instead of attempting a real network
156    /// call. The seeded client wins over the lazy
157    /// `RegistryClient::new_pinned` constructor that [`Self::resolve`]
158    /// would otherwise invoke on first access.
159    pub fn seed_client(&self, authority: impl Into<String>, client: RegistryClient) {
160        self.client_cache
161            .lock()
162            .unwrap()
163            .insert(authority.into(), client);
164    }
165
166    /// Restrict cross-registry resolution to a fixed set of authorities
167    /// (lowercase DNS hostnames). When set, any reference outside the
168    /// allowlist is rejected with [`AcdpError::CrossRegistryResolutionFailed`].
169    pub fn with_allowlist<I, S>(mut self, authorities: I) -> Self
170    where
171        I: IntoIterator<Item = S>,
172        S: Into<String>,
173    {
174        self.allowlist = Some(authorities.into_iter().map(Into::into).collect());
175        self
176    }
177
178    /// Resolve a single cross-registry [`CtxId`] end-to-end.
179    ///
180    /// Steps 1–6 of RFC-ACDP-0006 §4.1: parse, fetch capabilities,
181    /// verify the registry DID *and* its DID document's web binding,
182    /// retrieve, recompute hash, verify signature. The [`SsrfPolicy`]
183    /// is checked first so a hostile authority cannot drive an
184    /// internal-network request.
185    pub async fn resolve(&self, ctx_id: &CtxId) -> Result<VerifiedContext, AcdpError> {
186        let parsed = CtxId::parse(ctx_id.as_str())?;
187        let authority = parsed.authority().to_string();
188        self.check_allowlist(&authority)?;
189
190        // RFC-ACDP-0006 §7: SSRF policy on the outbound base URL.
191        let base = format!("https://{authority}");
192        self.ssrf_policy
193            .check_url(&base)
194            .map_err(|e| AcdpError::CrossRegistryResolutionFailed(format!("SSRF policy: {e}")))?;
195
196        // Cached client (and capabilities) per authority.
197        let registry = self.client_for(&authority, &base).await?;
198        let caps = self.capabilities_for(&authority, &registry).await?;
199
200        // Step 3a: capabilities.registry_did MUST be `did:web:<authority>`.
201        // BUG-06: percent-encode `:` for host:port authorities so the
202        // expected DID round-trips with `authority_to_did_web`.
203        let expected_did = acdp_did::authority_to_did_web(&authority);
204        if caps.registry_did != expected_did {
205            return Err(AcdpError::CrossRegistryResolutionFailed(format!(
206                "registry DID '{}' does not match expected '{expected_did}'",
207                caps.registry_did
208            )));
209        }
210
211        // Step 3b (RFC-ACDP-0006 §4.1 step 3): resolve the registry's
212        // DID document and confirm the web binding matches `<authority>`.
213        let registry_doc = self
214            .did_resolver
215            .resolve(&caps.registry_did)
216            .await
217            .map_err(|e| {
218                AcdpError::CrossRegistryResolutionFailed(format!(
219                    "could not resolve registry DID document for '{}': {e}",
220                    caps.registry_did
221                ))
222            })?;
223        if registry_doc.id != caps.registry_did {
224            return Err(AcdpError::CrossRegistryResolutionFailed(format!(
225                "registry DID document `id` '{}' does not match capabilities.registry_did '{}'",
226                registry_doc.id, caps.registry_did
227            )));
228        }
229
230        // Steps 4–6: retrieve + verify. fed-009 / RFC-ACDP-0010 §7+§11:
231        // an upstream advertising `acdp-registry-receipts` MUST always
232        // serve a receipt — absence is a registry fault (`invalid_receipt`),
233        // not a degraded mode — so the policy escalates to `Require` for
234        // such upstreams. Receipt-less upstreams proceed under the
235        // v0.1.0 trust model (receipt verified only if one is present).
236        let policy = if caps.claims_profile(acdp_types::profile::Profile::RegistryReceipts) {
237            VerificationPolicy {
238                receipts: ReceiptPolicy::Require,
239                ..VerificationPolicy::default()
240            }
241        } else {
242            VerificationPolicy::default()
243        };
244        VerifiedContext::fetch_with_policy(&registry, &self.did_resolver, &parsed, &policy).await
245    }
246
247    /// Walk the `derived_from` graph rooted at `body` with cycle detection,
248    /// a per-edge depth cap of [`ResolverOptions::max_depth`], a total-
249    /// nodes cap of `max_nodes`, a per-context fanout cap of `max_fanout`,
250    /// and a wall-clock `total_timeout`. Returns each verified ancestor
251    /// (excluding the root). Breadth-first; closer ancestors are returned
252    /// first.
253    pub async fn walk_derived_from(&self, body: &Body) -> Result<Vec<VerifiedContext>, AcdpError> {
254        let total_timeout = self.options.total_timeout;
255        let fut = self.walk_derived_from_inner(body);
256        match tokio::time::timeout(total_timeout, fut).await {
257            Ok(res) => res,
258            Err(_) => Err(AcdpError::CrossRegistryResolutionFailed(format!(
259                "derived_from walk exceeded total_timeout={:?}",
260                total_timeout
261            ))),
262        }
263    }
264
265    async fn walk_derived_from_inner(
266        &self,
267        body: &Body,
268    ) -> Result<Vec<VerifiedContext>, AcdpError> {
269        let mut seen: HashSet<String> = HashSet::new();
270        seen.insert(body.ctx_id.0.clone());
271
272        if body.derived_from.len() > self.options.max_fanout {
273            return Err(AcdpError::CrossRegistryResolutionFailed(format!(
274                "root context {} has derived_from fanout {} > max_fanout={}",
275                body.ctx_id.0,
276                body.derived_from.len(),
277                self.options.max_fanout
278            )));
279        }
280
281        let mut results: Vec<VerifiedContext> = Vec::new();
282        let mut frontier: VecDeque<(CtxId, usize)> = body
283            .derived_from
284            .iter()
285            .map(|c| (c.clone(), 1usize))
286            .collect();
287
288        while let Some((next, depth)) = frontier.pop_front() {
289            if !seen.insert(next.0.clone()) {
290                continue; // cycle
291            }
292            if depth > self.options.max_depth {
293                return Err(AcdpError::CrossRegistryResolutionFailed(format!(
294                    "derived_from walk exceeded max_depth={} at {}",
295                    self.options.max_depth, next.0
296                )));
297            }
298            if results.len() >= self.options.max_nodes {
299                return Err(AcdpError::CrossRegistryResolutionFailed(format!(
300                    "derived_from walk exceeded max_nodes={} (last attempted: {})",
301                    self.options.max_nodes, next.0
302                )));
303            }
304            let verified = self.resolve(&next).await?;
305            let parents = &verified.body().derived_from;
306            if parents.len() > self.options.max_fanout {
307                return Err(AcdpError::CrossRegistryResolutionFailed(format!(
308                    "context {} has derived_from fanout {} > max_fanout={}",
309                    next.0,
310                    parents.len(),
311                    self.options.max_fanout
312                )));
313            }
314            for parent in parents {
315                if !seen.contains(parent.as_str()) {
316                    frontier.push_back((parent.clone(), depth + 1));
317                }
318            }
319            results.push(verified);
320        }
321        Ok(results)
322    }
323
324    fn check_allowlist(&self, authority: &str) -> Result<(), AcdpError> {
325        if let Some(list) = &self.allowlist {
326            if !list.contains(authority) {
327                return Err(AcdpError::CrossRegistryResolutionFailed(format!(
328                    "authority '{authority}' is not on the resolver allowlist"
329                )));
330            }
331        }
332        Ok(())
333    }
334
335    /// Return a cached `RegistryClient` for the authority, building one
336    /// on first use. Reuse across hops avoids per-hop reqwest
337    /// connection-pool churn.
338    ///
339    /// SEC-01: the client is built with [`RegistryClient::new_pinned`],
340    /// which resolves the authority's DNS up-front, filters every
341    /// resolved IP through the resolver's [`SsrfPolicy`], and pins the
342    /// connection to that address. Without pinning a hostile `ctx_id`
343    /// authority (e.g. `internal-host.example.com` resolving to
344    /// `10.0.0.1` or `169.254.169.254`) would slip past the URL-syntax
345    /// `check_url` gate and reach an internal target. The seeded test
346    /// path ([`Self::seed_client`]) bypasses this constructor.
347    async fn client_for(&self, authority: &str, base: &str) -> Result<RegistryClient, AcdpError> {
348        {
349            let cache = self.client_cache.lock().unwrap();
350            if let Some(c) = cache.get(authority) {
351                return Ok(c.clone());
352            }
353        }
354        // Build with full DNS pinning before taking the cache lock —
355        // `new_pinned` is async and the cache mutex must not be held
356        // across the await.
357        let client = RegistryClient::new_pinned(base, &self.ssrf_policy).await?;
358        let mut cache = self.client_cache.lock().unwrap();
359        Ok(cache.entry(authority.to_string()).or_insert(client).clone())
360    }
361
362    /// Return the cached capabilities for `authority`, fetching when
363    /// the entry is missing or its per-response TTL has elapsed.
364    ///
365    /// BUG-09: TTL comes from the response's `Cache-Control: max-age=N`
366    /// (clamped to `[1s, ResolverOptions::capabilities_ttl]` so the
367    /// resolver-wide ceiling still applies) rather than a fixed value.
368    /// A registry serving `Cache-Control: max-age=60` is honored; one
369    /// serving no `Cache-Control` falls back to the
370    /// [`RegistryClient::capabilities_with_ttl`] default (300s).
371    async fn capabilities_for(
372        &self,
373        authority: &str,
374        registry: &RegistryClient,
375    ) -> Result<CapabilitiesDocument, AcdpError> {
376        // Fast path: cache hit + within per-response TTL.
377        {
378            let cache = self.caps_cache.lock().unwrap();
379            if let Some((caps, fetched_at, ttl)) = cache.get(authority) {
380                if fetched_at.elapsed() < *ttl {
381                    return Ok(caps.clone());
382                }
383            }
384        }
385        let (caps, response_ttl) = registry
386            .capabilities_with_ttl()
387            .await
388            .map_err(|e| match e {
389                AcdpError::Http(_) | AcdpError::KeyResolutionUnreachable(_) => {
390                    AcdpError::CrossRegistryResolutionFailed(format!(
391                        "could not reach registry '{authority}': {e}"
392                    ))
393                }
394                other => other,
395            })?;
396        // Clamp to the resolver-wide ceiling so a registry advertising
397        // an absurd `max-age` can't pin a stale doc indefinitely.
398        let ttl = response_ttl.min(self.options.capabilities_ttl);
399        let mut cache = self.caps_cache.lock().unwrap();
400        cache.insert(authority.to_string(), (caps.clone(), Instant::now(), ttl));
401        Ok(caps)
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// An allowlisted authority passes; anything else is rejected with
    /// `CrossRegistryResolutionFailed`.
    #[test]
    fn allowlist_rejects_outside_authorities() {
        let allowed = "registry.example.com".to_string();
        let resolver = CrossRegistryResolver::new().with_allowlist([allowed]);

        let rejection = resolver.check_allowlist("evil.com");
        assert!(matches!(
            rejection,
            Err(AcdpError::CrossRegistryResolutionFailed(_))
        ));

        let acceptance = resolver.check_allowlist("registry.example.com");
        assert!(acceptance.is_ok());
    }

    /// The `Default` values must stay in sync with the field docs.
    #[test]
    fn options_default_values_match_doc() {
        let ResolverOptions {
            max_depth,
            max_nodes,
            max_fanout,
            total_timeout,
            capabilities_ttl,
        } = ResolverOptions::default();

        assert_eq!(max_depth, 10);
        assert_eq!(max_nodes, 100);
        assert_eq!(max_fanout, 32);
        assert_eq!(total_timeout, Duration::from_secs(30));
        assert_eq!(capabilities_ttl, Duration::from_secs(300));
    }

    /// `with_options` installs the supplied struct wholesale.
    #[test]
    fn with_options_replaces_full_struct() {
        let custom = ResolverOptions {
            max_depth: 3,
            max_nodes: 7,
            max_fanout: 2,
            total_timeout: Duration::from_secs(5),
            capabilities_ttl: Duration::from_secs(60),
        };
        let resolver = CrossRegistryResolver::new().with_options(custom);
        let active = resolver.options();

        assert_eq!(active.max_depth, 3);
        assert_eq!(active.max_nodes, 7);
        assert_eq!(active.max_fanout, 2);
    }

    /// The walk's cycle check relies on `HashSet::insert` returning `false`
    /// for an id that was already visited.
    #[test]
    fn cycle_detection_short_circuits() {
        let _resolver = CrossRegistryResolver::new();
        let ctx = String::from("acdp://r/12345678-1234-4321-8123-123456781234");
        let mut visited: HashSet<String> = HashSet::new();

        let first_visit = visited.insert(ctx.clone());
        let second_visit = visited.insert(ctx);
        assert!(first_visit);
        assert!(!second_visit);
    }
}