fastn_net/get_stream.rs

/// Connection pool for P2P stream management.
///
/// Maintains a map of active peer connections indexed by (self ID52, remote ID52) pairs.
/// Each entry contains a channel for requesting new streams on that connection.
/// Connections are automatically removed when they break or become unhealthy.
///
/// This type is used to reuse existing P2P connections instead of creating new ones
/// for each request, improving performance and reducing connection overhead.
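///
/// # Example
///
/// A minimal sketch (not compiled as a doctest): create one shared pool at startup and
/// clone the `Arc` into every task that needs a stream; all clones share the same map.
///
/// ```ignore
/// let peer_stream_senders: PeerStreamSenders = Default::default();
/// let pool_for_task = peer_stream_senders.clone(); // cheap Arc clone
/// ```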
pub type PeerStreamSenders = std::sync::Arc<
    tokio::sync::Mutex<std::collections::HashMap<(SelfID52, RemoteID52), StreamRequestSender>>,
>;

type Stream = (iroh::endpoint::SendStream, iroh::endpoint::RecvStream);
type StreamResult = eyre::Result<Stream>;
type ReplyChannel = tokio::sync::oneshot::Sender<StreamResult>;
type RemoteID52 = String;
type SelfID52 = String;

type StreamRequest = (crate::ProtocolHeader, ReplyChannel);

type StreamRequestSender = tokio::sync::mpsc::Sender<StreamRequest>;
type StreamRequestReceiver = tokio::sync::mpsc::Receiver<StreamRequest>;

/// Gets or creates a bidirectional stream to a remote peer.
///
/// This function manages P2P connections efficiently by:
/// 1. Reusing existing connections when available
/// 2. Creating new connections when needed
/// 3. Verifying connection health with a protocol handshake
/// 4. Automatically reconnecting on failure
///
/// The function sends the protocol header and waits for acknowledgment
/// to ensure the stream is healthy before returning it.
///
/// # Arguments
///
/// * `self_endpoint` - Local Iroh endpoint
/// * `header` - Protocol header to negotiate
/// * `remote_node_id52` - ID52 of the target peer
/// * `peer_stream_senders` - Connection pool for reuse
/// * `graceful` - Graceful shutdown handle
///
/// # Returns
///
/// A tuple of (SendStream, RecvStream) ready for communication.
///
/// # Errors
///
/// Returns an error if the connection fails or protocol negotiation times out.
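///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); the endpoint, protocol header, pool, and
/// graceful handle are assumed to come from the surrounding application setup.
///
/// ```ignore
/// let (mut send, mut recv) = get_stream(
///     self_endpoint.clone(),
///     header,
///     remote_id52.to_string(),
///     peer_stream_senders.clone(),
///     graceful.clone(),
/// )
/// .await?;
/// // the returned stream has already completed the protocol handshake
/// send.write_all(b"hello").await?;
/// ```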
#[tracing::instrument(skip_all)]
pub async fn get_stream(
    self_endpoint: iroh::Endpoint,
    header: crate::ProtocolHeader,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
    graceful: crate::Graceful,
) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
    use eyre::WrapErr;

    tracing::trace!("get_stream: {header:?}");
    let stream_request_sender = get_stream_request_sender(
        self_endpoint,
        remote_node_id52,
        peer_stream_senders,
        graceful,
    )
    .await;
    tracing::trace!("got stream_request_sender");
    let (reply_channel, receiver) = tokio::sync::oneshot::channel();

    stream_request_sender
        .send((header, reply_channel))
        .await
        .wrap_err_with(|| "failed to send on stream_request_sender")?;

    tracing::trace!("sent stream request");

    let r = receiver.await?;

    tracing::trace!("got stream request reply");
    r
}

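/// Returns the `StreamRequestSender` for this `(self ID52, remote ID52)` pair, spawning a
/// `connection_manager` task and registering it in the pool if no entry exists yet.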
#[tracing::instrument(skip_all)]
async fn get_stream_request_sender(
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
    graceful: crate::Graceful,
) -> StreamRequestSender {
    // Convert iroh::PublicKey to ID52 string
    let self_id52 = data_encoding::BASE32_DNSSEC.encode(self_endpoint.node_id().as_bytes());
    let mut senders = peer_stream_senders.lock().await;

    if let Some(sender) = senders.get(&(self_id52.clone(), remote_node_id52.clone())) {
        return sender.clone();
    }

    // TODO: figure out if the mpsc::channel is the right size
    let (sender, receiver) = tokio::sync::mpsc::channel(1);
    senders.insert(
        (self_id52.clone(), remote_node_id52.clone()),
        sender.clone(),
    );
    drop(senders);

    let graceful_for_connection_manager = graceful.clone();
    graceful.spawn(async move {
        connection_manager(
            receiver,
            self_endpoint,
            remote_node_id52.clone(),
            graceful_for_connection_manager,
        )
        .await;

        // clean up the peer_stream_senders map, so no future task will try to use this sender
        let mut senders = peer_stream_senders.lock().await;
        senders.remove(&(self_id52.clone(), remote_node_id52));
    });

    sender
}

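/// Runs `connection_manager_` and, if it returns an error, closes the request receiver and
/// replies with an error to every request still waiting for a stream on this connection.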
async fn connection_manager(
    mut receiver: StreamRequestReceiver,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    graceful: crate::Graceful,
) {
    let e = match connection_manager_(
        &mut receiver,
        self_endpoint,
        remote_node_id52.clone(),
        graceful,
    )
    .await
    {
        Ok(()) => {
            tracing::info!("connection manager closed");
            return;
        }
        Err(e) => e,
    };

    // What is our error-handling strategy?
    //
    // Since an error has just occurred on our connection, it is best to cancel all concurrent
    // tasks that depend on this connection and let the next task recreate it; this way things
    // stay clean.
    //
    // We could keep the concurrent tasks open and retry the connection, but that increases the
    // complexity of the implementation and is not worth it for now.
    //
    // Also note that connection_manager() and its caller, get_stream(), are only called to
    // create the initial stream, so this error-handling strategy covers the concurrent requests
    // that are still waiting for a stream to be created. Tasks that already got their stream
    // are not affected by this; though, since something has gone wrong with the connection,
    // they will eventually fail too.
    tracing::error!("connection manager worker error: {e:?}");

    // Once we close the receiver, any task that has already obtained the corresponding sender
    // will fail when it tries to send.
    receiver.close();

    // Send an error to every task that is still waiting for a stream on this receiver.
    while let Some((_protocol, reply_channel)) = receiver.recv().await {
        if reply_channel
            .send(Err(eyre::anyhow!("failed to create connection: {e:?}")))
            .is_err()
        {
            tracing::error!("failed to send error reply: {e:?}");
        }
    }
}

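/// Owns a single connection to `remote_node_id52`: establishes it, then serves stream
/// requests until graceful shutdown, an error, or roughly a minute of idleness (the peer is
/// pinged every 12 seconds while idle).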
#[tracing::instrument(skip_all)]
async fn connection_manager_(
    receiver: &mut StreamRequestReceiver,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    graceful: crate::Graceful,
) -> eyre::Result<()> {
    let conn = match self_endpoint
        .connect(
            {
                // Convert ID52 to iroh::NodeId
                use std::str::FromStr;
                let public_key = fastn_id52::PublicKey::from_str(&remote_node_id52)
                    .map_err(|e| eyre::anyhow!("{}", e))?;
                iroh::NodeId::from(iroh::PublicKey::from_bytes(&public_key.to_bytes())?)
            },
            crate::APNS_IDENTITY,
        )
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::error!("failed to create connection: {e:?}");
            return Err(eyre::anyhow!("failed to create connection: {e:?}"));
        }
    };

    let timeout = std::time::Duration::from_secs(12);
    let mut idle_counter = 0;

    loop {
        tracing::trace!("connection manager loop");

        if idle_counter > 4 {
            tracing::info!("connection idle timeout, returning");
            // this ensures we keep an idle connection open for at most 12 * 5 seconds = 1 min
            break;
        }

        tokio::select! {
            _ = graceful.cancelled() => {
                tracing::info!("graceful shutdown");
                break;
            },
            _ = tokio::time::sleep(timeout) => {
                tracing::info!("woken up");
                if let Err(e) = crate::ping(&conn).await {
                    tracing::error!("pinging failed: {e:?}");
                    break;
                }
                idle_counter += 1;
            },
            Some((header, reply_channel)) = receiver.recv() => {
                tracing::info!("connection: {header:?}, idle counter: {idle_counter}");
                idle_counter = 0;
                // Is it a good idea to serialize this part? If 10 concurrent requests come in,
                // we handle them sequentially; the alternative is to spawn a task for each
                // request. Which is better?
                //
                // In general, doing it in parallel via spawning gives better throughput.
                //
                // And we are not worried about having too many concurrent tasks, though iroh
                // does have a limit on concurrent streams[1], with a default of 100[2]. It is
                // still a todo to find out what happens when we hit this limit: do they queue
                // the excess, or do they return an error? If they queue, we won't have to
                // implement queueing logic ourselves.
                //
                // [1]: https://docs.rs/iroh/0.34.1/iroh/endpoint/struct.TransportConfig.html#method.max_concurrent_bidi_streams
                // [2]: https://docs.rs/iroh-quinn-proto/0.13.0/src/iroh_quinn_proto/config/transport.rs.html#354
                //
                // But all of that is beside the point: right now we are worried about
                // resilience, not throughput per se (throughput is the secondary goal,
                // resilience the primary one).
                //
                // Say we have 10 concurrent requests and we spawned a task for each. What
                // happens in the error case, say the connection failed because the device
                // switched from wifi to 4g, or whatever? In the handler task we put a timeout
                // on the read. In the serial case the first request times out and all
                // subsequent requests immediately get an error. It is predictable and clean.
                //
                // If the tasks were spawned, each would time out independently.
                //
                // We also could no longer rely on this function, connection_manager_, returning
                // an error for them, so our connection_manager error-handling strategy would
                // interfere: we would have read more requests off the receiver.
                //
                // Note that this is not a clear-cut decision, it is a tradeoff: we lose
                // throughput, and in the best-case scenario 10 concurrent tasks would do
                // better. We will have to revisit this when we are performance-optimising
                // things.
                if let Err(e) = handle_request(&conn, header, reply_channel).await {
                    tracing::error!("failed to handle request: {e:?}");
                    // Note: we intentionally do not call conn.close(). Why? If some existing
                    // stream is still open and we explicitly close the connection, that stream
                    // would immediately fail as well, and we do not want that. We let the
                    // stream fail (or keep working) on its own.
                    return Err(e);
                }
                tracing::info!("handled connection");
            }
            else => {
                tracing::error!("failed to read from receiver");
                break
            },
        }
    }

    Ok(())
}

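/// Opens a bidirectional stream on `conn`, performs the protocol handshake, and hands the
/// stream back to the waiting caller through `reply_channel`.
///
/// The handshake writes the JSON-serialized `header.protocol` followed by a newline, then the
/// optional `header.extra` followed by a newline, and waits for the peer to reply with `ACK`
/// before the stream is considered healthy.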
async fn handle_request(
    conn: &iroh::endpoint::Connection,
    header: crate::ProtocolHeader,
    reply_channel: ReplyChannel,
) -> eyre::Result<()> {
    use eyre::WrapErr;

    tracing::trace!("handling request: {header:?}");

    let (mut send, mut recv) = match conn.open_bi().await {
        Ok(v) => {
            tracing::trace!("opened bi-stream");
            v
        }
        Err(e) => {
            tracing::error!("failed to open_bi: {e:?}");
            return Err(eyre::anyhow!("failed to open_bi: {e:?}"));
        }
    };

    send.write_all(
        &serde_json::to_vec(&header.protocol)
            .wrap_err_with(|| format!("failed to serialize protocol: {:?}", header.protocol))?,
    )
    .await?;
    tracing::trace!("wrote protocol");

    send.write(b"\n")
        .await
        .wrap_err_with(|| "failed to write newline")?;

    tracing::trace!("wrote newline");

    if let Some(extra) = header.extra {
        send.write_all(extra.as_bytes()).await?;
        tracing::trace!("wrote extra");

        send.write(b"\n")
            .await
            .wrap_err_with(|| "failed to write newline")?;
    }

    let msg = crate::next_string(&mut recv).await?;

    if msg != crate::ACK {
        tracing::error!("failed to read ack: {msg:?}");
        return Err(eyre::anyhow!("failed to read ack: {msg:?}"));
    }

    tracing::trace!("received ack");

    reply_channel.send(Ok((send, recv))).unwrap_or_else(|e| {
        tracing::error!("failed to send reply: {e:?}");
    });

    tracing::trace!("handle_request done");

    Ok(())
}