//! hotmint_network/service.rs — consensus-level networking for HotMint,
//! built on litep2p (notification gossip, request/response, sync, and PEX).
1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::sync::{SyncRequest, SyncResponse};
11use hotmint_types::{ConsensusMessage, ValidatorId};
12use litep2p::config::ConfigBuilder;
13use litep2p::protocol::notification::{
14    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
15};
16use litep2p::protocol::request_response::{
17    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
18};
19use litep2p::transport::tcp::config::Config as TcpConfig;
20use litep2p::types::RequestId;
21use litep2p::types::multiaddr::Multiaddr;
22use litep2p::{Litep2p, Litep2pEvent, PeerId};
23use serde::{Deserialize, Serialize};
24use tokio::sync::{mpsc, watch};
25use tracing::{debug, info, trace, warn};
26
27use std::sync::Arc;
28
29use tokio::sync::RwLock;
30
31use crate::codec;
32use crate::peer::{PeerBook, PeerInfo, PeerRole};
33use crate::pex::{PexConfig, PexRequest, PexResponse};
34
/// Notification (gossip) protocol id for consensus messages.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Request/response protocol id for direct consensus messages.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Request/response protocol id for block/state sync.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Request/response protocol id for peer exchange (PEX).
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Upper bound on a single notification/request payload (16 MiB).
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
/// Period of the maintenance tick (reconnects, peer-book pruning/saving).
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
41
42/// Maps ValidatorId <-> PeerId for routing
43#[derive(Clone)]
44pub struct PeerMap {
45    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
46    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
47}
48
49impl PeerMap {
50    pub fn new() -> Self {
51        Self {
52            validator_to_peer: HashMap::new(),
53            peer_to_validator: HashMap::new(),
54        }
55    }
56
57    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
58        self.validator_to_peer.insert(vid, pid);
59        self.peer_to_validator.insert(pid, vid);
60    }
61
62    pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
63        if let Some(pid) = self.validator_to_peer.remove(&vid) {
64            self.peer_to_validator.remove(&pid);
65            Some(pid)
66        } else {
67            None
68        }
69    }
70}
71
72impl Default for PeerMap {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78/// Status of a peer for external queries
/// Status of a peer for external queries, published over the `peer_info`
/// watch channel (see [`NetworkServiceHandles::peer_info_rx`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Consensus identity of the peer.
    pub validator_id: ValidatorId,
    /// litep2p peer id, stringified for serialization.
    pub peer_id: String,
}
84
/// Commands sent from the NetworkSink to the NetworkService.
///
/// All variants carry pre-encoded payloads; handling happens in
/// `handle_command` (not shown in this chunk).
pub enum NetCommand {
    /// Gossip a pre-encoded consensus message to connected peers.
    Broadcast(Vec<u8>),
    /// Send a pre-encoded consensus message to a single validator.
    SendTo(ValidatorId, Vec<u8>),
    /// Register a validator's peer id and dialable addresses.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Forget a validator's peer mapping.
    RemovePeer(ValidatorId),
    /// Send a sync request to a specific peer
    SyncRequest(PeerId, Vec<u8>),
    /// Respond to a sync request
    SyncRespond(RequestId, Vec<u8>),
    /// Update peer_map from new validator set (epoch transition)
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
}
98
/// Incoming sync request forwarded to the sync responder.
pub struct IncomingSyncRequest {
    /// litep2p request id, needed to route the response back via
    /// [`NetCommand::SyncRespond`].
    pub request_id: RequestId,
    /// Peer that sent the request.
    pub peer: PeerId,
    /// Decoded sync request payload.
    pub request: SyncRequest,
}
105
/// All handles returned by [`NetworkService::create`].
pub struct NetworkServiceHandles {
    /// The service itself; drive it with [`NetworkService::run`].
    pub service: NetworkService,
    /// Sink for the consensus engine to send commands/epoch changes.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages; `None` sender means unknown peer.
    pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Inbound sync requests for the sync responder task.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Sync responses (and transport-failure errors) for the requester.
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Latest snapshot of known validator peers.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Number of peers with a raw connection.
    pub connected_count_rx: watch::Receiver<usize>,
    /// Number of peers with an open notification (consensus) substream.
    /// Reaches >0 once the notification handshake completes, which is
    /// later than the raw connection and avoids the need for a fixed sleep.
    pub notif_connected_count_rx: watch::Receiver<usize>,
}
120
/// NetworkService wraps litep2p and provides consensus-level networking.
///
/// Owns the transport, all protocol handles, and the routing/bookkeeping
/// state; driven by [`NetworkService::run`].
pub struct NetworkService {
    /// Underlying transport instance.
    litep2p: Litep2p,
    /// Handle for the consensus notification (gossip) protocol.
    notif_handle: NotificationHandle,
    /// Handle for the direct consensus request/response protocol.
    reqresp_handle: RequestResponseHandle,
    /// Handle for the block/state sync protocol.
    sync_handle: RequestResponseHandle,
    /// Handle for the peer-exchange protocol.
    pex_handle: RequestResponseHandle,
    /// ValidatorId <-> PeerId routing table.
    peer_map: PeerMap,
    /// Shared peer book (addresses, roles, scores), persisted periodically.
    peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange tuning parameters.
    pex_config: PexConfig,
    /// Peers we always try to keep connected (from startup peer_map).
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// Addresses to dial at startup (from persistent_peers config).
    initial_dial_addresses: Vec<Multiaddr>,
    /// Whether to relay received consensus messages to other connected peers.
    relay_consensus: bool,
    /// Public keys of known validators, used to verify individual message signatures
    /// before relaying.  Updated on epoch transitions via [`NetCommand::EpochChange`].
    validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
    /// Validators in round-robin order for leader-for-view computation.
    /// Must match the ordering in `ValidatorSet::validators()`.
    validator_ids_ordered: Vec<ValidatorId>,
    /// Blake3 hash of the chain identifier — used for relay signature verification.
    chain_id_hash: [u8; 32],
    /// Delivers decoded consensus messages to the engine.
    msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    /// Receives commands from the sink.
    cmd_rx: mpsc::Receiver<NetCommand>,
    /// Forwards inbound sync requests to the sync responder.
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    /// Forwards sync responses/errors to the sync requester.
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    /// Publishes validator-peer snapshots for external queries.
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    /// Publishes the raw connected-peer count.
    connected_count_tx: watch::Sender<usize>,
    /// Tracks peers with an open notification substream (post-handshake).
    notif_connected_peers: HashSet<PeerId>,
    /// Publishes the notification-substream peer count.
    notif_connected_count_tx: watch::Sender<usize>,
    /// Peers with an established raw connection.
    connected_peers: HashSet<PeerId>,
    /// Two-set rotation for relay deduplication: when the active set fills up,
    /// swap it to the backup position (clearing the old backup). Messages are
    /// checked against both sets, so recent history is always preserved across
    /// rotations. This avoids the brief relay-window that a single-set clear
    /// would create.
    seen_active: HashSet<u64>,
    seen_backup: HashSet<u64>,
    /// Reliable channel for epoch changes (F-02).
    epoch_rx: watch::Receiver<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>,
    /// Per-peer rate limiting for PEX requests (F-09).
    pex_rate_limit: HashMap<PeerId, Instant>,
}
166
/// Configuration for creating a [`NetworkService`].
pub struct NetworkConfig {
    /// Multiaddr to listen on for inbound TCP connections.
    pub listen_addr: Multiaddr,
    /// Initial ValidatorId <-> PeerId routing table.
    pub peer_map: PeerMap,
    /// Known dialable addresses per peer; dialed at startup.
    pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
    /// Node identity keypair; when `None`, litep2p uses its default
    /// identity (presumably freshly generated — confirm against litep2p docs).
    pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
    /// Shared peer book for PEX and reconnect bookkeeping.
    pub peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange tuning parameters.
    pub pex_config: PexConfig,
    /// Whether to relay received consensus messages to other peers.
    pub relay_consensus: bool,
    /// Initial validator set (id + public key); ordering must match
    /// `ValidatorSet::validators()`.
    pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    /// Blake3 hash of the chain identifier, for relay signature checks.
    pub chain_id_hash: [u8; 32],
}
179
180impl NetworkService {
    /// Create the network service and all handles for the consensus engine.
    ///
    /// Registers the four litep2p protocols (consensus gossip, consensus
    /// request/response, sync, PEX), wires up all channels, and returns the
    /// service together with its sink and receiver handles.
    ///
    /// # Errors
    /// Fails if the litep2p instance cannot be created (e.g. the listen
    /// address cannot be bound).
    pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
        let NetworkConfig {
            listen_addr,
            peer_map,
            known_addresses,
            keypair,
            peer_book,
            pex_config,
            relay_consensus,
            initial_validators,
            chain_id_hash,
        } = config;
        // Consensus gossip: inbound substreams are auto-accepted and the
        // handshake is empty (no application-level negotiation).
        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
            .with_max_size(MAX_NOTIFICATION_SIZE)
            .with_handshake(vec![])
            .with_auto_accept_inbound(true)
            .with_sync_channel_size(1024)
            .with_async_channel_size(1024)
            .build();

        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
            .with_max_size(MAX_NOTIFICATION_SIZE)
            .build();

        let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
            .with_max_size(MAX_NOTIFICATION_SIZE)
            .build();

        let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
            .with_max_size(1024 * 1024) // 1MB for peer lists
            .build();

        let mut config_builder = ConfigBuilder::new()
            .with_tcp(TcpConfig {
                listen_addresses: vec![listen_addr],
                ..Default::default()
            })
            .with_notification_protocol(notif_config)
            .with_request_response_protocol(reqresp_config)
            .with_request_response_protocol(sync_config)
            .with_request_response_protocol(pex_config_proto);

        // Use the provided identity, if any; otherwise litep2p supplies one.
        if let Some(kp) = keypair {
            config_builder = config_builder.with_keypair(kp);
        }

        // Collect dial addresses with /p2p/<peer_id> suffix for startup dialing
        let initial_dial_addresses: Vec<Multiaddr> = known_addresses
            .iter()
            .flat_map(|(pid, addrs)| {
                addrs.iter().map(move |addr| {
                    let mut full = addr.clone();
                    full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
                    full
                })
            })
            .collect();

        if !known_addresses.is_empty() {
            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
        }

        let litep2p =
            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;

        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
        for addr in litep2p.listen_addresses() {
            info!(address = %addr, "listening on");
        }

        // Channel capacities: consensus traffic gets the deepest buffers;
        // sync request/response volume is comparatively low.
        let (msg_tx, msg_rx) = mpsc::channel(8192);
        let (cmd_tx, cmd_rx) = mpsc::channel(4096);
        let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
        let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);

        // Build initial peer info
        let initial_peers: Vec<PeerStatus> = peer_map
            .validator_to_peer
            .iter()
            .map(|(&vid, pid)| PeerStatus {
                validator_id: vid,
                peer_id: pid.to_string(),
            })
            .collect();
        let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);

        // Epoch changes travel over a watch channel so the latest validator
        // set is retained even if the event loop is momentarily busy (F-02).
        let (epoch_tx, epoch_rx) =
            watch::channel::<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>(None);

        let sink = Litep2pNetworkSink {
            cmd_tx: cmd_tx.clone(),
            epoch_tx,
        };

        let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
        let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);

        // Save persistent peers for auto-reconnect
        let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();

        // Preserve validator ordering for leader-for-view computation; must
        // match `ValidatorSet::validators()` ordering.
        let validator_ids_ordered: Vec<ValidatorId> =
            initial_validators.iter().map(|(vid, _)| *vid).collect();
        let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
            initial_validators.into_iter().collect();

        Ok(NetworkServiceHandles {
            service: Self {
                litep2p,
                notif_handle,
                reqresp_handle,
                sync_handle,
                pex_handle,
                peer_map,
                peer_book,
                pex_config,
                persistent_peers,
                initial_dial_addresses,
                relay_consensus,
                validator_keys,
                validator_ids_ordered,
                chain_id_hash,
                msg_tx,
                cmd_rx,
                sync_req_tx,
                sync_resp_tx,
                peer_info_tx,
                connected_count_tx,
                notif_connected_peers: HashSet::new(),
                notif_connected_count_tx,
                connected_peers: HashSet::new(),
                seen_active: HashSet::new(),
                seen_backup: HashSet::new(),
                epoch_rx,
                pex_rate_limit: HashMap::new(),
            },
            sink,
            msg_rx,
            sync_req_rx,
            sync_resp_rx,
            peer_info_rx,
            connected_count_rx,
            notif_connected_count_rx,
        })
    }
326
    /// Local litep2p peer id of this node.
    pub fn local_peer_id(&self) -> &PeerId {
        self.litep2p.local_peer_id()
    }
330
    /// Run the network event loop.
    ///
    /// Dials all persistent peers once at startup, then multiplexes protocol
    /// events, commands, epoch changes, and periodic maintenance/PEX ticks
    /// forever (never returns).
    pub async fn run(mut self) {
        // Dial all persistent peers at startup using full multiaddrs.
        for addr in mem::take(&mut self.initial_dial_addresses) {
            if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
                debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
            }
        }

        let mut maintenance_interval =
            tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
        let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
            self.pex_config.request_interval_secs,
        ));
        loop {
            tokio::select! {
                // Consensus gossip events (substream lifecycle + messages).
                event = self.notif_handle.next() => {
                    if let Some(event) = event {
                        self.handle_notification_event(event).await;
                    }
                }
                // Direct consensus request/response events.
                event = self.reqresp_handle.next() => {
                    if let Some(event) = event {
                        self.handle_reqresp_event(event);
                    }
                }
                // Block/state sync protocol events.
                event = self.sync_handle.next() => {
                    if let Some(event) = event {
                        self.handle_sync_event(event).await;
                    }
                }
                // Peer-exchange protocol events.
                event = self.pex_handle.next() => {
                    if let Some(event) = event {
                        self.handle_pex_event(event).await;
                    }
                }
                // Transport-level events (connections etc.).
                event = self.litep2p.next_event() => {
                    if let Some(event) = event {
                        self.handle_litep2p_event(event).await;
                    }
                }
                // Commands from the consensus engine's sink.
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(cmd).await;
                }
                // Epoch transitions: take the latest validator set (F-02).
                Ok(()) = self.epoch_rx.changed() => {
                    let epoch_val = self.epoch_rx.borrow_and_update().clone();
                    if let Some(validators) = epoch_val {
                        self.handle_epoch_change(validators).await;
                    }
                }
                _ = maintenance_interval.tick() => {
                    self.run_maintenance().await;
                }
                _ = pex_interval.tick() => {
                    if self.pex_config.enabled {
                        self.run_pex_round().await;
                    }
                }
            }
        }
    }
392
    /// Handle events on the consensus notification (gossip) protocol.
    ///
    /// Accepts all inbound substreams, tracks which peers have an open
    /// notification substream, delivers decoded messages to consensus (only
    /// from known validators), and optionally relays them to other peers
    /// with signature verification and two-set deduplication.
    async fn handle_notification_event(&mut self, event: NotificationEvent) {
        match event {
            // Accept every inbound substream request unconditionally
            // (validator-level filtering happens on message delivery below).
            NotificationEvent::ValidateSubstream { peer, .. } => {
                self.notif_handle
                    .send_validation_result(peer, ValidationResult::Accept);
            }
            NotificationEvent::NotificationStreamOpened { peer, .. } => {
                info!(peer = %peer, "notification stream opened");
                self.notif_connected_peers.insert(peer);
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationStreamClosed { peer } => {
                debug!(peer = %peer, "notification stream closed");
                self.notif_connected_peers.remove(&peer);
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationReceived { peer, notification } => {
                // Determine the sender ValidatorId (None if peer is not a known validator)
                let sender: Option<ValidatorId> =
                    self.peer_map.peer_to_validator.get(&peer).copied();

                match codec::decode::<ConsensusMessage>(&notification) {
                    Ok(msg) => {
                        // F-08: Only deliver to consensus if sender is a known validator.
                        if sender.is_some()
                            && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
                        {
                            warn!("consensus message dropped (notification): {e}");
                        }

                        // Relay: re-broadcast to other connected peers (with two-set dedup).
                        // Only relay from known validators whose individual message signature
                        // is valid, preventing unknown peers from using this node as a DoS
                        // amplifier.
                        if self.relay_consensus
                            && let Some(sid) = sender
                            && hotmint_consensus::engine::verify_relay_sender(
                                sid,
                                &msg,
                                &self.validator_keys,
                                &self.validator_ids_ordered,
                                &self.chain_id_hash,
                            )
                        {
                            // Dedup key: blake3 of the raw payload truncated to
                            // 64 bits. NOTE(review): truncation admits hash
                            // collisions that would suppress relay of a
                            // colliding message — presumably acceptable here.
                            let msg_hash = u64::from_le_bytes(
                                blake3::hash(&notification).as_bytes()[..8]
                                    .try_into()
                                    .unwrap(),
                            );

                            // Check both sets to avoid re-relay across rotations
                            if !self.seen_active.contains(&msg_hash)
                                && !self.seen_backup.contains(&msg_hash)
                            {
                                self.seen_active.insert(msg_hash);
                                let raw = notification.to_vec();
                                for &other in &self.connected_peers {
                                    if other != peer {
                                        let _ = self
                                            .notif_handle
                                            .send_sync_notification(other, raw.clone());
                                    }
                                }
                                // Rotate: move active→backup, clear old backup
                                if self.seen_active.len() > 10_000 {
                                    self.seen_backup = mem::take(&mut self.seen_active);
                                }
                            }
                            // F-31: Reward peer for a valid relayed consensus message.
                            // NOTE(review): the +1 applies even to duplicates
                            // already in the seen sets — confirm intended.
                            self.peer_book
                                .write()
                                .await
                                .adjust_score(&peer.to_string(), 1);
                        }
                    }
                    Err(e) => {
                        // Undecodable payload: penalize the sender.
                        warn!(error = %e, peer = %peer, "failed to decode notification");
                        self.peer_book
                            .write()
                            .await
                            .adjust_score(&peer.to_string(), -10);
                    }
                }
            }
            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
                warn!(peer = %peer, error = ?error, "notification stream open failed");
            }
        }
    }
486
487    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
488        match event {
489            RequestResponseEvent::RequestReceived {
490                peer,
491                request_id,
492                request,
493                ..
494            } => {
495                let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
496                    warn!(peer = %peer, "dropping request from unknown peer");
497                    self.reqresp_handle.reject_request(request_id);
498                    return;
499                };
500                match codec::decode::<ConsensusMessage>(&request) {
501                    Ok(msg) => {
502                        if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
503                            warn!("consensus message dropped (reqresp): {e}");
504                        }
505                        self.reqresp_handle.send_response(request_id, vec![]);
506                    }
507                    Err(e) => {
508                        warn!(error = %e, "failed to decode request");
509                        self.reqresp_handle.reject_request(request_id);
510                    }
511                }
512            }
513            RequestResponseEvent::ResponseReceived { .. } => {}
514            RequestResponseEvent::RequestFailed { peer, error, .. } => {
515                debug!(peer = %peer, error = ?error, "request failed");
516            }
517        }
518    }
519
520    async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
521        match event {
522            RequestResponseEvent::RequestReceived {
523                peer,
524                request_id,
525                request,
526                ..
527            } => {
528                if !self.connected_peers.contains(&peer) {
529                    warn!(peer = %peer, "rejecting sync request from unconnected peer");
530                    self.sync_handle.reject_request(request_id);
531                    return;
532                }
533                match codec::decode::<SyncRequest>(&request) {
534                    Ok(req) => {
535                        if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
536                            request_id,
537                            peer,
538                            request: req,
539                        }) {
540                            warn!("sync request dropped: {e}");
541                        }
542                    }
543                    Err(e) => {
544                        warn!(error = %e, peer = %peer, "failed to decode sync request");
545                        self.peer_book
546                            .write()
547                            .await
548                            .adjust_score(&peer.to_string(), -5);
549                        let err_resp = SyncResponse::Error(format!("decode error: {e}"));
550                        if let Ok(bytes) = codec::encode(&err_resp) {
551                            self.sync_handle.send_response(request_id, bytes);
552                        } else {
553                            self.sync_handle.reject_request(request_id);
554                        }
555                    }
556                }
557            }
558            RequestResponseEvent::ResponseReceived {
559                request_id: _,
560                response,
561                ..
562            } => {
563                // Forward sync response to the sync requester
564                match codec::decode::<SyncResponse>(&response) {
565                    Ok(resp) => {
566                        if let Err(e) = self.sync_resp_tx.try_send(resp) {
567                            warn!("sync response dropped: {e}");
568                        }
569                    }
570                    Err(e) => {
571                        warn!(error = %e, "failed to decode sync response");
572                    }
573                }
574            }
575            RequestResponseEvent::RequestFailed { peer, error, .. } => {
576                debug!(peer = %peer, error = ?error, "sync request failed");
577                if let Err(e) = self
578                    .sync_resp_tx
579                    .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
580                {
581                    warn!("sync error response dropped: {e}");
582                }
583            }
584        }
585    }
586
    /// Handle events on the peer-exchange (PEX) protocol.
    ///
    /// Requests pass a known-peer check (P3) and a per-peer rate limit
    /// (F-09) before being served; `Advertise` payloads are additionally
    /// validated against the validator map (P2, F-10). Peer lists from
    /// responses are merged into the peer book (skipping banned peers).
    async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
        match event {
            RequestResponseEvent::RequestReceived {
                peer,
                request_id,
                request,
                ..
            } => {
                // P3: Only accept PEX from known peers (peers in peer_map or connected)
                if !self.peer_map.peer_to_validator.contains_key(&peer)
                    && !self.connected_peers.contains(&peer)
                {
                    warn!(peer = %peer, "rejecting PEX request from unknown peer");
                    self.pex_handle.reject_request(request_id);
                    return;
                }
                // F-09: Rate limit PEX requests per peer (max 1 every 10s).
                let now = Instant::now();
                if let Some(last) = self.pex_rate_limit.get(&peer)
                    && now.duration_since(*last) < std::time::Duration::from_secs(10)
                {
                    self.pex_handle.reject_request(request_id);
                    return;
                }
                self.pex_rate_limit.insert(peer, now);
                match serde_cbor_2::from_slice::<PexRequest>(&request) {
                    Ok(PexRequest::GetPeers) => {
                        // Serve a random sample from the book, excluding the
                        // requester itself and configured private peers.
                        let book = self.peer_book.read().await;
                        let private = &self.pex_config.private_peer_ids;
                        let peers: Vec<PeerInfo> = book
                            .get_random_peers(self.pex_config.max_peers_per_response)
                            .into_iter()
                            .filter(|p| p.peer_id != peer.to_string())
                            // P4: exclude private peers from PEX responses
                            .filter(|p| !private.contains(&p.peer_id))
                            .cloned()
                            .collect();
                        let resp = PexResponse::Peers(peers);
                        if let Ok(bytes) = serde_cbor_2::to_vec(&resp) {
                            self.pex_handle.send_response(request_id, bytes);
                        }
                    }
                    Ok(PexRequest::Advertise {
                        role,
                        validator_id,
                        addresses,
                    }) => {
                        // F-10: Reject if role is "validator" but validator_id is None.
                        if role == PeerRole::Validator && validator_id.is_none() {
                            warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
                            self.pex_handle.reject_request(request_id);
                            return;
                        }
                        // P2: If claiming validator_id, verify PeerId matches peer_map
                        if let Some(vid) = validator_id
                            && let Some(&expected_peer) =
                                self.peer_map.validator_to_peer.get(&ValidatorId(vid))
                            && expected_peer != peer
                        {
                            warn!(
                                peer = %peer,
                                claimed_vid = vid,
                                "PEX Advertise validator_id mismatch, rejecting"
                            );
                            self.pex_handle.reject_request(request_id);
                            return;
                        }
                        // F-10: Limit addresses to max 8 to prevent abuse.
                        let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
                        // Unparseable multiaddrs are silently dropped here.
                        let mut info = PeerInfo::new(
                            peer,
                            role,
                            addresses.iter().filter_map(|a| a.parse().ok()).collect(),
                        );
                        if let Some(vid) = validator_id {
                            info = info.with_validator(ValidatorId(vid));
                        }
                        self.peer_book.write().await.add_peer(info);
                        if let Ok(bytes) = serde_cbor_2::to_vec(&PexResponse::Ack) {
                            self.pex_handle.send_response(request_id, bytes);
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "failed to decode PEX request");
                        self.pex_handle.reject_request(request_id);
                    }
                }
            }
            RequestResponseEvent::ResponseReceived { response, .. } => {
                // Merge advertised peers into the book, skipping banned ones.
                if let Ok(PexResponse::Peers(peers)) = serde_cbor_2::from_slice(&response) {
                    let mut book = self.peer_book.write().await;
                    for peer in peers {
                        if !peer.is_banned() {
                            book.add_peer(peer);
                        }
                    }
                }
            }
            // F-27: Log unhandled PEX events instead of silently dropping.
            other => {
                trace!("unhandled PEX event: {other:?}");
            }
        }
    }
691
692    /// Periodic connection maintenance: reconnect persistent peers, save peer book.
693    async fn run_maintenance(&mut self) {
694        // 1. Reconnect disconnected persistent peers.
695        // Collect addresses outside the lock scope to avoid holding it across await.
696        let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
697            let book = self.peer_book.read().await;
698            self.persistent_peers
699                .values()
700                .filter(|pid| !self.connected_peers.contains(pid))
701                .filter_map(|&pid| {
702                    book.get(&pid.to_string()).map(|info| {
703                        let addrs: Vec<Multiaddr> = info
704                            .addresses
705                            .iter()
706                            .filter_map(|a| a.parse().ok())
707                            .collect();
708                        (pid, addrs)
709                    })
710                })
711                .collect()
712        };
713
714        for (pid, addrs) in to_dial {
715            if !addrs.is_empty() {
716                self.litep2p
717                    .add_known_address(pid, addrs.clone().into_iter());
718                // F-07: Also actively dial the first address to trigger reconnection.
719                let mut dial_addr = addrs[0].clone();
720                dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
721                if let Err(e) = self.litep2p.dial_address(dial_addr).await {
722                    debug!(peer = %pid, error = ?e, "persistent peer redial failed");
723                }
724            }
725        }
726
727        // 2. Try to connect to peers from book if under target
728        let max = self.pex_config.max_peers;
729        if self.connected_peers.len() < max * 4 / 5 {
730            let book = self.peer_book.read().await;
731            let candidates = book.get_random_peers(5);
732            for peer in candidates {
733                if let Ok(pid) = peer.peer_id.parse::<PeerId>()
734                    && !self.connected_peers.contains(&pid)
735                {
736                    let addrs: Vec<Multiaddr> = peer
737                        .addresses
738                        .iter()
739                        .filter_map(|a| a.parse().ok())
740                        .collect();
741                    if !addrs.is_empty() {
742                        self.litep2p.add_known_address(pid, addrs.into_iter());
743                    }
744                }
745            }
746        }
747
748        // 3. Prune stale peers (older than 24 hours) and persist
749        self.peer_book.write().await.prune_stale(86400);
750        if let Err(e) = self.peer_book.read().await.save() {
751            warn!(%e, "failed to save peer book");
752        }
753    }
754
755    /// Send a PEX GetPeers request to a random connected peer.
756    async fn run_pex_round(&mut self) {
757        if self.connected_peers.is_empty() {
758            return;
759        }
760        // Pick a random connected peer
761        let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
762        let idx = rand::random::<usize>() % peers.len();
763        let target = peers[idx];
764
765        if let Ok(bytes) = serde_cbor_2::to_vec(&PexRequest::GetPeers) {
766            let _ = self
767                .pex_handle
768                .send_request(target, bytes, DialOptions::Reject)
769                .await;
770        }
771    }
772
773    async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
774        match event {
775            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
776                // Enforce total connection limit
777                if self.connected_peers.len() >= self.pex_config.max_peers {
778                    warn!(
779                        peer = %peer,
780                        total = self.connected_peers.len(),
781                        max = self.pex_config.max_peers,
782                        "connection limit reached, ignoring new peer"
783                    );
784                    return;
785                }
786
787                info!(peer = %peer, endpoint = ?endpoint, "connection established");
788                self.connected_peers.insert(peer);
789                let _ = self.connected_count_tx.send(self.connected_peers.len());
790
791                // Open notification substream to this peer for consensus messages.
792                if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
793                    debug!(peer = %peer, error = ?e, "failed to open notification substream");
794                }
795
796                // Update last_seen in peer book
797                if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
798                    info.touch();
799                }
800            }
801            Litep2pEvent::ConnectionClosed { peer, .. } => {
802                debug!(peer = %peer, "connection closed");
803                self.connected_peers.remove(&peer);
804                let _ = self.connected_count_tx.send(self.connected_peers.len());
805                // Eagerly remove from notif set; NotificationStreamClosed may
806                // arrive late or not at all when the TCP connection drops first.
807                if self.notif_connected_peers.remove(&peer) {
808                    let _ = self
809                        .notif_connected_count_tx
810                        .send(self.notif_connected_peers.len());
811                }
812            }
813            Litep2pEvent::DialFailure { address, error, .. } => {
814                warn!(address = %address, error = ?error, "dial failed");
815            }
816            // F-27: Log unhandled litep2p events instead of silently dropping.
817            other => {
818                trace!(?other, "unhandled litep2p event");
819            }
820        }
821    }
822
823    fn update_peer_info(&self) {
824        let peers: Vec<PeerStatus> = self
825            .peer_map
826            .validator_to_peer
827            .iter()
828            .map(|(&vid, pid)| PeerStatus {
829                validator_id: vid,
830                peer_id: pid.to_string(),
831            })
832            .collect();
833        let _ = self.peer_info_tx.send(peers);
834    }
835
836    async fn handle_command(&mut self, cmd: NetCommand) {
837        match cmd {
838            NetCommand::Broadcast(bytes) => {
839                // Broadcast to all connected peers (validators + fullnodes).
840                for &peer in &self.connected_peers {
841                    let _ = self
842                        .notif_handle
843                        .send_sync_notification(peer, bytes.clone());
844                }
845            }
846            NetCommand::SendTo(target, bytes) => {
847                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
848                    let _ = self
849                        .reqresp_handle
850                        .send_request(peer_id, bytes, DialOptions::Reject)
851                        .await;
852                }
853            }
854            NetCommand::AddPeer(vid, pid, addrs) => {
855                info!(validator = %vid, peer = %pid, "adding peer");
856                self.peer_map.insert(vid, pid);
857                self.litep2p.add_known_address(pid, addrs.into_iter());
858                self.update_peer_info();
859            }
860            NetCommand::RemovePeer(vid) => {
861                if let Some(pid) = self.peer_map.remove(vid) {
862                    info!(validator = %vid, peer = %pid, "removed peer");
863                } else {
864                    warn!(validator = %vid, "peer not found for removal");
865                }
866                self.update_peer_info();
867            }
868            NetCommand::SyncRequest(peer_id, bytes) => {
869                let _ = self
870                    .sync_handle
871                    .send_request(peer_id, bytes, DialOptions::Reject)
872                    .await;
873            }
874            NetCommand::SyncRespond(request_id, bytes) => {
875                self.sync_handle.send_response(request_id, bytes);
876            }
877            NetCommand::EpochChange(validators) => {
878                self.handle_epoch_change(validators).await;
879            }
880        }
881    }
882
883    async fn handle_epoch_change(
884        &mut self,
885        validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
886    ) {
887        // Rebuild peer_map entries for new validators using PeerBook
888        for (vid, pubkey) in &validators {
889            if self.peer_map.validator_to_peer.contains_key(vid) {
890                continue;
891            }
892            // Try to find PeerId from PeerBook by looking up the public key
893            let pk_bytes = &pubkey.0;
894            if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
895                let peer_id = lpk.to_peer_id();
896                info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
897                self.peer_map.insert(*vid, peer_id);
898            }
899        }
900        // Remove validators no longer in the set
901        let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
902        let to_remove: Vec<ValidatorId> = self
903            .peer_map
904            .validator_to_peer
905            .keys()
906            .filter(|vid| !new_ids.contains(vid))
907            .copied()
908            .collect();
909        for vid in to_remove {
910            info!(validator = %vid, "removing validator from peer_map after epoch change");
911            self.peer_map.remove(vid);
912        }
913        // Update relay verification key map to match new epoch
914        self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
915        self.validator_keys = validators.into_iter().collect();
916        // F-28: Rebuild persistent_peers from updated peer_map
917        self.persistent_peers = self.peer_map.validator_to_peer.clone();
918        self.update_peer_info();
919    }
920}
921
/// NetworkSink backed by litep2p, for use by the consensus engine.
/// Also provides methods for peer management and sync.
#[derive(Clone)]
pub struct Litep2pNetworkSink {
    // Command channel consumed by the network service event loop.
    cmd_tx: mpsc::Sender<NetCommand>,
    // Watch channel carrying the latest epoch validator set (`None` until the
    // first epoch change). F-02: watch semantics ensure updates are never dropped.
    epoch_tx: watch::Sender<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>,
}
929
930impl Litep2pNetworkSink {
931    pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
932        if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
933            warn!("add_peer cmd dropped: {e}");
934        }
935    }
936
937    pub fn remove_peer(&self, vid: ValidatorId) {
938        if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
939            warn!("remove_peer cmd dropped: {e}");
940        }
941    }
942
943    pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
944        match codec::encode(request) {
945            Ok(bytes) => {
946                if let Err(e) = self
947                    .cmd_tx
948                    .try_send(NetCommand::SyncRequest(peer_id, bytes))
949                {
950                    warn!("sync request cmd dropped: {e}");
951                }
952            }
953            Err(e) => warn!("sync request encode failed: {e}"),
954        }
955    }
956
957    pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
958        match codec::encode(response) {
959            Ok(bytes) => {
960                if let Err(e) = self
961                    .cmd_tx
962                    .try_send(NetCommand::SyncRespond(request_id, bytes))
963                {
964                    warn!("sync response cmd dropped: {e}");
965                }
966            }
967            Err(e) => warn!("sync response encode failed: {e}"),
968        }
969    }
970}
971
972impl NetworkSink for Litep2pNetworkSink {
973    fn broadcast(&self, msg: ConsensusMessage) {
974        match codec::encode(&msg) {
975            Ok(bytes) => {
976                if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
977                    warn!("broadcast cmd dropped: {e}");
978                }
979            }
980            Err(e) => warn!("broadcast encode failed: {e}"),
981        }
982    }
983
984    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
985        match codec::encode(&msg) {
986            Ok(bytes) => {
987                if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
988                    warn!("send_to cmd dropped for {target}: {e}");
989                }
990            }
991            Err(e) => warn!("send_to encode failed for {target}: {e}"),
992        }
993    }
994
995    fn on_epoch_change(&self, new_validator_set: &hotmint_types::ValidatorSet) {
996        let validators: Vec<_> = new_validator_set
997            .validators()
998            .iter()
999            .map(|v| (v.id, v.public_key.clone()))
1000            .collect();
1001        // F-02: Use dedicated watch channel so epoch changes are never dropped.
1002        let _ = self.epoch_tx.send(Some(validators));
1003    }
1004}