//! Consensus-level networking service for `hotmint_network` (`service.rs`).

use std::collections::{HashMap, HashSet};
use std::iter;
use std::mem;
use std::sync::Arc;
use std::time::Instant;

use futures::StreamExt;
use hotmint_consensus::network::NetworkSink;
use hotmint_types::epoch::EpochNumber;
use hotmint_types::sync::{SyncRequest, SyncResponse};
use hotmint_types::{ConsensusMessage, ValidatorId};
use litep2p::config::ConfigBuilder;
use litep2p::protocol::notification::{
    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
};
use litep2p::protocol::request_response::{
    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
};
use litep2p::transport::tcp::config::Config as TcpConfig;
use litep2p::types::RequestId;
use litep2p::types::multiaddr::Multiaddr;
use litep2p::{Litep2p, Litep2pEvent, PeerId};
use ruc::*;
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, mpsc, watch};
use tracing::{debug, info, trace, warn};

use crate::codec;
use crate::peer::{PeerBook, PeerInfo, PeerRole};
use crate::pex::{PexConfig, PexRequest, PexResponse};

/// Type alias for the epoch update payload: (EpochNumber, validator public keys).
type EpochUpdate = Option<(
    EpochNumber,
    Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
)>;

const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
48
/// Maps ValidatorId <-> PeerId for routing
///
/// The two maps are intended to mirror each other: every forward entry has a
/// matching reverse entry, so lookups work in either direction.
#[derive(Clone)]
pub struct PeerMap {
    /// Forward direction: which peer a validator is reachable at.
    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
    /// Reverse direction: which validator (if any) a peer identity belongs to.
    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
}
55
56impl PeerMap {
57    pub fn new() -> Self {
58        Self {
59            validator_to_peer: HashMap::new(),
60            peer_to_validator: HashMap::new(),
61        }
62    }
63
64    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
65        self.validator_to_peer.insert(vid, pid);
66        self.peer_to_validator.insert(pid, vid);
67    }
68
69    pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
70        if let Some(pid) = self.validator_to_peer.remove(&vid) {
71            self.peer_to_validator.remove(&pid);
72            Some(pid)
73        } else {
74            None
75        }
76    }
77}
78
impl Default for PeerMap {
    /// Equivalent to [`PeerMap::new`]: an empty bidirectional map.
    fn default() -> Self {
        Self::new()
    }
}
84
/// Status of a peer for external queries
///
/// Serializable snapshot published on the `peer_info` watch channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// The validator this peer entry corresponds to.
    pub validator_id: ValidatorId,
    /// The peer's litep2p identity, rendered as a string for serialization.
    pub peer_id: String,
}
91
/// Commands sent from the NetworkSink to the NetworkService
pub enum NetCommand {
    /// Broadcast raw message bytes to connected peers.
    Broadcast(Vec<u8>),
    /// Send raw message bytes to a specific validator.
    SendTo(ValidatorId, Vec<u8>),
    /// Register a validator peer together with its dialable addresses.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Remove a validator peer from the routing table.
    RemovePeer(ValidatorId),
    /// Send a sync request to a specific peer
    SyncRequest(PeerId, Vec<u8>),
    /// Respond to a sync request
    SyncRespond(RequestId, Vec<u8>),
    /// Update peer_map from new validator set (epoch transition)
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
}
105
/// Incoming sync request forwarded to the sync responder
pub struct IncomingSyncRequest {
    /// Id to pass back when responding to (or rejecting) the request.
    pub request_id: RequestId,
    /// The peer that sent the request.
    pub peer: PeerId,
    /// The decoded sync request payload.
    pub request: SyncRequest,
}
112
/// All handles returned by [`NetworkService::create`].
pub struct NetworkServiceHandles {
    /// The service itself; drive it by spawning [`NetworkService::run`].
    pub service: NetworkService,
    /// Sink handed to the consensus engine for sending commands/messages.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages with the sending validator, if known.
    pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Inbound sync requests for the sync responder to answer.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Responses to sync requests this node issued.
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Snapshot of known validator peers, updated on membership changes.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Number of raw transport-level connections.
    pub connected_count_rx: watch::Receiver<usize>,
    /// Number of peers with an open notification (consensus) substream.
    /// Reaches >0 once the notification handshake completes, which is
    /// later than the raw connection and avoids the need for a fixed sleep.
    pub notif_connected_count_rx: watch::Receiver<usize>,
}
127
/// NetworkService wraps litep2p and provides consensus-level networking
///
/// Owns the litep2p instance plus one handle per protocol, and all channel
/// endpoints connecting the network to the consensus engine. The event loop
/// in [`NetworkService::run`] is the only driver of this state.
pub struct NetworkService {
    /// The underlying litep2p instance (transport + configured protocols).
    litep2p: Litep2p,
    /// Handle for the consensus notification (gossip) protocol.
    notif_handle: NotificationHandle,
    /// Handle for the consensus request/response protocol.
    reqresp_handle: RequestResponseHandle,
    /// Handle for the sync request/response protocol.
    sync_handle: RequestResponseHandle,
    /// Handle for the peer-exchange (PEX) request/response protocol.
    pex_handle: RequestResponseHandle,
    /// Bidirectional ValidatorId <-> PeerId routing table.
    peer_map: PeerMap,
    /// Shared peer book (addresses, roles, scores); also read/written by PEX.
    peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange configuration (intervals, response limits, private peers).
    pex_config: PexConfig,
    /// Peers this node keeps reconnecting to (snapshot of the initial peer_map).
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// Addresses to dial at startup (from persistent_peers config).
    initial_dial_addresses: Vec<Multiaddr>,
    /// Whether to relay received consensus messages to other connected peers.
    relay_consensus: bool,
    /// Public keys of known validators, used to verify individual message signatures
    /// before relaying.  Updated on epoch transitions via [`NetCommand::EpochChange`].
    validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
    /// Validators in round-robin order for leader-for-view computation.
    /// Must match the ordering in `ValidatorSet::validators()`.
    validator_ids_ordered: Vec<ValidatorId>,
    /// Blake3 hash of the chain identifier — used for relay signature verification.
    chain_id_hash: [u8; 32],
    /// Current epoch number — used for relay signature verification.
    current_epoch: EpochNumber,
    /// Delivers decoded consensus messages to the engine.
    msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    /// Commands arriving from the [`Litep2pNetworkSink`].
    cmd_rx: mpsc::Receiver<NetCommand>,
    /// Forwards inbound sync requests to the sync responder.
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    /// Forwards sync responses (and failures-as-errors) to the sync requester.
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    /// Publishes the current validator-peer snapshot.
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    /// Publishes the raw connection count.
    connected_count_tx: watch::Sender<usize>,
    /// Tracks peers with an open notification substream (post-handshake).
    notif_connected_peers: HashSet<PeerId>,
    /// Publishes `notif_connected_peers.len()`.
    notif_connected_count_tx: watch::Sender<usize>,
    /// Peers with a live transport-level connection.
    connected_peers: HashSet<PeerId>,
    /// Two-set rotation for relay deduplication: when the active set fills up,
    /// swap it to the backup position (clearing the old backup). Messages are
    /// checked against both sets, so recent history is always preserved across
    /// rotations. This avoids the brief relay-window that a single-set clear
    /// would create.
    seen_active: HashSet<u64>,
    seen_backup: HashSet<u64>,
    /// Reliable channel for epoch changes (F-02).
    epoch_rx: watch::Receiver<EpochUpdate>,
    /// Per-peer rate limiting for PEX requests (F-09).
    pex_rate_limit: HashMap<PeerId, Instant>,
}
175
/// Configuration for creating a [`NetworkService`].
pub struct NetworkConfig {
    /// Address the TCP transport listens on.
    pub listen_addr: Multiaddr,
    /// Initial ValidatorId <-> PeerId routing table.
    pub peer_map: PeerMap,
    /// Known dialable addresses per peer; also dialed at startup.
    pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
    /// Node identity keypair; a fresh one is generated by litep2p if `None`.
    pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
    /// Shared peer book instance.
    pub peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange configuration.
    pub pex_config: PexConfig,
    /// Whether to relay received consensus messages to other peers.
    pub relay_consensus: bool,
    /// Initial validator set (ids + public keys), in round-robin order.
    pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    /// Blake3 hash of the chain identifier (handshake + relay verification).
    pub chain_id_hash: [u8; 32],
}
188
189impl NetworkService {
190    /// Create the network service and all handles for the consensus engine.
191    pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
192        let NetworkConfig {
193            listen_addr,
194            peer_map,
195            known_addresses,
196            keypair,
197            peer_book,
198            pex_config,
199            relay_consensus,
200            initial_validators,
201            chain_id_hash,
202        } = config;
203        // H-6: Use chain_id_hash as handshake bytes for chain isolation.
204        // Peers on different chains will have mismatched handshakes and be rejected.
205        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
206            .with_max_size(MAX_NOTIFICATION_SIZE)
207            .with_handshake(chain_id_hash.to_vec())
208            .with_auto_accept_inbound(false)
209            .with_sync_channel_size(1024)
210            .with_async_channel_size(1024)
211            .build();
212
213        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
214            .with_max_size(MAX_NOTIFICATION_SIZE)
215            .build();
216
217        let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
218            .with_max_size(MAX_NOTIFICATION_SIZE)
219            .build();
220
221        let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
222            .with_max_size(1024 * 1024) // 1MB for peer lists
223            .build();
224
225        let mut config_builder = ConfigBuilder::new()
226            .with_tcp(TcpConfig {
227                listen_addresses: vec![listen_addr],
228                ..Default::default()
229            })
230            .with_notification_protocol(notif_config)
231            .with_request_response_protocol(reqresp_config)
232            .with_request_response_protocol(sync_config)
233            .with_request_response_protocol(pex_config_proto);
234
235        if let Some(kp) = keypair {
236            config_builder = config_builder.with_keypair(kp);
237        }
238
239        // Collect dial addresses with /p2p/<peer_id> suffix for startup dialing
240        let initial_dial_addresses: Vec<Multiaddr> = known_addresses
241            .iter()
242            .flat_map(|(pid, addrs)| {
243                addrs.iter().map(move |addr| {
244                    let mut full = addr.clone();
245                    full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
246                    full
247                })
248            })
249            .collect();
250
251        if !known_addresses.is_empty() {
252            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
253        }
254
255        let litep2p =
256            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
257
258        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
259        for addr in litep2p.listen_addresses() {
260            info!(address = %addr, "listening on");
261        }
262
263        let (msg_tx, msg_rx) = mpsc::channel(8192);
264        let (cmd_tx, cmd_rx) = mpsc::channel(4096);
265        let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
266        let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
267
268        // Build initial peer info
269        let initial_peers: Vec<PeerStatus> = peer_map
270            .validator_to_peer
271            .iter()
272            .map(|(&vid, pid)| PeerStatus {
273                validator_id: vid,
274                peer_id: pid.to_string(),
275            })
276            .collect();
277        let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
278
279        let (epoch_tx, epoch_rx) = watch::channel::<
280            Option<(
281                EpochNumber,
282                Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
283            )>,
284        >(None);
285
286        let sink = Litep2pNetworkSink {
287            cmd_tx: cmd_tx.clone(),
288            epoch_tx,
289        };
290
291        let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
292        let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
293
294        // Save persistent peers for auto-reconnect
295        let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
296
297        let validator_ids_ordered: Vec<ValidatorId> =
298            initial_validators.iter().map(|(vid, _)| *vid).collect();
299        let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
300            initial_validators.into_iter().collect();
301
302        Ok(NetworkServiceHandles {
303            service: Self {
304                litep2p,
305                notif_handle,
306                reqresp_handle,
307                sync_handle,
308                pex_handle,
309                peer_map,
310                peer_book,
311                pex_config,
312                persistent_peers,
313                initial_dial_addresses,
314                relay_consensus,
315                validator_keys,
316                validator_ids_ordered,
317                chain_id_hash,
318                current_epoch: EpochNumber::GENESIS,
319                msg_tx,
320                cmd_rx,
321                sync_req_tx,
322                sync_resp_tx,
323                peer_info_tx,
324                connected_count_tx,
325                notif_connected_peers: HashSet::new(),
326                notif_connected_count_tx,
327                connected_peers: HashSet::new(),
328                seen_active: HashSet::new(),
329                seen_backup: HashSet::new(),
330                epoch_rx,
331                pex_rate_limit: HashMap::new(),
332            },
333            sink,
334            msg_rx,
335            sync_req_rx,
336            sync_resp_rx,
337            peer_info_rx,
338            connected_count_rx,
339            notif_connected_count_rx,
340        })
341    }
342
    /// The local litep2p peer identity of this node.
    pub fn local_peer_id(&self) -> &PeerId {
        self.litep2p.local_peer_id()
    }
346
    /// Run the network event loop
    ///
    /// Dials the configured startup addresses, then multiplexes over all
    /// protocol handles, the command channel, the epoch watch channel, and
    /// the two periodic timers (maintenance and PEX). This future never
    /// returns; it is intended to run as a dedicated task.
    pub async fn run(mut self) {
        // Dial all persistent peers at startup using full multiaddrs.
        for addr in mem::take(&mut self.initial_dial_addresses) {
            if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
                debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
            }
        }

        let mut maintenance_interval =
            tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
        let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
            self.pex_config.request_interval_secs,
        ));
        loop {
            tokio::select! {
                // Consensus gossip (substream validation, open/close, payloads).
                event = self.notif_handle.next() => {
                    if let Some(event) = event {
                        self.handle_notification_event(event).await;
                    }
                }
                // Direct consensus request/response.
                event = self.reqresp_handle.next() => {
                    if let Some(event) = event {
                        self.handle_reqresp_event(event);
                    }
                }
                // Block sync request/response.
                event = self.sync_handle.next() => {
                    if let Some(event) = event {
                        self.handle_sync_event(event).await;
                    }
                }
                // Peer exchange.
                event = self.pex_handle.next() => {
                    if let Some(event) = event {
                        self.handle_pex_event(event).await;
                    }
                }
                // Transport-level events (connections, dial failures, ...).
                event = self.litep2p.next_event() => {
                    if let Some(event) = event {
                        self.handle_litep2p_event(event).await;
                    }
                }
                // Commands from the consensus engine via the sink.
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(cmd).await;
                }
                // Reliable epoch-change notifications (F-02).
                Ok(()) = self.epoch_rx.changed() => {
                    let epoch_val = self.epoch_rx.borrow_and_update().clone();
                    if let Some((epoch_number, validators)) = epoch_val {
                        self.current_epoch = epoch_number;
                        self.handle_epoch_change(validators).await;
                    }
                }
                _ = maintenance_interval.tick() => {
                    self.run_maintenance().await;
                }
                _ = pex_interval.tick() => {
                    if self.pex_config.enabled {
                        self.run_pex_round().await;
                    }
                }
            }
        }
    }
409
    /// Handle an event from the consensus notification (gossip) protocol.
    ///
    /// Validates inbound substreams against the chain id hash, tracks which
    /// peers have an open notification stream, decodes incoming consensus
    /// messages, forwards them to the engine, and optionally relays them to
    /// other connected peers with signature verification and deduplication.
    async fn handle_notification_event(&mut self, event: NotificationEvent) {
        match event {
            NotificationEvent::ValidateSubstream {
                peer, handshake, ..
            } => {
                // H-6: Verify handshake carries our chain_id_hash.
                // Reject peers on a different chain or with empty handshake.
                if handshake.as_slice() == self.chain_id_hash.as_slice() {
                    self.notif_handle
                        .send_validation_result(peer, ValidationResult::Accept);
                } else {
                    warn!(peer = %peer, "rejecting peer: chain_id_hash handshake mismatch");
                    self.notif_handle
                        .send_validation_result(peer, ValidationResult::Reject);
                }
            }
            NotificationEvent::NotificationStreamOpened { peer, .. } => {
                info!(peer = %peer, "notification stream opened");
                self.notif_connected_peers.insert(peer);
                // Watch receivers may be gone; a failed send is harmless.
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationStreamClosed { peer } => {
                debug!(peer = %peer, "notification stream closed");
                self.notif_connected_peers.remove(&peer);
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationReceived { peer, notification } => {
                // Determine the sender ValidatorId (None if peer is not a known validator)
                let sender: Option<ValidatorId> =
                    self.peer_map.peer_to_validator.get(&peer).copied();

                match codec::decode::<ConsensusMessage>(&notification) {
                    Ok(msg) => {
                        // F-08: Only deliver to consensus if sender is a known validator.
                        if sender.is_some()
                            && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
                        {
                            warn!("consensus message dropped (notification): {e}");
                        }

                        // Relay: re-broadcast to other connected peers (with two-set dedup).
                        // Only relay from known validators whose individual message signature
                        // is valid, preventing unknown peers from using this node as a DoS
                        // amplifier.
                        if self.relay_consensus
                            && let Some(sid) = sender
                            && hotmint_consensus::engine::verify_relay_sender(
                                sid,
                                &msg,
                                &self.validator_keys,
                                &self.validator_ids_ordered,
                                &self.chain_id_hash,
                                self.current_epoch,
                            )
                        {
                            // Dedup key: first 8 bytes (little-endian) of the
                            // blake3 hash of the raw notification bytes.
                            let msg_hash = u64::from_le_bytes(
                                blake3::hash(&notification).as_bytes()[..8]
                                    .try_into()
                                    .unwrap(),
                            );

                            // Check both sets to avoid re-relay across rotations
                            if !self.seen_active.contains(&msg_hash)
                                && !self.seen_backup.contains(&msg_hash)
                            {
                                self.seen_active.insert(msg_hash);
                                let raw = notification.to_vec();
                                for &other in &self.connected_peers {
                                    if other != peer {
                                        // Best-effort: a failed send to one
                                        // peer does not abort the relay.
                                        let _ = self
                                            .notif_handle
                                            .send_sync_notification(other, raw.clone());
                                    }
                                }
                                // Rotate: move active→backup, clear old backup
                                if self.seen_active.len() > 10_000 {
                                    self.seen_backup = mem::take(&mut self.seen_active);
                                }
                            }
                            // F-31: Reward peer for a valid relayed consensus message.
                            self.peer_book
                                .write()
                                .await
                                .adjust_score(&peer.to_string(), 1);
                        }
                    }
                    Err(e) => {
                        // Undecodable payload: penalize the sending peer.
                        warn!(error = %e, peer = %peer, "failed to decode notification");
                        self.peer_book
                            .write()
                            .await
                            .adjust_score(&peer.to_string(), -10);
                    }
                }
            }
            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
                warn!(peer = %peer, error = ?error, "notification stream open failed");
            }
        }
    }
514
515    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
516        match event {
517            RequestResponseEvent::RequestReceived {
518                peer,
519                request_id,
520                request,
521                ..
522            } => {
523                let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
524                    warn!(peer = %peer, "dropping request from unknown peer");
525                    self.reqresp_handle.reject_request(request_id);
526                    return;
527                };
528                match codec::decode::<ConsensusMessage>(&request) {
529                    Ok(msg) => {
530                        if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
531                            warn!("consensus message dropped (reqresp): {e}");
532                        }
533                        self.reqresp_handle.send_response(request_id, vec![]);
534                    }
535                    Err(e) => {
536                        warn!(error = %e, "failed to decode request");
537                        self.reqresp_handle.reject_request(request_id);
538                    }
539                }
540            }
541            RequestResponseEvent::ResponseReceived { .. } => {}
542            RequestResponseEvent::RequestFailed { peer, error, .. } => {
543                debug!(peer = %peer, error = ?error, "request failed");
544            }
545        }
546    }
547
    /// Handle an event from the sync request/response protocol.
    ///
    /// Inbound requests are forwarded to the sync responder; inbound
    /// responses (and request failures, converted to `SyncResponse::Error`)
    /// are forwarded to the sync requester.
    async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
        match event {
            RequestResponseEvent::RequestReceived {
                peer,
                request_id,
                request,
                ..
            } => {
                // Only serve sync requests from peers with an active notification
                // stream (which implies they passed the chain_id handshake).
                if !self.notif_connected_peers.contains(&peer) {
                    warn!(peer = %peer, "rejecting sync request: no notification stream (chain isolation)");
                    self.sync_handle.reject_request(request_id);
                    return;
                }
                match codec::decode::<SyncRequest>(&request) {
                    Ok(req) => {
                        // Forward to the sync responder; dropped if its queue is full.
                        if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
                            request_id,
                            peer,
                            request: req,
                        }) {
                            warn!("sync request dropped: {e}");
                        }
                    }
                    Err(e) => {
                        // Undecodable request: penalize the peer and report the
                        // decode error back to it (reject only if encoding the
                        // error response itself fails).
                        warn!(error = %e, peer = %peer, "failed to decode sync request");
                        self.peer_book
                            .write()
                            .await
                            .adjust_score(&peer.to_string(), -5);
                        let err_resp = SyncResponse::Error(format!("decode error: {e}"));
                        if let Ok(bytes) = codec::encode(&err_resp) {
                            self.sync_handle.send_response(request_id, bytes);
                        } else {
                            self.sync_handle.reject_request(request_id);
                        }
                    }
                }
            }
            RequestResponseEvent::ResponseReceived {
                request_id: _,
                response,
                ..
            } => {
                // Forward sync response to the sync requester
                match codec::decode::<SyncResponse>(&response) {
                    Ok(resp) => {
                        if let Err(e) = self.sync_resp_tx.try_send(resp) {
                            warn!("sync response dropped: {e}");
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "failed to decode sync response");
                    }
                }
            }
            RequestResponseEvent::RequestFailed { peer, error, .. } => {
                // Surface the failure to the requester as a SyncResponse::Error
                // so it does not wait indefinitely.
                debug!(peer = %peer, error = ?error, "sync request failed");
                if let Err(e) = self
                    .sync_resp_tx
                    .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
                {
                    warn!("sync error response dropped: {e}");
                }
            }
        }
    }
616
    /// Handle an event from the peer-exchange (PEX) protocol.
    ///
    /// Serves `GetPeers` from the peer book (excluding private peers) and
    /// records `Advertise` entries after validating the claimed role and
    /// validator identity. Requests are gated on the peer being known and on
    /// a per-peer rate limit.
    async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
        match event {
            RequestResponseEvent::RequestReceived {
                peer,
                request_id,
                request,
                ..
            } => {
                // P3: Only accept PEX from known peers (peers in peer_map or connected)
                if !self.peer_map.peer_to_validator.contains_key(&peer)
                    && !self.connected_peers.contains(&peer)
                {
                    warn!(peer = %peer, "rejecting PEX request from unknown peer");
                    self.pex_handle.reject_request(request_id);
                    return;
                }
                // F-09: Rate limit PEX requests per peer (max 1 every 10s).
                // NOTE(review): entries are never evicted from pex_rate_limit,
                // so it grows with the number of distinct peers seen — confirm
                // this is bounded elsewhere (e.g. by the known-peer gate above).
                let now = Instant::now();
                if let Some(last) = self.pex_rate_limit.get(&peer)
                    && now.duration_since(*last) < std::time::Duration::from_secs(10)
                {
                    self.pex_handle.reject_request(request_id);
                    return;
                }
                self.pex_rate_limit.insert(peer, now);
                match postcard::from_bytes::<PexRequest>(&request) {
                    Ok(PexRequest::GetPeers) => {
                        let book = self.peer_book.read().await;
                        let private = &self.pex_config.private_peer_ids;
                        // Sample peers from the book, excluding the requester.
                        let peers: Vec<PeerInfo> = book
                            .get_random_peers(self.pex_config.max_peers_per_response)
                            .into_iter()
                            .filter(|p| p.peer_id != peer.to_string())
                            // P4: exclude private peers from PEX responses
                            .filter(|p| !private.contains(&p.peer_id))
                            .cloned()
                            .collect();
                        let resp = PexResponse::Peers(peers);
                        if let Ok(bytes) = postcard::to_allocvec(&resp) {
                            self.pex_handle.send_response(request_id, bytes);
                        }
                    }
                    Ok(PexRequest::Advertise {
                        role,
                        validator_id,
                        addresses,
                    }) => {
                        // F-10: Reject if role is "validator" but validator_id is None.
                        if role == PeerRole::Validator && validator_id.is_none() {
                            warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
                            self.pex_handle.reject_request(request_id);
                            return;
                        }
                        // P2: If claiming validator_id, verify PeerId matches peer_map
                        if let Some(vid) = validator_id
                            && let Some(&expected_peer) =
                                self.peer_map.validator_to_peer.get(&ValidatorId(vid))
                            && expected_peer != peer
                        {
                            warn!(
                                peer = %peer,
                                claimed_vid = vid,
                                "PEX Advertise validator_id mismatch, rejecting"
                            );
                            self.pex_handle.reject_request(request_id);
                            return;
                        }
                        // F-10: Limit addresses to max 8 to prevent abuse.
                        let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
                        // Unparseable addresses are silently dropped.
                        let mut info = PeerInfo::new(
                            peer,
                            role,
                            addresses.iter().filter_map(|a| a.parse().ok()).collect(),
                        );
                        if let Some(vid) = validator_id {
                            info = info.with_validator(ValidatorId(vid));
                        }
                        self.peer_book.write().await.add_peer(info);
                        if let Ok(bytes) = postcard::to_allocvec(&PexResponse::Ack) {
                            self.pex_handle.send_response(request_id, bytes);
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "failed to decode PEX request");
                        self.pex_handle.reject_request(request_id);
                    }
                }
            }
            RequestResponseEvent::ResponseReceived { response, .. } => {
                // Merge advertised peers into the book, skipping banned ones.
                if let Ok(PexResponse::Peers(peers)) = postcard::from_bytes(&response) {
                    let mut book = self.peer_book.write().await;
                    for peer in peers {
                        if !peer.is_banned() {
                            book.add_peer(peer);
                        }
                    }
                }
            }
            // F-27: Log unhandled PEX events instead of silently dropping.
            other => {
                trace!("unhandled PEX event: {other:?}");
            }
        }
    }
721
722    /// Periodic connection maintenance: reconnect persistent peers, save peer book.
723    async fn run_maintenance(&mut self) {
724        // 1. Reconnect disconnected persistent peers.
725        // Collect addresses outside the lock scope to avoid holding it across await.
726        let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
727            let book = self.peer_book.read().await;
728            self.persistent_peers
729                .values()
730                .filter(|pid| !self.connected_peers.contains(pid))
731                .filter_map(|&pid| {
732                    book.get(&pid.to_string()).map(|info| {
733                        let addrs: Vec<Multiaddr> = info
734                            .addresses
735                            .iter()
736                            .filter_map(|a| a.parse().ok())
737                            .collect();
738                        (pid, addrs)
739                    })
740                })
741                .collect()
742        };
743
744        for (pid, addrs) in to_dial {
745            if !addrs.is_empty() {
746                self.litep2p
747                    .add_known_address(pid, addrs.clone().into_iter());
748                // F-07: Also actively dial the first address to trigger reconnection.
749                let mut dial_addr = addrs[0].clone();
750                dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
751                if let Err(e) = self.litep2p.dial_address(dial_addr).await {
752                    debug!(peer = %pid, error = ?e, "persistent peer redial failed");
753                }
754            }
755        }
756
757        // 2. Try to connect to peers from book if under target
758        let max = self.pex_config.max_peers;
759        if self.connected_peers.len() < max * 4 / 5 {
760            let book = self.peer_book.read().await;
761            let candidates = book.get_random_peers(5);
762            for peer in candidates {
763                if let Ok(pid) = peer.peer_id.parse::<PeerId>()
764                    && !self.connected_peers.contains(&pid)
765                {
766                    let addrs: Vec<Multiaddr> = peer
767                        .addresses
768                        .iter()
769                        .filter_map(|a| a.parse().ok())
770                        .collect();
771                    if !addrs.is_empty() {
772                        self.litep2p.add_known_address(pid, addrs.into_iter());
773                    }
774                }
775            }
776        }
777
778        // 3. Prune stale peers (older than 24 hours) and persist
779        self.peer_book.write().await.prune_stale(86400);
780        if let Err(e) = self.peer_book.read().await.save() {
781            warn!(%e, "failed to save peer book");
782        }
783    }
784
785    /// Send a PEX GetPeers request to a random connected peer.
786    async fn run_pex_round(&mut self) {
787        if self.connected_peers.is_empty() {
788            return;
789        }
790        // Pick a random connected peer
791        let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
792        let idx = rand::random::<usize>() % peers.len();
793        let target = peers[idx];
794
795        if let Ok(bytes) = postcard::to_allocvec(&PexRequest::GetPeers) {
796            let _ = self
797                .pex_handle
798                .send_request(target, bytes, DialOptions::Reject)
799                .await;
800        }
801    }
802
803    async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
804        match event {
805            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
806                // Enforce connection limit, but always admit validators.
807                // This prevents Eclipse attacks where an adversary fills all
808                // connection slots with Sybil nodes to isolate validators.
809                let is_validator = self.peer_map.peer_to_validator.contains_key(&peer);
810                if !is_validator && self.connected_peers.len() >= self.pex_config.max_peers {
811                    warn!(
812                        peer = %peer,
813                        total = self.connected_peers.len(),
814                        max = self.pex_config.max_peers,
815                        "connection limit reached, ignoring non-validator peer"
816                    );
817                    return;
818                }
819
820                info!(peer = %peer, endpoint = ?endpoint, "connection established");
821                self.connected_peers.insert(peer);
822                let _ = self.connected_count_tx.send(self.connected_peers.len());
823
824                // Open notification substream to this peer for consensus messages.
825                if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
826                    debug!(peer = %peer, error = ?e, "failed to open notification substream");
827                }
828
829                // Update last_seen in peer book
830                if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
831                    info.touch();
832                }
833            }
834            Litep2pEvent::ConnectionClosed { peer, .. } => {
835                debug!(peer = %peer, "connection closed");
836                self.connected_peers.remove(&peer);
837                let _ = self.connected_count_tx.send(self.connected_peers.len());
838                // Eagerly remove from notif set; NotificationStreamClosed may
839                // arrive late or not at all when the TCP connection drops first.
840                if self.notif_connected_peers.remove(&peer) {
841                    let _ = self
842                        .notif_connected_count_tx
843                        .send(self.notif_connected_peers.len());
844                }
845            }
846            Litep2pEvent::DialFailure { address, error, .. } => {
847                warn!(address = %address, error = ?error, "dial failed");
848            }
849            // F-27: Log unhandled litep2p events instead of silently dropping.
850            other => {
851                trace!(?other, "unhandled litep2p event");
852            }
853        }
854    }
855
856    fn update_peer_info(&self) {
857        let peers: Vec<PeerStatus> = self
858            .peer_map
859            .validator_to_peer
860            .iter()
861            .map(|(&vid, pid)| PeerStatus {
862                validator_id: vid,
863                peer_id: pid.to_string(),
864            })
865            .collect();
866        let _ = self.peer_info_tx.send(peers);
867    }
868
869    async fn handle_command(&mut self, cmd: NetCommand) {
870        match cmd {
871            NetCommand::Broadcast(bytes) => {
872                // Broadcast to all connected peers (validators + fullnodes).
873                for &peer in &self.connected_peers {
874                    let _ = self
875                        .notif_handle
876                        .send_sync_notification(peer, bytes.clone());
877                }
878            }
879            NetCommand::SendTo(target, bytes) => {
880                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
881                    let _ = self
882                        .reqresp_handle
883                        .send_request(peer_id, bytes, DialOptions::Reject)
884                        .await;
885                }
886            }
887            NetCommand::AddPeer(vid, pid, addrs) => {
888                info!(validator = %vid, peer = %pid, "adding peer");
889                self.peer_map.insert(vid, pid);
890                self.litep2p.add_known_address(pid, addrs.into_iter());
891                self.update_peer_info();
892            }
893            NetCommand::RemovePeer(vid) => {
894                if let Some(pid) = self.peer_map.remove(vid) {
895                    info!(validator = %vid, peer = %pid, "removed peer");
896                } else {
897                    warn!(validator = %vid, "peer not found for removal");
898                }
899                self.update_peer_info();
900            }
901            NetCommand::SyncRequest(peer_id, bytes) => {
902                let _ = self
903                    .sync_handle
904                    .send_request(peer_id, bytes, DialOptions::Reject)
905                    .await;
906            }
907            NetCommand::SyncRespond(request_id, bytes) => {
908                self.sync_handle.send_response(request_id, bytes);
909            }
910            NetCommand::EpochChange(validators) => {
911                self.handle_epoch_change(validators).await;
912            }
913        }
914    }
915
    /// Reconcile validator routing state after an epoch transition: add
    /// peer_map entries for validators that joined, drop entries for those
    /// that left, and refresh the key caches plus the persistent-peer list.
    async fn handle_epoch_change(
        &mut self,
        validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    ) {
        // Add peer_map entries for validators that are new this epoch.
        for (vid, pubkey) in &validators {
            if self.peer_map.validator_to_peer.contains_key(vid) {
                continue;
            }
            // Derive the PeerId directly from the validator's ed25519 public
            // key (litep2p PeerIds are key-derived); no PeerBook lookup needed.
            // If the key bytes fail to parse, the validator is silently skipped.
            let pk_bytes = &pubkey.0;
            if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
                let peer_id = lpk.to_peer_id();
                info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
                self.peer_map.insert(*vid, peer_id);
            }
        }
        // Remove validators no longer in the set.
        let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
        let to_remove: Vec<ValidatorId> = self
            .peer_map
            .validator_to_peer
            .keys()
            .filter(|vid| !new_ids.contains(vid))
            .copied()
            .collect();
        for vid in to_remove {
            info!(validator = %vid, "removing validator from peer_map after epoch change");
            self.peer_map.remove(vid);
        }
        // Update relay verification key map and validator ordering to match
        // the new epoch (`validators` is consumed by the second collect).
        self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
        self.validator_keys = validators.into_iter().collect();
        // F-28: Rebuild persistent_peers from updated peer_map so maintenance
        // re-dials exactly the current validator set.
        self.persistent_peers = self.peer_map.validator_to_peer.clone();
        self.update_peer_info();
    }
953}
954
/// NetworkSink backed by litep2p, for use by the consensus engine.
/// Also provides methods for peer management and sync.
#[derive(Clone)]
pub struct Litep2pNetworkSink {
    // Bounded command queue into the network task; senders use try_send and
    // log a warning when the queue is full (commands may be dropped).
    cmd_tx: mpsc::Sender<NetCommand>,
    // Watch channel carrying the latest epoch/validator-set update (F-02:
    // a watch channel never drops the newest value, only supersedes old ones).
    epoch_tx: watch::Sender<EpochUpdate>,
}
962
963impl Litep2pNetworkSink {
964    pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
965        if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
966            warn!("add_peer cmd dropped: {e}");
967        }
968    }
969
970    pub fn remove_peer(&self, vid: ValidatorId) {
971        if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
972            warn!("remove_peer cmd dropped: {e}");
973        }
974    }
975
976    pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
977        match codec::encode(request) {
978            Ok(bytes) => {
979                if let Err(e) = self
980                    .cmd_tx
981                    .try_send(NetCommand::SyncRequest(peer_id, bytes))
982                {
983                    warn!("sync request cmd dropped: {e}");
984                }
985            }
986            Err(e) => warn!("sync request encode failed: {e}"),
987        }
988    }
989
990    pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
991        match codec::encode(response) {
992            Ok(bytes) => {
993                if let Err(e) = self
994                    .cmd_tx
995                    .try_send(NetCommand::SyncRespond(request_id, bytes))
996                {
997                    warn!("sync response cmd dropped: {e}");
998                }
999            }
1000            Err(e) => warn!("sync response encode failed: {e}"),
1001        }
1002    }
1003}
1004
1005impl NetworkSink for Litep2pNetworkSink {
1006    fn broadcast(&self, msg: ConsensusMessage) {
1007        match codec::encode(&msg) {
1008            Ok(bytes) => {
1009                if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1010                    warn!("broadcast cmd dropped: {e}");
1011                }
1012            }
1013            Err(e) => warn!("broadcast encode failed: {e}"),
1014        }
1015    }
1016
1017    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
1018        match codec::encode(&msg) {
1019            Ok(bytes) => {
1020                if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
1021                    warn!("send_to cmd dropped for {target}: {e}");
1022                }
1023            }
1024            Err(e) => warn!("send_to encode failed for {target}: {e}"),
1025        }
1026    }
1027
1028    fn on_epoch_change(&self, epoch: EpochNumber, new_validator_set: &hotmint_types::ValidatorSet) {
1029        let validators: Vec<_> = new_validator_set
1030            .validators()
1031            .iter()
1032            .map(|v| (v.id, v.public_key.clone()))
1033            .collect();
1034        // F-02: Use dedicated watch channel so epoch changes are never dropped.
1035        let _ = self.epoch_tx.send(Some((epoch, validators)));
1036    }
1037
1038    fn broadcast_evidence(&self, proof: &hotmint_types::evidence::EquivocationProof) {
1039        let msg = ConsensusMessage::Evidence(proof.clone());
1040        match codec::encode(&msg) {
1041            Ok(bytes) => {
1042                if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1043                    warn!("broadcast_evidence cmd dropped: {e}");
1044                }
1045            }
1046            Err(e) => warn!("broadcast_evidence encode failed: {e}"),
1047        }
1048    }
1049}