1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::iter;
5use std::mem;
6use std::time::Instant;
7
8use futures::StreamExt;
9use hotmint_consensus::network::NetworkSink;
10use hotmint_types::epoch::EpochNumber;
11use hotmint_types::sync::{SyncRequest, SyncResponse};
12use hotmint_types::{ConsensusMessage, ValidatorId};
13use litep2p::config::ConfigBuilder;
14use litep2p::protocol::notification::{
15 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
16};
17use litep2p::protocol::request_response::{
18 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
19};
20use litep2p::transport::tcp::config::Config as TcpConfig;
21use litep2p::types::RequestId;
22use litep2p::types::multiaddr::Multiaddr;
23use litep2p::{Litep2p, Litep2pEvent, PeerId};
24use serde::{Deserialize, Serialize};
25use tokio::sync::{mpsc, watch};
26use tracing::{debug, info, trace, warn};
27
28type EpochUpdate = Option<(
30 EpochNumber,
31 Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
32)>;
33
34use std::sync::Arc;
35
36use tokio::sync::RwLock;
37
38use crate::codec;
39use crate::peer::{PeerBook, PeerInfo, PeerRole};
40use crate::pex::{PexConfig, PexRequest, PexResponse};
41
/// Notification protocol that gossips consensus messages.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Notification protocol that gossips raw mempool transactions.
const MEMPOOL_NOTIF_PROTOCOL: &str = "/hotmint/mempool/notif/1";
/// Request/response protocol for point-to-point consensus messages.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Request/response protocol for sync requests and responses.
const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
/// Request/response protocol for peer exchange (PEX).
const PEX_PROTOCOL: &str = "/hotmint/pex/1";
/// Payload cap (16 MiB) shared by consensus notifications and the consensus and
/// sync request/response protocols.
const MAX_NOTIFICATION_SIZE: usize = 16 << 20;
/// Payload cap (512 KiB) for a single mempool gossip notification.
const MAX_MEMPOOL_NOTIF_SIZE: usize = 512 << 10;
/// Period, in seconds, of the peer-maintenance timer.
const MAINTENANCE_INTERVAL_SECS: u64 = 10;
51
52#[derive(Clone)]
54pub struct PeerMap {
55 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
56 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
57}
58
59impl PeerMap {
60 pub fn new() -> Self {
61 Self {
62 validator_to_peer: HashMap::new(),
63 peer_to_validator: HashMap::new(),
64 }
65 }
66
67 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
68 self.validator_to_peer.insert(vid, pid);
69 self.peer_to_validator.insert(pid, vid);
70 }
71
72 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
73 if let Some(pid) = self.validator_to_peer.remove(&vid) {
74 self.peer_to_validator.remove(&pid);
75 Some(pid)
76 } else {
77 None
78 }
79 }
80}
81
82impl Default for PeerMap {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct PeerStatus {
91 pub validator_id: ValidatorId,
92 pub peer_id: String,
93}
94
95pub enum NetCommand {
97 Broadcast(Vec<u8>),
98 SendTo(ValidatorId, Vec<u8>),
99 AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
100 RemovePeer(ValidatorId),
101 SyncRequest(PeerId, Vec<u8>),
103 SyncRespond(RequestId, Vec<u8>),
105 EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
107 BroadcastTx(Vec<u8>),
109}
110
111pub struct IncomingSyncRequest {
113 pub request_id: RequestId,
114 pub peer: PeerId,
115 pub request: SyncRequest,
116}
117
118pub struct NetworkServiceHandles {
120 pub service: NetworkService,
121 pub sink: Litep2pNetworkSink,
122 pub msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
123 pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
124 pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
125 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
126 pub connected_count_rx: watch::Receiver<usize>,
127 pub notif_connected_count_rx: watch::Receiver<usize>,
131 pub mempool_tx_rx: mpsc::Receiver<Vec<u8>>,
133}
134
135pub struct NetworkService {
137 litep2p: Litep2p,
138 notif_handle: NotificationHandle,
139 mempool_notif_handle: NotificationHandle,
140 reqresp_handle: RequestResponseHandle,
141 sync_handle: RequestResponseHandle,
142 pex_handle: RequestResponseHandle,
143 peer_map: PeerMap,
144 peer_book: Arc<RwLock<PeerBook>>,
145 pex_config: PexConfig,
146 persistent_peers: HashMap<ValidatorId, PeerId>,
147 initial_dial_addresses: Vec<Multiaddr>,
149 relay_consensus: bool,
151 validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
154 validator_ids_ordered: Vec<ValidatorId>,
157 chain_id_hash: [u8; 32],
159 current_epoch: EpochNumber,
161 msg_tx: mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
162 cmd_rx: mpsc::Receiver<NetCommand>,
163 sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
164 sync_resp_tx: mpsc::Sender<SyncResponse>,
165 peer_info_tx: watch::Sender<Vec<PeerStatus>>,
166 connected_count_tx: watch::Sender<usize>,
167 notif_connected_peers: HashSet<PeerId>,
169 notif_connected_count_tx: watch::Sender<usize>,
170 connected_peers: HashSet<PeerId>,
171 seen_active: HashSet<u64>,
177 seen_backup: HashSet<u64>,
178 epoch_rx: watch::Receiver<EpochUpdate>,
180 pex_rate_limit: HashMap<PeerId, Instant>,
182 mempool_tx_tx: mpsc::Sender<Vec<u8>>,
184 mempool_seen_active: HashSet<[u8; 32]>,
186 mempool_seen_backup: HashSet<[u8; 32]>,
187 mempool_notif_connected_peers: HashSet<PeerId>,
189}
190
191pub struct NetworkConfig {
193 pub listen_addr: Multiaddr,
194 pub peer_map: PeerMap,
195 pub known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
196 pub keypair: Option<litep2p::crypto::ed25519::Keypair>,
197 pub peer_book: Arc<RwLock<PeerBook>>,
198 pub pex_config: PexConfig,
199 pub relay_consensus: bool,
200 pub initial_validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
201 pub chain_id_hash: [u8; 32],
202}
203
204impl NetworkService {
205 pub fn create(config: NetworkConfig) -> Result<NetworkServiceHandles> {
207 let NetworkConfig {
208 listen_addr,
209 peer_map,
210 known_addresses,
211 keypair,
212 peer_book,
213 pex_config,
214 relay_consensus,
215 initial_validators,
216 chain_id_hash,
217 } = config;
218 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
221 .with_max_size(MAX_NOTIFICATION_SIZE)
222 .with_handshake(chain_id_hash.to_vec())
223 .with_auto_accept_inbound(false)
224 .with_sync_channel_size(1024)
225 .with_async_channel_size(1024)
226 .build();
227
228 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
229 .with_max_size(MAX_NOTIFICATION_SIZE)
230 .build();
231
232 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
233 .with_max_size(MAX_NOTIFICATION_SIZE)
234 .build();
235
236 let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
237 .with_max_size(1024 * 1024) .build();
239
240 let (mempool_notif_config, mempool_notif_handle) =
241 NotifConfigBuilder::new(MEMPOOL_NOTIF_PROTOCOL.into())
242 .with_max_size(MAX_MEMPOOL_NOTIF_SIZE)
243 .with_handshake(chain_id_hash.to_vec())
244 .with_auto_accept_inbound(true)
245 .with_sync_channel_size(2048)
246 .with_async_channel_size(2048)
247 .build();
248
249 let mut config_builder = ConfigBuilder::new()
250 .with_tcp(TcpConfig {
251 listen_addresses: vec![listen_addr],
252 ..Default::default()
253 })
254 .with_notification_protocol(notif_config)
255 .with_notification_protocol(mempool_notif_config)
256 .with_request_response_protocol(reqresp_config)
257 .with_request_response_protocol(sync_config)
258 .with_request_response_protocol(pex_config_proto);
259
260 if let Some(kp) = keypair {
261 config_builder = config_builder.with_keypair(kp);
262 }
263
264 let initial_dial_addresses: Vec<Multiaddr> = known_addresses
266 .iter()
267 .flat_map(|(pid, addrs)| {
268 addrs.iter().map(move |addr| {
269 let mut full = addr.clone();
270 full.push(litep2p::types::multiaddr::Protocol::P2p((*pid).into()));
271 full
272 })
273 })
274 .collect();
275
276 if !known_addresses.is_empty() {
277 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
278 }
279
280 let litep2p =
281 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
282
283 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
284 for addr in litep2p.listen_addresses() {
285 info!(address = %addr, "listening on");
286 }
287
288 let (msg_tx, msg_rx) = mpsc::channel(8192);
289 let (cmd_tx, cmd_rx) = mpsc::channel(4096);
290 let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
291 let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
292 let (mempool_tx_tx, mempool_tx_rx) = mpsc::channel(4096);
293
294 let initial_peers: Vec<PeerStatus> = peer_map
296 .validator_to_peer
297 .iter()
298 .map(|(&vid, pid)| PeerStatus {
299 validator_id: vid,
300 peer_id: pid.to_string(),
301 })
302 .collect();
303 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
304
305 let (epoch_tx, epoch_rx) = watch::channel::<
306 Option<(
307 EpochNumber,
308 Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
309 )>,
310 >(None);
311
312 let sink = Litep2pNetworkSink {
313 cmd_tx: cmd_tx.clone(),
314 epoch_tx,
315 };
316
317 let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
318 let (notif_connected_count_tx, notif_connected_count_rx) = watch::channel(0usize);
319
320 let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
322
323 let validator_ids_ordered: Vec<ValidatorId> =
324 initial_validators.iter().map(|(vid, _)| *vid).collect();
325 let validator_keys: HashMap<ValidatorId, hotmint_types::crypto::PublicKey> =
326 initial_validators.into_iter().collect();
327
328 Ok(NetworkServiceHandles {
329 service: Self {
330 litep2p,
331 notif_handle,
332 mempool_notif_handle,
333 reqresp_handle,
334 sync_handle,
335 pex_handle,
336 peer_map,
337 peer_book,
338 pex_config,
339 persistent_peers,
340 initial_dial_addresses,
341 relay_consensus,
342 validator_keys,
343 validator_ids_ordered,
344 chain_id_hash,
345 current_epoch: EpochNumber::GENESIS,
346 msg_tx,
347 cmd_rx,
348 sync_req_tx,
349 sync_resp_tx,
350 peer_info_tx,
351 connected_count_tx,
352 notif_connected_peers: HashSet::new(),
353 notif_connected_count_tx,
354 connected_peers: HashSet::new(),
355 seen_active: HashSet::new(),
356 seen_backup: HashSet::new(),
357 epoch_rx,
358 pex_rate_limit: HashMap::new(),
359 mempool_tx_tx,
360 mempool_seen_active: HashSet::new(),
361 mempool_seen_backup: HashSet::new(),
362 mempool_notif_connected_peers: HashSet::new(),
363 },
364 sink,
365 msg_rx,
366 sync_req_rx,
367 sync_resp_rx,
368 peer_info_rx,
369 connected_count_rx,
370 notif_connected_count_rx,
371 mempool_tx_rx,
372 })
373 }
374
375 pub fn local_peer_id(&self) -> &PeerId {
376 self.litep2p.local_peer_id()
377 }
378
379 pub async fn run(mut self) {
381 for addr in mem::take(&mut self.initial_dial_addresses) {
383 if let Err(e) = self.litep2p.dial_address(addr.clone()).await {
384 debug!(address = %addr, error = ?e, "initial dial failed (will retry)");
385 }
386 }
387
388 let mut maintenance_interval =
389 tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
390 let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
391 self.pex_config.request_interval_secs,
392 ));
393 loop {
394 tokio::select! {
395 event = self.notif_handle.next() => {
396 if let Some(event) = event {
397 self.handle_notification_event(event).await;
398 }
399 }
400 event = self.mempool_notif_handle.next() => {
401 if let Some(event) = event {
402 self.handle_mempool_notification_event(event).await;
403 }
404 }
405 event = self.reqresp_handle.next() => {
406 if let Some(event) = event {
407 self.handle_reqresp_event(event);
408 }
409 }
410 event = self.sync_handle.next() => {
411 if let Some(event) = event {
412 self.handle_sync_event(event).await;
413 }
414 }
415 event = self.pex_handle.next() => {
416 if let Some(event) = event {
417 self.handle_pex_event(event).await;
418 }
419 }
420 event = self.litep2p.next_event() => {
421 if let Some(event) = event {
422 self.handle_litep2p_event(event).await;
423 }
424 }
425 Some(cmd) = self.cmd_rx.recv() => {
426 self.handle_command(cmd).await;
427 }
428 Ok(()) = self.epoch_rx.changed() => {
429 let epoch_val = self.epoch_rx.borrow_and_update().clone();
430 if let Some((epoch_number, validators)) = epoch_val {
431 self.current_epoch = epoch_number;
432 self.handle_epoch_change(validators).await;
433 }
434 }
435 _ = maintenance_interval.tick() => {
436 self.run_maintenance().await;
437 }
438 _ = pex_interval.tick() => {
439 if self.pex_config.enabled {
440 self.run_pex_round().await;
441 }
442 }
443 }
444 }
445 }
446
447 async fn handle_notification_event(&mut self, event: NotificationEvent) {
448 match event {
449 NotificationEvent::ValidateSubstream {
450 peer, handshake, ..
451 } => {
452 if handshake.as_slice() == self.chain_id_hash.as_slice() {
455 self.notif_handle
456 .send_validation_result(peer, ValidationResult::Accept);
457 } else {
458 warn!(peer = %peer, "rejecting peer: chain_id_hash handshake mismatch");
459 self.notif_handle
460 .send_validation_result(peer, ValidationResult::Reject);
461 }
462 }
463 NotificationEvent::NotificationStreamOpened { peer, .. } => {
464 info!(peer = %peer, "notification stream opened");
465 self.notif_connected_peers.insert(peer);
466 let _ = self
467 .notif_connected_count_tx
468 .send(self.notif_connected_peers.len());
469 }
470 NotificationEvent::NotificationStreamClosed { peer } => {
471 debug!(peer = %peer, "notification stream closed");
472 self.notif_connected_peers.remove(&peer);
473 let _ = self
474 .notif_connected_count_tx
475 .send(self.notif_connected_peers.len());
476 }
477 NotificationEvent::NotificationReceived { peer, notification } => {
478 let sender: Option<ValidatorId> =
480 self.peer_map.peer_to_validator.get(&peer).copied();
481
482 match codec::decode::<ConsensusMessage>(¬ification) {
483 Ok(msg) => {
484 if sender.is_some()
486 && let Err(e) = self.msg_tx.try_send((sender, msg.clone()))
487 {
488 warn!("consensus message dropped (notification): {e}");
489 }
490
491 if self.relay_consensus
496 && let Some(sid) = sender
497 && hotmint_consensus::engine::verify_relay_sender(
498 sid,
499 &msg,
500 &self.validator_keys,
501 &self.validator_ids_ordered,
502 &self.chain_id_hash,
503 self.current_epoch,
504 )
505 {
506 let msg_hash = u64::from_le_bytes(
507 blake3::hash(¬ification).as_bytes()[..8]
508 .try_into()
509 .unwrap(),
510 );
511
512 if !self.seen_active.contains(&msg_hash)
514 && !self.seen_backup.contains(&msg_hash)
515 {
516 self.seen_active.insert(msg_hash);
517 let raw = notification.to_vec();
518 for &other in &self.connected_peers {
519 if other != peer {
520 let _ = self
521 .notif_handle
522 .send_sync_notification(other, raw.clone());
523 }
524 }
525 if self.seen_active.len() > 10_000 {
527 self.seen_backup = mem::take(&mut self.seen_active);
528 }
529 }
530 self.peer_book
532 .write()
533 .await
534 .adjust_score(&peer.to_string(), 1);
535 }
536 }
537 Err(e) => {
538 warn!(error = %e, peer = %peer, "failed to decode notification");
539 self.peer_book
540 .write()
541 .await
542 .adjust_score(&peer.to_string(), -10);
543 }
544 }
545 }
546 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
547 warn!(peer = %peer, error = ?error, "notification stream open failed");
548 }
549 }
550 }
551
552 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
553 match event {
554 RequestResponseEvent::RequestReceived {
555 peer,
556 request_id,
557 request,
558 ..
559 } => {
560 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
561 warn!(peer = %peer, "dropping request from unknown peer");
562 self.reqresp_handle.reject_request(request_id);
563 return;
564 };
565 match codec::decode::<ConsensusMessage>(&request) {
566 Ok(msg) => {
567 if let Err(e) = self.msg_tx.try_send((Some(sender), msg)) {
568 warn!("consensus message dropped (reqresp): {e}");
569 }
570 self.reqresp_handle.send_response(request_id, vec![]);
571 }
572 Err(e) => {
573 warn!(error = %e, "failed to decode request");
574 self.reqresp_handle.reject_request(request_id);
575 }
576 }
577 }
578 RequestResponseEvent::ResponseReceived { .. } => {}
579 RequestResponseEvent::RequestFailed { peer, error, .. } => {
580 debug!(peer = %peer, error = ?error, "request failed");
581 }
582 }
583 }
584
585 async fn handle_sync_event(&mut self, event: RequestResponseEvent) {
586 match event {
587 RequestResponseEvent::RequestReceived {
588 peer,
589 request_id,
590 request,
591 ..
592 } => {
593 if !self.notif_connected_peers.contains(&peer) {
596 warn!(peer = %peer, "rejecting sync request: no notification stream (chain isolation)");
597 self.sync_handle.reject_request(request_id);
598 return;
599 }
600 match codec::decode::<SyncRequest>(&request) {
601 Ok(req) => {
602 if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
603 request_id,
604 peer,
605 request: req,
606 }) {
607 warn!("sync request dropped: {e}");
608 }
609 }
610 Err(e) => {
611 warn!(error = %e, peer = %peer, "failed to decode sync request");
612 self.peer_book
613 .write()
614 .await
615 .adjust_score(&peer.to_string(), -5);
616 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
617 if let Ok(bytes) = codec::encode(&err_resp) {
618 self.sync_handle.send_response(request_id, bytes);
619 } else {
620 self.sync_handle.reject_request(request_id);
621 }
622 }
623 }
624 }
625 RequestResponseEvent::ResponseReceived {
626 request_id: _,
627 response,
628 ..
629 } => {
630 match codec::decode::<SyncResponse>(&response) {
632 Ok(resp) => {
633 if let Err(e) = self.sync_resp_tx.try_send(resp) {
634 warn!("sync response dropped: {e}");
635 }
636 }
637 Err(e) => {
638 warn!(error = %e, "failed to decode sync response");
639 }
640 }
641 }
642 RequestResponseEvent::RequestFailed { peer, error, .. } => {
643 debug!(peer = %peer, error = ?error, "sync request failed");
644 if let Err(e) = self
645 .sync_resp_tx
646 .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
647 {
648 warn!("sync error response dropped: {e}");
649 }
650 }
651 }
652 }
653
654 async fn handle_pex_event(&mut self, event: RequestResponseEvent) {
655 match event {
656 RequestResponseEvent::RequestReceived {
657 peer,
658 request_id,
659 request,
660 ..
661 } => {
662 if !self.peer_map.peer_to_validator.contains_key(&peer)
664 && !self.connected_peers.contains(&peer)
665 {
666 warn!(peer = %peer, "rejecting PEX request from unknown peer");
667 self.pex_handle.reject_request(request_id);
668 return;
669 }
670 let now = Instant::now();
672 if let Some(last) = self.pex_rate_limit.get(&peer)
673 && now.duration_since(*last) < std::time::Duration::from_secs(10)
674 {
675 self.pex_handle.reject_request(request_id);
676 return;
677 }
678 self.pex_rate_limit.insert(peer, now);
679 match postcard::from_bytes::<PexRequest>(&request) {
680 Ok(PexRequest::GetPeers) => {
681 let book = self.peer_book.read().await;
682 let private = &self.pex_config.private_peer_ids;
683 let peers: Vec<PeerInfo> = book
684 .get_random_peers(self.pex_config.max_peers_per_response)
685 .into_iter()
686 .filter(|p| p.peer_id != peer.to_string())
687 .filter(|p| !private.contains(&p.peer_id))
689 .cloned()
690 .collect();
691 let resp = PexResponse::Peers(peers);
692 if let Ok(bytes) = postcard::to_allocvec(&resp) {
693 self.pex_handle.send_response(request_id, bytes);
694 }
695 }
696 Ok(PexRequest::Advertise {
697 role,
698 validator_id,
699 addresses,
700 }) => {
701 if role == PeerRole::Validator && validator_id.is_none() {
703 warn!(peer = %peer, "PEX Advertise claims validator role without validator_id");
704 self.pex_handle.reject_request(request_id);
705 return;
706 }
707 if let Some(vid) = validator_id
709 && let Some(&expected_peer) =
710 self.peer_map.validator_to_peer.get(&ValidatorId(vid))
711 && expected_peer != peer
712 {
713 warn!(
714 peer = %peer,
715 claimed_vid = vid,
716 "PEX Advertise validator_id mismatch, rejecting"
717 );
718 self.pex_handle.reject_request(request_id);
719 return;
720 }
721 let addresses: Vec<_> = addresses.iter().take(8).cloned().collect();
723 let mut info = PeerInfo::new(
724 peer,
725 role,
726 addresses.iter().filter_map(|a| a.parse().ok()).collect(),
727 );
728 if let Some(vid) = validator_id {
729 info = info.with_validator(ValidatorId(vid));
730 }
731 self.peer_book.write().await.add_peer(info);
732 if let Ok(bytes) = postcard::to_allocvec(&PexResponse::Ack) {
733 self.pex_handle.send_response(request_id, bytes);
734 }
735 }
736 Err(e) => {
737 warn!(error = %e, "failed to decode PEX request");
738 self.pex_handle.reject_request(request_id);
739 }
740 }
741 }
742 RequestResponseEvent::ResponseReceived { response, .. } => {
743 if let Ok(PexResponse::Peers(peers)) = postcard::from_bytes(&response) {
744 let mut book = self.peer_book.write().await;
745 for peer in peers {
746 if !peer.is_banned() {
747 book.add_peer(peer);
748 }
749 }
750 }
751 }
752 other => {
754 trace!("unhandled PEX event: {other:?}");
755 }
756 }
757 }
758
759 async fn run_maintenance(&mut self) {
761 let to_dial: Vec<(PeerId, Vec<Multiaddr>)> = {
764 let book = self.peer_book.read().await;
765 self.persistent_peers
766 .values()
767 .filter(|pid| !self.connected_peers.contains(pid))
768 .filter_map(|&pid| {
769 book.get(&pid.to_string()).map(|info| {
770 let addrs: Vec<Multiaddr> = info
771 .addresses
772 .iter()
773 .filter_map(|a| a.parse().ok())
774 .collect();
775 (pid, addrs)
776 })
777 })
778 .collect()
779 };
780
781 for (pid, addrs) in to_dial {
782 if !addrs.is_empty() {
783 self.litep2p
784 .add_known_address(pid, addrs.clone().into_iter());
785 let mut dial_addr = addrs[0].clone();
787 dial_addr.push(litep2p::types::multiaddr::Protocol::P2p(pid.into()));
788 if let Err(e) = self.litep2p.dial_address(dial_addr).await {
789 debug!(peer = %pid, error = ?e, "persistent peer redial failed");
790 }
791 }
792 }
793
794 let max = self.pex_config.max_peers;
796 if self.connected_peers.len() < max * 4 / 5 {
797 let book = self.peer_book.read().await;
798 let candidates = book.get_random_peers(5);
799 for peer in candidates {
800 if let Ok(pid) = peer.peer_id.parse::<PeerId>()
801 && !self.connected_peers.contains(&pid)
802 {
803 let addrs: Vec<Multiaddr> = peer
804 .addresses
805 .iter()
806 .filter_map(|a| a.parse().ok())
807 .collect();
808 if !addrs.is_empty() {
809 self.litep2p.add_known_address(pid, addrs.into_iter());
810 }
811 }
812 }
813 }
814
815 self.peer_book.write().await.prune_stale(86400);
817 if let Err(e) = self.peer_book.read().await.save() {
818 warn!(%e, "failed to save peer book");
819 }
820
821 let pex_cutoff = Instant::now() - std::time::Duration::from_secs(60);
823 self.pex_rate_limit.retain(|_, last| *last > pex_cutoff);
824 }
825
826 async fn run_pex_round(&mut self) {
828 if self.connected_peers.is_empty() {
829 return;
830 }
831 let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
833 let idx = rand::random::<usize>() % peers.len();
834 let target = peers[idx];
835
836 if let Ok(bytes) = postcard::to_allocvec(&PexRequest::GetPeers) {
837 let _ = self
838 .pex_handle
839 .send_request(target, bytes, DialOptions::Reject)
840 .await;
841 }
842 }
843
844 fn pick_non_validator_to_evict(&self) -> Option<PeerId> {
847 self.connected_peers
848 .iter()
849 .find(|p| !self.peer_map.peer_to_validator.contains_key(p))
850 .copied()
851 }
852
853 async fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
854 match event {
855 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
856 let is_validator = self.peer_map.peer_to_validator.contains_key(&peer);
860 if !is_validator && self.connected_peers.len() >= self.pex_config.max_peers {
861 warn!(
862 peer = %peer,
863 total = self.connected_peers.len(),
864 max = self.pex_config.max_peers,
865 "connection limit reached, ignoring non-validator peer"
866 );
867 return;
868 }
869
870 if is_validator
873 && self.connected_peers.len() >= self.pex_config.max_peers
874 && let Some(evict_peer) = self.pick_non_validator_to_evict()
875 {
876 warn!(
877 evict = %evict_peer,
878 new_validator = %peer,
879 "evicting non-validator peer to make room for validator"
880 );
881 self.connected_peers.remove(&evict_peer);
882 self.notif_connected_peers.remove(&evict_peer);
883 }
886
887 info!(peer = %peer, endpoint = ?endpoint, "connection established");
888 self.connected_peers.insert(peer);
889 let _ = self.connected_count_tx.send(self.connected_peers.len());
890
891 if let Err(e) = self.notif_handle.try_open_substream_batch(iter::once(peer)) {
893 debug!(peer = %peer, error = ?e, "failed to open notification substream");
894 }
895
896 if let Some(info) = self.peer_book.write().await.get_mut(&peer.to_string()) {
898 info.touch();
899 }
900 }
901 Litep2pEvent::ConnectionClosed { peer, .. } => {
902 debug!(peer = %peer, "connection closed");
903 self.connected_peers.remove(&peer);
904 let _ = self.connected_count_tx.send(self.connected_peers.len());
905 if self.notif_connected_peers.remove(&peer) {
908 let _ = self
909 .notif_connected_count_tx
910 .send(self.notif_connected_peers.len());
911 }
912 }
913 Litep2pEvent::DialFailure { address, error, .. } => {
914 warn!(address = %address, error = ?error, "dial failed");
915 }
916 other => {
918 trace!(?other, "unhandled litep2p event");
919 }
920 }
921 }
922
923 fn update_peer_info(&self) {
924 let peers: Vec<PeerStatus> = self
925 .peer_map
926 .validator_to_peer
927 .iter()
928 .map(|(&vid, pid)| PeerStatus {
929 validator_id: vid,
930 peer_id: pid.to_string(),
931 })
932 .collect();
933 let _ = self.peer_info_tx.send(peers);
934 }
935
936 async fn handle_command(&mut self, cmd: NetCommand) {
937 match cmd {
938 NetCommand::Broadcast(bytes) => {
939 for &peer in &self.notif_connected_peers {
941 let _ = self
942 .notif_handle
943 .send_sync_notification(peer, bytes.clone());
944 }
945 }
946 NetCommand::SendTo(target, bytes) => {
947 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
948 let _ = self
949 .reqresp_handle
950 .send_request(peer_id, bytes, DialOptions::Reject)
951 .await;
952 }
953 }
954 NetCommand::AddPeer(vid, pid, addrs) => {
955 info!(validator = %vid, peer = %pid, "adding peer");
956 self.peer_map.insert(vid, pid);
957 self.litep2p.add_known_address(pid, addrs.into_iter());
958 self.update_peer_info();
959 }
960 NetCommand::RemovePeer(vid) => {
961 if let Some(pid) = self.peer_map.remove(vid) {
962 info!(validator = %vid, peer = %pid, "removed peer");
963 } else {
964 warn!(validator = %vid, "peer not found for removal");
965 }
966 self.update_peer_info();
967 }
968 NetCommand::SyncRequest(peer_id, bytes) => {
969 let _ = self
970 .sync_handle
971 .send_request(peer_id, bytes, DialOptions::Reject)
972 .await;
973 }
974 NetCommand::SyncRespond(request_id, bytes) => {
975 self.sync_handle.send_response(request_id, bytes);
976 }
977 NetCommand::EpochChange(validators) => {
978 self.handle_epoch_change(validators).await;
979 }
980 NetCommand::BroadcastTx(bytes) => {
981 for &peer in &self.mempool_notif_connected_peers {
982 let _ = self
983 .mempool_notif_handle
984 .send_sync_notification(peer, bytes.clone());
985 }
986 }
987 }
988 }
989
990 async fn handle_mempool_notification_event(&mut self, event: NotificationEvent) {
991 match event {
992 NotificationEvent::ValidateSubstream { peer, .. } => {
993 self.mempool_notif_handle
995 .send_validation_result(peer, ValidationResult::Accept);
996 }
997 NotificationEvent::NotificationStreamOpened { peer, .. } => {
998 trace!(peer = %peer, "mempool notif stream opened");
999 self.mempool_notif_connected_peers.insert(peer);
1000 }
1001 NotificationEvent::NotificationStreamClosed { peer } => {
1002 trace!(peer = %peer, "mempool notif stream closed");
1003 self.mempool_notif_connected_peers.remove(&peer);
1004 }
1005 NotificationEvent::NotificationReceived {
1006 peer: _,
1007 notification,
1008 } => {
1009 let hash = blake3::hash(¬ification);
1010 let hash_bytes: [u8; 32] = *hash.as_bytes();
1011 if self.mempool_seen_active.contains(&hash_bytes)
1012 || self.mempool_seen_backup.contains(&hash_bytes)
1013 {
1014 return;
1015 }
1016 self.mempool_seen_active.insert(hash_bytes);
1017 if self.mempool_seen_active.len() > 100_000 {
1020 std::mem::swap(&mut self.mempool_seen_active, &mut self.mempool_seen_backup);
1021 self.mempool_seen_active.clear();
1022 }
1023 let _ = self.mempool_tx_tx.try_send(notification.freeze().to_vec());
1024 }
1025 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
1026 debug!(peer = %peer, ?error, "mempool notif stream open failed");
1027 }
1028 }
1029 }
1030
1031 async fn handle_epoch_change(
1032 &mut self,
1033 validators: Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>,
1034 ) {
1035 for (vid, pubkey) in &validators {
1037 if self.peer_map.validator_to_peer.contains_key(vid) {
1038 continue;
1039 }
1040 let pk_bytes = &pubkey.0;
1042 if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
1043 let peer_id = lpk.to_peer_id();
1044 info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
1045 self.peer_map.insert(*vid, peer_id);
1046 }
1047 }
1048 let new_ids: HashSet<ValidatorId> = validators.iter().map(|(vid, _)| *vid).collect();
1050 let to_remove: Vec<ValidatorId> = self
1051 .peer_map
1052 .validator_to_peer
1053 .keys()
1054 .filter(|vid| !new_ids.contains(vid))
1055 .copied()
1056 .collect();
1057 for vid in to_remove {
1058 info!(validator = %vid, "removing validator from peer_map after epoch change");
1059 self.peer_map.remove(vid);
1060 }
1061 self.validator_ids_ordered = validators.iter().map(|(vid, _)| *vid).collect();
1063 self.validator_keys = validators.into_iter().collect();
1064 self.persistent_peers = self.peer_map.validator_to_peer.clone();
1066 self.update_peer_info();
1067 }
1068}
1069
1070#[derive(Clone)]
1073pub struct Litep2pNetworkSink {
1074 cmd_tx: mpsc::Sender<NetCommand>,
1075 epoch_tx: watch::Sender<EpochUpdate>,
1076}
1077
1078impl Litep2pNetworkSink {
1079 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
1080 if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
1081 warn!("add_peer cmd dropped: {e}");
1082 }
1083 }
1084
1085 pub fn remove_peer(&self, vid: ValidatorId) {
1086 if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
1087 warn!("remove_peer cmd dropped: {e}");
1088 }
1089 }
1090
1091 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
1092 match codec::encode(request) {
1093 Ok(bytes) => {
1094 if let Err(e) = self
1095 .cmd_tx
1096 .try_send(NetCommand::SyncRequest(peer_id, bytes))
1097 {
1098 warn!("sync request cmd dropped: {e}");
1099 }
1100 }
1101 Err(e) => warn!("sync request encode failed: {e}"),
1102 }
1103 }
1104
1105 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
1106 match codec::encode(response) {
1107 Ok(bytes) => {
1108 if let Err(e) = self
1109 .cmd_tx
1110 .try_send(NetCommand::SyncRespond(request_id, bytes))
1111 {
1112 warn!("sync response cmd dropped: {e}");
1113 }
1114 }
1115 Err(e) => warn!("sync response encode failed: {e}"),
1116 }
1117 }
1118
1119 pub fn broadcast_tx(&self, tx_bytes: Vec<u8>) {
1121 if let Err(e) = self.cmd_tx.try_send(NetCommand::BroadcastTx(tx_bytes)) {
1122 warn!("broadcast_tx cmd dropped: {e}");
1123 }
1124 }
1125}
1126
1127impl NetworkSink for Litep2pNetworkSink {
1128 fn broadcast(&self, msg: ConsensusMessage) {
1129 match codec::encode(&msg) {
1130 Ok(bytes) => {
1131 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1132 warn!("broadcast cmd dropped: {e}");
1133 }
1134 }
1135 Err(e) => warn!("broadcast encode failed: {e}"),
1136 }
1137 }
1138
1139 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
1140 match codec::encode(&msg) {
1141 Ok(bytes) => {
1142 if let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes)) {
1143 warn!("send_to cmd dropped for {target}: {e}");
1144 }
1145 }
1146 Err(e) => warn!("send_to encode failed for {target}: {e}"),
1147 }
1148 }
1149
1150 fn on_epoch_change(&self, epoch: EpochNumber, new_validator_set: &hotmint_types::ValidatorSet) {
1151 let validators: Vec<_> = new_validator_set
1152 .validators()
1153 .iter()
1154 .map(|v| (v.id, v.public_key.clone()))
1155 .collect();
1156 let _ = self.epoch_tx.send(Some((epoch, validators)));
1158 }
1159
1160 fn broadcast_evidence(&self, proof: &hotmint_types::evidence::EquivocationProof) {
1161 let msg = ConsensusMessage::Evidence(proof.clone());
1162 match codec::encode(&msg) {
1163 Ok(bytes) => {
1164 if let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes)) {
1165 warn!("broadcast_evidence cmd dropped: {e}");
1166 }
1167 }
1168 Err(e) => warn!("broadcast_evidence encode failed: {e}"),
1169 }
1170 }
1171}