acdp_client/data_ref.rs
//! Data-reference fetching + hash verification (feature = "client").
//!
//! [`DataRef`] tells a consumer **where** a piece of underlying data
//! lives; this module fetches it and verifies its integrity against the
//! producer-signed `content_hash` (RFC-ACDP-0002 §6).
//!
//! Three pieces:
//!
//! - [`DataRefFetcher`] — trait that abstracts the fetch strategy. Its
//!   `fetch` method returns `impl Future` (native async-fn-in-trait), so
//!   use it through generics (`&impl DataRefFetcher`,
//!   `F: DataRefFetcher`). The trait is **not** dyn-compatible:
//!   `Box<dyn DataRefFetcher>` does not compile. A call site that needs
//!   dynamic dispatch must wrap implementations behind its own
//!   boxed-future trait.
//! - [`HttpsDataRefFetcher`] — concrete fetcher for `https://…` URIs.
//!   The default [`acdp_safe_http::SsrfPolicy`] is HTTPS-only;
//!   `http://` is rejected at the URL boundary before any socket
//!   activity. A test SSRF policy with `allow_http: true` may relax
//!   this. Caps the response size (16 MiB by default) and has a 30 s
//!   timeout. Structured locators are NOT handled — they need
//!   protocol-specific knowledge.
//! - [`fetch_and_verify_data_ref`] — convenience helper that wires a
//!   fetcher to the declared `content_hash`, returning bytes only after
//!   the SHA-256 matches.
//!
//! ## Embedded refs
//!
//! `fetch_and_verify_data_ref` short-circuits embedded refs without
//! touching the fetcher — the bytes are already in the body. The
//! embedded-hash check (RFC-ACDP-0003 §2.1 step 3) lives in
//! [`acdp_validation::verify_embedded_hash`]. `fetch_and_verify_data_ref`
//! re-runs it whenever the ref declares a `content_hash`.
30
31use sha2::{Digest, Sha256};
32
33use acdp_primitives::error::AcdpError;
34use acdp_safe_http::SsrfPolicy;
35use acdp_types::data_ref::{DataRef, Location};
36use acdp_types::primitives::ContentHash;
37
/// Response-size ceiling applied to an HTTPS data-ref fetch unless the
/// caller picks another one: 16 MiB.
///
/// This bounds consumer memory no matter what the producer advertised in
/// `size_bytes`. Producers with bigger payloads SHOULD publish through a
/// chunked storage scheme (S3 multipart, IPFS, etc.) instead of serving
/// raw HTTPS.
pub const DEFAULT_MAX_BYTES: u64 = 16 << 20;
45
46/// Pluggable fetch strategy for a [`DataRef`]. Implementations are
47/// responsible for SSRF defenses and response-size caps on URI fetches;
48/// structured locators are protocol-specific and likely need their own
49/// trait impl per scheme (`kafka.offset`, `ipfs.cid`, …).
50pub trait DataRefFetcher: Send + Sync {
51 /// Fetch raw bytes referenced by `location`. Implementations MAY
52 /// reject [`Location::Structured`] with a clear error rather than
53 /// implementing every scheme.
54 fn fetch(
55 &self,
56 location: &Location,
57 ) -> impl std::future::Future<Output = Result<Vec<u8>, AcdpError>> + Send;
58}
59
60/// Default HTTPS-only fetcher.
61///
62/// Enforces:
63/// - [`SsrfPolicy`] checks on every URL (HTTPS-only, IP-literal
64/// rejection, private-range blocking).
65/// - `Range`-free `GET` with a hard byte cap (default 16 MiB).
66/// - 30 s total timeout (matches RFC-ACDP-0006 §7.4 for registry RPCs).
67///
68/// Constructed via [`Self::new`] (default cap) or
69/// [`Self::with_max_bytes`].
70pub struct HttpsDataRefFetcher {
71 http: reqwest::Client,
72 ssrf_policy: SsrfPolicy,
73 max_bytes: u64,
74}
75
76impl Default for HttpsDataRefFetcher {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl HttpsDataRefFetcher {
83 /// Build a fetcher with the default 16 MiB cap and the default
84 /// [`SsrfPolicy`] (HTTPS-only, no IP literals, no private ranges).
85 pub fn new() -> Self {
86 Self::with_max_bytes(DEFAULT_MAX_BYTES)
87 }
88
89 /// Build a fetcher with a custom response-size cap.
90 pub fn with_max_bytes(max_bytes: u64) -> Self {
91 let policy = SsrfPolicy::default();
92 let http = build_data_ref_http_client(&policy)
93 .expect("HttpsDataRefFetcher HTTP client build failed");
94 Self {
95 http,
96 ssrf_policy: policy,
97 max_bytes,
98 }
99 }
100
101 /// Replace the [`SsrfPolicy`] (useful for tests).
102 ///
103 /// SEC-02: this rebuilds the underlying `reqwest::Client` so the new
104 /// policy is actually applied at the DNS layer. The HTTP client
105 /// carries a [`SafeDnsResolver`](acdp_safe_http) hook, so the
106 /// resolver only takes effect on a client built *with* the policy —
107 /// mutating `ssrf_policy` alone would leave the old DNS filter wired
108 /// in.
109 pub fn with_ssrf_policy(mut self, policy: SsrfPolicy) -> Self {
110 self.http = build_data_ref_http_client(&policy)
111 .expect("rebuild HttpsDataRefFetcher HTTP client with new SSRF policy");
112 self.ssrf_policy = policy;
113 self
114 }
115}
116
117/// Build the `reqwest::Client` used by [`HttpsDataRefFetcher`].
118///
119/// SEC-02: mirrors `WebResolver`'s build path so a `DataRef` fetch gets
120/// the same SSRF defenses as DID resolution:
121///
122/// - `policy` is plumbed into reqwest's `dns_resolver` hook via
123/// [`SafeDnsResolver`](acdp_safe_http), so every resolved IP is
124/// filtered against the policy *before any TCP connect*. A
125/// producer-controlled `location` URL whose hostname resolves into a
126/// forbidden range (loopback, RFC 1918, link-local/IMDS, ULA, …) is
127/// refused at DNS time — defeating DNS rebinding (RFC-ACDP-0008 §4.8).
128/// - Redirects are capped at [`acdp_primitives::limits::MAX_REDIRECTS`] and must
129/// stay on the original request's authority; a cross-authority
130/// redirect is rejected.
131fn build_data_ref_http_client(policy: &SsrfPolicy) -> Result<reqwest::Client, AcdpError> {
132 use acdp_primitives::limits::MAX_REDIRECTS;
133
134 let redirect_policy = reqwest::redirect::Policy::custom(|attempt| {
135 if attempt.previous().len() >= MAX_REDIRECTS {
136 return attempt.error(format!(
137 "data_ref fetch: exceeded {MAX_REDIRECTS} redirects"
138 ));
139 }
140 // Same-authority enforcement (scheme + host + port) against the
141 // original request URL. RFC-ACDP-0008 §4.8.
142 let cross = attempt
143 .previous()
144 .first()
145 .filter(|orig| !acdp_safe_http::same_fetch_authority(orig, attempt.url()))
146 .map(|orig| (orig.to_string(), attempt.url().to_string()));
147 if let Some((from, to)) = cross {
148 return attempt.error(format!(
149 "data_ref fetch: cross-authority redirect rejected ({from} -> {to})"
150 ));
151 }
152 attempt.follow()
153 });
154
155 reqwest::Client::builder()
156 .use_rustls_tls()
157 .connect_timeout(std::time::Duration::from_secs(5))
158 .timeout(std::time::Duration::from_secs(30))
159 .redirect(redirect_policy)
160 .dns_resolver(acdp_safe_http::SafeDnsResolver::arc(policy.clone()))
161 .build()
162 .map_err(|e| AcdpError::Http(e.to_string()))
163}
164
165impl DataRefFetcher for HttpsDataRefFetcher {
166 async fn fetch(&self, location: &Location) -> Result<Vec<u8>, AcdpError> {
167 let uri = match location {
168 Location::Uri(s) => s,
169 Location::Structured(_) => {
170 return Err(AcdpError::NotImplemented(
171 "HttpsDataRefFetcher does not handle structured locators \
172 (kafka.offset, ipfs.cid, …) — implement DataRefFetcher \
173 for the relevant scheme"
174 .into(),
175 ));
176 }
177 };
178
179 // SSRF policy gate — RFC-ACDP-0006 §7.1/§7.2.
180 self.ssrf_policy
181 .check_url(uri)
182 .map_err(|e| AcdpError::SchemaViolation(format!("SSRF policy on data_ref: {e}")))?;
183
184 let mut resp = self
185 .http
186 .get(uri)
187 .send()
188 .await
189 .map_err(|e| AcdpError::Http(e.to_string()))?;
190
191 if !resp.status().is_success() {
192 return Err(AcdpError::Http(format!(
193 "data_ref fetch returned HTTP {}",
194 resp.status()
195 )));
196 }
197
198 // Cap response size as we stream — defends against a producer
199 // that claimed a small size_bytes but the server returns more.
200 let mut buf = Vec::with_capacity(8 * 1024);
201 while let Some(chunk) = resp
202 .chunk()
203 .await
204 .map_err(|e| AcdpError::Http(e.to_string()))?
205 {
206 if (buf.len() as u64).saturating_add(chunk.len() as u64) > self.max_bytes {
207 return Err(AcdpError::PayloadTooLarge(format!(
208 "data_ref response exceeded {} bytes",
209 self.max_bytes
210 )));
211 }
212 buf.extend_from_slice(&chunk);
213 }
214 Ok(buf)
215 }
216}
217
218/// Convenience: fetch a [`DataRef`] and verify its declared
219/// `content_hash`.
220///
221/// Behavior:
222/// - **Embedded ref:** returns the decoded bytes via
223/// [`acdp_validation::embedded_decoded_bytes`]. If the ref also
224/// declares a `content_hash`, [`acdp_validation::verify_embedded_hash`]
225/// has already verified it at validation time; this function
226/// re-verifies as a defense-in-depth check.
227/// - **URI ref:** delegates to `fetcher` and recomputes SHA-256 over the
228/// returned bytes, checking against `dr.content_hash` when present.
229/// If `content_hash` is absent, returns the bytes unverified — the
230/// producer chose not to commit to a hash, so the consumer is on its own.
231/// - **Both URI and embedded:** rejected at validation; this function
232/// relies on that and assumes exactly one is present.
233pub async fn fetch_and_verify_data_ref(
234 dr: &DataRef,
235 fetcher: &impl DataRefFetcher,
236) -> Result<Vec<u8>, AcdpError> {
237 if let Some(emb) = &dr.embedded {
238 let bytes = acdp_validation::embedded_decoded_bytes(emb)?;
239 if dr.content_hash.is_some() {
240 acdp_validation::verify_embedded_hash(dr)?;
241 }
242 return Ok(bytes);
243 }
244 let Some(location) = &dr.location else {
245 return Err(AcdpError::SchemaViolation(
246 "data_ref has neither embedded nor location — cannot fetch".into(),
247 ));
248 };
249 let bytes = fetcher.fetch(location).await?;
250 if let Some(declared) = &dr.content_hash {
251 check_sha256(&bytes, declared)?;
252 }
253 Ok(bytes)
254}
255
256fn check_sha256(bytes: &[u8], declared: &ContentHash) -> Result<(), AcdpError> {
257 let Some(declared_hex) = declared.as_str().strip_prefix("sha256:") else {
258 return Err(AcdpError::SchemaViolation(format!(
259 "data_ref content_hash must start with 'sha256:', got '{}'",
260 declared.as_str()
261 )));
262 };
263 let got = format!("{:x}", Sha256::digest(bytes));
264 if got != declared_hex {
265 // BUG-02: a content-hash mismatch on external data is a
266 // data-reference-level integrity failure, not a body-level hash
267 // failure and not a signature failure. `invalid_signature`
268 // implies the producer's Ed25519 signature didn't verify (a
269 // key/key-binding problem); `hash_mismatch` implies the whole
270 // body is unverifiable. Neither is true here — the body is
271 // fine, only the bytes at this one location have diverged
272 // (RFC-ACDP-0007 §5 "Distinguishing hash failures", data-ref-008).
273 return Err(AcdpError::DataRefHashMismatch(format!(
274 "data_ref content_hash mismatch: declared sha256:{declared_hex}, computed sha256:{got}"
275 )));
276 }
277 Ok(())
278}
279
#[cfg(test)]
mod tests {
    // Unit tests. The fetch-and-verify wrapper is exercised with an
    // in-memory `StubFetcher`. The `https_fetcher_*` tests build a real
    // `HttpsDataRefFetcher`; the two loopback tests go as far as DNS
    // resolution / connect attempts against `localhost`.
    use super::*;
    use acdp_types::data_ref::{DataRefType, EmbeddedContent, EmbeddedEncoding};
    use sha2::{Digest, Sha256};

    /// Stub fetcher returning canned bytes. It is used to test the
    /// fetch-and-verify wrapper without touching the network; the
    /// `location` argument is ignored.
    struct StubFetcher {
        bytes: Vec<u8>,
    }
    impl DataRefFetcher for StubFetcher {
        async fn fetch(&self, _location: &Location) -> Result<Vec<u8>, AcdpError> {
            Ok(self.bytes.clone())
        }
    }

    /// A URI ref whose declared hash matches the fetched bytes returns
    /// those bytes unchanged.
    #[tokio::test]
    async fn fetch_and_verify_uri_ref_passes_with_matching_hash() {
        let bytes = b"hello-world".to_vec();
        let hash = format!("sha256:{:x}", Sha256::digest(&bytes));
        let dr = DataRef::uri_verified(
            DataRefType::RawData,
            "https://example.com/data",
            ContentHash(hash),
        );
        let got = fetch_and_verify_data_ref(
            &dr,
            &StubFetcher {
                bytes: bytes.clone(),
            },
        )
        .await
        .unwrap();
        assert_eq!(got, bytes);
    }

    /// A URI ref whose declared hash (all zeros) does not match the
    /// fetched bytes fails with `DataRefHashMismatch`.
    #[tokio::test]
    async fn fetch_and_verify_uri_ref_fails_on_hash_mismatch() {
        let dr = DataRef::uri_verified(
            DataRefType::RawData,
            "https://example.com/data",
            ContentHash(format!("sha256:{}", "0".repeat(64))),
        );
        let err = fetch_and_verify_data_ref(
            &dr,
            &StubFetcher {
                bytes: b"different bytes".to_vec(),
            },
        )
        .await
        .unwrap_err();
        // BUG-02: data-ref hash mismatch is a data-reference-level
        // integrity failure — `data_ref_hash_mismatch`, distinct from
        // body-level `hash_mismatch` and from `invalid_signature`.
        assert!(
            matches!(err, AcdpError::DataRefHashMismatch(_)),
            "expected DataRefHashMismatch, got {err:?}"
        );
    }

    /// Without a declared `content_hash`, the fetched bytes are returned
    /// as-is (no verification is possible).
    #[tokio::test]
    async fn fetch_and_verify_uri_ref_without_declared_hash_returns_bytes_unverified() {
        let dr = DataRef::uri(DataRefType::RawData, "https://example.com/data");
        let got = fetch_and_verify_data_ref(
            &dr,
            &StubFetcher {
                bytes: b"unverified".to_vec(),
            },
        )
        .await
        .unwrap();
        assert_eq!(got, b"unverified");
    }

    /// An embedded base64 ref is decoded locally. The stub returns no
    /// bytes, so a non-empty result shows the fetcher output was not used.
    #[tokio::test]
    async fn fetch_and_verify_embedded_ref_returns_decoded_bytes() {
        use base64::{engine::general_purpose::STANDARD, Engine};
        let payload = b"embedded-bytes";
        let encoded = STANDARD.encode(payload);
        let dr = DataRef {
            ref_type: DataRefType::RawData,
            description: None,
            size_bytes: None,
            format: None,
            schema_version: None,
            content_hash: None,
            location: None,
            embedded: Some(EmbeddedContent {
                encoding: EmbeddedEncoding::Base64,
                content: serde_json::json!(encoded),
            }),
            extensions: serde_json::Map::new(),
        };
        let got = fetch_and_verify_data_ref(&dr, &StubFetcher { bytes: vec![] })
            .await
            .unwrap();
        assert_eq!(got, payload);
    }

    /// The default SSRF policy rejects plain `http://` URIs at the URL
    /// boundary, inside `HttpsDataRefFetcher::fetch`, before any request
    /// is sent. This verifies the fetcher-side gate; the helper
    /// `fetch_and_verify_data_ref` just defers to whatever the fetcher
    /// returns.
    #[tokio::test]
    async fn https_fetcher_rejects_http_uri() {
        let f = HttpsDataRefFetcher::new();
        let err = f
            .fetch(&Location::Uri("http://insecure.example.com/x".into()))
            .await
            .unwrap_err();
        assert!(matches!(err, AcdpError::SchemaViolation(_)));
    }

    /// Structured locators surface `NotImplemented` from the HTTPS
    /// fetcher; a scheme-specific custom fetcher would handle them.
    #[tokio::test]
    async fn https_fetcher_rejects_structured_locator() {
        let f = HttpsDataRefFetcher::new();
        let mut m = serde_json::Map::new();
        m.insert("scheme".into(), serde_json::json!("kafka.offset"));
        let err = f.fetch(&Location::Structured(m)).await.unwrap_err();
        assert!(matches!(err, AcdpError::NotImplemented(_)));
    }

    /// data-ref-ssrf-001 — an external `data_refs[].location` whose host
    /// is an IP literal in a private / loopback / link-local / IMDS
    /// range MUST be refused before any connection (RFC-ACDP-0008 §4.9).
    /// The default `SsrfPolicy` rejects IP-literal URLs at `check_url`,
    /// so no socket activity occurs.
    #[tokio::test]
    async fn https_fetcher_rejects_ip_literal_private_location() {
        let f = HttpsDataRefFetcher::new();
        for uri in [
            "https://10.0.0.1/data.csv",
            "https://127.0.0.1/data.csv",
            "https://[::1]/data.csv",
            "https://169.254.169.254/latest/meta-data/",
            "https://192.168.1.10/export.parquet",
        ] {
            let err = f.fetch(&Location::Uri(uri.into())).await.unwrap_err();
            assert!(
                matches!(err, AcdpError::SchemaViolation(_)),
                "data-ref-ssrf-001: '{uri}' must be refused by the SSRF policy, got {err:?}"
            );
        }
    }

    /// data-ref-ssrf-002 — an external `data_refs[].location` whose host
    /// is a syntactically public DNS name that *resolves* to a loopback
    /// address MUST be refused. The `SafeDnsResolver` DNS hook filters
    /// the resolved IP before any TCP connect, defeating DNS rebinding.
    /// `localhost` stands in for the fixture's synthetic hostname
    /// (presumably it resolves to a loopback address on test hosts).
    ///
    /// NOTE(review): the assertion only rules out `NotImplemented`. A
    /// plain connection-refused `AcdpError::Http` (or a `SchemaViolation`
    /// from `check_url`) would also pass, so this test does not by itself
    /// prove the DNS filter fired. Consider asserting on the resolver's
    /// specific error text.
    #[tokio::test]
    async fn https_fetcher_blocks_hostname_resolving_to_loopback() {
        let f = HttpsDataRefFetcher::new();
        let err = f
            .fetch(&Location::Uri("https://localhost/data.csv".into()))
            .await
            .unwrap_err();
        // The hostname passes `check_url` (not an IP literal); the
        // SafeDnsResolver refuses the resolved loopback IP, surfacing as
        // a transport error rather than a successful fetch.
        assert!(
            !matches!(err, AcdpError::NotImplemented(_)),
            "data-ref-ssrf-002: loopback-resolving host must be blocked, got {err:?}"
        );
    }

    /// data-ref-ssrf-002 escape hatch — a test harness MAY opt into
    /// loopback via a non-default SSRF policy. With `allow_test_loopback`
    /// the DNS filter no longer refuses `localhost`, so the fetch is
    /// expected to fail only on the connection itself (nothing is
    /// listening) rather than on policy.
    ///
    /// NOTE(review): the result is discarded, so this test only checks
    /// that building the relaxed client and issuing the request do not
    /// panic. It does not assert which error is returned.
    #[tokio::test]
    async fn https_fetcher_allow_test_loopback_permits_localhost_dns() {
        let f = HttpsDataRefFetcher::new()
            .with_ssrf_policy(acdp_safe_http::SsrfPolicy::allow_test_loopback());
        // No server is listening, so this still errors — but the point
        // is that `with_ssrf_policy` rebuilt the client with the relaxed
        // DNS resolver (SEC-02); the policy, not a stale resolver, now
        // governs the fetch.
        let _ = f
            .fetch(&Location::Uri("https://localhost:1/data.csv".into()))
            .await;
    }
}
466}