1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::epoch::EpochNumber;
11use hotmint_types::sync::{SyncRequest, SyncResponse};
12use hotmint_types::{ConsensusMessage, ValidatorId};
13use litep2p::config::ConfigBuilder;
14use litep2p::protocol::notification::{
15 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
16};
17use litep2p::protocol::request_response::{
18 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
19};
20use litep2p::transport::tcp::config::Config as TcpConfig;
21use litep2p::types::RequestId;
22use litep2p::types::multiaddr::Multiaddr;
23use litep2p::{Litep2p, Litep2pEvent, PeerId};
24use serde::{Deserialize, Serialize};
25use tokio::sync::{mpsc, watch};
26use tracing::{debug, info, trace, warn};
27
28type EpochUpdate = Option<(
30 EpochNumber,
31 Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
32)>;
33
34use std::sync::Arc;
35
36use tokio::sync::RwLock;
37
38use crate::codec;
39use crate::peer::{PeerBook, PeerInfo, PeerRole};
40use crate::pex::{PexConfig, PexRequest, PexResponse};
41
/// Protocol name of the consensus-message notification substreams.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Protocol name of the transaction-gossip notification substreams.
const MEMPOOL_NOTIF_PROTOCOL: &str = "/hotmint/mempool/notif/1";
/// Protocol name of the direct validator-to-validator consensus requests.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Protocol name of the block-sync request/response exchange.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Protocol name of the peer-exchange (PEX) request/response exchange.
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Size cap (16 MiB) shared by consensus notifications, consensus requests and sync messages.
const MAX_NOTIFICATION_SIZE: usize = 16 << 20;
/// Size cap (512 KiB) for a single mempool notification.
const MAX_MEMPOOL_NOTIF_SIZE: usize = 512 << 10;
/// Period, in seconds, of the redial / peer-book maintenance tick.
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
51
52#[derive(Clone)]
54pub struct PeerMap {
55 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
56 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
57}
58
59impl PeerMap {
60 pub fn new() -> Self {
61 Self {
62 validator_to_peer: HashMap::new(),
63 peer_to_validator: HashMap::new(),
64 }
65 }
66
67 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
68 self.validator_to_peer.insert(vid, pid);
69 self.peer_to_validator.insert(pid, vid);
70 }
71
72 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
73 if let Some(pid) = self.validator_to_peer.remove(&vid) {
74 self.peer_to_validator.remove(&pid);
75 Some(pid)
76 } else {
77 None
78 }
79 }
80}
81
82impl Default for PeerMap {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
/// One validator -> peer mapping entry, as published on the `peer_info` watch
/// channel (see `NetworkServiceHandles::peer_info_rx`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Validator this entry describes.
    pub validator_id: ValidatorId,
    /// The validator's peer id, rendered with `to_string()`.
    pub peer_id: String,
}
94
/// Commands processed by the [`NetworkService`] event loop (see `handle_command`).
pub enum NetCommand {
    /// Send encoded consensus bytes to every peer with an open consensus notification stream.
    Broadcast(Vec<u8>),
    /// Send encoded consensus bytes to one validator as a request-response request.
    SendTo(ValidatorId, Vec<u8>),
    /// Map a validator to a peer id and register the peer's addresses with litep2p.
    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
    /// Forget the peer mapped to a validator.
    RemovePeer(ValidatorId),
    /// Send encoded sync-request bytes to a specific peer.
    SyncRequest(PeerId, Vec<u8>),
    /// Answer a previously received sync request with encoded response bytes.
    SyncRespond(RequestId, Vec<u8>),
    /// Replace the validator set; the tracked epoch number is left unchanged.
    EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
    /// Gossip encoded transaction bytes to every peer with an open mempool stream.
    BroadcastTx(Vec<u8>),
}
110
/// A decoded sync request received from a peer, handed to the sync responder
/// through `NetworkServiceHandles::sync_req_rx`.
pub struct IncomingSyncRequest {
    /// Identifier to pass back when answering (e.g. via `send_sync_response`).
    pub request_id: RequestId,
    /// Peer that sent the request.
    pub peer: PeerId,
    /// Decoded request payload.
    pub request: SyncRequest,
}
117
/// Everything returned by [`NetworkService::create`]: the service to run plus
/// the caller-side ends of its channels.
pub struct NetworkServiceHandles {
    /// The event loop; drive it with `NetworkService::run`.
    pub service: NetworkService,
    /// Cloneable command handle for consensus, sync and mempool code.
    pub sink: Litep2pNetworkSink,
    /// Decoded consensus messages together with the sending validator, when known.
    pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Sync requests received from peers.
    pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
    /// Responses to our sync requests (failures are delivered as `SyncResponse::Error`).
    pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
    /// Current validator -> peer mapping snapshot.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Number of connections tracked by the service.
    pub connected_count_rx: watch::Receiver<usize>,
    /// Number of peers with an open consensus notification stream.
    pub notif_connected_count_rx: watch::Receiver<usize>,
    /// Raw transaction bytes received via mempool gossip (rate limited and deduplicated).
    pub mempool_tx_rx: mpsc::Receiver<Vec<u8>>,
}
134
/// litep2p-backed network service: owns the node, its protocol handles and all
/// bookkeeping, and is driven by `run`.
pub struct NetworkService {
    /// The litep2p node.
    litep2p: Litep2p,
    /// Consensus notification protocol handle.
    notif_handle: NotificationHandle,
    /// Mempool (transaction gossip) notification protocol handle.
    mempool_notif_handle: NotificationHandle,
    /// Direct consensus request-response protocol handle (used by `SendTo`).
    reqresp_handle: RequestResponseHandle,
    /// Sync request-response protocol handle.
    sync_handle: RequestResponseHandle,
    /// Peer-exchange request-response protocol handle.
    pex_handle: RequestResponseHandle,
    /// Validator <-> peer mapping; decides which peers are attributed a validator id.
    peer_map: PeerMap,
    /// Shared peer book (addresses, scores); pruned and saved during maintenance.
    peer_book: Arc<RwLock<PeerBook>>,
    pex_config: PexConfig,
    /// Validator peers that maintenance re-dials while they are disconnected.
    persistent_peers: HashMap<ValidatorId, PeerId>,
    /// `/p2p/`-suffixed bootstrap addresses, dialed (and emptied) at the start of `run`.
    initial_dial_addresses: Vec<Multiaddr>,
    /// Whether verified consensus notifications are re-broadcast to other peers.
    relay_consensus: bool,
    /// Current validator public keys, passed to `verify_relay_sender`.
    validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
    /// Current validator ids in validator-set order, passed to `verify_relay_sender`.
    validator_ids_ordered: Vec<ValidatorId>,
    /// Expected notification handshake; peers sending anything else are rejected.
    chain_id_hash: [u8; 32],
    /// Last epoch number received through `epoch_rx`.
    current_epoch: EpochNumber,
    msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    cmd_rx: mpsc::Receiver<NetCommand>,
    sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
    sync_resp_tx: mpsc::Sender<SyncResponse>,
    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
    connected_count_tx: watch::Sender<usize>,
    /// Peers with an open consensus notification stream; only these are served sync requests.
    notif_connected_peers: HashSet<PeerId>,
    notif_connected_count_tx: watch::Sender<usize>,
    /// Connections accepted by the service (counted against `pex_config.max_peers`).
    connected_peers: HashSet<PeerId>,
    /// Current generation of 64-bit hashes of relayed consensus notifications.
    seen_active: HashSet<u64>,
    /// Previous generation of relayed-notification hashes (still consulted for dedup).
    seen_backup: HashSet<u64>,
    epoch_rx: watch::Receiver<EpochUpdate>,
    /// Time of the last accepted PEX request per peer.
    pex_rate_limit: HashMap<PeerId, Instant>,
    mempool_tx_tx: mpsc::Sender<Vec<u8>>,
    /// Current generation of blake3 hashes of received mempool transactions.
    mempool_seen_active: HashSet<[u8; 32]>,
    /// Previous generation of mempool transaction hashes.
    mempool_seen_backup: HashSet<[u8; 32]>,
    /// Peers with an open mempool notification stream.
    mempool_notif_connected_peers: HashSet<PeerId>,
    /// Per-peer (window start, count) for the mempool transaction rate limit.
    mempool_peer_rate: HashMap<PeerId, (Instant, u32)>,
}
192
/// Inputs for [`NetworkService::create`].
pub struct NetworkConfig {
    /// TCP listen address.
    pub listen_addr: Multiaddr,
    /// Initial validator <-> peer mapping; also seeds the persistent (re-dialed) peers.
    pub peer_map: PeerMap,
    /// Peer addresses registered with litep2p and dialed once when `run` starts.
    pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
    /// Node identity; when `None` litep2p's default is used (presumably a fresh key — TODO confirm).
    pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
    /// Shared peer book.
    pub peer_book: Arc<RwLock<PeerBook>>,
    /// Peer-exchange and connection-limit settings.
    pub pex_config: PexConfig,
    /// Whether to re-broadcast verified consensus notifications.
    pub relay_consensus: bool,
    /// Validator set used for relay verification until the first epoch change.
    pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
    /// Chain identifier hash used as the notification handshake.
    pub chain_id_hash: [u8; 32],
}
205
206impl NetworkService {
207 pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
209 let NetworkConfig {
210 listen_addr,
211 peer_map,
212 known_addresses,
213 keypair,
214 peer_book,
215 pex_config,
216 relay_consensus,
217 initial_validators,
218 chain_id_hash,
219 } = config;
220 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
223 .with_max_size(MAX_NOTIFICATION_SIZE)
224 .with_handshake(chain_id_hash.to_vec())
225 .with_auto_accept_inbound(false)
226 .with_sync_channel_size(1024)
227 .with_async_channel_size(1024)
228 .build();
229
230 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
231 .with_max_size(MAX_NOTIFICATION_SIZE)
232 .build();
233
234 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
235 .with_max_size(MAX_NOTIFICATION_SIZE)
236 .build();
237
238 let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
239 .with_max_size(1024 * 1024) .build();
241
242 let (mempool_notif_config, mempool_notif_handle) =
243 NotifConfigBuilder::new(MEMPOOL_NOTIF_PROTOCOL.into())
244 .with_max_size(MAX_MEMPOOL_NOTIF_SIZE)
245 .with_handshake(chain_id_hash.to_vec())
246 .with_auto_accept_inbound(true)
247 .with_sync_channel_size(2048)
248 .with_async_channel_size(2048)
249 .build();
250
251 let mut config_builder = ConfigBuilder::new()
252 .with_tcp(TcpConfig {
253 listen_addresses: vec![listen_addr],
254 ..Default::default()
255 })
256 .with_notification_protocol(notif_config)
257 .with_notification_protocol(mempool_notif_config)
258 .with_request_response_protocol(reqresp_config)
259 .with_request_response_protocol(sync_config)
260 .with_request_response_protocol(pex_config_proto);
261
262 if let Some(kp) = keypair {
263 config_builder = config_builder.with_keypair(kp);
264 }
265
266 let initial_dial_addresses: Vec<Multiaddr> = known_addresses
268 .iter()
269 .flat_map(|(pid, addrs)| {
270 addrs.iter().map(move |addr| {
271 let mut full = addr.clone();
272 full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
273 full
274 })
275 })
276 .collect();
277
278 if !known_addresses.is_empty() {
279 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
280 }
281
282 let litep2p =
283 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
284
285 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
286 for addr in litep2p.listen_addresses() {
287 info!(address = %addr, "listening on");
288 }
289
290 let (msg_tx, msg_rx) = mpsc::channel(8192);
291 let (cmd_tx, cmd_rx) = mpsc::channel(4096);
292 let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
293 let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
294 let (mempool_tx_tx, mempool_tx_rx) = mpsc::channel(4096);
295
296 let initial_peers: Vec<PeerStatus> = peer_map
298 .validator_to_peer
299 .iter()
300 .map(|(&vid, pid)| PeerStatus {
301 validator_id: vid,
302 peer_id: pid.to_string(),
303 })
304 .collect();
305 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
306
307 let (epoch_tx, epoch_rx) = watch::channel::<
308 Option<(
309 EpochNumber,
310 Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
311 )>,
312 >(None);
313
314 let sink = Litep2pNetworkSink {
315 cmd_tx: cmd_tx.clone(),
316 epoch_tx,
317 };
318
319 let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
320 let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
321
322 let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
324
325 let validator_ids_ordered: Vec<ValidatorId> =
326 initial_validators.iter().map(|(vid, _)| *vid).collect();
327 let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
328 initial_validators.into_iter().collect();
329
330 Ok(NetworkServiceHandles {
331 service: Self {
332 litep2p,
333 notif_handle,
334 mempool_notif_handle,
335 reqresp_handle,
336 sync_handle,
337 pex_handle,
338 peer_map,
339 peer_book,
340 pex_config,
341 persistent_peers,
342 initial_dial_addresses,
343 relay_consensus,
344 validator_keys,
345 validator_ids_ordered,
346 chain_id_hash,
347 current_epoch: EpochNumber::GENESIS,
348 msg_tx,
349 cmd_rx,
350 sync_req_tx,
351 sync_resp_tx,
352 peer_info_tx,
353 connected_count_tx,
354 notif_connected_peers: HashSet::new(),
355 notif_connected_count_tx,
356 connected_peers: HashSet::new(),
357 seen_active: HashSet::new(),
358 seen_backup: HashSet::new(),
359 epoch_rx,
360 pex_rate_limit: HashMap::new(),
361 mempool_tx_tx,
362 mempool_seen_active: HashSet::new(),
363 mempool_seen_backup: HashSet::new(),
364 mempool_notif_connected_peers: HashSet::new(),
365 mempool_peer_rate: HashMap::new(),
366 },
367 sink,
368 msg_rx,
369 sync_req_rx,
370 sync_resp_rx,
371 peer_info_rx,
372 connected_count_rx,
373 notif_connected_count_rx,
374 mempool_tx_rx,
375 })
376 }
377
378 pub fn local_peer_id(&self) -> &PeerId {
379 self.litep2p.local_peer_id()
380 }
381
382 pub async fn run(mut self) {
384 for addr in mem::take(&mut self.initial_dial_addresses) {
386 if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
387 debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
388 }
389 }
390
391 let mut maintenance_interval =
392 tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
393 let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
394 self.pex_config.request_interval_secs,
395 ));
396 loop {
397 tokio::select! {
398 event = self.notif_handle.next() => {
399 if let Some(event) = event {
400 self.handle_notification_event(event).await;
401 }
402 }
403 event = self.mempool_notif_handle.next() => {
404 if let Some(event) = event {
405 self.handle_mempool_notification_event(event).await;
406 }
407 }
408 event = self.reqresp_handle.next() => {
409 if let Some(event) = event {
410 self.handle_reqresp_event(event);
411 }
412 }
413 event = self.sync_handle.next() => {
414 if let Some(event) = event {
415 self.handle_sync_event(event).await;
416 }
417 }
418 event = self.pex_handle.next() => {
419 if let Some(event) = event {
420 self.handle_pex_event(event).await;
421 }
422 }
423 event = self.litep2p.next_event() => {
424 if let Some(event) = event {
425 self.handle_litep2p_event(event).await;
426 }
427 }
428 Some(cmd) = self.cmd_rx.recv() => {
429 self.handle_command(cmd).await;
430 }
431 Ok(()) = self.epoch_rx.changed() => {
432 let epoch_val = self.epoch_rx.borrow_and_update().clone();
433 if let Some((epoch_number, validators)) = epoch_val {
434 self.current_epoch = epoch_number;
435 self.handle_epoch_change(validators).await;
436 }
437 }
438 _ = maintenance_interval.tick() => {
439 self.run_maintenance().await;
440 }
441 _ = pex_interval.tick() => {
442 if self.pex_config.enabled {
443 self.run_pex_round().await;
444 }
445 }
446 }
447 }
448 }
449
450 async fn handle_notification_event(&mut self, event: NotificationEvent) {
451 match event {
452 NotificationEvent::ValidateSubstream {
453 peer, handshake, ..
454 } => {
455 if handshake.as_slice() == self.chain_id_hash.as_slice() {
458 self.notif_handle
459 .send_validation_result(peer, ValidationResult::Accept);
460 } else {
461 warn!(peer = %peer, "rejecting peer: chain_id_hash handshake mismatch");
462 self.notif_handle
463 .send_validation_result(peer, ValidationResult::Reject);
464 }
465 }
466 NotificationEvent::NotificationStreamOpened { peer, .. } => {
467 info!(peer = %peer, "notification stream opened");
468 self.notif_connected_peers.insert(peer);
469 let _ = self
470 .notif_connected_count_tx
471 .send(self.notif_connected_peers.len());
472 }
473 NotificationEvent::NotificationStreamClosed { peer } => {
474 debug!(peer = %peer, "notification stream closed");
475 self.notif_connected_peers.remove(&peer);
476 let _ = self
477 .notif_connected_count_tx
478 .send(self.notif_connected_peers.len());
479 }
480 NotificationEvent::NotificationReceived { peer, notification } => {
481 let sender: Option<ValidatorId> =
483 self.peer_map.peer_to_validator.get(&peer).copied();
484
485 match codec::decode::<ConsensusMessage>(¬ification) {
486 Ok(msg) => {
487 if sender.is_some()
489 && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
490 {
491 warn!("consensus message dropped (notification): {e}");
492 }
493
494 if self.relay_consensus
499 && let Some(sid) = sender
500 && hotmint_consensus::engine::verify_relay_sender(
501 sid,
502 &msg,
503 &self.validator_keys,
504 &self.validator_ids_ordered,
505 &self.chain_id_hash,
506 self.current_epoch,
507 )
508 {
509 let msg_hash = u64::from_le_bytes(
510 blake3::hash(¬ification).as_bytes()[..8]
511 .try_into()
512 .unwrap(),
513 );
514
515 if !self.seen_active.contains(&msg_hash)
517 && !self.seen_backup.contains(&msg_hash)
518 {
519 self.seen_active.insert(msg_hash);
520 let raw = notification.to_vec();
521 for &other in &self.connected_peers {
522 if other != peer {
523 let _ = self
524 .notif_handle
525 .send_sync_notification(other, raw.clone());
526 }
527 }
528 if self.seen_active.len() > 10_000 {
530 self.seen_backup = mem::take(&mut self.seen_active);
531 }
532 }
533 self.peer_book
535 .write()
536 .await
537 .adjust_score(&peer.to_string(), 1);
538 }
539 }
540 Err(e) => {
541 warn!(error = %e, peer = %peer, "failed to decode notification");
542 self.peer_book
543 .write()
544 .await
545 .adjust_score(&peer.to_string(), -10);
546 }
547 }
548 }
549 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
550 warn!(peer = %peer, error = ?error, "notification stream open failed");
551 }
552 }
553 }
554
555 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
556 match event {
557 RequestResponseEvent::RequestReceived {
558 peer,
559 request_id,
560 request,
561 ..
562 } => {
563 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
564 warn!(peer = %peer, "dropping request from unknown peer");
565 self.reqresp_handle.reject_request(request_id);
566 return;
567 };
568 match codec::decode::<ConsensusMessage>(&request) {
569 Ok(msg) => {
570 if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
571 warn!("consensus message dropped (reqresp): {e}");
572 }
573 self.reqresp_handle.send_response(request_id, vec![]);
574 }
575 Err(e) => {
576 warn!(error = %e, "failed to decode request");
577 self.reqresp_handle.reject_request(request_id);
578 }
579 }
580 }
581 RequestResponseEvent::ResponseReceived { .. } => {}
582 RequestResponseEvent::RequestFailed { peer, error, .. } => {
583 debug!(peer = %peer, error = ?error, "request failed");
584 }
585 }
586 }
587
588 async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
589 match event {
590 RequestResponseEvent::RequestReceived {
591 peer,
592 request_id,
593 request,
594 ..
595 } => {
596 if !self.notif_connected_peers.contains(&peer) {
599 warn!(peer = %peer, "rejecting sync request: no notification stream (chain isolation)");
600 self.sync_handle.reject_request(request_id);
601 return;
602 }
603 match codec::decode::<SyncRequest>(&request) {
604 Ok(req) => {
605 if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
606 request_id,
607 peer,
608 request: req,
609 }) {
610 warn!("sync request dropped: {e}");
611 }
612 }
613 Err(e) => {
614 warn!(error = %e, peer = %peer, "failed to decode sync request");
615 self.peer_book
616 .write()
617 .await
618 .adjust_score(&peer.to_string(), -5);
619 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
620 if let Ok(bytes) = codec::encode(&err_resp) {
621 self.sync_handle.send_response(request_id, bytes);
622 } else {
623 self.sync_handle.reject_request(request_id);
624 }
625 }
626 }
627 }
628 RequestResponseEvent::ResponseReceived {
629 request_id: _,
630 response,
631 ..
632 } => {
633 match codec::decode::<SyncResponse>(&response) {
635 Ok(resp) => {
636 if let Err(e) = self.sync_resp_tx.try_send(resp) {
637 warn!("sync response dropped: {e}");
638 }
639 }
640 Err(e) => {
641 warn!(error = %e, "failed to decode sync response");
642 }
643 }
644 }
645 RequestResponseEvent::RequestFailed { peer, error, .. } => {
646 debug!(peer = %peer, error = ?error, "sync request failed");
647 if let Err(e) = self
648 .sync_resp_tx
649 .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
650 {
651 warn!("sync error response dropped: {e}");
652 }
653 }
654 }
655 }
656
657 async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
658 match event {
659 RequestResponseEvent::RequestReceived {
660 peer,
661 request_id,
662 request,
663 ..
664 } => {
665 if !self.peer_map.peer_to_validator.contains_key(&peer)
667 && !self.connected_peers.contains(&peer)
668 {
669 warn!(peer = %peer, "rejecting PEX request from unknown peer");
670 self.pex_handle.reject_request(request_id);
671 return;
672 }
673 let now = Instant::now();
675 if let Some(last) = self.pex_rate_limit.get(&peer)
676 && now.duration_since(*last) < std::time::Duration::from_secs(10)
677 {
678 self.pex_handle.reject_request(request_id);
679 return;
680 }
681 self.pex_rate_limit.insert(peer, now);
682 match postcard::from_bytes::<PexRequest>(&request) {
683 Ok(PexRequest::GetPeers) => {
684 let book = self.peer_book.read().await;
685 let private = &self.pex_config.private_peer_ids;
686 let peers: Vec<PeerInfo> = book
687 .get_random_peers(self.pex_config.max_peers_per_response)
688 .into_iter()
689 .filter(|p| p.peer_id != peer.to_string())
690 .filter(|p| !private.contains(&p.peer_id))
692 .cloned()
693 .collect();
694 let resp = PexResponse::Peers(peers);
695 if let Ok(bytes) = postcard::to_allocvec(&resp) {
696 self.pex_handle.send_response(request_id, bytes);
697 }
698 }
699 Ok(PexRequest::Advertise {
700 role,
701 validator_id,
702 addresses,
703 }) => {
704 if role == PeerRole::Validator && validator_id.is_none() {
706 warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
707 self.pex_handle.reject_request(request_id);
708 return;
709 }
710 if let Some(vid) = validator_id
712 && let Some(&expected_peer) =
713 self.peer_map.validator_to_peer.get(&ValidatorId(vid))
714 && expected_peer != peer
715 {
716 warn!(
717 peer = %peer,
718 claimed_vid = vid,
719 "PEX Advertise validator_id mismatch, rejecting"
720 );
721 self.pex_handle.reject_request(request_id);
722 return;
723 }
724 let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
726 let mut info = PeerInfo::new(
727 peer,
728 role,
729 addresses.iter().filter_map(|a| a.parse().ok()).collect(),
730 );
731 if let Some(vid) = validator_id {
732 info = info.with_validator(ValidatorId(vid));
733 }
734 self.peer_book.write().await.add_peer(info);
735 if let Ok(bytes) = postcard::to_allocvec(&PexResponse::Ack) {
736 self.pex_handle.send_response(request_id, bytes);
737 }
738 }
739 Err(e) => {
740 warn!(error = %e, "failed to decode PEX request");
741 self.pex_handle.reject_request(request_id);
742 }
743 }
744 }
745 RequestResponseEvent::ResponseReceived { response, .. } => {
746 if let Ok(PexResponse::Peers(peers)) = postcard::from_bytes(&response) {
747 let mut book = self.peer_book.write().await;
748 for peer in peers {
749 if !peer.is_banned() {
750 book.add_peer(peer);
751 }
752 }
753 }
754 }
755 other => {
757 trace!("unhandled PEX event: {other:?}");
758 }
759 }
760 }
761
762 async fn run_maintenance(&mut self) {
764 let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
767 let book = self.peer_book.read().await;
768 self.persistent_peers
769 .values()
770 .filter(|pid| !self.connected_peers.contains(pid))
771 .filter_map(|&pid| {
772 book.get(&pid.to_string()).map(|info| {
773 let addrs: Vec<Multiaddr> = info
774 .addresses
775 .iter()
776 .filter_map(|a| a.parse().ok())
777 .collect();
778 (pid, addrs)
779 })
780 })
781 .collect()
782 };
783
784 for (pid, addrs) in to_dial {
785 if !addrs.is_empty() {
786 self.litep2p
787 .add_known_address(pid, addrs.clone().into_iter());
788 let mut dial_addr = addrs[0].clone();
790 dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
791 if let Err(e) = self.litep2p.dial_address(dial_addr).await {
792 debug!(peer = %pid, error = ?e, "persistent peer redial failed");
793 }
794 }
795 }
796
797 let max = self.pex_config.max_peers;
799 if self.connected_peers.len() < max * 4 / 5 {
800 let book = self.peer_book.read().await;
801 let candidates = book.get_random_peers(5);
802 for peer in candidates {
803 if let Ok(pid) = peer.peer_id.parse::<PeerId>()
804 && !self.connected_peers.contains(&pid)
805 {
806 let addrs: Vec<Multiaddr> = peer
807 .addresses
808 .iter()
809 .filter_map(|a| a.parse().ok())
810 .collect();
811 if !addrs.is_empty() {
812 self.litep2p.add_known_address(pid, addrs.into_iter());
813 }
814 }
815 }
816 }
817
818 self.peer_book.write().await.prune_stale(86400);
820 if let Err(e) = self.peer_book.read().await.save() {
821 warn!(%e, "failed to save peer book");
822 }
823
824 let pex_cutoff = Instant::now() - std::time::Duration::from_secs(60);
826 self.pex_rate_limit.retain(|_, last| *last > pex_cutoff);
827 }
828
829 async fn run_pex_round(&mut self) {
831 if self.connected_peers.is_empty() {
832 return;
833 }
834 let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
836 let idx = rand::random::<usize>() % peers.len();
837 let target = peers[idx];
838
839 if let Ok(bytes) = postcard::to_allocvec(&PexRequest::GetPeers) {
840 let _ = self
841 .pex_handle
842 .send_request(target, bytes, DialOptions::Reject)
843 .await;
844 }
845 }
846
847 fn pick_non_validator_to_evict(&self) -> Option<PeerId> {
850 self.connected_peers
851 .iter()
852 .find(|p| !self.peer_map.peer_to_validator.contains_key(p))
853 .copied()
854 }
855
856 async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
857 match event {
858 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
859 let is_validator = self.peer_map.peer_to_validator.contains_key(&peer);
863 if !is_validator && self.connected_peers.len() >= self.pex_config.max_peers {
864 warn!(
865 peer = %peer,
866 total = self.connected_peers.len(),
867 max = self.pex_config.max_peers,
868 "connection limit reached, ignoring non-validator peer"
869 );
870 return;
871 }
872
873 if is_validator
876 && self.connected_peers.len() >= self.pex_config.max_peers
877 && let Some(evict_peer) = self.pick_non_validator_to_evict()
878 {
879 warn!(
880 evict = %evict_peer,
881 new_validator = %peer,
882 "evicting non-validator peer to make room for validator"
883 );
884 self.connected_peers.remove(&evict_peer);
885 self.notif_connected_peers.remove(&evict_peer);
886 }
889
890 info!(peer = %peer, endpoint = ?endpoint, "connection established");
891 self.connected_peers.insert(peer);
892 let _ = self.connected_count_tx.send(self.connected_peers.len());
893
894 if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
896 debug!(peer = %peer, error = ?e, "failed to open notification substream");
897 }
898
899 if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
901 info.touch();
902 }
903 }
904 Litep2pEvent::ConnectionClosed { peer, .. } => {
905 debug!(peer = %peer, "connection closed");
906 self.connected_peers.remove(&peer);
907 let _ = self.connected_count_tx.send(self.connected_peers.len());
908 if self.notif_connected_peers.remove(&peer) {
911 let _ = self
912 .notif_connected_count_tx
913 .send(self.notif_connected_peers.len());
914 }
915 }
916 Litep2pEvent::DialFailure { address, error, .. } => {
917 warn!(address = %address, error = ?error, "dial failed");
918 }
919 other => {
921 trace!(?other, "unhandled litep2p event");
922 }
923 }
924 }
925
926 fn update_peer_info(&self) {
927 let peers: Vec<PeerStatus> = self
928 .peer_map
929 .validator_to_peer
930 .iter()
931 .map(|(&vid, pid)| PeerStatus {
932 validator_id: vid,
933 peer_id: pid.to_string(),
934 })
935 .collect();
936 let _ = self.peer_info_tx.send(peers);
937 }
938
939 async fn handle_command(&mut self, cmd: NetCommand) {
940 match cmd {
941 NetCommand::Broadcast(bytes) => {
942 for &peer in &self.notif_connected_peers {
944 let _ = self
945 .notif_handle
946 .send_sync_notification(peer, bytes.clone());
947 }
948 }
949 NetCommand::SendTo(target, bytes) => {
950 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
951 let _ = self
952 .reqresp_handle
953 .send_request(peer_id, bytes, DialOptions::Reject)
954 .await;
955 }
956 }
957 NetCommand::AddPeer(vid, pid, addrs) => {
958 info!(validator = %vid, peer = %pid, "adding peer");
959 self.peer_map.insert(vid, pid);
960 self.litep2p.add_known_address(pid, addrs.into_iter());
961 self.update_peer_info();
962 }
963 NetCommand::RemovePeer(vid) => {
964 if let Some(pid) = self.peer_map.remove(vid) {
965 info!(validator = %vid, peer = %pid, "removed peer");
966 } else {
967 warn!(validator = %vid, "peer not found for removal");
968 }
969 self.update_peer_info();
970 }
971 NetCommand::SyncRequest(peer_id, bytes) => {
972 let _ = self
973 .sync_handle
974 .send_request(peer_id, bytes, DialOptions::Reject)
975 .await;
976 }
977 NetCommand::SyncRespond(request_id, bytes) => {
978 self.sync_handle.send_response(request_id, bytes);
979 }
980 NetCommand::EpochChange(validators) => {
981 self.handle_epoch_change(validators).await;
982 }
983 NetCommand::BroadcastTx(bytes) => {
984 for &peer in &self.mempool_notif_connected_peers {
985 let _ = self
986 .mempool_notif_handle
987 .send_sync_notification(peer, bytes.clone());
988 }
989 }
990 }
991 }
992
993 async fn handle_mempool_notification_event(&mut self, event: NotificationEvent) {
994 match event {
995 NotificationEvent::ValidateSubstream { peer, .. } => {
996 self.mempool_notif_handle
998 .send_validation_result(peer, ValidationResult::Accept);
999 }
1000 NotificationEvent::NotificationStreamOpened { peer, .. } => {
1001 trace!(peer = %peer, "mempool notif stream opened");
1002 self.mempool_notif_connected_peers.insert(peer);
1003 }
1004 NotificationEvent::NotificationStreamClosed { peer } => {
1005 trace!(peer = %peer, "mempool notif stream closed");
1006 self.mempool_notif_connected_peers.remove(&peer);
1007 self.mempool_peer_rate.remove(&peer);
1008 }
1009 NotificationEvent::NotificationReceived { peer, notification } => {
1010 const MAX_TX_PER_SEC: u32 = 500;
1012 let now = Instant::now();
1013 let entry = self.mempool_peer_rate.entry(peer).or_insert((now, 0));
1014 if now.duration_since(entry.0).as_secs() >= 1 {
1015 *entry = (now, 1);
1016 } else {
1017 entry.1 += 1;
1018 if entry.1 > MAX_TX_PER_SEC {
1019 return;
1020 }
1021 }
1022
1023 let hash = blake3::hash(¬ification);
1024 let hash_bytes: [u8; 32] = *hash.as_bytes();
1025 if self.mempool_seen_active.contains(&hash_bytes)
1026 || self.mempool_seen_backup.contains(&hash_bytes)
1027 {
1028 return;
1029 }
1030 self.mempool_seen_active.insert(hash_bytes);
1031 if self.mempool_seen_active.len() > 100_000 {
1034 std::mem::swap(&mut self.mempool_seen_active, &mut self.mempool_seen_backup);
1035 self.mempool_seen_active.clear();
1036 }
1037 let _ = self.mempool_tx_tx.try_send(notification.freeze().to_vec());
1038 }
1039 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
1040 debug!(peer = %peer, ?error, "mempool notif stream open failed");
1041 }
1042 }
1043 }
1044
1045 async fn handle_epoch_change(
1046 &mut self,
1047 validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
1048 ) {
1049 for (vid, pubkey) in &validators {
1051 if self.peer_map.validator_to_peer.contains_key(vid) {
1052 continue;
1053 }
1054 let pk_bytes = &pubkey.0;
1056 if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
1057 let peer_id = lpk.to_peer_id();
1058 info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
1059 self.peer_map.insert(*vid, peer_id);
1060 }
1061 }
1062 let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
1064 let to_remove: Vec<ValidatorId> = self
1065 .peer_map
1066 .validator_to_peer
1067 .keys()
1068 .filter(|vid| !new_ids.contains(vid))
1069 .copied()
1070 .collect();
1071 for vid in to_remove {
1072 info!(validator = %vid, "removing validator from peer_map after epoch change");
1073 self.peer_map.remove(vid);
1074 }
1075 self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
1077 self.validator_keys = validators.into_iter().collect();
1078 self.persistent_peers = self.peer_map.validator_to_peer.clone();
1080 self.update_peer_info();
1081 }
1082}
1083
1084#[derive(Clone)]
1087pub struct Litep2pNetworkSink {
1088 cmd_tx: mpsc::Sender<NetCommand>,
1089 epoch_tx: watch::Sender<EpochUpdate>,
1090}
1091
1092impl Litep2pNetworkSink {
1093 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
1094 if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
1095 warn!("add_peer cmd dropped: {e}");
1096 }
1097 }
1098
1099 pub fn remove_peer(&self, vid: ValidatorId) {
1100 if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
1101 warn!("remove_peer cmd dropped: {e}");
1102 }
1103 }
1104
1105 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
1106 match codec::encode(request) {
1107 Ok(bytes) => {
1108 if let Err(e) = self
1109 .cmd_tx
1110 .try_send(NetCommand::SyncRequest(peer_id, bytes))
1111 {
1112 warn!("sync request cmd dropped: {e}");
1113 }
1114 }
1115 Err(e) => warn!("sync request encode failed: {e}"),
1116 }
1117 }
1118
1119 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
1120 match codec::encode(response) {
1121 Ok(bytes) => {
1122 if let Err(e) = self
1123 .cmd_tx
1124 .try_send(NetCommand::SyncRespond(request_id, bytes))
1125 {
1126 warn!("sync response cmd dropped: {e}");
1127 }
1128 }
1129 Err(e) => warn!("sync response encode failed: {e}"),
1130 }
1131 }
1132
1133 pub fn broadcast_tx(&self, tx_bytes: Vec<u8>) {
1135 if let Err(e) = self.cmd_tx.try_send(NetCommand::BroadcastTx(tx_bytes)) {
1136 warn!("broadcast_tx cmd dropped: {e}");
1137 }
1138 }
1139}
1140
1141impl NetworkSink for Litep2pNetworkSink {
1142 fn broadcast(&self, msg: ConsensusMessage) {
1143 match codec::encode(&msg) {
1144 Ok(bytes) => {
1145 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1146 warn!("broadcast cmd dropped: {e}");
1147 }
1148 }
1149 Err(e) => warn!("broadcast encode failed: {e}"),
1150 }
1151 }
1152
1153 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
1154 match codec::encode(&msg) {
1155 Ok(bytes) => {
1156 if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
1157 warn!("send_to cmd dropped for {target}: {e}");
1158 }
1159 }
1160 Err(e) => warn!("send_to encode failed for {target}: {e}"),
1161 }
1162 }
1163
1164 fn on_epoch_change(&self, epoch: EpochNumber, new_validator_set: &hotmint_types::ValidatorSet) {
1165 let validators: Vec<_> = new_validator_set
1166 .validators()
1167 .iter()
1168 .map(|v| (v.id, v.public_key.clone()))
1169 .collect();
1170 let _ = self.epoch_tx.send(Some((epoch, validators)));
1172 }
1173
1174 fn broadcast_evidence(&self, proof: &hotmint_types::evidence::EquivocationProof) {
1175 let msg = ConsensusMessage::Evidence(proof.clone());
1176 match codec::encode(&msg) {
1177 Ok(bytes) => {
1178 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1179 warn!("broadcast_evidence cmd dropped: {e}");
1180 }
1181 }
1182 Err(e) => warn!("broadcast_evidence encode failed: {e}"),
1183 }
1184 }
1185}