Skip to main content

iroh_http_core/ffi/
fetch.rs

1//! FFI-shaped `fetch` — flat-string wrapper around the pure-Rust
2//! [`crate::http::client::fetch_request`].
3//!
4//! Slice D (#186): translates the JS adapter calling convention into a
5//! typed [`hyper::Request<Body>`] / [`iroh::EndpointAddr`] pair, hands
6//! them to [`crate::http::client::fetch_request`], and re-packages the
7//! response as a [`FfiResponse`] with a slotmap body handle. Maps
8//! [`crate::http::client::FetchError`] variants onto [`crate::CoreError`]
9//! codes for the FFI boundary.
10// Legitimate FFI wiring — uses the disallowed types intentionally.
11#![allow(clippy::disallowed_types)]
12
13use std::time::Duration;
14
15use http::{HeaderName, HeaderValue, Method};
16
17use crate::{
18    ffi::{handles::BodyReader, pumps::pump_hyper_body_to_channel_limited},
19    http::client::{fetch_request, FetchError},
20    parse_node_addr, Body, CoreError, FfiResponse, IrohEndpoint,
21};
22
23/// FFI-shaped fetch — re-exported as `iroh_http_core::fetch` for FFI
24/// binary compatibility (Slice D acceptance #5). Composition mirrors the
25/// pre-Slice-D function exactly; the moving parts that became pure Rust
26/// live in [`crate::http::client::fetch_request`].
27#[allow(clippy::too_many_arguments)]
28pub async fn fetch(
29    endpoint: &IrohEndpoint,
30    remote_node_id: &str,
31    url: &str,
32    method: &str,
33    headers: &[(String, String)],
34    req_body_reader: Option<BodyReader>,
35    fetch_token: Option<u64>,
36    direct_addrs: Option<&[std::net::SocketAddr]>,
37    timeout: Option<Duration>,
38    decompress: bool,
39    max_response_body_bytes: Option<usize>,
40) -> Result<FfiResponse, CoreError> {
41    // Reject standard web schemes.
42    {
43        let lower = url.to_ascii_lowercase();
44        if lower.starts_with("https://") || lower.starts_with("http://") {
45            let scheme_end = lower
46                .find("://")
47                .map(|i| i.saturating_add(3))
48                .unwrap_or(lower.len());
49            return Err(CoreError::invalid_input(format!(
50                "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
51                 Example: httpi://nodeId/path",
52                &url[..scheme_end]
53            )));
54        }
55    }
56
57    // Validate method and headers at the FFI boundary.
58    let http_method = Method::from_bytes(method.as_bytes())
59        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
60    for (name, value) in headers {
61        HeaderName::from_bytes(name.as_bytes())
62            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
63        HeaderValue::from_str(value).map_err(|_| {
64            CoreError::invalid_input(format!("invalid header value for {:?}", name))
65        })?;
66    }
67
68    // Resolve the EndpointAddr once: bare node id, ticket, or JSON, plus
69    // any caller-supplied direct addresses.
70    let parsed = parse_node_addr(remote_node_id)?;
71    let mut addr = iroh::EndpointAddr::new(parsed.node_id);
72    for a in &parsed.direct_addrs {
73        addr = addr.with_ip_addr(*a);
74    }
75    if let Some(addrs) = direct_addrs {
76        for a in addrs {
77            addr = addr.with_ip_addr(*a);
78        }
79    }
80    let remote_str = crate::base32_encode(parsed.node_id.as_bytes());
81    let path = extract_path(url);
82
83    // Build the typed Request<Body>.
84    let mut req_builder = hyper::Request::builder()
85        .method(http_method)
86        .uri(&path)
87        .header(hyper::header::HOST, &remote_str);
88
89    // When compression is enabled, advertise zstd-only Accept-Encoding —
90    // but only if the caller has not already set Accept-Encoding. A caller
91    // passing `Accept-Encoding: identity` is opting out of compression and
92    // must not be overridden.
93    {
94        let has_accept_encoding = headers
95            .iter()
96            .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
97        if !has_accept_encoding {
98            req_builder = req_builder.header("accept-encoding", "zstd");
99        }
100    }
101    for (k, v) in headers {
102        req_builder = req_builder.header(k.as_str(), v.as_str());
103    }
104
105    let req_body: Body = if let Some(reader) = req_body_reader {
106        Body::new(reader)
107    } else {
108        Body::empty()
109    };
110    let req = req_builder
111        .body(req_body)
112        .map_err(|e| CoreError::internal(format!("build request: {e}")))?;
113
114    let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
115
116    // Wire FFI-supplied knobs into the shared stack config.
117    // - `timeout` bounds time-to-response-head inside `fetch_request`.
118    // - `decompress` toggles the tower-http Decompression layer.
119    // Per-frame body-read timeout and the response-body byte limit are
120    // enforced below by `pump_hyper_body_to_channel_limited`; cancellation
121    // is handled by the `tokio::select!` on the cancel-token notifier.
122    let cfg = crate::http::server::stack::StackConfig {
123        timeout,
124        decompression: decompress,
125        ..crate::http::server::stack::StackConfig::default()
126    };
127
128    let fetch_fut = fetch_request(endpoint, &addr, req, &cfg);
129    let resp = match cancel_notify {
130        Some(notify) => tokio::select! {
131            _ = notify.notified() => {
132                if let Some(t) = fetch_token {
133                    endpoint.handles().remove_fetch_token(t);
134                }
135                return Err(CoreError::cancelled());
136            }
137            r = fetch_fut => r,
138        },
139        None => fetch_fut.await,
140    };
141
142    // Always clean up the cancellation token, even on error.
143    if let Some(t) = fetch_token {
144        endpoint.handles().remove_fetch_token(t);
145    }
146
147    let resp = resp.map_err(fetch_error_to_core)?;
148
149    package_response(endpoint, resp, &remote_str, &path, max_response_body_bytes).await
150}
151
152/// Translate the typed [`FetchError`] surface into the flat
153/// [`CoreError`] the FFI boundary expects.
154fn fetch_error_to_core(e: FetchError) -> CoreError {
155    match e {
156        FetchError::ConnectionFailed { detail, .. } => CoreError::connection_failed(detail),
157        FetchError::HeaderTooLarge { detail } => CoreError::header_too_large(detail),
158        FetchError::BodyTooLarge => CoreError::body_too_large("response body too large"),
159        FetchError::Timeout => CoreError::timeout("request timed out"),
160        FetchError::Cancelled => CoreError::cancelled(),
161        FetchError::Internal(msg) => CoreError::internal(msg),
162    }
163}
164
/// Extract the HTTP request target (path plus optional query) from an
/// `httpi://nodeId/path?query#frag` URL.
///
/// - The `httpi://` scheme is matched ASCII case-insensitively, consistent
///   with the case-insensitive web-scheme rejection in [`fetch`].
/// - The authority (node id) ends at the first `/` or `?` (RFC 3986 §3.2).
///   A `/` inside the query is therefore not mistaken for the start of the
///   path, and `httpi://node?x=1` keeps its query as `/?x=1`.
/// - Any `#fragment` is dropped, because fragments are never part of an
///   HTTP request target (RFC 9112 §3.2).
/// - Input without the scheme is treated as a path and gets a leading `/`
///   if it lacks one. An empty target becomes `/`.
///
/// Lives next to [`fetch`] because that is the sole caller — it constructs
/// the request-target line for the outgoing HTTP/1.1 request.
pub(crate) fn extract_path(url: &str) -> String {
    const SCHEME: &str = "httpi://";

    // Strip the fragment first so a '/' or '?' inside it cannot be
    // mistaken for the path or query start.
    let url = url.split_once('#').map_or(url, |(before, _)| before);

    let target = match url.get(..SCHEME.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(SCHEME) => {
            // `get` succeeded, so `SCHEME.len()` is a valid char boundary.
            let rest = &url[SCHEME.len()..];
            match rest.find(|c: char| c == '/' || c == '?') {
                Some(i) => &rest[i..],
                None => "",
            }
        }
        _ => url,
    };

    if target.starts_with('/') {
        target.to_string()
    } else {
        format!("/{target}")
    }
}
181
182/// Validate response head, allocate body channel, and assemble [`FfiResponse`].
183///
184/// Lives in `ffi::fetch` because every step here exists to satisfy the
185/// FFI contract: header-byte budget enforcement, RFC-9110 null-body
186/// handling, slotmap body handle allocation, `Vec<(String, String)>`
187/// header conversion. The pure-Rust caller would just consume
188/// `resp.into_body()` directly.
189async fn package_response(
190    endpoint: &IrohEndpoint,
191    resp: hyper::Response<Body>,
192    remote_str: &str,
193    path: &str,
194    max_response_body_bytes: Option<usize>,
195) -> Result<FfiResponse, CoreError> {
196    let max_header_size = endpoint.max_header_size();
197    // Per-call limit takes precedence; fall back to the endpoint-wide default.
198    let max_response_body_bytes =
199        max_response_body_bytes.unwrap_or_else(|| endpoint.max_response_body_bytes());
200    let handles = endpoint.handles();
201
202    let status = resp.status().as_u16();
203    // ISS-011: measure header bytes using raw values before string conversion;
204    // reject non-UTF8 response header values deterministically.
205    let header_bytes: usize = resp
206        .headers()
207        .iter()
208        .map(|(k, v)| {
209            k.as_str()
210                .len()
211                .saturating_add(v.as_bytes().len())
212                .saturating_add(4) // "name: value\r\n"
213        })
214        .fold(16usize, |acc, x| acc.saturating_add(x)); // approximate status line
215    if header_bytes > max_header_size {
216        return Err(CoreError::header_too_large(format!(
217            "response header size {header_bytes} exceeds limit {max_header_size}"
218        )));
219    }
220
221    let mut resp_headers: Vec<(String, String)> = Vec::new();
222    for (k, v) in resp.headers().iter() {
223        match v.to_str() {
224            Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
225            Err(_) => {
226                return Err(CoreError::invalid_input(format!(
227                    "non-UTF8 response header value for '{}'",
228                    k.as_str()
229                )));
230            }
231        }
232    }
233
234    let response_url = format!("httpi://{remote_str}{path}");
235
236    // RFC 9110 §6.3: responses with status 204, 205, or 304 MUST NOT carry a
237    // message body. Skip channel allocation entirely and return the slotmap
238    // null sentinel (0) for body_handle so the JS layer can use
239    // `bodyHandle === 0n` as a clean structural check without re-encoding
240    // HTTP semantics in every adapter.
241    if matches!(status, 204 | 205 | 304) {
242        // Dropping the body signals to hyper that we are done reading.
243        // For a spec-compliant server the body is already empty; this is a
244        // defensive drain for misbehaving peers.
245        drop(resp.into_body());
246        return Ok(FfiResponse {
247            status,
248            headers: resp_headers,
249            body_handle: 0,
250            url: response_url,
251        });
252    }
253
254    // Allocate channels for streaming the response body to JS.
255    let mut guard = handles.insert_guard();
256    let (res_writer, res_reader) = handles.make_body_channel();
257    let body = resp.into_body();
258    let frame_timeout = res_writer.drain_timeout;
259    tokio::spawn(pump_hyper_body_to_channel_limited(
260        body,
261        res_writer,
262        Some(max_response_body_bytes),
263        frame_timeout,
264        None,
265    ));
266
267    let body_handle = guard.insert_reader(res_reader)?;
268    guard.commit();
269    Ok(FfiResponse {
270        status,
271        headers: resp_headers,
272        body_handle,
273        url: response_url,
274    })
275}