fedimint_server/net/p2p.rs

//! Implements a connection manager for communication with other federation
//! members.
//!
//! The main interface is [`fedimint_core::net::peers::IP2PConnections`] and
//! its main implementation is [`ReconnectP2PConnections`]; see these for
//! details.
7
8use std::collections::BTreeMap;
9use std::time::Duration;
10
11use async_channel::{Receiver, Sender, bounded};
12use async_trait::async_trait;
13use fedimint_core::PeerId;
14use fedimint_core::net::peers::{IP2PConnections, Recipient};
15use fedimint_core::task::{TaskGroup, sleep};
16use fedimint_core::util::FmtCompactAnyhow;
17use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff};
18use fedimint_logging::{LOG_CONSENSUS, LOG_NET_PEER};
19use fedimint_server_core::dashboard_ui::ConnectionType;
20use futures::FutureExt;
21use futures::future::select_all;
22use tokio::sync::watch;
23use tracing::{Instrument, info, info_span, warn};
24
25use crate::metrics::{PEER_CONNECT_COUNT, PEER_DISCONNECT_COUNT, PEER_MESSAGES_COUNT};
26use crate::net::p2p_connection::DynP2PConnection;
27use crate::net::p2p_connector::DynP2PConnector;
28
29pub type P2PStatusSenders = BTreeMap<PeerId, watch::Sender<Option<Duration>>>;
30pub type P2PStatusReceivers = BTreeMap<PeerId, watch::Receiver<Option<Duration>>>;
31
32pub type P2PConnectionTypeSenders = BTreeMap<PeerId, watch::Sender<ConnectionType>>;
33pub type P2PConnectionTypeReceivers = BTreeMap<PeerId, watch::Receiver<ConnectionType>>;
34
35pub fn p2p_status_channels(peers: Vec<PeerId>) -> (P2PStatusSenders, P2PStatusReceivers) {
36    let mut senders = BTreeMap::new();
37    let mut receivers = BTreeMap::new();
38
39    for peer in peers {
40        let (sender, receiver) = watch::channel(None);
41
42        senders.insert(peer, sender);
43        receivers.insert(peer, receiver);
44    }
45
46    (senders, receivers)
47}
48
49pub fn p2p_connection_type_channels(
50    peers: Vec<PeerId>,
51) -> (P2PConnectionTypeSenders, P2PConnectionTypeReceivers) {
52    let mut senders = BTreeMap::new();
53    let mut receivers = BTreeMap::new();
54
55    for peer in peers {
56        let (sender, receiver) = watch::channel(ConnectionType::Direct);
57
58        senders.insert(peer, sender);
59        receivers.insert(peer, receiver);
60    }
61
62    (senders, receivers)
63}
64
65#[derive(Clone)]
66pub struct ReconnectP2PConnections<M> {
67    connections: BTreeMap<PeerId, P2PConnection<M>>,
68}
69
70impl<M: Send + 'static> ReconnectP2PConnections<M> {
71    pub fn new(
72        identity: PeerId,
73        connector: DynP2PConnector<M>,
74        task_group: &TaskGroup,
75        status_senders: P2PStatusSenders,
76        connection_type_senders: P2PConnectionTypeSenders,
77    ) -> Self {
78        let mut connection_senders = BTreeMap::new();
79        let mut connections = BTreeMap::new();
80
81        for peer_id in connector.peers() {
82            assert_ne!(peer_id, identity);
83
84            let (connection_sender, connection_receiver) = bounded(4);
85
86            let connection = P2PConnection::new(
87                identity,
88                peer_id,
89                connector.clone(),
90                connection_receiver,
91                status_senders
92                    .get(&peer_id)
93                    .expect("No p2p status sender for peer")
94                    .clone(),
95                connection_type_senders
96                    .get(&peer_id)
97                    .expect("No p2p connection type sender for peer")
98                    .clone(),
99                task_group,
100            );
101
102            connection_senders.insert(peer_id, connection_sender);
103            connections.insert(peer_id, connection);
104        }
105
106        task_group.spawn_cancellable("handle-incoming-p2p-connections", async move {
107            info!(target: LOG_NET_PEER, "Starting listening task for p2p connections");
108
109            loop {
110                match connector.accept().await {
111                    Ok((peer, connection)) => {
112                        if connection_senders
113                            .get_mut(&peer)
114                            .expect("Authenticating connectors dont return unknown peers")
115                            .send(connection)
116                            .await
117                            .is_err()
118                        {
119                            break;
120                        }
121                    },
122                    Err(err) => {
123                        warn!(target: LOG_NET_PEER, our_id = %identity, err = %err.fmt_compact_anyhow(), "Error while opening incoming connection");
124                    }
125                }
126            }
127
128            info!(target: LOG_NET_PEER, "Shutting down listening task for p2p connections");
129        });
130
131        ReconnectP2PConnections { connections }
132    }
133}
134
135#[async_trait]
136impl<M: Clone + Send + 'static> IP2PConnections<M> for ReconnectP2PConnections<M> {
137    async fn send(&self, recipient: Recipient, message: M) {
138        match recipient {
139            Recipient::Everyone => {
140                for connection in self.connections.values() {
141                    connection.send(message.clone()).await;
142                }
143            }
144            Recipient::Peer(peer) => match self.connections.get(&peer) {
145                Some(connection) => {
146                    connection.send(message).await;
147                }
148                _ => {
149                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
150                }
151            },
152        }
153    }
154
155    fn try_send(&self, recipient: Recipient, message: M) {
156        match recipient {
157            Recipient::Everyone => {
158                for connection in self.connections.values() {
159                    connection.try_send(message.clone());
160                }
161            }
162            Recipient::Peer(peer) => match self.connections.get(&peer) {
163                Some(connection) => {
164                    connection.try_send(message);
165                }
166                _ => {
167                    warn!(target: LOG_NET_PEER, "No connection for peer {peer}");
168                }
169            },
170        }
171    }
172
173    async fn receive(&self) -> Option<(PeerId, M)> {
174        select_all(self.connections.iter().map(|(&peer, connection)| {
175            Box::pin(connection.receive().map(move |m| m.map(|m| (peer, m))))
176        }))
177        .await
178        .0
179    }
180
181    async fn receive_from_peer(&self, peer: PeerId) -> Option<M> {
182        self.connections
183            .get(&peer)
184            .expect("No connection found for peer")
185            .receive()
186            .await
187    }
188}
189
190#[derive(Clone)]
191struct P2PConnection<M> {
192    outgoing: Sender<M>,
193    incoming: Receiver<M>,
194}
195
196impl<M: Send + 'static> P2PConnection<M> {
197    #[allow(clippy::too_many_arguments)]
198    fn new(
199        our_id: PeerId,
200        peer_id: PeerId,
201        connector: DynP2PConnector<M>,
202        incoming_connections: Receiver<DynP2PConnection<M>>,
203        status_sender: watch::Sender<Option<Duration>>,
204        connection_type_sender: watch::Sender<ConnectionType>,
205        task_group: &TaskGroup,
206    ) -> P2PConnection<M> {
207        let (outgoing_sender, outgoing_receiver) = bounded(1024);
208        let (incoming_sender, incoming_receiver) = bounded(1024);
209
210        let connector_clone = connector.clone();
211        let connection_type_sender_clone = connection_type_sender.clone();
212
213        // Spawn periodic connection type polling task
214        task_group.spawn_cancellable(
215            format!("connection-type-poller-{peer_id}"),
216            async move {
217                let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
218                loop {
219                    interval.tick().await;
220                    let connection_type = connector_clone.connection_type(peer_id).await;
221                    let _ = connection_type_sender_clone.send_replace(connection_type);
222                }
223            }
224            .instrument(info_span!("connection-type-poller", ?peer_id)),
225        );
226
227        task_group.spawn_cancellable(
228            format!("io-state-machine-{peer_id}"),
229            async move {
230                info!(target: LOG_NET_PEER, "Starting peer connection state machine");
231
232                let mut state_machine = P2PConnectionStateMachine {
233                    common: P2PConnectionSMCommon {
234                        incoming_sender,
235                        outgoing_receiver,
236                        our_id_str: our_id.to_string(),
237                        our_id,
238                        peer_id_str: peer_id.to_string(),
239                        peer_id,
240                        connector,
241                        incoming_connections,
242                        status_sender,
243                    },
244                    state: P2PConnectionSMState::Disconnected(api_networking_backoff()),
245                };
246
247                while let Some(sm) = state_machine.state_transition().await {
248                    state_machine = sm;
249                }
250
251                info!(target: LOG_NET_PEER, "Shutting down peer connection state machine");
252            }
253            .instrument(info_span!("io-state-machine", ?peer_id)),
254        );
255
256        P2PConnection {
257            outgoing: outgoing_sender,
258            incoming: incoming_receiver,
259        }
260    }
261
262    async fn send(&self, message: M) {
263        self.outgoing.send(message).await.ok();
264    }
265
266    fn try_send(&self, message: M) {
267        self.outgoing.try_send(message).ok();
268    }
269
270    async fn receive(&self) -> Option<M> {
271        self.incoming.recv().await.ok()
272    }
273}
274
275struct P2PConnectionStateMachine<M> {
276    state: P2PConnectionSMState<M>,
277    common: P2PConnectionSMCommon<M>,
278}
279
280struct P2PConnectionSMCommon<M> {
281    incoming_sender: async_channel::Sender<M>,
282    outgoing_receiver: async_channel::Receiver<M>,
283    our_id: PeerId,
284    our_id_str: String,
285    peer_id: PeerId,
286    peer_id_str: String,
287    connector: DynP2PConnector<M>,
288    incoming_connections: Receiver<DynP2PConnection<M>>,
289    status_sender: watch::Sender<Option<Duration>>,
290}
291
292enum P2PConnectionSMState<M> {
293    Disconnected(FibonacciBackoff),
294    Connected(DynP2PConnection<M>),
295}
296
297impl<M: Send + 'static> P2PConnectionStateMachine<M> {
298    async fn state_transition(mut self) -> Option<Self> {
299        match self.state {
300            P2PConnectionSMState::Disconnected(backoff) => {
301                self.common.status_sender.send_replace(None);
302
303                self.common.transition_disconnected(backoff).await
304            }
305            P2PConnectionSMState::Connected(connection) => {
306                self.common
307                    .status_sender
308                    .send_replace(Some(connection.rtt()));
309
310                self.common.transition_connected(connection).await
311            }
312        }
313        .map(|state| P2PConnectionStateMachine {
314            common: self.common,
315            state,
316        })
317    }
318}
319
320impl<M: Send + 'static> P2PConnectionSMCommon<M> {
321    async fn transition_connected(
322        &mut self,
323        mut connection: DynP2PConnection<M>,
324    ) -> Option<P2PConnectionSMState<M>> {
325        tokio::select! {
326            message = self.outgoing_receiver.recv() => {
327                Some(self.send_message(connection, message.ok()?).await)
328            },
329            connection = self.incoming_connections.recv() => {
330                info!(target: LOG_NET_PEER, "Connected to peer");
331
332                Some(P2PConnectionSMState::Connected(connection.ok()?))
333            },
334            message = connection.receive() => {
335                let mut message = match message {
336                    Ok(message) => message,
337                    Err(e) => return Some(self.disconnect(e)),
338                };
339
340                match message.read_to_end().await {
341                    Ok(message) => {
342                        PEER_MESSAGES_COUNT
343                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
344                            .inc();
345
346                         self.incoming_sender.send(message).await.ok()?;
347                    },
348                    Err(e) => return Some(self.disconnect(e)),
349                }
350
351                Some(P2PConnectionSMState::Connected(connection))
352            },
353        }
354    }
355
356    fn disconnect(&self, error: anyhow::Error) -> P2PConnectionSMState<M> {
357        info!(target: LOG_NET_PEER, "Disconnected from peer: {}",  error);
358
359        PEER_DISCONNECT_COUNT
360            .with_label_values(&[&self.our_id_str, &self.peer_id_str])
361            .inc();
362
363        P2PConnectionSMState::Disconnected(api_networking_backoff())
364    }
365
366    async fn send_message(
367        &mut self,
368        mut connection: DynP2PConnection<M>,
369        peer_message: M,
370    ) -> P2PConnectionSMState<M> {
371        PEER_MESSAGES_COUNT
372            .with_label_values(&[
373                self.our_id_str.as_str(),
374                self.peer_id_str.as_str(),
375                "outgoing",
376            ])
377            .inc();
378
379        if let Err(e) = connection.send(peer_message).await {
380            return self.disconnect(e);
381        }
382
383        P2PConnectionSMState::Connected(connection)
384    }
385
386    async fn transition_disconnected(
387        &mut self,
388        mut backoff: FibonacciBackoff,
389    ) -> Option<P2PConnectionSMState<M>> {
390        tokio::select! {
391            connection = self.incoming_connections.recv() => {
392                PEER_CONNECT_COUNT
393                    .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "incoming"])
394                    .inc();
395
396                info!(target: LOG_NET_PEER, "Connected to peer");
397
398                Some(P2PConnectionSMState::Connected(connection.ok()?))
399            },
400            () = sleep(backoff.next().expect("Unlimited retries")), if self.our_id < self.peer_id => {
401                // to prevent "reconnection ping-pongs", only the side with lower PeerId is responsible for reconnecting
402
403                info!(target: LOG_NET_PEER, "Attempting to reconnect to peer");
404
405                match  self.connector.connect(self.peer_id).await {
406                    Ok(connection) => {
407                        PEER_CONNECT_COUNT
408                            .with_label_values(&[self.our_id_str.as_str(), self.peer_id_str.as_str(), "outgoing"])
409                            .inc();
410
411                        info!(target: LOG_NET_PEER, "Connected to peer");
412
413                        return Some(P2PConnectionSMState::Connected(connection));
414                    }
415                    Err(e) => warn!(target: LOG_CONSENSUS, "Failed to connect to peer: {e}")
416                }
417
418                Some(P2PConnectionSMState::Disconnected(backoff))
419            },
420        }
421    }
422}