iroh_http_core/ffi/
fetch.rs1#![allow(clippy::disallowed_types)]
12
13use std::time::Duration;
14
15use http::{HeaderName, HeaderValue, Method};
16
17use crate::{
18 ffi::{handles::BodyReader, pumps::pump_hyper_body_to_channel_limited},
19 http::client::{fetch_request, FetchError},
20 parse_node_addr, Body, CoreError, FfiResponse, IrohEndpoint,
21};
22
23#[allow(clippy::too_many_arguments)]
28pub async fn fetch(
29 endpoint: &IrohEndpoint,
30 remote_node_id: &str,
31 url: &str,
32 method: &str,
33 headers: &[(String, String)],
34 req_body_reader: Option<BodyReader>,
35 fetch_token: Option<u64>,
36 direct_addrs: Option<&[std::net::SocketAddr]>,
37 timeout: Option<Duration>,
38 decompress: bool,
39 max_response_body_bytes: Option<usize>,
40) -> Result<FfiResponse, CoreError> {
41 {
43 let lower = url.to_ascii_lowercase();
44 if lower.starts_with("https://") || lower.starts_with("http://") {
45 let scheme_end = lower
46 .find("://")
47 .map(|i| i.saturating_add(3))
48 .unwrap_or(lower.len());
49 return Err(CoreError::invalid_input(format!(
50 "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
51 Example: httpi://nodeId/path",
52 &url[..scheme_end]
53 )));
54 }
55 }
56
57 let http_method = Method::from_bytes(method.as_bytes())
59 .map_err(|_| CoreError::invalid_input(format!("invalid HTTP method {:?}", method)))?;
60 for (name, value) in headers {
61 HeaderName::from_bytes(name.as_bytes())
62 .map_err(|_| CoreError::invalid_input(format!("invalid header name {:?}", name)))?;
63 HeaderValue::from_str(value).map_err(|_| {
64 CoreError::invalid_input(format!("invalid header value for {:?}", name))
65 })?;
66 }
67
68 let parsed = parse_node_addr(remote_node_id)?;
71 let mut addr = iroh::EndpointAddr::new(parsed.node_id);
72 for a in &parsed.direct_addrs {
73 addr = addr.with_ip_addr(*a);
74 }
75 if let Some(addrs) = direct_addrs {
76 for a in addrs {
77 addr = addr.with_ip_addr(*a);
78 }
79 }
80 let remote_str = crate::base32_encode(parsed.node_id.as_bytes());
81 let path = extract_path(url);
82
83 let mut req_builder = hyper::Request::builder()
85 .method(http_method)
86 .uri(&path)
87 .header(hyper::header::HOST, &remote_str);
88
89 {
94 let has_accept_encoding = headers
95 .iter()
96 .any(|(k, _)| k.eq_ignore_ascii_case("accept-encoding"));
97 if !has_accept_encoding {
98 req_builder = req_builder.header("accept-encoding", "zstd");
99 }
100 }
101 for (k, v) in headers {
102 req_builder = req_builder.header(k.as_str(), v.as_str());
103 }
104
105 let req_body: Body = if let Some(reader) = req_body_reader {
106 Body::new(reader)
107 } else {
108 Body::empty()
109 };
110 let req = req_builder
111 .body(req_body)
112 .map_err(|e| CoreError::internal(format!("build request: {e}")))?;
113
114 let cancel_notify = fetch_token.and_then(|t| endpoint.handles().get_fetch_cancel_notify(t));
115
116 let cfg = crate::http::server::stack::StackConfig {
123 timeout,
124 decompression: decompress,
125 ..crate::http::server::stack::StackConfig::default()
126 };
127
128 let fetch_fut = fetch_request(endpoint, &addr, req, &cfg);
129 let resp = match cancel_notify {
130 Some(notify) => tokio::select! {
131 _ = notify.notified() => {
132 if let Some(t) = fetch_token {
133 endpoint.handles().remove_fetch_token(t);
134 }
135 return Err(CoreError::cancelled());
136 }
137 r = fetch_fut => r,
138 },
139 None => fetch_fut.await,
140 };
141
142 if let Some(t) = fetch_token {
144 endpoint.handles().remove_fetch_token(t);
145 }
146
147 let resp = resp.map_err(fetch_error_to_core)?;
148
149 package_response(endpoint, resp, &remote_str, &path, max_response_body_bytes).await
150}
151
152fn fetch_error_to_core(e: FetchError) -> CoreError {
155 match e {
156 FetchError::ConnectionFailed { detail, .. } => CoreError::connection_failed(detail),
157 FetchError::HeaderTooLarge { detail } => CoreError::header_too_large(detail),
158 FetchError::BodyTooLarge => CoreError::body_too_large("response body too large"),
159 FetchError::Timeout => CoreError::timeout("request timed out"),
160 FetchError::Cancelled => CoreError::cancelled(),
161 FetchError::Internal(msg) => CoreError::internal(msg),
162 }
163}
164
165pub(crate) fn extract_path(url: &str) -> String {
170 if let Some(rest) = url.strip_prefix("httpi://") {
171 if let Some(slash) = rest.find('/') {
172 return rest[slash..].to_string();
173 }
174 return "/".to_string();
175 }
176 if url.starts_with('/') {
177 return url.to_string();
178 }
179 format!("/{url}")
180}
181
182async fn package_response(
190 endpoint: &IrohEndpoint,
191 resp: hyper::Response<Body>,
192 remote_str: &str,
193 path: &str,
194 max_response_body_bytes: Option<usize>,
195) -> Result<FfiResponse, CoreError> {
196 let max_header_size = endpoint.max_header_size();
197 let max_response_body_bytes =
199 max_response_body_bytes.unwrap_or_else(|| endpoint.max_response_body_bytes());
200 let handles = endpoint.handles();
201
202 let status = resp.status().as_u16();
203 let header_bytes: usize = resp
206 .headers()
207 .iter()
208 .map(|(k, v)| {
209 k.as_str()
210 .len()
211 .saturating_add(v.as_bytes().len())
212 .saturating_add(4) })
214 .fold(16usize, |acc, x| acc.saturating_add(x)); if header_bytes > max_header_size {
216 return Err(CoreError::header_too_large(format!(
217 "response header size {header_bytes} exceeds limit {max_header_size}"
218 )));
219 }
220
221 let mut resp_headers: Vec<(String, String)> = Vec::new();
222 for (k, v) in resp.headers().iter() {
223 match v.to_str() {
224 Ok(s) => resp_headers.push((k.as_str().to_string(), s.to_string())),
225 Err(_) => {
226 return Err(CoreError::invalid_input(format!(
227 "non-UTF8 response header value for '{}'",
228 k.as_str()
229 )));
230 }
231 }
232 }
233
234 let response_url = format!("httpi://{remote_str}{path}");
235
236 if matches!(status, 204 | 205 | 304) {
242 drop(resp.into_body());
246 return Ok(FfiResponse {
247 status,
248 headers: resp_headers,
249 body_handle: 0,
250 url: response_url,
251 });
252 }
253
254 let mut guard = handles.insert_guard();
256 let (res_writer, res_reader) = handles.make_body_channel();
257 let body = resp.into_body();
258 let frame_timeout = res_writer.drain_timeout;
259 tokio::spawn(pump_hyper_body_to_channel_limited(
260 body,
261 res_writer,
262 Some(max_response_body_bytes),
263 frame_timeout,
264 None,
265 ));
266
267 let body_handle = guard.insert_reader(res_reader)?;
268 guard.commit();
269 Ok(FfiResponse {
270 status,
271 headers: resp_headers,
272 body_handle,
273 url: response_url,
274 })
275}