Skip to main content

acdp_client/
data_ref.rs

1//! Data-reference fetching + hash verification (feature = "client").
2//!
3//! [`DataRef`] tells a consumer **where** a piece of underlying data
4//! lives; this module fetches it and verifies its integrity against the
5//! producer-signed `content_hash` (RFC-ACDP-0002 §6).
6//!
7//! Three pieces:
8//!
9//! - [`DataRefFetcher`] — trait that abstracts the fetch strategy. Native
10//!   async-fn-in-trait, so `impl DataRefFetcher` works directly in
11//!   generic positions. Wrap a custom impl in `Box<dyn …>` only if your
12//!   call site needs dynamic dispatch.
13//! - [`HttpsDataRefFetcher`] — concrete fetcher for `https://…` URIs.
14//!   The default [`acdp_safe_http::SsrfPolicy`] is HTTPS-only;
15//!   `http://` is rejected at the URL boundary before any socket
16//!   activity. A test SSRF policy with `allow_http: true` may relax
17//!   this. Caps response size at 16 MiB and has a 30 s timeout.
18//!   Structured locators are NOT handled — they need protocol-specific
19//!   knowledge.
20//! - [`fetch_and_verify_data_ref`] — convenience helper that wires a
21//!   fetcher to the declared `content_hash`, returning bytes only after
22//!   the SHA-256 matches.
23//!
24//! ## Embedded refs
25//!
26//! `fetch_and_verify_data_ref` short-circuits embedded refs without
27//! touching the fetcher — the bytes are already in the body. The
28//! embedded-hash check (RFC-ACDP-0003 §2.1 step 3) is the
29//! [`acdp_validation::verify_embedded_hash`] entry point.
30
31use sha2::{Digest, Sha256};
32
33use acdp_primitives::error::AcdpError;
34use acdp_safe_http::SsrfPolicy;
35use acdp_types::data_ref::{DataRef, Location};
36use acdp_types::primitives::ContentHash;
37
/// Response-size ceiling applied to an HTTPS data-ref fetch when the
/// caller does not pick one: 16 MiB.
///
/// The ceiling bounds how much memory a consumer will commit to a single
/// fetch, independent of whatever `size_bytes` the producer advertised.
/// Producers with larger payloads SHOULD publish through a chunked
/// storage scheme (S3 multipart, IPFS, etc.) instead of serving the
/// bytes over raw HTTPS.
pub const DEFAULT_MAX_BYTES: u64 = 16 * 1024 * 1024;
45
46/// Pluggable fetch strategy for a [`DataRef`]. Implementations are
47/// responsible for SSRF defenses and response-size caps on URI fetches;
48/// structured locators are protocol-specific and likely need their own
49/// trait impl per scheme (`kafka.offset`, `ipfs.cid`, …).
50pub trait DataRefFetcher: Send + Sync {
51    /// Fetch raw bytes referenced by `location`. Implementations MAY
52    /// reject [`Location::Structured`] with a clear error rather than
53    /// implementing every scheme.
54    fn fetch(
55        &self,
56        location: &Location,
57    ) -> impl std::future::Future<Output = Result<Vec<u8>, AcdpError>> + Send;
58}
59
60/// Default HTTPS-only fetcher.
61///
62/// Enforces:
63/// - [`SsrfPolicy`] checks on every URL (HTTPS-only, IP-literal
64///   rejection, private-range blocking).
65/// - `Range`-free `GET` with a hard byte cap (default 16 MiB).
66/// - 30 s total timeout (matches RFC-ACDP-0006 §7.4 for registry RPCs).
67///
68/// Constructed via [`Self::new`] (default cap) or
69/// [`Self::with_max_bytes`].
70pub struct HttpsDataRefFetcher {
71    http: reqwest::Client,
72    ssrf_policy: SsrfPolicy,
73    max_bytes: u64,
74}
75
76impl Default for HttpsDataRefFetcher {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl HttpsDataRefFetcher {
83    /// Build a fetcher with the default 16 MiB cap and the default
84    /// [`SsrfPolicy`] (HTTPS-only, no IP literals, no private ranges).
85    pub fn new() -> Self {
86        Self::with_max_bytes(DEFAULT_MAX_BYTES)
87    }
88
89    /// Build a fetcher with a custom response-size cap.
90    pub fn with_max_bytes(max_bytes: u64) -> Self {
91        let policy = SsrfPolicy::default();
92        let http = build_data_ref_http_client(&policy)
93            .expect("HttpsDataRefFetcher HTTP client build failed");
94        Self {
95            http,
96            ssrf_policy: policy,
97            max_bytes,
98        }
99    }
100
101    /// Replace the [`SsrfPolicy`] (useful for tests).
102    ///
103    /// SEC-02: this rebuilds the underlying `reqwest::Client` so the new
104    /// policy is actually applied at the DNS layer. The HTTP client
105    /// carries a [`SafeDnsResolver`](acdp_safe_http) hook, so the
106    /// resolver only takes effect on a client built *with* the policy —
107    /// mutating `ssrf_policy` alone would leave the old DNS filter wired
108    /// in.
109    pub fn with_ssrf_policy(mut self, policy: SsrfPolicy) -> Self {
110        self.http = build_data_ref_http_client(&policy)
111            .expect("rebuild HttpsDataRefFetcher HTTP client with new SSRF policy");
112        self.ssrf_policy = policy;
113        self
114    }
115}
116
117/// Build the `reqwest::Client` used by [`HttpsDataRefFetcher`].
118///
119/// SEC-02: mirrors `WebResolver`'s build path so a `DataRef` fetch gets
120/// the same SSRF defenses as DID resolution:
121///
122/// - `policy` is plumbed into reqwest's `dns_resolver` hook via
123///   [`SafeDnsResolver`](acdp_safe_http), so every resolved IP is
124///   filtered against the policy *before any TCP connect*. A
125///   producer-controlled `location` URL whose hostname resolves into a
126///   forbidden range (loopback, RFC 1918, link-local/IMDS, ULA, …) is
127///   refused at DNS time — defeating DNS rebinding (RFC-ACDP-0008 §4.8).
128/// - Redirects are capped at [`acdp_primitives::limits::MAX_REDIRECTS`] and must
129///   stay on the original request's authority; a cross-authority
130///   redirect is rejected.
131fn build_data_ref_http_client(policy: &SsrfPolicy) -> Result<reqwest::Client, AcdpError> {
132    use acdp_primitives::limits::MAX_REDIRECTS;
133
134    let redirect_policy = reqwest::redirect::Policy::custom(|attempt| {
135        if attempt.previous().len() >= MAX_REDIRECTS {
136            return attempt.error(format!(
137                "data_ref fetch: exceeded {MAX_REDIRECTS} redirects"
138            ));
139        }
140        // Same-authority enforcement (scheme + host + port) against the
141        // original request URL. RFC-ACDP-0008 §4.8.
142        let cross = attempt
143            .previous()
144            .first()
145            .filter(|orig| !acdp_safe_http::same_fetch_authority(orig, attempt.url()))
146            .map(|orig| (orig.to_string(), attempt.url().to_string()));
147        if let Some((from, to)) = cross {
148            return attempt.error(format!(
149                "data_ref fetch: cross-authority redirect rejected ({from} -> {to})"
150            ));
151        }
152        attempt.follow()
153    });
154
155    reqwest::Client::builder()
156        .use_rustls_tls()
157        .connect_timeout(std::time::Duration::from_secs(5))
158        .timeout(std::time::Duration::from_secs(30))
159        .redirect(redirect_policy)
160        .dns_resolver(acdp_safe_http::SafeDnsResolver::arc(policy.clone()))
161        .build()
162        .map_err(|e| AcdpError::Http(e.to_string()))
163}
164
165impl DataRefFetcher for HttpsDataRefFetcher {
166    async fn fetch(&self, location: &Location) -> Result<Vec<u8>, AcdpError> {
167        let uri = match location {
168            Location::Uri(s) => s,
169            Location::Structured(_) => {
170                return Err(AcdpError::NotImplemented(
171                    "HttpsDataRefFetcher does not handle structured locators \
172                     (kafka.offset, ipfs.cid, …) — implement DataRefFetcher \
173                     for the relevant scheme"
174                        .into(),
175                ));
176            }
177        };
178
179        // SSRF policy gate — RFC-ACDP-0006 §7.1/§7.2.
180        self.ssrf_policy
181            .check_url(uri)
182            .map_err(|e| AcdpError::SchemaViolation(format!("SSRF policy on data_ref: {e}")))?;
183
184        let mut resp = self
185            .http
186            .get(uri)
187            .send()
188            .await
189            .map_err(|e| AcdpError::Http(e.to_string()))?;
190
191        if !resp.status().is_success() {
192            return Err(AcdpError::Http(format!(
193                "data_ref fetch returned HTTP {}",
194                resp.status()
195            )));
196        }
197
198        // Cap response size as we stream — defends against a producer
199        // that claimed a small size_bytes but the server returns more.
200        let mut buf = Vec::with_capacity(8 * 1024);
201        while let Some(chunk) = resp
202            .chunk()
203            .await
204            .map_err(|e| AcdpError::Http(e.to_string()))?
205        {
206            if (buf.len() as u64).saturating_add(chunk.len() as u64) > self.max_bytes {
207                return Err(AcdpError::PayloadTooLarge(format!(
208                    "data_ref response exceeded {} bytes",
209                    self.max_bytes
210                )));
211            }
212            buf.extend_from_slice(&chunk);
213        }
214        Ok(buf)
215    }
216}
217
218/// Convenience: fetch a [`DataRef`] and verify its declared
219/// `content_hash`.
220///
221/// Behavior:
222/// - **Embedded ref:** returns the decoded bytes via
223///   [`acdp_validation::embedded_decoded_bytes`]. If the ref also
224///   declares a `content_hash`, [`acdp_validation::verify_embedded_hash`]
225///   has already verified it at validation time; this function
226///   re-verifies as a defense-in-depth check.
227/// - **URI ref:** delegates to `fetcher` and recomputes SHA-256 over the
228///   returned bytes, checking against `dr.content_hash` when present.
229///   If `content_hash` is absent, returns the bytes unverified — the
230///   producer chose not to commit to a hash, so the consumer is on its own.
231/// - **Both URI and embedded:** rejected at validation; this function
232///   relies on that and assumes exactly one is present.
233pub async fn fetch_and_verify_data_ref(
234    dr: &DataRef,
235    fetcher: &impl DataRefFetcher,
236) -> Result<Vec<u8>, AcdpError> {
237    if let Some(emb) = &dr.embedded {
238        let bytes = acdp_validation::embedded_decoded_bytes(emb)?;
239        if dr.content_hash.is_some() {
240            acdp_validation::verify_embedded_hash(dr)?;
241        }
242        return Ok(bytes);
243    }
244    let Some(location) = &dr.location else {
245        return Err(AcdpError::SchemaViolation(
246            "data_ref has neither embedded nor location — cannot fetch".into(),
247        ));
248    };
249    let bytes = fetcher.fetch(location).await?;
250    if let Some(declared) = &dr.content_hash {
251        check_sha256(&bytes, declared)?;
252    }
253    Ok(bytes)
254}
255
256fn check_sha256(bytes: &[u8], declared: &ContentHash) -> Result<(), AcdpError> {
257    let Some(declared_hex) = declared.as_str().strip_prefix("sha256:") else {
258        return Err(AcdpError::SchemaViolation(format!(
259            "data_ref content_hash must start with 'sha256:', got '{}'",
260            declared.as_str()
261        )));
262    };
263    let got = format!("{:x}", Sha256::digest(bytes));
264    if got != declared_hex {
265        // BUG-02: a content-hash mismatch on external data is a
266        // data-reference-level integrity failure, not a body-level hash
267        // failure and not a signature failure. `invalid_signature`
268        // implies the producer's Ed25519 signature didn't verify (a
269        // key/key-binding problem); `hash_mismatch` implies the whole
270        // body is unverifiable. Neither is true here — the body is
271        // fine, only the bytes at this one location have diverged
272        // (RFC-ACDP-0007 §5 "Distinguishing hash failures", data-ref-008).
273        return Err(AcdpError::DataRefHashMismatch(format!(
274            "data_ref content_hash mismatch: declared sha256:{declared_hex}, computed sha256:{got}"
275        )));
276    }
277    Ok(())
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use acdp_types::data_ref::{DataRefType, EmbeddedContent, EmbeddedEncoding};
    use sha2::{Digest, Sha256};

    /// Location used by every stub-backed test; the stub never looks at it.
    const STUB_URI: &str = "https://example.com/data";

    /// Fetcher that ignores the location and hands back canned bytes, so
    /// the fetch-and-verify wrapper can be exercised without a network.
    struct StubFetcher {
        bytes: Vec<u8>,
    }

    impl StubFetcher {
        /// Stub that answers every fetch with a copy of `canned`.
        fn returning(canned: &[u8]) -> Self {
            Self {
                bytes: canned.to_vec(),
            }
        }
    }

    impl DataRefFetcher for StubFetcher {
        async fn fetch(&self, _location: &Location) -> Result<Vec<u8>, AcdpError> {
            Ok(self.bytes.clone())
        }
    }

    #[tokio::test]
    async fn fetch_and_verify_uri_ref_passes_with_matching_hash() {
        let body = b"hello-world".to_vec();
        let declared = ContentHash(format!("sha256:{:x}", Sha256::digest(&body)));
        let data_ref = DataRef::uri_verified(DataRefType::RawData, STUB_URI, declared);

        let fetched = fetch_and_verify_data_ref(&data_ref, &StubFetcher::returning(&body))
            .await
            .unwrap();

        assert_eq!(fetched, body);
    }

    #[tokio::test]
    async fn fetch_and_verify_uri_ref_fails_on_hash_mismatch() {
        let all_zero_digest = ContentHash(format!("sha256:{}", "0".repeat(64)));
        let data_ref = DataRef::uri_verified(DataRefType::RawData, STUB_URI, all_zero_digest);

        let err = fetch_and_verify_data_ref(
            &data_ref,
            &StubFetcher::returning(b"different bytes"),
        )
        .await
        .unwrap_err();

        // BUG-02: a data-ref hash mismatch must surface as the
        // data-reference-level `data_ref_hash_mismatch`, which is distinct
        // from the body-level `hash_mismatch` and from
        // `invalid_signature`.
        assert!(
            matches!(err, AcdpError::DataRefHashMismatch(_)),
            "expected DataRefHashMismatch, got {err:?}"
        );
    }

    #[tokio::test]
    async fn fetch_and_verify_uri_ref_without_declared_hash_returns_bytes_unverified() {
        let data_ref = DataRef::uri(DataRefType::RawData, STUB_URI);

        let fetched = fetch_and_verify_data_ref(&data_ref, &StubFetcher::returning(b"unverified"))
            .await
            .unwrap();

        assert_eq!(fetched, b"unverified");
    }

    #[tokio::test]
    async fn fetch_and_verify_embedded_ref_returns_decoded_bytes() {
        use base64::{engine::general_purpose::STANDARD, Engine};

        let payload = b"embedded-bytes";
        let embedded = EmbeddedContent {
            encoding: EmbeddedEncoding::Base64,
            content: serde_json::Value::String(STANDARD.encode(payload)),
        };
        let data_ref = DataRef {
            ref_type: DataRefType::RawData,
            description: None,
            size_bytes: None,
            format: None,
            schema_version: None,
            content_hash: None,
            location: None,
            embedded: Some(embedded),
            extensions: serde_json::Map::new(),
        };

        // The stub holds no bytes: the embedded path must not consult it.
        let fetched = fetch_and_verify_data_ref(&data_ref, &StubFetcher::returning(&[]))
            .await
            .unwrap();

        assert_eq!(fetched, payload);
    }

    /// The real HTTPS fetcher turns away `http://` URIs at its SSRF gate.
    /// This covers the fetcher-side check; the helper merely forwards
    /// whatever the fetcher returns.
    #[tokio::test]
    async fn https_fetcher_rejects_http_uri() {
        let fetcher = HttpsDataRefFetcher::new();
        let insecure = Location::Uri("http://insecure.example.com/x".into());

        let err = fetcher.fetch(&insecure).await.unwrap_err();

        assert!(matches!(err, AcdpError::SchemaViolation(_)));
    }

    /// The HTTPS fetcher answers structured locators with
    /// `NotImplemented`; a scheme-specific fetcher would handle them.
    #[tokio::test]
    async fn https_fetcher_rejects_structured_locator() {
        let fetcher = HttpsDataRefFetcher::new();
        let mut locator = serde_json::Map::new();
        locator.insert("scheme".into(), serde_json::json!("kafka.offset"));

        let err = fetcher
            .fetch(&Location::Structured(locator))
            .await
            .unwrap_err();

        assert!(matches!(err, AcdpError::NotImplemented(_)));
    }

    /// data-ref-ssrf-001 — an external `data_refs[].location` whose host
    /// is an IP literal in a private / loopback / link-local / IMDS
    /// range MUST be refused before any connection (RFC-ACDP-0008 §4.9).
    /// The default `SsrfPolicy` is expected to reject IP-literal URLs in
    /// `check_url`, which the fetcher maps to `SchemaViolation`.
    #[tokio::test]
    async fn https_fetcher_rejects_ip_literal_private_location() {
        const FORBIDDEN: [&str; 5] = [
            "https://10.0.0.1/data.csv",
            "https://127.0.0.1/data.csv",
            "https://[::1]/data.csv",
            "https://169.254.169.254/latest/meta-data/",
            "https://192.168.1.10/export.parquet",
        ];

        let fetcher = HttpsDataRefFetcher::new();
        for uri in FORBIDDEN {
            let err = fetcher
                .fetch(&Location::Uri(uri.into()))
                .await
                .unwrap_err();
            assert!(
                matches!(err, AcdpError::SchemaViolation(_)),
                "data-ref-ssrf-001: '{uri}' must be refused by the SSRF policy, got {err:?}"
            );
        }
    }

    /// data-ref-ssrf-002 — an external `data_refs[].location` whose host
    /// looks like a public DNS name but *resolves* to a loopback address
    /// MUST be refused. The `SafeDnsResolver` hook is meant to filter the
    /// resolved IP before any TCP connect, defeating DNS rebinding.
    /// `localhost` stands in for the fixture's synthetic hostname since
    /// it resolves to a loopback address.
    #[tokio::test]
    async fn https_fetcher_blocks_hostname_resolving_to_loopback() {
        let fetcher = HttpsDataRefFetcher::new();
        let loopback_name = Location::Uri("https://localhost/data.csv".into());

        let err = fetcher.fetch(&loopback_name).await.unwrap_err();

        // Intended flow: the hostname gets past `check_url` (it is not an
        // IP literal) and the DNS filter refuses the loopback address, so
        // the failure shows up as a transport error instead of a fetch.
        // NOTE(review): for a URI location `NotImplemented` can never be
        // returned, so the assertion below adds nothing beyond
        // `unwrap_err()` — consider pinning the expected variant once the
        // policy's behaviour for `localhost` is confirmed.
        assert!(
            !matches!(err, AcdpError::NotImplemented(_)),
            "data-ref-ssrf-002: loopback-resolving host must be blocked, got {err:?}"
        );
    }

    /// data-ref-ssrf-002 escape hatch — a test harness MAY opt into
    /// loopback through a non-default SSRF policy. Under
    /// `allow_test_loopback` the DNS filter is expected to stop refusing
    /// `localhost`, so the fetch should fail only because nothing is
    /// listening — i.e. it is no longer an SSRF refusal.
    #[tokio::test]
    async fn https_fetcher_allow_test_loopback_permits_localhost_dns() {
        let relaxed = acdp_safe_http::SsrfPolicy::allow_test_loopback();
        let fetcher = HttpsDataRefFetcher::new().with_ssrf_policy(relaxed);

        // The outcome is deliberately ignored: with no server listening
        // the call still errors. What is exercised is that
        // `with_ssrf_policy` rebuilds the client with the relaxed DNS
        // resolver (SEC-02), so the new policy — not a stale resolver —
        // governs the fetch.
        let _ = fetcher
            .fetch(&Location::Uri("https://localhost:1/data.csv".into()))
            .await;
    }
}