Skip to main content

iroh_http_core/
client.rs

1//! Outgoing HTTP request — `fetch()` and `raw_connect()` implementation.
2//!
3//! HTTP/1.1 framing is delegated entirely to hyper.  Iroh's QUIC stream pair
4//! is wrapped in `IrohStream` and handed to hyper's client connection API.
5
6use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13    io::IrohStream,
14    parse_node_addr,
15    stream::{BodyReader, BodyWriter, HandleStore},
16    CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19// ── BoxBody type alias ────────────────────────────────────────────────────────
20
21use crate::BoxBody;
22
23// ── Compression: thin tower service wrapper around hyper SendRequest ─────────
24
/// Wraps `SendRequest<BoxBody>` as a `tower::Service` so compression/decompression
/// layers from `tower-http` can be composed around it.
#[cfg(feature = "compression")]
struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);

#[cfg(feature = "compression")]
impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
    type Response = hyper::Response<hyper::body::Incoming>;
    type Error = hyper::Error;
    // Boxed future: `send_request` returns an opaque future that cannot be
    // named in a stable associated type without boxing.
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    /// Delegates readiness to the wrapped hyper `SendRequest`: ready when the
    /// HTTP/1 connection can accept another request.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    /// Dispatches the request on the wrapped connection; boxing adapts the
    /// returned future to `Self::Future`.
    fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
        Box::pin(self.0.send_request(req))
    }
}
49
50// ── In-flight fetch cancellation ──────────────────────────────────────────────
51
52// alloc_fetch_token / cancel_in_flight / get_fetch_cancel_notify / remove_fetch_token
53// are now in crate::stream (imported above).
54// ── Public fetch API ──────────────────────────────────────────────────────────
55
/// Perform a single HTTP request to a remote iroh node and stream the
/// response body back through a handle-store channel.
///
/// * `remote_node_id` — node address string, parsed by `parse_node_addr`.
/// * `url` — must use the `httpi://` scheme; `http://` / `https://` are
///   rejected up front with `InvalidInput`.
/// * `headers` — validated as HTTP header names/values before any I/O.
/// * `req_body_reader` — optional streaming request body.
/// * `fetch_token` — optional cancellation token; if a cancel notify is
///   registered for it, cancellation aborts the in-flight request.  The token
///   is always removed before returning, success or failure.
/// * `direct_addrs` — extra socket addresses to seed dialing.
#[allow(clippy::too_many_arguments)]
pub async fn fetch(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    url: &str,
    method: &str,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    fetch_token: Option<u64>,
    direct_addrs: Option<&[std::net::SocketAddr]>,
) -> Result<FfiResponse, CoreError> {
    // Reject standard web schemes.
    {
        let lower = url.to_ascii_lowercase();
        if lower.starts_with("https://") || lower.starts_with("http://") {
            // `to_ascii_lowercase` preserves byte length, so an index found in
            // `lower` is a valid slice boundary in `url`.
            let scheme_end = lower
                .find("://")
                .map(|i| i.saturating_add(3))
                .unwrap_or(lower.len());
            return Err(CoreError::invalid_input(format!(
                "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
                 Example: httpi://nodeId/path",
                &url[..scheme_end]
            )));
        }
    }

    // Validate method and headers at the FFI boundary.
    let http_method = Method::from_bytes(method.as_bytes())
        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // Resolve the cancel notify up front so a cancellation registered before
    // this call is still honoured.
    let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
    let handles = endpoint.handles();

    // Wrap all fallible work so the cancel-token cleanup below always runs,
    // even if connection setup returns early via `?`.
    let out = async {
        let parsed = parse_node_addr(remote_node_id)?;
        let node_id = parsed.node_id;
        let mut addr = iroh::EndpointAddr::new(node_id);
        for a in &parsed.direct_addrs {
            addr = addr.with_ip_addr(*a);
        }
        // Caller-supplied direct addresses are added on top of any parsed
        // from the node address string.
        if let Some(addrs) = direct_addrs {
            for a in addrs {
                addr = addr.with_ip_addr(*a);
            }
        }

        let ep_raw = endpoint.raw().clone();
        let addr_clone = addr.clone();
        let max_header_size = endpoint.max_header_size();
        let max_response_body_bytes = endpoint.max_response_body_bytes();

        // Reuse a pooled connection for this node/ALPN, dialing if absent.
        let pooled = endpoint
            .pool()
            .get_or_connect(node_id, ALPN, || async move {
                ep_raw
                    .connect(addr_clone, ALPN)
                    .await
                    .map_err(|e| format!("connect: {e}"))
            })
            .await
            .map_err(CoreError::connection_failed)?;

        let conn = pooled.conn.clone();
        let remote_str = pooled.remote_id_str.clone();

        // `result` is a future — not awaited yet — so the select! below can
        // race it against cancellation.
        let result = do_fetch(
            handles,
            conn,
            &remote_str,
            url,
            http_method,
            headers,
            req_body_reader,
            max_header_size,
            max_response_body_bytes,
        );

        if let Some(notify) = cancel_notify {
            // On cancellation the losing fetch future is dropped, tearing
            // down its in-flight stream state.
            tokio::select! {
                _ = notify.notified() => Err(CoreError::cancelled()),
                r = result => r,
            }
        } else {
            result.await
        }
    }
    .await;

    // Clean up the cancellation token — always, even on early error.
    if let Some(token) = fetch_token {
        endpoint.handles().remove_fetch_token(token);
    }

    out
}
161
162/// Classify a hyper send-request or handshake error: if the error message
163/// indicates a header/buffer overflow (hyper emits "header" in its parse error
164/// descriptions), return `CoreError::HeaderTooLarge`; otherwise return
165/// `CoreError::ConnectionFailed`.  This gives callers consistent error types
166/// regardless of where hyper's internal buffer boundary falls relative to the
167/// configured `max_header_size`.
168fn classify_hyper_error(e: &impl std::fmt::Display, context: &str) -> CoreError {
169    let msg = e.to_string();
170    // hyper 1.x surfaces header-parse failures as e.g.
171    //   "error reading a body from connection: header too large"
172    //   "invalid HTTP method"
173    //   "header value is too long"
174    //   "too many headers"
175    // All of these mention "header" in the message.
176    if msg.to_ascii_lowercase().contains("header") {
177        CoreError::header_too_large(format!("{context}: {msg}"))
178    } else {
179        CoreError::connection_failed(format!("{context}: {msg}"))
180    }
181}
182
/// Execute one HTTP/1.1 exchange over a fresh bidirectional QUIC stream on
/// `conn`, returning an `FfiResponse` whose body (when present) streams
/// through a handle-store channel.
///
/// `max_header_size` bounds the response head (checked both via hyper's
/// buffer and an explicit post-parse measurement); `max_response_body_bytes`
/// bounds the streamed body, enforced by the background pump task.
#[allow(clippy::too_many_arguments)]
async fn do_fetch(
    handles: &HandleStore,
    conn: iroh::endpoint::Connection,
    remote_str: &str,
    url: &str,
    method: Method,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    max_header_size: usize,
    max_response_body_bytes: usize,
) -> Result<FfiResponse, CoreError> {
    // One request per QUIC stream: open a dedicated bidirectional pair.
    let (send, recv) = conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;

    let io = TokioIo::new(IrohStream::new(send, recv));

    #[allow(unused_mut)] // mut only needed without the compression feature
    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        // hyper requires max_buf_size >= 8192; clamp upward so small
        // max_header_size values don't panic.  Header-size enforcement happens
        // via the response parsing error that hyper returns when the actual
        // response head exceeds max_header_size bytes.
        .max_buf_size(max_header_size.max(8192))
        .max_headers(128)
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;

    // Drive the connection state machine in the background.
    tokio::spawn(conn_task);

    let path = extract_path(url);

    // Build the hyper request.  Host carries the remote node id string.
    let mut req_builder = hyper::Request::builder()
        .method(method)
        .uri(&path)
        .header(hyper::header::HOST, remote_str);

    // When compression is enabled, advertise zstd-only Accept-Encoding — but
    // only if the caller has not already set Accept-Encoding.  A caller passing
    // `Accept-Encoding: identity` is opting out of compression and must not be
    // overridden.
    #[cfg(feature = "compression")]
    {
        let has_accept_encoding = headers
            .iter()
            .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
        if !has_accept_encoding {
            req_builder = req_builder.header("accept-encoding", "zstd");
        }
    }

    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    // Stream the request body from the caller's reader, or send empty.
    let req_body: BoxBody = if let Some(reader) = req_body_reader {
        crate::box_body(body_from_reader(reader))
    } else {
        crate::box_body(http_body_util::Empty::new())
    };

    let req = req_builder
        .body(req_body)
        .map_err(|e| CoreError::internal(format!("build request: {e}")))?;

    // Dispatch: with compression, wrap sender in DecompressionLayer so the
    // response body is transparently decompressed before reaching the channel pump.
    #[cfg(feature = "compression")]
    let resp = {
        use tower::ServiceExt;
        let svc = tower::ServiceBuilder::new()
            .layer(tower_http::decompression::DecompressionLayer::new())
            .service(HyperClientSvc(sender));
        svc.oneshot(req)
            .await
            .map_err(|e| classify_hyper_error(&e, "send_request"))?
    };
    #[cfg(not(feature = "compression"))]
    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| classify_hyper_error(&e, "send_request"))?;

    let status = resp.status().as_u16();
    // ISS-011: measure header bytes using raw values before string conversion;
    // reject non-UTF8 response header values deterministically.
    let header_bytes: usize = resp
        .headers()
        .iter()
        .map(|(k, v)| {
            k.as_str()
                .len()
                .saturating_add(v.as_bytes().len())
                .saturating_add(4) // "name: value\r\n"
        })
        .fold(16usize, |acc, x| acc.saturating_add(x)); // approximate status line
    if header_bytes > max_header_size {
        return Err(CoreError::header_too_large(format!(
            "response header size {header_bytes} exceeds limit {max_header_size}"
        )));
    }

    // Convert header values to (String, String); non-UTF8 values are rejected
    // outright rather than lossily converted.
    let mut resp_headers: Vec<(String, String)> = Vec::new();
    for (k, v) in resp.headers().iter() {
        match v.to_str() {
            Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
            Err(_) => {
                return Err(CoreError::invalid_input(format!(
                    "non-UTF8 response header value for '{}'",
                    k.as_str()
                )));
            }
        }
    }

    let response_url = format!("httpi://{remote_str}{path}");

    // RFC 9110 §6.3: responses with status 204, 205, or 304 MUST NOT carry a
    // message body.  Skip channel allocation entirely and return the slotmap
    // null sentinel (0) for body_handle so the JS layer can use
    // `bodyHandle === 0n` as a clean structural check without re-encoding
    // HTTP semantics in every adapter.
    if is_null_body_status(status) {
        // Dropping the body tells hyper we will not read it; for a
        // spec-compliant server there is nothing to read anyway.
        drop(resp.into_body());
        return Ok(FfiResponse {
            status,
            headers: resp_headers,
            body_handle: 0,
            url: response_url,
        });
    }

    // Allocate channels for streaming the response body to JS.  The guard
    // presumably rolls back the insertion if commit() is never reached —
    // NOTE(review): confirm against HandleStore::insert_guard.
    let mut guard = handles.insert_guard();

    let (res_writer, res_reader) = handles.make_body_channel();
    let body = resp.into_body();
    let frame_timeout = res_writer.drain_timeout;
    // The pump runs detached: it enforces max_response_body_bytes and the
    // per-frame timeout, and closes the channel when the body ends.
    tokio::spawn(pump_hyper_body_to_channel_limited(
        body,
        res_writer,
        Some(max_response_body_bytes),
        frame_timeout,
        None,
    ));

    let body_handle = guard.insert_reader(res_reader)?;

    guard.commit();
    Ok(FfiResponse {
        status,
        headers: resp_headers,
        body_handle,
        url: response_url,
    })
}
347
/// RFC 9110 §6.3 — responses with these status codes MUST NOT contain a
/// message body.  Skipping body-channel allocation for them avoids wasting
/// resources and keeps HTTP semantics in core instead of every adapter.
#[inline]
fn is_null_body_status(status: u16) -> bool {
    matches!(status, 204 | 205 | 304)
}
355
356// ── Body bridge utilities ─────────────────────────────────────────────────────
357
358/// Drain a hyper body into `BodyWriter`.
359/// Generic over any body type with `Data = Bytes` (e.g. `Incoming`, `DecompressionBody`).
360#[allow(dead_code)]
361pub(crate) async fn pump_hyper_body_to_channel<B>(body: B, writer: BodyWriter)
362where
363    B: http_body::Body<Data = Bytes>,
364    B::Error: std::fmt::Debug,
365{
366    let timeout = writer.drain_timeout;
367    pump_hyper_body_to_channel_limited(body, writer, None, timeout, None).await;
368}
369
/// Drain with optional byte limit and a per-frame read timeout.
///
/// `frame_timeout` bounds how long we wait for each individual body frame.
/// A slow-drip peer that stalls indefinitely will be cut off after this deadline.
///
/// When a byte limit is set and the body exceeds it, `overflow_tx` is fired
/// so the caller can return a `413 Content Too Large` response (ISS-004).
///
/// The loop exits — and the channel closes via the final `drop(writer)` —
/// when the body ends, a frame errors, a frame read times out, or the
/// channel's reader side is dropped.
pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
    body: B,
    writer: BodyWriter,
    max_bytes: Option<usize>,
    frame_timeout: std::time::Duration,
    mut overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
) where
    B: http_body::Body<Data = Bytes>,
    B::Error: std::fmt::Debug,
{
    // Box::pin gives Pin<Box<B>>: Unpin (Box<T>: Unpin ∀T), which satisfies BodyExt::frame().
    let mut body = Box::pin(body);
    let mut total = 0usize;
    // Set to true once max_bytes is exceeded. When true, frames are read and
    // discarded so the peer's QUIC send stream can receive flow-control ACKs
    // and close cleanly instead of stalling until idle timeout (ISS-015).
    let mut overflowed = false;

    loop {
        // Each frame read races against the per-frame deadline.
        let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
            Err(_elapsed) => {
                tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
                break;
            }
            // `None` from frame() means the body is complete.
            Ok(None) => break,
            Ok(Some(r)) => r,
        };
        match frame_result {
            Err(e) => {
                tracing::warn!("iroh-http: body frame error: {e:?}");
                break;
            }
            Ok(frame) => {
                if overflowed {
                    // Drain: discard the frame but keep reading so the QUIC
                    // flow-control window advances and the peer can finish
                    // writing its body.
                    continue;
                }
                // Only data frames are counted and forwarded; non-data frames
                // (trailers) are skipped.
                if frame.is_data() {
                    let data = frame.into_data().expect("is_data checked above");
                    total = total.saturating_add(data.len());
                    if let Some(limit) = max_bytes {
                        if total > limit {
                            tracing::warn!("iroh-http: request body exceeded {limit} bytes");
                            // ISS-004: signal overflow so the serve path can send 413.
                            if let Some(tx) = overflow_tx.take() {
                                let _ = tx.send(());
                            }
                            overflowed = true;
                            continue; // drain remaining frames without stalling the peer
                        }
                    }
                    if writer.send_chunk(data).await.is_err() {
                        return; // reader dropped
                    }
                }
            }
        }
    }

    // Closing the writer signals end-of-body to the channel's reader side.
    drop(writer);
}
440
441/// Adapt a `BodyReader` into a hyper-compatible body using `StreamBody`
442/// backed by a futures stream.
443pub(crate) fn body_from_reader(
444    reader: BodyReader,
445) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
446    use futures::stream;
447
448    let s = stream::unfold(reader, |reader| async move {
449        reader
450            .next_chunk()
451            .await
452            .map(|data| (Ok(Frame::data(data)), reader))
453    });
454
455    StreamBody::new(s)
456}
457
458// ── Path extraction ───────────────────────────────────────────────────────────
459
/// Extract the HTTP request-target (path + optional query) from a URL or
/// bare path.
///
/// * `scheme://host/path?q` → `/path?q`
/// * `scheme://host?q`      → `/?q`
/// * `scheme://host`        → `/`
/// * `/path`                → `/path`
/// * `name`                 → `/name`
///
/// Fragments are stripped per RFC 9110 §4.1 before any other parsing, so a
/// `/` or `?` inside the fragment cannot be mistaken for path/query
/// delimiters.
pub(crate) fn extract_path(url: &str) -> String {
    // RFC 9110 §4.1: fragment identifiers are client-side only and must
    // never appear in the request-target sent on the wire.  Strip first so
    // fragment content can't confuse the delimiter search below.
    let url = match url.find('#') {
        Some(pos) => &url[..pos],
        None => url,
    };

    if let Some(idx) = url.find("://") {
        let after_scheme = url.get(idx.saturating_add(3)..).unwrap_or("");
        // The request-target starts at the first '/' OR '?' after the
        // authority, whichever comes first.  Checking '/' unconditionally
        // first would misparse a slash inside the query string
        // (e.g. "httpi://node?redirect=/home" must yield "/?redirect=/home",
        // not "/home").
        match (after_scheme.find('/'), after_scheme.find('?')) {
            (Some(s), Some(q)) if q < s => format!("/{}", &after_scheme[q..]),
            (Some(s), _) => after_scheme[s..].to_string(),
            (None, Some(q)) => format!("/{}", &after_scheme[q..]),
            (None, None) => "/".to_string(),
        }
    } else if url.starts_with('/') {
        url.to_string()
    } else {
        format!("/{url}")
    }
}
484
485// ── Duplex / raw_connect ──────────────────────────────────────────────────────
486
487/// Open a full-duplex QUIC connection to a remote node via HTTP Upgrade.
488pub async fn raw_connect(
489    endpoint: &IrohEndpoint,
490    remote_node_id: &str,
491    path: &str,
492    headers: &[(String, String)],
493) -> Result<FfiDuplexStream, CoreError> {
494    // Validate headers.
495    for (name, value) in headers {
496        HeaderName::from_bytes(name.as_bytes())
497            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
498        HeaderValue::from_str(value).map_err(|_| {
499            CoreError::invalid_input(format!("invalid header value for {:?}", name))
500        })?;
501    }
502
503    let parsed = parse_node_addr(remote_node_id)?;
504    let node_id = parsed.node_id;
505    let mut addr = iroh::EndpointAddr::new(node_id);
506    for a in &parsed.direct_addrs {
507        addr = addr.with_ip_addr(*a);
508    }
509
510    let ep_raw = endpoint.raw().clone();
511    let addr_clone = addr.clone();
512    let max_header_size = endpoint.max_header_size();
513    let handles = endpoint.handles();
514
515    let pooled = endpoint
516        .pool()
517        .get_or_connect(node_id, ALPN_DUPLEX, || async move {
518            ep_raw
519                .connect(addr_clone, ALPN_DUPLEX)
520                .await
521                .map_err(|e| format!("connect duplex: {e}"))
522        })
523        .await
524        .map_err(CoreError::connection_failed)?;
525
526    let (send, recv) = pooled
527        .conn
528        .open_bi()
529        .await
530        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
531    let io = TokioIo::new(IrohStream::new(send, recv));
532
533    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
534        .max_buf_size(max_header_size.max(8192))
535        .handshake::<_, BoxBody>(io)
536        .await
537        .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;
538
539    tokio::spawn(conn_task);
540
541    // Build CONNECT request with Upgrade: iroh-duplex.
542    // ISS-015: include Connection: upgrade for strict handshake compliance.
543    let mut req_builder = hyper::Request::builder()
544        .method(Method::from_bytes(b"CONNECT").expect("CONNECT is a valid HTTP method"))
545        .uri(path)
546        .header(hyper::header::CONNECTION, "upgrade")
547        .header(hyper::header::UPGRADE, "iroh-duplex");
548
549    for (k, v) in headers {
550        req_builder = req_builder.header(k.as_str(), v.as_str());
551    }
552
553    let req = req_builder
554        .body(crate::box_body(http_body_util::Empty::new()))
555        .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;
556
557    let resp = sender
558        .send_request(req)
559        .await
560        .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;
561
562    let status = resp.status();
563    if status != StatusCode::SWITCHING_PROTOCOLS {
564        // ISS-022: use PeerRejected so callers can distinguish policy rejection
565        // from transport failure for retry/telemetry purposes.
566        return Err(CoreError::peer_rejected(format!(
567            "server rejected duplex: expected 101, got {status}"
568        )));
569    }
570
571    // Perform the protocol upgrade to get raw bidirectional IO.
572    let upgraded = hyper::upgrade::on(resp)
573        .await
574        .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;
575
576    let (server_write, server_read) = handles.make_body_channel();
577    let (client_write, client_read) = handles.make_body_channel();
578
579    let read_handle = handles.insert_reader(server_read)?;
580    let write_handle = handles.insert_writer(client_write)?;
581
582    // Pipe upgraded IO to/from body channels.
583    let io = TokioIo::new(upgraded);
584    tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));
585
586    Ok(FfiDuplexStream {
587        read_handle,
588        write_handle,
589    })
590}
591
#[cfg(test)]
mod tests {
    use super::extract_path;

    // Scheme-prefixed URLs reduce to their path; a bare authority maps to "/".
    #[test]
    fn extract_path_basic() {
        assert_eq!(extract_path("httpi://node/foo/bar"), "/foo/bar");
        assert_eq!(extract_path("httpi://node/"), "/");
        assert_eq!(extract_path("httpi://node"), "/");
    }

    // Query strings are preserved, including the no-path form "node?x=1".
    #[test]
    fn extract_path_query_string() {
        assert_eq!(extract_path("httpi://node/path?x=1"), "/path?x=1");
        assert_eq!(extract_path("httpi://node?x=1"), "/?x=1");
    }

    #[test]
    fn extract_path_fragment() {
        // RFC 9110 §4.1: fragments must be stripped before sending.
        assert_eq!(extract_path("httpi://node/path#frag"), "/path");
        assert_eq!(extract_path("httpi://node/path?q=1#frag"), "/path?q=1");
        assert_eq!(extract_path("/local#frag"), "/local");
    }

    // Non-URL inputs: absolute paths pass through; bare names gain a slash.
    #[test]
    fn extract_path_bare_path() {
        assert_eq!(extract_path("/already"), "/already");
        assert_eq!(extract_path("no-slash"), "/no-slash");
    }
}