1use crate::utils;
2use crate::{IDMap, PeerConnections, Protocol};
3
4pub async fn peer_to_peer(
5 req: hyper::Request<hyper::body::Incoming>,
6 self_id52: &str,
7 remote_node_id52: &str,
8 peer_connections: PeerConnections,
9 _patch: ftnet_sdk::RequestPatch,
10 id_map: IDMap,
11) -> crate::http::ProxyResult {
12 use http_body_util::BodyExt;
13 use tokio_stream::StreamExt;
14
15 tracing::info!("peer_proxy: {remote_node_id52}");
16
17 let (mut send, recv) =
18 get_stream(self_id52, remote_node_id52, peer_connections, id_map).await?;
19
20 tracing::info!("got stream");
21 send.write_all(&serde_json::to_vec(&Protocol::Identity)?)
22 .await?;
23 send.write(b"\n").await?;
24
25 tracing::info!("wrote protocol");
26
27 let (head, body) = req.into_parts();
28 send.write_all(&serde_json::to_vec(&crate::http::Request::from(head))?)
29 .await?;
30 send.write_all("\n".as_bytes()).await?;
31 tracing::info!("sent request header");
32
33 let mut body = http_body_util::BodyDataStream::new(body);
34 while let Some(v) = body.next().await {
35 send.write_all(&v?).await?;
36 }
37
38 tracing::info!("sent body");
39
40 let mut recv = crate::utils::frame_reader(recv);
41 let r: crate::http::Response = match recv.next().await {
42 Some(v) => serde_json::from_str(&v?)?,
43 None => {
44 tracing::error!("failed to read from incoming connection");
45 return Err(eyre::anyhow!("failed to read from incoming connection"));
46 }
47 };
48
49 tracing::info!("got response header: {r:?}");
50
51 let mut body = recv.read_buffer().to_vec();
52 let mut recv = recv.into_inner();
53
54 let mut buf = Vec::with_capacity(1024 * 64);
55
56 while let Some(v) = recv.read(&mut buf).await? {
57 if v == 0 {
58 tracing::info!("finished reading body");
59 break;
60 }
61 body.extend_from_slice(&buf);
62 buf.truncate(0);
63 }
64
65 tracing::info!("got body");
66
67 let mut res = hyper::Response::new(
68 http_body_util::Full::new(hyper::body::Bytes::from(body))
69 .map_err(|e| match e {})
70 .boxed(),
71 );
72 *res.status_mut() = hyper::http::StatusCode::from_u16(r.status)?;
73 for (k, v) in r.headers {
74 res.headers_mut().insert(
75 hyper::http::header::HeaderName::from_bytes(k.as_bytes())?,
76 hyper::http::header::HeaderValue::from_bytes(&v)?,
77 );
78 }
79
80 tracing::info!("all done");
81 Ok(res)
82}
83
84async fn get_stream(
85 self_id52: &str,
86 remote_node_id52: &str,
87 peer_connections: PeerConnections,
88 id_map: IDMap,
89) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
90 let conn = get_connection(self_id52, remote_node_id52, id_map, peer_connections).await?;
91 Ok(conn.open_bi().await?)
94}
95
96async fn get_connection(
97 self_id52: &str,
98 remote_node_id52: &str,
99 id_map: IDMap,
100 peer_connections: PeerConnections,
101) -> eyre::Result<iroh::endpoint::Connection> {
102 let connections = peer_connections.lock().await;
103 let connection = connections.get(remote_node_id52).map(ToOwned::to_owned);
104
105 drop(connections);
107
108 if let Some(conn) = connection {
109 return Ok(conn);
110 }
111
112 let ep = get_endpoint(self_id52, id_map).await?;
113 let conn = match ep
114 .connect(
115 utils::id52_to_public_key(remote_node_id52)?,
116 crate::APNS_IDENTITY,
117 )
118 .await
119 {
120 Ok(conn) => conn,
121 Err(e) => {
122 tracing::error!("failed to create connection: {e:?}");
123 return Err(eyre::anyhow!("failed to create connection: {e:?}"));
124 }
125 };
126
127 let mut connections = peer_connections.lock().await;
128 connections.insert(remote_node_id52.to_string(), conn.clone());
129
130 Ok(conn)
131}
132
133async fn get_endpoint(self_id52: &str, id_map: IDMap) -> eyre::Result<iroh::endpoint::Endpoint> {
134 let map = id_map.lock().await;
135
136 for (id, (_port, ep)) in map.iter() {
137 if id == self_id52 {
138 return Ok(ep.clone());
139 }
140 }
141
142 tracing::error!("no entry for {self_id52} in the id_map: {id_map:?}");
143 Err(eyre::anyhow!(
144 "no entry for {self_id52} in the id_map: {id_map:?}"
145 ))
146}