1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::epoch::EpochNumber;
11use hotmint_types::sync::{SyncRequest, SyncResponse};
12use hotmint_types::{ConsensusMessage, ValidatorId};
13use litep2p::config::ConfigBuilder;
14use litep2p::protocol::notification::{
15 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
16};
17use litep2p::protocol::request_response::{
18 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
19};
20use litep2p::transport::tcp::config::Config as TcpConfig;
21use litep2p::types::RequestId;
22use litep2p::types::multiaddr::Multiaddr;
23use litep2p::{Litep2p, Litep2pEvent, PeerId};
24use serde::{Deserialize, Serialize};
25use tokio::sync::{mpsc, watch};
26use tracing::{debug, info, trace, warn};
27
/// Value carried by the epoch `watch` channel between the network sink and the service.
///
/// `None` is the channel's initial value (no epoch announced yet). `Some` pairs
/// the epoch number with that epoch's `(validator id, public key)` list.
type EpochUpdate = Option<(
    EpochNumber,
    Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
)>;
33
34use std::sync::Arc;
35
36use tokio::sync::RwLock;
37
38use crate::codec;
39use crate::peer::{PeerBook, PeerInfo, PeerRole};
40use crate::pex::{PexConfig, PexRequest, PexResponse};
41
/// Protocol name of the notification protocol that carries consensus messages.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Protocol name of the request/response protocol used for directed consensus messages.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Protocol name of the request/response protocol used for sync traffic.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Protocol name of the request/response protocol used for peer exchange (PEX).
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Maximum message size in bytes (16 MiB) configured for the notification,
/// consensus request/response and sync protocols.
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
/// Number of seconds between two runs of the periodic maintenance task.
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
48
49#[derive(Clone)]
51pub struct PeerMap {
52 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
53 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
54}
55
56impl PeerMap {
57 pub fn new() -> Self {
58 Self {
59 validator_to_peer: HashMap::new(),
60 peer_to_validator: HashMap::new(),
61 }
62 }
63
64 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
65 self.validator_to_peer.insert(vid, pid);
66 self.peer_to_validator.insert(pid, vid);
67 }
68
69 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
70 if let Some(pid) = self.validator_to_peer.remove(&vid) {
71 self.peer_to_validator.remove(&pid);
72 Some(pid)
73 } else {
74 None
75 }
76 }
77}
78
79impl Default for PeerMap {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
/// Serializable snapshot of one validator-to-peer mapping, published on the
/// peer-info `watch` channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Validator the entry belongs to.
    pub validator_id: ValidatorId,
    /// String form (`to_string()`) of the peer id mapped to the validator.
    pub peer_id: String,
}
91
/// Commands sent from [`Litep2pNetworkSink`] to the running [`NetworkService`].
pub enum NetCommand {
    /// Send the encoded bytes as a notification to every connected peer.
    Broadcast(Vec<u8>),
    /// Send the encoded bytes as a request to the peer mapped to the validator;
    /// nothing is sent when the validator has no mapping.
    SendTo(ValidatorId, Vec<u8>),
    /// Map the validator to the peer id and register the peer's addresses.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Remove the validator's mapping.
    RemovePeer(ValidatorId),
    /// Send the encoded sync request to the given peer.
    SyncRequest(PeerId, Vec<u8>),
    /// Answer the incoming sync request identified by the request id.
    SyncRespond(RequestId, Vec<u8>),
    /// Apply a new validator set.
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
}
105
/// A decoded sync request received from a remote peer, waiting for an answer.
pub struct IncomingSyncRequest {
    /// Id to pass back when answering (see [`Litep2pNetworkSink::send_sync_response`]).
    pub request_id: RequestId,
    /// Peer that sent the request.
    pub peer: PeerId,
    /// The decoded request.
    pub request: SyncRequest,
}
112
/// Everything produced by [`NetworkService::create`]: the service itself plus
/// the channel ends the rest of the node uses to talk to it.
pub struct NetworkServiceHandles {
    /// The service; it does nothing until [`NetworkService::run`] is awaited.
    pub service: NetworkService,
    /// Command handle used to send messages and manage peers.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages, tagged with the sending validator. As sent
    /// by this file, the validator id is always `Some`.
    pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Sync requests received from remote peers.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Sync responses to our own requests; failed requests arrive as `SyncResponse::Error`.
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Current validator-to-peer mappings.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Number of peers with a tracked connection.
    pub connected_count_rx: watch::Receiver<usize>,
    /// Number of peers with an open notification stream.
    pub notif_connected_count_rx: watch::Receiver<usize>,
}
127
/// Event loop state of the litep2p-backed network layer.
///
/// Built by [`NetworkService::create`] and driven by [`NetworkService::run`].
pub struct NetworkService {
    /// The litep2p instance: dialing, known addresses and connection events.
    litep2p: Litep2p,
    /// Handle of the `NOTIF_PROTOCOL` notification protocol.
    notif_handle: NotificationHandle,
    /// Handle of the `REQ_RESP_PROTOCOL` request/response protocol.
    reqresp_handle: RequestResponseHandle,
    /// Handle of the `SYNC_PROTOCOL` request/response protocol.
    sync_handle: RequestResponseHandle,
    /// Handle of the `PEX_PROTOCOL` request/response protocol.
    pex_handle: RequestResponseHandle,
    /// Validator <-> peer mapping used to attribute incoming messages and to
    /// route directed messages.
    peer_map: PeerMap,
    /// Shared peer book (addresses and scores).
    peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange settings (interval, limits, private peers).
    pex_config: PexConfig,
    /// Validator peers that maintenance re-dials while they are not connected.
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// Addresses dialed once when `run` starts; emptied by `run`.
    initial_dial_addresses: Vec<Multiaddr>,
    /// Whether verified consensus notifications are forwarded to other connected peers.
    relay_consensus: bool,
    /// Public keys of the current validator set, passed to the relay verification.
    validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
    /// Validator ids in the order they were supplied, passed to the relay verification.
    validator_ids_ordered: Vec<ValidatorId>,
    /// Sent as our notification handshake and compared against the handshake of inbound peers.
    chain_id_hash: [u8; 32],
    /// Epoch last announced through `epoch_rx`; starts at `EpochNumber::GENESIS`.
    current_epoch: EpochNumber,
    /// Outlet for decoded consensus messages.
    msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    /// Commands coming from [`Litep2pNetworkSink`].
    cmd_rx: mpsc::Receiver<NetCommand>,
    /// Outlet for decoded incoming sync requests.
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    /// Outlet for sync responses (and synthesized error responses).
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    /// Publishes the validator-to-peer mappings whenever they change.
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    /// Publishes `connected_peers.len()`.
    connected_count_tx: watch::Sender<usize>,
    /// Peers with an open notification stream.
    notif_connected_peers: HashSet<PeerId>,
    /// Publishes `notif_connected_peers.len()`.
    notif_connected_count_tx: watch::Sender<usize>,
    /// Peers with a tracked connection.
    connected_peers: HashSet<PeerId>,
    /// 64-bit hashes of recently relayed notifications (current generation).
    seen_active: HashSet<u64>,
    /// Previous generation of `seen_active`; replaced once `seen_active`
    /// grows beyond 10 000 entries.
    seen_backup: HashSet<u64>,
    /// Epoch announcements from [`Litep2pNetworkSink`].
    epoch_rx: watch::Receiver<EpochUpdate>,
    /// Time of the last accepted PEX request per peer.
    pex_rate_limit: HashMap<PeerId, Instant>,
}
175
/// Input of [`NetworkService::create`].
pub struct NetworkConfig {
    /// Address the TCP transport listens on.
    pub listen_addr: Multiaddr,
    /// Initial validator <-> peer mapping; its entries also seed the persistent peers.
    pub peer_map: PeerMap,
    /// Addresses registered with litep2p and dialed once at startup.
    pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
    /// Node identity key; when `None`, no keypair is passed to the litep2p
    /// config builder (presumably litep2p then generates one — verify).
    pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
    /// Shared peer book.
    pub peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange settings.
    pub pex_config: PexConfig,
    /// Whether verified consensus notifications are forwarded to other peers.
    pub relay_consensus: bool,
    /// Validator set in force at startup.
    pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    /// Hash identifying the chain; used as the notification handshake.
    pub chain_id_hash: [u8; 32],
}
188
189impl NetworkService {
190 pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
192 let NetworkConfig {
193 listen_addr,
194 peer_map,
195 known_addresses,
196 keypair,
197 peer_book,
198 pex_config,
199 relay_consensus,
200 initial_validators,
201 chain_id_hash,
202 } = config;
203 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
206 .with_max_size(MAX_NOTIFICATION_SIZE)
207 .with_handshake(chain_id_hash.to_vec())
208 .with_auto_accept_inbound(false)
209 .with_sync_channel_size(1024)
210 .with_async_channel_size(1024)
211 .build();
212
213 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
214 .with_max_size(MAX_NOTIFICATION_SIZE)
215 .build();
216
217 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
218 .with_max_size(MAX_NOTIFICATION_SIZE)
219 .build();
220
221 let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
222 .with_max_size(1024 * 1024) .build();
224
225 let mut config_builder = ConfigBuilder::new()
226 .with_tcp(TcpConfig {
227 listen_addresses: vec![listen_addr],
228 ..Default::default()
229 })
230 .with_notification_protocol(notif_config)
231 .with_request_response_protocol(reqresp_config)
232 .with_request_response_protocol(sync_config)
233 .with_request_response_protocol(pex_config_proto);
234
235 if let Some(kp) = keypair {
236 config_builder = config_builder.with_keypair(kp);
237 }
238
239 let initial_dial_addresses: Vec<Multiaddr> = known_addresses
241 .iter()
242 .flat_map(|(pid, addrs)| {
243 addrs.iter().map(move |addr| {
244 let mut full = addr.clone();
245 full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
246 full
247 })
248 })
249 .collect();
250
251 if !known_addresses.is_empty() {
252 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
253 }
254
255 let litep2p =
256 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
257
258 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
259 for addr in litep2p.listen_addresses() {
260 info!(address = %addr, "listening on");
261 }
262
263 let (msg_tx, msg_rx) = mpsc::channel(8192);
264 let (cmd_tx, cmd_rx) = mpsc::channel(4096);
265 let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
266 let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
267
268 let initial_peers: Vec<PeerStatus> = peer_map
270 .validator_to_peer
271 .iter()
272 .map(|(&vid, pid)| PeerStatus {
273 validator_id: vid,
274 peer_id: pid.to_string(),
275 })
276 .collect();
277 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
278
279 let (epoch_tx, epoch_rx) = watch::channel::<
280 Option<(
281 EpochNumber,
282 Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
283 )>,
284 >(None);
285
286 let sink = Litep2pNetworkSink {
287 cmd_tx: cmd_tx.clone(),
288 epoch_tx,
289 };
290
291 let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
292 let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
293
294 let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
296
297 let validator_ids_ordered: Vec<ValidatorId> =
298 initial_validators.iter().map(|(vid, _)| *vid).collect();
299 let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
300 initial_validators.into_iter().collect();
301
302 Ok(NetworkServiceHandles {
303 service: Self {
304 litep2p,
305 notif_handle,
306 reqresp_handle,
307 sync_handle,
308 pex_handle,
309 peer_map,
310 peer_book,
311 pex_config,
312 persistent_peers,
313 initial_dial_addresses,
314 relay_consensus,
315 validator_keys,
316 validator_ids_ordered,
317 chain_id_hash,
318 current_epoch: EpochNumber::GENESIS,
319 msg_tx,
320 cmd_rx,
321 sync_req_tx,
322 sync_resp_tx,
323 peer_info_tx,
324 connected_count_tx,
325 notif_connected_peers: HashSet::new(),
326 notif_connected_count_tx,
327 connected_peers: HashSet::new(),
328 seen_active: HashSet::new(),
329 seen_backup: HashSet::new(),
330 epoch_rx,
331 pex_rate_limit: HashMap::new(),
332 },
333 sink,
334 msg_rx,
335 sync_req_rx,
336 sync_resp_rx,
337 peer_info_rx,
338 connected_count_rx,
339 notif_connected_count_rx,
340 })
341 }
342
/// Returns the peer id of the local litep2p instance.
pub fn local_peer_id(&self) -> &PeerId {
    self.litep2p.local_peer_id()
}
346
347 pub async fn run(mut self) {
349 for addr in mem::take(&mut self.initial_dial_addresses) {
351 if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
352 debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
353 }
354 }
355
356 let mut maintenance_interval =
357 tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
358 let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
359 self.pex_config.request_interval_secs,
360 ));
361 loop {
362 tokio::select! {
363 event = self.notif_handle.next() => {
364 if let Some(event) = event {
365 self.handle_notification_event(event).await;
366 }
367 }
368 event = self.reqresp_handle.next() => {
369 if let Some(event) = event {
370 self.handle_reqresp_event(event);
371 }
372 }
373 event = self.sync_handle.next() => {
374 if let Some(event) = event {
375 self.handle_sync_event(event).await;
376 }
377 }
378 event = self.pex_handle.next() => {
379 if let Some(event) = event {
380 self.handle_pex_event(event).await;
381 }
382 }
383 event = self.litep2p.next_event() => {
384 if let Some(event) = event {
385 self.handle_litep2p_event(event).await;
386 }
387 }
388 Some(cmd) = self.cmd_rx.recv() => {
389 self.handle_command(cmd).await;
390 }
391 Ok(()) = self.epoch_rx.changed() => {
392 let epoch_val = self.epoch_rx.borrow_and_update().clone();
393 if let Some((epoch_number, validators)) = epoch_val {
394 self.current_epoch = epoch_number;
395 self.handle_epoch_change(validators).await;
396 }
397 }
398 _ = maintenance_interval.tick() => {
399 self.run_maintenance().await;
400 }
401 _ = pex_interval.tick() => {
402 if self.pex_config.enabled {
403 self.run_pex_round().await;
404 }
405 }
406 }
407 }
408 }
409
410 async fn handle_notification_event(&mut self, event: NotificationEvent) {
411 match event {
412 NotificationEvent::ValidateSubstream {
413 peer, handshake, ..
414 } => {
415 if handshake.as_slice() == self.chain_id_hash.as_slice() {
418 self.notif_handle
419 .send_validation_result(peer, ValidationResult::Accept);
420 } else {
421 warn!(peer = %peer, "rejecting peer: chain_id_hash handshake mismatch");
422 self.notif_handle
423 .send_validation_result(peer, ValidationResult::Reject);
424 }
425 }
426 NotificationEvent::NotificationStreamOpened { peer, .. } => {
427 info!(peer = %peer, "notification stream opened");
428 self.notif_connected_peers.insert(peer);
429 let _ = self
430 .notif_connected_count_tx
431 .send(self.notif_connected_peers.len());
432 }
433 NotificationEvent::NotificationStreamClosed { peer } => {
434 debug!(peer = %peer, "notification stream closed");
435 self.notif_connected_peers.remove(&peer);
436 let _ = self
437 .notif_connected_count_tx
438 .send(self.notif_connected_peers.len());
439 }
440 NotificationEvent::NotificationReceived { peer, notification } => {
441 let sender: Option<ValidatorId> =
443 self.peer_map.peer_to_validator.get(&peer).copied();
444
445 match codec::decode::<ConsensusMessage>(¬ification) {
446 Ok(msg) => {
447 if sender.is_some()
449 && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
450 {
451 warn!("consensus message dropped (notification): {e}");
452 }
453
454 if self.relay_consensus
459 && let Some(sid) = sender
460 && hotmint_consensus::engine::verify_relay_sender(
461 sid,
462 &msg,
463 &self.validator_keys,
464 &self.validator_ids_ordered,
465 &self.chain_id_hash,
466 self.current_epoch,
467 )
468 {
469 let msg_hash = u64::from_le_bytes(
470 blake3::hash(¬ification).as_bytes()[..8]
471 .try_into()
472 .unwrap(),
473 );
474
475 if !self.seen_active.contains(&msg_hash)
477 && !self.seen_backup.contains(&msg_hash)
478 {
479 self.seen_active.insert(msg_hash);
480 let raw = notification.to_vec();
481 for &other in &self.connected_peers {
482 if other != peer {
483 let _ = self
484 .notif_handle
485 .send_sync_notification(other, raw.clone());
486 }
487 }
488 if self.seen_active.len() > 10_000 {
490 self.seen_backup = mem::take(&mut self.seen_active);
491 }
492 }
493 self.peer_book
495 .write()
496 .await
497 .adjust_score(&peer.to_string(), 1);
498 }
499 }
500 Err(e) => {
501 warn!(error = %e, peer = %peer, "failed to decode notification");
502 self.peer_book
503 .write()
504 .await
505 .adjust_score(&peer.to_string(), -10);
506 }
507 }
508 }
509 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
510 warn!(peer = %peer, error = ?error, "notification stream open failed");
511 }
512 }
513 }
514
515 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
516 match event {
517 RequestResponseEvent::RequestReceived {
518 peer,
519 request_id,
520 request,
521 ..
522 } => {
523 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
524 warn!(peer = %peer, "dropping request from unknown peer");
525 self.reqresp_handle.reject_request(request_id);
526 return;
527 };
528 match codec::decode::<ConsensusMessage>(&request) {
529 Ok(msg) => {
530 if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
531 warn!("consensus message dropped (reqresp): {e}");
532 }
533 self.reqresp_handle.send_response(request_id, vec![]);
534 }
535 Err(e) => {
536 warn!(error = %e, "failed to decode request");
537 self.reqresp_handle.reject_request(request_id);
538 }
539 }
540 }
541 RequestResponseEvent::ResponseReceived { .. } => {}
542 RequestResponseEvent::RequestFailed { peer, error, .. } => {
543 debug!(peer = %peer, error = ?error, "request failed");
544 }
545 }
546 }
547
548 async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
549 match event {
550 RequestResponseEvent::RequestReceived {
551 peer,
552 request_id,
553 request,
554 ..
555 } => {
556 if !self.notif_connected_peers.contains(&peer) {
559 warn!(peer = %peer, "rejecting sync request: no notification stream (chain isolation)");
560 self.sync_handle.reject_request(request_id);
561 return;
562 }
563 match codec::decode::<SyncRequest>(&request) {
564 Ok(req) => {
565 if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
566 request_id,
567 peer,
568 request: req,
569 }) {
570 warn!("sync request dropped: {e}");
571 }
572 }
573 Err(e) => {
574 warn!(error = %e, peer = %peer, "failed to decode sync request");
575 self.peer_book
576 .write()
577 .await
578 .adjust_score(&peer.to_string(), -5);
579 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
580 if let Ok(bytes) = codec::encode(&err_resp) {
581 self.sync_handle.send_response(request_id, bytes);
582 } else {
583 self.sync_handle.reject_request(request_id);
584 }
585 }
586 }
587 }
588 RequestResponseEvent::ResponseReceived {
589 request_id: _,
590 response,
591 ..
592 } => {
593 match codec::decode::<SyncResponse>(&response) {
595 Ok(resp) => {
596 if let Err(e) = self.sync_resp_tx.try_send(resp) {
597 warn!("sync response dropped: {e}");
598 }
599 }
600 Err(e) => {
601 warn!(error = %e, "failed to decode sync response");
602 }
603 }
604 }
605 RequestResponseEvent::RequestFailed { peer, error, .. } => {
606 debug!(peer = %peer, error = ?error, "sync request failed");
607 if let Err(e) = self
608 .sync_resp_tx
609 .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
610 {
611 warn!("sync error response dropped: {e}");
612 }
613 }
614 }
615 }
616
617 async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
618 match event {
619 RequestResponseEvent::RequestReceived {
620 peer,
621 request_id,
622 request,
623 ..
624 } => {
625 if !self.peer_map.peer_to_validator.contains_key(&peer)
627 && !self.connected_peers.contains(&peer)
628 {
629 warn!(peer = %peer, "rejecting PEX request from unknown peer");
630 self.pex_handle.reject_request(request_id);
631 return;
632 }
633 let now = Instant::now();
635 if let Some(last) = self.pex_rate_limit.get(&peer)
636 && now.duration_since(*last) < std::time::Duration::from_secs(10)
637 {
638 self.pex_handle.reject_request(request_id);
639 return;
640 }
641 self.pex_rate_limit.insert(peer, now);
642 match postcard::from_bytes::<PexRequest>(&request) {
643 Ok(PexRequest::GetPeers) => {
644 let book = self.peer_book.read().await;
645 let private = &self.pex_config.private_peer_ids;
646 let peers: Vec<PeerInfo> = book
647 .get_random_peers(self.pex_config.max_peers_per_response)
648 .into_iter()
649 .filter(|p| p.peer_id != peer.to_string())
650 .filter(|p| !private.contains(&p.peer_id))
652 .cloned()
653 .collect();
654 let resp = PexResponse::Peers(peers);
655 if let Ok(bytes) = postcard::to_allocvec(&resp) {
656 self.pex_handle.send_response(request_id, bytes);
657 }
658 }
659 Ok(PexRequest::Advertise {
660 role,
661 validator_id,
662 addresses,
663 }) => {
664 if role == PeerRole::Validator && validator_id.is_none() {
666 warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
667 self.pex_handle.reject_request(request_id);
668 return;
669 }
670 if let Some(vid) = validator_id
672 && let Some(&expected_peer) =
673 self.peer_map.validator_to_peer.get(&ValidatorId(vid))
674 && expected_peer != peer
675 {
676 warn!(
677 peer = %peer,
678 claimed_vid = vid,
679 "PEX Advertise validator_id mismatch, rejecting"
680 );
681 self.pex_handle.reject_request(request_id);
682 return;
683 }
684 let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
686 let mut info = PeerInfo::new(
687 peer,
688 role,
689 addresses.iter().filter_map(|a| a.parse().ok()).collect(),
690 );
691 if let Some(vid) = validator_id {
692 info = info.with_validator(ValidatorId(vid));
693 }
694 self.peer_book.write().await.add_peer(info);
695 if let Ok(bytes) = postcard::to_allocvec(&PexResponse::Ack) {
696 self.pex_handle.send_response(request_id, bytes);
697 }
698 }
699 Err(e) => {
700 warn!(error = %e, "failed to decode PEX request");
701 self.pex_handle.reject_request(request_id);
702 }
703 }
704 }
705 RequestResponseEvent::ResponseReceived { response, .. } => {
706 if let Ok(PexResponse::Peers(peers)) = postcard::from_bytes(&response) {
707 let mut book = self.peer_book.write().await;
708 for peer in peers {
709 if !peer.is_banned() {
710 book.add_peer(peer);
711 }
712 }
713 }
714 }
715 other => {
717 trace!("unhandled PEX event: {other:?}");
718 }
719 }
720 }
721
722 async fn run_maintenance(&mut self) {
724 let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
727 let book = self.peer_book.read().await;
728 self.persistent_peers
729 .values()
730 .filter(|pid| !self.connected_peers.contains(pid))
731 .filter_map(|&pid| {
732 book.get(&pid.to_string()).map(|info| {
733 let addrs: Vec<Multiaddr> = info
734 .addresses
735 .iter()
736 .filter_map(|a| a.parse().ok())
737 .collect();
738 (pid, addrs)
739 })
740 })
741 .collect()
742 };
743
744 for (pid, addrs) in to_dial {
745 if !addrs.is_empty() {
746 self.litep2p
747 .add_known_address(pid, addrs.clone().into_iter());
748 let mut dial_addr = addrs[0].clone();
750 dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
751 if let Err(e) = self.litep2p.dial_address(dial_addr).await {
752 debug!(peer = %pid, error = ?e, "persistent peer redial failed");
753 }
754 }
755 }
756
757 let max = self.pex_config.max_peers;
759 if self.connected_peers.len() < max * 4 / 5 {
760 let book = self.peer_book.read().await;
761 let candidates = book.get_random_peers(5);
762 for peer in candidates {
763 if let Ok(pid) = peer.peer_id.parse::<PeerId>()
764 && !self.connected_peers.contains(&pid)
765 {
766 let addrs: Vec<Multiaddr> = peer
767 .addresses
768 .iter()
769 .filter_map(|a| a.parse().ok())
770 .collect();
771 if !addrs.is_empty() {
772 self.litep2p.add_known_address(pid, addrs.into_iter());
773 }
774 }
775 }
776 }
777
778 self.peer_book.write().await.prune_stale(86400);
780 if let Err(e) = self.peer_book.read().await.save() {
781 warn!(%e, "failed to save peer book");
782 }
783 }
784
785 async fn run_pex_round(&mut self) {
787 if self.connected_peers.is_empty() {
788 return;
789 }
790 let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
792 let idx = rand::random::<usize>() % peers.len();
793 let target = peers[idx];
794
795 if let Ok(bytes) = postcard::to_allocvec(&PexRequest::GetPeers) {
796 let _ = self
797 .pex_handle
798 .send_request(target, bytes, DialOptions::Reject)
799 .await;
800 }
801 }
802
803 async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
804 match event {
805 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
806 let is_validator = self.peer_map.peer_to_validator.contains_key(&peer);
810 if !is_validator && self.connected_peers.len() >= self.pex_config.max_peers {
811 warn!(
812 peer = %peer,
813 total = self.connected_peers.len(),
814 max = self.pex_config.max_peers,
815 "connection limit reached, ignoring non-validator peer"
816 );
817 return;
818 }
819
820 info!(peer = %peer, endpoint = ?endpoint, "connection established");
821 self.connected_peers.insert(peer);
822 let _ = self.connected_count_tx.send(self.connected_peers.len());
823
824 if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
826 debug!(peer = %peer, error = ?e, "failed to open notification substream");
827 }
828
829 if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
831 info.touch();
832 }
833 }
834 Litep2pEvent::ConnectionClosed { peer, .. } => {
835 debug!(peer = %peer, "connection closed");
836 self.connected_peers.remove(&peer);
837 let _ = self.connected_count_tx.send(self.connected_peers.len());
838 if self.notif_connected_peers.remove(&peer) {
841 let _ = self
842 .notif_connected_count_tx
843 .send(self.notif_connected_peers.len());
844 }
845 }
846 Litep2pEvent::DialFailure { address, error, .. } => {
847 warn!(address = %address, error = ?error, "dial failed");
848 }
849 other => {
851 trace!(?other, "unhandled litep2p event");
852 }
853 }
854 }
855
856 fn update_peer_info(&self) {
857 let peers: Vec<PeerStatus> = self
858 .peer_map
859 .validator_to_peer
860 .iter()
861 .map(|(&vid, pid)| PeerStatus {
862 validator_id: vid,
863 peer_id: pid.to_string(),
864 })
865 .collect();
866 let _ = self.peer_info_tx.send(peers);
867 }
868
869 async fn handle_command(&mut self, cmd: NetCommand) {
870 match cmd {
871 NetCommand::Broadcast(bytes) => {
872 for &peer in &self.connected_peers {
874 let _ = self
875 .notif_handle
876 .send_sync_notification(peer, bytes.clone());
877 }
878 }
879 NetCommand::SendTo(target, bytes) => {
880 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
881 let _ = self
882 .reqresp_handle
883 .send_request(peer_id, bytes, DialOptions::Reject)
884 .await;
885 }
886 }
887 NetCommand::AddPeer(vid, pid, addrs) => {
888 info!(validator = %vid, peer = %pid, "adding peer");
889 self.peer_map.insert(vid, pid);
890 self.litep2p.add_known_address(pid, addrs.into_iter());
891 self.update_peer_info();
892 }
893 NetCommand::RemovePeer(vid) => {
894 if let Some(pid) = self.peer_map.remove(vid) {
895 info!(validator = %vid, peer = %pid, "removed peer");
896 } else {
897 warn!(validator = %vid, "peer not found for removal");
898 }
899 self.update_peer_info();
900 }
901 NetCommand::SyncRequest(peer_id, bytes) => {
902 let _ = self
903 .sync_handle
904 .send_request(peer_id, bytes, DialOptions::Reject)
905 .await;
906 }
907 NetCommand::SyncRespond(request_id, bytes) => {
908 self.sync_handle.send_response(request_id, bytes);
909 }
910 NetCommand::EpochChange(validators) => {
911 self.handle_epoch_change(validators).await;
912 }
913 }
914 }
915
916 async fn handle_epoch_change(
917 &mut self,
918 validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
919 ) {
920 for (vid, pubkey) in &validators {
922 if self.peer_map.validator_to_peer.contains_key(vid) {
923 continue;
924 }
925 let pk_bytes = &pubkey.0;
927 if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
928 let peer_id = lpk.to_peer_id();
929 info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
930 self.peer_map.insert(*vid, peer_id);
931 }
932 }
933 let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
935 let to_remove: Vec<ValidatorId> = self
936 .peer_map
937 .validator_to_peer
938 .keys()
939 .filter(|vid| !new_ids.contains(vid))
940 .copied()
941 .collect();
942 for vid in to_remove {
943 info!(validator = %vid, "removing validator from peer_map after epoch change");
944 self.peer_map.remove(vid);
945 }
946 self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
948 self.validator_keys = validators.into_iter().collect();
949 self.persistent_peers = self.peer_map.validator_to_peer.clone();
951 self.update_peer_info();
952 }
953}
954
/// Cloneable handle through which the rest of the node talks to the running
/// [`NetworkService`].
#[derive(Clone)]
pub struct Litep2pNetworkSink {
    /// Command queue into the service; commands are sent with `try_send` and
    /// dropped (with a warning) when the queue cannot take them.
    cmd_tx: mpsc::Sender<NetCommand>,
    /// Announces epoch changes to the service.
    epoch_tx: watch::Sender<EpochUpdate>,
}
962
963impl Litep2pNetworkSink {
964 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
965 if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
966 warn!("add_peer cmd dropped: {e}");
967 }
968 }
969
970 pub fn remove_peer(&self, vid: ValidatorId) {
971 if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
972 warn!("remove_peer cmd dropped: {e}");
973 }
974 }
975
976 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
977 match codec::encode(request) {
978 Ok(bytes) => {
979 if let Err(e) = self
980 .cmd_tx
981 .try_send(NetCommand::SyncRequest(peer_id, bytes))
982 {
983 warn!("sync request cmd dropped: {e}");
984 }
985 }
986 Err(e) => warn!("sync request encode failed: {e}"),
987 }
988 }
989
990 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
991 match codec::encode(response) {
992 Ok(bytes) => {
993 if let Err(e) = self
994 .cmd_tx
995 .try_send(NetCommand::SyncRespond(request_id, bytes))
996 {
997 warn!("sync response cmd dropped: {e}");
998 }
999 }
1000 Err(e) => warn!("sync response encode failed: {e}"),
1001 }
1002 }
1003}
1004
1005impl NetworkSink for Litep2pNetworkSink {
1006 fn broadcast(&self, msg: ConsensusMessage) {
1007 match codec::encode(&msg) {
1008 Ok(bytes) => {
1009 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1010 warn!("broadcast cmd dropped: {e}");
1011 }
1012 }
1013 Err(e) => warn!("broadcast encode failed: {e}"),
1014 }
1015 }
1016
1017 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
1018 match codec::encode(&msg) {
1019 Ok(bytes) => {
1020 if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
1021 warn!("send_to cmd dropped for {target}: {e}");
1022 }
1023 }
1024 Err(e) => warn!("send_to encode failed for {target}: {e}"),
1025 }
1026 }
1027
1028 fn on_epoch_change(&self, epoch: EpochNumber, new_validator_set: &hotmint_types::ValidatorSet) {
1029 let validators: Vec<_> = new_validator_set
1030 .validators()
1031 .iter()
1032 .map(|v| (v.id, v.public_key.clone()))
1033 .collect();
1034 let _ = self.epoch_tx.send(Some((epoch, validators)));
1036 }
1037
1038 fn broadcast_evidence(&self, proof: &hotmint_types::evidence::EquivocationProof) {
1039 let msg = ConsensusMessage::Evidence(proof.clone());
1040 match codec::encode(&msg) {
1041 Ok(bytes) => {
1042 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1043 warn!("broadcast_evidence cmd dropped: {e}");
1044 }
1045 }
1046 Err(e) => warn!("broadcast_evidence encode failed: {e}"),
1047 }
1048 }
1049}