ftnet_utils/
get_stream.rs

/// PeerStreamSenders maps every `(SelfID52, RemoteID52)` pair to the sender half of the
/// stream-request channel that is served by the connection manager task for that pair.
///
/// when the connection manager for a pair fails (connection is broken, etc.), it removes that
/// pair's entry from the map, so the next request for the pair starts a fresh manager.
pub type PeerStreamSenders = std::sync::Arc<
    tokio::sync::Mutex<std::collections::HashMap<(SelfID52, RemoteID52), StreamRequestSender>>,
>;
7
/// what a successful stream request yields: the send half of a freshly opened bidirectional
/// stream, and a frame reader wrapping its receive half.
type Stream = (iroh::endpoint::SendStream, ftnet_utils::FrameReader);
/// outcome of a stream request: the stream, or the reason it could not be created.
type StreamResult = eyre::Result<Stream>;
/// one-shot channel on which the connection manager answers a single stream request.
type ReplyChannel = tokio::sync::oneshot::Sender<StreamResult>;
/// id52 string of the remote peer (second half of the `PeerStreamSenders` key).
type RemoteID52 = String;
/// id52 string of the local endpoint (first half of the `PeerStreamSenders` key).
type SelfID52 = String;

/// a request for a new stream: the protocol to announce on it, and where to send the result.
type StreamRequest = (ftnet_utils::Protocol, ReplyChannel);

/// sender half of a connection manager's request queue; this is what `PeerStreamSenders` stores.
type StreamRequestSender = tokio::sync::mpsc::Sender<StreamRequest>;
/// receiver half of a connection manager's request queue, owned by the manager task.
type StreamRequestReceiver = tokio::sync::mpsc::Receiver<StreamRequest>;
18
19/// get_stream tries to check if the bidirectional stream is healthy, as simply opening
20/// a bidirectional stream, or even simply writing on it does not guarantee that the stream is
21/// open. only the read request times out to tell us something is wrong. this is why get_stream
22/// takes the protocol as well, as every outgoing bi-direction stream must have a protocol. it
23/// sends the protocol and waits for an ack. if the ack is not received within a certain time, it
24/// assumes the connection is not healthy, and tries to recreate the connection.
25///
26/// for managing connection, we use a spawned task. this task listens for incoming stream requests
27/// and manages the connection as part of the task local data.
28pub async fn get_stream(
29    self_endpoint: iroh::Endpoint,
30    protocol: ftnet_utils::Protocol,
31    remote_node_id52: RemoteID52,
32    peer_stream_senders: PeerStreamSenders,
33) -> eyre::Result<(iroh::endpoint::SendStream, ftnet_utils::FrameReader)> {
34    let stream_request_sender =
35        get_stream_request_sender(self_endpoint, remote_node_id52, peer_stream_senders).await;
36
37    let (reply_channel, receiver) = tokio::sync::oneshot::channel();
38
39    stream_request_sender
40        .send((protocol, reply_channel))
41        .await?;
42
43    receiver.await?
44}
45
46async fn get_stream_request_sender(
47    self_endpoint: iroh::Endpoint,
48    remote_node_id52: RemoteID52,
49    peer_stream_senders: PeerStreamSenders,
50) -> StreamRequestSender {
51    let self_id52 = ftnet_utils::public_key_to_id52(&self_endpoint.node_id());
52    let mut senders = peer_stream_senders.lock().await;
53
54    if let Some(sender) = senders.get(&(self_id52.clone(), remote_node_id52.clone())) {
55        return sender.clone();
56    }
57
58    // TODO: figure out if the mpsc::channel is the right size
59    let (sender, receiver) = tokio::sync::mpsc::channel(1);
60    senders.insert(
61        (self_id52.clone(), remote_node_id52.clone()),
62        sender.clone(),
63    );
64    drop(senders);
65
66    tokio::spawn(async move {
67        connection_manager(
68            receiver,
69            self_id52,
70            self_endpoint,
71            remote_node_id52,
72            peer_stream_senders,
73        )
74        .await;
75    });
76
77    sender
78}
79
80async fn connection_manager(
81    mut receiver: StreamRequestReceiver,
82    self_id52: SelfID52,
83    self_endpoint: iroh::Endpoint,
84    remote_node_id52: RemoteID52,
85    peer_stream_senders: PeerStreamSenders,
86) {
87    let e = match connection_manager_(&mut receiver, self_endpoint, remote_node_id52.clone()).await
88    {
89        Ok(()) => return,
90        Err(e) => e,
91    };
92
93    // what is our error handling strategy?
94    //
95    // since an error has just occurred on our connection, it is best to cancel all concurrent
96    // tasks that depend on this connection, and let the next task recreate the connection, this
97    // way things are clean.
98    //
99    // we can try to keep the concurrent tasks open, and retry connection, but it increases the
100    // complexity of implementation, and it is not worth it for now.
101    //
102    // also note that connection_manager() and it's caller, get_stream(), are called to create the
103    // initial stream only, this error handling strategy will work for concurrent requests that are
104    // waiting for the stream to be created. the tasks that already got the stream will not be
105    // affected by this. tho, since something wrong has happened with the connection, they will
106    // eventually fail too.
107    tracing::error!("connection manager worker error: {e:?}");
108
109    // once we close the receiver, any tasks that have gotten access to the corresponding sender
110    // will fail when sending.
111    receiver.close();
112
113    // send an error to all the tasks that are waiting for stream for this receiver.
114    while let Some((_protocol, reply_channel)) = receiver.recv().await {
115        if reply_channel
116            .send(Err(eyre::anyhow!("failed to create connection: {e:?}")))
117            .is_err()
118        {
119            tracing::error!("failed to send error reply: {e:?}");
120        }
121    }
122
123    // cleanup the peer_stream_senders map, so no future tasks will try to use this.
124    let mut senders = peer_stream_senders.lock().await;
125    senders.remove(&(self_id52.clone(), remote_node_id52.clone()));
126}
127
128async fn connection_manager_(
129    receiver: &mut StreamRequestReceiver,
130    self_endpoint: iroh::Endpoint,
131    remote_node_id52: RemoteID52,
132) -> eyre::Result<()> {
133    let conn = match self_endpoint
134        .connect(
135            ftnet_utils::id52_to_public_key(&remote_node_id52)?,
136            ftnet_utils::APNS_IDENTITY,
137        )
138        .await
139    {
140        Ok(v) => v,
141        Err(e) => {
142            tracing::error!("failed to create connection: {e:?}");
143            return Err(eyre::anyhow!("failed to create connection: {e:?}"));
144        }
145    };
146
147    // TODO: if we do not get any task on receiver, we should send ping pong for the keep alive
148    //       duration. hint: use tokio::select!{} here
149    while let Some((protocol, reply_channel)) = receiver.recv().await {
150        // is this a good idea to serialize this part? if 10 concurrent requests come in, we will
151        // handle each one sequentially. the other alternative is to spawn a task for each request.
152        // so which is better?
153        //
154        // in general, if we do it in parallel via spawning, we will have better throughput.
155        //
156        // and we are not worried about having too many concurrent tasks, tho iroh has a limit on
157        // concurrent tasks[1], with a default of 100[2]. it is actually a todo to find out what
158        // happens when we hit this limit, do they handle it by queueing the tasks, or do they
159        // return an error. if they queue then we wont have to implement queue logic.
160        //
161        // [1]: https://docs.rs/iroh/0.34.1/iroh/endpoint/struct.TransportConfig.html#method.max_concurrent_bidi_streams
162        // [2]: https://docs.rs/iroh-quinn-proto/0.13.0/src/iroh_quinn_proto/config/transport.rs.html#354
163        //
164        // but all that is besides the point, we are worried about resilience right now, not
165        // throughput per se (throughput is secondary goal, resilience primary).
166        //
167        // say we have 10 concurrent requests and lets say if we spawned a task for each, what
168        // happens in error case? say connection failed, the device switched from wifi to 4g, or
169        // whatever? in the handler task, we are putting a timeout on the read. in the serial case
170        // the first request will timeout, and all subsequent requests will get immediately an
171        // error. its predictable, its clean.
172        //
173        // if the tasks were spawned, each will timeout independently.
174        //
175        // we can also no longer rely on this function, connection_manager_, returning an error for
176        // them, so our connection_handler strategy will interfere, we would have read more requests
177        // off of receiver.
178        //
179        // do note that this is not a clear winner problem, this is a tradeoff, we lose throughput,
180        // as in best case scenario, 10 concurrent tasks will be better. we will have to revisit
181        // this in future when we are performance optimising things.
182        if let Err(e) = handle_request(&conn, protocol, reply_channel).await {
183            tracing::error!("failed to handle request: {e:?}");
184            // note: we are intentionally not calling conn.close(). why? so that if some existing
185            // stream is still open, if we explicitly call close on the connection, that stream will
186            // immediately fail as well, and we do not want that. we want to let the stream fail
187            // on its own, maybe it will work, maybe it will not.
188            return Err(e);
189        }
190    }
191
192    Ok(())
193}
194
195async fn handle_request(
196    conn: &iroh::endpoint::Connection,
197    protocol: ftnet_utils::Protocol,
198    reply_channel: ReplyChannel,
199) -> eyre::Result<()> {
200    use tokio_stream::StreamExt;
201
202    let (mut send, recv) = match conn.open_bi().await {
203        Ok(v) => v,
204        Err(e) => {
205            tracing::error!("failed to open_bi: {e:?}");
206            return Err(eyre::anyhow!("failed to open_bi: {e:?}"));
207        }
208    };
209
210    send.write_all(&serde_json::to_vec(&protocol)?).await?;
211    send.write(b"\n").await?;
212    let mut recv = ftnet_utils::frame_reader(recv);
213
214    let msg = match recv.next().await {
215        Some(v) => v?,
216        None => {
217            tracing::error!("failed to read from incoming connection");
218            return Err(eyre::anyhow!("failed to read from incoming connection"));
219        }
220    };
221
222    if msg != ftnet_utils::ACK {
223        tracing::error!("failed to read ack: {msg:?}");
224        return Err(eyre::anyhow!("failed to read ack: {msg:?}"));
225    }
226
227    reply_channel.send(Ok((send, recv))).unwrap_or_else(|_| {
228        tracing::error!("failed to send reply");
229    });
230
231    Ok(())
232}