fastn_net/
peer_to_http.rs1pub async fn peer_to_http(
17 addr: &str,
18 client_pools: crate::HttpConnectionPools,
19 send: &mut iroh::endpoint::SendStream,
20 mut recv: iroh::endpoint::RecvStream,
21) -> eyre::Result<()> {
22 use eyre::WrapErr;
23 use http_body_util::BodyExt;
24
25 tracing::info!("http request with {addr}");
26 let start = std::time::Instant::now();
27
28 let req: crate::http::Request = crate::next_json(&mut recv).await?;
29
30 tracing::info!("got request: {req:?}");
31
32 let mut r = hyper::Request::builder()
33 .method(req.method.as_str())
34 .uri(&req.uri);
35 for (name, value) in req.headers {
36 r = r.header(name, value);
37 }
38
39 tracing::debug!("request: {r:?}");
40
41 let pool = get_pool(addr, client_pools).await?;
42 tracing::trace!("got pool");
43 let mut client = match pool.get().await {
44 Ok(v) => v,
45 Err(e) => {
46 tracing::error!("failed to get connection: {e:?}");
47 return Err(eyre::anyhow!("failed to get connection: {e:?}"));
48 }
49 };
50 use futures_util::TryStreamExt;
53 let stream = tokio_util::io::ReaderStream::new(recv);
54 let stream_body = http_body_util::StreamBody::new(
55 stream
56 .map_ok(|b| {
57 tracing::trace!("got chunk of size: {}", b.len());
58 hyper::body::Frame::data(b)
59 })
60 .map_err(|e| {
61 tracing::info!("error reading chunk: {e:?}");
62 eyre::anyhow!("read_chunk error: {e:?}")
63 }),
64 );
65
66 let boxed_body = http_body_util::BodyExt::boxed(stream_body);
67
68 let (resp, mut body) = client
69 .send_request(r.body(boxed_body)?)
70 .await
71 .wrap_err_with(|| "failed to send request")?
72 .into_parts();
73
74 let r = crate::http::Response {
75 status: resp.status.as_u16(),
76 headers: resp
77 .headers
78 .iter()
79 .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
80 .collect(),
81 };
82
83 send.write_all(
84 serde_json::to_string(&r)
85 .wrap_err_with(|| "failed to serialize json while writing http response")?
86 .as_bytes(),
87 )
88 .await?;
89 send.write_all(b"\n").await?;
90
91 tracing::debug!(
92 "got response body of size: {:?} bytes",
93 hyper::body::Body::size_hint(&body)
94 );
95
96 while let Some(chunk) = body.frame().await {
97 match chunk {
98 Ok(v) => {
99 let data = v
100 .data_ref()
101 .ok_or_else(|| eyre::anyhow!("chunk data is None"))?;
102 tracing::trace!("sending chunk of size: {}", data.len());
103 send.write_all(data).await?;
104 }
105 Err(e) => {
106 tracing::error!("error reading chunk: {e:?}");
107 return Err(eyre::anyhow!("read_chunk error: {e:?}"));
108 }
109 }
110 }
111
112 tracing::info!("handled http request in {:?}", start.elapsed());
113
114 {
115 use colored::Colorize;
116 println!(
117 "{} {} {} in {}",
118 req.method.to_uppercase().green(),
119 req.uri,
120 resp.status.as_str().on_blue().black(),
121 format!("{}ms", start.elapsed().as_millis()).yellow()
122 );
123 }
124
125 Ok(())
126}
127
128async fn get_pool(
129 addr: &str,
130 client_pools: crate::HttpConnectionPools,
131) -> eyre::Result<bb8::Pool<crate::HttpConnectionManager>> {
132 tracing::trace!("get pool called");
133 let mut pools = client_pools.lock().await;
134
135 Ok(match pools.get(addr) {
136 Some(v) => {
137 tracing::debug!("found existing pool for {addr}");
138 v.clone()
139 }
140 None => {
141 tracing::debug!("creating new pool for {addr}");
142
143 let pool = bb8::Pool::builder()
144 .build(crate::HttpConnectionManager::new(addr.to_string()))
145 .await?;
146
147 pools.insert(addr.to_string(), pool.clone());
148 pool
149 }
150 })
151}