1use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use fedimint_server_core::dashboard_ui::ConnectionType;
20use futures::FutureExt;
21use futures::future::select_all;
22use tokio::sync::watch;
23use tracing::{Instrument, info, info_span, warn};
24
25use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
26use crate::net::p2p_connection::DynP2PConnection;
27use crate::net::p2p_connector::DynP2PConnector;
28
29pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
30pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;
31
32pub type P2PConnectionTypeSenders = BTreeMap<PeerId, watch::Sender<ConnectionType>>;
33pub type P2PConnectionTypeReceivers = BTreeMap<PeerId, watch::Receiver<ConnectionType>>;
34
35pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
36 let mut senders = BTreeMap::new();
37 let mut receivers = BTreeMap::new();
38
39 for peer in peers {
40 let (sender, receiver) = watch::channel(None);
41
42 senders.insert(peer, sender);
43 receivers.insert(peer, receiver);
44 }
45
46 (senders, receivers)
47}
48
49pub fn p2p_connection_type_channels(
50 peers: Vec<PeerId>,
51) -> (P2PConnectionTypeSenders, P2PConnectionTypeReceivers) {
52 let mut senders = BTreeMap::new();
53 let mut receivers = BTreeMap::new();
54
55 for peer in peers {
56 let (sender, receiver) = watch::channel(ConnectionType::Direct);
57
58 senders.insert(peer, sender);
59 receivers.insert(peer, receiver);
60 }
61
62 (senders, receivers)
63}
64
65#[derive(Clone)]
66pub struct ReconnectP2PConnections<M> {
67 connections: BTreeMap<PeerId, P2PConnection<M>>,
68}
69
70impl<M: Send + 'static> ReconnectP2PConnections<M> {
71 pub fn new(
72 identity: PeerId,
73 connector: DynP2PConnector<M>,
74 task_group: &TaskGroup,
75 status_senders: P2PStatusSenders,
76 connection_type_senders: P2PConnectionTypeSenders,
77 ) -> Self {
78 let mut connection_senders = BTreeMap::new();
79 let mut connections = BTreeMap::new();
80
81 for peer_id in connector.peers() {
82 assert_ne!(peer_id, identity);
83
84 let (connection_sender, connection_receiver) = bounded(4);
85
86 let connection = P2PConnection::new(
87 identity,
88 peer_id,
89 connector.clone(),
90 connection_receiver,
91 status_senders
92 .get(&peer_id)
93 .expect("No p2p status sender for peer")
94 .clone(),
95 connection_type_senders
96 .get(&peer_id)
97 .expect("No p2p connection type sender for peer")
98 .clone(),
99 task_group,
100 );
101
102 connection_senders.insert(peer_id, connection_sender);
103 connections.insert(peer_id, connection);
104 }
105
106 task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
107 info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
108
109 loop {
110 match connector.accept().await {
111 Ok((peer, connection)) => {
112 if connection_senders
113 .get_mut(&peer)
114 .expect("Authenticating connectors dont return unknown peers")
115 .send(connection)
116 .await
117 .is_err()
118 {
119 break;
120 }
121 },
122 Err(err) => {
123 warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
124 }
125 }
126 }
127
128 info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
129 });
130
131 ReconnectP2PConnections { connections }
132 }
133}
134
135#[async_trait]
136impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
137 async fn send(&self, recipient: Recipient, message: M) {
138 match recipient {
139 Recipient::Everyone => {
140 for connection in self.connections.values() {
141 connection.send(message.clone()).await;
142 }
143 }
144 Recipient::Peer(peer) => match self.connections.get(&peer) {
145 Some(connection) => {
146 connection.send(message).await;
147 }
148 _ => {
149 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
150 }
151 },
152 }
153 }
154
155 fn try_send(&self, recipient: Recipient, message: M) {
156 match recipient {
157 Recipient::Everyone => {
158 for connection in self.connections.values() {
159 connection.try_send(message.clone());
160 }
161 }
162 Recipient::Peer(peer) => match self.connections.get(&peer) {
163 Some(connection) => {
164 connection.try_send(message);
165 }
166 _ => {
167 warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
168 }
169 },
170 }
171 }
172
173 async fn receive(&self) -> Option<(PeerId, M)> {
174 select_all(self.connections.iter().map(|(&peer, connection)| {
175 Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
176 }))
177 .await
178 .0
179 }
180
181 async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
182 self.connections
183 .get(&peer)
184 .expect("No connection found for peer")
185 .receive()
186 .await
187 }
188}
189
190#[derive(Clone)]
191struct P2PConnection<M> {
192 outgoing: Sender<M>,
193 incoming: Receiver<M>,
194}
195
196impl<M: Send + 'static> P2PConnection<M> {
197 #[allow(clippy::too_many_arguments)]
198 fn new(
199 our_id: PeerId,
200 peer_id: PeerId,
201 connector: DynP2PConnector<M>,
202 incoming_connections: Receiver<DynP2PConnection<M>>,
203 status_sender: watch::Sender<Option<Duration>>,
204 connection_type_sender: watch::Sender<ConnectionType>,
205 task_group: &TaskGroup,
206 ) -> P2PConnection<M> {
207 let (outgoing_sender, outgoing_receiver) = bounded(1024);
208 let (incoming_sender, incoming_receiver) = bounded(1024);
209
210 let connector_clone = connector.clone();
211 let connection_type_sender_clone = connection_type_sender.clone();
212
213 task_group.spawn_cancellable(
215 format!("connection-type-poller-{peer_id}"),
216 async move {
217 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
218 loop {
219 interval.tick().await;
220 let connection_type = connector_clone.connection_type(peer_id).await;
221 let _ = connection_type_sender_clone.send_replace(connection_type);
222 }
223 }
224 .instrument(info_span!("connection-type-poller", ?peer_id)),
225 );
226
227 task_group.spawn_cancellable(
228 format!("io-state-machine-{peer_id}"),
229 async move {
230 info!(target: LOG_NET_PEER, "Starting peer connection state machine");
231
232 let mut state_machine = P2PConnectionStateMachine {
233 common: P2PConnectionSMCommon {
234 incoming_sender,
235 outgoing_receiver,
236 our_id_str: our_id.to_string(),
237 our_id,
238 peer_id_str: peer_id.to_string(),
239 peer_id,
240 connector,
241 incoming_connections,
242 status_sender,
243 },
244 state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
245 };
246
247 while let Some(sm) = state_machine.state_transition().await {
248 state_machine = sm;
249 }
250
251 info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
252 }
253 .instrument(info_span!("io-state-machine", ?peer_id)),
254 );
255
256 P2PConnection {
257 outgoing: outgoing_sender,
258 incoming: incoming_receiver,
259 }
260 }
261
262 async fn send(&self, message: M) {
263 self.outgoing.send(message).await.ok();
264 }
265
266 fn try_send(&self, message: M) {
267 self.outgoing.try_send(message).ok();
268 }
269
270 async fn receive(&self) -> Option<M> {
271 self.incoming.recv().await.ok()
272 }
273}
274
275struct P2PConnectionStateMachine<M> {
276 state: P2PConnectionSMState<M>,
277 common: P2PConnectionSMCommon<M>,
278}
279
280struct P2PConnectionSMCommon<M> {
281 incoming_sender: async_channel::Sender<M>,
282 outgoing_receiver: async_channel::Receiver<M>,
283 our_id: PeerId,
284 our_id_str: String,
285 peer_id: PeerId,
286 peer_id_str: String,
287 connector: DynP2PConnector<M>,
288 incoming_connections: Receiver<DynP2PConnection<M>>,
289 status_sender: watch::Sender<Option<Duration>>,
290}
291
292enum P2PConnectionSMState<M> {
293 Disconnected(FibonacciBackoff),
294 Connected(DynP2PConnection<M>),
295}
296
297impl<M: Send + 'static> P2PConnectionStateMachine<M> {
298 async fn state_transition(mut self) -> Option<Self> {
299 match self.state {
300 P2PConnectionSMState::Disconnected(backoff) => {
301 self.common.status_sender.send_replace(None);
302
303 self.common.transition_disconnected(backoff).await
304 }
305 P2PConnectionSMState::Connected(connection) => {
306 self.common
307 .status_sender
308 .send_replace(Some(connection.rtt()));
309
310 self.common.transition_connected(connection).await
311 }
312 }
313 .map(|state| P2PConnectionStateMachine {
314 common: self.common,
315 state,
316 })
317 }
318}
319
320impl<M: Send + 'static> P2PConnectionSMCommon<M> {
321 async fn transition_connected(
322 &mut self,
323 mut connection: DynP2PConnection<M>,
324 ) -> Option<P2PConnectionSMState<M>> {
325 tokio::select! {
326 message = self.outgoing_receiver.recv() => {
327 Some(self.send_message(connection, message.ok()?).await)
328 },
329 connection = self.incoming_connections.recv() => {
330 info!(target: LOG_NET_PEER, "Connected to peer");
331
332 Some(P2PConnectionSMState::Connected(connection.ok()?))
333 },
334 message = connection.receive() => {
335 let mut message = match message {
336 Ok(message) => message,
337 Err(e) => return Some(self.disconnect(e)),
338 };
339
340 match message.read_to_end().await {
341 Ok(message) => {
342 PEER_MESSAGES_COUNT
343 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
344 .inc();
345
346 self.incoming_sender.send(message).await.ok()?;
347 },
348 Err(e) => return Some(self.disconnect(e)),
349 }
350
351 Some(P2PConnectionSMState::Connected(connection))
352 },
353 }
354 }
355
356 fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
357 info!(target: LOG_NET_PEER, "Disconnected from peer: {}", error);
358
359 PEER_DISCONNECT_COUNT
360 .with_label_values(&[&self.our_id_str, &self.peer_id_str])
361 .inc();
362
363 P2PConnectionSMState::Disconnected(api_networking_backoff())
364 }
365
366 async fn send_message(
367 &mut self,
368 mut connection: DynP2PConnection<M>,
369 peer_message: M,
370 ) -> P2PConnectionSMState<M> {
371 PEER_MESSAGES_COUNT
372 .with_label_values(&[
373 self.our_id_str.as_str(),
374 self.peer_id_str.as_str(),
375 "outgoing",
376 ])
377 .inc();
378
379 if let Err(e) = connection.send(peer_message).await {
380 return self.disconnect(e);
381 }
382
383 P2PConnectionSMState::Connected(connection)
384 }
385
386 async fn transition_disconnected(
387 &mut self,
388 mut backoff: FibonacciBackoff,
389 ) -> Option<P2PConnectionSMState<M>> {
390 tokio::select! {
391 connection = self.incoming_connections.recv() => {
392 PEER_CONNECT_COUNT
393 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
394 .inc();
395
396 info!(target: LOG_NET_PEER, "Connected to peer");
397
398 Some(P2PConnectionSMState::Connected(connection.ok()?))
399 },
400 () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
401 info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
404
405 match self.connector.connect(self.peer_id).await {
406 Ok(connection) => {
407 PEER_CONNECT_COUNT
408 .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
409 .inc();
410
411 info!(target: LOG_NET_PEER, "Connected to peer");
412
413 return Some(P2PConnectionSMState::Connected(connection));
414 }
415 Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
416 }
417
418 Some(P2PConnectionSMState::Disconnected(backoff))
419 },
420 }
421 }
422}