1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::sync::{SyncRequest, SyncResponse};
11use hotmint_types::{ConsensusMessage, ValidatorId};
12use litep2p::config::ConfigBuilder;
13use litep2p::protocol::notification::{
14 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
15};
16use litep2p::protocol::request_response::{
17 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
18};
19use litep2p::transport::tcp::config::Config as TcpConfig;
20use litep2p::types::RequestId;
21use litep2p::types::multiaddr::Multiaddr;
22use litep2p::{Litep2p, Litep2pEvent, PeerId};
23use serde::{Deserialize, Serialize};
24use tokio::sync::{mpsc, watch};
25use tracing::{debug, info, trace, warn};
26
27use std::sync::Arc;
28
29use tokio::sync::RwLock;
30
31use crate::codec;
32use crate::peer::{PeerBook, PeerInfo, PeerRole};
33use crate::pex::{PexConfig, PexRequest, PexResponse};
34
35const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
36const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
37const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
38const PEX_PROTOCOL: &str = "/hotmint/pex/1";
39const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
40const MAINTENANCE_INTERVAL_SECS: u64 = 10;
41
42#[derive(Clone)]
44pub struct PeerMap {
45 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
46 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
47}
48
49impl PeerMap {
50 pub fn new() -> Self {
51 Self {
52 validator_to_peer: HashMap::new(),
53 peer_to_validator: HashMap::new(),
54 }
55 }
56
57 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
58 self.validator_to_peer.insert(vid, pid);
59 self.peer_to_validator.insert(pid, vid);
60 }
61
62 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
63 if let Some(pid) = self.validator_to_peer.remove(&vid) {
64 self.peer_to_validator.remove(&pid);
65 Some(pid)
66 } else {
67 None
68 }
69 }
70}
71
72impl Default for PeerMap {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct PeerStatus {
81 pub validator_id: ValidatorId,
82 pub peer_id: String,
83}
84
85pub enum NetCommand {
87 Broadcast(Vec<u8>),
88 SendTo(ValidatorId, Vec<u8>),
89 AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
90 RemovePeer(ValidatorId),
91 SyncRequest(PeerId, Vec<u8>),
93 SyncRespond(RequestId, Vec<u8>),
95 EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
97}
98
99pub struct IncomingSyncRequest {
101 pub request_id: RequestId,
102 pub peer: PeerId,
103 pub request: SyncRequest,
104}
105
106pub struct NetworkServiceHandles {
108 pub service: NetworkService,
109 pub sink: Litep2pNetworkSink,
110 pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
111 pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
112 pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
113 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
114 pub connected_count_rx: watch::Receiver<usize>,
115 pub notif_connected_count_rx: watch::Receiver<usize>,
119}
120
121pub struct NetworkService {
123 litep2p: Litep2p,
124 notif_handle: NotificationHandle,
125 reqresp_handle: RequestResponseHandle,
126 sync_handle: RequestResponseHandle,
127 pex_handle: RequestResponseHandle,
128 peer_map: PeerMap,
129 peer_book: Arc<RwLock<PeerBook>>,
130 pex_config: PexConfig,
131 persistent_peers: HashMap<ValidatorId, PeerId>,
132 initial_dial_addresses: Vec<Multiaddr>,
134 relay_consensus: bool,
136 validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
139 validator_ids_ordered: Vec<ValidatorId>,
142 chain_id_hash: [u8; 32],
144 msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
145 cmd_rx: mpsc::Receiver<NetCommand>,
146 sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
147 sync_resp_tx: mpsc::Sender<SyncResponse>,
148 peer_info_tx: watch::Sender<Vec<PeerStatus>>,
149 connected_count_tx: watch::Sender<usize>,
150 notif_connected_peers: HashSet<PeerId>,
152 notif_connected_count_tx: watch::Sender<usize>,
153 connected_peers: HashSet<PeerId>,
154 seen_active: HashSet<u64>,
160 seen_backup: HashSet<u64>,
161 epoch_rx: watch::Receiver<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>,
163 pex_rate_limit: HashMap<PeerId, Instant>,
165}
166
167pub struct NetworkConfig {
169 pub listen_addr: Multiaddr,
170 pub peer_map: PeerMap,
171 pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
172 pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
173 pub peer_book: Arc<RwLock<PeerBook>>,
174 pub pex_config: PexConfig,
175 pub relay_consensus: bool,
176 pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
177 pub chain_id_hash: [u8; 32],
178}
179
180impl NetworkService {
181 pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
183 let NetworkConfig {
184 listen_addr,
185 peer_map,
186 known_addresses,
187 keypair,
188 peer_book,
189 pex_config,
190 relay_consensus,
191 initial_validators,
192 chain_id_hash,
193 } = config;
194 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
195 .with_max_size(MAX_NOTIFICATION_SIZE)
196 .with_handshake(vec![])
197 .with_auto_accept_inbound(true)
198 .with_sync_channel_size(1024)
199 .with_async_channel_size(1024)
200 .build();
201
202 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
203 .with_max_size(MAX_NOTIFICATION_SIZE)
204 .build();
205
206 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
207 .with_max_size(MAX_NOTIFICATION_SIZE)
208 .build();
209
210 let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
211 .with_max_size(1024 * 1024) .build();
213
214 let mut config_builder = ConfigBuilder::new()
215 .with_tcp(TcpConfig {
216 listen_addresses: vec![listen_addr],
217 ..Default::default()
218 })
219 .with_notification_protocol(notif_config)
220 .with_request_response_protocol(reqresp_config)
221 .with_request_response_protocol(sync_config)
222 .with_request_response_protocol(pex_config_proto);
223
224 if let Some(kp) = keypair {
225 config_builder = config_builder.with_keypair(kp);
226 }
227
228 let initial_dial_addresses: Vec<Multiaddr> = known_addresses
230 .iter()
231 .flat_map(|(pid, addrs)| {
232 addrs.iter().map(move |addr| {
233 let mut full = addr.clone();
234 full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
235 full
236 })
237 })
238 .collect();
239
240 if !known_addresses.is_empty() {
241 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
242 }
243
244 let litep2p =
245 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
246
247 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
248 for addr in litep2p.listen_addresses() {
249 info!(address = %addr, "listening on");
250 }
251
252 let (msg_tx, msg_rx) = mpsc::channel(8192);
253 let (cmd_tx, cmd_rx) = mpsc::channel(4096);
254 let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
255 let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
256
257 let initial_peers: Vec<PeerStatus> = peer_map
259 .validator_to_peer
260 .iter()
261 .map(|(&vid, pid)| PeerStatus {
262 validator_id: vid,
263 peer_id: pid.to_string(),
264 })
265 .collect();
266 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
267
268 let (epoch_tx, epoch_rx) =
269 watch::channel::<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>(None);
270
271 let sink = Litep2pNetworkSink {
272 cmd_tx: cmd_tx.clone(),
273 epoch_tx,
274 };
275
276 let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
277 let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
278
279 let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
281
282 let validator_ids_ordered: Vec<ValidatorId> =
283 initial_validators.iter().map(|(vid, _)| *vid).collect();
284 let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
285 initial_validators.into_iter().collect();
286
287 Ok(NetworkServiceHandles {
288 service: Self {
289 litep2p,
290 notif_handle,
291 reqresp_handle,
292 sync_handle,
293 pex_handle,
294 peer_map,
295 peer_book,
296 pex_config,
297 persistent_peers,
298 initial_dial_addresses,
299 relay_consensus,
300 validator_keys,
301 validator_ids_ordered,
302 chain_id_hash,
303 msg_tx,
304 cmd_rx,
305 sync_req_tx,
306 sync_resp_tx,
307 peer_info_tx,
308 connected_count_tx,
309 notif_connected_peers: HashSet::new(),
310 notif_connected_count_tx,
311 connected_peers: HashSet::new(),
312 seen_active: HashSet::new(),
313 seen_backup: HashSet::new(),
314 epoch_rx,
315 pex_rate_limit: HashMap::new(),
316 },
317 sink,
318 msg_rx,
319 sync_req_rx,
320 sync_resp_rx,
321 peer_info_rx,
322 connected_count_rx,
323 notif_connected_count_rx,
324 })
325 }
326
327 pub fn local_peer_id(&self) -> &PeerId {
328 self.litep2p.local_peer_id()
329 }
330
331 pub async fn run(mut self) {
333 for addr in mem::take(&mut self.initial_dial_addresses) {
335 if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
336 debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
337 }
338 }
339
340 let mut maintenance_interval =
341 tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
342 let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
343 self.pex_config.request_interval_secs,
344 ));
345 loop {
346 tokio::select! {
347 event = self.notif_handle.next() => {
348 if let Some(event) = event {
349 self.handle_notification_event(event).await;
350 }
351 }
352 event = self.reqresp_handle.next() => {
353 if let Some(event) = event {
354 self.handle_reqresp_event(event);
355 }
356 }
357 event = self.sync_handle.next() => {
358 if let Some(event) = event {
359 self.handle_sync_event(event).await;
360 }
361 }
362 event = self.pex_handle.next() => {
363 if let Some(event) = event {
364 self.handle_pex_event(event).await;
365 }
366 }
367 event = self.litep2p.next_event() => {
368 if let Some(event) = event {
369 self.handle_litep2p_event(event).await;
370 }
371 }
372 Some(cmd) = self.cmd_rx.recv() => {
373 self.handle_command(cmd).await;
374 }
375 Ok(()) = self.epoch_rx.changed() => {
376 let epoch_val = self.epoch_rx.borrow_and_update().clone();
377 if let Some(validators) = epoch_val {
378 self.handle_epoch_change(validators).await;
379 }
380 }
381 _ = maintenance_interval.tick() => {
382 self.run_maintenance().await;
383 }
384 _ = pex_interval.tick() => {
385 if self.pex_config.enabled {
386 self.run_pex_round().await;
387 }
388 }
389 }
390 }
391 }
392
393 async fn handle_notification_event(&mut self, event: NotificationEvent) {
394 match event {
395 NotificationEvent::ValidateSubstream { peer, .. } => {
396 self.notif_handle
397 .send_validation_result(peer, ValidationResult::Accept);
398 }
399 NotificationEvent::NotificationStreamOpened { peer, .. } => {
400 info!(peer = %peer, "notification stream opened");
401 self.notif_connected_peers.insert(peer);
402 let _ = self
403 .notif_connected_count_tx
404 .send(self.notif_connected_peers.len());
405 }
406 NotificationEvent::NotificationStreamClosed { peer } => {
407 debug!(peer = %peer, "notification stream closed");
408 self.notif_connected_peers.remove(&peer);
409 let _ = self
410 .notif_connected_count_tx
411 .send(self.notif_connected_peers.len());
412 }
413 NotificationEvent::NotificationReceived { peer, notification } => {
414 let sender: Option<ValidatorId> =
416 self.peer_map.peer_to_validator.get(&peer).copied();
417
418 match codec::decode::<ConsensusMessage>(¬ification) {
419 Ok(msg) => {
420 if sender.is_some()
422 && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
423 {
424 warn!("consensus message dropped (notification): {e}");
425 }
426
427 if self.relay_consensus
432 && let Some(sid) = sender
433 && hotmint_consensus::engine::verify_relay_sender(
434 sid,
435 &msg,
436 &self.validator_keys,
437 &self.validator_ids_ordered,
438 &self.chain_id_hash,
439 )
440 {
441 let msg_hash = u64::from_le_bytes(
442 blake3::hash(¬ification).as_bytes()[..8]
443 .try_into()
444 .unwrap(),
445 );
446
447 if !self.seen_active.contains(&msg_hash)
449 && !self.seen_backup.contains(&msg_hash)
450 {
451 self.seen_active.insert(msg_hash);
452 let raw = notification.to_vec();
453 for &other in &self.connected_peers {
454 if other != peer {
455 let _ = self
456 .notif_handle
457 .send_sync_notification(other, raw.clone());
458 }
459 }
460 if self.seen_active.len() > 10_000 {
462 self.seen_backup = mem::take(&mut self.seen_active);
463 }
464 }
465 self.peer_book
467 .write()
468 .await
469 .adjust_score(&peer.to_string(), 1);
470 }
471 }
472 Err(e) => {
473 warn!(error = %e, peer = %peer, "failed to decode notification");
474 self.peer_book
475 .write()
476 .await
477 .adjust_score(&peer.to_string(), -10);
478 }
479 }
480 }
481 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
482 warn!(peer = %peer, error = ?error, "notification stream open failed");
483 }
484 }
485 }
486
487 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
488 match event {
489 RequestResponseEvent::RequestReceived {
490 peer,
491 request_id,
492 request,
493 ..
494 } => {
495 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
496 warn!(peer = %peer, "dropping request from unknown peer");
497 self.reqresp_handle.reject_request(request_id);
498 return;
499 };
500 match codec::decode::<ConsensusMessage>(&request) {
501 Ok(msg) => {
502 if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
503 warn!("consensus message dropped (reqresp): {e}");
504 }
505 self.reqresp_handle.send_response(request_id, vec![]);
506 }
507 Err(e) => {
508 warn!(error = %e, "failed to decode request");
509 self.reqresp_handle.reject_request(request_id);
510 }
511 }
512 }
513 RequestResponseEvent::ResponseReceived { .. } => {}
514 RequestResponseEvent::RequestFailed { peer, error, .. } => {
515 debug!(peer = %peer, error = ?error, "request failed");
516 }
517 }
518 }
519
520 async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
521 match event {
522 RequestResponseEvent::RequestReceived {
523 peer,
524 request_id,
525 request,
526 ..
527 } => {
528 if !self.connected_peers.contains(&peer) {
529 warn!(peer = %peer, "rejecting sync request from unconnected peer");
530 self.sync_handle.reject_request(request_id);
531 return;
532 }
533 match codec::decode::<SyncRequest>(&request) {
534 Ok(req) => {
535 if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
536 request_id,
537 peer,
538 request: req,
539 }) {
540 warn!("sync request dropped: {e}");
541 }
542 }
543 Err(e) => {
544 warn!(error = %e, peer = %peer, "failed to decode sync request");
545 self.peer_book
546 .write()
547 .await
548 .adjust_score(&peer.to_string(), -5);
549 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
550 if let Ok(bytes) = codec::encode(&err_resp) {
551 self.sync_handle.send_response(request_id, bytes);
552 } else {
553 self.sync_handle.reject_request(request_id);
554 }
555 }
556 }
557 }
558 RequestResponseEvent::ResponseReceived {
559 request_id: _,
560 response,
561 ..
562 } => {
563 match codec::decode::<SyncResponse>(&response) {
565 Ok(resp) => {
566 if let Err(e) = self.sync_resp_tx.try_send(resp) {
567 warn!("sync response dropped: {e}");
568 }
569 }
570 Err(e) => {
571 warn!(error = %e, "failed to decode sync response");
572 }
573 }
574 }
575 RequestResponseEvent::RequestFailed { peer, error, .. } => {
576 debug!(peer = %peer, error = ?error, "sync request failed");
577 if let Err(e) = self
578 .sync_resp_tx
579 .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
580 {
581 warn!("sync error response dropped: {e}");
582 }
583 }
584 }
585 }
586
587 async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
588 match event {
589 RequestResponseEvent::RequestReceived {
590 peer,
591 request_id,
592 request,
593 ..
594 } => {
595 if !self.peer_map.peer_to_validator.contains_key(&peer)
597 && !self.connected_peers.contains(&peer)
598 {
599 warn!(peer = %peer, "rejecting PEX request from unknown peer");
600 self.pex_handle.reject_request(request_id);
601 return;
602 }
603 let now = Instant::now();
605 if let Some(last) = self.pex_rate_limit.get(&peer)
606 && now.duration_since(*last) < std::time::Duration::from_secs(10)
607 {
608 self.pex_handle.reject_request(request_id);
609 return;
610 }
611 self.pex_rate_limit.insert(peer, now);
612 match serde_cbor_2::from_slice::<PexRequest>(&request) {
613 Ok(PexRequest::GetPeers) => {
614 let book = self.peer_book.read().await;
615 let private = &self.pex_config.private_peer_ids;
616 let peers: Vec<PeerInfo> = book
617 .get_random_peers(self.pex_config.max_peers_per_response)
618 .into_iter()
619 .filter(|p| p.peer_id != peer.to_string())
620 .filter(|p| !private.contains(&p.peer_id))
622 .cloned()
623 .collect();
624 let resp = PexResponse::Peers(peers);
625 if let Ok(bytes) = serde_cbor_2::to_vec(&resp) {
626 self.pex_handle.send_response(request_id, bytes);
627 }
628 }
629 Ok(PexRequest::Advertise {
630 role,
631 validator_id,
632 addresses,
633 }) => {
634 if role == PeerRole::Validator && validator_id.is_none() {
636 warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
637 self.pex_handle.reject_request(request_id);
638 return;
639 }
640 if let Some(vid) = validator_id
642 && let Some(&expected_peer) =
643 self.peer_map.validator_to_peer.get(&ValidatorId(vid))
644 && expected_peer != peer
645 {
646 warn!(
647 peer = %peer,
648 claimed_vid = vid,
649 "PEX Advertise validator_id mismatch, rejecting"
650 );
651 self.pex_handle.reject_request(request_id);
652 return;
653 }
654 let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
656 let mut info = PeerInfo::new(
657 peer,
658 role,
659 addresses.iter().filter_map(|a| a.parse().ok()).collect(),
660 );
661 if let Some(vid) = validator_id {
662 info = info.with_validator(ValidatorId(vid));
663 }
664 self.peer_book.write().await.add_peer(info);
665 if let Ok(bytes) = serde_cbor_2::to_vec(&PexResponse::Ack) {
666 self.pex_handle.send_response(request_id, bytes);
667 }
668 }
669 Err(e) => {
670 warn!(error = %e, "failed to decode PEX request");
671 self.pex_handle.reject_request(request_id);
672 }
673 }
674 }
675 RequestResponseEvent::ResponseReceived { response, .. } => {
676 if let Ok(PexResponse::Peers(peers)) = serde_cbor_2::from_slice(&response) {
677 let mut book = self.peer_book.write().await;
678 for peer in peers {
679 if !peer.is_banned() {
680 book.add_peer(peer);
681 }
682 }
683 }
684 }
685 other => {
687 trace!("unhandled PEX event: {other:?}");
688 }
689 }
690 }
691
692 async fn run_maintenance(&mut self) {
694 let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
697 let book = self.peer_book.read().await;
698 self.persistent_peers
699 .values()
700 .filter(|pid| !self.connected_peers.contains(pid))
701 .filter_map(|&pid| {
702 book.get(&pid.to_string()).map(|info| {
703 let addrs: Vec<Multiaddr> = info
704 .addresses
705 .iter()
706 .filter_map(|a| a.parse().ok())
707 .collect();
708 (pid, addrs)
709 })
710 })
711 .collect()
712 };
713
714 for (pid, addrs) in to_dial {
715 if !addrs.is_empty() {
716 self.litep2p
717 .add_known_address(pid, addrs.clone().into_iter());
718 let mut dial_addr = addrs[0].clone();
720 dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
721 if let Err(e) = self.litep2p.dial_address(dial_addr).await {
722 debug!(peer = %pid, error = ?e, "persistent peer redial failed");
723 }
724 }
725 }
726
727 let max = self.pex_config.max_peers;
729 if self.connected_peers.len() < max * 4 / 5 {
730 let book = self.peer_book.read().await;
731 let candidates = book.get_random_peers(5);
732 for peer in candidates {
733 if let Ok(pid) = peer.peer_id.parse::<PeerId>()
734 && !self.connected_peers.contains(&pid)
735 {
736 let addrs: Vec<Multiaddr> = peer
737 .addresses
738 .iter()
739 .filter_map(|a| a.parse().ok())
740 .collect();
741 if !addrs.is_empty() {
742 self.litep2p.add_known_address(pid, addrs.into_iter());
743 }
744 }
745 }
746 }
747
748 self.peer_book.write().await.prune_stale(86400);
750 if let Err(e) = self.peer_book.read().await.save() {
751 warn!(%e, "failed to save peer book");
752 }
753 }
754
755 async fn run_pex_round(&mut self) {
757 if self.connected_peers.is_empty() {
758 return;
759 }
760 let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
762 let idx = rand::random::<usize>() % peers.len();
763 let target = peers[idx];
764
765 if let Ok(bytes) = serde_cbor_2::to_vec(&PexRequest::GetPeers) {
766 let _ = self
767 .pex_handle
768 .send_request(target, bytes, DialOptions::Reject)
769 .await;
770 }
771 }
772
773 async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
774 match event {
775 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
776 if self.connected_peers.len() >= self.pex_config.max_peers {
778 warn!(
779 peer = %peer,
780 total = self.connected_peers.len(),
781 max = self.pex_config.max_peers,
782 "connection limit reached, ignoring new peer"
783 );
784 return;
785 }
786
787 info!(peer = %peer, endpoint = ?endpoint, "connection established");
788 self.connected_peers.insert(peer);
789 let _ = self.connected_count_tx.send(self.connected_peers.len());
790
791 if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
793 debug!(peer = %peer, error = ?e, "failed to open notification substream");
794 }
795
796 if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
798 info.touch();
799 }
800 }
801 Litep2pEvent::ConnectionClosed { peer, .. } => {
802 debug!(peer = %peer, "connection closed");
803 self.connected_peers.remove(&peer);
804 let _ = self.connected_count_tx.send(self.connected_peers.len());
805 if self.notif_connected_peers.remove(&peer) {
808 let _ = self
809 .notif_connected_count_tx
810 .send(self.notif_connected_peers.len());
811 }
812 }
813 Litep2pEvent::DialFailure { address, error, .. } => {
814 warn!(address = %address, error = ?error, "dial failed");
815 }
816 other => {
818 trace!(?other, "unhandled litep2p event");
819 }
820 }
821 }
822
823 fn update_peer_info(&self) {
824 let peers: Vec<PeerStatus> = self
825 .peer_map
826 .validator_to_peer
827 .iter()
828 .map(|(&vid, pid)| PeerStatus {
829 validator_id: vid,
830 peer_id: pid.to_string(),
831 })
832 .collect();
833 let _ = self.peer_info_tx.send(peers);
834 }
835
836 async fn handle_command(&mut self, cmd: NetCommand) {
837 match cmd {
838 NetCommand::Broadcast(bytes) => {
839 for &peer in &self.connected_peers {
841 let _ = self
842 .notif_handle
843 .send_sync_notification(peer, bytes.clone());
844 }
845 }
846 NetCommand::SendTo(target, bytes) => {
847 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
848 let _ = self
849 .reqresp_handle
850 .send_request(peer_id, bytes, DialOptions::Reject)
851 .await;
852 }
853 }
854 NetCommand::AddPeer(vid, pid, addrs) => {
855 info!(validator = %vid, peer = %pid, "adding peer");
856 self.peer_map.insert(vid, pid);
857 self.litep2p.add_known_address(pid, addrs.into_iter());
858 self.update_peer_info();
859 }
860 NetCommand::RemovePeer(vid) => {
861 if let Some(pid) = self.peer_map.remove(vid) {
862 info!(validator = %vid, peer = %pid, "removed peer");
863 } else {
864 warn!(validator = %vid, "peer not found for removal");
865 }
866 self.update_peer_info();
867 }
868 NetCommand::SyncRequest(peer_id, bytes) => {
869 let _ = self
870 .sync_handle
871 .send_request(peer_id, bytes, DialOptions::Reject)
872 .await;
873 }
874 NetCommand::SyncRespond(request_id, bytes) => {
875 self.sync_handle.send_response(request_id, bytes);
876 }
877 NetCommand::EpochChange(validators) => {
878 self.handle_epoch_change(validators).await;
879 }
880 }
881 }
882
883 async fn handle_epoch_change(
884 &mut self,
885 validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
886 ) {
887 for (vid, pubkey) in &validators {
889 if self.peer_map.validator_to_peer.contains_key(vid) {
890 continue;
891 }
892 let pk_bytes = &pubkey.0;
894 if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
895 let peer_id = lpk.to_peer_id();
896 info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
897 self.peer_map.insert(*vid, peer_id);
898 }
899 }
900 let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
902 let to_remove: Vec<ValidatorId> = self
903 .peer_map
904 .validator_to_peer
905 .keys()
906 .filter(|vid| !new_ids.contains(vid))
907 .copied()
908 .collect();
909 for vid in to_remove {
910 info!(validator = %vid, "removing validator from peer_map after epoch change");
911 self.peer_map.remove(vid);
912 }
913 self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
915 self.validator_keys = validators.into_iter().collect();
916 self.persistent_peers = self.peer_map.validator_to_peer.clone();
918 self.update_peer_info();
919 }
920}
921
922#[derive(Clone)]
925pub struct Litep2pNetworkSink {
926 cmd_tx: mpsc::Sender<NetCommand>,
927 epoch_tx: watch::Sender<Option<Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>>>,
928}
929
930impl Litep2pNetworkSink {
931 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
932 if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
933 warn!("add_peer cmd dropped: {e}");
934 }
935 }
936
937 pub fn remove_peer(&self, vid: ValidatorId) {
938 if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
939 warn!("remove_peer cmd dropped: {e}");
940 }
941 }
942
943 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
944 match codec::encode(request) {
945 Ok(bytes) => {
946 if let Err(e) = self
947 .cmd_tx
948 .try_send(NetCommand::SyncRequest(peer_id, bytes))
949 {
950 warn!("sync request cmd dropped: {e}");
951 }
952 }
953 Err(e) => warn!("sync request encode failed: {e}"),
954 }
955 }
956
957 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
958 match codec::encode(response) {
959 Ok(bytes) => {
960 if let Err(e) = self
961 .cmd_tx
962 .try_send(NetCommand::SyncRespond(request_id, bytes))
963 {
964 warn!("sync response cmd dropped: {e}");
965 }
966 }
967 Err(e) => warn!("sync response encode failed: {e}"),
968 }
969 }
970}
971
972impl NetworkSink for Litep2pNetworkSink {
973 fn broadcast(&self, msg: ConsensusMessage) {
974 match codec::encode(&msg) {
975 Ok(bytes) => {
976 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
977 warn!("broadcast cmd dropped: {e}");
978 }
979 }
980 Err(e) => warn!("broadcast encode failed: {e}"),
981 }
982 }
983
984 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
985 match codec::encode(&msg) {
986 Ok(bytes) => {
987 if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
988 warn!("send_to cmd dropped for {target}: {e}");
989 }
990 }
991 Err(e) => warn!("send_to encode failed for {target}: {e}"),
992 }
993 }
994
995 fn on_epoch_change(&self, new_validator_set: &hotmint_types::ValidatorSet) {
996 let validators: Vec<_> = new_validator_set
997 .validators()
998 .iter()
999 .map(|v| (v.id, v.public_key.clone()))
1000 .collect();
1001 let _ = self.epoch_tx.send(Some(validators));
1003 }
1004}