fastn_net/get_stream.rs
/// Connection pool for P2P stream management.
///
/// Maintains a map of active peer connections indexed by (self ID52, remote ID52) pairs.
/// Each entry contains a channel for requesting new streams on that connection.
/// Connections are automatically removed when they break or become unhealthy.
///
/// This type is used to reuse existing P2P connections instead of creating new ones
/// for each request, improving performance and reducing connection overhead.
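///
/// The pool is typically created once and shared across tasks. A minimal
/// construction sketch (the `pool` binding is illustrative, not part of this
/// crate's API):
///
/// ```ignore
/// let pool: PeerStreamSenders = std::sync::Arc::new(
///     tokio::sync::Mutex::new(std::collections::HashMap::new()),
/// );
/// // Clone `pool` into every task that needs to call `get_stream`.
/// ```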
pub type PeerStreamSenders = std::sync::Arc<
    tokio::sync::Mutex<std::collections::HashMap<(SelfID52, RemoteID52), StreamRequestSender>>,
>;

type Stream = (iroh::endpoint::SendStream, iroh::endpoint::RecvStream);
type StreamResult = eyre::Result<Stream>;
type ReplyChannel = tokio::sync::oneshot::Sender<StreamResult>;
type RemoteID52 = String;
type SelfID52 = String;

type StreamRequest = (crate::ProtocolHeader, ReplyChannel);

type StreamRequestSender = tokio::sync::mpsc::Sender<StreamRequest>;
type StreamRequestReceiver = tokio::sync::mpsc::Receiver<StreamRequest>;

/// Gets or creates a bidirectional stream to a remote peer.
///
/// This function manages P2P connections efficiently by:
/// 1. Reusing existing connections when available
/// 2. Creating new connections when needed
/// 3. Verifying connection health with a protocol handshake
/// 4. Dropping broken connections so the next request creates a fresh one
///
/// The function sends the protocol header and waits for acknowledgment
/// to ensure the stream is healthy before returning it.
///
/// # Arguments
///
/// * `self_endpoint` - Local Iroh endpoint
/// * `header` - Protocol header to negotiate
/// * `remote_node_id52` - ID52 of the target peer
/// * `peer_stream_senders` - Connection pool for reuse
/// * `graceful` - Graceful shutdown handle
///
/// # Returns
///
/// A tuple of (SendStream, RecvStream) ready for communication.
///
/// # Errors
///
/// Returns an error if the connection fails or protocol negotiation times out.
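///
/// # Example
///
/// A sketch of typical usage, assuming an already-built `endpoint`, `header`,
/// shared `pool`, and `graceful` handle (all of these bindings are hypothetical):
///
/// ```ignore
/// let (mut send, mut recv) = get_stream(
///     endpoint.clone(),
///     header,
///     remote_id52.to_string(),
///     pool.clone(),
///     graceful.clone(),
/// )
/// .await?;
/// send.write_all(b"hello").await?;
/// ```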
#[tracing::instrument(skip_all)]
pub async fn get_stream(
    self_endpoint: iroh::Endpoint,
    header: crate::ProtocolHeader,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
    graceful: crate::Graceful,
) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
    use eyre::WrapErr;

    tracing::trace!("get_stream: {header:?}");
    let stream_request_sender = get_stream_request_sender(
        self_endpoint,
        remote_node_id52,
        peer_stream_senders,
        graceful,
    )
    .await;
    tracing::trace!("got stream_request_sender");
    let (reply_channel, receiver) = tokio::sync::oneshot::channel();

    stream_request_sender
        .send((header, reply_channel))
        .await
        .wrap_err_with(|| "failed to send on stream_request_sender")?;

    tracing::trace!("sent stream request");

    let r = receiver.await?;

    tracing::trace!("got stream request reply");
    r
}

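/// Looks up (or lazily creates) the per-peer `StreamRequestSender`.
///
/// If the (self ID52, remote ID52) pair is already in the pool, the existing
/// sender is cloned and returned. Otherwise a new channel is created, a
/// background `connection_manager` task is spawned to service it, and the
/// sender is registered in the pool; the entry is removed again when that
/// task exits.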
#[tracing::instrument(skip_all)]
async fn get_stream_request_sender(
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
    graceful: crate::Graceful,
) -> StreamRequestSender {
    // Convert iroh::PublicKey to ID52 string
    let self_id52 = data_encoding::BASE32_DNSSEC.encode(self_endpoint.node_id().as_bytes());
    let mut senders = peer_stream_senders.lock().await;

    if let Some(sender) = senders.get(&(self_id52.clone(), remote_node_id52.clone())) {
        return sender.clone();
    }

    // TODO: figure out if the mpsc::channel is the right size
    let (sender, receiver) = tokio::sync::mpsc::channel(1);
    senders.insert(
        (self_id52.clone(), remote_node_id52.clone()),
        sender.clone(),
    );
    drop(senders);

    let graceful_for_connection_manager = graceful.clone();
    graceful.spawn(async move {
        connection_manager(
            receiver,
            self_endpoint,
            remote_node_id52.clone(),
            graceful_for_connection_manager,
        )
        .await;

        // clean up the peer_stream_senders map, so no future task will try to use this.
        let mut senders = peer_stream_senders.lock().await;
        senders.remove(&(self_id52.clone(), remote_node_id52));
    });

    sender
}

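/// Drives a single peer connection and cleans up after it on failure.
///
/// Runs `connection_manager_` and, if it returns an error, closes the request
/// receiver and replies with an error to every stream request still queued on
/// it, so waiting callers fail fast instead of hanging.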
async fn connection_manager(
    mut receiver: StreamRequestReceiver,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    graceful: crate::Graceful,
) {
    let e = match connection_manager_(
        &mut receiver,
        self_endpoint,
        remote_node_id52.clone(),
        graceful,
    )
    .await
    {
        Ok(()) => {
            tracing::info!("connection manager closed");
            return;
        }
        Err(e) => e,
    };

    // what is our error handling strategy?
    //
    // since an error has just occurred on our connection, it is best to cancel all concurrent
    // tasks that depend on this connection, and let the next task recreate the connection; this
    // way things are clean.
    //
    // we could try to keep the concurrent tasks open and retry the connection, but that increases
    // the complexity of the implementation, and it is not worth it for now.
    //
    // also note that connection_manager() and its caller, get_stream(), are only called to create
    // the initial stream, so this error handling strategy applies to the concurrent requests that
    // are still waiting for a stream to be created. tasks that already got their stream are not
    // directly affected; though, since something has gone wrong with the connection, they will
    // eventually fail too.
    tracing::error!("connection manager worker error: {e:?}");

    // once we close the receiver, any tasks that have gotten access to the corresponding sender
    // will fail when sending.
    receiver.close();

    // send an error to all the tasks that are waiting for a stream on this receiver.
    while let Some((_protocol, reply_channel)) = receiver.recv().await {
        if reply_channel
            .send(Err(eyre::anyhow!("failed to create connection: {e:?}")))
            .is_err()
        {
            tracing::error!("failed to send error reply: {e:?}");
        }
    }
}

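/// Opens a connection to the remote peer and serves stream requests on it.
///
/// After connecting (using `crate::APNS_IDENTITY` as the ALPN), it loops:
/// serving requests from the receiver, pinging the peer after every 12 seconds
/// of inactivity, and exiting after five idle pings (about a minute), on
/// graceful shutdown, or on the first request that fails.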
#[tracing::instrument(skip_all)]
async fn connection_manager_(
    receiver: &mut StreamRequestReceiver,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    graceful: crate::Graceful,
) -> eyre::Result<()> {
    let conn = match self_endpoint
        .connect(
            {
                // Convert ID52 to iroh::NodeId
                use std::str::FromStr;
                let public_key = fastn_id52::PublicKey::from_str(&remote_node_id52)
                    .map_err(|e| eyre::anyhow!("{}", e))?;
                iroh::NodeId::from(iroh::PublicKey::from_bytes(&public_key.to_bytes())?)
            },
            crate::APNS_IDENTITY,
        )
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!("failed to create connection: {e:?}");
            return Err(eyre::anyhow!("failed to create connection: {e:?}"));
        }
    };

    let timeout = std::time::Duration::from_secs(12);
    let mut idle_counter = 0;

    loop {
        tracing::trace!("connection manager loop");

        if idle_counter > 4 {
            tracing::info!("connection idle timeout, returning");
            // this ensures we keep a connection open only for 12 * 5 seconds = 1 min
            break;
        }

        tokio::select! {
            _ = graceful.cancelled() => {
                tracing::info!("graceful shutdown");
                break;
            },
            _ = tokio::time::sleep(timeout) => {
                tracing::info!("woken up");
                if let Err(e) = crate::ping(&conn).await {
                    tracing::error!("pinging failed: {e:?}");
                    break;
                }
                idle_counter += 1;
            },
            Some((header, reply_channel)) = receiver.recv() => {
                tracing::info!("connection: {header:?}, idle counter: {idle_counter}");
                idle_counter = 0;
                // is it a good idea to serialize this part? if 10 concurrent requests come in, we will
                // handle each one sequentially. the other alternative is to spawn a task for each request.
                // so which is better?
                //
                // in general, if we do it in parallel via spawning, we will have better throughput.
                //
                // and we are not worried about having too many concurrent streams, though iroh has a limit
                // on concurrent bidi streams[1], with a default of 100[2]. it is actually a todo to find out
                // what happens when we hit this limit: do they handle it by queueing the streams, or do they
                // return an error? if they queue, we won't have to implement queue logic ourselves.
                //
                // [1]: https://docs.rs/iroh/0.34.1/iroh/endpoint/struct.TransportConfig.html#method.max_concurrent_bidi_streams
                // [2]: https://docs.rs/iroh-quinn-proto/0.13.0/src/iroh_quinn_proto/config/transport.rs.html#354
                //
                // but all that is beside the point: we are worried about resilience right now, not
                // throughput per se (throughput is a secondary goal, resilience the primary one).
                //
                // say we have 10 concurrent requests and we spawned a task for each. what happens in the
                // error case, say the connection failed because the device switched from wifi to 4g, or
                // whatever? in the handler task we put a timeout on the read. in the serial case the first
                // request will time out, and all subsequent requests will immediately get an error. it's
                // predictable, it's clean.
                //
                // if the tasks were spawned, each would time out independently.
                //
                // we could also no longer rely on this function, connection_manager_, returning an error
                // for them, so our connection_handler strategy would interfere: we would already have read
                // more requests off of the receiver.
                //
                // do note that this is not a clear-winner problem, it is a tradeoff: we lose throughput,
                // as in the best case 10 concurrent tasks would do better. we will have to revisit this in
                // the future when we are performance optimising things.
                if let Err(e) = handle_request(&conn, header, reply_channel).await {
                    tracing::error!("failed to handle request: {e:?}");
                    // note: we are intentionally not calling conn.close(). why? because if some existing
                    // stream is still open and we explicitly close the connection, that stream will
                    // immediately fail as well, and we do not want that. we want to let the stream fail
                    // on its own: maybe it will work, maybe it will not.
                    return Err(e);
                }
                tracing::info!("handled connection");
            }
            else => {
                tracing::error!("failed to read from receiver");
                break
            },
        }
    }

    Ok(())
}

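/// Opens a new bi-directional stream on `conn` and performs the protocol handshake.
///
/// The handshake writes the protocol as JSON followed by a newline, then the
/// optional `extra` header data followed by a newline, and waits for the peer
/// to reply with `crate::ACK`. On success the (send, recv) stream pair is
/// handed back to the caller via `reply_channel`.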
async fn handle_request(
    conn: &iroh::endpoint::Connection,
    header: crate::ProtocolHeader,
    reply_channel: ReplyChannel,
) -> eyre::Result<()> {
    use eyre::WrapErr;

    tracing::trace!("handling request: {header:?}");

    let (mut send, mut recv) = match conn.open_bi().await {
        Ok(v) => {
            tracing::trace!("opened bi-stream");
            v
        }
        Err(e) => {
            tracing::error!("failed to open_bi: {e:?}");
            return Err(eyre::anyhow!("failed to open_bi: {e:?}"));
        }
    };

    send.write_all(
        &serde_json::to_vec(&header.protocol)
            .wrap_err_with(|| format!("failed to serialize protocol: {:?}", header.protocol))?,
    )
    .await?;
    tracing::trace!("wrote protocol");

    send.write(b"\n")
        .await
        .wrap_err_with(|| "failed to write newline")?;

    tracing::trace!("wrote newline");

    if let Some(extra) = header.extra {
        send.write_all(extra.as_bytes()).await?;
        tracing::trace!("wrote extra");

        send.write(b"\n")
            .await
            .wrap_err_with(|| "failed to write newline")?;
    }

    let msg = crate::next_string(&mut recv).await?;

    if msg != crate::ACK {
        tracing::error!("failed to read ack: {msg:?}");
        return Err(eyre::anyhow!("failed to read ack: {msg:?}"));
    }

    tracing::trace!("received ack");

    reply_channel.send(Ok((send, recv))).unwrap_or_else(|e| {
        tracing::error!("failed to send reply: {e:?}");
    });

    tracing::trace!("handle_request done");

    Ok(())
}