informalsystems_malachitebft_network/
lib.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::ops::ControlFlow;
4use std::time::Duration;
5
6use futures::StreamExt;
7use libp2p::metrics::{Metrics, Recorder};
8use libp2p::request_response::{InboundRequestId, OutboundRequestId};
9use libp2p::swarm::{self, SwarmEvent};
10use libp2p::{gossipsub, identify, quic, SwarmBuilder};
11use libp2p_broadcast as broadcast;
12use tokio::sync::{mpsc, oneshot};
13use tracing::{debug, error, error_span, info, trace, warn, Instrument};
14
15use malachitebft_discovery::{self as discovery};
16use malachitebft_metrics::SharedRegistry;
17use malachitebft_sync::{self as sync};
18
19pub use malachitebft_peer::PeerId;
20
21pub use bytes::Bytes;
22pub use libp2p::gossipsub::MessageId;
23pub use libp2p::identity::Keypair;
24pub use libp2p::Multiaddr;
25
26pub mod behaviour;
27pub mod handle;
28pub mod pubsub;
29
30mod channel;
31pub use channel::Channel;
32
33use behaviour::{Behaviour, NetworkEvent};
34use handle::Handle;
35
/// Protocol version string a peer must report through Identify to be treated as compatible.
const PROTOCOL: &str = "/malachitebft-core-consensus/v1beta1";
/// Registry prefix under which the swarm (transport, behaviour) and libp2p metrics are registered.
const METRICS_PREFIX: &str = "malachitebft_network";
/// Registry prefix under which the discovery metrics are registered.
const DISCOVERY_METRICS_PREFIX: &str = "malachitebft_discovery";
39
/// Pub/sub protocol used for the consensus channels.
///
/// The Sync channel and `CtrlMsg::Broadcast` always use [`PubSubProtocol::Broadcast`],
/// independently of this setting.
#[derive(Copy, Clone, Debug, Default)]
pub enum PubSubProtocol {
    /// GossipSub: a pubsub protocol based on epidemic broadcast trees
    #[default]
    GossipSub,

    /// Broadcast: a simple broadcast protocol
    Broadcast,
}
49
50impl PubSubProtocol {
51    pub fn is_gossipsub(&self) -> bool {
52        matches!(self, Self::GossipSub)
53    }
54
55    pub fn is_broadcast(&self) -> bool {
56        matches!(self, Self::Broadcast)
57    }
58}
59
/// GossipSub mesh parameters.
///
/// NOTE(review): presumably forwarded to the libp2p GossipSub settings of the same names
/// (target mesh size, high/low watermarks, minimum outbound peers) when the `Behaviour`
/// is built — confirm in `behaviour`.
#[derive(Copy, Clone, Debug)]
pub struct GossipSubConfig {
    pub mesh_n: usize,
    pub mesh_n_high: usize,
    pub mesh_n_low: usize,
    pub mesh_outbound_min: usize,
}
67
68impl Default for GossipSubConfig {
69    fn default() -> Self {
70        // Tests use these defaults.
71        Self {
72            mesh_n: 6,
73            mesh_n_high: 12,
74            mesh_n_low: 4,
75            mesh_outbound_min: 2,
76        }
77    }
78}
79
/// Boxed, thread-safe, `'static` error type.
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Alias for the discovery crate's configuration.
pub type DiscoveryConfig = discovery::Config;
/// Alias for the discovery crate's bootstrap protocol setting.
pub type BootstrapProtocol = discovery::config::BootstrapProtocol;
/// Alias for the discovery crate's peer selector setting.
pub type Selector = discovery::config::Selector;
85
/// Configuration of the network service.
#[derive(Clone, Debug)]
pub struct Config {
    /// Address the swarm starts listening on when the network task starts.
    pub listen_addr: Multiaddr,
    /// Peers handed to the discovery component at startup.
    pub persistent_peers: Vec<Multiaddr>,
    /// Configuration of the discovery component.
    pub discovery: DiscoveryConfig,
    /// Idle connection timeout applied to the libp2p swarm.
    pub idle_connection_timeout: Duration,
    /// Transport (TCP or QUIC) used to build the swarm.
    pub transport: TransportProtocol,
    /// GossipSub mesh parameters.
    // NOTE(review): not read in this file; presumably consumed by `Behaviour::new_with_metrics`.
    pub gossipsub: GossipSubConfig,
    /// Pub/sub protocol used to subscribe to and publish on the consensus channels.
    pub pubsub_protocol: PubSubProtocol,
    // NOTE(review): `rpc_max_size` / `pubsub_max_size` are not read in this file; presumably
    // message-size limits (bytes?) applied when building the `Behaviour` — confirm.
    pub rpc_max_size: usize,
    pub pubsub_max_size: usize,
    /// Enables the Sync protocol: when `true` the node subscribes to the Sync channel;
    /// when `false`, broadcasts on the Sync channel are dropped.
    pub enable_sync: bool,
}
99
100impl Config {
101    fn apply_to_swarm(&self, cfg: swarm::Config) -> swarm::Config {
102        cfg.with_idle_connection_timeout(self.idle_connection_timeout)
103    }
104
105    fn apply_to_quic(&self, mut cfg: quic::Config) -> quic::Config {
106        // NOTE: This is set low due to quic transport not properly resetting
107        // connection state when reconnecting before connection timeout.
108        // See https://github.com/libp2p/rust-libp2p/issues/5097
109        cfg.max_idle_timeout = 300;
110        cfg.keep_alive_interval = Duration::from_millis(100);
111        cfg
112    }
113}
114
/// Transport used to build the libp2p swarm.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TransportProtocol {
    /// TCP with Nagle's algorithm disabled, secured with Noise and multiplexed with Yamux.
    Tcp,
    /// QUIC.
    Quic,
}
120
121impl TransportProtocol {
122    pub fn from_multiaddr(multiaddr: &Multiaddr) -> Option<TransportProtocol> {
123        for protocol in multiaddr.protocol_stack() {
124            match protocol {
125                "tcp" => return Some(TransportProtocol::Tcp),
126                "quic" | "quic-v1" => return Some(TransportProtocol::Quic),
127                _ => {}
128            }
129        }
130        None
131    }
132}
133
/// An event that can be emitted by the gossip layer.
///
/// # Sync round-trip
///
/// 1. peer1 (sync) sends `CtrlMsg::SyncRequest(peer_id, body, reply_to)` to its network,
///    which hands the outbound request ID back on `reply_to`.
/// 2. peer2 (network) emits `Event::Sync(RawMessage::Request { request_id, peer, body })`.
/// 3. peer2 (sync) answers with `CtrlMsg::SyncReply(request_id, body)`.
/// 4. peer1 (network) emits `Event::Sync(RawMessage::Response { request_id, peer, body })`.
#[derive(Clone, Debug)]
pub enum Event {
    /// The swarm started listening on this address.
    Listening(Multiaddr),
    /// A peer passed the Identify protocol-version check and was not already connected.
    PeerConnected(PeerId),
    /// The last connection to this peer was closed.
    PeerDisconnected(PeerId),
    /// A pub/sub message received on a channel other than `Channel::Liveness`.
    ConsensusMessage(Channel, PeerId, Bytes),
    /// A pub/sub message received on `Channel::Liveness`.
    LivenessMessage(Channel, PeerId, Bytes),
    /// A raw Sync request or response.
    Sync(sync::RawMessage),
}
152
/// A control message consumed by the network task from its control channel
/// (whose sender is handed to the [`Handle`]).
#[derive(Debug)]
pub enum CtrlMsg {
    /// Publish the payload on the channel using the configured `pubsub_protocol`.
    Publish(Channel, Bytes),
    /// Publish the payload on the channel using the broadcast protocol.
    /// Dropped for the Sync channel when Sync is disabled.
    Broadcast(Channel, Bytes),
    /// Send a Sync request to the peer; the outbound request ID is sent back on the oneshot.
    SyncRequest(PeerId, Bytes, oneshot::Sender<OutboundRequestId>),
    /// Reply to the inbound Sync request with this ID.
    SyncReply(InboundRequestId, Bytes),
    /// Stop the network task's event loop.
    Shutdown,
}
161
/// Mutable state owned by the network task.
#[derive(Debug)]
pub struct State {
    /// Response channels of inbound Sync requests, keyed by request ID; an entry is removed
    /// when the matching `CtrlMsg::SyncReply` is processed.
    pub sync_channels: HashMap<InboundRequestId, sync::ResponseChannel>,
    /// Peer discovery state; its controller also drives dialing, peer requests,
    /// connect requests and connection closing in the event loop.
    pub discovery: discovery::Discovery<Behaviour>,
}
167
168impl State {
169    fn new(discovery: discovery::Discovery<Behaviour>) -> Self {
170        Self {
171            sync_channels: Default::default(),
172            discovery,
173        }
174    }
175}
176
177pub async fn spawn(
178    keypair: Keypair,
179    config: Config,
180    registry: SharedRegistry,
181) -> Result<Handle, eyre::Report> {
182    let swarm = registry.with_prefix(METRICS_PREFIX, |registry| -> Result<_, eyre::Report> {
183        let builder = SwarmBuilder::with_existing_identity(keypair).with_tokio();
184        match config.transport {
185            TransportProtocol::Tcp => Ok(builder
186                .with_tcp(
187                    libp2p::tcp::Config::new().nodelay(true), // Disable Nagle's algorithm
188                    libp2p::noise::Config::new,
189                    libp2p::yamux::Config::default,
190                )?
191                .with_dns()?
192                .with_bandwidth_metrics(registry)
193                .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))?
194                .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
195                .build()),
196            TransportProtocol::Quic => Ok(builder
197                .with_quic_config(|cfg| config.apply_to_quic(cfg))
198                .with_dns()?
199                .with_bandwidth_metrics(registry)
200                .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))?
201                .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
202                .build()),
203        }
204    })?;
205
206    let metrics = registry.with_prefix(METRICS_PREFIX, Metrics::new);
207
208    let (tx_event, rx_event) = mpsc::channel(32);
209    let (tx_ctrl, rx_ctrl) = mpsc::channel(32);
210
211    let discovery = registry.with_prefix(DISCOVERY_METRICS_PREFIX, |reg| {
212        discovery::Discovery::new(config.discovery, config.persistent_peers.clone(), reg)
213    });
214
215    let state = State::new(discovery);
216
217    let peer_id = PeerId::from_libp2p(swarm.local_peer_id());
218    let span = error_span!("network");
219
220    info!(parent: span.clone(), %peer_id, "Starting network service");
221
222    let task_handle =
223        tokio::task::spawn(run(config, metrics, state, swarm, rx_ctrl, tx_event).instrument(span));
224
225    Ok(Handle::new(peer_id, tx_ctrl, rx_event, task_handle))
226}
227
228async fn run(
229    config: Config,
230    metrics: Metrics,
231    mut state: State,
232    mut swarm: swarm::Swarm<Behaviour>,
233    mut rx_ctrl: mpsc::Receiver<CtrlMsg>,
234    tx_event: mpsc::Sender<Event>,
235) {
236    if let Err(e) = swarm.listen_on(config.listen_addr.clone()) {
237        error!("Error listening on {}: {e}", config.listen_addr);
238        return;
239    }
240
241    state.discovery.dial_bootstrap_nodes(&swarm);
242
243    if let Err(e) = pubsub::subscribe(&mut swarm, config.pubsub_protocol, Channel::consensus()) {
244        error!("Error subscribing to consensus channels: {e}");
245        return;
246    };
247
248    if config.enable_sync {
249        if let Err(e) = pubsub::subscribe(&mut swarm, PubSubProtocol::Broadcast, &[Channel::Sync]) {
250            error!("Error subscribing to Sync channel: {e}");
251            return;
252        };
253    }
254
255    loop {
256        let result = tokio::select! {
257            event = swarm.select_next_some() => {
258                handle_swarm_event(event, &config, &metrics, &mut swarm, &mut state, &tx_event).await
259            }
260
261            Some(connection_data) = state.discovery.controller.dial.recv(), if state.discovery.can_dial() => {
262                state.discovery.dial_peer(&mut swarm, connection_data);
263                ControlFlow::Continue(())
264            }
265
266            Some(request_data) = state.discovery.controller.peers_request.recv(), if state.discovery.can_peers_request() => {
267                state.discovery.peers_request_peer(&mut swarm, request_data);
268                ControlFlow::Continue(())
269            }
270
271            Some(request_data) = state.discovery.controller.connect_request.recv(), if state.discovery.can_connect_request() => {
272                state.discovery.connect_request_peer(&mut swarm, request_data);
273                ControlFlow::Continue(())
274            }
275
276            Some((peer_id, connection_id)) = state.discovery.controller.close.recv(), if state.discovery.can_close() => {
277                state.discovery.close_connection(&mut swarm, peer_id, connection_id);
278                ControlFlow::Continue(())
279            }
280
281            Some(ctrl) = rx_ctrl.recv() => {
282                handle_ctrl_msg(&mut swarm, &mut state, &config, ctrl).await
283            }
284        };
285
286        match result {
287            ControlFlow::Continue(()) => continue,
288            ControlFlow::Break(()) => break,
289        }
290    }
291}
292
293async fn handle_ctrl_msg(
294    swarm: &mut swarm::Swarm<Behaviour>,
295    state: &mut State,
296    config: &Config,
297    msg: CtrlMsg,
298) -> ControlFlow<()> {
299    match msg {
300        CtrlMsg::Publish(channel, data) => {
301            let msg_size = data.len();
302            let result = pubsub::publish(swarm, config.pubsub_protocol, channel, data);
303
304            match result {
305                Ok(()) => debug!(%channel, size = %msg_size, "Published message"),
306                Err(e) => error!(%channel, "Error publishing message: {e}"),
307            }
308
309            ControlFlow::Continue(())
310        }
311
312        CtrlMsg::Broadcast(channel, data) => {
313            if channel == Channel::Sync && !config.enable_sync {
314                trace!("Ignoring broadcast message to Sync channel: Sync not enabled");
315                return ControlFlow::Continue(());
316            }
317
318            let msg_size = data.len();
319            let result = pubsub::publish(swarm, PubSubProtocol::Broadcast, channel, data);
320
321            match result {
322                Ok(()) => debug!(%channel, size = %msg_size, "Broadcasted message"),
323                Err(e) => error!(%channel, "Error broadcasting message: {e}"),
324            }
325
326            ControlFlow::Continue(())
327        }
328
329        CtrlMsg::SyncRequest(peer_id, request, reply_to) => {
330            let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
331                error!("Cannot request Sync from peer: Sync not enabled");
332                return ControlFlow::Continue(());
333            };
334
335            let request_id = sync.send_request(peer_id.to_libp2p(), request);
336
337            if let Err(e) = reply_to.send(request_id) {
338                error!(%peer_id, "Error sending Sync request: {e}");
339            }
340
341            ControlFlow::Continue(())
342        }
343
344        CtrlMsg::SyncReply(request_id, data) => {
345            let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
346                error!("Cannot send Sync response to peer: Sync not enabled");
347                return ControlFlow::Continue(());
348            };
349
350            let Some(channel) = state.sync_channels.remove(&request_id) else {
351                error!(%request_id, "Received Sync reply for unknown request ID");
352                return ControlFlow::Continue(());
353            };
354
355            let result = sync.send_response(channel, data);
356
357            match result {
358                Ok(()) => debug!(%request_id, "Replied to Sync request"),
359                Err(e) => error!(%request_id, "Error replying to Sync request: {e}"),
360            }
361
362            ControlFlow::Continue(())
363        }
364
365        CtrlMsg::Shutdown => ControlFlow::Break(()),
366    }
367}
368
369async fn handle_swarm_event(
370    event: SwarmEvent<NetworkEvent>,
371    _config: &Config,
372    metrics: &Metrics,
373    swarm: &mut swarm::Swarm<Behaviour>,
374    state: &mut State,
375    tx_event: &mpsc::Sender<Event>,
376) -> ControlFlow<()> {
377    if let SwarmEvent::Behaviour(NetworkEvent::GossipSub(e)) = &event {
378        metrics.record(e);
379    } else if let SwarmEvent::Behaviour(NetworkEvent::Identify(e)) = &event {
380        metrics.record(e.as_ref());
381    }
382
383    match event {
384        SwarmEvent::NewListenAddr { address, .. } => {
385            debug!(%address, "Node is listening");
386
387            if let Err(e) = tx_event.send(Event::Listening(address)).await {
388                error!("Error sending listening event to handle: {e}");
389                return ControlFlow::Break(());
390            }
391        }
392
393        SwarmEvent::ConnectionEstablished {
394            peer_id,
395            connection_id,
396            endpoint,
397            ..
398        } => {
399            trace!("Connected to {peer_id} with connection id {connection_id}",);
400
401            state
402                .discovery
403                .handle_connection(swarm, peer_id, connection_id, endpoint);
404        }
405
406        SwarmEvent::OutgoingConnectionError {
407            connection_id,
408            error,
409            ..
410        } => {
411            error!("Error dialing peer: {error}");
412
413            state
414                .discovery
415                .handle_failed_connection(swarm, connection_id, error);
416        }
417
418        SwarmEvent::ConnectionClosed {
419            peer_id,
420            connection_id,
421            num_established,
422            cause,
423            ..
424        } => {
425            if let Some(cause) = cause {
426                warn!("Connection closed with {peer_id}, reason: {cause}");
427            } else {
428                warn!("Connection closed with {peer_id}, reason: unknown");
429            }
430
431            state
432                .discovery
433                .handle_closed_connection(swarm, peer_id, connection_id);
434
435            if num_established == 0 {
436                if let Err(e) = tx_event
437                    .send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
438                    .await
439                {
440                    error!("Error sending peer disconnected event to handle: {e}");
441                    return ControlFlow::Break(());
442                }
443            }
444        }
445
446        SwarmEvent::Behaviour(NetworkEvent::Identify(event)) => match *event {
447            identify::Event::Sent { peer_id, .. } => {
448                trace!("Sent identity to {peer_id}");
449            }
450
451            identify::Event::Received {
452                connection_id,
453                peer_id,
454                info,
455            } => {
456                trace!(
457                    "Received identity from {peer_id}: protocol={:?}",
458                    info.protocol_version
459                );
460
461                if info.protocol_version == PROTOCOL {
462                    trace!(
463                        "Peer {peer_id} is using compatible protocol version: {:?}",
464                        info.protocol_version
465                    );
466
467                    let is_already_connected =
468                        state
469                            .discovery
470                            .handle_new_peer(swarm, connection_id, peer_id, info);
471
472                    if !is_already_connected {
473                        if let Err(e) = tx_event
474                            .send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
475                            .await
476                        {
477                            error!("Error sending peer connected event to handle: {e}");
478                            return ControlFlow::Break(());
479                        }
480                    }
481                } else {
482                    trace!(
483                        "Peer {peer_id} is using incompatible protocol version: {:?}",
484                        info.protocol_version
485                    );
486                }
487            }
488
489            // Ignore other identify events
490            _ => (),
491        },
492
493        SwarmEvent::Behaviour(NetworkEvent::Ping(event)) => {
494            match &event.result {
495                Ok(rtt) => {
496                    trace!("Received pong from {} in {rtt:?}", event.peer);
497                }
498                Err(e) => {
499                    trace!("Received pong from {} with error: {e}", event.peer);
500                }
501            }
502
503            // Record metric for round-trip time sending a ping and receiving a pong
504            metrics.record(&event);
505        }
506
507        SwarmEvent::Behaviour(NetworkEvent::GossipSub(event)) => {
508            return handle_gossipsub_event(event, metrics, swarm, state, tx_event).await;
509        }
510
511        SwarmEvent::Behaviour(NetworkEvent::Broadcast(event)) => {
512            return handle_broadcast_event(event, metrics, swarm, state, tx_event).await;
513        }
514
515        SwarmEvent::Behaviour(NetworkEvent::Sync(event)) => {
516            return handle_sync_event(event, metrics, swarm, state, tx_event).await;
517        }
518
519        SwarmEvent::Behaviour(NetworkEvent::Discovery(network_event)) => {
520            state.discovery.on_network_event(swarm, *network_event);
521        }
522
523        swarm_event => {
524            metrics.record(&swarm_event);
525        }
526    }
527
528    ControlFlow::Continue(())
529}
530
531async fn handle_gossipsub_event(
532    event: gossipsub::Event,
533    _metrics: &Metrics,
534    _swarm: &mut swarm::Swarm<Behaviour>,
535    _state: &mut State,
536    tx_event: &mpsc::Sender<Event>,
537) -> ControlFlow<()> {
538    match event {
539        gossipsub::Event::Subscribed { peer_id, topic } => {
540            if !Channel::has_gossipsub_topic(&topic) {
541                trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic}");
542                return ControlFlow::Continue(());
543            }
544
545            trace!("Peer {peer_id} subscribed to {topic}");
546        }
547
548        gossipsub::Event::Unsubscribed { peer_id, topic } => {
549            if !Channel::has_gossipsub_topic(&topic) {
550                trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic}");
551                return ControlFlow::Continue(());
552            }
553
554            trace!("Peer {peer_id} unsubscribed from {topic}");
555        }
556
557        gossipsub::Event::Message {
558            message_id,
559            message,
560            ..
561        } => {
562            let Some(peer_id) = message.source else {
563                return ControlFlow::Continue(());
564            };
565
566            let Some(channel) = Channel::from_gossipsub_topic_hash(&message.topic) else {
567                trace!(
568                    "Received message {message_id} from {peer_id} on different channel: {}",
569                    message.topic
570                );
571
572                return ControlFlow::Continue(());
573            };
574
575            trace!(
576                "Received message {message_id} from {peer_id} on channel {channel} of {} bytes",
577                message.data.len()
578            );
579
580            let peer_id = PeerId::from_libp2p(&peer_id);
581
582            let event = if channel == Channel::Liveness {
583                Event::LivenessMessage(channel, peer_id, Bytes::from(message.data))
584            } else {
585                Event::ConsensusMessage(channel, peer_id, Bytes::from(message.data))
586            };
587
588            if let Err(e) = tx_event.send(event).await {
589                error!("Error sending message to handle: {e}");
590                return ControlFlow::Break(());
591            }
592        }
593
594        gossipsub::Event::SlowPeer {
595            peer_id,
596            failed_messages,
597        } => {
598            trace!(
599                "Slow peer detected: {peer_id}, total failed messages: {}",
600                failed_messages.total()
601            );
602        }
603
604        gossipsub::Event::GossipsubNotSupported { peer_id } => {
605            trace!("Peer does not support GossipSub: {peer_id}");
606        }
607    }
608
609    ControlFlow::Continue(())
610}
611
612async fn handle_broadcast_event(
613    event: broadcast::Event,
614    _metrics: &Metrics,
615    _swarm: &mut swarm::Swarm<Behaviour>,
616    _state: &mut State,
617    tx_event: &mpsc::Sender<Event>,
618) -> ControlFlow<()> {
619    match event {
620        broadcast::Event::Subscribed(peer_id, topic) => {
621            if !Channel::has_broadcast_topic(&topic) {
622                trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic:?}");
623                return ControlFlow::Continue(());
624            }
625
626            trace!("Peer {peer_id} subscribed to {topic:?}");
627        }
628
629        broadcast::Event::Unsubscribed(peer_id, topic) => {
630            if !Channel::has_broadcast_topic(&topic) {
631                trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic:?}");
632                return ControlFlow::Continue(());
633            }
634
635            trace!("Peer {peer_id} unsubscribed from {topic:?}");
636        }
637
638        broadcast::Event::Received(peer_id, topic, message) => {
639            let Some(channel) = Channel::from_broadcast_topic(&topic) else {
640                trace!("Received message from {peer_id} on different channel: {topic:?}");
641                return ControlFlow::Continue(());
642            };
643
644            trace!(
645                "Received message from {peer_id} on channel {channel} of {} bytes",
646                message.len()
647            );
648
649            let peer_id = PeerId::from_libp2p(&peer_id);
650
651            let event = if channel == Channel::Liveness {
652                Event::LivenessMessage(channel, peer_id, message)
653            } else {
654                Event::ConsensusMessage(channel, peer_id, message)
655            };
656
657            if let Err(e) = tx_event.send(event).await {
658                error!("Error sending message to handle: {e}");
659                return ControlFlow::Break(());
660            }
661        }
662    }
663
664    ControlFlow::Continue(())
665}
666
667async fn handle_sync_event(
668    event: sync::Event,
669    _metrics: &Metrics,
670    _swarm: &mut swarm::Swarm<Behaviour>,
671    state: &mut State,
672    tx_event: &mpsc::Sender<Event>,
673) -> ControlFlow<()> {
674    match event {
675        sync::Event::Message { peer, message, .. } => {
676            match message {
677                libp2p::request_response::Message::Request {
678                    request_id,
679                    request,
680                    channel,
681                } => {
682                    state.sync_channels.insert(request_id, channel);
683
684                    let _ = tx_event
685                        .send(Event::Sync(sync::RawMessage::Request {
686                            request_id,
687                            peer: PeerId::from_libp2p(&peer),
688                            body: request.0,
689                        }))
690                        .await
691                        .map_err(|e| {
692                            error!("Error sending Sync request to handle: {e}");
693                        });
694                }
695
696                libp2p::request_response::Message::Response {
697                    request_id,
698                    response,
699                } => {
700                    let _ = tx_event
701                        .send(Event::Sync(sync::RawMessage::Response {
702                            request_id,
703                            peer: PeerId::from_libp2p(&peer),
704                            body: response.0,
705                        }))
706                        .await
707                        .map_err(|e| {
708                            error!("Error sending Sync response to handle: {e}");
709                        });
710                }
711            }
712
713            ControlFlow::Continue(())
714        }
715
716        sync::Event::ResponseSent { .. } => ControlFlow::Continue(()),
717
718        sync::Event::OutboundFailure { .. } => ControlFlow::Continue(()),
719
720        sync::Event::InboundFailure { .. } => ControlFlow::Continue(()),
721    }
722}
723
/// Conversions between this crate's [`PeerId`] and libp2p's `PeerId`.
pub trait PeerIdExt {
    /// Converts this peer ID into a libp2p `PeerId`.
    fn to_libp2p(&self) -> libp2p::PeerId;
    /// Builds a peer ID from a libp2p `PeerId`.
    fn from_libp2p(peer_id: &libp2p::PeerId) -> Self;
}
728
729impl PeerIdExt for PeerId {
730    fn to_libp2p(&self) -> libp2p::PeerId {
731        libp2p::PeerId::from_bytes(&self.to_bytes()).expect("valid PeerId")
732    }
733
734    fn from_libp2p(peer_id: &libp2p::PeerId) -> Self {
735        Self::from_bytes(&peer_id.to_bytes()).expect("valid PeerId")
736    }
737}