Skip to main content

hotmint_network/
service.rs

1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::sync::{SyncRequest, SyncResponse};
8use hotmint_types::{ConsensusMessage, ValidatorId};
9use litep2p::config::ConfigBuilder;
10use litep2p::protocol::notification::{
11    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
12};
13use litep2p::protocol::request_response::{
14    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
15};
16use litep2p::transport::tcp::config::Config as TcpConfig;
17use litep2p::types::RequestId;
18use litep2p::types::multiaddr::Multiaddr;
19use litep2p::{Litep2p, Litep2pEvent, PeerId};
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, watch};
22use tracing::{debug, info, warn};
23
24use std::sync::{Arc, RwLock};
25
26use crate::peer::{PeerBook, PeerInfo};
27use crate::pex::{PexConfig, PexRequest, PexResponse};
28
/// Protocol name registered for the consensus notification (broadcast) protocol.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Protocol name registered for direct consensus request/response messages.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Protocol name registered for block-sync request/response messages.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Protocol name registered for peer-exchange (PEX) request/response messages.
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Maximum message size (16 MiB) configured for the notification, consensus
/// request/response and sync protocols.
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
/// Period, in seconds, between two runs of the connection maintenance task.
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
35
36/// Maps ValidatorId <-> PeerId for routing
37#[derive(Clone)]
38pub struct PeerMap {
39    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
40    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
41}
42
43impl PeerMap {
44    pub fn new() -> Self {
45        Self {
46            validator_to_peer: HashMap::new(),
47            peer_to_validator: HashMap::new(),
48        }
49    }
50
51    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
52        self.validator_to_peer.insert(vid, pid);
53        self.peer_to_validator.insert(pid, vid);
54    }
55
56    pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
57        if let Some(pid) = self.validator_to_peer.remove(&vid) {
58            self.peer_to_validator.remove(&pid);
59            Some(pid)
60        } else {
61            None
62        }
63    }
64}
65
66impl Default for PeerMap {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
/// Status of a peer for external queries.
///
/// One entry is produced per validator in the service's peer map; the full list is
/// published through the peer-info watch channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Validator this entry describes.
    pub validator_id: ValidatorId,
    /// String form of the validator's `PeerId`.
    pub peer_id: String,
}
78
/// Commands sent from the NetworkSink to the NetworkService
pub enum NetCommand {
    /// Send the already-encoded bytes as a notification to every peer in the peer map.
    Broadcast(Vec<u8>),
    /// Send the already-encoded bytes to one validator over the consensus
    /// request/response protocol.
    SendTo(ValidatorId, Vec<u8>),
    /// Map a validator to a peer and register the peer's addresses with litep2p.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Remove a validator (and its peer) from the peer map.
    RemovePeer(ValidatorId),
    /// Send a sync request to a specific peer
    SyncRequest(PeerId, Vec<u8>),
    /// Respond to a sync request
    SyncRespond(RequestId, Vec<u8>),
    /// Update peer_map from new validator set (epoch transition)
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
}
92
/// Incoming sync request forwarded to the sync responder
pub struct IncomingSyncRequest {
    /// Identifier to pass back when answering this request.
    pub request_id: RequestId,
    /// Peer that sent the request.
    pub peer: PeerId,
    /// The decoded request payload.
    pub request: SyncRequest,
}
99
/// All handles returned by [`NetworkService::create`].
pub struct NetworkServiceHandles {
    /// The service itself; drive it with [`NetworkService::run`].
    pub service: NetworkService,
    /// Command-side handle used to send messages and manage peers.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages received from known validators.
    pub msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
    /// Decoded sync requests received from known validators.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Sync responses (or synthesized errors for failed requests).
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Latest validator/peer listing, refreshed whenever the peer map changes.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Number of currently tracked connected peers.
    pub connected_count_rx: watch::Receiver<usize>,
}
110
/// NetworkService wraps litep2p and provides consensus-level networking
pub struct NetworkService {
    /// The litep2p instance; polled for connection-level events.
    litep2p: Litep2p,
    /// Handle of the consensus notification protocol (used for broadcasts).
    notif_handle: NotificationHandle,
    /// Handle of the consensus request/response protocol (used for direct sends).
    reqresp_handle: RequestResponseHandle,
    /// Handle of the sync request/response protocol.
    sync_handle: RequestResponseHandle,
    /// Handle of the peer-exchange request/response protocol.
    pex_handle: RequestResponseHandle,
    /// Validator <-> peer routing table; also used to recognise known senders.
    peer_map: PeerMap,
    /// Shared, lock-protected book of known peers.
    peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange settings (limits, interval, private peers).
    pex_config: PexConfig,
    /// Validators present in the peer map at creation time; their addresses are
    /// re-registered during maintenance while they are not connected.
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// Outbound channel for decoded consensus messages.
    msg_tx: mpsc::Sender<(ValidatorId, ConsensusMessage)>,
    /// Inbound channel of commands issued through the sink.
    cmd_rx: mpsc::Receiver<NetCommand>,
    /// Outbound channel for decoded incoming sync requests.
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    /// Outbound channel for sync responses.
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    /// Publishes the current validator/peer listing.
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    /// Publishes the number of tracked connected peers.
    connected_count_tx: watch::Sender<usize>,
    /// Peers whose connection was accepted and has not been reported closed.
    connected_peers: std::collections::HashSet<PeerId>,
}
130
131impl NetworkService {
132    /// Create the network service and all handles for the consensus engine.
133    pub fn create(
134        listen_addr: Multiaddr,
135        peer_map: PeerMap,
136        known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
137        keypair: Option<litep2p::crypto::ed25519::Keypair>,
138        peer_book: Arc<RwLock<PeerBook>>,
139        pex_config: PexConfig,
140    ) -> Result<NetworkServiceHandles> {
141        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
142            .with_max_size(MAX_NOTIFICATION_SIZE)
143            .with_handshake(vec![])
144            .with_auto_accept_inbound(true)
145            .with_sync_channel_size(1024)
146            .with_async_channel_size(1024)
147            .build();
148
149        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
150            .with_max_size(MAX_NOTIFICATION_SIZE)
151            .build();
152
153        let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
154            .with_max_size(MAX_NOTIFICATION_SIZE)
155            .build();
156
157        let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
158            .with_max_size(1024 * 1024) // 1MB for peer lists
159            .build();
160
161        let mut config_builder = ConfigBuilder::new()
162            .with_tcp(TcpConfig {
163                listen_addresses: vec![listen_addr],
164                ..Default::default()
165            })
166            .with_notification_protocol(notif_config)
167            .with_request_response_protocol(reqresp_config)
168            .with_request_response_protocol(sync_config)
169            .with_request_response_protocol(pex_config_proto);
170
171        if let Some(kp) = keypair {
172            config_builder = config_builder.with_keypair(kp);
173        }
174
175        if !known_addresses.is_empty() {
176            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
177        }
178
179        let litep2p =
180            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
181
182        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
183        for addr in litep2p.listen_addresses() {
184            info!(address = %addr, "listening on");
185        }
186
187        let (msg_tx, msg_rx) = mpsc::channel(8192);
188        let (cmd_tx, cmd_rx) = mpsc::channel(4096);
189        let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
190        let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
191
192        // Build initial peer info
193        let initial_peers: Vec<PeerStatus> = peer_map
194            .validator_to_peer
195            .iter()
196            .map(|(&vid, pid)| PeerStatus {
197                validator_id: vid,
198                peer_id: pid.to_string(),
199            })
200            .collect();
201        let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
202
203        let sink = Litep2pNetworkSink {
204            cmd_tx: cmd_tx.clone(),
205        };
206
207        let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
208
209        // Save persistent peers for auto-reconnect
210        let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
211
212        Ok(NetworkServiceHandles {
213            service: Self {
214                litep2p,
215                notif_handle,
216                reqresp_handle,
217                sync_handle,
218                pex_handle,
219                peer_map,
220                peer_book,
221                pex_config,
222                persistent_peers,
223                msg_tx,
224                cmd_rx,
225                sync_req_tx,
226                sync_resp_tx,
227                peer_info_tx,
228                connected_count_tx,
229                connected_peers: std::collections::HashSet::new(),
230            },
231            sink,
232            msg_rx,
233            sync_req_rx,
234            sync_resp_rx,
235            peer_info_rx,
236            connected_count_rx,
237        })
238    }
239
240    pub fn local_peer_id(&self) -> &PeerId {
241        self.litep2p.local_peer_id()
242    }
243
244    /// Run the network event loop
245    pub async fn run(mut self) {
246        let mut maintenance_interval =
247            tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
248        let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
249            self.pex_config.request_interval_secs,
250        ));
251        loop {
252            tokio::select! {
253                event = self.notif_handle.next() => {
254                    if let Some(event) = event {
255                        self.handle_notification_event(event);
256                    }
257                }
258                event = self.reqresp_handle.next() => {
259                    if let Some(event) = event {
260                        self.handle_reqresp_event(event);
261                    }
262                }
263                event = self.sync_handle.next() => {
264                    if let Some(event) = event {
265                        self.handle_sync_event(event);
266                    }
267                }
268                event = self.pex_handle.next() => {
269                    if let Some(event) = event {
270                        self.handle_pex_event(event);
271                    }
272                }
273                event = self.litep2p.next_event() => {
274                    if let Some(event) = event {
275                        self.handle_litep2p_event(event);
276                    }
277                }
278                Some(cmd) = self.cmd_rx.recv() => {
279                    self.handle_command(cmd).await;
280                }
281                _ = maintenance_interval.tick() => {
282                    self.run_maintenance();
283                }
284                _ = pex_interval.tick() => {
285                    if self.pex_config.enabled {
286                        self.run_pex_round().await;
287                    }
288                }
289            }
290        }
291    }
292
293    fn handle_notification_event(&mut self, event: NotificationEvent) {
294        match event {
295            NotificationEvent::ValidateSubstream { peer, .. } => {
296                self.notif_handle
297                    .send_validation_result(peer, ValidationResult::Accept);
298            }
299            NotificationEvent::NotificationStreamOpened { peer, .. } => {
300                info!(peer = %peer, "notification stream opened");
301            }
302            NotificationEvent::NotificationStreamClosed { peer } => {
303                debug!(peer = %peer, "notification stream closed");
304            }
305            NotificationEvent::NotificationReceived { peer, notification } => {
306                let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
307                    warn!(peer = %peer, "dropping notification from unknown peer");
308                    return;
309                };
310                match serde_cbor_2::from_slice::<ConsensusMessage>(&notification) {
311                    Ok(msg) => {
312                        if let Err(e) = self.msg_tx.try_send((sender, msg)) {
313                            warn!("consensus message dropped (notification): {e}");
314                        }
315                    }
316                    Err(e) => {
317                        warn!(error = %e, peer = %peer, "failed to decode notification");
318                        self.peer_book
319                            .write()
320                            .unwrap()
321                            .adjust_score(&peer.to_string(), -10);
322                    }
323                }
324            }
325            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
326                warn!(peer = %peer, error = ?error, "notification stream open failed");
327            }
328        }
329    }
330
331    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
332        match event {
333            RequestResponseEvent::RequestReceived {
334                peer,
335                request_id,
336                request,
337                ..
338            } => {
339                let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
340                    warn!(peer = %peer, "dropping request from unknown peer");
341                    self.reqresp_handle.reject_request(request_id);
342                    return;
343                };
344                match serde_cbor_2::from_slice::<ConsensusMessage>(&request) {
345                    Ok(msg) => {
346                        if let Err(e) = self.msg_tx.try_send((sender, msg)) {
347                            warn!("consensus message dropped (reqresp): {e}");
348                        }
349                        self.reqresp_handle.send_response(request_id, vec![]);
350                    }
351                    Err(e) => {
352                        warn!(error = %e, "failed to decode request");
353                        self.reqresp_handle.reject_request(request_id);
354                    }
355                }
356            }
357            RequestResponseEvent::ResponseReceived { .. } => {}
358            RequestResponseEvent::RequestFailed { peer, error, .. } => {
359                debug!(peer = %peer, error = ?error, "request failed");
360            }
361        }
362    }
363
364    fn handle_sync_event(&mut self, event: RequestResponseEvent) {
365        match event {
366            RequestResponseEvent::RequestReceived {
367                peer,
368                request_id,
369                request,
370                ..
371            } => {
372                if !self.peer_map.peer_to_validator.contains_key(&peer) {
373                    warn!(peer = %peer, "rejecting sync request from unknown peer");
374                    self.sync_handle.reject_request(request_id);
375                    return;
376                }
377                match serde_cbor_2::from_slice::<SyncRequest>(&request) {
378                    Ok(req) => {
379                        if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
380                            request_id,
381                            peer,
382                            request: req,
383                        }) {
384                            warn!("sync request dropped: {e}");
385                        }
386                    }
387                    Err(e) => {
388                        warn!(error = %e, peer = %peer, "failed to decode sync request");
389                        self.peer_book
390                            .write()
391                            .unwrap()
392                            .adjust_score(&peer.to_string(), -5);
393                        let err_resp = SyncResponse::Error(format!("decode error: {e}"));
394                        if let Ok(bytes) = serde_cbor_2::to_vec(&err_resp) {
395                            self.sync_handle.send_response(request_id, bytes);
396                        } else {
397                            self.sync_handle.reject_request(request_id);
398                        }
399                    }
400                }
401            }
402            RequestResponseEvent::ResponseReceived {
403                request_id: _,
404                response,
405                ..
406            } => {
407                // Forward sync response to the sync requester
408                match serde_cbor_2::from_slice::<SyncResponse>(&response) {
409                    Ok(resp) => {
410                        if let Err(e) = self.sync_resp_tx.try_send(resp) {
411                            warn!("sync response dropped: {e}");
412                        }
413                    }
414                    Err(e) => {
415                        warn!(error = %e, "failed to decode sync response");
416                    }
417                }
418            }
419            RequestResponseEvent::RequestFailed { peer, error, .. } => {
420                debug!(peer = %peer, error = ?error, "sync request failed");
421                if let Err(e) = self
422                    .sync_resp_tx
423                    .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
424                {
425                    warn!("sync error response dropped: {e}");
426                }
427            }
428        }
429    }
430
431    fn handle_pex_event(&mut self, event: RequestResponseEvent) {
432        match event {
433            RequestResponseEvent::RequestReceived {
434                peer,
435                request_id,
436                request,
437                ..
438            } => {
439                // P3: Only accept PEX from known peers (peers in peer_map or connected)
440                if !self.peer_map.peer_to_validator.contains_key(&peer)
441                    && !self.connected_peers.contains(&peer)
442                {
443                    warn!(peer = %peer, "rejecting PEX request from unknown peer");
444                    self.pex_handle.reject_request(request_id);
445                    return;
446                }
447                match serde_cbor_2::from_slice::<PexRequest>(&request) {
448                    Ok(PexRequest::GetPeers) => {
449                        let book = self.peer_book.read().unwrap();
450                        let private = &self.pex_config.private_peer_ids;
451                        let peers: Vec<PeerInfo> = book
452                            .get_random_peers(self.pex_config.max_peers_per_response)
453                            .into_iter()
454                            .filter(|p| p.peer_id != peer.to_string())
455                            // P4: exclude private peers from PEX responses
456                            .filter(|p| !private.contains(&p.peer_id))
457                            .cloned()
458                            .collect();
459                        let resp = PexResponse::Peers(peers);
460                        if let Ok(bytes) = serde_cbor_2::to_vec(&resp) {
461                            self.pex_handle.send_response(request_id, bytes);
462                        }
463                    }
464                    Ok(PexRequest::Advertise {
465                        role,
466                        validator_id,
467                        addresses,
468                    }) => {
469                        // P2: If claiming validator_id, verify PeerId matches peer_map
470                        if let Some(vid) = validator_id
471                            && let Some(&expected_peer) =
472                                self.peer_map.validator_to_peer.get(&ValidatorId(vid))
473                            && expected_peer != peer
474                        {
475                            warn!(
476                                peer = %peer,
477                                claimed_vid = vid,
478                                "PEX Advertise validator_id mismatch, rejecting"
479                            );
480                            self.pex_handle.reject_request(request_id);
481                            return;
482                        }
483                        let mut info = PeerInfo::new(
484                            peer,
485                            role,
486                            addresses.iter().filter_map(|a| a.parse().ok()).collect(),
487                        );
488                        if let Some(vid) = validator_id {
489                            info = info.with_validator(ValidatorId(vid));
490                        }
491                        self.peer_book.write().unwrap().add_peer(info);
492                        if let Ok(bytes) = serde_cbor_2::to_vec(&PexResponse::Ack) {
493                            self.pex_handle.send_response(request_id, bytes);
494                        }
495                    }
496                    Err(e) => {
497                        warn!(error = %e, "failed to decode PEX request");
498                        self.pex_handle.reject_request(request_id);
499                    }
500                }
501            }
502            RequestResponseEvent::ResponseReceived { response, .. } => {
503                if let Ok(PexResponse::Peers(peers)) = serde_cbor_2::from_slice(&response) {
504                    let mut book = self.peer_book.write().unwrap();
505                    for peer in peers {
506                        if !peer.is_banned() {
507                            book.add_peer(peer);
508                        }
509                    }
510                }
511            }
512            _ => {}
513        }
514    }
515
516    /// Periodic connection maintenance: reconnect persistent peers, save peer book.
517    fn run_maintenance(&mut self) {
518        // 1. Reconnect disconnected persistent peers
519        for (&_vid, &pid) in &self.persistent_peers {
520            if !self.connected_peers.contains(&pid)
521                && let Some(info) = self.peer_book.read().unwrap().get(&pid.to_string())
522            {
523                let addrs: Vec<Multiaddr> = info
524                    .addresses
525                    .iter()
526                    .filter_map(|a| a.parse().ok())
527                    .collect();
528                if !addrs.is_empty() {
529                    self.litep2p.add_known_address(pid, addrs.into_iter());
530                }
531            }
532        }
533
534        // 2. Try to connect to peers from book if under target
535        let max = self.pex_config.max_peers;
536        if self.connected_peers.len() < max * 4 / 5 {
537            let book = self.peer_book.read().unwrap();
538            let candidates = book.get_random_peers(5);
539            for peer in candidates {
540                if let Ok(pid) = peer.peer_id.parse::<PeerId>()
541                    && !self.connected_peers.contains(&pid)
542                {
543                    let addrs: Vec<Multiaddr> = peer
544                        .addresses
545                        .iter()
546                        .filter_map(|a| a.parse().ok())
547                        .collect();
548                    if !addrs.is_empty() {
549                        self.litep2p.add_known_address(pid, addrs.into_iter());
550                    }
551                }
552            }
553        }
554
555        // 3. Prune stale peers (older than 24 hours) and persist
556        self.peer_book.write().unwrap().prune_stale(86400);
557        if let Err(e) = self.peer_book.read().unwrap().save() {
558            warn!(%e, "failed to save peer book");
559        }
560    }
561
562    /// Send a PEX GetPeers request to a random connected peer.
563    async fn run_pex_round(&mut self) {
564        if self.connected_peers.is_empty() {
565            return;
566        }
567        // Pick a random connected peer
568        let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
569        let idx = rand::random::<usize>() % peers.len();
570        let target = peers[idx];
571
572        if let Ok(bytes) = serde_cbor_2::to_vec(&PexRequest::GetPeers) {
573            let _ = self
574                .pex_handle
575                .send_request(target, bytes, DialOptions::Reject)
576                .await;
577        }
578    }
579
580    fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
581        match event {
582            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
583                // Enforce total connection limit
584                if self.connected_peers.len() >= self.pex_config.max_peers {
585                    warn!(
586                        peer = %peer,
587                        total = self.connected_peers.len(),
588                        max = self.pex_config.max_peers,
589                        "connection limit reached, ignoring new peer"
590                    );
591                    return;
592                }
593
594                info!(peer = %peer, endpoint = ?endpoint, "connection established");
595                self.connected_peers.insert(peer);
596                let _ = self.connected_count_tx.send(self.connected_peers.len());
597                // Update last_seen in peer book
598                if let Some(info) = self.peer_book.write().unwrap().get_mut(&peer.to_string()) {
599                    info.touch();
600                }
601            }
602            Litep2pEvent::ConnectionClosed { peer, .. } => {
603                debug!(peer = %peer, "connection closed");
604                self.connected_peers.remove(&peer);
605                let _ = self.connected_count_tx.send(self.connected_peers.len());
606            }
607            Litep2pEvent::DialFailure { address, error, .. } => {
608                warn!(address = %address, error = ?error, "dial failed");
609            }
610            _ => {}
611        }
612    }
613
614    fn update_peer_info(&self) {
615        let peers: Vec<PeerStatus> = self
616            .peer_map
617            .validator_to_peer
618            .iter()
619            .map(|(&vid, pid)| PeerStatus {
620                validator_id: vid,
621                peer_id: pid.to_string(),
622            })
623            .collect();
624        let _ = self.peer_info_tx.send(peers);
625    }
626
627    async fn handle_command(&mut self, cmd: NetCommand) {
628        match cmd {
629            NetCommand::Broadcast(bytes) => {
630                for &peer in self.peer_map.peer_to_validator.keys() {
631                    let _ = self
632                        .notif_handle
633                        .send_sync_notification(peer, bytes.clone());
634                }
635            }
636            NetCommand::SendTo(target, bytes) => {
637                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
638                    let _ = self
639                        .reqresp_handle
640                        .send_request(peer_id, bytes, DialOptions::Reject)
641                        .await;
642                }
643            }
644            NetCommand::AddPeer(vid, pid, addrs) => {
645                info!(validator = %vid, peer = %pid, "adding peer");
646                self.peer_map.insert(vid, pid);
647                self.litep2p.add_known_address(pid, addrs.into_iter());
648                self.update_peer_info();
649            }
650            NetCommand::RemovePeer(vid) => {
651                if let Some(pid) = self.peer_map.remove(vid) {
652                    info!(validator = %vid, peer = %pid, "removed peer");
653                } else {
654                    warn!(validator = %vid, "peer not found for removal");
655                }
656                self.update_peer_info();
657            }
658            NetCommand::SyncRequest(peer_id, bytes) => {
659                let _ = self
660                    .sync_handle
661                    .send_request(peer_id, bytes, DialOptions::Reject)
662                    .await;
663            }
664            NetCommand::SyncRespond(request_id, bytes) => {
665                self.sync_handle.send_response(request_id, bytes);
666            }
667            NetCommand::EpochChange(validators) => {
668                // Rebuild peer_map entries for new validators using PeerBook
669                for (vid, pubkey) in &validators {
670                    if self.peer_map.validator_to_peer.contains_key(vid) {
671                        continue;
672                    }
673                    // Try to find PeerId from PeerBook by looking up the public key
674                    let pk_bytes = &pubkey.0;
675                    if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
676                        let peer_id = lpk.to_peer_id();
677                        info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
678                        self.peer_map.insert(*vid, peer_id);
679                    }
680                }
681                // Remove validators no longer in the set
682                let new_ids: std::collections::HashSet<ValidatorId> =
683                    validators.iter().map(|(vid, _)| *vid).collect();
684                let to_remove: Vec<ValidatorId> = self
685                    .peer_map
686                    .validator_to_peer
687                    .keys()
688                    .filter(|vid| !new_ids.contains(vid))
689                    .copied()
690                    .collect();
691                for vid in to_remove {
692                    info!(validator = %vid, "removing validator from peer_map after epoch change");
693                    self.peer_map.remove(vid);
694                }
695                self.update_peer_info();
696            }
697        }
698    }
699}
700
/// NetworkSink backed by litep2p, for use by the consensus engine.
/// Also provides methods for peer management and sync.
///
/// Cloning is cheap: every clone feeds the same command channel.
#[derive(Clone)]
pub struct Litep2pNetworkSink {
    /// Sending half of the command channel consumed by the network service.
    cmd_tx: mpsc::Sender<NetCommand>,
}
707
708impl Litep2pNetworkSink {
709    pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
710        if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
711            warn!("add_peer cmd dropped: {e}");
712        }
713    }
714
715    pub fn remove_peer(&self, vid: ValidatorId) {
716        if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
717            warn!("remove_peer cmd dropped: {e}");
718        }
719    }
720
721    pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
722        if let Ok(bytes) = serde_cbor_2::to_vec(request)
723            && let Err(e) = self
724                .cmd_tx
725                .try_send(NetCommand::SyncRequest(peer_id, bytes))
726        {
727            warn!("sync request cmd dropped: {e}");
728        }
729    }
730
731    pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
732        if let Ok(bytes) = serde_cbor_2::to_vec(response)
733            && let Err(e) = self
734                .cmd_tx
735                .try_send(NetCommand::SyncRespond(request_id, bytes))
736        {
737            warn!("sync response cmd dropped: {e}");
738        }
739    }
740}
741
742impl NetworkSink for Litep2pNetworkSink {
743    fn broadcast(&self, msg: ConsensusMessage) {
744        if let Ok(bytes) = serde_cbor_2::to_vec(&msg)
745            && let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes))
746        {
747            warn!("broadcast cmd dropped: {e}");
748        }
749    }
750
751    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
752        if let Ok(bytes) = serde_cbor_2::to_vec(&msg)
753            && let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes))
754        {
755            warn!("send_to cmd dropped for {target}: {e}");
756        }
757    }
758
759    fn on_epoch_change(&self, new_validator_set: &hotmint_types::ValidatorSet) {
760        let validators: Vec<_> = new_validator_set
761            .validators()
762            .iter()
763            .map(|v| (v.id, v.public_key.clone()))
764            .collect();
765        if let Err(e) = self.cmd_tx.try_send(NetCommand::EpochChange(validators)) {
766            warn!("epoch change cmd dropped: {e}");
767        }
768    }
769}