1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::sync::{SyncRequest, SyncResponse};
8use hotmint_types::{ConsensusMessage, ValidatorId};
9use litep2p::config::ConfigBuilder;
10use litep2p::protocol::notification::{
11 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
12};
13use litep2p::protocol::request_response::{
14 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
15};
16use litep2p::transport::tcp::config::Config as TcpConfig;
17use litep2p::types::RequestId;
18use litep2p::types::multiaddr::Multiaddr;
19use litep2p::{Litep2p, Litep2pEvent, PeerId};
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, watch};
22use tracing::{debug, info, warn};
23
24use std::sync::{Arc, RwLock};
25
26use crate::peer::{PeerBook, PeerInfo};
27use crate::pex::{PexConfig, PexRequest, PexResponse};
28
29const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
30const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
31const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
32const PEX_PROTOCOL: &str = "/hotmint/pex/1";
33const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
34const MAINTENANCE_INTERVAL_SECS: u64 = 10;
35
36#[derive(Clone)]
38pub struct PeerMap {
39 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
40 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
41}
42
43impl PeerMap {
44 pub fn new() -> Self {
45 Self {
46 validator_to_peer: HashMap::new(),
47 peer_to_validator: HashMap::new(),
48 }
49 }
50
51 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
52 self.validator_to_peer.insert(vid, pid);
53 self.peer_to_validator.insert(pid, vid);
54 }
55
56 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
57 if let Some(pid) = self.validator_to_peer.remove(&vid) {
58 self.peer_to_validator.remove(&pid);
59 Some(pid)
60 } else {
61 None
62 }
63 }
64}
65
66impl Default for PeerMap {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct PeerStatus {
75 pub validator_id: ValidatorId,
76 pub peer_id: String,
77}
78
79pub enum NetCommand {
81 Broadcast(Vec<u8>),
82 SendTo(ValidatorId, Vec<u8>),
83 AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
84 RemovePeer(ValidatorId),
85 SyncRequest(PeerId, Vec<u8>),
87 SyncRespond(RequestId, Vec<u8>),
89 EpochChange(Vec<(ValidatorId, hotmint_types::crypto::PublicKey)>),
91}
92
93pub struct IncomingSyncRequest {
95 pub request_id: RequestId,
96 pub peer: PeerId,
97 pub request: SyncRequest,
98}
99
100pub struct NetworkServiceHandles {
102 pub service: NetworkService,
103 pub sink: Litep2pNetworkSink,
104 pub msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
105 pub sync_req_rx: mpsc::Receiver<IncomingSyncRequest>,
106 pub sync_resp_rx: mpsc::Receiver<SyncResponse>,
107 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
108 pub connected_count_rx: watch::Receiver<usize>,
109}
110
111pub struct NetworkService {
113 litep2p: Litep2p,
114 notif_handle: NotificationHandle,
115 reqresp_handle: RequestResponseHandle,
116 sync_handle: RequestResponseHandle,
117 pex_handle: RequestResponseHandle,
118 peer_map: PeerMap,
119 peer_book: Arc<RwLock<PeerBook>>,
120 pex_config: PexConfig,
121 persistent_peers: HashMap<ValidatorId, PeerId>,
122 msg_tx: mpsc::Sender<(ValidatorId, ConsensusMessage)>,
123 cmd_rx: mpsc::Receiver<NetCommand>,
124 sync_req_tx: mpsc::Sender<IncomingSyncRequest>,
125 sync_resp_tx: mpsc::Sender<SyncResponse>,
126 peer_info_tx: watch::Sender<Vec<PeerStatus>>,
127 connected_count_tx: watch::Sender<usize>,
128 connected_peers: std::collections::HashSet<PeerId>,
129}
130
131impl NetworkService {
132 pub fn create(
134 listen_addr: Multiaddr,
135 peer_map: PeerMap,
136 known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
137 keypair: Option<litep2p::crypto::ed25519::Keypair>,
138 peer_book: Arc<RwLock<PeerBook>>,
139 pex_config: PexConfig,
140 ) -> Result<NetworkServiceHandles> {
141 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
142 .with_max_size(MAX_NOTIFICATION_SIZE)
143 .with_handshake(vec![])
144 .with_auto_accept_inbound(true)
145 .with_sync_channel_size(1024)
146 .with_async_channel_size(1024)
147 .build();
148
149 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
150 .with_max_size(MAX_NOTIFICATION_SIZE)
151 .build();
152
153 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
154 .with_max_size(MAX_NOTIFICATION_SIZE)
155 .build();
156
157 let (pex_config_proto, pex_handle) = ReqRespConfigBuilder::new(PEX_PROTOCOL.into())
158 .with_max_size(1024 * 1024) .build();
160
161 let mut config_builder = ConfigBuilder::new()
162 .with_tcp(TcpConfig {
163 listen_addresses: vec![listen_addr],
164 ..Default::default()
165 })
166 .with_notification_protocol(notif_config)
167 .with_request_response_protocol(reqresp_config)
168 .with_request_response_protocol(sync_config)
169 .with_request_response_protocol(pex_config_proto);
170
171 if let Some(kp) = keypair {
172 config_builder = config_builder.with_keypair(kp);
173 }
174
175 if !known_addresses.is_empty() {
176 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
177 }
178
179 let litep2p =
180 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
181
182 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
183 for addr in litep2p.listen_addresses() {
184 info!(address = %addr, "listening on");
185 }
186
187 let (msg_tx, msg_rx) = mpsc::channel(8192);
188 let (cmd_tx, cmd_rx) = mpsc::channel(4096);
189 let (sync_req_tx, sync_req_rx) = mpsc::channel(256);
190 let (sync_resp_tx, sync_resp_rx) = mpsc::channel(256);
191
192 let initial_peers: Vec<PeerStatus> = peer_map
194 .validator_to_peer
195 .iter()
196 .map(|(&vid, pid)| PeerStatus {
197 validator_id: vid,
198 peer_id: pid.to_string(),
199 })
200 .collect();
201 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
202
203 let sink = Litep2pNetworkSink {
204 cmd_tx: cmd_tx.clone(),
205 };
206
207 let (connected_count_tx, connected_count_rx) = watch::channel(0usize);
208
209 let persistent_peers: HashMap<ValidatorId, PeerId> = peer_map.validator_to_peer.clone();
211
212 Ok(NetworkServiceHandles {
213 service: Self {
214 litep2p,
215 notif_handle,
216 reqresp_handle,
217 sync_handle,
218 pex_handle,
219 peer_map,
220 peer_book,
221 pex_config,
222 persistent_peers,
223 msg_tx,
224 cmd_rx,
225 sync_req_tx,
226 sync_resp_tx,
227 peer_info_tx,
228 connected_count_tx,
229 connected_peers: std::collections::HashSet::new(),
230 },
231 sink,
232 msg_rx,
233 sync_req_rx,
234 sync_resp_rx,
235 peer_info_rx,
236 connected_count_rx,
237 })
238 }
239
240 pub fn local_peer_id(&self) -> &PeerId {
241 self.litep2p.local_peer_id()
242 }
243
244 pub async fn run(mut self) {
246 let mut maintenance_interval =
247 tokio::time::interval(tokio::time::Duration::from_secs(MAINTENANCE_INTERVAL_SECS));
248 let mut pex_interval = tokio::time::interval(tokio::time::Duration::from_secs(
249 self.pex_config.request_interval_secs,
250 ));
251 loop {
252 tokio::select! {
253 event = self.notif_handle.next() => {
254 if let Some(event) = event {
255 self.handle_notification_event(event);
256 }
257 }
258 event = self.reqresp_handle.next() => {
259 if let Some(event) = event {
260 self.handle_reqresp_event(event);
261 }
262 }
263 event = self.sync_handle.next() => {
264 if let Some(event) = event {
265 self.handle_sync_event(event);
266 }
267 }
268 event = self.pex_handle.next() => {
269 if let Some(event) = event {
270 self.handle_pex_event(event);
271 }
272 }
273 event = self.litep2p.next_event() => {
274 if let Some(event) = event {
275 self.handle_litep2p_event(event);
276 }
277 }
278 Some(cmd) = self.cmd_rx.recv() => {
279 self.handle_command(cmd).await;
280 }
281 _ = maintenance_interval.tick() => {
282 self.run_maintenance();
283 }
284 _ = pex_interval.tick() => {
285 if self.pex_config.enabled {
286 self.run_pex_round().await;
287 }
288 }
289 }
290 }
291 }
292
293 fn handle_notification_event(&mut self, event: NotificationEvent) {
294 match event {
295 NotificationEvent::ValidateSubstream { peer, .. } => {
296 self.notif_handle
297 .send_validation_result(peer, ValidationResult::Accept);
298 }
299 NotificationEvent::NotificationStreamOpened { peer, .. } => {
300 info!(peer = %peer, "notification stream opened");
301 }
302 NotificationEvent::NotificationStreamClosed { peer } => {
303 debug!(peer = %peer, "notification stream closed");
304 }
305 NotificationEvent::NotificationReceived { peer, notification } => {
306 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
307 warn!(peer = %peer, "dropping notification from unknown peer");
308 return;
309 };
310 match serde_cbor_2::from_slice::<ConsensusMessage>(¬ification) {
311 Ok(msg) => {
312 if let Err(e) = self.msg_tx.try_send((sender, msg)) {
313 warn!("consensus message dropped (notification): {e}");
314 }
315 }
316 Err(e) => {
317 warn!(error = %e, peer = %peer, "failed to decode notification");
318 self.peer_book
319 .write()
320 .unwrap()
321 .adjust_score(&peer.to_string(), -10);
322 }
323 }
324 }
325 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
326 warn!(peer = %peer, error = ?error, "notification stream open failed");
327 }
328 }
329 }
330
331 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
332 match event {
333 RequestResponseEvent::RequestReceived {
334 peer,
335 request_id,
336 request,
337 ..
338 } => {
339 let Some(sender) = self.peer_map.peer_to_validator.get(&peer).copied() else {
340 warn!(peer = %peer, "dropping request from unknown peer");
341 self.reqresp_handle.reject_request(request_id);
342 return;
343 };
344 match serde_cbor_2::from_slice::<ConsensusMessage>(&request) {
345 Ok(msg) => {
346 if let Err(e) = self.msg_tx.try_send((sender, msg)) {
347 warn!("consensus message dropped (reqresp): {e}");
348 }
349 self.reqresp_handle.send_response(request_id, vec![]);
350 }
351 Err(e) => {
352 warn!(error = %e, "failed to decode request");
353 self.reqresp_handle.reject_request(request_id);
354 }
355 }
356 }
357 RequestResponseEvent::ResponseReceived { .. } => {}
358 RequestResponseEvent::RequestFailed { peer, error, .. } => {
359 debug!(peer = %peer, error = ?error, "request failed");
360 }
361 }
362 }
363
364 fn handle_sync_event(&mut self, event: RequestResponseEvent) {
365 match event {
366 RequestResponseEvent::RequestReceived {
367 peer,
368 request_id,
369 request,
370 ..
371 } => {
372 if !self.peer_map.peer_to_validator.contains_key(&peer) {
373 warn!(peer = %peer, "rejecting sync request from unknown peer");
374 self.sync_handle.reject_request(request_id);
375 return;
376 }
377 match serde_cbor_2::from_slice::<SyncRequest>(&request) {
378 Ok(req) => {
379 if let Err(e) = self.sync_req_tx.try_send(IncomingSyncRequest {
380 request_id,
381 peer,
382 request: req,
383 }) {
384 warn!("sync request dropped: {e}");
385 }
386 }
387 Err(e) => {
388 warn!(error = %e, peer = %peer, "failed to decode sync request");
389 self.peer_book
390 .write()
391 .unwrap()
392 .adjust_score(&peer.to_string(), -5);
393 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
394 if let Ok(bytes) = serde_cbor_2::to_vec(&err_resp) {
395 self.sync_handle.send_response(request_id, bytes);
396 } else {
397 self.sync_handle.reject_request(request_id);
398 }
399 }
400 }
401 }
402 RequestResponseEvent::ResponseReceived {
403 request_id: _,
404 response,
405 ..
406 } => {
407 match serde_cbor_2::from_slice::<SyncResponse>(&response) {
409 Ok(resp) => {
410 if let Err(e) = self.sync_resp_tx.try_send(resp) {
411 warn!("sync response dropped: {e}");
412 }
413 }
414 Err(e) => {
415 warn!(error = %e, "failed to decode sync response");
416 }
417 }
418 }
419 RequestResponseEvent::RequestFailed { peer, error, .. } => {
420 debug!(peer = %peer, error = ?error, "sync request failed");
421 if let Err(e) = self
422 .sync_resp_tx
423 .try_send(SyncResponse::Error(format!("request failed: {error:?}")))
424 {
425 warn!("sync error response dropped: {e}");
426 }
427 }
428 }
429 }
430
431 fn handle_pex_event(&mut self, event: RequestResponseEvent) {
432 match event {
433 RequestResponseEvent::RequestReceived {
434 peer,
435 request_id,
436 request,
437 ..
438 } => {
439 if !self.peer_map.peer_to_validator.contains_key(&peer)
441 && !self.connected_peers.contains(&peer)
442 {
443 warn!(peer = %peer, "rejecting PEX request from unknown peer");
444 self.pex_handle.reject_request(request_id);
445 return;
446 }
447 match serde_cbor_2::from_slice::<PexRequest>(&request) {
448 Ok(PexRequest::GetPeers) => {
449 let book = self.peer_book.read().unwrap();
450 let private = &self.pex_config.private_peer_ids;
451 let peers: Vec<PeerInfo> = book
452 .get_random_peers(self.pex_config.max_peers_per_response)
453 .into_iter()
454 .filter(|p| p.peer_id != peer.to_string())
455 .filter(|p| !private.contains(&p.peer_id))
457 .cloned()
458 .collect();
459 let resp = PexResponse::Peers(peers);
460 if let Ok(bytes) = serde_cbor_2::to_vec(&resp) {
461 self.pex_handle.send_response(request_id, bytes);
462 }
463 }
464 Ok(PexRequest::Advertise {
465 role,
466 validator_id,
467 addresses,
468 }) => {
469 if let Some(vid) = validator_id
471 && let Some(&expected_peer) =
472 self.peer_map.validator_to_peer.get(&ValidatorId(vid))
473 && expected_peer != peer
474 {
475 warn!(
476 peer = %peer,
477 claimed_vid = vid,
478 "PEX Advertise validator_id mismatch, rejecting"
479 );
480 self.pex_handle.reject_request(request_id);
481 return;
482 }
483 let mut info = PeerInfo::new(
484 peer,
485 role,
486 addresses.iter().filter_map(|a| a.parse().ok()).collect(),
487 );
488 if let Some(vid) = validator_id {
489 info = info.with_validator(ValidatorId(vid));
490 }
491 self.peer_book.write().unwrap().add_peer(info);
492 if let Ok(bytes) = serde_cbor_2::to_vec(&PexResponse::Ack) {
493 self.pex_handle.send_response(request_id, bytes);
494 }
495 }
496 Err(e) => {
497 warn!(error = %e, "failed to decode PEX request");
498 self.pex_handle.reject_request(request_id);
499 }
500 }
501 }
502 RequestResponseEvent::ResponseReceived { response, .. } => {
503 if let Ok(PexResponse::Peers(peers)) = serde_cbor_2::from_slice(&response) {
504 let mut book = self.peer_book.write().unwrap();
505 for peer in peers {
506 if !peer.is_banned() {
507 book.add_peer(peer);
508 }
509 }
510 }
511 }
512 _ => {}
513 }
514 }
515
516 fn run_maintenance(&mut self) {
518 for (&_vid, &pid) in &self.persistent_peers {
520 if !self.connected_peers.contains(&pid)
521 && let Some(info) = self.peer_book.read().unwrap().get(&pid.to_string())
522 {
523 let addrs: Vec<Multiaddr> = info
524 .addresses
525 .iter()
526 .filter_map(|a| a.parse().ok())
527 .collect();
528 if !addrs.is_empty() {
529 self.litep2p.add_known_address(pid, addrs.into_iter());
530 }
531 }
532 }
533
534 let max = self.pex_config.max_peers;
536 if self.connected_peers.len() < max * 4 / 5 {
537 let book = self.peer_book.read().unwrap();
538 let candidates = book.get_random_peers(5);
539 for peer in candidates {
540 if let Ok(pid) = peer.peer_id.parse::<PeerId>()
541 && !self.connected_peers.contains(&pid)
542 {
543 let addrs: Vec<Multiaddr> = peer
544 .addresses
545 .iter()
546 .filter_map(|a| a.parse().ok())
547 .collect();
548 if !addrs.is_empty() {
549 self.litep2p.add_known_address(pid, addrs.into_iter());
550 }
551 }
552 }
553 }
554
555 self.peer_book.write().unwrap().prune_stale(86400);
557 if let Err(e) = self.peer_book.read().unwrap().save() {
558 warn!(%e, "failed to save peer book");
559 }
560 }
561
562 async fn run_pex_round(&mut self) {
564 if self.connected_peers.is_empty() {
565 return;
566 }
567 let peers: Vec<PeerId> = self.connected_peers.iter().copied().collect();
569 let idx = rand::random::<usize>() % peers.len();
570 let target = peers[idx];
571
572 if let Ok(bytes) = serde_cbor_2::to_vec(&PexRequest::GetPeers) {
573 let _ = self
574 .pex_handle
575 .send_request(target, bytes, DialOptions::Reject)
576 .await;
577 }
578 }
579
580 fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
581 match event {
582 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
583 if self.connected_peers.len() >= self.pex_config.max_peers {
585 warn!(
586 peer = %peer,
587 total = self.connected_peers.len(),
588 max = self.pex_config.max_peers,
589 "connection limit reached, ignoring new peer"
590 );
591 return;
592 }
593
594 info!(peer = %peer, endpoint = ?endpoint, "connection established");
595 self.connected_peers.insert(peer);
596 let _ = self.connected_count_tx.send(self.connected_peers.len());
597 if let Some(info) = self.peer_book.write().unwrap().get_mut(&peer.to_string()) {
599 info.touch();
600 }
601 }
602 Litep2pEvent::ConnectionClosed { peer, .. } => {
603 debug!(peer = %peer, "connection closed");
604 self.connected_peers.remove(&peer);
605 let _ = self.connected_count_tx.send(self.connected_peers.len());
606 }
607 Litep2pEvent::DialFailure { address, error, .. } => {
608 warn!(address = %address, error = ?error, "dial failed");
609 }
610 _ => {}
611 }
612 }
613
614 fn update_peer_info(&self) {
615 let peers: Vec<PeerStatus> = self
616 .peer_map
617 .validator_to_peer
618 .iter()
619 .map(|(&vid, pid)| PeerStatus {
620 validator_id: vid,
621 peer_id: pid.to_string(),
622 })
623 .collect();
624 let _ = self.peer_info_tx.send(peers);
625 }
626
627 async fn handle_command(&mut self, cmd: NetCommand) {
628 match cmd {
629 NetCommand::Broadcast(bytes) => {
630 for &peer in self.peer_map.peer_to_validator.keys() {
631 let _ = self
632 .notif_handle
633 .send_sync_notification(peer, bytes.clone());
634 }
635 }
636 NetCommand::SendTo(target, bytes) => {
637 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
638 let _ = self
639 .reqresp_handle
640 .send_request(peer_id, bytes, DialOptions::Reject)
641 .await;
642 }
643 }
644 NetCommand::AddPeer(vid, pid, addrs) => {
645 info!(validator = %vid, peer = %pid, "adding peer");
646 self.peer_map.insert(vid, pid);
647 self.litep2p.add_known_address(pid, addrs.into_iter());
648 self.update_peer_info();
649 }
650 NetCommand::RemovePeer(vid) => {
651 if let Some(pid) = self.peer_map.remove(vid) {
652 info!(validator = %vid, peer = %pid, "removed peer");
653 } else {
654 warn!(validator = %vid, "peer not found for removal");
655 }
656 self.update_peer_info();
657 }
658 NetCommand::SyncRequest(peer_id, bytes) => {
659 let _ = self
660 .sync_handle
661 .send_request(peer_id, bytes, DialOptions::Reject)
662 .await;
663 }
664 NetCommand::SyncRespond(request_id, bytes) => {
665 self.sync_handle.send_response(request_id, bytes);
666 }
667 NetCommand::EpochChange(validators) => {
668 for (vid, pubkey) in &validators {
670 if self.peer_map.validator_to_peer.contains_key(vid) {
671 continue;
672 }
673 let pk_bytes = &pubkey.0;
675 if let Ok(lpk) = litep2p::crypto::ed25519::PublicKey::try_from_bytes(pk_bytes) {
676 let peer_id = lpk.to_peer_id();
677 info!(validator = %vid, peer = %peer_id, "adding new epoch validator to peer_map");
678 self.peer_map.insert(*vid, peer_id);
679 }
680 }
681 let new_ids: std::collections::HashSet<ValidatorId> =
683 validators.iter().map(|(vid, _)| *vid).collect();
684 let to_remove: Vec<ValidatorId> = self
685 .peer_map
686 .validator_to_peer
687 .keys()
688 .filter(|vid| !new_ids.contains(vid))
689 .copied()
690 .collect();
691 for vid in to_remove {
692 info!(validator = %vid, "removing validator from peer_map after epoch change");
693 self.peer_map.remove(vid);
694 }
695 self.update_peer_info();
696 }
697 }
698 }
699}
700
701#[derive(Clone)]
704pub struct Litep2pNetworkSink {
705 cmd_tx: mpsc::Sender<NetCommand>,
706}
707
708impl Litep2pNetworkSink {
709 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
710 if let Err(e) = self.cmd_tx.try_send(NetCommand::AddPeer(vid, pid, addrs)) {
711 warn!("add_peer cmd dropped: {e}");
712 }
713 }
714
715 pub fn remove_peer(&self, vid: ValidatorId) {
716 if let Err(e) = self.cmd_tx.try_send(NetCommand::RemovePeer(vid)) {
717 warn!("remove_peer cmd dropped: {e}");
718 }
719 }
720
721 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
722 if let Ok(bytes) = serde_cbor_2::to_vec(request)
723 && let Err(e) = self
724 .cmd_tx
725 .try_send(NetCommand::SyncRequest(peer_id, bytes))
726 {
727 warn!("sync request cmd dropped: {e}");
728 }
729 }
730
731 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
732 if let Ok(bytes) = serde_cbor_2::to_vec(response)
733 && let Err(e) = self
734 .cmd_tx
735 .try_send(NetCommand::SyncRespond(request_id, bytes))
736 {
737 warn!("sync response cmd dropped: {e}");
738 }
739 }
740}
741
742impl NetworkSink for Litep2pNetworkSink {
743 fn broadcast(&self, msg: ConsensusMessage) {
744 if let Ok(bytes) = serde_cbor_2::to_vec(&msg)
745 && let Err(e) = self.cmd_tx.try_send(NetCommand::Broadcast(bytes))
746 {
747 warn!("broadcast cmd dropped: {e}");
748 }
749 }
750
751 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
752 if let Ok(bytes) = serde_cbor_2::to_vec(&msg)
753 && let Err(e) = self.cmd_tx.try_send(NetCommand::SendTo(target, bytes))
754 {
755 warn!("send_to cmd dropped for {target}: {e}");
756 }
757 }
758
759 fn on_epoch_change(&self, new_validator_set: &hotmint_types::ValidatorSet) {
760 let validators: Vec<_> = new_validator_set
761 .validators()
762 .iter()
763 .map(|v| (v.id, v.public_key.clone()))
764 .collect();
765 if let Err(e) = self.cmd_tx.try_send(NetCommand::EpochChange(validators)) {
766 warn!("epoch change cmd dropped: {e}");
767 }
768 }
769}