1use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13 io::IrohStream,
14 parse_node_addr,
15 stream::{BodyReader, BodyWriter, HandleStore},
16 CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19use crate::BoxBody;
22
23#[cfg(feature = "compression")]
28struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);
29
30#[cfg(feature = "compression")]
31impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
32 type Response = hyper::Response<hyper::body::Incoming>;
33 type Error = hyper::Error;
34 type Future = std::pin::Pin<
35 Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
36 >;
37
38 fn poll_ready(
39 &mut self,
40 cx: &mut std::task::Context<'_>,
41 ) -> std::task::Poll<Result<(), Self::Error>> {
42 self.0.poll_ready(cx)
43 }
44
45 fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
46 Box::pin(self.0.send_request(req))
47 }
48}
49
50#[allow(clippy::too_many_arguments)]
57pub async fn fetch(
58 endpoint: &IrohEndpoint,
59 remote_node_id: &str,
60 url: &str,
61 method: &str,
62 headers: &[(String, String)],
63 req_body_reader: Option<BodyReader>,
64 fetch_token: Option<u64>,
65 direct_addrs: Option<&[std::net::SocketAddr]>,
66) -> Result<FfiResponse, CoreError> {
67 {
69 let lower = url.to_ascii_lowercase();
70 if lower.starts_with("https://") || lower.starts_with("http://") {
71 let scheme_end = lower
72 .find("://")
73 .map(|i| i.saturating_add(3))
74 .unwrap_or(lower.len());
75 return Err(CoreError::invalid_input(format!(
76 "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
77 Example: httpi://nodeId/path",
78 &url[..scheme_end]
79 )));
80 }
81 }
82
83 let http_method = Method::from_bytes(method.as_bytes())
85 .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
86 for (name, value) in headers {
87 HeaderName::from_bytes(name.as_bytes())
88 .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
89 HeaderValue::from_str(value).map_err(|_| {
90 CoreError::invalid_input(format!("invalid header value for {:?}", name))
91 })?;
92 }
93
94 let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
95 let handles = endpoint.handles();
96
97 let out = async {
100 let parsed = parse_node_addr(remote_node_id)?;
101 let node_id = parsed.node_id;
102 let mut addr = iroh::EndpointAddr::new(node_id);
103 for a in &parsed.direct_addrs {
104 addr = addr.with_ip_addr(*a);
105 }
106 if let Some(addrs) = direct_addrs {
107 for a in addrs {
108 addr = addr.with_ip_addr(*a);
109 }
110 }
111
112 let ep_raw = endpoint.raw().clone();
113 let addr_clone = addr.clone();
114 let max_header_size = endpoint.max_header_size();
115 let max_response_body_bytes = endpoint.max_response_body_bytes();
116
117 let pooled = endpoint
118 .pool()
119 .get_or_connect(node_id, ALPN, || async move {
120 ep_raw
121 .connect(addr_clone, ALPN)
122 .await
123 .map_err(|e| format!("connect: {e}"))
124 })
125 .await
126 .map_err(CoreError::connection_failed)?;
127
128 let conn = pooled.conn.clone();
129 let remote_str = pooled.remote_id_str.clone();
130
131 let result = do_fetch(
132 handles,
133 conn,
134 &remote_str,
135 url,
136 http_method,
137 headers,
138 req_body_reader,
139 max_header_size,
140 max_response_body_bytes,
141 );
142
143 if let Some(notify) = cancel_notify {
144 tokio::select! {
145 _ = notify.notified() => Err(CoreError::cancelled()),
146 r = result => r,
147 }
148 } else {
149 result.await
150 }
151 }
152 .await;
153
154 if let Some(token) = fetch_token {
156 endpoint.handles().remove_fetch_token(token);
157 }
158
159 out
160}
161
162fn classify_hyper_error(e: &impl std::fmt::Display, context: &str) -> CoreError {
169 let msg = e.to_string();
170 if msg.to_ascii_lowercase().contains("header") {
177 CoreError::header_too_large(format!("{context}: {msg}"))
178 } else {
179 CoreError::connection_failed(format!("{context}: {msg}"))
180 }
181}
182
183#[allow(clippy::too_many_arguments)]
184async fn do_fetch(
185 handles: &HandleStore,
186 conn: iroh::endpoint::Connection,
187 remote_str: &str,
188 url: &str,
189 method: Method,
190 headers: &[(String, String)],
191 req_body_reader: Option<BodyReader>,
192 max_header_size: usize,
193 max_response_body_bytes: usize,
194) -> Result<FfiResponse, CoreError> {
195 let (send, recv) = conn
196 .open_bi()
197 .await
198 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
199
200 let io = TokioIo::new(IrohStream::new(send, recv));
201
202 #[allow(unused_mut)] let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
204 .max_buf_size(max_header_size.max(8192))
209 .max_headers(128)
210 .handshake::<_, BoxBody>(io)
211 .await
212 .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;
213
214 tokio::spawn(conn_task);
216
217 let path = extract_path(url);
218
219 let mut req_builder = hyper::Request::builder()
221 .method(method)
222 .uri(&path)
223 .header(hyper::header::HOST, remote_str);
224
225 #[cfg(feature = "compression")]
230 {
231 let has_accept_encoding = headers
232 .iter()
233 .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
234 if !has_accept_encoding {
235 req_builder = req_builder.header("accept-encoding", "zstd");
236 }
237 }
238
239 for (k, v) in headers {
240 req_builder = req_builder.header(k.as_str(), v.as_str());
241 }
242
243 let req_body: BoxBody = if let Some(reader) = req_body_reader {
244 crate::box_body(body_from_reader(reader))
245 } else {
246 crate::box_body(http_body_util::Empty::new())
247 };
248
249 let req = req_builder
250 .body(req_body)
251 .map_err(|e| CoreError::internal(format!("build request: {e}")))?;
252
253 #[cfg(feature = "compression")]
256 let resp = {
257 use tower::ServiceExt;
258 let svc = tower::ServiceBuilder::new()
259 .layer(tower_http::decompression::DecompressionLayer::new())
260 .service(HyperClientSvc(sender));
261 svc.oneshot(req)
262 .await
263 .map_err(|e| classify_hyper_error(&e, "send_request"))?
264 };
265 #[cfg(not(feature = "compression"))]
266 let resp = sender
267 .send_request(req)
268 .await
269 .map_err(|e| classify_hyper_error(&e, "send_request"))?;
270
271 let status = resp.status().as_u16();
272 let header_bytes: usize = resp
275 .headers()
276 .iter()
277 .map(|(k, v)| {
278 k.as_str()
279 .len()
280 .saturating_add(v.as_bytes().len())
281 .saturating_add(4) })
283 .fold(16usize, |acc, x| acc.saturating_add(x)); if header_bytes > max_header_size {
285 return Err(CoreError::header_too_large(format!(
286 "response header size {header_bytes} exceeds limit {max_header_size}"
287 )));
288 }
289
290 let mut resp_headers: Vec<(String, String)> = Vec::new();
291 for (k, v) in resp.headers().iter() {
292 match v.to_str() {
293 Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
294 Err(_) => {
295 return Err(CoreError::invalid_input(format!(
296 "non-UTF8 response header value for '{}'",
297 k.as_str()
298 )));
299 }
300 }
301 }
302
303 let response_url = format!("httpi://{remote_str}{path}");
304
305 if is_null_body_status(status) {
311 drop(resp.into_body());
315 return Ok(FfiResponse {
316 status,
317 headers: resp_headers,
318 body_handle: 0,
319 url: response_url,
320 });
321 }
322
323 let mut guard = handles.insert_guard();
325
326 let (res_writer, res_reader) = handles.make_body_channel();
327 let body = resp.into_body();
328 let frame_timeout = res_writer.drain_timeout;
329 tokio::spawn(pump_hyper_body_to_channel_limited(
330 body,
331 res_writer,
332 Some(max_response_body_bytes),
333 frame_timeout,
334 None,
335 ));
336
337 let body_handle = guard.insert_reader(res_reader)?;
338
339 guard.commit();
340 Ok(FfiResponse {
341 status,
342 headers: resp_headers,
343 body_handle,
344 url: response_url,
345 })
346}
347
348#[inline]
352fn is_null_body_status(status: u16) -> bool {
353 status == 204 || status == 205 || status == 304
354}
355
356#[allow(dead_code)]
361pub(crate) async fn pump_hyper_body_to_channel<B>(body: B, writer: BodyWriter)
362where
363 B: http_body::Body<Data = Bytes>,
364 B::Error: std::fmt::Debug,
365{
366 let timeout = writer.drain_timeout;
367 pump_hyper_body_to_channel_limited(body, writer, None, timeout, None).await;
368}
369
370pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
378 body: B,
379 writer: BodyWriter,
380 max_bytes: Option<usize>,
381 frame_timeout: std::time::Duration,
382 mut overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
383) where
384 B: http_body::Body<Data = Bytes>,
385 B::Error: std::fmt::Debug,
386{
387 let mut body = Box::pin(body);
389 let mut total = 0usize;
390 let mut overflowed = false;
394
395 loop {
396 let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
397 Err(_elapsed) => {
398 tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
399 break;
400 }
401 Ok(None) => break,
402 Ok(Some(r)) => r,
403 };
404 match frame_result {
405 Err(e) => {
406 tracing::warn!("iroh-http: body frame error: {e:?}");
407 break;
408 }
409 Ok(frame) => {
410 if overflowed {
411 continue;
415 }
416 if frame.is_data() {
417 let data = frame.into_data().expect("is_data checked above");
418 total = total.saturating_add(data.len());
419 if let Some(limit) = max_bytes {
420 if total > limit {
421 tracing::warn!("iroh-http: request body exceeded {limit} bytes");
422 if let Some(tx) = overflow_tx.take() {
424 let _ = tx.send(());
425 }
426 overflowed = true;
427 continue; }
429 }
430 if writer.send_chunk(data).await.is_err() {
431 return; }
433 }
434 }
435 }
436 }
437
438 drop(writer);
439}
440
441pub(crate) fn body_from_reader(
444 reader: BodyReader,
445) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
446 use futures::stream;
447
448 let s = stream::unfold(reader, |reader| async move {
449 reader
450 .next_chunk()
451 .await
452 .map(|data| (Ok(Frame::data(data)), reader))
453 });
454
455 StreamBody::new(s)
456}
457
458pub(crate) fn extract_path(url: &str) -> String {
461 let raw = if let Some(idx) = url.find("://") {
462 let after_scheme = url.get(idx.saturating_add(3)..).unwrap_or("");
463 if let Some(slash) = after_scheme.find('/') {
464 after_scheme[slash..].to_string()
465 } else if let Some(q) = after_scheme.find('?') {
466 format!("/{}", &after_scheme[q..])
468 } else {
469 "/".to_string()
470 }
471 } else if url.starts_with('/') {
472 url.to_string()
473 } else {
474 format!("/{url}")
475 };
476
477 match raw.find('#') {
480 Some(pos) => raw[..pos].to_string(),
481 None => raw,
482 }
483}
484
485pub async fn raw_connect(
489 endpoint: &IrohEndpoint,
490 remote_node_id: &str,
491 path: &str,
492 headers: &[(String, String)],
493) -> Result<FfiDuplexStream, CoreError> {
494 for (name, value) in headers {
496 HeaderName::from_bytes(name.as_bytes())
497 .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
498 HeaderValue::from_str(value).map_err(|_| {
499 CoreError::invalid_input(format!("invalid header value for {:?}", name))
500 })?;
501 }
502
503 let parsed = parse_node_addr(remote_node_id)?;
504 let node_id = parsed.node_id;
505 let mut addr = iroh::EndpointAddr::new(node_id);
506 for a in &parsed.direct_addrs {
507 addr = addr.with_ip_addr(*a);
508 }
509
510 let ep_raw = endpoint.raw().clone();
511 let addr_clone = addr.clone();
512 let max_header_size = endpoint.max_header_size();
513 let handles = endpoint.handles();
514
515 let pooled = endpoint
516 .pool()
517 .get_or_connect(node_id, ALPN_DUPLEX, || async move {
518 ep_raw
519 .connect(addr_clone, ALPN_DUPLEX)
520 .await
521 .map_err(|e| format!("connect duplex: {e}"))
522 })
523 .await
524 .map_err(CoreError::connection_failed)?;
525
526 let (send, recv) = pooled
527 .conn
528 .open_bi()
529 .await
530 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
531 let io = TokioIo::new(IrohStream::new(send, recv));
532
533 let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
534 .max_buf_size(max_header_size.max(8192))
535 .handshake::<_, BoxBody>(io)
536 .await
537 .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;
538
539 tokio::spawn(conn_task);
540
541 let mut req_builder = hyper::Request::builder()
544 .method(Method::from_bytes(b"CONNECT").expect("CONNECT is a valid HTTP method"))
545 .uri(path)
546 .header(hyper::header::CONNECTION, "upgrade")
547 .header(hyper::header::UPGRADE, "iroh-duplex");
548
549 for (k, v) in headers {
550 req_builder = req_builder.header(k.as_str(), v.as_str());
551 }
552
553 let req = req_builder
554 .body(crate::box_body(http_body_util::Empty::new()))
555 .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;
556
557 let resp = sender
558 .send_request(req)
559 .await
560 .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;
561
562 let status = resp.status();
563 if status != StatusCode::SWITCHING_PROTOCOLS {
564 return Err(CoreError::peer_rejected(format!(
567 "server rejected duplex: expected 101, got {status}"
568 )));
569 }
570
571 let upgraded = hyper::upgrade::on(resp)
573 .await
574 .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;
575
576 let (server_write, server_read) = handles.make_body_channel();
577 let (client_write, client_read) = handles.make_body_channel();
578
579 let read_handle = handles.insert_reader(server_read)?;
580 let write_handle = handles.insert_writer(client_write)?;
581
582 let io = TokioIo::new(upgraded);
584 tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));
585
586 Ok(FfiDuplexStream {
587 read_handle,
588 write_handle,
589 })
590}
591
592#[cfg(test)]
593mod tests {
594 use super::extract_path;
595
596 #[test]
597 fn extract_path_basic() {
598 assert_eq!(extract_path("httpi://node/foo/bar"), "/foo/bar");
599 assert_eq!(extract_path("httpi://node/"), "/");
600 assert_eq!(extract_path("httpi://node"), "/");
601 }
602
603 #[test]
604 fn extract_path_query_string() {
605 assert_eq!(extract_path("httpi://node/path?x=1"), "/path?x=1");
606 assert_eq!(extract_path("httpi://node?x=1"), "/?x=1");
607 }
608
609 #[test]
610 fn extract_path_fragment() {
611 assert_eq!(extract_path("httpi://node/path#frag"), "/path");
613 assert_eq!(extract_path("httpi://node/path?q=1#frag"), "/path?q=1");
614 assert_eq!(extract_path("/local#frag"), "/local");
615 }
616
617 #[test]
618 fn extract_path_bare_path() {
619 assert_eq!(extract_path("/already"), "/already");
620 assert_eq!(extract_path("no-slash"), "/no-slash");
621 }
622}