acdp_client/cross_registry.rs
//! Cross-registry resolution per RFC-ACDP-0006 (feature = "client").
//!
//! Resolves a `ctx_id` whose authority differs from the registry the
//! consumer is currently talking to. Walks the lineage of `derived_from`
//! references with cycle detection, configurable depth / node / fanout
//! caps, and per-authority caching of the `RegistryClient` and
//! capabilities document.
//!
//! See RFC-ACDP-0006 §4.1 for the seven-step algorithm:
//! 1. Parse URI → authority
//! 2. Fetch the foreign registry's capabilities
//! 3. Verify the registry DID matches `did:web:<authority>`
//! 4. Retrieve the full context
//! 5. Verify content_hash
//! 6. Verify signature via DID resolution
//! 7. Walk `derived_from` references (with cycle/depth/node/fanout/timeout limits)
17
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::Mutex;
20use std::time::{Duration, Instant};
21
22use crate::{ReceiptPolicy, RegistryClient, VerificationPolicy, VerifiedContext};
23use acdp_did::WebResolver;
24use acdp_primitives::error::AcdpError;
25use acdp_safe_http::SsrfPolicy;
26use acdp_types::body::Body;
27use acdp_types::primitives::CtxId;
28use acdp_types::CapabilitiesDocument;
29
/// Safety limits for a `derived_from` walk plus the capabilities-cache
/// ceiling.
///
/// The defaults quoted on each field come from the [`Default`] impl. Per
/// the design notes they target RFC-ACDP-0006 §7.4 / §7.5: a walk stays
/// bounded even when a producer fabricates `derived_from` lists that
/// point into a foreign registry's pathological lineage graph.
#[derive(Clone, Debug)]
pub struct ResolverOptions {
    /// Largest hop distance from the root that may be visited
    /// (default 10). The root's direct parents are at depth 1; dequeuing
    /// an unseen entry deeper than this fails the walk.
    pub max_depth: usize,
    /// Upper bound on the number of ancestors a single walk may verify
    /// (default 100). Once this many have been collected, attempting one
    /// more fails the walk, regardless of how shallow the graph is.
    pub max_nodes: usize,
    /// Largest `derived_from` list tolerated on any context the walk
    /// touches, the root included (default 32). A longer list is treated
    /// as malformed or hostile and fails the walk before fanning out.
    pub max_fanout: usize,
    /// Wall-clock budget for a whole walk (default 30 s), enforced by
    /// wrapping [`CrossRegistryResolver::walk_derived_from`] in
    /// `tokio::time::timeout`.
    pub total_timeout: Duration,
    /// Ceiling on how long a cached capabilities document may be reused
    /// before it is fetched again (default 5 min). The TTL reported with
    /// each capabilities response is capped at this value, which keeps
    /// the resolver from re-requesting `/.well-known/acdp.json` on every
    /// hop without trusting an arbitrarily long registry-supplied TTL.
    pub capabilities_ttl: Duration,
}
54
55impl Default for ResolverOptions {
56 fn default() -> Self {
57 Self {
58 max_depth: 10,
59 max_nodes: 100,
60 max_fanout: 32,
61 total_timeout: Duration::from_secs(30),
62 capabilities_ttl: Duration::from_secs(300),
63 }
64 }
65}
66
67/// Resolver for cross-registry references.
68///
69/// Holds a [`WebResolver`] for DID lookups and caches a [`RegistryClient`]
70/// + capabilities document per authority for the lifetime of the resolver.
71///
72/// The [`SsrfPolicy`] is consulted on every URL the resolver constructs
73/// (RFC-ACDP-0006 §7.1, §7.2).
74pub struct CrossRegistryResolver {
75 did_resolver: WebResolver,
76 options: ResolverOptions,
77 allowlist: Option<HashSet<String>>,
78 ssrf_policy: SsrfPolicy,
79 // Per-authority caches. Mutex-guarded for interior mutability across
80 // the immutable `&self` API surface; contention is low since
81 // authorities are few per walk.
82 client_cache: Mutex<HashMap<String, RegistryClient>>,
83 /// Per-authority capabilities cache. The `Duration` is the
84 /// per-response TTL parsed from `Cache-Control: max-age=N` (capped
85 /// at 3600s per RFC-ACDP-0006 §4.2). Replaces an earlier shape
86 /// that used the resolver-wide `capabilities_ttl` for every entry,
87 /// ignoring the registry's own cache hint (BUG-09).
88 caps_cache: Mutex<HashMap<String, (CapabilitiesDocument, Instant, Duration)>>,
89}
90
91impl Default for CrossRegistryResolver {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl CrossRegistryResolver {
98 /// Build a resolver with default settings: no allowlist, depth 10,
99 /// HTTPS-only / no IP literals SSRF policy.
100 pub fn new() -> Self {
101 Self {
102 did_resolver: WebResolver::new(),
103 options: ResolverOptions::default(),
104 allowlist: None,
105 ssrf_policy: SsrfPolicy::default(),
106 client_cache: Mutex::new(HashMap::new()),
107 caps_cache: Mutex::new(HashMap::new()),
108 }
109 }
110
111 /// Override the [`SsrfPolicy`] applied to outbound URLs.
112 ///
113 /// Useful for test environments that need to allow `http://` or
114 /// IP-literal hosts. Production deployments SHOULD keep the default.
115 pub fn with_ssrf_policy(mut self, policy: SsrfPolicy) -> Self {
116 self.ssrf_policy = policy;
117 self
118 }
119
120 /// Cap the number of `derived_from` hops walked in a single
121 /// [`Self::walk_derived_from`] call.
122 pub fn with_max_depth(mut self, depth: usize) -> Self {
123 self.options.max_depth = depth;
124 self
125 }
126
127 /// Replace the complete options struct (overrides every individual
128 /// `with_*` setter that wasn't already applied).
129 pub fn with_options(mut self, options: ResolverOptions) -> Self {
130 self.options = options;
131 self
132 }
133
134 /// Borrow the active options. Useful for tests + telemetry.
135 pub fn options(&self) -> &ResolverOptions {
136 &self.options
137 }
138
139 /// Override the [`WebResolver`] used for DID document lookups.
140 ///
141 /// Primary use is supplying a `WebResolver::with_root_cert_pem`
142 /// instance in tests so a self-signed mock can answer DID-document
143 /// requests for `did:web:localhost%3A<port>`. Production callers do
144 /// not need this — the default resolver trusts the system CA bundle.
145 pub fn with_did_resolver(mut self, resolver: WebResolver) -> Self {
146 self.did_resolver = resolver;
147 self
148 }
149
150 /// Pre-populate the per-authority [`RegistryClient`] cache.
151 ///
152 /// Primary use is the conformance harness: tests supply a client
153 /// whose HTTP layer trusts the in-process TLS server's self-signed
154 /// root certificate (via [`RegistryClient::with_root_cert_pem`]), so
155 /// the resolver hits the mock instead of attempting a real network
156 /// call. The seeded client wins over the lazy
157 /// `RegistryClient::new_pinned` constructor that [`Self::resolve`]
158 /// would otherwise invoke on first access.
159 pub fn seed_client(&self, authority: impl Into<String>, client: RegistryClient) {
160 self.client_cache
161 .lock()
162 .unwrap()
163 .insert(authority.into(), client);
164 }
165
166 /// Restrict cross-registry resolution to a fixed set of authorities
167 /// (lowercase DNS hostnames). When set, any reference outside the
168 /// allowlist is rejected with [`AcdpError::CrossRegistryResolutionFailed`].
169 pub fn with_allowlist<I, S>(mut self, authorities: I) -> Self
170 where
171 I: IntoIterator<Item = S>,
172 S: Into<String>,
173 {
174 self.allowlist = Some(authorities.into_iter().map(Into::into).collect());
175 self
176 }
177
178 /// Resolve a single cross-registry [`CtxId`] end-to-end.
179 ///
180 /// Steps 1–6 of RFC-ACDP-0006 §4.1: parse, fetch capabilities,
181 /// verify the registry DID *and* its DID document's web binding,
182 /// retrieve, recompute hash, verify signature. The [`SsrfPolicy`]
183 /// is checked first so a hostile authority cannot drive an
184 /// internal-network request.
185 pub async fn resolve(&self, ctx_id: &CtxId) -> Result<VerifiedContext, AcdpError> {
186 let parsed = CtxId::parse(ctx_id.as_str())?;
187 let authority = parsed.authority().to_string();
188 self.check_allowlist(&authority)?;
189
190 // RFC-ACDP-0006 §7: SSRF policy on the outbound base URL.
191 let base = format!("https://{authority}");
192 self.ssrf_policy
193 .check_url(&base)
194 .map_err(|e| AcdpError::CrossRegistryResolutionFailed(format!("SSRF policy: {e}")))?;
195
196 // Cached client (and capabilities) per authority.
197 let registry = self.client_for(&authority, &base).await?;
198 let caps = self.capabilities_for(&authority, ®istry).await?;
199
200 // Step 3a: capabilities.registry_did MUST be `did:web:<authority>`.
201 // BUG-06: percent-encode `:` for host:port authorities so the
202 // expected DID round-trips with `authority_to_did_web`.
203 let expected_did = acdp_did::authority_to_did_web(&authority);
204 if caps.registry_did != expected_did {
205 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
206 "registry DID '{}' does not match expected '{expected_did}'",
207 caps.registry_did
208 )));
209 }
210
211 // Step 3b (RFC-ACDP-0006 §4.1 step 3): resolve the registry's
212 // DID document and confirm the web binding matches `<authority>`.
213 let registry_doc = self
214 .did_resolver
215 .resolve(&caps.registry_did)
216 .await
217 .map_err(|e| {
218 AcdpError::CrossRegistryResolutionFailed(format!(
219 "could not resolve registry DID document for '{}': {e}",
220 caps.registry_did
221 ))
222 })?;
223 if registry_doc.id != caps.registry_did {
224 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
225 "registry DID document `id` '{}' does not match capabilities.registry_did '{}'",
226 registry_doc.id, caps.registry_did
227 )));
228 }
229
230 // Steps 4–6: retrieve + verify. fed-009 / RFC-ACDP-0010 §7+§11:
231 // an upstream advertising `acdp-registry-receipts` MUST always
232 // serve a receipt — absence is a registry fault (`invalid_receipt`),
233 // not a degraded mode — so the policy escalates to `Require` for
234 // such upstreams. Receipt-less upstreams proceed under the
235 // v0.1.0 trust model (receipt verified only if one is present).
236 let policy = if caps.claims_profile(acdp_types::profile::Profile::RegistryReceipts) {
237 VerificationPolicy {
238 receipts: ReceiptPolicy::Require,
239 ..VerificationPolicy::default()
240 }
241 } else {
242 VerificationPolicy::default()
243 };
244 VerifiedContext::fetch_with_policy(®istry, &self.did_resolver, &parsed, &policy).await
245 }
246
247 /// Walk the `derived_from` graph rooted at `body` with cycle detection,
248 /// a per-edge depth cap of [`ResolverOptions::max_depth`], a total-
249 /// nodes cap of `max_nodes`, a per-context fanout cap of `max_fanout`,
250 /// and a wall-clock `total_timeout`. Returns each verified ancestor
251 /// (excluding the root). Breadth-first; closer ancestors are returned
252 /// first.
253 pub async fn walk_derived_from(&self, body: &Body) -> Result<Vec<VerifiedContext>, AcdpError> {
254 let total_timeout = self.options.total_timeout;
255 let fut = self.walk_derived_from_inner(body);
256 match tokio::time::timeout(total_timeout, fut).await {
257 Ok(res) => res,
258 Err(_) => Err(AcdpError::CrossRegistryResolutionFailed(format!(
259 "derived_from walk exceeded total_timeout={:?}",
260 total_timeout
261 ))),
262 }
263 }
264
265 async fn walk_derived_from_inner(
266 &self,
267 body: &Body,
268 ) -> Result<Vec<VerifiedContext>, AcdpError> {
269 let mut seen: HashSet<String> = HashSet::new();
270 seen.insert(body.ctx_id.0.clone());
271
272 if body.derived_from.len() > self.options.max_fanout {
273 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
274 "root context {} has derived_from fanout {} > max_fanout={}",
275 body.ctx_id.0,
276 body.derived_from.len(),
277 self.options.max_fanout
278 )));
279 }
280
281 let mut results: Vec<VerifiedContext> = Vec::new();
282 let mut frontier: VecDeque<(CtxId, usize)> = body
283 .derived_from
284 .iter()
285 .map(|c| (c.clone(), 1usize))
286 .collect();
287
288 while let Some((next, depth)) = frontier.pop_front() {
289 if !seen.insert(next.0.clone()) {
290 continue; // cycle
291 }
292 if depth > self.options.max_depth {
293 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
294 "derived_from walk exceeded max_depth={} at {}",
295 self.options.max_depth, next.0
296 )));
297 }
298 if results.len() >= self.options.max_nodes {
299 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
300 "derived_from walk exceeded max_nodes={} (last attempted: {})",
301 self.options.max_nodes, next.0
302 )));
303 }
304 let verified = self.resolve(&next).await?;
305 let parents = &verified.body().derived_from;
306 if parents.len() > self.options.max_fanout {
307 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
308 "context {} has derived_from fanout {} > max_fanout={}",
309 next.0,
310 parents.len(),
311 self.options.max_fanout
312 )));
313 }
314 for parent in parents {
315 if !seen.contains(parent.as_str()) {
316 frontier.push_back((parent.clone(), depth + 1));
317 }
318 }
319 results.push(verified);
320 }
321 Ok(results)
322 }
323
324 fn check_allowlist(&self, authority: &str) -> Result<(), AcdpError> {
325 if let Some(list) = &self.allowlist {
326 if !list.contains(authority) {
327 return Err(AcdpError::CrossRegistryResolutionFailed(format!(
328 "authority '{authority}' is not on the resolver allowlist"
329 )));
330 }
331 }
332 Ok(())
333 }
334
335 /// Return a cached `RegistryClient` for the authority, building one
336 /// on first use. Reuse across hops avoids per-hop reqwest
337 /// connection-pool churn.
338 ///
339 /// SEC-01: the client is built with [`RegistryClient::new_pinned`],
340 /// which resolves the authority's DNS up-front, filters every
341 /// resolved IP through the resolver's [`SsrfPolicy`], and pins the
342 /// connection to that address. Without pinning a hostile `ctx_id`
343 /// authority (e.g. `internal-host.example.com` resolving to
344 /// `10.0.0.1` or `169.254.169.254`) would slip past the URL-syntax
345 /// `check_url` gate and reach an internal target. The seeded test
346 /// path ([`Self::seed_client`]) bypasses this constructor.
347 async fn client_for(&self, authority: &str, base: &str) -> Result<RegistryClient, AcdpError> {
348 {
349 let cache = self.client_cache.lock().unwrap();
350 if let Some(c) = cache.get(authority) {
351 return Ok(c.clone());
352 }
353 }
354 // Build with full DNS pinning before taking the cache lock —
355 // `new_pinned` is async and the cache mutex must not be held
356 // across the await.
357 let client = RegistryClient::new_pinned(base, &self.ssrf_policy).await?;
358 let mut cache = self.client_cache.lock().unwrap();
359 Ok(cache.entry(authority.to_string()).or_insert(client).clone())
360 }
361
362 /// Return the cached capabilities for `authority`, fetching when
363 /// the entry is missing or its per-response TTL has elapsed.
364 ///
365 /// BUG-09: TTL comes from the response's `Cache-Control: max-age=N`
366 /// (clamped to `[1s, ResolverOptions::capabilities_ttl]` so the
367 /// resolver-wide ceiling still applies) rather than a fixed value.
368 /// A registry serving `Cache-Control: max-age=60` is honored; one
369 /// serving no `Cache-Control` falls back to the
370 /// [`RegistryClient::capabilities_with_ttl`] default (300s).
371 async fn capabilities_for(
372 &self,
373 authority: &str,
374 registry: &RegistryClient,
375 ) -> Result<CapabilitiesDocument, AcdpError> {
376 // Fast path: cache hit + within per-response TTL.
377 {
378 let cache = self.caps_cache.lock().unwrap();
379 if let Some((caps, fetched_at, ttl)) = cache.get(authority) {
380 if fetched_at.elapsed() < *ttl {
381 return Ok(caps.clone());
382 }
383 }
384 }
385 let (caps, response_ttl) = registry
386 .capabilities_with_ttl()
387 .await
388 .map_err(|e| match e {
389 AcdpError::Http(_) | AcdpError::KeyResolutionUnreachable(_) => {
390 AcdpError::CrossRegistryResolutionFailed(format!(
391 "could not reach registry '{authority}': {e}"
392 ))
393 }
394 other => other,
395 })?;
396 // Clamp to the resolver-wide ceiling so a registry advertising
397 // an absurd `max-age` can't pin a stale doc indefinitely.
398 let ttl = response_ttl.min(self.options.capabilities_ttl);
399 let mut cache = self.caps_cache.lock().unwrap();
400 cache.insert(authority.to_string(), (caps.clone(), Instant::now(), ttl));
401 Ok(caps)
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// An authority outside the allowlist is rejected; a listed one passes.
    #[test]
    fn allowlist_rejects_outside_authorities() {
        let resolver =
            CrossRegistryResolver::new().with_allowlist(["registry.example.com".to_string()]);
        let err = resolver.check_allowlist("evil.com").unwrap_err();
        assert!(matches!(err, AcdpError::CrossRegistryResolutionFailed(_)));
        resolver.check_allowlist("registry.example.com").unwrap();
    }

    /// The `Default` impl yields the values quoted in the field docs.
    #[test]
    fn options_default_values_match_doc() {
        let o = ResolverOptions::default();
        assert_eq!(o.max_depth, 10);
        assert_eq!(o.max_nodes, 100);
        assert_eq!(o.max_fanout, 32);
        assert_eq!(o.total_timeout, Duration::from_secs(30));
        assert_eq!(o.capabilities_ttl, Duration::from_secs(300));
    }

    /// `with_options` swaps in every field of the supplied struct,
    /// including the two durations.
    #[test]
    fn with_options_replaces_full_struct() {
        let r = CrossRegistryResolver::new().with_options(ResolverOptions {
            max_depth: 3,
            max_nodes: 7,
            max_fanout: 2,
            total_timeout: Duration::from_secs(5),
            capabilities_ttl: Duration::from_secs(60),
        });
        assert_eq!(r.options().max_depth, 3);
        assert_eq!(r.options().max_nodes, 7);
        assert_eq!(r.options().max_fanout, 2);
        assert_eq!(r.options().total_timeout, Duration::from_secs(5));
        assert_eq!(r.options().capabilities_ttl, Duration::from_secs(60));
    }

    /// Pins the `HashSet::insert` contract the walker's cycle detection
    /// relies on (a second insert of the same id reports `false`). This
    /// does not exercise the resolver itself.
    #[test]
    fn cycle_detection_short_circuits() {
        let mut seen: HashSet<String> = HashSet::new();
        let id = "acdp://r/12345678-1234-4321-8123-123456781234".to_string();
        assert!(seen.insert(id.clone()));
        assert!(!seen.insert(id));
    }
}