//! `hotmint_network/service.rs` — litep2p-based network service providing
//! consensus message gossip, sync request/response, peer exchange (PEX),
//! and mempool transaction gossip.
1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::epoch::EpochNumber;
11use hotmint_types::sync::{SyncRequest, SyncResponse};
12use hotmint_types::{ConsensusMessage, ValidatorId};
13use litep2p::config::ConfigBuilder;
14use litep2p::protocol::notification::{
15    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
16};
17use litep2p::protocol::request_response::{
18    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
19};
20use litep2p::transport::tcp::config::Config as TcpConfig;
21use litep2p::types::RequestId;
22use litep2p::types::multiaddr::Multiaddr;
23use litep2p::{Litep2p, Litep2pEvent, PeerId};
24use serde::{Deserialize, Serialize};
25use tokio::sync::{mpsc, watch};
26use tracing::{debug, info, trace, warn};
27
/// Type alias for the epoch update payload carried over the epoch watch
/// channel: `None` until the first update, then the new epoch number
/// together with the new validator set's `(id, public key)` pairs.
type EpochUpdate = Option<(
    EpochNumber,
    Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
)>;
33
34use std::sync::Arc;
35
36use tokio::sync::RwLock;
37
38use crate::codec;
39use crate::peer::{PeerBook, PeerInfo, PeerRole};
40use crate::pex::{PexConfig, PexRequest, PexResponse};
41
/// Notification protocol id for consensus message gossip.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Notification protocol id for mempool transaction gossip.
const MEMPOOL_NOTIF_PROTOCOL: &str = "/hotmint/mempool/notif/1";
/// Request/response protocol id for direct consensus messages.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Request/response protocol id for block/state sync.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Request/response protocol id for peer exchange (PEX).
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Max size for a consensus notification / request-response payload (16 MB).
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
/// Max size for a single mempool tx gossip message (512 KB).
const MAX_MEMPOOL_NOTIF_SIZE: usize = 512 * 1024;
/// Seconds between periodic maintenance ticks in the event loop.
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
51
/// Maps ValidatorId <-> PeerId for routing.
///
/// The two maps are intended to be kept mutually consistent: every entry in
/// one has a mirror entry in the other.
#[derive(Clone)]
pub struct PeerMap {
    /// Forward map: validator id -> litep2p peer id.
    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
    /// Reverse map: litep2p peer id -> validator id.
    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
}
58
59impl PeerMap {
60    pub fn new() -> Self {
61        Self {
62            validator_to_peer: HashMap::new(),
63            peer_to_validator: HashMap::new(),
64        }
65    }
66
67    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
68        self.validator_to_peer.insert(vid, pid);
69        self.peer_to_validator.insert(pid, vid);
70    }
71
72    pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
73        if let Some(pid) = self.validator_to_peer.remove(&vid) {
74            self.peer_to_validator.remove(&pid);
75            Some(pid)
76        } else {
77            None
78        }
79    }
80}
81
82impl Default for PeerMap {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
/// Status of a peer for external queries (published over the peer-info
/// watch channel).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Validator the peer is mapped to.
    pub validator_id: ValidatorId,
    /// litep2p peer id, rendered as a string for serialization.
    pub peer_id: String,
}
94
/// Commands sent from the NetworkSink to the NetworkService
pub enum NetCommand {
    /// Broadcast an already-encoded message payload to all connected peers.
    Broadcast(Vec<u8>),
    /// Send an already-encoded message payload to a single validator.
    SendTo(ValidatorId, Vec<u8>),
    /// Register a peer: validator id, litep2p peer id, and dial addresses.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Unregister the peer bound to this validator.
    RemovePeer(ValidatorId),
    /// Send a sync request to a specific peer
    SyncRequest(PeerId, Vec<u8>),
    /// Respond to a sync request
    SyncRespond(RequestId, Vec<u8>),
    /// Update peer_map from new validator set (epoch transition)
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
    /// Broadcast a raw transaction to all connected peers via mempool gossip.
    BroadcastTx(Vec<u8>),
}
110
/// Incoming sync request forwarded to the sync responder.
pub struct IncomingSyncRequest {
    /// litep2p request id — needed to send the response back.
    pub request_id: RequestId,
    /// Peer that issued the request.
    pub peer: PeerId,
    /// The decoded sync request payload.
    pub request: SyncRequest,
}
117
/// All handles returned by [`NetworkService::create`].
pub struct NetworkServiceHandles {
    /// The service itself; run via [`NetworkService::run`].
    pub service: NetworkService,
    /// Sink for the consensus engine to send commands/messages through.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages with optional sender validator id
    /// (`None` when the sending peer is not a known validator).
    pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Incoming sync requests for the sync responder to answer.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Sync responses for the local sync requester.
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Watch channel with the current validator peer list.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Watch channel with the raw connected-peer count.
    pub connected_count_rx: watch::Receiver<usize>,
    /// Number of peers with an open notification (consensus) substream.
    /// Reaches >0 once the notification handshake completes, which is
    /// later than the raw connection and avoids the need for a fixed sleep.
    pub notif_connected_count_rx: watch::Receiver<usize>,
    /// Receiver for transactions gossipped from peers via the mempool protocol.
    pub mempool_tx_rx: mpsc::Receiver<Vec<u8>>,
}
134
/// NetworkService wraps litep2p and provides consensus-level networking
pub struct NetworkService {
    /// The underlying litep2p instance (transport + protocol multiplexing).
    litep2p: Litep2p,
    /// Handle for the consensus notification protocol.
    notif_handle: NotificationHandle,
    /// Handle for the mempool gossip notification protocol.
    mempool_notif_handle: NotificationHandle,
    /// Handle for the consensus request/response protocol.
    reqresp_handle: RequestResponseHandle,
    /// Handle for the sync request/response protocol.
    sync_handle: RequestResponseHandle,
    /// Handle for the PEX request/response protocol.
    pex_handle: RequestResponseHandle,
    /// Bidirectional ValidatorId <-> PeerId routing map.
    peer_map: PeerMap,
    /// Shared peer book (scores, known addresses), also used by PEX.
    peer_book: Arc<RwLock<PeerBook>>,
    /// PEX behaviour configuration (intervals, limits, private peers).
    pex_config: PexConfig,
    /// Validator peers known at startup; kept for auto-reconnect.
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// Addresses to dial at startup (from persistent_peers config).
    initial_dial_addresses: Vec<Multiaddr>,
    /// Whether to relay received consensus messages to other connected peers.
    relay_consensus: bool,
    /// Public keys of known validators, used to verify individual message signatures
    /// before relaying.  Updated on epoch transitions via [`NetCommand::EpochChange`].
    validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
    /// Validators in round-robin order for leader-for-view computation.
    /// Must match the ordering in `ValidatorSet::validators()`.
    validator_ids_ordered: Vec<ValidatorId>,
    /// Blake3 hash of the chain identifier — used for relay signature verification.
    chain_id_hash: [u8; 32],
    /// Current epoch number — used for relay signature verification.
    current_epoch: EpochNumber,
    /// Outbound channel delivering decoded consensus messages to the engine.
    msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    /// Inbound command channel from the [`Litep2pNetworkSink`].
    cmd_rx: mpsc::Receiver<NetCommand>,
    /// Forwards incoming sync requests to the sync responder.
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    /// Forwards sync responses to the local sync requester.
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    /// Publishes the current validator peer list.
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    /// Publishes the raw connected-peer count.
    connected_count_tx: watch::Sender<usize>,
    /// Tracks peers with an open notification substream (post-handshake).
    notif_connected_peers: HashSet<PeerId>,
    /// Publishes `notif_connected_peers.len()`.
    notif_connected_count_tx: watch::Sender<usize>,
    /// Peers with an established (transport-level) connection.
    connected_peers: HashSet<PeerId>,
    /// Two-set rotation for relay deduplication: when the active set fills up,
    /// swap it to the backup position (clearing the old backup). Messages are
    /// checked against both sets, so recent history is always preserved across
    /// rotations. This avoids the brief relay-window that a single-set clear
    /// would create.
    seen_active: HashSet<u64>,
    seen_backup: HashSet<u64>,
    /// Reliable channel for epoch changes (F-02).
    epoch_rx: watch::Receiver<EpochUpdate>,
    /// Per-peer rate limiting for PEX requests (F-09).
    pex_rate_limit: HashMap<PeerId, Instant>,
    /// Sender for transactions received via mempool gossip.
    mempool_tx_tx: mpsc::Sender<Vec<u8>>,
    /// Dedup sets for mempool gossip (two-set rotation like relay dedup).
    mempool_seen_active: HashSet<[u8; 32]>,
    mempool_seen_backup: HashSet<[u8; 32]>,
    /// Peers with open mempool notification substreams.
    mempool_notif_connected_peers: HashSet<PeerId>,
    /// C-2: Per-peer mempool tx gossip rate limit (Instant, count).
    mempool_peer_rate: HashMap<PeerId, (Instant, u32)>,
}
192
/// Configuration for creating a [`NetworkService`].
pub struct NetworkConfig {
    /// TCP multiaddr the node listens on.
    pub listen_addr: Multiaddr,
    /// Initial ValidatorId <-> PeerId routing map.
    pub peer_map: PeerMap,
    /// Known dial addresses per peer, used for startup dialing.
    pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
    /// Node identity keypair; a fresh one is generated by litep2p when `None`.
    pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
    /// Shared peer book (scores, known addresses).
    pub peer_book: Arc<RwLock<PeerBook>>,
    /// PEX behaviour configuration.
    pub pex_config: PexConfig,
    /// Whether to relay received consensus messages to other peers.
    pub relay_consensus: bool,
    /// Validator set at startup: ids (in round-robin order) and public keys.
    pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    /// Blake3 hash of the chain identifier; used as the notification
    /// handshake for chain isolation (H-6).
    pub chain_id_hash: [u8; 32],
}
205
206impl NetworkService {
207    /// Create the network service and all handles for the consensus engine.
208    pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
209        let NetworkConfig {
210            listen_addr,
211            peer_map,
212            known_addresses,
213            keypair,
214            peer_book,
215            pex_config,
216            relay_consensus,
217            initial_validators,
218            chain_id_hash,
219        } = config;
220        // H-6: Use chain_id_hash as handshake bytes for chain isolation.
221        // Peers on different chains will have mismatched handshakes and be rejected.
222        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
223            .with_max_size(MAX_NOTIFICATION_SIZE)
224            .with_handshake(chain_id_hash.to_vec())
225            .with_auto_accept_inbound(false)
226            .with_sync_channel_size(1024)
227            .with_async_channel_size(1024)
228            .build();
229
230        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
231            .with_max_size(MAX_NOTIFICATION_SIZE)
232            .build();
233
234        let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
235            .with_max_size(MAX_NOTIFICATION_SIZE)
236            .build();
237
238        let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
239            .with_max_size(1024 * 1024) // 1MB for peer lists
240            .build();
241
242        let (mempool_notif_config, mempool_notif_handle) =
243            NotifConfigBuilder::new(MEMPOOL_NOTIF_PROTOCOL.into())
244                .with_max_size(MAX_MEMPOOL_NOTIF_SIZE)
245                .with_handshake(chain_id_hash.to_vec())
246                .with_auto_accept_inbound(true)
247                .with_sync_channel_size(2048)
248                .with_async_channel_size(2048)
249                .build();
250
251        let mut config_builder = ConfigBuilder::new()
252            .with_tcp(TcpConfig {
253                listen_addresses: vec![listen_addr],
254                ..Default::default()
255            })
256            .with_notification_protocol(notif_config)
257            .with_notification_protocol(mempool_notif_config)
258            .with_request_response_protocol(reqresp_config)
259            .with_request_response_protocol(sync_config)
260            .with_request_response_protocol(pex_config_proto);
261
262        if let Some(kp) = keypair {
263            config_builder = config_builder.with_keypair(kp);
264        }
265
266        // Collect dial addresses with /p2p/<peer_id> suffix for startup dialing
267        let initial_dial_addresses: Vec<Multiaddr> = known_addresses
268            .iter()
269            .flat_map(|(pid, addrs)| {
270                addrs.iter().map(move |addr| {
271                    let mut full = addr.clone();
272                    full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
273                    full
274                })
275            })
276            .collect();
277
278        if !known_addresses.is_empty() {
279            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
280        }
281
282        let litep2p =
283            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
284
285        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
286        for addr in litep2p.listen_addresses() {
287            info!(address = %addr, "listening on");
288        }
289
290        let (msg_tx, msg_rx) = mpsc::channel(8192);
291        let (cmd_tx, cmd_rx) = mpsc::channel(4096);
292        let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
293        let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
294        let (mempool_tx_tx, mempool_tx_rx) = mpsc::channel(4096);
295
296        // Build initial peer info
297        let initial_peers: Vec<PeerStatus> = peer_map
298            .validator_to_peer
299            .iter()
300            .map(|(&vid, pid)| PeerStatus {
301                validator_id: vid,
302                peer_id: pid.to_string(),
303            })
304            .collect();
305        let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
306
307        let (epoch_tx, epoch_rx) = watch::channel::<
308            Option<(
309                EpochNumber,
310                Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
311            )>,
312        >(None);
313
314        let sink = Litep2pNetworkSink {
315            cmd_tx: cmd_tx.clone(),
316            epoch_tx,
317        };
318
319        let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
320        let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
321
322        // Save persistent peers for auto-reconnect
323        let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
324
325        let validator_ids_ordered: Vec<ValidatorId> =
326            initial_validators.iter().map(|(vid, _)| *vid).collect();
327        let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
328            initial_validators.into_iter().collect();
329
330        Ok(NetworkServiceHandles {
331            service: Self {
332                litep2p,
333                notif_handle,
334                mempool_notif_handle,
335                reqresp_handle,
336                sync_handle,
337                pex_handle,
338                peer_map,
339                peer_book,
340                pex_config,
341                persistent_peers,
342                initial_dial_addresses,
343                relay_consensus,
344                validator_keys,
345                validator_ids_ordered,
346                chain_id_hash,
347                current_epoch: EpochNumber::GENESIS,
348                msg_tx,
349                cmd_rx,
350                sync_req_tx,
351                sync_resp_tx,
352                peer_info_tx,
353                connected_count_tx,
354                notif_connected_peers: HashSet::new(),
355                notif_connected_count_tx,
356                connected_peers: HashSet::new(),
357                seen_active: HashSet::new(),
358                seen_backup: HashSet::new(),
359                epoch_rx,
360                pex_rate_limit: HashMap::new(),
361                mempool_tx_tx,
362                mempool_seen_active: HashSet::new(),
363                mempool_seen_backup: HashSet::new(),
364                mempool_notif_connected_peers: HashSet::new(),
365                mempool_peer_rate: HashMap::new(),
366            },
367            sink,
368            msg_rx,
369            sync_req_rx,
370            sync_resp_rx,
371            peer_info_rx,
372            connected_count_rx,
373            notif_connected_count_rx,
374            mempool_tx_rx,
375        })
376    }
377
    /// The local litep2p peer id of this node.
    pub fn local_peer_id(&self) -> &PeerId {
        self.litep2p.local_peer_id()
    }
381
    /// Run the network event loop
    ///
    /// Dials the configured persistent peers once, then drives every protocol
    /// handle, the litep2p transport, the command channel from the sink, the
    /// epoch watch channel, and the periodic maintenance / PEX timers.
    /// Never returns.
    pub async fn run(mut self) {
        // Dial all persistent peers at startup using full multiaddrs.
        // `mem::take` empties the list so addresses are only dialed once here.
        for addr in mem::take(&mut self.initial_dial_addresses) {
            if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
                debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
            }
        }

        let mut maintenance_interval =
            tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
        let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
            self.pex_config.request_interval_secs,
        ));
        loop {
            tokio::select! {
                // Consensus notification protocol events.
                event = self.notif_handle.next() => {
                    if let Some(event) = event {
                        self.handle_notification_event(event).await;
                    }
                }
                // Mempool gossip notification protocol events.
                event = self.mempool_notif_handle.next() => {
                    if let Some(event) = event {
                        self.handle_mempool_notification_event(event).await;
                    }
                }
                // Direct consensus request/response events.
                event = self.reqresp_handle.next() => {
                    if let Some(event) = event {
                        self.handle_reqresp_event(event);
                    }
                }
                // Sync protocol request/response events.
                event = self.sync_handle.next() => {
                    if let Some(event) = event {
                        self.handle_sync_event(event).await;
                    }
                }
                // PEX protocol request/response events.
                event = self.pex_handle.next() => {
                    if let Some(event) = event {
                        self.handle_pex_event(event).await;
                    }
                }
                // Transport-level events (connections established/closed, ...).
                event = self.litep2p.next_event() => {
                    if let Some(event) = event {
                        self.handle_litep2p_event(event).await;
                    }
                }
                // Commands from the consensus engine via the sink.
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(cmd).await;
                }
                // F-02: reliable epoch-change delivery via watch channel.
                Ok(()) = self.epoch_rx.changed() => {
                    let epoch_val = self.epoch_rx.borrow_and_update().clone();
                    if let Some((epoch_number, validators)) = epoch_val {
                        self.current_epoch = epoch_number;
                        self.handle_epoch_change(validators).await;
                    }
                }
                _ = maintenance_interval.tick() => {
                    self.run_maintenance().await;
                }
                _ = pex_interval.tick() => {
                    if self.pex_config.enabled {
                        self.run_pex_round().await;
                    }
                }
            }
        }
    }
449
    /// Handle one event from the consensus notification protocol:
    /// substream validation (chain-isolation handshake), stream open/close
    /// accounting, and inbound consensus messages (delivery + optional
    /// signature-verified relay with two-set dedup).
    async fn handle_notification_event(&mut self, event: NotificationEvent) {
        match event {
            NotificationEvent::ValidateSubstream {
                peer, handshake, ..
            } => {
                // H-6: Verify handshake carries our chain_id_hash.
                // Reject peers on a different chain or with empty handshake.
                if handshake.as_slice() == self.chain_id_hash.as_slice() {
                    self.notif_handle
                        .send_validation_result(peer, ValidationResult::Accept);
                } else {
                    warn!(peer = %peer, "rejecting peer: chain_id_hash handshake mismatch");
                    self.notif_handle
                        .send_validation_result(peer, ValidationResult::Reject);
                }
            }
            NotificationEvent::NotificationStreamOpened { peer, .. } => {
                info!(peer = %peer, "notification stream opened");
                self.notif_connected_peers.insert(peer);
                // Publish the new post-handshake peer count.
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationStreamClosed { peer } => {
                debug!(peer = %peer, "notification stream closed");
                self.notif_connected_peers.remove(&peer);
                let _ = self
                    .notif_connected_count_tx
                    .send(self.notif_connected_peers.len());
            }
            NotificationEvent::NotificationReceived { peer, notification } => {
                // Determine the sender ValidatorId (None if peer is not a known validator)
                let sender: Option<ValidatorId> =
                    self.peer_map.peer_to_validator.get(&peer).copied();

                match codec::decode::<ConsensusMessage>(&notification) {
                    Ok(msg) => {
                        // F-08: Only deliver to consensus if sender is a known validator.
                        if sender.is_some()
                            && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
                        {
                            warn!("consensus message dropped (notification): {e}");
                        }

                        // Relay: re-broadcast to other connected peers (with two-set dedup).
                        // Only relay from known validators whose individual message signature
                        // is valid, preventing unknown peers from using this node as a DoS
                        // amplifier.
                        if self.relay_consensus
                            && let Some(sid) = sender
                            && hotmint_consensus::engine::verify_relay_sender(
                                sid,
                                &msg,
                                &self.validator_keys,
                                &self.validator_ids_ordered,
                                &self.chain_id_hash,
                                self.current_epoch,
                            )
                        {
                            // Dedup key: first 8 bytes of the blake3 hash of the raw bytes.
                            let msg_hash = u64::from_le_bytes(
                                blake3::hash(&notification).as_bytes()[..8]
                                    .try_into()
                                    .unwrap(),
                            );

                            // Check both sets to avoid re-relay across rotations
                            if !self.seen_active.contains(&msg_hash)
                                && !self.seen_backup.contains(&msg_hash)
                            {
                                self.seen_active.insert(msg_hash);
                                let raw = notification.to_vec();
                                // Forward to everyone except the peer we got it from.
                                for &other in &self.connected_peers {
                                    if other != peer {
                                        let _ = self
                                            .notif_handle
                                            .send_sync_notification(other, raw.clone());
                                    }
                                }
                                // Rotate: move active→backup, clear old backup
                                if self.seen_active.len() > 10_000 {
                                    self.seen_backup = mem::take(&mut self.seen_active);
                                }
                            }
                            // F-31: Reward peer for a valid relayed consensus message.
                            self.peer_book
                                .write()
                                .await
                                .adjust_score(&peer.to_string(), 1);
                        }
                    }
                    Err(e) => {
                        // Undecodable payload: penalize the peer in the peer book.
                        warn!(error = %e, peer = %peer, "failed to decode notification");
                        self.peer_book
                            .write()
                            .await
                            .adjust_score(&peer.to_string(), -10);
                    }
                }
            }
            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
                warn!(peer = %peer, error = ?error, "notification stream open failed");
            }
        }
    }
554
555    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
556        match event {
557            RequestResponseEvent::RequestReceived {
558                peer,
559                request_id,
560                request,
561                ..
562            } => {
563                let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
564                    warn!(peer = %peer, "dropping request from unknown peer");
565                    self.reqresp_handle.reject_request(request_id);
566                    return;
567                };
568                match codec::decode::<ConsensusMessage>(&request) {
569                    Ok(msg) => {
570                        if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
571                            warn!("consensus message dropped (reqresp): {e}");
572                        }
573                        self.reqresp_handle.send_response(request_id, vec![]);
574                    }
575                    Err(e) => {
576                        warn!(error = %e, "failed to decode request");
577                        self.reqresp_handle.reject_request(request_id);
578                    }
579                }
580            }
581            RequestResponseEvent::ResponseReceived { .. } => {}
582            RequestResponseEvent::RequestFailed { peer, error, .. } => {
583                debug!(peer = %peer, error = ?error, "request failed");
584            }
585        }
586    }
587
    /// Handle one event from the sync request/response protocol: inbound
    /// requests are forwarded to the sync responder (only from peers that
    /// passed the chain-isolation handshake), and responses/failures are
    /// forwarded to the local sync requester.
    async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
        match event {
            RequestResponseEvent::RequestReceived {
                peer,
                request_id,
                request,
                ..
            } => {
                // Only serve sync requests from peers with an active notification
                // stream (which implies they passed the chain_id handshake).
                if !self.notif_connected_peers.contains(&peer) {
                    warn!(peer = %peer, "rejecting sync request: no notification stream (chain isolation)");
                    self.sync_handle.reject_request(request_id);
                    return;
                }
                match codec::decode::<SyncRequest>(&request) {
                    Ok(req) => {
                        // Forward to the responder; a full channel drops it.
                        if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
                            request_id,
                            peer,
                            request: req,
                        }) {
                            warn!("sync request dropped: {e}");
                        }
                    }
                    Err(e) => {
                        // Undecodable request: penalize the peer and, if possible,
                        // answer with an explicit SyncResponse::Error.
                        warn!(error = %e, peer = %peer, "failed to decode sync request");
                        self.peer_book
                            .write()
                            .await
                            .adjust_score(&peer.to_string(), -5);
                        let err_resp = SyncResponse::Error(format!("decode error: {e}"));
                        if let Ok(bytes) = codec::encode(&err_resp) {
                            self.sync_handle.send_response(request_id, bytes);
                        } else {
                            self.sync_handle.reject_request(request_id);
                        }
                    }
                }
            }
            RequestResponseEvent::ResponseReceived {
                request_id: _,
                response,
                ..
            } => {
                // Forward sync response to the sync requester
                match codec::decode::<SyncResponse>(&response) {
                    Ok(resp) => {
                        if let Err(e) = self.sync_resp_tx.try_send(resp) {
                            warn!("sync response dropped: {e}");
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "failed to decode sync response");
                    }
                }
            }
            RequestResponseEvent::RequestFailed { peer, error, .. } => {
                // Surface the failure to the requester as a SyncResponse::Error.
                debug!(peer = %peer, error = ?error, "sync request failed");
                if let Err(e) = self
                    .sync_resp_tx
                    .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
                {
                    warn!("sync error response dropped: {e}");
                }
            }
        }
    }
656
657    async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
658        match event {
659            RequestResponseEvent::RequestReceived {
660                peer,
661                request_id,
662                request,
663                ..
664            } => {
665                // P3: Only accept PEX from known peers (peers in peer_map or connected)
666                if !self.peer_map.peer_to_validator.contains_key(&peer)
667                    && !self.connected_peers.contains(&peer)
668                {
669                    warn!(peer = %peer, "rejecting PEX request from unknown peer");
670                    self.pex_handle.reject_request(request_id);
671                    return;
672                }
673                // F-09: Rate limit PEX requests per peer (max 1 every 10s).
674                let now = Instant::now();
675                if let Some(last) = self.pex_rate_limit.get(&peer)
676                    && now.duration_since(*last) < std::time::Duration::from_secs(10)
677                {
678                    self.pex_handle.reject_request(request_id);
679                    return;
680                }
681                self.pex_rate_limit.insert(peer, now);
682                match postcard::from_bytes::<PexRequest>(&request) {
683                    Ok(PexRequest::GetPeers) => {
684                        let book = self.peer_book.read().await;
685                        let private = &self.pex_config.private_peer_ids;
686                        let peers: Vec<PeerInfo> = book
687                            .get_random_peers(self.pex_config.max_peers_per_response)
688                            .into_iter()
689                            .filter(|p| p.peer_id != peer.to_string())
690                            // P4: exclude private peers from PEX responses
691                            .filter(|p| !private.contains(&p.peer_id))
692                            .cloned()
693                            .collect();
694                        let resp = PexResponse::Peers(peers);
695                        if let Ok(bytes) = postcard::to_allocvec(&resp) {
696                            self.pex_handle.send_response(request_id, bytes);
697                        }
698                    }
699                    Ok(PexRequest::Advertise {
700                        role,
701                        validator_id,
702                        addresses,
703                    }) => {
704                        // F-10: Reject if role is "validator" but validator_id is None.
705                        if role == PeerRole::Validator && validator_id.is_none() {
706                            warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
707                            self.pex_handle.reject_request(request_id);
708                            return;
709                        }
710                        // P2: If claiming validator_id, verify PeerId matches peer_map
711                        if let Some(vid) = validator_id
712                            && let Some(&expected_peer) =
713                                self.peer_map.validator_to_peer.get(&ValidatorId(vid))
714                            && expected_peer != peer
715                        {
716                            warn!(
717                                peer = %peer,
718                                claimed_vid = vid,
719                                "PEX Advertise validator_id mismatch, rejecting"
720                            );
721                            self.pex_handle.reject_request(request_id);
722                            return;
723                        }
724                        // F-10: Limit addresses to max 8 to prevent abuse.
725                        let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
726                        let mut info = PeerInfo::new(
727                            peer,
728                            role,
729                            addresses.iter().filter_map(|a| a.parse().ok()).collect(),
730                        );
731                        if let Some(vid) = validator_id {
732                            info = info.with_validator(ValidatorId(vid));
733                        }
734                        self.peer_book.write().await.add_peer(info);
735                        if let Ok(bytes) = postcard::to_allocvec(&PexResponse::Ack) {
736                            self.pex_handle.send_response(request_id, bytes);
737                        }
738                    }
739                    Err(e) => {
740                        warn!(error = %e, "failed to decode PEX request");
741                        self.pex_handle.reject_request(request_id);
742                    }
743                }
744            }
745            RequestResponseEvent::ResponseReceived { response, .. } => {
746                if let Ok(PexResponse::Peers(peers)) = postcard::from_bytes(&response) {
747                    let mut book = self.peer_book.write().await;
748                    for peer in peers {
749                        if !peer.is_banned() {
750                            book.add_peer(peer);
751                        }
752                    }
753                }
754            }
755            // F-27: Log unhandled PEX events instead of silently dropping.
756            other => {
757                trace!("unhandled PEX event: {other:?}");
758            }
759        }
760    }
761
762    /// Periodic connection maintenance: reconnect persistent peers, save peer book.
763    async fn run_maintenance(&mut self) {
764        // 1. Reconnect disconnected persistent peers.
765        // Collect addresses outside the lock scope to avoid holding it across await.
766        let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
767            let book = self.peer_book.read().await;
768            self.persistent_peers
769                .values()
770                .filter(|pid| !self.connected_peers.contains(pid))
771                .filter_map(|&pid| {
772                    book.get(&pid.to_string()).map(|info| {
773                        let addrs: Vec<Multiaddr> = info
774                            .addresses
775                            .iter()
776                            .filter_map(|a| a.parse().ok())
777                            .collect();
778                        (pid, addrs)
779                    })
780                })
781                .collect()
782        };
783
784        for (pid, addrs) in to_dial {
785            if !addrs.is_empty() {
786                self.litep2p
787                    .add_known_address(pid, addrs.clone().into_iter());
788                // F-07: Also actively dial the first address to trigger reconnection.
789                let mut dial_addr = addrs[0].clone();
790                dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
791                if let Err(e) = self.litep2p.dial_address(dial_addr).await {
792                    debug!(peer = %pid, error = ?e, "persistent peer redial failed");
793                }
794            }
795        }
796
797        // 2. Try to connect to peers from book if under target
798        let max = self.pex_config.max_peers;
799        if self.connected_peers.len() < max * 4 / 5 {
800            let book = self.peer_book.read().await;
801            let candidates = book.get_random_peers(5);
802            for peer in candidates {
803                if let Ok(pid) = peer.peer_id.parse::<PeerId>()
804                    && !self.connected_peers.contains(&pid)
805                {
806                    let addrs: Vec<Multiaddr> = peer
807                        .addresses
808                        .iter()
809                        .filter_map(|a| a.parse().ok())
810                        .collect();
811                    if !addrs.is_empty() {
812                        self.litep2p.add_known_address(pid, addrs.into_iter());
813                    }
814                }
815            }
816        }
817
818        // 3. Prune stale peers (older than 24 hours) and persist
819        self.peer_book.write().await.prune_stale(86400);
820        if let Err(e) = self.peer_book.read().await.save() {
821            warn!(%e, "failed to save peer book");
822        }
823
824        // 4. Prune stale PEX rate-limit entries (disconnected peers older than 60s).
825        let pex_cutoff = Instant::now() - std::time::Duration::from_secs(60);
826        self.pex_rate_limit.retain(|_, last| *last > pex_cutoff);
827    }
828
829    /// Send a PEX GetPeers request to a random connected peer.
830    async fn run_pex_round(&mut self) {
831        if self.connected_peers.is_empty() {
832            return;
833        }
834        // Pick a random connected peer
835        let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
836        let idx = rand::random::<usize>() % peers.len();
837        let target = peers[idx];
838
839        if let Ok(bytes) = postcard::to_allocvec(&PexRequest::GetPeers) {
840            let _ = self
841                .pex_handle
842                .send_request(target, bytes, DialOptions::Reject)
843                .await;
844        }
845    }
846
847    /// C-1: Pick a non-validator connected peer to evict, giving validators
848    /// priority access to connection slots.
849    fn pick_non_validator_to_evict(&self) -> Option<PeerId> {
850        self.connected_peers
851            .iter()
852            .find(|p| !self.peer_map.peer_to_validator.contains_key(p))
853            .copied()
854    }
855
856    async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
857        match event {
858            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
859                // Enforce connection limit, but always admit validators.
860                // This prevents Eclipse attacks where an adversary fills all
861                // connection slots with Sybil nodes to isolate validators.
862                let is_validator = self.peer_map.peer_to_validator.contains_key(&peer);
863                if !is_validator && self.connected_peers.len() >= self.pex_config.max_peers {
864                    warn!(
865                        peer = %peer,
866                        total = self.connected_peers.len(),
867                        max = self.pex_config.max_peers,
868                        "connection limit reached, ignoring non-validator peer"
869                    );
870                    return;
871                }
872
873                // C-1: Proactive eviction — if a validator needs a slot and we're at
874                // the limit, evict the oldest non-validator connection to make room.
875                if is_validator
876                    && self.connected_peers.len() >= self.pex_config.max_peers
877                    && let Some(evict_peer) = self.pick_non_validator_to_evict()
878                {
879                    warn!(
880                        evict = %evict_peer,
881                        new_validator = %peer,
882                        "evicting non-validator peer to make room for validator"
883                    );
884                    self.connected_peers.remove(&evict_peer);
885                    self.notif_connected_peers.remove(&evict_peer);
886                    // litep2p doesn't expose disconnect — closing the notification
887                    // substream will cause the peer to be cleaned up eventually.
888                }
889
890                info!(peer = %peer, endpoint = ?endpoint, "connection established");
891                self.connected_peers.insert(peer);
892                let _ = self.connected_count_tx.send(self.connected_peers.len());
893
894                // Open notification substream to this peer for consensus messages.
895                if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
896                    debug!(peer = %peer, error = ?e, "failed to open notification substream");
897                }
898
899                // Update last_seen in peer book
900                if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
901                    info.touch();
902                }
903            }
904            Litep2pEvent::ConnectionClosed { peer, .. } => {
905                debug!(peer = %peer, "connection closed");
906                self.connected_peers.remove(&peer);
907                let _ = self.connected_count_tx.send(self.connected_peers.len());
908                // Eagerly remove from notif set; NotificationStreamClosed may
909                // arrive late or not at all when the TCP connection drops first.
910                if self.notif_connected_peers.remove(&peer) {
911                    let _ = self
912                        .notif_connected_count_tx
913                        .send(self.notif_connected_peers.len());
914                }
915            }
916            Litep2pEvent::DialFailure { address, error, .. } => {
917                warn!(address = %address, error = ?error, "dial failed");
918            }
919            // F-27: Log unhandled litep2p events instead of silently dropping.
920            other => {
921                trace!(?other, "unhandled litep2p event");
922            }
923        }
924    }
925
926    fn update_peer_info(&self) {
927        let peers: Vec<PeerStatus> = self
928            .peer_map
929            .validator_to_peer
930            .iter()
931            .map(|(&vid, pid)| PeerStatus {
932                validator_id: vid,
933                peer_id: pid.to_string(),
934            })
935            .collect();
936        let _ = self.peer_info_tx.send(peers);
937    }
938
939    async fn handle_command(&mut self, cmd: NetCommand) {
940        match cmd {
941            NetCommand::Broadcast(bytes) => {
942                // Broadcast to peers with open notification substreams only.
943                for &peer in &self.notif_connected_peers {
944                    let _ = self
945                        .notif_handle
946                        .send_sync_notification(peer, bytes.clone());
947                }
948            }
949            NetCommand::SendTo(target, bytes) => {
950                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
951                    let _ = self
952                        .reqresp_handle
953                        .send_request(peer_id, bytes, DialOptions::Reject)
954                        .await;
955                }
956            }
957            NetCommand::AddPeer(vid, pid, addrs) => {
958                info!(validator = %vid, peer = %pid, "adding peer");
959                self.peer_map.insert(vid, pid);
960                self.litep2p.add_known_address(pid, addrs.into_iter());
961                self.update_peer_info();
962            }
963            NetCommand::RemovePeer(vid) => {
964                if let Some(pid) = self.peer_map.remove(vid) {
965                    info!(validator = %vid, peer = %pid, "removed peer");
966                } else {
967                    warn!(validator = %vid, "peer not found for removal");
968                }
969                self.update_peer_info();
970            }
971            NetCommand::SyncRequest(peer_id, bytes) => {
972                let _ = self
973                    .sync_handle
974                    .send_request(peer_id, bytes, DialOptions::Reject)
975                    .await;
976            }
977            NetCommand::SyncRespond(request_id, bytes) => {
978                self.sync_handle.send_response(request_id, bytes);
979            }
980            NetCommand::EpochChange(validators) => {
981                self.handle_epoch_change(validators).await;
982            }
983            NetCommand::BroadcastTx(bytes) => {
984                for &peer in &self.mempool_notif_connected_peers {
985                    let _ = self
986                        .mempool_notif_handle
987                        .send_sync_notification(peer, bytes.clone());
988                }
989            }
990        }
991    }
992
    /// Handle events from the mempool tx-gossip notification protocol:
    /// substream lifecycle plus inbound tx notifications, applying per-peer
    /// rate limiting and content-hash dedup before forwarding a tx to the
    /// mempool channel.
    async fn handle_mempool_notification_event(&mut self, event: NotificationEvent) {
        match event {
            NotificationEvent::ValidateSubstream { peer, .. } => {
                // Auto-accept is true, but handle explicitly if called.
                self.mempool_notif_handle
                    .send_validation_result(peer, ValidationResult::Accept);
            }
            NotificationEvent::NotificationStreamOpened { peer, .. } => {
                trace!(peer = %peer, "mempool notif stream opened");
                self.mempool_notif_connected_peers.insert(peer);
            }
            NotificationEvent::NotificationStreamClosed { peer } => {
                trace!(peer = %peer, "mempool notif stream closed");
                self.mempool_notif_connected_peers.remove(&peer);
                // Drop the peer's rate-limit window so the map doesn't grow
                // unbounded as peers come and go.
                self.mempool_peer_rate.remove(&peer);
            }
            NotificationEvent::NotificationReceived { peer, notification } => {
                // C-2: Per-peer tx gossip rate limit — max 500 tx/sec per peer.
                const MAX_TX_PER_SEC: u32 = 500;
                let now = Instant::now();
                // Entry is (window_start, count). A window older than 1s is
                // reset with the current message already counted as 1; within
                // the window, anything past the budget is silently dropped.
                let entry = self.mempool_peer_rate.entry(peer).or_insert((now, 0));
                if now.duration_since(entry.0).as_secs() >= 1 {
                    *entry = (now, 1);
                } else {
                    entry.1 += 1;
                    if entry.1 > MAX_TX_PER_SEC {
                        return;
                    }
                }

                // Dedup on content hash so a re-gossiped tx is forwarded once.
                let hash = blake3::hash(&notification);
                let hash_bytes: [u8; 32] = *hash.as_bytes();
                if self.mempool_seen_active.contains(&hash_bytes)
                    || self.mempool_seen_backup.contains(&hash_bytes)
                {
                    return;
                }
                self.mempool_seen_active.insert(hash_bytes);
                // Two-set rotation: swap active→backup when full, preserving
                // recent history (same approach as relay dedup). The hash just
                // inserted survives the swap in the backup set.
                if self.mempool_seen_active.len() > 100_000 {
                    std::mem::swap(&mut self.mempool_seen_active, &mut self.mempool_seen_backup);
                    self.mempool_seen_active.clear();
                }
                // Best-effort handoff: try_send drops the tx if the mempool
                // channel is full rather than blocking the network loop.
                let _ = self.mempool_tx_tx.try_send(notification.freeze().to_vec());
            }
            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
                debug!(peer = %peer, ?error, "mempool notif stream open failed");
            }
        }
    }
1044
1045    async fn handle_epoch_change(
1046        &mut self,
1047        validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
1048    ) {
1049        // Rebuild peer_map entries for new validators using PeerBook
1050        for (vid, pubkey) in &validators {
1051            if self.peer_map.validator_to_peer.contains_key(vid) {
1052                continue;
1053            }
1054            // Try to find PeerId from PeerBook by looking up the public key
1055            let pk_bytes = &pubkey.0;
1056            if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
1057                let peer_id = lpk.to_peer_id();
1058                info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
1059                self.peer_map.insert(*vid, peer_id);
1060            }
1061        }
1062        // Remove validators no longer in the set
1063        let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
1064        let to_remove: Vec<ValidatorId> = self
1065            .peer_map
1066            .validator_to_peer
1067            .keys()
1068            .filter(|vid| !new_ids.contains(vid))
1069            .copied()
1070            .collect();
1071        for vid in to_remove {
1072            info!(validator = %vid, "removing validator from peer_map after epoch change");
1073            self.peer_map.remove(vid);
1074        }
1075        // Update relay verification key map to match new epoch
1076        self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
1077        self.validator_keys = validators.into_iter().collect();
1078        // F-28: Rebuild persistent_peers from updated peer_map
1079        self.persistent_peers = self.peer_map.validator_to_peer.clone();
1080        self.update_peer_info();
1081    }
1082}
1083
/// NetworkSink backed by litep2p, for use by the consensus engine.
/// Also provides methods for peer management and sync.
///
/// Cheap to clone: both fields are channel senders into the network service.
#[derive(Clone)]
pub struct Litep2pNetworkSink {
    // Commands forwarded to the network service event loop (bounded; sends
    // are best-effort via try_send).
    cmd_tx: mpsc::Sender<NetCommand>,
    // F-02: dedicated watch channel so epoch changes are never dropped.
    epoch_tx: watch::Sender<EpochUpdate>,
}
1091
1092impl Litep2pNetworkSink {
1093    pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
1094        if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
1095            warn!("add_peer cmd dropped: {e}");
1096        }
1097    }
1098
1099    pub fn remove_peer(&self, vid: ValidatorId) {
1100        if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
1101            warn!("remove_peer cmd dropped: {e}");
1102        }
1103    }
1104
1105    pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
1106        match codec::encode(request) {
1107            Ok(bytes) => {
1108                if let Err(e) = self
1109                    .cmd_tx
1110                    .try_send(NetCommand::SyncRequest(peer_id, bytes))
1111                {
1112                    warn!("sync request cmd dropped: {e}");
1113                }
1114            }
1115            Err(e) => warn!("sync request encode failed: {e}"),
1116        }
1117    }
1118
1119    pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
1120        match codec::encode(response) {
1121            Ok(bytes) => {
1122                if let Err(e) = self
1123                    .cmd_tx
1124                    .try_send(NetCommand::SyncRespond(request_id, bytes))
1125                {
1126                    warn!("sync response cmd dropped: {e}");
1127                }
1128            }
1129            Err(e) => warn!("sync response encode failed: {e}"),
1130        }
1131    }
1132
1133    /// Broadcast a raw transaction to all connected peers via the mempool gossip protocol.
1134    pub fn broadcast_tx(&self, tx_bytes: Vec<u8>) {
1135        if let Err(e) = self.cmd_tx.try_send(NetCommand::BroadcastTx(tx_bytes)) {
1136            warn!("broadcast_tx cmd dropped: {e}");
1137        }
1138    }
1139}
1140
1141impl NetworkSink for Litep2pNetworkSink {
1142    fn broadcast(&self, msg: ConsensusMessage) {
1143        match codec::encode(&msg) {
1144            Ok(bytes) => {
1145                if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1146                    warn!("broadcast cmd dropped: {e}");
1147                }
1148            }
1149            Err(e) => warn!("broadcast encode failed: {e}"),
1150        }
1151    }
1152
1153    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
1154        match codec::encode(&msg) {
1155            Ok(bytes) => {
1156                if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
1157                    warn!("send_to cmd dropped for {target}: {e}");
1158                }
1159            }
1160            Err(e) => warn!("send_to encode failed for {target}: {e}"),
1161        }
1162    }
1163
1164    fn on_epoch_change(&self, epoch: EpochNumber, new_validator_set: &hotmint_types::ValidatorSet) {
1165        let validators: Vec<_> = new_validator_set
1166            .validators()
1167            .iter()
1168            .map(|v| (v.id, v.public_key.clone()))
1169            .collect();
1170        // F-02: Use dedicated watch channel so epoch changes are never dropped.
1171        let _ = self.epoch_tx.send(Some((epoch, validators)));
1172    }
1173
1174    fn broadcast_evidence(&self, proof: &hotmint_types::evidence::EquivocationProof) {
1175        let msg = ConsensusMessage::Evidence(proof.clone());
1176        match codec::encode(&msg) {
1177            Ok(bytes) => {
1178                if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1179                    warn!("broadcast_evidence cmd dropped: {e}");
1180                }
1181            }
1182            Err(e) => warn!("broadcast_evidence encode failed: {e}"),
1183        }
1184    }
1185}