Skip to main content

arc_malachitebft_network/
state.rs

1//! Network state management
2
3use std::collections::{HashMap, HashSet};
4use std::fmt;
5
6use libp2p::identify;
7use libp2p::request_response::InboundRequestId;
8use libp2p::Multiaddr;
9use malachitebft_discovery as discovery;
10use malachitebft_discovery::util::strip_peer_id_from_multiaddr;
11use malachitebft_sync as sync;
12
13use crate::behaviour::Behaviour;
14use crate::metrics::Metrics as NetworkMetrics;
15use crate::{Channel, ChannelNames, PeerType, PersistentPeerError};
16use malachitebft_discovery::ConnectionDirection;
17
/// Public network state dump for external consumers.
///
/// The field names mirror those of [`State`]; how a dump is produced from the live
/// state is not shown in this file.
#[derive(Clone, Debug)]
pub struct NetworkStateDump {
    /// Information about the local node.
    pub local_node: LocalNodeInfo,
    /// Per-peer information, keyed by libp2p peer id.
    pub peers: std::collections::HashMap<libp2p::PeerId, PeerInfo>,
    /// Validator set entries (address and voting power).
    pub validator_set: Vec<ValidatorInfo>,
    /// Peer ids of persistent peers.
    pub persistent_peer_ids: Vec<libp2p::PeerId>,
    /// Multiaddrs of persistent peers.
    pub persistent_peer_addrs: Vec<Multiaddr>,
}
27
/// Validator information passed from consensus to network layer.
///
/// Within this file, peers and the local node are matched against the validator set
/// by exact string equality on `address`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ValidatorInfo {
    /// Consensus address as string (for matching via Identify protocol)
    pub address: String,
    /// Voting power
    pub voting_power: u64,
}
36
/// Local node information
#[derive(Clone, Debug)]
pub struct LocalNodeInfo {
    /// Moniker of this node (printed in the moniker column of the log output).
    pub moniker: String,
    /// libp2p peer id of this node.
    pub peer_id: libp2p::PeerId,
    /// Address printed in the address column for the local node
    /// (presumably the address the node listens on — confirm against the caller).
    pub listen_addr: Multiaddr,
    /// This node's consensus address (if it is configured with validator credentials).
    ///
    /// Present if the node has a consensus keypair, even if not currently in the active validator set.
    /// This is static configuration determined at startup.
    /// Note: In the future full nodes will not have a consensus address, so this will be None.
    pub consensus_address: Option<String>,
    /// Whether this node is currently in the active validator set.
    ///
    /// Updated dynamically when validator set changes. A node can have `consensus_address = Some(...)`
    /// but `is_validator = false` if it was removed from the validator set or hasn't joined yet.
    pub is_validator: bool,
    /// Whether this node only accepts connections from persistent peers.
    pub persistent_peers_only: bool,
    /// Names of the topics this node is subscribed to (rendered sorted by the `Display` impl).
    pub subscribed_topics: HashSet<String>,
}
58
59impl fmt::Display for LocalNodeInfo {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        let mut topics: Vec<&str> = self.subscribed_topics.iter().map(|s| s.as_str()).collect();
62        topics.sort();
63        let topics_str = format!("[{}]", topics.join(","));
64        let address = self.consensus_address.as_deref().unwrap_or("none");
65        let role = if self.is_validator {
66            "validator"
67        } else {
68            "full_node"
69        };
70        let peers_mode = if self.persistent_peers_only {
71            "persistent_only"
72        } else {
73            "open"
74        };
75        write!(
76            f,
77            "{}, {}, {}, {}, {}, {}, {}, me",
78            self.listen_addr, self.moniker, role, self.peer_id, address, topics_str, peers_mode
79        )
80    }
81}
82
/// Peer information without slot number (for State, which has no cardinality limits)
#[derive(Clone, Debug)]
pub struct PeerInfo {
    /// Network address recorded for the peer.
    pub address: Multiaddr,
    /// Consensus address as string (for validator matching).
    /// An empty string is rendered as `none` when formatting.
    pub consensus_address: String,
    /// Moniker of the peer.
    pub moniker: String,
    /// Classification of the peer (persistent and/or validator status).
    pub peer_type: PeerType,
    /// Direction of the connection; `None` if ephemeral (unknown).
    pub connection_direction: Option<ConnectionDirection>,
    /// Score recorded for the peer.
    pub score: f64,
    /// Set of topics peer is in mesh for (e.g., "/consensus", "/liveness").
    pub topics: HashSet<String>,
    /// Whether this peer is an explicit peer in gossipsub.
    pub is_explicit: bool,
}
95
96impl PeerInfo {
97    /// Format peer info with peer_id for logging
98    ///  Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score, Explicit
99    pub fn format_with_peer_id(&self, peer_id: &libp2p::PeerId) -> String {
100        let direction = self.connection_direction.map_or("??", |d| d.as_str());
101        let mut topics: Vec<&str> = self.topics.iter().map(|s| s.as_str()).collect();
102        topics.sort();
103        let topics_str = format!("[{}]", topics.join(","));
104        let peer_type_str = self.peer_type.primary_type_str();
105        let address = if self.consensus_address.is_empty() {
106            "none"
107        } else {
108            &self.consensus_address
109        };
110        let explicit = if self.is_explicit { "explicit" } else { "-" };
111        format!(
112            "{}, {}, {}, {}, {}, {}, {}, {}, {}",
113            self.address,
114            self.moniker,
115            peer_type_str,
116            peer_id,
117            address,
118            topics_str,
119            direction,
120            self.score as i64,
121            explicit
122        )
123    }
124}
125
/// Mutable network-layer state: sync response channels, discovery state, persistent peers,
/// the latest validator set, metrics, and per-peer information.
#[derive(Debug)]
pub struct State {
    /// Sync response channels, keyed by inbound request id.
    pub sync_channels: HashMap<InboundRequestId, sync::ResponseChannel>,
    /// Discovery state; also consulted here for per-connection info and peer direction.
    pub discovery: discovery::Discovery<Behaviour>,
    /// Peer ids treated as persistent. Seeded from `/p2p/<peer_id>` components of the
    /// configured addresses and extended when a peer is classified as persistent or
    /// added at runtime.
    pub persistent_peer_ids: HashSet<libp2p::PeerId>,
    /// Multiaddrs of the persistent peers.
    pub persistent_peer_addrs: Vec<Multiaddr>,
    /// Latest validator set from consensus
    pub validator_set: Vec<ValidatorInfo>,
    /// Network metrics, updated alongside the peer and local node information.
    pub(crate) metrics: NetworkMetrics,
    /// Local node information
    pub local_node: LocalNodeInfo,
    /// Detailed peer information indexed by PeerId (for RPC queries and metrics)
    pub peer_info: HashMap<libp2p::PeerId, PeerInfo>,
}
140
141impl State {
142    /// Process a validator set update from consensus.
143    ///
144    /// This method:
145    /// - Updates the validator set
146    /// - Updates local node validator status and metrics
147    /// - Re-classifies all connected peers based on the new validator set
148    ///
149    /// Returns a list of (peer_id, new_score) for peers whose type changed,
150    /// so the caller can update GossipSub scores.
151    pub(crate) fn process_validator_set_update(
152        &mut self,
153        new_validators: Vec<ValidatorInfo>,
154    ) -> Vec<(libp2p::PeerId, f64)> {
155        // Store the new validator set
156        self.validator_set = new_validators;
157
158        self.reclassify_local_node();
159
160        // Re-classify all connected peers
161        self.reclassify_peers()
162    }
163
164    /// Re-classify the local node based on the current validator set.
165    fn reclassify_local_node(&mut self) {
166        let was_validator = self.local_node.is_validator;
167        // Update local node status
168        let local_is_validator = self
169            .local_node
170            .consensus_address
171            .as_ref()
172            .map(|addr| self.validator_set.iter().any(|v| &v.address == addr))
173            .unwrap_or(false);
174
175        self.local_node.is_validator = local_is_validator;
176
177        // Log and update metrics for local node status change
178        if was_validator != local_is_validator {
179            tracing::info!(
180                local_is_validator,
181                address = ?self.local_node.consensus_address,
182                "Local node validator status changed"
183            );
184            self.metrics.set_local_node_info(&self.local_node);
185        }
186    }
187
188    /// Re-classify all connected peers based on the current validator set.
189    ///
190    /// Returns a list of (peer_id, new_score) for peers whose type changed.
191    fn reclassify_peers(&mut self) -> Vec<(libp2p::PeerId, f64)> {
192        let mut changed_peers = Vec::new();
193
194        for (peer_id, peer_info) in self.peer_info.iter_mut() {
195            // Check if advertised address matches a validator in the set
196            let is_validator = if let Some(validator_info) = self
197                .validator_set
198                .iter()
199                .find(|v| v.address == peer_info.consensus_address)
200            {
201                peer_info.consensus_address = validator_info.address.clone();
202                true
203            } else {
204                false
205            };
206
207            // Preserve persistent status, update validator status
208            let new_type = peer_info.peer_type.with_validator_status(is_validator);
209
210            // Clone old info for metrics BEFORE updating fields
211            let old_peer_info = peer_info.clone();
212
213            if let Some(new_score) = apply_peer_type_change(
214                peer_id,
215                peer_info,
216                &old_peer_info,
217                new_type,
218                &mut self.metrics,
219            ) {
220                changed_peers.push((*peer_id, new_score));
221            }
222        }
223
224        changed_peers
225    }
226
227    pub(crate) fn new(
228        discovery: discovery::Discovery<Behaviour>,
229        persistent_peer_addrs: Vec<Multiaddr>,
230        local_node: LocalNodeInfo,
231        metrics: NetworkMetrics,
232    ) -> Self {
233        // Extract PeerIds from persistent peer Multiaddrs if they contain /p2p/<peer_id>
234        let persistent_peer_ids = persistent_peer_addrs
235            .iter()
236            .filter_map(extract_peer_id_from_multiaddr)
237            .collect();
238
239        Self {
240            sync_channels: Default::default(),
241            discovery,
242            persistent_peer_ids,
243            persistent_peer_addrs,
244            validator_set: Vec::new(),
245            metrics,
246            local_node,
247            peer_info: HashMap::new(),
248        }
249    }
250
251    /// Determine the peer type based on peer ID and identify info
252    pub(crate) fn peer_type(
253        &self,
254        peer_id: &libp2p::PeerId,
255        connection_id: libp2p::swarm::ConnectionId,
256        info: &identify::Info,
257    ) -> PeerType {
258        let is_persistent = self.persistent_peer_ids.contains(peer_id)
259            || self.is_persistent_peer_by_address(connection_id);
260
261        // Extract validator address from agent_version and check if it's in the validator set
262        let agent_info = crate::utils::parse_agent_version(&info.agent_version);
263        let is_validator = agent_info.address != "unknown"
264            && self
265                .validator_set
266                .iter()
267                .any(|v| v.address == agent_info.address);
268
269        PeerType::new(is_persistent, is_validator)
270    }
271
272    /// Check if a peer is a persistent peer by matching its addresses against persistent peer addresses
273    ///
274    /// For inbound connections, we use the actual remote address from the connection endpoint
275    /// to prevent address spoofing attacks where a malicious peer could claim to be a
276    /// persistent peer by faking its `listen_addrs` in the Identify message.
277    fn is_persistent_peer_by_address(&self, connection_id: libp2p::swarm::ConnectionId) -> bool {
278        // Use actual remote address for both inbound and outbound connections
279        // This prevents address spoofing for inbound, and for outbound it's the address we dialed
280        let Some(conn_info) = self.discovery.connections.get(&connection_id) else {
281            return false;
282        };
283
284        let remote_addr_without_p2p = strip_peer_id_from_multiaddr(&conn_info.remote_addr);
285
286        for persistent_addr in &self.persistent_peer_addrs {
287            let persistent_addr_without_p2p = strip_peer_id_from_multiaddr(persistent_addr);
288
289            if remote_addr_without_p2p == persistent_addr_without_p2p {
290                return true;
291            }
292        }
293
294        false
295    }
296
297    /// Update peer information from gossipsub (scores and mesh membership)
298    /// Also updates metrics based on the updated State
299    pub(crate) fn update_peer_info(
300        &mut self,
301        gossipsub: &libp2p_gossipsub::Behaviour,
302        channels: &[Channel],
303        channel_names: ChannelNames,
304    ) {
305        // Clean up disconnected peers from State
306        let current_peers: HashSet<libp2p::PeerId> =
307            gossipsub.all_peers().map(|(p, _)| *p).collect();
308        let tracked_peers: HashSet<libp2p::PeerId> = self.peer_info.keys().copied().collect();
309        let disconnected_peers: Vec<libp2p::PeerId> =
310            tracked_peers.difference(&current_peers).copied().collect();
311
312        for peer_id in disconnected_peers {
313            // Remove from State
314            if let Some(peer_info) = self.peer_info.remove(&peer_id) {
315                // Also free metric slot if peer has one
316                self.metrics.free_slot(&peer_id, &peer_info);
317            }
318        }
319
320        // Build a map of peer_id to the set of topics they're in
321        let mut peer_topics: HashMap<libp2p::PeerId, HashSet<String>> = HashMap::new();
322
323        for channel in channels {
324            let topic = channel.to_gossipsub_topic(channel_names);
325            let topic_hash = topic.hash();
326            let topic_str = channel.as_str(channel_names).to_string();
327
328            for peer_id in gossipsub.mesh_peers(&topic_hash) {
329                peer_topics
330                    .entry(*peer_id)
331                    .or_default()
332                    .insert(topic_str.clone());
333            }
334        }
335
336        // Update score and topics for all peers in State
337        for (peer_id, peer_info) in self.peer_info.iter_mut() {
338            let new_score = gossipsub.peer_score(peer_id).unwrap_or(0.0);
339            let new_topics = peer_topics.get(peer_id).cloned().unwrap_or_default();
340
341            // Update metrics before updating peer_info.topics
342            // (metrics needs to compare old vs new topics)
343            let _ = self.metrics.update_peer_metrics(
344                peer_id,
345                peer_info,
346                new_score,
347                Some(new_topics.clone()),
348            );
349
350            // Now update peer information in State
351            peer_info.score = new_score;
352            peer_info.topics = new_topics;
353        }
354    }
355
356    /// Update the peer information after Identify completes and compute peer score.
357    ///
358    /// This method:
359    /// - Determines the peer type (validator, persistent, etc.)
360    /// - Records peer info in state and metrics
361    /// - Computes the GossipSub score
362    ///
363    /// Returns the score to set on the peer in GossipSub.
364    pub(crate) fn update_peer(
365        &mut self,
366        peer_id: libp2p::PeerId,
367        connection_id: libp2p::swarm::ConnectionId,
368        info: &identify::Info,
369    ) -> f64 {
370        // Determine peer type using actual remote address for inbound connections
371        let peer_type = self.peer_type(&peer_id, connection_id, info);
372
373        // Track persistent peers
374        if peer_type.is_persistent() {
375            self.persistent_peer_ids.insert(peer_id);
376        }
377
378        // Determine peer type direction from discovery layer
379        let connection_direction = if self.discovery.is_outbound_peer(&peer_id) {
380            Some(ConnectionDirection::Outbound)
381        } else if self.discovery.is_inbound_peer(&peer_id) {
382            Some(ConnectionDirection::Inbound)
383        } else {
384            // ephemeral connection (not tracked, will be closed after timeout)
385            None
386        };
387
388        // Use actual connection address (dialed for outbound, source for inbound)
389        // This is more reliable than self-reported listen_addrs from identify
390        let address = self
391            .discovery
392            .connections
393            .get(&connection_id)
394            .map(|conn| conn.remote_addr.clone())
395            .unwrap_or_else(|| {
396                // Fallback to identify listen_addrs if connection info not available
397                info.listen_addrs
398                    .first()
399                    .cloned()
400                    .unwrap_or_else(|| "/ip4/0.0.0.0/tcp/0".parse().expect("valid multiaddr"))
401            });
402
403        // Parse agent_version to extract moniker and consensus address
404        let agent_info = crate::utils::parse_agent_version(&info.agent_version);
405
406        // TODO: The advertised address in agent_version is untrusted, any peer can claim any address.
407        // A malicious peer could impersonate a validator by advertising their address.
408        // Fix: Require peers to sign their libp2p PeerID with their consensus key to prove ownership.
409        let consensus_address = if peer_type.is_validator() {
410            // Use canonical address from validator set
411            self.validator_set
412                .iter()
413                .find(|v| v.address == agent_info.address)
414                .map(|v| v.address.clone())
415                .unwrap_or_else(|| agent_info.address.clone())
416        } else {
417            agent_info.address.clone()
418        };
419
420        // If peer already exists (additional connection), update Identify-provided fields.
421        // Keep existing state (topics) since they never fully disconnected.
422        if let Some(existing) = self.peer_info.get_mut(&peer_id) {
423            let old_peer_info = existing.clone();
424            existing.moniker = agent_info.moniker;
425            // Prefer outbound (dialed) addresses over inbound
426            if connection_direction == Some(ConnectionDirection::Outbound)
427                || existing.connection_direction != Some(ConnectionDirection::Outbound)
428            {
429                existing.address = address;
430                existing.connection_direction = connection_direction;
431            }
432            // Re-evaluate peer type and consensus address with current state
433            existing.peer_type = peer_type;
434            existing.consensus_address = consensus_address;
435            existing.score = crate::peer_scoring::get_peer_score(peer_type);
436
437            self.metrics
438                .update_peer_labels(&peer_id, &old_peer_info, existing);
439            return existing.score;
440        }
441
442        // New peer - create entry
443        let score = crate::peer_scoring::get_peer_score(peer_type);
444        let peer_info = PeerInfo {
445            address,
446            consensus_address,
447            moniker: agent_info.moniker,
448            peer_type,
449            connection_direction,
450            score,
451            topics: Default::default(),
452            is_explicit: false,
453        };
454
455        // Record peer information in metrics (subject to 100 slot limit)
456        self.metrics.record_new_peer(&peer_id, &peer_info);
457
458        // Store in State
459        self.peer_info.insert(peer_id, peer_info);
460
461        score
462    }
463
464    /// Format the peer information for logging (scrapable format):
465    ///  Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score, Explicit
466    pub fn format_peer_info(&self) -> String {
467        let mut lines = Vec::new();
468
469        // Header
470        lines.push("Address, Moniker, Type, PeerId, ConsensusAddr, Mesh, Dir, Score".to_string());
471
472        // Local node info marked with "me"
473        lines.push(format!("{}", self.local_node));
474
475        // Sort peers by moniker
476        let mut peers: Vec<_> = self.peer_info.iter().collect();
477        peers.sort_by(|a, b| a.1.moniker.cmp(&b.1.moniker));
478
479        for (peer_id, peer_info) in peers {
480            lines.push(peer_info.format_with_peer_id(peer_id));
481        }
482
483        lines.join("\n")
484    }
485
486    /// Update peer's persistent status, recalculate score, and update GossipSub
487    fn update_peer_persistent_status(
488        peer_id: libp2p::PeerId,
489        peer_info: Option<&mut PeerInfo>,
490        is_persistent: bool,
491        swarm: &mut libp2p::Swarm<Behaviour>,
492    ) {
493        let Some(peer_info) = peer_info else {
494            return;
495        };
496
497        peer_info.peer_type = peer_info.peer_type.with_persistent(is_persistent);
498
499        // Recalculate score
500        let new_score = crate::peer_scoring::get_peer_score(peer_info.peer_type);
501        peer_info.score = new_score;
502
503        // Update GossipSub score
504        if let Some(gossipsub) = swarm.behaviour_mut().gossipsub.as_mut() {
505            gossipsub.set_application_score(&peer_id, new_score);
506        }
507
508        tracing::debug!(
509            %peer_id,
510            %is_persistent,
511            peer_type = ?peer_info.peer_type,
512            "Updated peer persistent status"
513        );
514    }
515
516    /// Add a persistent peer at runtime
517    pub(crate) fn add_persistent_peer(
518        &mut self,
519        addr: Multiaddr,
520        swarm: &mut libp2p::Swarm<Behaviour>,
521    ) -> Result<(), PersistentPeerError> {
522        // Check if already exists
523        if self.persistent_peer_addrs.contains(&addr) {
524            return Err(PersistentPeerError::AlreadyExists);
525        }
526
527        // Extract PeerId from multiaddr if present
528        if let Some(peer_id) = extract_peer_id_from_multiaddr(&addr) {
529            self.persistent_peer_ids.insert(peer_id);
530
531            // Update peer type and score if already connected
532            Self::update_peer_persistent_status(
533                peer_id,
534                self.peer_info.get_mut(&peer_id),
535                true,
536                swarm,
537            );
538        }
539
540        // Add to persistent peer list
541        self.persistent_peer_addrs.push(addr.clone());
542
543        // Update discovery layer to add this as a bootstrap node
544        self.discovery.add_bootstrap_node(addr.clone());
545
546        // Attempt to dial the new persistent peer
547        if let Err(e) = swarm.dial(addr.clone()) {
548            tracing::warn!(
549                error = %e,
550                addr = %addr,
551                "Failed to dial newly added persistent peer, will retry via discovery"
552            );
553            // Don't return error - the peer is added, dialing might succeed later
554        }
555
556        Ok(())
557    }
558
559    /// Remove a persistent peer at runtime
560    pub(crate) fn remove_persistent_peer(
561        &mut self,
562        addr: Multiaddr,
563        swarm: &mut libp2p::Swarm<Behaviour>,
564    ) -> Result<(), PersistentPeerError> {
565        // Check if exists and remove from persistent peer list
566        let Some(pos) = self.persistent_peer_addrs.iter().position(|a| a == &addr) else {
567            return Err(PersistentPeerError::NotFound);
568        };
569
570        self.persistent_peer_addrs.remove(pos);
571
572        // Look up the peer_id from discovery, learned via TLS/noise handshake
573        // when we successfully connected to this address
574        let peer_id = self.discovery.get_peer_id_for_addr(&addr);
575
576        if let Some(peer_id) = peer_id {
577            self.persistent_peer_ids.remove(&peer_id);
578
579            // Update peer type and score if connected
580            Self::update_peer_persistent_status(
581                peer_id,
582                self.peer_info.get_mut(&peer_id),
583                false,
584                swarm,
585            );
586
587            // If peer is connected, disconnect it if
588            // - `persistent_peers_only` is configured,
589            // - or outbound connection exists
590            // Do not disconnect if there are inbound connections as the peer might have us as their persistent peer
591            let should_disconnect =
592                self.local_node.persistent_peers_only || !self.discovery.is_inbound_peer(&peer_id);
593
594            if swarm.is_connected(&peer_id) && should_disconnect {
595                let _ = swarm.disconnect_peer_id(peer_id);
596                tracing::info!(%peer_id, %addr, "Disconnecting from removed persistent peer");
597            }
598        }
599
600        // Cancel any in-progress dial attempts for this address and peer
601        self.discovery.cancel_dial_attempts(&addr, peer_id);
602
603        // Update discovery layer
604        self.discovery.remove_bootstrap_node(&addr);
605
606        Ok(())
607    }
608}
609
610/// Extract PeerId from a Multiaddr if it contains a /p2p/<peer_id> component
611fn extract_peer_id_from_multiaddr(addr: &Multiaddr) -> Option<libp2p::PeerId> {
612    use libp2p::multiaddr::Protocol;
613
614    for protocol in addr.iter() {
615        if let Protocol::P2p(peer_id) = protocol {
616            return Some(peer_id);
617        }
618    }
619    None
620}
621
622/// Helper to apply a peer type change, updating score and metrics.
623///
624/// Takes old_peer_info for stale metric labels (before any modifications)
625/// and uses peer_info for current metric labels (after modifications).
626/// Returns Some(new_score) if any label field changed, None otherwise.
627fn apply_peer_type_change(
628    peer_id: &libp2p::PeerId,
629    peer_info: &mut PeerInfo,
630    old_peer_info: &PeerInfo,
631    new_type: PeerType,
632    metrics: &mut NetworkMetrics,
633) -> Option<f64> {
634    let new_score = crate::peer_scoring::get_peer_score(new_type);
635    peer_info.peer_type = new_type;
636    peer_info.score = new_score;
637
638    metrics
639        .update_peer_labels(peer_id, old_peer_info, peer_info)
640        .then_some(new_score)
641}