ftnet_utils/
http_to_peer.rs1pub async fn http_to_peer<T>(
2 req: hyper::Request<T>,
3 self_endpoint: iroh::Endpoint,
4 remote_node_id52: &str,
5 peer_connections: ftnet_utils::PeerStreamSenders,
6 _patch: ftnet_sdk::RequestPatch,
7) -> ftnet_utils::http::ProxyResult
8where
9 T: hyper::body::Body + Unpin + Send,
10 T::Data: Into<hyper::body::Bytes> + Send,
11 T::Error: std::error::Error + Send + Sync + 'static,
12{
13 use http_body_util::{BodyDataStream, BodyExt};
14 use tokio_stream::StreamExt;
15
16 tracing::info!("peer_proxy: {remote_node_id52}");
17
18 let (mut send, mut recv) = ftnet_utils::get_stream(
19 self_endpoint,
20 ftnet_utils::Protocol::Identity,
21 remote_node_id52.to_string(),
22 peer_connections.clone(),
23 )
24 .await?;
25
26 tracing::info!("wrote protocol");
27
28 let (head, body) = req.into_parts();
29 send.write_all(&serde_json::to_vec(&crate::http::Request::from(head))?)
30 .await?;
31 send.write_all(b"\n").await?;
32
33 tracing::info!("sent request header");
34
35 let mut stream = BodyDataStream::new(body);
36
37 while let Some(chunk) = stream.next().await {
38 let bytes: hyper::body::Bytes = chunk?.into(); send.write_all(&bytes).await?;
40 }
41
42 tracing::info!("sent body");
43
44 let r: ftnet_utils::http::Response = match recv.next().await {
45 Some(Ok(v)) => serde_json::from_str(&v)?,
46 Some(Err(e)) => {
47 tracing::error!("failed to get bidirectional stream: {e:?}");
48 return Err(eyre::anyhow!("failed to get bidirectional stream: {e:?}"));
49 }
50 None => {
51 tracing::error!("failed to read from incoming connection");
52 return Err(eyre::anyhow!("failed to read from incoming connection"));
53 }
54 };
55
56 tracing::info!("got response header: {r:?}");
57
58 let mut body = recv.read_buffer().to_owned();
59 let mut recv = recv.into_inner();
60
61 tracing::trace!("reading body");
62
63 while let Some(v) = match recv.read_chunk(1024 * 64, true).await {
64 Ok(v) => Ok(v),
65 Err(e) => {
66 tracing::error!("error reading chunk: {e:?}");
67 Err(eyre::anyhow!("read_chunk error: {e:?}"))
68 }
69 }? {
70 body.extend_from_slice(&v.bytes);
71 tracing::trace!(
72 "reading body, partial: {}, new body size: {} bytes",
73 v.bytes.len(),
74 body.len()
75 );
76 }
77
78 let body = body.freeze();
79 tracing::debug!("got {} bytes of body", body.len());
80
81 let mut res = hyper::Response::new(
82 http_body_util::Full::new(body)
83 .map_err(|e| match e {})
84 .boxed(),
85 );
86 *res.status_mut() = hyper::http::StatusCode::from_u16(r.status)?;
87 for (k, v) in r.headers {
88 res.headers_mut().insert(
89 hyper::http::header::HeaderName::from_bytes(k.as_bytes())?,
90 hyper::http::header::HeaderValue::from_bytes(&v)?,
91 );
92 }
93
94 tracing::info!("all done");
95 Ok(res)
96}