1use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13 io::IrohStream,
14 parse_node_addr,
15 stream::{BodyReader, BodyWriter, HandleStore},
16 CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19use crate::BoxBody;
22
/// Adapter that exposes a hyper HTTP/1 `SendRequest` handle as a
/// `tower::Service`, so tower layers can be stacked on top of it.
///
/// Only compiled when the `compression` feature is enabled.
#[cfg(feature = "compression")]
struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);

#[cfg(feature = "compression")]
impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
    type Response = hyper::Response<hyper::body::Incoming>;
    type Error = hyper::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    /// Reports readiness by delegating to the wrapped sender.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let Self(sender) = self;
        sender.poll_ready(cx)
    }

    /// Dispatches `req` on the wrapped sender. The response future is boxed
    /// to match the associated `Future` type.
    fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
        let Self(sender) = self;
        let in_flight = sender.send_request(req);
        Box::pin(in_flight)
    }
}
49
50#[allow(clippy::too_many_arguments)]
57pub async fn fetch(
58 endpoint: &IrohEndpoint,
59 remote_node_id: &str,
60 url: &str,
61 method: &str,
62 headers: &[(String, String)],
63 req_body_reader: Option<BodyReader>,
64 req_trailer_sender_handle: Option<u64>,
65 fetch_token: Option<u64>,
66 direct_addrs: Option<&[std::net::SocketAddr]>,
67) -> Result<FfiResponse, CoreError> {
68 {
70 let lower = url.to_ascii_lowercase();
71 if lower.starts_with("https://") || lower.starts_with("http://") {
72 let scheme_end = lower.find("://").map(|i| i + 3).unwrap_or(lower.len());
73 return Err(CoreError::invalid_input(format!(
74 "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
75 Example: httpi://nodeId/path",
76 &url[..scheme_end]
77 )));
78 }
79 }
80
81 let http_method = Method::from_bytes(method.as_bytes())
83 .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
84 for (name, value) in headers {
85 HeaderName::from_bytes(name.as_bytes())
86 .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
87 HeaderValue::from_str(value).map_err(|_| {
88 CoreError::invalid_input(format!("invalid header value for {:?}", name))
89 })?;
90 }
91
92 let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
93 let handles = endpoint.handles();
94
95 let req_trailer_rx = req_trailer_sender_handle
97 .and_then(|h| if h == 0 { None } else { handles.claim_pending_trailer_rx(h) });
98
99 let parsed = parse_node_addr(remote_node_id)?;
100 let node_id = parsed.node_id;
101 let mut addr = iroh::EndpointAddr::new(node_id);
102 for a in &parsed.direct_addrs {
103 addr = addr.with_ip_addr(*a);
104 }
105 if let Some(addrs) = direct_addrs {
106 for a in addrs {
107 addr = addr.with_ip_addr(*a);
108 }
109 }
110
111 let ep_raw = endpoint.raw().clone();
112 let addr_clone = addr.clone();
113 let max_header_size = endpoint.max_header_size();
114
115 let pooled = endpoint
116 .pool()
117 .get_or_connect(node_id, ALPN, || async move {
118 ep_raw
119 .connect(addr_clone, ALPN)
120 .await
121 .map_err(|e| format!("connect: {e}"))
122 })
123 .await
124 .map_err(CoreError::connection_failed)?;
125
126 let conn = pooled.conn.clone();
127 let remote_str = pooled.remote_id_str.clone();
128
129 let result = do_fetch(
130 handles,
131 conn,
132 &remote_str,
133 url,
134 http_method,
135 headers,
136 req_body_reader,
137 req_trailer_rx,
138 max_header_size,
139 );
140
141 let out = if let Some(notify) = cancel_notify {
142 tokio::select! {
143 _ = notify.notified() => Err(CoreError::cancelled()),
144 r = result => r,
145 }
146 } else {
147 result.await
148 };
149
150 if let Some(token) = fetch_token {
152 endpoint.handles().remove_fetch_token(token);
153 }
154
155 out
156}
157
158#[allow(clippy::too_many_arguments)]
159async fn do_fetch(
160 handles: &HandleStore,
161 conn: iroh::endpoint::Connection,
162 remote_str: &str,
163 url: &str,
164 method: Method,
165 headers: &[(String, String)],
166 req_body_reader: Option<BodyReader>,
167 req_trailer_rx: Option<crate::stream::TrailerRx>,
168 max_header_size: usize,
169) -> Result<FfiResponse, CoreError> {
170 let (send, recv) = conn
171 .open_bi()
172 .await
173 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
174
175 let io = TokioIo::new(IrohStream::new(send, recv));
176
177 #[allow(unused_mut)] let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
179 .max_buf_size(max_header_size.max(8192))
184 .max_headers(128)
185 .handshake::<_, BoxBody>(io)
186 .await
187 .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;
188
189 tokio::spawn(conn_task);
191
192 let path = extract_path(url);
193
194 let mut req_builder = hyper::Request::builder()
196 .method(method)
197 .uri(&path)
198 .header(hyper::header::HOST, remote_str)
199 .header("te", "trailers");
201
202 #[cfg(feature = "compression")]
207 {
208 let has_accept_encoding = headers
209 .iter()
210 .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
211 if !has_accept_encoding {
212 req_builder = req_builder.header("accept-encoding", "zstd");
213 }
214 }
215
216 for (k, v) in headers {
217 req_builder = req_builder.header(k.as_str(), v.as_str());
218 }
219
220 let req_body: BoxBody = if let Some(reader) = req_body_reader {
221 crate::box_body(body_from_reader(reader, req_trailer_rx))
223 } else {
224 crate::box_body(http_body_util::Empty::new())
225 };
226
227 let req = req_builder
228 .body(req_body)
229 .map_err(|e| CoreError::internal(format!("build request: {e}")))?;
230
231 #[cfg(feature = "compression")]
234 let resp = {
235 use tower::ServiceExt;
236 let svc = tower::ServiceBuilder::new()
237 .layer(tower_http::decompression::DecompressionLayer::new())
238 .service(HyperClientSvc(sender));
239 svc.oneshot(req)
240 .await
241 .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?
242 };
243 #[cfg(not(feature = "compression"))]
244 let resp = sender
245 .send_request(req)
246 .await
247 .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?;
248
249 let status = resp.status().as_u16();
250 let header_bytes: usize = resp
253 .headers()
254 .iter()
255 .map(|(k, v)| k.as_str().len() + 2 + v.as_bytes().len() + 2) .sum::<usize>()
257 + 16; if header_bytes > max_header_size {
259 return Err(CoreError::header_too_large(format!(
260 "response header size {header_bytes} exceeds limit {max_header_size}"
261 )));
262 }
263
264 let mut resp_headers: Vec<(String, String)> = Vec::new();
265 for (k, v) in resp.headers().iter() {
266 match v.to_str() {
267 Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
268 Err(_) => {
269 return Err(CoreError::invalid_input(format!(
270 "non-UTF8 response header value for '{}'",
271 k.as_str()
272 )));
273 }
274 }
275 }
276
277 let mut guard = handles.insert_guard();
279 let (trailer_tx, trailer_rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
280 let trailer_handle = guard.insert_trailer_receiver(trailer_rx)?;
281
282 let (res_writer, res_reader) = handles.make_body_channel();
283 let body = resp.into_body();
284 tokio::spawn(pump_hyper_body_to_channel(body, res_writer, trailer_tx));
285
286 let body_handle = guard.insert_reader(res_reader)?;
287 let response_url = format!("httpi://{remote_str}{path}");
288
289 guard.commit();
290 Ok(FfiResponse {
291 status,
292 headers: resp_headers,
293 body_handle,
294 url: response_url,
295 trailers_handle: trailer_handle,
296 })
297}
298
299pub(crate) async fn pump_hyper_body_to_channel<B>(
304 body: B,
305 writer: BodyWriter,
306 trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
307) where
308 B: http_body::Body<Data = Bytes>,
309 B::Error: std::fmt::Debug,
310{
311 let timeout = writer.drain_timeout;
312 pump_hyper_body_to_channel_limited(body, writer, trailer_tx, None, timeout, None).await;
313}
314
315pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
323 body: B,
324 writer: BodyWriter,
325 trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
326 max_bytes: Option<usize>,
327 frame_timeout: std::time::Duration,
328 overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
329) where
330 B: http_body::Body<Data = Bytes>,
331 B::Error: std::fmt::Debug,
332{
333 let mut body = Box::pin(body);
335 let mut total = 0usize;
336 let mut trailers_vec: Vec<(String, String)> = Vec::new();
337
338 loop {
339 let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
340 Err(_elapsed) => {
341 tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
342 break;
343 }
344 Ok(None) => break,
345 Ok(Some(r)) => r,
346 };
347 match frame_result {
348 Err(e) => {
349 tracing::warn!("iroh-http: body frame error: {e:?}");
350 break;
351 }
352 Ok(frame) => {
353 if frame.is_data() {
354 let data = frame.into_data().expect("is_data checked above");
355 total += data.len();
356 if let Some(limit) = max_bytes {
357 if total > limit {
358 tracing::warn!("iroh-http: request body exceeded {limit} bytes");
359 if let Some(tx) = overflow_tx {
361 let _ = tx.send(());
362 }
363 break;
364 }
365 }
366 if writer.send_chunk(data).await.is_err() {
367 return; }
369 } else if frame.is_trailers() {
370 let hdrs = frame.into_trailers().expect("is_trailers checked above");
371 trailers_vec = hdrs
372 .iter()
373 .filter_map(|(k, v)| match v.to_str() {
374 Ok(s) => Some((k.as_str().to_string(), s.to_string())),
375 Err(_) => {
376 tracing::warn!(
377 "iroh-http: dropping non-UTF8 trailer value for '{}'",
378 k.as_str()
379 );
380 None
381 }
382 })
383 .collect();
384 }
385 }
386 }
387 }
388
389 drop(writer);
390 let _ = trailer_tx.send(trailers_vec);
391}
392
393pub(crate) fn body_from_reader(
396 reader: BodyReader,
397 trailer_rx: Option<tokio::sync::oneshot::Receiver<Vec<(String, String)>>>,
398) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
399 use futures::stream;
400
401 let s = stream::unfold(
403 (reader, trailer_rx, false),
404 |(reader, trailer_rx, done)| async move {
405 if done {
406 return None;
407 }
408 match reader.next_chunk().await {
409 Some(data) => Some((Ok(Frame::data(data)), (reader, trailer_rx, false))),
410 None => {
411 if let Some(rx) = trailer_rx {
413 let timeout = reader.drain_timeout;
416 match tokio::time::timeout(timeout, rx).await {
417 Ok(Ok(trailers)) => {
418 let mut map = http::HeaderMap::new();
419 for (k, v) in trailers {
420 if let (Ok(name), Ok(val)) = (
421 HeaderName::from_bytes(k.as_bytes()),
422 HeaderValue::from_str(&v),
423 ) {
424 map.append(name, val);
425 }
426 }
427 if !map.is_empty() {
428 return Some((Ok(Frame::trailers(map)), (reader, None, true)));
429 }
430 }
431 Ok(Err(_)) => {
432 }
434 Err(_) => {
435 tracing::warn!(
436 "iroh-http: trailer wait timed out after {timeout:?}; \
437 completing body without trailers"
438 );
439 }
440 }
441 }
442 None
443 }
444 }
445 },
446 );
447
448 StreamBody::new(s)
449}
450
/// Extracts the request target (path plus optional query) from `url`.
///
/// Accepts either an absolute URL (`scheme://authority/path?query#frag`) or
/// a bare path. The result always starts with `/` and never contains a
/// fragment.
///
/// Rules:
/// - The fragment (`#...`) is removed first, so nothing inside it can be
///   mistaken for URL structure.
/// - `://` is treated as a scheme separator only when no `/` or `?` precedes
///   it. This keeps bare paths whose query embeds another URL intact, e.g.
///   `/foo?next=httpi://node/bar`.
/// - After the authority, the path starts at the first `/` **or** `?`,
///   whichever comes first, so a `/` inside the query of an authority-only
///   URL (`httpi://node?x=a/b`) is not mistaken for the path start.
/// - A URL with no path and no query yields `/`.
pub(crate) fn extract_path(url: &str) -> String {
    // `split` always yields at least one item; the fallback is defensive.
    let url = url.split('#').next().unwrap_or(url);

    let after_authority_start = url
        .split_once("://")
        .filter(|(scheme, _)| !scheme.contains(|c: char| matches!(c, '/' | '?')))
        .map(|(_, rest)| rest);

    match after_authority_start {
        Some(rest) => match rest.find(|c: char| matches!(c, '/' | '?')) {
            // Path present: keep it (and any query) verbatim.
            Some(pos) if rest[pos..].starts_with('/') => rest[pos..].to_string(),
            // Query directly after the authority: supply the root path.
            Some(pos) => format!("/{}", &rest[pos..]),
            None => "/".to_string(),
        },
        None if url.starts_with('/') => url.to_string(),
        None => format!("/{url}"),
    }
}
477
478pub async fn raw_connect(
482 endpoint: &IrohEndpoint,
483 remote_node_id: &str,
484 path: &str,
485 headers: &[(String, String)],
486) -> Result<FfiDuplexStream, CoreError> {
487 for (name, value) in headers {
489 HeaderName::from_bytes(name.as_bytes())
490 .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
491 HeaderValue::from_str(value).map_err(|_| {
492 CoreError::invalid_input(format!("invalid header value for {:?}", name))
493 })?;
494 }
495
496 let parsed = parse_node_addr(remote_node_id)?;
497 let node_id = parsed.node_id;
498 let mut addr = iroh::EndpointAddr::new(node_id);
499 for a in &parsed.direct_addrs {
500 addr = addr.with_ip_addr(*a);
501 }
502
503 let ep_raw = endpoint.raw().clone();
504 let addr_clone = addr.clone();
505 let max_header_size = endpoint.max_header_size();
506 let handles = endpoint.handles();
507
508 let pooled = endpoint
509 .pool()
510 .get_or_connect(node_id, ALPN_DUPLEX, || async move {
511 ep_raw
512 .connect(addr_clone, ALPN_DUPLEX)
513 .await
514 .map_err(|e| format!("connect duplex: {e}"))
515 })
516 .await
517 .map_err(CoreError::connection_failed)?;
518
519 let (send, recv) = pooled
520 .conn
521 .open_bi()
522 .await
523 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
524 let io = TokioIo::new(IrohStream::new(send, recv));
525
526 let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
527 .max_buf_size(max_header_size.max(8192))
528 .handshake::<_, BoxBody>(io)
529 .await
530 .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;
531
532 tokio::spawn(conn_task);
533
534 let mut req_builder = hyper::Request::builder()
537 .method(Method::from_bytes(b"CONNECT").unwrap())
538 .uri(path)
539 .header(hyper::header::CONNECTION, "upgrade")
540 .header(hyper::header::UPGRADE, "iroh-duplex");
541
542 for (k, v) in headers {
543 req_builder = req_builder.header(k.as_str(), v.as_str());
544 }
545
546 let req = req_builder
547 .body(crate::box_body(http_body_util::Empty::new()))
548 .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;
549
550 let resp = sender
551 .send_request(req)
552 .await
553 .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;
554
555 let status = resp.status();
556 if status != StatusCode::SWITCHING_PROTOCOLS {
557 return Err(CoreError::peer_rejected(format!(
560 "server rejected duplex: expected 101, got {status}"
561 )));
562 }
563
564 let upgraded = hyper::upgrade::on(resp)
566 .await
567 .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;
568
569 let (server_write, server_read) = handles.make_body_channel();
570 let (client_write, client_read) = handles.make_body_channel();
571
572 let read_handle = handles.insert_reader(server_read)?;
573 let write_handle = handles.insert_writer(client_write)?;
574
575 let io = TokioIo::new(upgraded);
577 tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));
578
579 Ok(FfiDuplexStream {
580 read_handle,
581 write_handle,
582 })
583}
584
#[cfg(test)]
mod tests {
    use super::extract_path;

    /// Asserts that `extract_path` maps each input to its expected output.
    fn check(cases: &[(&str, &str)]) {
        for (input, expected) in cases {
            assert_eq!(extract_path(input), *expected, "input: {input:?}");
        }
    }

    /// Absolute URLs with and without an explicit path.
    #[test]
    fn extract_path_basic() {
        check(&[
            ("httpi://node/foo/bar", "/foo/bar"),
            ("httpi://node/", "/"),
            ("httpi://node", "/"),
        ]);
    }

    /// Query strings are preserved, with a root path supplied when missing.
    #[test]
    fn extract_path_query_string() {
        check(&[
            ("httpi://node/path?x=1", "/path?x=1"),
            ("httpi://node?x=1", "/?x=1"),
        ]);
    }

    /// Fragments are always stripped from the result.
    #[test]
    fn extract_path_fragment() {
        check(&[
            ("httpi://node/path#frag", "/path"),
            ("httpi://node/path?q=1#frag", "/path?q=1"),
            ("/local#frag", "/local"),
        ]);
    }

    /// Inputs without a scheme are treated as paths.
    #[test]
    fn extract_path_bare_path() {
        check(&[("/already", "/already"), ("no-slash", "/no-slash")]);
    }
}