#![allow(clippy::disallowed_types)]
use std::time::Duration;
use http::{HeaderName, HeaderValue, Method};
use crate::{
ffi::{handles::BodyReader, pumps::pump_hyper_body_to_channel_limited},
http::client::{fetch_request, FetchError},
parse_node_addr, Body, CoreError, FfiResponse, IrohEndpoint,
};
/// Sends one HTTP request to a remote node and returns the packaged response.
///
/// Steps, in order:
/// 1. Rejects URLs that start with `http://` or `https://` (ASCII
///    case-insensitive) with an invalid-input error naming the offending scheme.
/// 2. Validates `method` and every `(name, value)` pair in `headers` up front so
///    that malformed input is reported as invalid input rather than as a
///    request-build failure.
/// 3. Parses `remote_node_id` with `parse_node_addr` and builds the target
///    address from the addresses it carries plus any `direct_addrs`.
/// 4. Builds the request: the request target comes from `extract_path(url)`,
///    `Host` is the base32-encoded node id, and `accept-encoding: zstd` is added
///    only when the caller supplied no `accept-encoding` header of their own.
/// 5. Runs `fetch_request` with a `StackConfig` carrying `timeout` and
///    `decompress`. When `fetch_token` resolves to a cancel notifier, the request
///    is raced against it.
/// 6. Removes `fetch_token` from the endpoint's handle store once the race is
///    over, whichever side finished first.
///
/// # Errors
///
/// * invalid input — wrong scheme, bad method, bad header name/value.
/// * whatever `parse_node_addr` returns for an unparsable node id.
/// * internal — the request could not be assembled.
/// * cancelled — the cancel notifier fired before the request completed.
/// * any `FetchError`, converted through `fetch_error_to_core`.
/// * any error produced by `package_response`.
#[allow(clippy::too_many_arguments)]
pub async fn fetch(
    endpoint: &IrohEndpoint,
    remote_node_id: &str,
    url: &str,
    method: &str,
    headers: &[(String, String)],
    req_body_reader: Option<BodyReader>,
    fetch_token: Option<u64>,
    direct_addrs: Option<&[std::net::SocketAddr]>,
    timeout: Option<Duration>,
    decompress: bool,
    max_response_body_bytes: Option<usize>,
) -> Result<FfiResponse, CoreError> {
    // Plain web schemes are refused outright. The lowered copy has the same byte
    // length as `url`, so the matched scheme length can slice the original text.
    let lowered_url = url.to_ascii_lowercase();
    let web_scheme_len = ["https://", "http://"]
        .iter()
        .find(|scheme| lowered_url.starts_with(**scheme))
        .map(|scheme| scheme.len());
    if let Some(scheme_len) = web_scheme_len {
        return Err(CoreError::invalid_input(format!(
            "iroh-http URLs must use the \"httpi://\" scheme, not \"{}\". \
             Example: httpi://nodeId/path",
            &url[..scheme_len]
        )));
    }

    let http_method = match Method::from_bytes(method.as_bytes()) {
        Ok(parsed_method) => parsed_method,
        Err(_) => {
            return Err(CoreError::invalid_input(format!(
                "invalid HTTP method {:?}",
                method
            )));
        }
    };

    // Check every header before touching the builder; the parsed values are
    // discarded, only the verdict matters here.
    for (name, value) in headers {
        if HeaderName::from_bytes(name.as_bytes()).is_err() {
            return Err(CoreError::invalid_input(format!(
                "invalid header name {:?}",
                name
            )));
        }
        if HeaderValue::from_str(value).is_err() {
            return Err(CoreError::invalid_input(format!(
                "invalid header value for {:?}",
                name
            )));
        }
    }

    // Target address: addresses embedded in the node id first, then caller extras.
    let parsed = parse_node_addr(remote_node_id)?;
    let mut addr = iroh::EndpointAddr::new(parsed.node_id);
    for ip in &parsed.direct_addrs {
        addr = addr.with_ip_addr(*ip);
    }
    for ip in direct_addrs.into_iter().flatten() {
        addr = addr.with_ip_addr(*ip);
    }

    let remote_str = crate::base32_encode(parsed.node_id.as_bytes());
    let path = extract_path(url);

    let mut req_builder = hyper::Request::builder()
        .method(http_method)
        .uri(&path)
        .header(hyper::header::HOST, &remote_str);
    let caller_set_accept_encoding = headers
        .iter()
        .any(|(name, _)| name.eq_ignore_ascii_case("accept-encoding"));
    if !caller_set_accept_encoding {
        req_builder = req_builder.header("accept-encoding", "zstd");
    }
    let req_builder = headers.iter().fold(req_builder, |builder, (name, value)| {
        builder.header(name.as_str(), value.as_str())
    });

    let req_body: Body = match req_body_reader {
        Some(reader) => Body::new(reader),
        None => Body::empty(),
    };
    let req = match req_builder.body(req_body) {
        Ok(built) => built,
        Err(e) => return Err(CoreError::internal(format!("build request: {e}"))),
    };

    let cancel_notify = match fetch_token {
        Some(token) => endpoint.handles().get_fetch_cancel_notify(token),
        None => None,
    };
    let cfg = crate::http::server::stack::StackConfig {
        timeout,
        decompression: decompress,
        ..crate::http::server::stack::StackConfig::default()
    };
    let fetch_fut = fetch_request(endpoint, &addr, req, &cfg);

    // `None` means the cancel notifier won the race; `Some` carries the fetch result.
    let outcome = match cancel_notify {
        Some(notify) => tokio::select! {
            _ = notify.notified() => None,
            finished = fetch_fut => Some(finished),
        },
        None => Some(fetch_fut.await),
    };

    // The token is finished with on both paths, so release it exactly once here.
    if let Some(token) = fetch_token {
        endpoint.handles().remove_fetch_token(token);
    }

    let resp = match outcome {
        Some(finished) => finished.map_err(fetch_error_to_core)?,
        None => return Err(CoreError::cancelled()),
    };
    package_response(endpoint, resp, &remote_str, &path, max_response_body_bytes).await
}
/// Converts a client-side [`FetchError`] into the crate-wide [`CoreError`].
///
/// Variants that carry a detail string or message forward it unchanged; the
/// payload-free variants map to fixed messages (or to the plain cancelled error).
fn fetch_error_to_core(e: FetchError) -> CoreError {
    match e {
        // Variants without payload.
        FetchError::Timeout => CoreError::timeout("request timed out"),
        FetchError::Cancelled => CoreError::cancelled(),
        FetchError::BodyTooLarge => CoreError::body_too_large("response body too large"),
        // Variants whose text is passed through as-is.
        FetchError::HeaderTooLarge { detail: reason } => CoreError::header_too_large(reason),
        FetchError::ConnectionFailed { detail: reason, .. } => {
            CoreError::connection_failed(reason)
        }
        FetchError::Internal(message) => CoreError::internal(message),
    }
}
/// Derives the request target (path plus any query/fragment) from `url`.
///
/// * `httpi://<authority><tail>` — the scheme is matched ASCII
///   case-insensitively (URI schemes are case-insensitive). The authority ends
///   at the first `/`, `?` or `#`; everything from that character on is
///   returned, with a leading `/` inserted when the URL has no path component
///   (`httpi://node?x=1` becomes `/?x=1`). With nothing after the authority the
///   result is `/`.
/// * Anything else is treated as a path already: returned unchanged when it
///   starts with `/`, otherwise prefixed with `/`.
pub(crate) fn extract_path(url: &str) -> String {
    const SCHEME: &str = "httpi://";

    // `get` returns `None` for short inputs and for a non-char-boundary cut, so
    // the follow-up slice at `SCHEME.len()` can never panic.
    let after_scheme = url
        .get(..SCHEME.len())
        .filter(|prefix| prefix.eq_ignore_ascii_case(SCHEME))
        .map(|_| &url[SCHEME.len()..]);

    if let Some(rest) = after_scheme {
        // Looking only for '/' would drop a query on path-less URLs and would
        // mistake a slash inside the query for the start of the path.
        let authority_end = rest.find(|c: char| matches!(c, '/' | '?' | '#'));
        return match authority_end {
            Some(idx) if rest[idx..].starts_with('/') => rest[idx..].to_string(),
            // Empty path followed by a query or fragment: supply the root path.
            Some(idx) => format!("/{}", &rest[idx..]),
            None => "/".to_string(),
        };
    }

    if url.starts_with('/') {
        return url.to_string();
    }
    format!("/{url}")
}
/// Converts a hyper response into an [`FfiResponse`].
///
/// * The header block is sized as 16 bytes plus, per header, name length +
///   value length + 4 (saturating arithmetic). Exceeding the endpoint's
///   `max_header_size()` yields a header-too-large error.
/// * Header values are copied out as strings; a value for which `to_str()`
///   fails aborts with an invalid-input error.
/// * The reported URL is `httpi://{remote_str}{path}`.
/// * For status 204, 205 and 304 the body is dropped and `body_handle` is `0`.
/// * Otherwise a body channel is created, a task is spawned to pump the hyper
///   body into it (given the body-size limit and the writer's `drain_timeout`),
///   and the reader side is registered to obtain `body_handle`.
///
/// `max_response_body_bytes` falls back to the endpoint's
/// `max_response_body_bytes()` when `None`.
async fn package_response(
    endpoint: &IrohEndpoint,
    resp: hyper::Response<Body>,
    remote_str: &str,
    path: &str,
    max_response_body_bytes: Option<usize>,
) -> Result<FfiResponse, CoreError> {
    let max_header_size = endpoint.max_header_size();
    let body_limit = match max_response_body_bytes {
        Some(limit) => limit,
        None => endpoint.max_response_body_bytes(),
    };
    let handles = endpoint.handles();
    let status = resp.status().as_u16();

    // Size check comes first so an oversized header block is reported as such
    // before any per-value conversion is attempted.
    let mut header_bytes = 16usize;
    for (name, value) in resp.headers().iter() {
        let entry_bytes = name
            .as_str()
            .len()
            .saturating_add(value.as_bytes().len())
            .saturating_add(4);
        header_bytes = header_bytes.saturating_add(entry_bytes);
    }
    if header_bytes > max_header_size {
        return Err(CoreError::header_too_large(format!(
            "response header size {header_bytes} exceeds limit {max_header_size}"
        )));
    }

    // Stops at the first value that cannot be rendered as a string.
    let resp_headers = resp
        .headers()
        .iter()
        .map(|(name, value)| match value.to_str() {
            Ok(text) => Ok((name.as_str().to_string(), text.to_string())),
            Err(_) => Err(CoreError::invalid_input(format!(
                "non-UTF8 response header value for '{}'",
                name.as_str()
            ))),
        })
        .collect::<Result<Vec<(String, String)>, CoreError>>()?;

    let response_url = format!("httpi://{remote_str}{path}");

    // These statuses are returned without a body handle; discard the body.
    let bodyless = [204u16, 205, 304].contains(&status);
    if bodyless {
        drop(resp.into_body());
        return Ok(FfiResponse {
            status,
            headers: resp_headers,
            body_handle: 0,
            url: response_url,
        });
    }

    let mut guard = handles.insert_guard();
    let (res_writer, res_reader) = handles.make_body_channel();
    let body = resp.into_body();
    // Read the timeout before the writer is moved into the pump future.
    let frame_timeout = res_writer.drain_timeout;
    let pump = pump_hyper_body_to_channel_limited(
        body,
        res_writer,
        Some(body_limit),
        frame_timeout,
        None,
    );
    tokio::spawn(pump);

    let body_handle = guard.insert_reader(res_reader)?;
    guard.commit();

    Ok(FfiResponse {
        status,
        headers: resp_headers,
        body_handle,
        url: response_url,
    })
}