1use std::collections::{HashMap, HashSet};
4use std::fmt;
5
6use libp2p::identify;
7use libp2p::request_response::InboundRequestId;
8use libp2p::Multiaddr;
9use malachitebft_discovery as discovery;
10use malachitebft_discovery::util::strip_peer_id_from_multiaddr;
11use malachitebft_sync as sync;
12
13use crate::behaviour::Behaviour;
14use crate::metrics::Metrics as NetworkMetrics;
15use crate::{Channel, ChannelNames, PeerType, PersistentPeerError};
16use malachitebft_discovery::ConnectionDirection;
17
18#[derive(Clone, Debug)]
20pub struct NetworkStateDump {
21 pub local_node: LocalNodeInfo,
22 pub peers: std::collections::HashMap<libp2p::PeerId, PeerInfo>,
23 pub validator_set: Vec<ValidatorInfo>,
24 pub persistent_peer_ids: Vec<libp2p::PeerId>,
25 pub persistent_peer_addrs: Vec<Multiaddr>,
26}
27
/// One validator entry: an address string plus its voting power.
///
/// Within this file the `address` is what gets compared against a peer's
/// (or the local node's) consensus address to decide validator status.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ValidatorInfo {
    /// Validator address, as a string.
    pub address: String,
    /// Voting power of the validator.
    pub voting_power: u64,
}
36
37#[derive(Clone, Debug)]
39pub struct LocalNodeInfo {
40 pub moniker: String,
41 pub peer_id: libp2p::PeerId,
42 pub listen_addr: Multiaddr,
43 pub consensus_address: Option<String>,
49 pub is_validator: bool,
54 pub persistent_peers_only: bool,
56 pub subscribed_topics: HashSet<String>,
57}
58
59impl fmt::Display for LocalNodeInfo {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 let mut topics: Vec<&str> = self.subscribed_topics.iter().map(|s| s.as_str()).collect();
62 topics.sort();
63 let topics_str = format!("[{}]", topics.join(","));
64 let address = self.consensus_address.as_deref().unwrap_or("none");
65 let role = if self.is_validator {
66 "validator"
67 } else {
68 "full_node"
69 };
70 let peers_mode = if self.persistent_peers_only {
71 "persistent_only"
72 } else {
73 "open"
74 };
75 write!(
76 f,
77 "{}, {}, {}, {}, {}, {}, {}, me",
78 self.listen_addr, self.moniker, role, self.peer_id, address, topics_str, peers_mode
79 )
80 }
81}
82
83#[derive(Clone, Debug)]
85pub struct PeerInfo {
86 pub address: Multiaddr,
87 pub consensus_address: String, pub moniker: String,
89 pub peer_type: PeerType,
90 pub connection_direction: Option<ConnectionDirection>, pub score: f64,
92 pub topics: HashSet<String>, pub is_explicit: bool, }
95
96impl PeerInfo {
97 pub fn format_with_peer_id(&self, peer_id: &libp2p::PeerId) -> String {
100 let direction = self.connection_direction.map_or("??", |d| d.as_str());
101 let mut topics: Vec<&str> = self.topics.iter().map(|s| s.as_str()).collect();
102 topics.sort();
103 let topics_str = format!("[{}]", topics.join(","));
104 let peer_type_str = self.peer_type.primary_type_str();
105 let address = if self.consensus_address.is_empty() {
106 "none"
107 } else {
108 &self.consensus_address
109 };
110 let explicit = if self.is_explicit { "explicit" } else { "-" };
111 format!(
112 "{}, {}, {}, {}, {}, {}, {}, {}, {}",
113 self.address,
114 self.moniker,
115 peer_type_str,
116 peer_id,
117 address,
118 topics_str,
119 direction,
120 self.score as i64,
121 explicit
122 )
123 }
124}
125
126#[derive(Debug)]
127pub struct State {
128 pub sync_channels: HashMap<InboundRequestId, sync::ResponseChannel>,
129 pub discovery: discovery::Discovery<Behaviour>,
130 pub persistent_peer_ids: HashSet<libp2p::PeerId>,
131 pub persistent_peer_addrs: Vec<Multiaddr>,
132 pub validator_set: Vec<ValidatorInfo>,
134 pub(crate) metrics: NetworkMetrics,
135 pub local_node: LocalNodeInfo,
137 pub peer_info: HashMap<libp2p::PeerId, PeerInfo>,
139}
140
141impl State {
142 pub(crate) fn process_validator_set_update(
152 &mut self,
153 new_validators: Vec<ValidatorInfo>,
154 ) -> Vec<(libp2p::PeerId, f64)> {
155 self.validator_set = new_validators;
157
158 self.reclassify_local_node();
159
160 self.reclassify_peers()
162 }
163
164 fn reclassify_local_node(&mut self) {
166 let was_validator = self.local_node.is_validator;
167 let local_is_validator = self
169 .local_node
170 .consensus_address
171 .as_ref()
172 .map(|addr| self.validator_set.iter().any(|v| &v.address == addr))
173 .unwrap_or(false);
174
175 self.local_node.is_validator = local_is_validator;
176
177 if was_validator != local_is_validator {
179 tracing::info!(
180 local_is_validator,
181 address = ?self.local_node.consensus_address,
182 "Local node validator status changed"
183 );
184 self.metrics.set_local_node_info(&self.local_node);
185 }
186 }
187
188 fn reclassify_peers(&mut self) -> Vec<(libp2p::PeerId, f64)> {
192 let mut changed_peers = Vec::new();
193
194 for (peer_id, peer_info) in self.peer_info.iter_mut() {
195 let is_validator = if let Some(validator_info) = self
197 .validator_set
198 .iter()
199 .find(|v| v.address == peer_info.consensus_address)
200 {
201 peer_info.consensus_address = validator_info.address.clone();
202 true
203 } else {
204 false
205 };
206
207 let new_type = peer_info.peer_type.with_validator_status(is_validator);
209
210 let old_peer_info = peer_info.clone();
212
213 if let Some(new_score) = apply_peer_type_change(
214 peer_id,
215 peer_info,
216 &old_peer_info,
217 new_type,
218 &mut self.metrics,
219 ) {
220 changed_peers.push((*peer_id, new_score));
221 }
222 }
223
224 changed_peers
225 }
226
227 pub(crate) fn new(
228 discovery: discovery::Discovery<Behaviour>,
229 persistent_peer_addrs: Vec<Multiaddr>,
230 local_node: LocalNodeInfo,
231 metrics: NetworkMetrics,
232 ) -> Self {
233 let persistent_peer_ids = persistent_peer_addrs
235 .iter()
236 .filter_map(extract_peer_id_from_multiaddr)
237 .collect();
238
239 Self {
240 sync_channels: Default::default(),
241 discovery,
242 persistent_peer_ids,
243 persistent_peer_addrs,
244 validator_set: Vec::new(),
245 metrics,
246 local_node,
247 peer_info: HashMap::new(),
248 }
249 }
250
251 pub(crate) fn peer_type(
253 &self,
254 peer_id: &libp2p::PeerId,
255 connection_id: libp2p::swarm::ConnectionId,
256 info: &identify::Info,
257 ) -> PeerType {
258 let is_persistent = self.persistent_peer_ids.contains(peer_id)
259 || self.is_persistent_peer_by_address(connection_id);
260
261 let agent_info = crate::utils::parse_agent_version(&info.agent_version);
263 let is_validator = agent_info.address != "unknown"
264 && self
265 .validator_set
266 .iter()
267 .any(|v| v.address == agent_info.address);
268
269 PeerType::new(is_persistent, is_validator)
270 }
271
272 fn is_persistent_peer_by_address(&self, connection_id: libp2p::swarm::ConnectionId) -> bool {
278 let Some(conn_info) = self.discovery.connections.get(&connection_id) else {
281 return false;
282 };
283
284 let remote_addr_without_p2p = strip_peer_id_from_multiaddr(&conn_info.remote_addr);
285
286 for persistent_addr in &self.persistent_peer_addrs {
287 let persistent_addr_without_p2p = strip_peer_id_from_multiaddr(persistent_addr);
288
289 if remote_addr_without_p2p == persistent_addr_without_p2p {
290 return true;
291 }
292 }
293
294 false
295 }
296
297 pub(crate) fn update_peer_info(
300 &mut self,
301 gossipsub: &libp2p_gossipsub::Behaviour,
302 channels: &[Channel],
303 channel_names: ChannelNames,
304 ) {
305 let current_peers: HashSet<libp2p::PeerId> =
307 gossipsub.all_peers().map(|(p, _)| *p).collect();
308 let tracked_peers: HashSet<libp2p::PeerId> = self.peer_info.keys().copied().collect();
309 let disconnected_peers: Vec<libp2p::PeerId> =
310 tracked_peers.difference(¤t_peers).copied().collect();
311
312 for peer_id in disconnected_peers {
313 if let Some(peer_info) = self.peer_info.remove(&peer_id) {
315 self.metrics.free_slot(&peer_id, &peer_info);
317 }
318 }
319
320 let mut peer_topics: HashMap<libp2p::PeerId, HashSet<String>> = HashMap::new();
322
323 for channel in channels {
324 let topic = channel.to_gossipsub_topic(channel_names);
325 let topic_hash = topic.hash();
326 let topic_str = channel.as_str(channel_names).to_string();
327
328 for peer_id in gossipsub.mesh_peers(&topic_hash) {
329 peer_topics
330 .entry(*peer_id)
331 .or_default()
332 .insert(topic_str.clone());
333 }
334 }
335
336 for (peer_id, peer_info) in self.peer_info.iter_mut() {
338 let new_score = gossipsub.peer_score(peer_id).unwrap_or(0.0);
339 let new_topics = peer_topics.get(peer_id).cloned().unwrap_or_default();
340
341 let _ = self.metrics.update_peer_metrics(
344 peer_id,
345 peer_info,
346 new_score,
347 Some(new_topics.clone()),
348 );
349
350 peer_info.score = new_score;
352 peer_info.topics = new_topics;
353 }
354 }
355
356 pub(crate) fn update_peer(
365 &mut self,
366 peer_id: libp2p::PeerId,
367 connection_id: libp2p::swarm::ConnectionId,
368 info: &identify::Info,
369 ) -> f64 {
370 let peer_type = self.peer_type(&peer_id, connection_id, info);
372
373 if peer_type.is_persistent() {
375 self.persistent_peer_ids.insert(peer_id);
376 }
377
378 let connection_direction = if self.discovery.is_outbound_peer(&peer_id) {
380 Some(ConnectionDirection::Outbound)
381 } else if self.discovery.is_inbound_peer(&peer_id) {
382 Some(ConnectionDirection::Inbound)
383 } else {
384 None
386 };
387
388 let address = self
391 .discovery
392 .connections
393 .get(&connection_id)
394 .map(|conn| conn.remote_addr.clone())
395 .unwrap_or_else(|| {
396 info.listen_addrs
398 .first()
399 .cloned()
400 .unwrap_or_else(|| "/ip4/0.0.0.0/tcp/0".parse().expect("valid multiaddr"))
401 });
402
403 let agent_info = crate::utils::parse_agent_version(&info.agent_version);
405
406 let consensus_address = if peer_type.is_validator() {
410 self.validator_set
412 .iter()
413 .find(|v| v.address == agent_info.address)
414 .map(|v| v.address.clone())
415 .unwrap_or_else(|| agent_info.address.clone())
416 } else {
417 agent_info.address.clone()
418 };
419
420 if let Some(existing) = self.peer_info.get_mut(&peer_id) {
423 let old_peer_info = existing.clone();
424 existing.moniker = agent_info.moniker;
425 if connection_direction == Some(ConnectionDirection::Outbound)
427 || existing.connection_direction != Some(ConnectionDirection::Outbound)
428 {
429 existing.address = address;
430 existing.connection_direction = connection_direction;
431 }
432 existing.peer_type = peer_type;
434 existing.consensus_address = consensus_address;
435 existing.score = crate::peer_scoring::get_peer_score(peer_type);
436
437 self.metrics
438 .update_peer_labels(&peer_id, &old_peer_info, existing);
439 return existing.score;
440 }
441
442 let score = crate::peer_scoring::get_peer_score(peer_type);
444 let peer_info = PeerInfo {
445 address,
446 consensus_address,
447 moniker: agent_info.moniker,
448 peer_type,
449 connection_direction,
450 score,
451 topics: Default::default(),
452 is_explicit: false,
453 };
454
455 self.metrics.record_new_peer(&peer_id, &peer_info);
457
458 self.peer_info.insert(peer_id, peer_info);
460
461 score
462 }
463
464 pub fn format_peer_info(&self) -> String {
467 let mut lines = Vec::new();
468
469 lines.push("Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score".to_string());
471
472 lines.push(format!("{}", self.local_node));
474
475 let mut peers: Vec<_> = self.peer_info.iter().collect();
477 peers.sort_by(|a, b| a.1.moniker.cmp(&b.1.moniker));
478
479 for (peer_id, peer_info) in peers {
480 lines.push(peer_info.format_with_peer_id(peer_id));
481 }
482
483 lines.join("\n")
484 }
485
486 fn update_peer_persistent_status(
488 peer_id: libp2p::PeerId,
489 peer_info: Option<&mut PeerInfo>,
490 is_persistent: bool,
491 swarm: &mut libp2p::Swarm<Behaviour>,
492 ) {
493 let Some(peer_info) = peer_info else {
494 return;
495 };
496
497 peer_info.peer_type = peer_info.peer_type.with_persistent(is_persistent);
498
499 let new_score = crate::peer_scoring::get_peer_score(peer_info.peer_type);
501 peer_info.score = new_score;
502
503 if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
505 gossipsub.set_application_score(&peer_id, new_score);
506 }
507
508 tracing::debug!(
509 %peer_id,
510 %is_persistent,
511 peer_type = ?peer_info.peer_type,
512 "Updated peer persistent status"
513 );
514 }
515
516 pub(crate) fn add_persistent_peer(
518 &mut self,
519 addr: Multiaddr,
520 swarm: &mut libp2p::Swarm<Behaviour>,
521 ) -> Result<(), PersistentPeerError> {
522 if self.persistent_peer_addrs.contains(&addr) {
524 return Err(PersistentPeerError::AlreadyExists);
525 }
526
527 if let Some(peer_id) = extract_peer_id_from_multiaddr(&addr) {
529 self.persistent_peer_ids.insert(peer_id);
530
531 Self::update_peer_persistent_status(
533 peer_id,
534 self.peer_info.get_mut(&peer_id),
535 true,
536 swarm,
537 );
538 }
539
540 self.persistent_peer_addrs.push(addr.clone());
542
543 self.discovery.add_bootstrap_node(addr.clone());
545
546 if let Err(e) = swarm.dial(addr.clone()) {
548 tracing::warn!(
549 error = %e,
550 addr = %addr,
551 "Failed to dial newly added persistent peer, will retry via discovery"
552 );
553 }
555
556 Ok(())
557 }
558
559 pub(crate) fn remove_persistent_peer(
561 &mut self,
562 addr: Multiaddr,
563 swarm: &mut libp2p::Swarm<Behaviour>,
564 ) -> Result<(), PersistentPeerError> {
565 let Some(pos) = self.persistent_peer_addrs.iter().position(|a| a == &addr) else {
567 return Err(PersistentPeerError::NotFound);
568 };
569
570 self.persistent_peer_addrs.remove(pos);
571
572 let peer_id = self.discovery.get_peer_id_for_addr(&addr);
575
576 if let Some(peer_id) = peer_id {
577 self.persistent_peer_ids.remove(&peer_id);
578
579 Self::update_peer_persistent_status(
581 peer_id,
582 self.peer_info.get_mut(&peer_id),
583 false,
584 swarm,
585 );
586
587 let should_disconnect =
592 self.local_node.persistent_peers_only || !self.discovery.is_inbound_peer(&peer_id);
593
594 if swarm.is_connected(&peer_id) && should_disconnect {
595 let _ = swarm.disconnect_peer_id(peer_id);
596 tracing::info!(%peer_id, %addr, "Disconnecting from removed persistent peer");
597 }
598 }
599
600 self.discovery.cancel_dial_attempts(&addr, peer_id);
602
603 self.discovery.remove_bootstrap_node(&addr);
605
606 Ok(())
607 }
608}
609
610fn extract_peer_id_from_multiaddr(addr: &Multiaddr) -> Option<libp2p::PeerId> {
612 use libp2p::multiaddr::Protocol;
613
614 for protocol in addr.iter() {
615 if let Protocol::P2p(peer_id) = protocol {
616 return Some(peer_id);
617 }
618 }
619 None
620}
621
622fn apply_peer_type_change(
628 peer_id: &libp2p::PeerId,
629 peer_info: &mut PeerInfo,
630 old_peer_info: &PeerInfo,
631 new_type: PeerType,
632 metrics: &mut NetworkMetrics,
633) -> Option<f64> {
634 let new_score = crate::peer_scoring::get_peer_score(new_type);
635 peer_info.peer_type = new_type;
636 peer_info.score = new_score;
637
638 metrics
639 .update_peer_labels(peer_id, old_peer_info, peer_info)
640 .then_some(new_score)
641}