ftnet_utils/get_stream.rs
1/// PeerConnections stores the iroh connections for every peer.
2///
3/// when a connection is broken, etc., we remove the connection from the map.
4pub type PeerStreamSenders = std::sync::Arc<
5 tokio::sync::Mutex<std::collections::HashMap<(SelfID52, RemoteID52), StreamRequestSender>>,
6>;
7
8type Stream = (iroh::endpoint::SendStream, ftnet_utils::FrameReader);
9type StreamResult = eyre::Result<Stream>;
10type ReplyChannel = tokio::sync::oneshot::Sender<StreamResult>;
11type RemoteID52 = String;
12type SelfID52 = String;
13
14type StreamRequest = (ftnet_utils::Protocol, ReplyChannel);
15
16type StreamRequestSender = tokio::sync::mpsc::Sender<StreamRequest>;
17type StreamRequestReceiver = tokio::sync::mpsc::Receiver<StreamRequest>;
18
19/// get_stream tries to check if the bidirectional stream is healthy, as simply opening
20/// a bidirectional stream, or even simply writing on it does not guarantee that the stream is
21/// open. only the read request times out to tell us something is wrong. this is why get_stream
22/// takes the protocol as well, as every outgoing bi-direction stream must have a protocol. it
23/// sends the protocol and waits for an ack. if the ack is not received within a certain time, it
24/// assumes the connection is not healthy, and tries to recreate the connection.
25///
26/// for managing connection, we use a spawned task. this task listens for incoming stream requests
27/// and manages the connection as part of the task local data.
28pub async fn get_stream(
29 self_endpoint: iroh::Endpoint,
30 protocol: ftnet_utils::Protocol,
31 remote_node_id52: RemoteID52,
32 peer_stream_senders: PeerStreamSenders,
33) -> eyre::Result<(iroh::endpoint::SendStream, ftnet_utils::FrameReader)> {
34 let stream_request_sender =
35 get_stream_request_sender(self_endpoint, remote_node_id52, peer_stream_senders).await;
36
37 let (reply_channel, receiver) = tokio::sync::oneshot::channel();
38
39 stream_request_sender
40 .send((protocol, reply_channel))
41 .await?;
42
43 receiver.await?
44}
45
46async fn get_stream_request_sender(
47 self_endpoint: iroh::Endpoint,
48 remote_node_id52: RemoteID52,
49 peer_stream_senders: PeerStreamSenders,
50) -> StreamRequestSender {
51 let self_id52 = ftnet_utils::public_key_to_id52(&self_endpoint.node_id());
52 let mut senders = peer_stream_senders.lock().await;
53
54 if let Some(sender) = senders.get(&(self_id52.clone(), remote_node_id52.clone())) {
55 return sender.clone();
56 }
57
58 // TODO: figure out if the mpsc::channel is the right size
59 let (sender, receiver) = tokio::sync::mpsc::channel(1);
60 senders.insert(
61 (self_id52.clone(), remote_node_id52.clone()),
62 sender.clone(),
63 );
64 drop(senders);
65
66 tokio::spawn(async move {
67 connection_manager(
68 receiver,
69 self_id52,
70 self_endpoint,
71 remote_node_id52,
72 peer_stream_senders,
73 )
74 .await;
75 });
76
77 sender
78}
79
80async fn connection_manager(
81 mut receiver: StreamRequestReceiver,
82 self_id52: SelfID52,
83 self_endpoint: iroh::Endpoint,
84 remote_node_id52: RemoteID52,
85 peer_stream_senders: PeerStreamSenders,
86) {
87 let e = match connection_manager_(&mut receiver, self_endpoint, remote_node_id52.clone()).await
88 {
89 Ok(()) => return,
90 Err(e) => e,
91 };
92
93 // what is our error handling strategy?
94 //
95 // since an error has just occurred on our connection, it is best to cancel all concurrent
96 // tasks that depend on this connection, and let the next task recreate the connection, this
97 // way things are clean.
98 //
99 // we can try to keep the concurrent tasks open, and retry connection, but it increases the
100 // complexity of implementation, and it is not worth it for now.
101 //
102 // also note that connection_manager() and it's caller, get_stream(), are called to create the
103 // initial stream only, this error handling strategy will work for concurrent requests that are
104 // waiting for the stream to be created. the tasks that already got the stream will not be
105 // affected by this. tho, since something wrong has happened with the connection, they will
106 // eventually fail too.
107 tracing::error!("connection manager worker error: {e:?}");
108
109 // once we close the receiver, any tasks that have gotten access to the corresponding sender
110 // will fail when sending.
111 receiver.close();
112
113 // send an error to all the tasks that are waiting for stream for this receiver.
114 while let Some((_protocol, reply_channel)) = receiver.recv().await {
115 if reply_channel
116 .send(Err(eyre::anyhow!("failed to create connection: {e:?}")))
117 .is_err()
118 {
119 tracing::error!("failed to send error reply: {e:?}");
120 }
121 }
122
123 // cleanup the peer_stream_senders map, so no future tasks will try to use this.
124 let mut senders = peer_stream_senders.lock().await;
125 senders.remove(&(self_id52.clone(), remote_node_id52.clone()));
126}
127
128async fn connection_manager_(
129 receiver: &mut StreamRequestReceiver,
130 self_endpoint: iroh::Endpoint,
131 remote_node_id52: RemoteID52,
132) -> eyre::Result<()> {
133 let conn = match self_endpoint
134 .connect(
135 ftnet_utils::id52_to_public_key(&remote_node_id52)?,
136 ftnet_utils::APNS_IDENTITY,
137 )
138 .await
139 {
140 Ok(v) => v,
141 Err(e) => {
142 tracing::error!("failed to create connection: {e:?}");
143 return Err(eyre::anyhow!("failed to create connection: {e:?}"));
144 }
145 };
146
147 // TODO: if we do not get any task on receiver, we should send ping pong for the keep alive
148 // duration. hint: use tokio::select!{} here
149 while let Some((protocol, reply_channel)) = receiver.recv().await {
150 // is this a good idea to serialize this part? if 10 concurrent requests come in, we will
151 // handle each one sequentially. the other alternative is to spawn a task for each request.
152 // so which is better?
153 //
154 // in general, if we do it in parallel via spawning, we will have better throughput.
155 //
156 // and we are not worried about having too many concurrent tasks, tho iroh has a limit on
157 // concurrent tasks[1], with a default of 100[2]. it is actually a todo to find out what
158 // happens when we hit this limit, do they handle it by queueing the tasks, or do they
159 // return an error. if they queue then we wont have to implement queue logic.
160 //
161 // [1]: https://docs.rs/iroh/0.34.1/iroh/endpoint/struct.TransportConfig.html#method.max_concurrent_bidi_streams
162 // [2]: https://docs.rs/iroh-quinn-proto/0.13.0/src/iroh_quinn_proto/config/transport.rs.html#354
163 //
164 // but all that is besides the point, we are worried about resilience right now, not
165 // throughput per se (throughput is secondary goal, resilience primary).
166 //
167 // say we have 10 concurrent requests and lets say if we spawned a task for each, what
168 // happens in error case? say connection failed, the device switched from wifi to 4g, or
169 // whatever? in the handler task, we are putting a timeout on the read. in the serial case
170 // the first request will timeout, and all subsequent requests will get immediately an
171 // error. its predictable, its clean.
172 //
173 // if the tasks were spawned, each will timeout independently.
174 //
175 // we can also no longer rely on this function, connection_manager_, returning an error for
176 // them, so our connection_handler strategy will interfere, we would have read more requests
177 // off of receiver.
178 //
179 // do note that this is not a clear winner problem, this is a tradeoff, we lose throughput,
180 // as in best case scenario, 10 concurrent tasks will be better. we will have to revisit
181 // this in future when we are performance optimising things.
182 if let Err(e) = handle_request(&conn, protocol, reply_channel).await {
183 tracing::error!("failed to handle request: {e:?}");
184 // note: we are intentionally not calling conn.close(). why? so that if some existing
185 // stream is still open, if we explicitly call close on the connection, that stream will
186 // immediately fail as well, and we do not want that. we want to let the stream fail
187 // on its own, maybe it will work, maybe it will not.
188 return Err(e);
189 }
190 }
191
192 Ok(())
193}
194
195async fn handle_request(
196 conn: &iroh::endpoint::Connection,
197 protocol: ftnet_utils::Protocol,
198 reply_channel: ReplyChannel,
199) -> eyre::Result<()> {
200 use tokio_stream::StreamExt;
201
202 let (mut send, recv) = match conn.open_bi().await {
203 Ok(v) => v,
204 Err(e) => {
205 tracing::error!("failed to open_bi: {e:?}");
206 return Err(eyre::anyhow!("failed to open_bi: {e:?}"));
207 }
208 };
209
210 send.write_all(&serde_json::to_vec(&protocol)?).await?;
211 send.write(b"\n").await?;
212 let mut recv = ftnet_utils::frame_reader(recv);
213
214 let msg = match recv.next().await {
215 Some(v) => v?,
216 None => {
217 tracing::error!("failed to read from incoming connection");
218 return Err(eyre::anyhow!("failed to read from incoming connection"));
219 }
220 };
221
222 if msg != ftnet_utils::ACK {
223 tracing::error!("failed to read ack: {msg:?}");
224 return Err(eyre::anyhow!("failed to read ack: {msg:?}"));
225 }
226
227 reply_channel.send(Ok((send, recv))).unwrap_or_else(|_| {
228 tracing::error!("failed to send reply");
229 });
230
231 Ok(())
232}