1use bytes::Bytes;
7use http::{HeaderName, HeaderValue, Method, StatusCode};
8use http_body_util::{BodyExt, StreamBody};
9use hyper::body::Frame;
10use hyper_util::rt::TokioIo;
11
12use crate::{
13 io::IrohStream,
14 parse_node_addr,
15 stream::{BodyReader, BodyWriter, HandleStore},
16 CoreError, FfiDuplexStream, FfiResponse, IrohEndpoint, ALPN, ALPN_DUPLEX,
17};
18
19use crate::BoxBody;
22
// Adapter exposing hyper's HTTP/1 `SendRequest` as a `tower::Service`, so the
// response-decompression layer (compression feature) can wrap it.
#[cfg(feature = "compression")]
struct HyperClientSvc(hyper::client::conn::http1::SendRequest<BoxBody>);

#[cfg(feature = "compression")]
impl tower::Service<hyper::Request<BoxBody>> for HyperClientSvc {
    type Response = hyper::Response<hyper::body::Incoming>;
    type Error = hyper::Error;
    // Boxed because `send_request` returns an unnameable future type.
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    // Readiness is delegated straight to the underlying hyper connection.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx)
    }

    fn call(&mut self, req: hyper::Request<BoxBody>) -> Self::Future {
        Box::pin(self.0.send_request(req))
    }
}
49
/// Perform an HTTP/1.1 request to a remote iroh node over a pooled QUIC
/// connection, returning an [`FfiResponse`] whose body and trailers stream
/// through handles in the endpoint's [`HandleStore`].
///
/// * `remote_node_id` — node address string, parsed by [`parse_node_addr`].
/// * `url` — must use the `httpi://` scheme; plain `http(s)://` is rejected.
/// * `method` / `headers` — validated before any network work.
/// * `req_body_reader` — optional streaming request body.
/// * `req_trailer_sender_handle` — optional handle to a pending request
///   trailer channel; `0` is treated as "none" (sentinel from the FFI side —
///   TODO confirm against the binding layer).
/// * `fetch_token` — optional cancellation token; NOTE(review): the cancel
///   select only covers `do_fetch`, not the connect phase — confirm whether
///   cancellation during dialing is intended to be a no-op.
/// * `direct_addrs` — extra direct socket addresses to seed dialing.
#[allow(clippy::too_many_arguments)]
pub async fn fetch(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    url: &str,
    method: &str,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_sender_handle: Option<u64>,
    fetch_token: Option<u64>,
    direct_addrs: Option<&[std::net::SocketAddr]>,
) -> Result<FfiResponse, CoreError> {
    // Reject plain-web schemes early with a message naming the bad scheme.
    {
        let lower = url.to_ascii_lowercase();
        if lower.starts_with("https://") || lower.starts_with("http://") {
            let scheme_end = lower
                .find("://")
                .map(|i| i.saturating_add(3))
                .unwrap_or(lower.len());
            return Err(CoreError::invalid_input(format!(
                "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
                Example: httpi://nodeId/path",
                &url[..scheme_end]
            )));
        }
    }

    // Validate method and headers up front, before touching the network.
    let http_method = Method::from_bytes(method.as_bytes())
        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
    let handles = endpoint.handles();

    // Claim the request-trailer receiver, if one was registered (0 = none).
    let req_trailer_rx = req_trailer_sender_handle.and_then(|h| {
        if h == 0 {
            None
        } else {
            handles.claim_pending_trailer_rx(h)
        }
    });

    // Build the dial address from the parsed node id plus any direct
    // addresses from both the node-addr string and the explicit argument.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }
    if let Some(addrs) = direct_addrs {
        for a in addrs {
            addr = addr.with_ip_addr(*a);
        }
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();

    // Reuse a pooled connection for this (node, ALPN) pair, dialing if needed.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN, || async move {
            ep_raw
                .connect(addr_clone, ALPN)
                .await
                .map_err(|e| format!("connect: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let conn = pooled.conn.clone();
    let remote_str = pooled.remote_id_str.clone();

    // Created here but awaited below so it can be raced against cancellation.
    let result = do_fetch(
        handles,
        conn,
        &remote_str,
        url,
        http_method,
        headers,
        req_body_reader,
        req_trailer_rx,
        max_header_size,
    );

    let out = if let Some(notify) = cancel_notify {
        tokio::select! {
            _ = notify.notified() => Err(CoreError::cancelled()),
            r = result => r,
        }
    } else {
        result.await
    };

    // The token is single-use: remove it whether or not it fired.
    if let Some(token) = fetch_token {
        endpoint.handles().remove_fetch_token(token);
    }

    out
}
165
/// Drive a single request/response exchange over a fresh bidirectional QUIC
/// stream on `conn`, returning an [`FfiResponse`] whose body and trailers are
/// exposed through handles registered in `handles`.
#[allow(clippy::too_many_arguments)]
async fn do_fetch(
    handles: &HandleStore,
    conn: iroh::endpoint::Connection,
    remote_str: &str,
    url: &str,
    method: Method,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    req_trailer_rx: Option<crate::stream::TrailerRx>,
    max_header_size: usize,
) -> Result<FfiResponse, CoreError> {
    // Each request gets its own bi-directional stream on the shared connection.
    let (send, recv) = conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;

    let io = TokioIo::new(IrohStream::new(send, recv));

    // `mut` on `sender` is only used by the non-compression path below.
    #[allow(unused_mut)]
    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        .max_buf_size(max_header_size.max(8192))
        .max_headers(128)
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake: {e}")))?;

    // The connection task must be polled for the exchange to make progress.
    tokio::spawn(conn_task);

    let path = extract_path(url);

    // HOST carries the remote node id string; `te: trailers` advertises that
    // we can accept response trailers.
    let mut req_builder = hyper::Request::builder()
        .method(method)
        .uri(&path)
        .header(hyper::header::HOST, remote_str)
        .header("te", "trailers");

    #[cfg(feature = "compression")]
    {
        // Only request zstd if the caller did not set accept-encoding itself.
        let has_accept_encoding = headers
            .iter()
            .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
        if !has_accept_encoding {
            req_builder = req_builder.header("accept-encoding", "zstd");
        }
    }

    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    // Stream the request body from the FFI reader, or send an empty body.
    let req_body: BoxBody = if let Some(reader) = req_body_reader {
        crate::box_body(body_from_reader(reader, req_trailer_rx))
    } else {
        crate::box_body(http_body_util::Empty::new())
    };

    let req = req_builder
        .body(req_body)
        .map_err(|e| CoreError::internal(format!("build request: {e}")))?;

    // With compression enabled, wrap the sender in a decompression layer so
    // encoded responses are transparently inflated.
    #[cfg(feature = "compression")]
    let resp = {
        use tower::ServiceExt;
        let svc = tower::ServiceBuilder::new()
            .layer(tower_http::decompression::DecompressionLayer::new())
            .service(HyperClientSvc(sender));
        svc.oneshot(req)
            .await
            .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?
    };
    #[cfg(not(feature = "compression"))]
    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send_request: {e}")))?;

    let status = resp.status().as_u16();
    // Approximate the response header size (name + value + 4 bytes per header
    // for separator/CRLF, plus a fixed 16-byte allowance) and enforce the cap.
    let header_bytes: usize = resp
        .headers()
        .iter()
        .map(|(k, v)| {
            k.as_str()
                .len()
                .saturating_add(v.as_bytes().len())
                .saturating_add(4)
        })
        .fold(16usize, |acc, x| acc.saturating_add(x));
    if header_bytes > max_header_size {
        return Err(CoreError::header_too_large(format!(
            "response header size {header_bytes} exceeds limit {max_header_size}"
        )));
    }

    // Header values must be UTF-8 to cross the FFI boundary as strings; a
    // single non-UTF8 value fails the whole fetch.
    let mut resp_headers: Vec<(String, String)> = Vec::new();
    for (k, v) in resp.headers().iter() {
        match v.to_str() {
            Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
            Err(_) => {
                return Err(CoreError::invalid_input(format!(
                    "non-UTF8 response header value for '{}'",
                    k.as_str()
                )));
            }
        }
    }

    // Register trailer/body handles through a guard; `commit()` at the end
    // presumably finalizes the inserts on the success path — see HandleStore.
    let mut guard = handles.insert_guard();
    let (trailer_tx, trailer_rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
    let trailer_handle = guard.insert_trailer_receiver(trailer_rx)?;

    let (res_writer, res_reader) = handles.make_body_channel();
    let body = resp.into_body();
    // Pump the hyper body into the channel in the background; trailers are
    // forwarded through the oneshot once the body ends.
    tokio::spawn(pump_hyper_body_to_channel(body, res_writer, trailer_tx));

    let body_handle = guard.insert_reader(res_reader)?;
    let response_url = format!("httpi://{remote_str}{path}");

    guard.commit();
    Ok(FfiResponse {
        status,
        headers: resp_headers,
        body_handle,
        url: response_url,
        trailers_handle: trailer_handle,
    })
}
310
/// Forward `body` frames into `writer` with no size limit, using the writer's
/// configured drain timeout as the per-frame read timeout. Trailers (if any)
/// are delivered via `trailer_tx` after the body completes.
pub(crate) async fn pump_hyper_body_to_channel<B>(
    body: B,
    writer: BodyWriter,
    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
) where
    B: http_body::Body<Data = Bytes>,
    B::Error: std::fmt::Debug,
{
    let timeout = writer.drain_timeout;
    pump_hyper_body_to_channel_limited(body, writer, trailer_tx, None, timeout, None).await;
}
326
/// Forward `body` frames into `writer`, optionally enforcing a byte limit.
///
/// * Each frame read is bounded by `frame_timeout`; a timeout or frame error
///   ends the pump (logged, not surfaced as an error).
/// * If the running total exceeds `max_bytes`, `overflow_tx` (if provided) is
///   signalled and the pump stops WITHOUT forwarding the offending chunk.
/// * Trailer frames become `(name, value)` pairs (non-UTF8 values dropped
///   with a warning) and are sent on `trailer_tx` when the pump finishes.
///   If the writer's receiver is gone, the pump returns early and `trailer_tx`
///   is dropped unsent.
pub(crate) async fn pump_hyper_body_to_channel_limited<B>(
    body: B,
    writer: BodyWriter,
    trailer_tx: tokio::sync::oneshot::Sender<Vec<(String, String)>>,
    max_bytes: Option<usize>,
    frame_timeout: std::time::Duration,
    overflow_tx: Option<tokio::sync::oneshot::Sender<()>>,
) where
    B: http_body::Body<Data = Bytes>,
    B::Error: std::fmt::Debug,
{
    let mut body = Box::pin(body);
    let mut total = 0usize;
    let mut trailers_vec: Vec<(String, String)> = Vec::new();

    loop {
        // Bound each frame read so a stalled peer cannot hold the pump forever.
        let frame_result = match tokio::time::timeout(frame_timeout, body.frame()).await {
            Err(_elapsed) => {
                tracing::warn!("iroh-http: body frame read timed out after {frame_timeout:?}");
                break;
            }
            // `None` = end of body stream.
            Ok(None) => break,
            Ok(Some(r)) => r,
        };
        match frame_result {
            Err(e) => {
                tracing::warn!("iroh-http: body frame error: {e:?}");
                break;
            }
            Ok(frame) => {
                if frame.is_data() {
                    let data = frame.into_data().expect("is_data checked above");
                    total = total.saturating_add(data.len());
                    if let Some(limit) = max_bytes {
                        if total > limit {
                            tracing::warn!("iroh-http: request body exceeded {limit} bytes");
                            if let Some(tx) = overflow_tx {
                                let _ = tx.send(());
                            }
                            break;
                        }
                    }
                    // Receiver dropped: nobody is reading; bail out without
                    // sending trailers (trailer_tx is dropped, not sent).
                    if writer.send_chunk(data).await.is_err() {
                        return;
                    }
                } else if frame.is_trailers() {
                    let hdrs = frame.into_trailers().expect("is_trailers checked above");
                    trailers_vec = hdrs
                        .iter()
                        .filter_map(|(k, v)| match v.to_str() {
                            Ok(s) => Some((k.as_str().to_string(), s.to_string())),
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: dropping non-UTF8 trailer value for '{}'",
                                    k.as_str()
                                );
                                None
                            }
                        })
                        .collect();
                }
            }
        }
    }

    // Close the body channel first, then publish whatever trailers we saw
    // (possibly empty) to the waiting receiver.
    drop(writer);
    let _ = trailer_tx.send(trailers_vec);
}
404
/// Build a streaming HTTP request body from an FFI [`BodyReader`].
///
/// Chunks from the reader become data frames. When the reader is exhausted
/// and `trailer_rx` was provided, the stream waits (up to the reader's drain
/// timeout) for trailers and emits them as a final trailer frame; pairs with
/// invalid header names/values are silently skipped. On timeout or a dropped
/// sender the body completes without trailers.
pub(crate) fn body_from_reader(
    reader: BodyReader,
    trailer_rx: Option<tokio::sync::oneshot::Receiver<Vec<(String, String)>>>,
) -> StreamBody<impl futures::Stream<Item = Result<Frame<Bytes>, std::convert::Infallible>>> {
    use futures::stream;

    // Unfold state: (reader, pending trailer receiver, done). `done` is only
    // set after a trailer frame has been emitted, terminating the stream on
    // the next poll.
    let s = stream::unfold(
        (reader, trailer_rx, false),
        |(reader, trailer_rx, done)| async move {
            if done {
                return None;
            }
            match reader.next_chunk().await {
                Some(data) => Some((Ok(Frame::data(data)), (reader, trailer_rx, false))),
                None => {
                    // Body exhausted: optionally wait for trailers.
                    if let Some(rx) = trailer_rx {
                        let timeout = reader.drain_timeout;
                        match tokio::time::timeout(timeout, rx).await {
                            Ok(Ok(trailers)) => {
                                let mut map = http::HeaderMap::new();
                                for (k, v) in trailers {
                                    // Drop pairs that are not valid HTTP
                                    // header names/values.
                                    if let (Ok(name), Ok(val)) = (
                                        HeaderName::from_bytes(k.as_bytes()),
                                        HeaderValue::from_str(&v),
                                    ) {
                                        map.append(name, val);
                                    }
                                }
                                if !map.is_empty() {
                                    return Some((Ok(Frame::trailers(map)), (reader, None, true)));
                                }
                            }
                            Ok(Err(_)) => {
                                // Sender dropped without trailers: finish normally.
                            }
                            Err(_) => {
                                tracing::warn!(
                                    "iroh-http: trailer wait timed out after {timeout:?}; \
                                    completing body without trailers"
                                );
                            }
                        }
                    }
                    None
                }
            }
        },
    );

    StreamBody::new(s)
}
462
/// Extract the request path (plus query string) from `url`.
///
/// Accepts full `scheme://authority/path?query#fragment` URLs, bare absolute
/// paths (`/x`), and bare segments (`x` → `/x`). The fragment — which is
/// never sent on the wire — is always stripped, and a missing path yields
/// `/`. A query with no path (`httpi://node?x=1`) becomes `/?x=1`.
pub(crate) fn extract_path(url: &str) -> String {
    // Strip the fragment FIRST so a '/' or '?' inside the fragment (e.g.
    // "httpi://node#a/b") cannot be mistaken for the start of the path.
    // Fragments always follow the query in a URL, so cutting at the first
    // '#' is safe.
    let url = &url[..url.find('#').unwrap_or(url.len())];

    if let Some(idx) = url.find("://") {
        let after_scheme = url.get(idx.saturating_add(3)..).unwrap_or("");
        if let Some(slash) = after_scheme.find('/') {
            after_scheme[slash..].to_string()
        } else if let Some(q) = after_scheme.find('?') {
            // Query but no path: synthesize the root path.
            format!("/{}", &after_scheme[q..])
        } else {
            "/".to_string()
        }
    } else if url.starts_with('/') {
        url.to_string()
    } else {
        format!("/{url}")
    }
}
489
/// Open a raw bidirectional byte stream to a remote node using an HTTP/1.1
/// CONNECT + Upgrade handshake over the duplex ALPN.
///
/// On success the upgraded connection is pumped in a background task and the
/// caller receives a pair of FFI handles: `read_handle` for bytes coming from
/// the server and `write_handle` for bytes going to the server.
pub async fn raw_connect(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    path: &str,
    headers: &[(String, String)],
) -> Result<FfiDuplexStream, CoreError> {
    // Validate caller-supplied headers before any network work.
    for (name, value) in headers {
        HeaderName::from_bytes(name.as_bytes())
            .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
        HeaderValue::from_str(value).map_err(|_| {
            CoreError::invalid_input(format!("invalid header value for {:?}", name))
        })?;
    }

    // Build the dial address from the parsed node id plus its direct addrs.
    let parsed = parse_node_addr(remote_node_id)?;
    let node_id = parsed.node_id;
    let mut addr = iroh::EndpointAddr::new(node_id);
    for a in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*a);
    }

    let ep_raw = endpoint.raw().clone();
    let addr_clone = addr.clone();
    let max_header_size = endpoint.max_header_size();
    let handles = endpoint.handles();

    // Duplex streams use their own ALPN, so pooled duplex connections never
    // mix with regular HTTP traffic.
    let pooled = endpoint
        .pool()
        .get_or_connect(node_id, ALPN_DUPLEX, || async move {
            ep_raw
                .connect(addr_clone, ALPN_DUPLEX)
                .await
                .map_err(|e| format!("connect duplex: {e}"))
        })
        .await
        .map_err(CoreError::connection_failed)?;

    let (send, recv) = pooled
        .conn
        .open_bi()
        .await
        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
    let io = TokioIo::new(IrohStream::new(send, recv));

    let (mut sender, conn_task) = hyper::client::conn::http1::Builder::new()
        .max_buf_size(max_header_size.max(8192))
        .handshake::<_, BoxBody>(io)
        .await
        .map_err(|e| CoreError::connection_failed(format!("hyper handshake (duplex): {e}")))?;

    // Drive the HTTP connection (including the upgrade) in the background.
    tokio::spawn(conn_task);

    // CONNECT + Upgrade request announcing the "iroh-duplex" protocol.
    let mut req_builder = hyper::Request::builder()
        .method(Method::from_bytes(b"CONNECT").expect("CONNECT is a valid HTTP method"))
        .uri(path)
        .header(hyper::header::CONNECTION, "upgrade")
        .header(hyper::header::UPGRADE, "iroh-duplex");

    for (k, v) in headers {
        req_builder = req_builder.header(k.as_str(), v.as_str());
    }

    let req = req_builder
        .body(crate::box_body(http_body_util::Empty::new()))
        .map_err(|e| CoreError::internal(format!("build duplex request: {e}")))?;

    let resp = sender
        .send_request(req)
        .await
        .map_err(|e| CoreError::connection_failed(format!("send duplex request: {e}")))?;

    // Anything other than 101 Switching Protocols means the server refused.
    let status = resp.status();
    if status != StatusCode::SWITCHING_PROTOCOLS {
        return Err(CoreError::peer_rejected(format!(
            "server rejected duplex: expected 101, got {status}"
        )));
    }

    let upgraded = hyper::upgrade::on(resp)
        .await
        .map_err(|e| CoreError::connection_failed(format!("upgrade error: {e}")))?;

    // Two channels: server->client bytes land in `server_write`/`server_read`;
    // client->server bytes flow through `client_write`/`client_read`.
    let (server_write, server_read) = handles.make_body_channel();
    let (client_write, client_read) = handles.make_body_channel();

    let read_handle = handles.insert_reader(server_read)?;
    let write_handle = handles.insert_writer(client_write)?;

    // Bridge the upgraded IO with the two channels until either side closes.
    let io = TokioIo::new(upgraded);
    tokio::spawn(crate::stream::pump_duplex(io, server_write, client_read));

    Ok(FfiDuplexStream {
        read_handle,
        write_handle,
    })
}
596
#[cfg(test)]
mod tests {
    use super::extract_path;

    /// Assert that `extract_path(input)` yields `want`, reporting the input
    /// on failure.
    fn check(input: &str, want: &str) {
        assert_eq!(extract_path(input), want, "url: {input}");
    }

    #[test]
    fn extract_path_basic() {
        check("httpi://node/foo/bar", "/foo/bar");
        check("httpi://node/", "/");
        check("httpi://node", "/");
    }

    #[test]
    fn extract_path_query_string() {
        check("httpi://node/path?x=1", "/path?x=1");
        check("httpi://node?x=1", "/?x=1");
    }

    #[test]
    fn extract_path_fragment() {
        check("httpi://node/path#frag", "/path");
        check("httpi://node/path?q=1#frag", "/path?q=1");
        check("/local#frag", "/local");
    }

    #[test]
    fn extract_path_bare_path() {
        check("/already", "/already");
        check("no-slash", "/no-slash");
    }
}
627}