1use std::collections::HashMap;
2use std::error::Error;
3use std::ops::ControlFlow;
4use std::time::Duration;
5
6use futures::StreamExt;
7use libp2p::metrics::{Metrics, Recorder};
8use libp2p::request_response::{InboundRequestId, OutboundRequestId};
9use libp2p::swarm::{self, SwarmEvent};
10use libp2p::{gossipsub, identify, quic, SwarmBuilder};
11use libp2p_broadcast as broadcast;
12use tokio::sync::{mpsc, oneshot};
13use tracing::{debug, error, error_span, info, trace, warn, Instrument};
14
15use malachitebft_discovery::{self as discovery};
16use malachitebft_metrics::SharedRegistry;
17use malachitebft_sync::{self as sync};
18
19pub use malachitebft_peer::PeerId;
20
21pub use bytes::Bytes;
22pub use libp2p::gossipsub::MessageId;
23pub use libp2p::identity::Keypair;
24pub use libp2p::Multiaddr;
25
26pub mod behaviour;
27pub mod handle;
28pub mod pubsub;
29
30mod channel;
31pub use channel::Channel;
32
33use behaviour::{Behaviour, NetworkEvent};
34use handle::Handle;
35
/// Protocol version string a peer must report via identify to be treated as compatible.
const PROTOCOL: &str = "/malachitebft-core-consensus/v1beta1";

/// Registry prefix under which the swarm and network metrics are registered.
const METRICS_PREFIX: &str = "malachitebft_network";

/// Registry prefix under which the discovery metrics are registered.
const DISCOVERY_METRICS_PREFIX: &str = "malachitebft_discovery";
39
/// Selects which pub/sub implementation is used to publish and subscribe on channels.
///
/// [`PubSubProtocol::GossipSub`] is the default.
#[derive(Clone, Copy, Debug, Default)]
pub enum PubSubProtocol {
    /// Use GossipSub (default).
    #[default]
    GossipSub,

    /// Use the broadcast protocol.
    Broadcast,
}

impl PubSubProtocol {
    /// Returns `true` when this is [`PubSubProtocol::GossipSub`].
    pub fn is_gossipsub(&self) -> bool {
        match self {
            Self::GossipSub => true,
            Self::Broadcast => false,
        }
    }

    /// Returns `true` when this is [`PubSubProtocol::Broadcast`].
    pub fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::GossipSub => false,
        }
    }
}
59
/// Mesh sizing parameters for GossipSub.
///
/// NOTE(review): presumably forwarded to the libp2p gossipsub settings of the
/// same names when the behaviour is built — confirm in `behaviour`.
#[derive(Clone, Copy, Debug)]
pub struct GossipSubConfig {
    /// Mesh size parameter `mesh_n` (defaults to 6).
    pub mesh_n: usize,
    /// Mesh size parameter `mesh_n_high` (defaults to 12).
    pub mesh_n_high: usize,
    /// Mesh size parameter `mesh_n_low` (defaults to 4).
    pub mesh_n_low: usize,
    /// Mesh parameter `mesh_outbound_min` (defaults to 2).
    pub mesh_outbound_min: usize,
}

impl Default for GossipSubConfig {
    /// Returns the default mesh parameters: `mesh_n = 6`, `mesh_n_high = 12`,
    /// `mesh_n_low = 4`, `mesh_outbound_min = 2`.
    fn default() -> Self {
        GossipSubConfig {
            mesh_n_low: 4,
            mesh_n: 6,
            mesh_n_high: 12,
            mesh_outbound_min: 2,
        }
    }
}
79
/// Boxed error trait object that is `Send`, `Sync` and `'static`.
pub type BoxError = Box<dyn Error + Sync + Send + 'static>;
81
82pub type DiscoveryConfig = discovery::Config;
83pub type BootstrapProtocol = discovery::config::BootstrapProtocol;
84pub type Selector = discovery::config::Selector;
85
86#[derive(Clone, Debug)]
87pub struct Config {
88 pub listen_addr: Multiaddr,
89 pub persistent_peers: Vec<Multiaddr>,
90 pub discovery: DiscoveryConfig,
91 pub idle_connection_timeout: Duration,
92 pub transport: TransportProtocol,
93 pub gossipsub: GossipSubConfig,
94 pub pubsub_protocol: PubSubProtocol,
95 pub rpc_max_size: usize,
96 pub pubsub_max_size: usize,
97 pub enable_sync: bool,
98}
99
100impl Config {
101 fn apply_to_swarm(&self, cfg: swarm::Config) -> swarm::Config {
102 cfg.with_idle_connection_timeout(self.idle_connection_timeout)
103 }
104
105 fn apply_to_quic(&self, mut cfg: quic::Config) -> quic::Config {
106 cfg.max_idle_timeout = 300;
110 cfg.keep_alive_interval = Duration::from_millis(100);
111 cfg
112 }
113}
114
/// Transport protocol the swarm is built on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransportProtocol {
    /// TCP transport.
    Tcp,
    /// QUIC transport.
    Quic,
}
120
121impl TransportProtocol {
122 pub fn from_multiaddr(multiaddr: &Multiaddr) -> Option<TransportProtocol> {
123 for protocol in multiaddr.protocol_stack() {
124 match protocol {
125 "tcp" => return Some(TransportProtocol::Tcp),
126 "quic" | "quic-v1" => return Some(TransportProtocol::Quic),
127 _ => {}
128 }
129 }
130 None
131 }
132}
133
134#[derive(Clone, Debug)]
144pub enum Event {
145 Listening(Multiaddr),
146 PeerConnected(PeerId),
147 PeerDisconnected(PeerId),
148 ConsensusMessage(Channel, PeerId, Bytes),
149 LivenessMessage(Channel, PeerId, Bytes),
150 Sync(sync::RawMessage),
151}
152
153#[derive(Debug)]
154pub enum CtrlMsg {
155 Publish(Channel, Bytes),
156 Broadcast(Channel, Bytes),
157 SyncRequest(PeerId, Bytes, oneshot::Sender<OutboundRequestId>),
158 SyncReply(InboundRequestId, Bytes),
159 Shutdown,
160}
161
162#[derive(Debug)]
163pub struct State {
164 pub sync_channels: HashMap<InboundRequestId, sync::ResponseChannel>,
165 pub discovery: discovery::Discovery<Behaviour>,
166}
167
168impl State {
169 fn new(discovery: discovery::Discovery<Behaviour>) -> Self {
170 Self {
171 sync_channels: Default::default(),
172 discovery,
173 }
174 }
175}
176
177pub async fn spawn(
178 keypair: Keypair,
179 config: Config,
180 registry: SharedRegistry,
181) -> Result<Handle, eyre::Report> {
182 let swarm = registry.with_prefix(METRICS_PREFIX, |registry| -> Result<_, eyre::Report> {
183 let builder = SwarmBuilder::with_existing_identity(keypair).with_tokio();
184 match config.transport {
185 TransportProtocol::Tcp => Ok(builder
186 .with_tcp(
187 libp2p::tcp::Config::new().nodelay(true), libp2p::noise::Config::new,
189 libp2p::yamux::Config::default,
190 )?
191 .with_dns()?
192 .with_bandwidth_metrics(registry)
193 .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))?
194 .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
195 .build()),
196 TransportProtocol::Quic => Ok(builder
197 .with_quic_config(|cfg| config.apply_to_quic(cfg))
198 .with_dns()?
199 .with_bandwidth_metrics(registry)
200 .with_behaviour(|kp| Behaviour::new_with_metrics(&config, kp, registry))?
201 .with_swarm_config(|cfg| config.apply_to_swarm(cfg))
202 .build()),
203 }
204 })?;
205
206 let metrics = registry.with_prefix(METRICS_PREFIX, Metrics::new);
207
208 let (tx_event, rx_event) = mpsc::channel(32);
209 let (tx_ctrl, rx_ctrl) = mpsc::channel(32);
210
211 let discovery = registry.with_prefix(DISCOVERY_METRICS_PREFIX, |reg| {
212 discovery::Discovery::new(config.discovery, config.persistent_peers.clone(), reg)
213 });
214
215 let state = State::new(discovery);
216
217 let peer_id = PeerId::from_libp2p(swarm.local_peer_id());
218 let span = error_span!("network");
219
220 info!(parent: span.clone(), %peer_id, "Starting network service");
221
222 let task_handle =
223 tokio::task::spawn(run(config, metrics, state, swarm, rx_ctrl, tx_event).instrument(span));
224
225 Ok(Handle::new(peer_id, tx_ctrl, rx_event, task_handle))
226}
227
228async fn run(
229 config: Config,
230 metrics: Metrics,
231 mut state: State,
232 mut swarm: swarm::Swarm<Behaviour>,
233 mut rx_ctrl: mpsc::Receiver<CtrlMsg>,
234 tx_event: mpsc::Sender<Event>,
235) {
236 if let Err(e) = swarm.listen_on(config.listen_addr.clone()) {
237 error!("Error listening on {}: {e}", config.listen_addr);
238 return;
239 }
240
241 state.discovery.dial_bootstrap_nodes(&swarm);
242
243 if let Err(e) = pubsub::subscribe(&mut swarm, config.pubsub_protocol, Channel::consensus()) {
244 error!("Error subscribing to consensus channels: {e}");
245 return;
246 };
247
248 if config.enable_sync {
249 if let Err(e) = pubsub::subscribe(&mut swarm, PubSubProtocol::Broadcast, &[Channel::Sync]) {
250 error!("Error subscribing to Sync channel: {e}");
251 return;
252 };
253 }
254
255 loop {
256 let result = tokio::select! {
257 event = swarm.select_next_some() => {
258 handle_swarm_event(event, &config, &metrics, &mut swarm, &mut state, &tx_event).await
259 }
260
261 Some(connection_data) = state.discovery.controller.dial.recv(), if state.discovery.can_dial() => {
262 state.discovery.dial_peer(&mut swarm, connection_data);
263 ControlFlow::Continue(())
264 }
265
266 Some(request_data) = state.discovery.controller.peers_request.recv(), if state.discovery.can_peers_request() => {
267 state.discovery.peers_request_peer(&mut swarm, request_data);
268 ControlFlow::Continue(())
269 }
270
271 Some(request_data) = state.discovery.controller.connect_request.recv(), if state.discovery.can_connect_request() => {
272 state.discovery.connect_request_peer(&mut swarm, request_data);
273 ControlFlow::Continue(())
274 }
275
276 Some((peer_id, connection_id)) = state.discovery.controller.close.recv(), if state.discovery.can_close() => {
277 state.discovery.close_connection(&mut swarm, peer_id, connection_id);
278 ControlFlow::Continue(())
279 }
280
281 Some(ctrl) = rx_ctrl.recv() => {
282 handle_ctrl_msg(&mut swarm, &mut state, &config, ctrl).await
283 }
284 };
285
286 match result {
287 ControlFlow::Continue(()) => continue,
288 ControlFlow::Break(()) => break,
289 }
290 }
291}
292
293async fn handle_ctrl_msg(
294 swarm: &mut swarm::Swarm<Behaviour>,
295 state: &mut State,
296 config: &Config,
297 msg: CtrlMsg,
298) -> ControlFlow<()> {
299 match msg {
300 CtrlMsg::Publish(channel, data) => {
301 let msg_size = data.len();
302 let result = pubsub::publish(swarm, config.pubsub_protocol, channel, data);
303
304 match result {
305 Ok(()) => debug!(%channel, size = %msg_size, "Published message"),
306 Err(e) => error!(%channel, "Error publishing message: {e}"),
307 }
308
309 ControlFlow::Continue(())
310 }
311
312 CtrlMsg::Broadcast(channel, data) => {
313 if channel == Channel::Sync && !config.enable_sync {
314 trace!("Ignoring broadcast message to Sync channel: Sync not enabled");
315 return ControlFlow::Continue(());
316 }
317
318 let msg_size = data.len();
319 let result = pubsub::publish(swarm, PubSubProtocol::Broadcast, channel, data);
320
321 match result {
322 Ok(()) => debug!(%channel, size = %msg_size, "Broadcasted message"),
323 Err(e) => error!(%channel, "Error broadcasting message: {e}"),
324 }
325
326 ControlFlow::Continue(())
327 }
328
329 CtrlMsg::SyncRequest(peer_id, request, reply_to) => {
330 let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
331 error!("Cannot request Sync from peer: Sync not enabled");
332 return ControlFlow::Continue(());
333 };
334
335 let request_id = sync.send_request(peer_id.to_libp2p(), request);
336
337 if let Err(e) = reply_to.send(request_id) {
338 error!(%peer_id, "Error sending Sync request: {e}");
339 }
340
341 ControlFlow::Continue(())
342 }
343
344 CtrlMsg::SyncReply(request_id, data) => {
345 let Some(sync) = swarm.behaviour_mut().sync.as_mut() else {
346 error!("Cannot send Sync response to peer: Sync not enabled");
347 return ControlFlow::Continue(());
348 };
349
350 let Some(channel) = state.sync_channels.remove(&request_id) else {
351 error!(%request_id, "Received Sync reply for unknown request ID");
352 return ControlFlow::Continue(());
353 };
354
355 let result = sync.send_response(channel, data);
356
357 match result {
358 Ok(()) => debug!(%request_id, "Replied to Sync request"),
359 Err(e) => error!(%request_id, "Error replying to Sync request: {e}"),
360 }
361
362 ControlFlow::Continue(())
363 }
364
365 CtrlMsg::Shutdown => ControlFlow::Break(()),
366 }
367}
368
369async fn handle_swarm_event(
370 event: SwarmEvent<NetworkEvent>,
371 _config: &Config,
372 metrics: &Metrics,
373 swarm: &mut swarm::Swarm<Behaviour>,
374 state: &mut State,
375 tx_event: &mpsc::Sender<Event>,
376) -> ControlFlow<()> {
377 if let SwarmEvent::Behaviour(NetworkEvent::GossipSub(e)) = &event {
378 metrics.record(e);
379 } else if let SwarmEvent::Behaviour(NetworkEvent::Identify(e)) = &event {
380 metrics.record(e.as_ref());
381 }
382
383 match event {
384 SwarmEvent::NewListenAddr { address, .. } => {
385 debug!(%address, "Node is listening");
386
387 if let Err(e) = tx_event.send(Event::Listening(address)).await {
388 error!("Error sending listening event to handle: {e}");
389 return ControlFlow::Break(());
390 }
391 }
392
393 SwarmEvent::ConnectionEstablished {
394 peer_id,
395 connection_id,
396 endpoint,
397 ..
398 } => {
399 trace!("Connected to {peer_id} with connection id {connection_id}",);
400
401 state
402 .discovery
403 .handle_connection(swarm, peer_id, connection_id, endpoint);
404 }
405
406 SwarmEvent::OutgoingConnectionError {
407 connection_id,
408 error,
409 ..
410 } => {
411 error!("Error dialing peer: {error}");
412
413 state
414 .discovery
415 .handle_failed_connection(swarm, connection_id, error);
416 }
417
418 SwarmEvent::ConnectionClosed {
419 peer_id,
420 connection_id,
421 num_established,
422 cause,
423 ..
424 } => {
425 if let Some(cause) = cause {
426 warn!("Connection closed with {peer_id}, reason: {cause}");
427 } else {
428 warn!("Connection closed with {peer_id}, reason: unknown");
429 }
430
431 state
432 .discovery
433 .handle_closed_connection(swarm, peer_id, connection_id);
434
435 if num_established == 0 {
436 if let Err(e) = tx_event
437 .send(Event::PeerDisconnected(PeerId::from_libp2p(&peer_id)))
438 .await
439 {
440 error!("Error sending peer disconnected event to handle: {e}");
441 return ControlFlow::Break(());
442 }
443 }
444 }
445
446 SwarmEvent::Behaviour(NetworkEvent::Identify(event)) => match *event {
447 identify::Event::Sent { peer_id, .. } => {
448 trace!("Sent identity to {peer_id}");
449 }
450
451 identify::Event::Received {
452 connection_id,
453 peer_id,
454 info,
455 } => {
456 trace!(
457 "Received identity from {peer_id}: protocol={:?}",
458 info.protocol_version
459 );
460
461 if info.protocol_version == PROTOCOL {
462 trace!(
463 "Peer {peer_id} is using compatible protocol version: {:?}",
464 info.protocol_version
465 );
466
467 let is_already_connected =
468 state
469 .discovery
470 .handle_new_peer(swarm, connection_id, peer_id, info);
471
472 if !is_already_connected {
473 if let Err(e) = tx_event
474 .send(Event::PeerConnected(PeerId::from_libp2p(&peer_id)))
475 .await
476 {
477 error!("Error sending peer connected event to handle: {e}");
478 return ControlFlow::Break(());
479 }
480 }
481 } else {
482 trace!(
483 "Peer {peer_id} is using incompatible protocol version: {:?}",
484 info.protocol_version
485 );
486 }
487 }
488
489 _ => (),
491 },
492
493 SwarmEvent::Behaviour(NetworkEvent::Ping(event)) => {
494 match &event.result {
495 Ok(rtt) => {
496 trace!("Received pong from {} in {rtt:?}", event.peer);
497 }
498 Err(e) => {
499 trace!("Received pong from {} with error: {e}", event.peer);
500 }
501 }
502
503 metrics.record(&event);
505 }
506
507 SwarmEvent::Behaviour(NetworkEvent::GossipSub(event)) => {
508 return handle_gossipsub_event(event, metrics, swarm, state, tx_event).await;
509 }
510
511 SwarmEvent::Behaviour(NetworkEvent::Broadcast(event)) => {
512 return handle_broadcast_event(event, metrics, swarm, state, tx_event).await;
513 }
514
515 SwarmEvent::Behaviour(NetworkEvent::Sync(event)) => {
516 return handle_sync_event(event, metrics, swarm, state, tx_event).await;
517 }
518
519 SwarmEvent::Behaviour(NetworkEvent::Discovery(network_event)) => {
520 state.discovery.on_network_event(swarm, *network_event);
521 }
522
523 swarm_event => {
524 metrics.record(&swarm_event);
525 }
526 }
527
528 ControlFlow::Continue(())
529}
530
531async fn handle_gossipsub_event(
532 event: gossipsub::Event,
533 _metrics: &Metrics,
534 _swarm: &mut swarm::Swarm<Behaviour>,
535 _state: &mut State,
536 tx_event: &mpsc::Sender<Event>,
537) -> ControlFlow<()> {
538 match event {
539 gossipsub::Event::Subscribed { peer_id, topic } => {
540 if !Channel::has_gossipsub_topic(&topic) {
541 trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic}");
542 return ControlFlow::Continue(());
543 }
544
545 trace!("Peer {peer_id} subscribed to {topic}");
546 }
547
548 gossipsub::Event::Unsubscribed { peer_id, topic } => {
549 if !Channel::has_gossipsub_topic(&topic) {
550 trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic}");
551 return ControlFlow::Continue(());
552 }
553
554 trace!("Peer {peer_id} unsubscribed from {topic}");
555 }
556
557 gossipsub::Event::Message {
558 message_id,
559 message,
560 ..
561 } => {
562 let Some(peer_id) = message.source else {
563 return ControlFlow::Continue(());
564 };
565
566 let Some(channel) = Channel::from_gossipsub_topic_hash(&message.topic) else {
567 trace!(
568 "Received message {message_id} from {peer_id} on different channel: {}",
569 message.topic
570 );
571
572 return ControlFlow::Continue(());
573 };
574
575 trace!(
576 "Received message {message_id} from {peer_id} on channel {channel} of {} bytes",
577 message.data.len()
578 );
579
580 let peer_id = PeerId::from_libp2p(&peer_id);
581
582 let event = if channel == Channel::Liveness {
583 Event::LivenessMessage(channel, peer_id, Bytes::from(message.data))
584 } else {
585 Event::ConsensusMessage(channel, peer_id, Bytes::from(message.data))
586 };
587
588 if let Err(e) = tx_event.send(event).await {
589 error!("Error sending message to handle: {e}");
590 return ControlFlow::Break(());
591 }
592 }
593
594 gossipsub::Event::SlowPeer {
595 peer_id,
596 failed_messages,
597 } => {
598 trace!(
599 "Slow peer detected: {peer_id}, total failed messages: {}",
600 failed_messages.total()
601 );
602 }
603
604 gossipsub::Event::GossipsubNotSupported { peer_id } => {
605 trace!("Peer does not support GossipSub: {peer_id}");
606 }
607 }
608
609 ControlFlow::Continue(())
610}
611
612async fn handle_broadcast_event(
613 event: broadcast::Event,
614 _metrics: &Metrics,
615 _swarm: &mut swarm::Swarm<Behaviour>,
616 _state: &mut State,
617 tx_event: &mpsc::Sender<Event>,
618) -> ControlFlow<()> {
619 match event {
620 broadcast::Event::Subscribed(peer_id, topic) => {
621 if !Channel::has_broadcast_topic(&topic) {
622 trace!("Peer {peer_id} tried to subscribe to unknown topic: {topic:?}");
623 return ControlFlow::Continue(());
624 }
625
626 trace!("Peer {peer_id} subscribed to {topic:?}");
627 }
628
629 broadcast::Event::Unsubscribed(peer_id, topic) => {
630 if !Channel::has_broadcast_topic(&topic) {
631 trace!("Peer {peer_id} tried to unsubscribe from unknown topic: {topic:?}");
632 return ControlFlow::Continue(());
633 }
634
635 trace!("Peer {peer_id} unsubscribed from {topic:?}");
636 }
637
638 broadcast::Event::Received(peer_id, topic, message) => {
639 let Some(channel) = Channel::from_broadcast_topic(&topic) else {
640 trace!("Received message from {peer_id} on different channel: {topic:?}");
641 return ControlFlow::Continue(());
642 };
643
644 trace!(
645 "Received message from {peer_id} on channel {channel} of {} bytes",
646 message.len()
647 );
648
649 let peer_id = PeerId::from_libp2p(&peer_id);
650
651 let event = if channel == Channel::Liveness {
652 Event::LivenessMessage(channel, peer_id, message)
653 } else {
654 Event::ConsensusMessage(channel, peer_id, message)
655 };
656
657 if let Err(e) = tx_event.send(event).await {
658 error!("Error sending message to handle: {e}");
659 return ControlFlow::Break(());
660 }
661 }
662 }
663
664 ControlFlow::Continue(())
665}
666
667async fn handle_sync_event(
668 event: sync::Event,
669 _metrics: &Metrics,
670 _swarm: &mut swarm::Swarm<Behaviour>,
671 state: &mut State,
672 tx_event: &mpsc::Sender<Event>,
673) -> ControlFlow<()> {
674 match event {
675 sync::Event::Message { peer, message, .. } => {
676 match message {
677 libp2p::request_response::Message::Request {
678 request_id,
679 request,
680 channel,
681 } => {
682 state.sync_channels.insert(request_id, channel);
683
684 let _ = tx_event
685 .send(Event::Sync(sync::RawMessage::Request {
686 request_id,
687 peer: PeerId::from_libp2p(&peer),
688 body: request.0,
689 }))
690 .await
691 .map_err(|e| {
692 error!("Error sending Sync request to handle: {e}");
693 });
694 }
695
696 libp2p::request_response::Message::Response {
697 request_id,
698 response,
699 } => {
700 let _ = tx_event
701 .send(Event::Sync(sync::RawMessage::Response {
702 request_id,
703 peer: PeerId::from_libp2p(&peer),
704 body: response.0,
705 }))
706 .await
707 .map_err(|e| {
708 error!("Error sending Sync response to handle: {e}");
709 });
710 }
711 }
712
713 ControlFlow::Continue(())
714 }
715
716 sync::Event::ResponseSent { .. } => ControlFlow::Continue(()),
717
718 sync::Event::OutboundFailure { .. } => ControlFlow::Continue(()),
719
720 sync::Event::InboundFailure { .. } => ControlFlow::Continue(()),
721 }
722}
723
724pub trait PeerIdExt {
725 fn to_libp2p(&self) -> libp2p::PeerId;
726 fn from_libp2p(peer_id: &libp2p::PeerId) -> Self;
727}
728
729impl PeerIdExt for PeerId {
730 fn to_libp2p(&self) -> libp2p::PeerId {
731 libp2p::PeerId::from_bytes(&self.to_bytes()).expect("valid PeerId")
732 }
733
734 fn from_libp2p(peer_id: &libp2p::PeerId) -> Self {
735 Self::from_bytes(&peer_id.to_bytes()).expect("valid PeerId")
736 }
737}