Skip to main content

iroh_http_core/
client.rs

//! Outgoing HTTP request — `fetch()` and `raw_connect()` implementation.
//!
//! HTTP/1.1 framing is delegated entirely to hyper.  Iroh's QUIC stream pair
//! is wrapped in `IrohStream` and handed to hyper's client connection API.

6use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13    io::IrohStream,
14    parse_node_addr,
15    stream::{BodyReader, BodyWriter, HandleStore},
16    CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19// ── BoxBody type alias ────────────────────────────────────────────────────────
20
21use crate::BoxBody;
22
23// ── Compression: thin tower service wrapper around hyper SendRequest ─────────
24
/// Adapter exposing hyper's `SendRequest<BoxBody>` as a `tower::Service`,
/// so `tower-http` compression/decompression layers can be composed around it.
#[cfg(feature = "compression")]
struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);

#[cfg(feature = "compression")]
impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
    type Response = hyper::Response<hyper::body::Incoming>;
    type Error = hyper::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    /// Ready when the wrapped hyper connection can accept another request.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    /// Dispatch the request on the wrapped connection, boxing the send future
    /// to fit the `Self::Future` type alias.
    fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
        let send_fut = self.0.send_request(req);
        Box::pin(send_fut)
    }
}
49
50// ── In-flight fetch cancellation ──────────────────────────────────────────────
51
52// alloc_fetch_token / cancel_in_flight / get_fetch_cancel_notify / remove_fetch_token
53// are now in crate::stream (imported above).
54// ── Public fetch API ──────────────────────────────────────────────────────────
55
/// Perform a single HTTP/1.1 request over a pooled Iroh QUIC connection and
/// return an [`FfiResponse`] whose body and trailers stream to the caller
/// through handle-backed channels.
///
/// Inputs are validated at this FFI boundary: standard `http(s)://` schemes
/// are rejected (only `httpi://` node URLs are supported), and the method and
/// every header pair must parse as valid HTTP tokens.
///
/// * `req_body_reader` — optional streaming request body; `None` sends an
///   empty body.
/// * `req_trailer_sender_handle` — pairs this request with a trailer channel
///   whose sender the JS side holds; `None` or `Some(0)` means no request
///   trailers.
/// * `fetch_token` — optional cancellation token; if its notify fires while
///   the request is in flight, the call returns `CoreError::cancelled()`.
/// * `direct_addrs` — extra dial addresses merged with any embedded in
///   `remote_node_id`.
#[allow(clippy::too_many_arguments)]
pub async fn fetch(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    url: &str,
    method: &str,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_sender_handle: Option<u64>,
    fetch_token: Option<u64>,
    direct_addrs: Option<&[std::net::SocketAddr]>,
) -> Result<FfiResponse, CoreError> {
    // Reject standard web schemes with an actionable error that echoes the
    // offending scheme back to the caller.
    {
        let lower = url.to_ascii_lowercase();
        if lower.starts_with("https://") || lower.starts_with("http://") {
            let scheme_end = lower
                .find("://")
                .map(|i| i.saturating_add(3))
                .unwrap_or(lower.len());
            return Err(CoreError::invalid_input(format!(
                "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
                 Example: httpi://nodeId/path",
                &url[..scheme_end]
            )));
        }
    }

    // Validate method and headers at the FFI boundary.
    let http_method = Method::from_bytes(method.as_bytes())
        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // Resolve the cancellation notify (if a token was allocated) up front so
    // cancellation can race the whole exchange below.
    let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
    let handles = endpoint.handles();

    // Claim request trailer receiver (paired with the sender handle JS holds).
    // Handle 0 is the "no trailers" sentinel.
    let req_trailer_rx = req_trailer_sender_handle.and_then(|h| {
        if h == 0 {
            None
        } else {
            handles.claim_pending_trailer_rx(h)
        }
    });

    // Build the dial target: node id, plus any direct addresses embedded in
    // the node string, plus any extra addresses supplied by the caller.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }
    if let Some(addrs) = direct_addrs {
        for a in addrs {
            addr = addr.with_ip_addr(*a);
        }
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();

    // Reuse a pooled QUIC connection for this node/ALPN, dialing on a miss.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN, || async move {
            ep_raw
                .connect(addr_clone, ALPN)
                .await
                .map_err(|e| format!("connect: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let conn = pooled.conn.clone();
    let remote_str = pooled.remote_id_str.clone();

    // `result` is a future — deliberately not awaited yet — so it can be
    // raced against the cancellation notify below.
    let result = do_fetch(
        handles,
        conn,
        &remote_str,
        url,
        http_method,
        headers,
        req_body_reader,
        req_trailer_rx,
        max_header_size,
    );

    let out = if let Some(notify) = cancel_notify {
        tokio::select! {
            _ = notify.notified() => Err(CoreError::cancelled()),
            r = result => r,
        }
    } else {
        result.await
    };

    // Clean up the cancellation token (whether we finished or were cancelled).
    if let Some(token) = fetch_token {
        endpoint.handles().remove_fetch_token(token);
    }

    out
}
165
/// Execute one request/response exchange on an established QUIC connection:
/// open a fresh bidirectional stream, run hyper's HTTP/1.1 client handshake
/// over it, send the request, and wire the response body/trailers into
/// handle-backed channels for the FFI caller.
///
/// `remote_str` becomes both the `Host` header and the host portion of the
/// returned response URL; only the path (plus query) of `url` is sent on the
/// wire.  `max_header_size` sizes hyper's read buffer and bounds the
/// (approximate) byte size of the response header block.
#[allow(clippy::too_many_arguments)]
async fn do_fetch(
    handles: &HandleStore,
    conn: iroh::endpoint::Connection,
    remote_str: &str,
    url: &str,
    method: Method,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_rx: Option<crate::stream::TrailerRx>,
    max_header_size: usize,
) -> Result<FfiResponse, CoreError> {
    // One QUIC bidi stream per request.
    let (send, recv) = conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;

    let io = TokioIo::new(IrohStream::new(send, recv));

    #[allow(unused_mut)] // mut only needed without the compression feature
    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        // hyper requires max_buf_size >= 8192; clamp upward so small
        // max_header_size values don't panic.  Header-size enforcement happens
        // via the response parsing error that hyper returns when the actual
        // response head exceeds max_header_size bytes.
        .max_buf_size(max_header_size.max(8192))
        .max_headers(128)
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;

    // Drive the connection state machine in the background.
    tokio::spawn(conn_task);

    let path = extract_path(url);

    // Build the hyper request.
    let mut req_builder = hyper::Request::builder()
        .method(method)
        .uri(&path)
        .header(hyper::header::HOST, remote_str)
        // Tell the server we accept chunked trailers (required for HTTP/1.1 trailer delivery).
        .header("te", "trailers");

    // When compression is enabled, advertise zstd-only Accept-Encoding — but
    // only if the caller has not already set Accept-Encoding.  A caller passing
    // `Accept-Encoding: identity` is opting out of compression and must not be
    // overridden.
    #[cfg(feature = "compression")]
    {
        let has_accept_encoding = headers
            .iter()
            .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
        if !has_accept_encoding {
            req_builder = req_builder.header("accept-encoding", "zstd");
        }
    }

    // Append caller-supplied headers (already validated in `fetch`).
    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    let req_body: BoxBody = if let Some(reader) = req_body_reader {
        // Adapt BodyReader → hyper body, including optional request trailers.
        crate::box_body(body_from_reader(reader, req_trailer_rx))
    } else {
        crate::box_body(http_body_util::Empty::new())
    };

    let req = req_builder
        .body(req_body)
        .map_err(|e| CoreError::internal(format!("build request: {e}")))?;

    // Dispatch: with compression, wrap sender in DecompressionLayer so the
    // response body is transparently decompressed before reaching the channel pump.
    #[cfg(feature = "compression")]
    let resp = {
        use tower::ServiceExt;
        let svc = tower::ServiceBuilder::new()
            .layer(tower_http::decompression::DecompressionLayer::new())
            .service(HyperClientSvc(sender));
        svc.oneshot(req)
            .await
            .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?
    };
    #[cfg(not(feature = "compression"))]
    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?;

    let status = resp.status().as_u16();
    // ISS-011: measure header bytes using raw values before string conversion;
    // reject non-UTF8 response header values deterministically.
    let header_bytes: usize = resp
        .headers()
        .iter()
        .map(|(k, v)| {
            k.as_str()
                .len()
                .saturating_add(v.as_bytes().len())
                .saturating_add(4) // "name: value\r\n"
        })
        .fold(16usize, |acc, x| acc.saturating_add(x)); // approximate status line
    if header_bytes > max_header_size {
        return Err(CoreError::header_too_large(format!(
            "response header size {header_bytes} exceeds limit {max_header_size}"
        )));
    }

    // Convert headers to (String, String) pairs, failing fast on non-UTF8
    // values rather than silently dropping them.
    let mut resp_headers: Vec<(String, String)> = Vec::new();
    for (k, v) in resp.headers().iter() {
        match v.to_str() {
            Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
            Err(_) => {
                return Err(CoreError::invalid_input(format!(
                    "non-UTF8 response header value for '{}'",
                    k.as_str()
                )));
            }
        }
    }

    // Allocate channels for streaming the response body to JS.
    // NOTE(review): guard + commit pattern — handles inserted via the guard
    // are presumably released if `commit()` is never reached; confirm against
    // `HandleStore::insert_guard`.
    let mut guard = handles.insert_guard();
    let (trailer_tx, trailer_rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
    let trailer_handle = guard.insert_trailer_receiver(trailer_rx)?;

    let (res_writer, res_reader) = handles.make_body_channel();
    let body = resp.into_body();
    // Drain the response body into the channel in the background; trailers
    // arrive over the oneshot when the body completes.
    tokio::spawn(pump_hyper_body_to_channel(body, res_writer, trailer_tx));

    let body_handle = guard.insert_reader(res_reader)?;
    let response_url = format!("httpi://{remote_str}{path}");

    guard.commit();
    Ok(FfiResponse {
        status,
        headers: resp_headers,
        body_handle,
        url: response_url,
        trailers_handle: trailer_handle,
    })
}
310
311// ── Body bridge utilities ─────────────────────────────────────────────────────
312
313/// Drain a hyper body into `BodyWriter`, delivering trailers via the oneshot when done.
314/// Generic over any body type with `Data = Bytes` (e.g. `Incoming`, `DecompressionBody`).
315pub(crate) async fn pump_hyper_body_to_channel<B>(
316    body: B,
317    writer: BodyWriter,
318    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
319) where
320    B: http_body::Body<Data = Bytes>,
321    B::Error: std::fmt::Debug,
322{
323    let timeout = writer.drain_timeout;
324    pump_hyper_body_to_channel_limited(body, writer, trailer_tx, None, timeout, None).await;
325}
326
/// Drain with optional byte limit and a per-frame read timeout.
///
/// `frame_timeout` bounds how long we wait for each individual body frame.
/// A slow-drip peer that stalls indefinitely will be cut off after this deadline.
///
/// When a byte limit is set and the body exceeds it, `overflow_tx` is fired
/// so the caller can return a `413 Content Too Large` response (ISS-004).
///
/// On every exit path except "reader dropped" the writer is dropped (closing
/// the channel) and the collected trailers — possibly an empty list — are
/// sent on `trailer_tx`.  If the reader side goes away mid-stream, the pump
/// returns immediately without sending trailers.
pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
    body: B,
    writer: BodyWriter,
    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
    max_bytes: Option<usize>,
    frame_timeout: std::time::Duration,
    overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
) where
    B: http_body::Body<Data = Bytes>,
    B::Error: std::fmt::Debug,
{
    // Box::pin gives Pin<Box<B>>: Unpin (Box<T>: Unpin ∀T), which satisfies BodyExt::frame().
    let mut body = Box::pin(body);
    let mut total = 0usize;
    let mut trailers_vec: Vec<(String, String)> = Vec::new();

    loop {
        // Outer layer: per-frame deadline.  Inner layer: end-of-body / error.
        let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
            Err(_elapsed) => {
                tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
                break;
            }
            Ok(None) => break, // body finished cleanly
            Ok(Some(r)) => r,
        };
        match frame_result {
            Err(e) => {
                tracing::warn!("iroh-http: body frame error: {e:?}");
                break;
            }
            Ok(frame) => {
                if frame.is_data() {
                    let data = frame.into_data().expect("is_data checked above");
                    total = total.saturating_add(data.len());
                    if let Some(limit) = max_bytes {
                        if total > limit {
                            tracing::warn!("iroh-http: request body exceeded {limit} bytes");
                            // ISS-004: signal overflow so the serve path can send 413.
                            // (Moving `overflow_tx` out here is fine: the break below
                            // guarantees the loop never re-reads it.)
                            if let Some(tx) = overflow_tx {
                                let _ = tx.send(());
                            }
                            break;
                        }
                    }
                    if writer.send_chunk(data).await.is_err() {
                        return; // reader dropped
                    }
                } else if frame.is_trailers() {
                    // Trailers frame: convert to (name, value) strings, dropping
                    // any value that is not valid UTF-8.
                    let hdrs = frame.into_trailers().expect("is_trailers checked above");
                    trailers_vec = hdrs
                        .iter()
                        .filter_map(|(k, v)| match v.to_str() {
                            Ok(s) => Some((k.as_str().to_string(), s.to_string())),
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: dropping non-UTF8 trailer value for '{}'",
                                    k.as_str()
                                );
                                None
                            }
                        })
                        .collect();
                }
            }
        }
    }

    // Close the body channel first, then deliver trailers (empty on error paths).
    drop(writer);
    let _ = trailer_tx.send(trailers_vec);
}
404
/// Adapt a `BodyReader` + optional trailer channel into a hyper-compatible
/// body using `StreamBody` backed by a futures stream.
///
/// The stream yields one `Frame::data` per chunk read from `reader`; once
/// the reader is exhausted it optionally yields a single `Frame::trailers`
/// before ending.  Trailer entries that fail HTTP name/value validation are
/// silently skipped, and the wait for trailers is bounded by the reader's
/// `drain_timeout` so a sender that never fires can't stall completion.
pub(crate) fn body_from_reader(
    reader: BodyReader,
    trailer_rx: Option<tokio::sync::oneshot::Receiver<Vec<(String, String)>>>,
) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
    use futures::stream;

    // State machine: first yield data frames, then optionally a trailer frame.
    // State tuple: (reader, pending trailer receiver, done-after-trailers flag).
    let s = stream::unfold(
        (reader, trailer_rx, false),
        |(reader, trailer_rx, done)| async move {
            if done {
                // A trailers frame was already emitted — terminate the stream.
                return None;
            }
            match reader.next_chunk().await {
                Some(data) => Some((Ok(Frame::data(data)), (reader, trailer_rx, false))),
                None => {
                    // Body data complete — check for trailers.
                    if let Some(rx) = trailer_rx {
                        // ISS-016: bound the wait so declared-but-unsent trailers
                        // don't stall completion indefinitely.
                        let timeout = reader.drain_timeout;
                        match tokio::time::timeout(timeout, rx).await {
                            Ok(Ok(trailers)) => {
                                let mut map = http::HeaderMap::new();
                                for (k, v) in trailers {
                                    // Skip entries that aren't valid HTTP names/values.
                                    if let (Ok(name), Ok(val)) = (
                                        HeaderName::from_bytes(k.as_bytes()),
                                        HeaderValue::from_str(&v),
                                    ) {
                                        map.append(name, val);
                                    }
                                }
                                if !map.is_empty() {
                                    return Some((Ok(Frame::trailers(map)), (reader, None, true)));
                                }
                            }
                            Ok(Err(_)) => {
                                // Sender dropped without sending — treat as no trailers.
                            }
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: trailer wait timed out after {timeout:?}; \
                                     completing body without trailers"
                                );
                            }
                        }
                    }
                    None
                }
            }
        },
    );

    StreamBody::new(s)
}
462
463// ── Path extraction ───────────────────────────────────────────────────────────
464
/// Derive the HTTP/1.1 request-target (path plus optional query) from an
/// `httpi://` URL or a bare path.
///
/// RFC 9110 §4.1: fragment identifiers are client-side only and must never
/// appear in the request-target sent on the wire.  The fragment is stripped
/// *first* so that a `/` or `?` inside the fragment (e.g. `httpi://node#a/b`)
/// cannot be mistaken for a path or query delimiter — previously such URLs
/// leaked fragment text into the request-target.
pub(crate) fn extract_path(url: &str) -> String {
    // Strip the fragment before any delimiter search (RFC 3986 §3.5: the
    // fragment is always the trailing component, after path and query).
    let url = match url.find('#') {
        Some(pos) => &url[..pos],
        None => url,
    };

    if let Some(idx) = url.find("://") {
        // Skip past "scheme://"; the authority runs until the first '/'.
        let after_scheme = url.get(idx.saturating_add(3)..).unwrap_or("");
        if let Some(slash) = after_scheme.find('/') {
            after_scheme[slash..].to_string()
        } else if let Some(q) = after_scheme.find('?') {
            // No path segment — keep the query string (e.g. "httpi://node?x=1").
            format!("/{}", &after_scheme[q..])
        } else {
            "/".to_string()
        }
    } else if url.starts_with('/') {
        url.to_string()
    } else {
        format!("/{url}")
    }
}
489
490// ── Duplex / raw_connect ──────────────────────────────────────────────────────
491
/// Open a full-duplex QUIC connection to a remote node via HTTP Upgrade.
///
/// Sends `CONNECT <path>` with `Connection: upgrade` / `Upgrade: iroh-duplex`
/// over a pooled connection on the `ALPN_DUPLEX` protocol.  On a
/// `101 Switching Protocols` reply, the upgraded IO is bridged into two
/// handle-backed channels and returned as an [`FfiDuplexStream`]
/// (`read_handle` = bytes from the server, `write_handle` = bytes to the
/// server).  Any other status maps to `CoreError::peer_rejected` so callers
/// can distinguish policy rejection from transport failure.
pub async fn raw_connect(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    path: &str,
    headers: &[(String, String)],
) -> Result<FfiDuplexStream, CoreError> {
    // Validate headers at the FFI boundary (same checks as `fetch`).
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // Dial target: node id plus any direct addresses embedded in the node string.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();
    let handles = endpoint.handles();

    // Duplex traffic uses its own ALPN, so it pools separately from fetch.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN_DUPLEX, || async move {
            ep_raw
                .connect(addr_clone, ALPN_DUPLEX)
                .await
                .map_err(|e| format!("connect duplex: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let (send, recv) = pooled
        .conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
    let io = TokioIo::new(IrohStream::new(send, recv));

    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        .max_buf_size(max_header_size.max(8192))
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;

    // Drive hyper's connection state machine (and the eventual upgrade) in
    // the background.
    tokio::spawn(conn_task);

    // Build CONNECT request with Upgrade: iroh-duplex.
    // ISS-015: include Connection: upgrade for strict handshake compliance.
    let mut req_builder = hyper::Request::builder()
        .method(Method::from_bytes(b"CONNECT").expect("CONNECT is a valid HTTP method"))
        .uri(path)
        .header(hyper::header::CONNECTION, "upgrade")
        .header(hyper::header::UPGRADE, "iroh-duplex");

    // Append caller-supplied headers (validated above).
    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    let req = req_builder
        .body(crate::box_body(http_body_util::Empty::new()))
        .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;

    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;

    let status = resp.status();
    if status != StatusCode::SWITCHING_PROTOCOLS {
        // ISS-022: use PeerRejected so callers can distinguish policy rejection
        // from transport failure for retry/telemetry purposes.
        return Err(CoreError::peer_rejected(format!(
            "server rejected duplex: expected 101, got {status}"
        )));
    }

    // Perform the protocol upgrade to get raw bidirectional IO.
    let upgraded = hyper::upgrade::on(resp)
        .await
        .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;

    // Two body channels: server→JS (read side) and JS→server (write side).
    let (server_write, server_read) = handles.make_body_channel();
    let (client_write, client_read) = handles.make_body_channel();

    // NOTE(review): unlike `do_fetch`, these inserts don't go through
    // `insert_guard`; if `insert_writer` fails, `read_handle` stays
    // registered — confirm whether that leak is acceptable here.
    let read_handle = handles.insert_reader(server_read)?;
    let write_handle = handles.insert_writer(client_write)?;

    // Pipe upgraded IO to/from body channels.
    let io = TokioIo::new(upgraded);
    tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));

    Ok(FfiDuplexStream {
        read_handle,
        write_handle,
    })
}
596
#[cfg(test)]
mod tests {
    use super::extract_path;

    /// Run a table of (url, expected request-target) cases.
    fn check(cases: &[(&str, &str)]) {
        for (input, want) in cases {
            assert_eq!(extract_path(input), *want, "url: {input}");
        }
    }

    #[test]
    fn extract_path_basic() {
        check(&[
            ("httpi://node/foo/bar", "/foo/bar"),
            ("httpi://node/", "/"),
            ("httpi://node", "/"),
        ]);
    }

    #[test]
    fn extract_path_query_string() {
        check(&[
            ("httpi://node/path?x=1", "/path?x=1"),
            ("httpi://node?x=1", "/?x=1"),
        ]);
    }

    #[test]
    fn extract_path_fragment() {
        // RFC 9110 §4.1: fragments must be stripped before sending.
        check(&[
            ("httpi://node/path#frag", "/path"),
            ("httpi://node/path?q=1#frag", "/path?q=1"),
            ("/local#frag", "/local"),
        ]);
    }

    #[test]
    fn extract_path_bare_path() {
        check(&[("/already", "/already"), ("no-slash", "/no-slash")]);
    }
}
627}