use hyper_util::rt::TokioIo;
use crate::{
http::{server::stack::StackConfig, transport::io::IrohStream},
Body, IrohEndpoint, ALPN,
};
/// Error type returned by [`fetch_request`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum FetchError {
/// Establishing the connection or exchanging the request failed.
///
/// `fetch_request` produces this for a failed connect, a failed `open_bi`,
/// a failed hyper handshake, and a failed request send.
ConnectionFailed {
/// Human-readable description; rendered by `Display` after
/// `"connection failed: "`.
detail: String,
/// Underlying hyper error, when there is one. This is what
/// `std::error::Error::source` returns for this variant.
source: Option<hyper::Error>,
},
/// The response header was too large; `detail` is appended to the
/// `Display` message.
// NOTE(review): no code in the visible part of this file constructs this
// variant — confirm where it is produced.
HeaderTooLarge { detail: String },
/// The response body was too large.
BodyTooLarge,
/// The request timed out. `fetch_request` returns this when the timeout
/// configured on its `cfg` argument elapses first.
Timeout,
/// The request was cancelled.
Cancelled,
/// Any other failure; the message is rendered by `Display` after
/// `"internal error: "`.
Internal(String),
}
impl std::fmt::Display for FetchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FetchError::ConnectionFailed { detail, .. } => {
write!(f, "connection failed: {detail}")
}
FetchError::HeaderTooLarge { detail } => {
write!(f, "response header too large: {detail}")
}
FetchError::BodyTooLarge => f.write_str("response body too large"),
FetchError::Timeout => f.write_str("request timed out"),
FetchError::Cancelled => f.write_str("request cancelled"),
FetchError::Internal(msg) => write!(f, "internal error: {msg}"),
}
}
}
impl std::error::Error for FetchError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
FetchError::ConnectionFailed {
source: Some(s), ..
} => Some(s),
_ => None,
}
}
}
pub async fn fetch_request(
endpoint: &IrohEndpoint,
addr: &iroh::EndpointAddr,
req: hyper::Request<Body>,
cfg: &StackConfig,
) -> Result<hyper::Response<Body>, FetchError> {
let work = async {
let node_id = addr.id;
let ep_raw = endpoint.raw().clone();
let addr_clone = addr.clone();
let max_header_size = endpoint.max_header_size();
let pooled = endpoint
.pool()
.get_or_connect(node_id, ALPN, || async move {
ep_raw
.connect(addr_clone, ALPN)
.await
.map_err(|e| format!("connect: {e}"))
})
.await
.map_err(|e| FetchError::ConnectionFailed {
detail: e,
source: None,
})?;
let conn = pooled.conn.clone();
let (send, recv) = conn
.open_bi()
.await
.map_err(|e| FetchError::ConnectionFailed {
detail: format!("open_bi: {e}"),
source: None,
})?;
let io = TokioIo::new(IrohStream::new(send, recv));
let (sender, conn_task) = hyper::client::conn::http1::Builder::new()
.max_buf_size(max_header_size.max(8192))
.max_headers(128)
.handshake::<_, Body>(io)
.await
.map_err(|e| FetchError::ConnectionFailed {
detail: format!("hyper handshake: {e}"),
source: Some(e),
})?;
tokio::spawn(conn_task);
use tower::ServiceExt;
let svc = crate::http::server::stack::build_client_stack(sender, cfg);
svc.oneshot(req)
.await
.map_err(|e| FetchError::ConnectionFailed {
detail: format!("send_request: {e}"),
source: Some(e),
})
};
match cfg.timeout {
Some(t) => match tokio::time::timeout(t, work).await {
Ok(r) => r,
Err(_) => Err(FetchError::Timeout),
},
None => work.await,
}
}