fastn_net/
peer_to_http.rs

1/// Handles an incoming P2P request and proxies it to an HTTP server.
2///
3/// Receives a request from a peer over Iroh streams and forwards it
4/// to the specified HTTP server address using connection pooling.
5///
6/// # Arguments
7///
8/// * `addr` - Target HTTP server address
9/// * `client_pools` - HTTP connection pools for reuse
10/// * `send` - Stream to send response back to peer
11/// * `recv` - Stream to receive request from peer
12///
13/// # Errors
14///
15/// Returns an error if the HTTP request fails or streams are interrupted.
16pub async fn peer_to_http(
17    addr: &str,
18    client_pools: crate::HttpConnectionPools,
19    send: &mut iroh::endpoint::SendStream,
20    mut recv: iroh::endpoint::RecvStream,
21) -> eyre::Result<()> {
22    use eyre::WrapErr;
23    use http_body_util::BodyExt;
24
25    tracing::info!("http request with {addr}");
26    let start = std::time::Instant::now();
27
28    let req: crate::http::Request = crate::next_json(&mut recv).await?;
29
30    tracing::info!("got request: {req:?}");
31
32    let mut r = hyper::Request::builder()
33        .method(req.method.as_str())
34        .uri(&req.uri);
35    for (name, value) in req.headers {
36        r = r.header(name, value);
37    }
38
39    tracing::debug!("request: {r:?}");
40
41    let pool = get_pool(addr, client_pools).await?;
42    tracing::trace!("got pool");
43    let mut client = match pool.get().await {
44        Ok(v) => v,
45        Err(e) => {
46            tracing::error!("failed to get connection: {e:?}");
47            return Err(eyre::anyhow!("failed to get connection: {e:?}"));
48        }
49    };
50    // tracing::info!("got client");
51
52    use futures_util::TryStreamExt;
53    let stream = tokio_util::io::ReaderStream::new(recv);
54    let stream_body = http_body_util::StreamBody::new(
55        stream
56            .map_ok(|b| {
57                tracing::trace!("got chunk of size: {}", b.len());
58                hyper::body::Frame::data(b)
59            })
60            .map_err(|e| {
61                tracing::info!("error reading chunk: {e:?}");
62                eyre::anyhow!("read_chunk error: {e:?}")
63            }),
64    );
65
66    let boxed_body = http_body_util::BodyExt::boxed(stream_body);
67
68    let (resp, mut body) = client
69        .send_request(r.body(boxed_body)?)
70        .await
71        .wrap_err_with(|| "failed to send request")?
72        .into_parts();
73
74    let r = crate::http::Response {
75        status: resp.status.as_u16(),
76        headers: resp
77            .headers
78            .iter()
79            .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
80            .collect(),
81    };
82
83    send.write_all(
84        serde_json::to_string(&r)
85            .wrap_err_with(|| "failed to serialize json while writing http response")?
86            .as_bytes(),
87    )
88    .await?;
89    send.write_all(b"\n").await?;
90
91    tracing::debug!(
92        "got response body of size: {:?} bytes",
93        hyper::body::Body::size_hint(&body)
94    );
95
96    while let Some(chunk) = body.frame().await {
97        match chunk {
98            Ok(v) => {
99                let data = v
100                    .data_ref()
101                    .ok_or_else(|| eyre::anyhow!("chunk data is None"))?;
102                tracing::trace!("sending chunk of size: {}", data.len());
103                send.write_all(data).await?;
104            }
105            Err(e) => {
106                tracing::error!("error reading chunk: {e:?}");
107                return Err(eyre::anyhow!("read_chunk error: {e:?}"));
108            }
109        }
110    }
111
112    tracing::info!("handled http request in {:?}", start.elapsed());
113
114    {
115        use colored::Colorize;
116        println!(
117            "{} {} {} in {}",
118            req.method.to_uppercase().green(),
119            req.uri,
120            resp.status.as_str().on_blue().black(),
121            format!("{}ms", start.elapsed().as_millis()).yellow()
122        );
123    }
124
125    Ok(())
126}
127
128async fn get_pool(
129    addr: &str,
130    client_pools: crate::HttpConnectionPools,
131) -> eyre::Result<bb8::Pool<crate::HttpConnectionManager>> {
132    tracing::trace!("get pool called");
133    let mut pools = client_pools.lock().await;
134
135    Ok(match pools.get(addr) {
136        Some(v) => {
137            tracing::debug!("found existing pool for {addr}");
138            v.clone()
139        }
140        None => {
141            tracing::debug!("creating new pool for {addr}");
142
143            let pool = bb8::Pool::builder()
144                .build(crate::HttpConnectionManager::new(addr.to_string()))
145                .await?;
146
147            pools.insert(addr.to_string(), pool.clone());
148            pool
149        }
150    })
151}