Skip to main content

iroh_http_core/
client.rs

1//! Outgoing HTTP request — `fetch()` and `raw_connect()` implementation.
2//!
3//! HTTP/1.1 framing is delegated entirely to hyper.  Iroh's QUIC stream pair
4//! is wrapped in `IrohStream` and handed to hyper's client connection API.
5
6use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13    io::IrohStream,
14    parse_node_addr,
15    stream::{BodyReader, BodyWriter, HandleStore},
16    CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19// ── BoxBody type alias ────────────────────────────────────────────────────────
20
21use crate::BoxBody;
22
23// ── Compression: thin tower service wrapper around hyper SendRequest ─────────
24
/// Wraps `SendRequest<BoxBody>` as a `tower::Service` so compression/decompression
/// layers from `tower-http` can be composed around it.
#[cfg(feature = "compression")]
struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);

#[cfg(feature = "compression")]
impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
    type Response = hyper::Response<hyper::body::Incoming>;
    type Error = hyper::Error;
    // `send_request` returns an unnameable future, so the associated Future
    // type must be boxed (and `Send` so it can cross task boundaries).
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    /// Ready when the underlying hyper connection can accept another request;
    /// delegates straight to `SendRequest::poll_ready`.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    /// Dispatch one request on the wrapped HTTP/1.1 client connection.
    fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
        Box::pin(self.0.send_request(req))
    }
}
49
50// ── In-flight fetch cancellation ──────────────────────────────────────────────
51
52// alloc_fetch_token / cancel_in_flight / get_fetch_cancel_notify / remove_fetch_token
53// are now in crate::stream (imported above).
54// ── Public fetch API ──────────────────────────────────────────────────────────
55
/// Issue one HTTP/1.1 request to a remote Iroh node over QUIC.
///
/// FFI-facing entry point: validates `url`/`method`/`headers` up front,
/// builds an `EndpointAddr` from `remote_node_id` plus any direct-address
/// hints, obtains a pooled connection (dialing with `ALPN` on a pool miss),
/// and delegates request framing to `do_fetch`.
///
/// Cancellation: when `fetch_token` maps to a registered notify in the
/// handle store, the in-flight `do_fetch` is raced against it via
/// `tokio::select!` and replaced with `CoreError::cancelled()` when fired.
/// The token is removed from the store on every exit path below the race.
#[allow(clippy::too_many_arguments)]
pub async fn fetch(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    url: &str,
    method: &str,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_sender_handle: Option<u64>,
    fetch_token: Option<u64>,
    direct_addrs: Option<&[std::net::SocketAddr]>,
) -> Result<FfiResponse, CoreError> {
    // Reject standard web schemes.
    {
        let lower = url.to_ascii_lowercase();
        if lower.starts_with("https://") || lower.starts_with("http://") {
            // Index computed on `lower` is valid on `url` too: ASCII
            // lowercasing preserves byte lengths.
            let scheme_end = lower.find("://").map(|i| i + 3).unwrap_or(lower.len());
            return Err(CoreError::invalid_input(format!(
                "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
                 Example: httpi://nodeId/path",
                &url[..scheme_end]
            )));
        }
    }

    // Validate method and headers at the FFI boundary.
    let http_method = Method::from_bytes(method.as_bytes())
        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // NOTE(review): the cancel notify only guards the `do_fetch` race below —
    // a cancel fired while `get_or_connect` is still dialing is not observed
    // until the connect resolves. Confirm that latency is acceptable.
    let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
    let handles = endpoint.handles();

    // Claim request trailer receiver (paired with the sender handle JS holds).
    // A handle of 0 means "no trailers".
    let req_trailer_rx = req_trailer_sender_handle
        .and_then(|h| if h == 0 { None } else { handles.claim_pending_trailer_rx(h) });

    // Dial address: node id plus direct-address hints from both the parsed
    // node string and the explicit `direct_addrs` argument.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }
    if let Some(addrs) = direct_addrs {
        for a in addrs {
            addr = addr.with_ip_addr(*a);
        }
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();

    // Reuse a pooled QUIC connection when available; otherwise run the dial
    // closure and cache the result.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN, || async move {
            ep_raw
                .connect(addr_clone, ALPN)
                .await
                .map_err(|e| format!("connect: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let conn = pooled.conn.clone();
    let remote_str = pooled.remote_id_str.clone();

    // Deliberately NOT awaited yet — the future is raced against the
    // cancellation notify below.
    let result = do_fetch(
        handles,
        conn,
        &remote_str,
        url,
        http_method,
        headers,
        req_body_reader,
        req_trailer_rx,
        max_header_size,
    );

    let out = if let Some(notify) = cancel_notify {
        tokio::select! {
            _ = notify.notified() => Err(CoreError::cancelled()),
            r = result => r,
        }
    } else {
        result.await
    };

    // Clean up the cancellation token.
    if let Some(token) = fetch_token {
        endpoint.handles().remove_fetch_token(token);
    }

    out
}
157
/// Core single-request path: open a fresh bi-directional stream on `conn`,
/// drive a hyper HTTP/1.1 client handshake over it, send the request, and
/// bridge the response (status, headers, streamed body, trailers) into
/// handle-store channels exposed through the returned `FfiResponse`.
#[allow(clippy::too_many_arguments)]
async fn do_fetch(
    handles: &HandleStore,
    conn: iroh::endpoint::Connection,
    remote_str: &str,
    url: &str,
    method: Method,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_rx: Option<crate::stream::TrailerRx>,
    max_header_size: usize,
) -> Result<FfiResponse, CoreError> {
    // One QUIC stream pair per request; the pooled connection itself is shared.
    let (send, recv) = conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;

    let io = TokioIo::new(IrohStream::new(send, recv));

    #[allow(unused_mut)] // mut only needed without the compression feature
    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        // hyper requires max_buf_size >= 8192; clamp upward so small
        // max_header_size values don't panic.  Header-size enforcement happens
        // via the response parsing error that hyper returns when the actual
        // response head exceeds max_header_size bytes.
        .max_buf_size(max_header_size.max(8192))
        .max_headers(128)
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;

    // Drive the connection state machine in the background.
    tokio::spawn(conn_task);

    let path = extract_path(url);

    // Build the hyper request.
    let mut req_builder = hyper::Request::builder()
        .method(method)
        .uri(&path)
        .header(hyper::header::HOST, remote_str)
        // Tell the server we accept chunked trailers (required for HTTP/1.1 trailer delivery).
        .header("te", "trailers");

    // When compression is enabled, advertise zstd-only Accept-Encoding — but
    // only if the caller has not already set Accept-Encoding.  A caller passing
    // `Accept-Encoding: identity` is opting out of compression and must not be
    // overridden.
    #[cfg(feature = "compression")]
    {
        let has_accept_encoding = headers
            .iter()
            .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
        if !has_accept_encoding {
            req_builder = req_builder.header("accept-encoding", "zstd");
        }
    }

    // Caller headers are appended last, after the defaults above.
    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    let req_body: BoxBody = if let Some(reader) = req_body_reader {
        // Adapt BodyReader → hyper body, including optional request trailers.
        crate::box_body(body_from_reader(reader, req_trailer_rx))
    } else {
        crate::box_body(http_body_util::Empty::new())
    };

    let req = req_builder
        .body(req_body)
        .map_err(|e| CoreError::internal(format!("build request: {e}")))?;

    // Dispatch: with compression, wrap sender in DecompressionLayer so the
    // response body is transparently decompressed before reaching the channel pump.
    #[cfg(feature = "compression")]
    let resp = {
        use tower::ServiceExt;
        let svc = tower::ServiceBuilder::new()
            .layer(tower_http::decompression::DecompressionLayer::new())
            .service(HyperClientSvc(sender));
        svc.oneshot(req)
            .await
            .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?
    };
    #[cfg(not(feature = "compression"))]
    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?;

    let status = resp.status().as_u16();
    // ISS-011: measure header bytes using raw values before string conversion;
    // reject non-UTF8 response header values deterministically.
    let header_bytes: usize = resp
        .headers()
        .iter()
        .map(|(k, v)| k.as_str().len() + 2 + v.as_bytes().len() + 2) // "name: value\r\n"
        .sum::<usize>()
        + 16; // approximate status line
    if header_bytes > max_header_size {
        return Err(CoreError::header_too_large(format!(
            "response header size {header_bytes} exceeds limit {max_header_size}"
        )));
    }

    // Convert headers to (String, String) pairs; any non-UTF8 value fails the
    // whole request rather than being silently dropped.
    let mut resp_headers: Vec<(String, String)> = Vec::new();
    for (k, v) in resp.headers().iter() {
        match v.to_str() {
            Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
            Err(_) => {
                return Err(CoreError::invalid_input(format!(
                    "non-UTF8 response header value for '{}'",
                    k.as_str()
                )));
            }
        }
    }

    // Allocate channels for streaming the response body to JS.
    // NOTE(review): handle inserts go through `insert_guard()`/`commit()` —
    // presumably the guard releases un-committed handles on early error;
    // verify against HandleStore.
    let mut guard = handles.insert_guard();
    let (trailer_tx, trailer_rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
    let trailer_handle = guard.insert_trailer_receiver(trailer_rx)?;

    let (res_writer, res_reader) = handles.make_body_channel();
    let body = resp.into_body();
    // Background task drains the hyper body into the channel and delivers
    // trailers through the oneshot when the body completes.
    tokio::spawn(pump_hyper_body_to_channel(body, res_writer, trailer_tx));

    let body_handle = guard.insert_reader(res_reader)?;
    let response_url = format!("httpi://{remote_str}{path}");

    guard.commit();
    Ok(FfiResponse {
        status,
        headers: resp_headers,
        body_handle,
        url: response_url,
        trailers_handle: trailer_handle,
    })
}
298
299// ── Body bridge utilities ─────────────────────────────────────────────────────
300
301/// Drain a hyper body into `BodyWriter`, delivering trailers via the oneshot when done.
302/// Generic over any body type with `Data = Bytes` (e.g. `Incoming`, `DecompressionBody`).
303pub(crate) async fn pump_hyper_body_to_channel<B>(
304    body: B,
305    writer: BodyWriter,
306    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
307) where
308    B: http_body::Body<Data = Bytes>,
309    B::Error: std::fmt::Debug,
310{
311    let timeout = writer.drain_timeout;
312    pump_hyper_body_to_channel_limited(body, writer, trailer_tx, None, timeout, None).await;
313}
314
/// Drain with optional byte limit and a per-frame read timeout.
///
/// `frame_timeout` bounds how long we wait for each individual body frame.
/// A slow-drip peer that stalls indefinitely will be cut off after this deadline.
///
/// When a byte limit is set and the body exceeds it, `overflow_tx` is fired
/// so the caller can return a `413 Content Too Large` response (ISS-004).
///
/// NOTE(review): every `break` (timeout, frame error, overflow) falls through
/// to the normal completion path — the writer is dropped and whatever trailers
/// were captured are sent, so the reader observes an ordinary end-of-body
/// rather than an error. Confirm that truncation-as-EOF is intended.
pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
    body: B,
    writer: BodyWriter,
    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
    max_bytes: Option<usize>,
    frame_timeout: std::time::Duration,
    overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
) where
    B: http_body::Body<Data = Bytes>,
    B::Error: std::fmt::Debug,
{
    // Box::pin gives Pin<Box<B>>: Unpin (Box<T>: Unpin ∀T), which satisfies BodyExt::frame().
    let mut body = Box::pin(body);
    let mut total = 0usize;
    let mut trailers_vec: Vec<(String, String)> = Vec::new();

    loop {
        // Each frame read is individually bounded by `frame_timeout`.
        let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
            Err(_elapsed) => {
                tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
                break;
            }
            // `Ok(None)` means the body stream ended cleanly.
            Ok(None) => break,
            Ok(Some(r)) => r,
        };
        match frame_result {
            Err(e) => {
                tracing::warn!("iroh-http: body frame error: {e:?}");
                break;
            }
            Ok(frame) => {
                if frame.is_data() {
                    let data = frame.into_data().expect("is_data checked above");
                    // Count before the limit check so the chunk that crosses
                    // the limit is never forwarded to the writer.
                    total += data.len();
                    if let Some(limit) = max_bytes {
                        if total > limit {
                            tracing::warn!("iroh-http: request body exceeded {limit} bytes");
                            // ISS-004: signal overflow so the serve path can send 413.
                            if let Some(tx) = overflow_tx {
                                let _ = tx.send(());
                            }
                            break;
                        }
                    }
                    if writer.send_chunk(data).await.is_err() {
                        return; // reader dropped
                    }
                } else if frame.is_trailers() {
                    // Trailer values that aren't UTF-8 are logged and dropped
                    // individually; the rest are still delivered.
                    let hdrs = frame.into_trailers().expect("is_trailers checked above");
                    trailers_vec = hdrs
                        .iter()
                        .filter_map(|(k, v)| match v.to_str() {
                            Ok(s) => Some((k.as_str().to_string(), s.to_string())),
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: dropping non-UTF8 trailer value for '{}'",
                                    k.as_str()
                                );
                                None
                            }
                        })
                        .collect();
                }
            }
        }
    }

    // Dropping the writer closes the channel (reader sees end-of-body), then
    // deliver whatever trailers were captured — possibly an empty Vec.
    drop(writer);
    let _ = trailer_tx.send(trailers_vec);
}
392
/// Adapt a `BodyReader` + optional trailer channel into a hyper-compatible
/// body using `StreamBody` backed by a futures stream.
///
/// The unfold state is `(reader, trailer_rx, done)`: data frames are yielded
/// while the reader produces chunks; when it is exhausted, at most one
/// trailer frame is yielded (with `done = true` so the next poll ends the
/// stream). Returning `None` from the closure terminates the body.
pub(crate) fn body_from_reader(
    reader: BodyReader,
    trailer_rx: Option<tokio::sync::oneshot::Receiver<Vec<(String, String)>>>,
) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
    use futures::stream;

    // State machine: first yield data frames, then optionally a trailer frame.
    let s = stream::unfold(
        (reader, trailer_rx, false),
        |(reader, trailer_rx, done)| async move {
            if done {
                // A trailer frame was already emitted — end the stream.
                return None;
            }
            match reader.next_chunk().await {
                // Still streaming data: keep done=false and carry trailer_rx forward.
                Some(data) => Some((Ok(Frame::data(data)), (reader, trailer_rx, false))),
                None => {
                    // Body data complete — check for trailers.
                    if let Some(rx) = trailer_rx {
                        // ISS-016: bound the wait so declared-but-unsent trailers
                        // don't stall completion indefinitely.
                        let timeout = reader.drain_timeout;
                        match tokio::time::timeout(timeout, rx).await {
                            Ok(Ok(trailers)) => {
                                // Invalid trailer names/values are silently
                                // skipped; only well-formed pairs reach the map.
                                let mut map = http::HeaderMap::new();
                                for (k, v) in trailers {
                                    if let (Ok(name), Ok(val)) = (
                                        HeaderName::from_bytes(k.as_bytes()),
                                        HeaderValue::from_str(&v),
                                    ) {
                                        map.append(name, val);
                                    }
                                }
                                if !map.is_empty() {
                                    return Some((Ok(Frame::trailers(map)), (reader, None, true)));
                                }
                            }
                            Ok(Err(_)) => {
                                // Sender dropped without sending — treat as no trailers.
                            }
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: trailer wait timed out after {timeout:?}; \
                                     completing body without trailers"
                                );
                            }
                        }
                    }
                    None
                }
            }
        },
    );

    StreamBody::new(s)
}
450
451// ── Path extraction ───────────────────────────────────────────────────────────
452
/// Derive the HTTP/1.1 request-target (origin-form) from a caller URL.
///
/// Accepts full `httpi://node/...` URLs, absolute paths (`/x`), and bare
/// strings (which get a `/` prefix).
///
/// RFC 9110 §4.1: fragment identifiers are client-side only and must never
/// appear in the request-target, so the fragment is stripped up front — this
/// also prevents a `/` inside a fragment from being mistaken for the path
/// start. Likewise, when `?` precedes the first `/` after the authority
/// (e.g. `httpi://node?a=/b`), the target is an empty path plus the query
/// (`/?a=/b`); previously the `/` inside the query was mis-parsed as the
/// path start, yielding `/b`.
pub(crate) fn extract_path(url: &str) -> String {
    // Drop everything from the first '#' on before any other parsing.
    // '#' is ASCII, so slicing at its byte index is always char-safe.
    let url = &url[..url.find('#').unwrap_or(url.len())];

    if let Some(idx) = url.find("://") {
        let after_scheme = &url[idx + 3..];
        match (after_scheme.find('/'), after_scheme.find('?')) {
            // Query begins before any slash: empty path + query string.
            (Some(s), Some(q)) if q < s => format!("/{}", &after_scheme[q..]),
            (None, Some(q)) => format!("/{}", &after_scheme[q..]),
            // Normal case: the request-target starts at the first slash.
            (Some(s), _) => after_scheme[s..].to_string(),
            // Authority only — default to the root path.
            (None, None) => "/".to_string(),
        }
    } else if url.starts_with('/') {
        url.to_string()
    } else {
        format!("/{url}")
    }
}
477
478// ── Duplex / raw_connect ──────────────────────────────────────────────────────
479
/// Open a full-duplex QUIC connection to a remote node via HTTP Upgrade.
///
/// Sequence: validate headers → dial/reuse a pooled connection on the
/// duplex ALPN → open a bi-directional stream → hyper HTTP/1.1 handshake →
/// send `CONNECT` with `Upgrade: iroh-duplex` → expect `101 Switching
/// Protocols` → take over the upgraded IO and bridge it to a pair of
/// handle-store body channels returned in `FfiDuplexStream`.
///
/// NOTE(review): unlike `do_fetch`, this handshake does not set
/// `.max_headers(128)` on the hyper builder — confirm whether that
/// asymmetry is intentional.
pub async fn raw_connect(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    path: &str,
    headers: &[(String, String)],
) -> Result<FfiDuplexStream, CoreError> {
    // Validate headers.
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // Dial address from the node string plus any embedded direct-address hints.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();
    let handles = endpoint.handles();

    // Pool is keyed by (node, ALPN); duplex traffic uses its own ALPN.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN_DUPLEX, || async move {
            ep_raw
                .connect(addr_clone, ALPN_DUPLEX)
                .await
                .map_err(|e| format!("connect duplex: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let (send, recv) = pooled
        .conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
    let io = TokioIo::new(IrohStream::new(send, recv));

    // Same >=8192 clamp as the fetch path: hyper requires it.
    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        .max_buf_size(max_header_size.max(8192))
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;

    tokio::spawn(conn_task);

    // Build CONNECT request with Upgrade: iroh-duplex.
    // ISS-015: include Connection: upgrade for strict handshake compliance.
    let mut req_builder = hyper::Request::builder()
        .method(Method::from_bytes(b"CONNECT").unwrap())
        .uri(path)
        .header(hyper::header::CONNECTION, "upgrade")
        .header(hyper::header::UPGRADE, "iroh-duplex");

    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    // CONNECT carries no request body.
    let req = req_builder
        .body(crate::box_body(http_body_util::Empty::new()))
        .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;

    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;

    let status = resp.status();
    if status != StatusCode::SWITCHING_PROTOCOLS {
        // ISS-022: use PeerRejected so callers can distinguish policy rejection
        // from transport failure for retry/telemetry purposes.
        return Err(CoreError::peer_rejected(format!(
            "server rejected duplex: expected 101, got {status}"
        )));
    }

    // Perform the protocol upgrade to get raw bidirectional IO.
    let upgraded = hyper::upgrade::on(resp)
        .await
        .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;

    // Two channels: server→JS reads and JS→server writes.
    let (server_write, server_read) = handles.make_body_channel();
    let (client_write, client_read) = handles.make_body_channel();

    let read_handle = handles.insert_reader(server_read)?;
    let write_handle = handles.insert_writer(client_write)?;

    // Pipe upgraded IO to/from body channels.
    let io = TokioIo::new(upgraded);
    tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));

    Ok(FfiDuplexStream {
        read_handle,
        write_handle,
    })
}
584
#[cfg(test)]
mod tests {
    use super::extract_path;

    /// Run `extract_path` over a table of (input, expected) pairs.
    fn check(cases: &[(&str, &str)]) {
        for (input, expected) in cases {
            assert_eq!(extract_path(input), *expected, "input: {input}");
        }
    }

    #[test]
    fn extract_path_basic() {
        check(&[
            ("httpi://node/foo/bar", "/foo/bar"),
            ("httpi://node/", "/"),
            ("httpi://node", "/"),
        ]);
    }

    #[test]
    fn extract_path_query_string() {
        check(&[
            ("httpi://node/path?x=1", "/path?x=1"),
            ("httpi://node?x=1", "/?x=1"),
        ]);
    }

    #[test]
    fn extract_path_fragment() {
        // RFC 9110 §4.1: fragments must be stripped before sending.
        check(&[
            ("httpi://node/path#frag", "/path"),
            ("httpi://node/path?q=1#frag", "/path?q=1"),
            ("/local#frag", "/local"),
        ]);
    }

    #[test]
    fn extract_path_bare_path() {
        check(&[("/already", "/already"), ("no-slash", "/no-slash")]);
    }
}
615}