ftnet_utils/
http_to_peer.rs

1pub async fn http_to_peer<T>(
2    req: hyper::Request<T>,
3    self_endpoint: iroh::Endpoint,
4    remote_node_id52: &str,
5    peer_connections: ftnet_utils::PeerStreamSenders,
6    _patch: ftnet_sdk::RequestPatch,
7) -> ftnet_utils::http::ProxyResult
8where
9    T: hyper::body::Body + Unpin + Send,
10    T::Data: Into<hyper::body::Bytes> + Send,
11    T::Error: std::error::Error + Send + Sync + 'static,
12{
13    use http_body_util::{BodyDataStream, BodyExt};
14    use tokio_stream::StreamExt;
15
16    tracing::info!("peer_proxy: {remote_node_id52}");
17
18    let (mut send, mut recv) = ftnet_utils::get_stream(
19        self_endpoint,
20        ftnet_utils::Protocol::Identity,
21        remote_node_id52.to_string(),
22        peer_connections.clone(),
23    )
24    .await?;
25
26    tracing::info!("wrote protocol");
27
28    let (head, body) = req.into_parts();
29    send.write_all(&serde_json::to_vec(&crate::http::Request::from(head))?)
30        .await?;
31    send.write_all(b"\n").await?;
32
33    tracing::info!("sent request header");
34
35    let mut stream = BodyDataStream::new(body);
36
37    while let Some(chunk) = stream.next().await {
38        let bytes: hyper::body::Bytes = chunk?.into(); // requires T::Data: Into<Bytes>
39        send.write_all(&bytes).await?;
40    }
41
42    tracing::info!("sent body");
43
44    let r: ftnet_utils::http::Response = match recv.next().await {
45        Some(Ok(v)) => serde_json::from_str(&v)?,
46        Some(Err(e)) => {
47            tracing::error!("failed to get bidirectional stream: {e:?}");
48            return Err(eyre::anyhow!("failed to get bidirectional stream: {e:?}"));
49        }
50        None => {
51            tracing::error!("failed to read from incoming connection");
52            return Err(eyre::anyhow!("failed to read from incoming connection"));
53        }
54    };
55
56    tracing::info!("got response header: {r:?}");
57
58    let mut body = recv.read_buffer().to_owned();
59    let mut recv = recv.into_inner();
60
61    tracing::trace!("reading body");
62
63    while let Some(v) = match recv.read_chunk(1024 * 64, true).await {
64        Ok(v) => Ok(v),
65        Err(e) => {
66            tracing::error!("error reading chunk: {e:?}");
67            Err(eyre::anyhow!("read_chunk error: {e:?}"))
68        }
69    }? {
70        body.extend_from_slice(&v.bytes);
71        tracing::trace!(
72            "reading body, partial: {}, new body size: {} bytes",
73            v.bytes.len(),
74            body.len()
75        );
76    }
77
78    let body = body.freeze();
79    tracing::debug!("got {} bytes of body", body.len());
80
81    let mut res = hyper::Response::new(
82        http_body_util::Full::new(body)
83            .map_err(|e| match e {})
84            .boxed(),
85    );
86    *res.status_mut() = hyper::http::StatusCode::from_u16(r.status)?;
87    for (k, v) in r.headers {
88        res.headers_mut().insert(
89            hyper::http::header::HeaderName::from_bytes(k.as_bytes())?,
90            hyper::http::header::HeaderValue::from_bytes(&v)?,
91        );
92    }
93
94    tracing::info!("all done");
95    Ok(res)
96}