1use crate::{
2 cached_view::CachedView,
3 codecs::{
4 gossipsub::GossipsubMessageHandler,
5 request_response::RequestResponseMessageHandler,
6 },
7 config::{
8 Config,
9 NotInitialized,
10 },
11 gossipsub::messages::{
12 GossipTopicTag,
13 GossipsubBroadcastRequest,
14 GossipsubMessage,
15 },
16 p2p_service::{
17 FuelP2PEvent,
18 FuelP2PService,
19 },
20 peer_manager::PeerInfo,
21 ports::{
22 BlockHeightImporter,
23 P2PPreConfirmationGossipData,
24 P2PPreConfirmationMessage,
25 P2pDb,
26 TxPool,
27 },
28 request_response::messages::{
29 OnResponse,
30 OnResponseWithPeerSelection,
31 RequestMessage,
32 ResponseMessageErrorCode,
33 ResponseSender,
34 V2ResponseMessage,
35 },
36};
37use anyhow::anyhow;
38use fuel_core_metrics::p2p_metrics::set_blocks_requested;
39use fuel_core_services::{
40 AsyncProcessor,
41 RunnableService,
42 RunnableTask,
43 ServiceRunner,
44 StateWatcher,
45 SyncProcessor,
46 TaskNextAction,
47 TraceErr,
48 stream::BoxStream,
49};
50use fuel_core_storage::transactional::AtomicView;
51use fuel_core_types::{
52 blockchain::SealedBlockHeader,
53 fuel_tx::{
54 Transaction,
55 TxId,
56 UniqueIdentifier,
57 },
58 fuel_types::{
59 BlockHeight,
60 ChainId,
61 },
62 services::p2p::{
63 BlockHeightHeartbeatData,
64 GossipData,
65 GossipsubMessageAcceptance,
66 GossipsubMessageInfo,
67 NetworkableTransactionPool,
68 PeerId as FuelPeerId,
69 TransactionGossipData,
70 Transactions,
71 peer_reputation::{
72 AppScore,
73 PeerReport,
74 },
75 },
76};
77use futures::{
78 StreamExt,
79 future::BoxFuture,
80};
81use libp2p::{
82 PeerId,
83 gossipsub::{
84 MessageAcceptance,
85 MessageId,
86 PublishError,
87 },
88 request_response::InboundRequestId,
89};
90use std::{
91 fmt::Debug,
92 future::Future,
93 ops::Range,
94 sync::Arc,
95};
96use thiserror::Error;
97use tokio::{
98 sync::{
99 broadcast,
100 mpsc::{
101 self,
102 Receiver,
103 },
104 oneshot,
105 },
106 time::{
107 Duration,
108 Instant,
109 },
110};
111use tracing::warn;
112
113#[cfg(test)]
114pub mod broadcast_tests;
115#[cfg(test)]
116pub mod task_tests;
117
118const CHANNEL_SIZE: usize = 1024 * 10;
119
120pub type Service<V, T> = ServiceRunner<UninitializedTask<V, SharedState, T>>;
121
122#[derive(Debug, Error)]
123pub enum TaskError {
124 #[error("No peer found to send request to")]
125 NoPeerFound,
126}
127
128pub enum TaskRequest {
129 BroadcastTransaction(Arc<Transaction>),
131 BroadcastPreConfirmations(Arc<P2PPreConfirmationMessage>),
133 GetAllPeerInfo {
135 channel: oneshot::Sender<Vec<(PeerId, PeerInfo)>>,
136 },
137 GetSealedHeaders {
138 block_height_range: Range<u32>,
139 channel: OnResponseWithPeerSelection<
140 Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
141 >,
142 },
143 GetTransactions {
144 block_height_range: Range<u32>,
145 channel: OnResponseWithPeerSelection<
146 Result<Vec<Transactions>, ResponseMessageErrorCode>,
147 >,
148 },
149 GetTransactionsFromPeer {
150 block_height_range: Range<u32>,
151 from_peer: PeerId,
152 channel: OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
153 },
154 TxPoolGetAllTxIds {
155 from_peer: PeerId,
156 channel: OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>,
157 },
158 TxPoolGetFullTransactions {
159 tx_ids: Vec<TxId>,
160 from_peer: PeerId,
161 channel: OnResponse<
162 Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
163 >,
164 },
165 RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)),
167 RespondWithPeerReport {
168 peer_id: PeerId,
169 score: AppScore,
170 reporting_service: &'static str,
171 },
172 DatabaseTransactionsLookUp {
173 response: Result<Vec<Transactions>, ResponseMessageErrorCode>,
174 request_id: InboundRequestId,
175 },
176 DatabaseHeaderLookUp {
177 response: Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
178 request_id: InboundRequestId,
179 },
180 TxPoolAllTransactionsIds {
181 response: Result<Vec<TxId>, ResponseMessageErrorCode>,
182 request_id: InboundRequestId,
183 },
184 TxPoolFullTransactions {
185 response:
186 Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
187 request_id: InboundRequestId,
188 },
189}
190
191impl Debug for TaskRequest {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 match self {
194 TaskRequest::BroadcastTransaction(_) => {
195 write!(f, "TaskRequest::BroadcastTransaction")
196 }
197 TaskRequest::BroadcastPreConfirmations(_) => {
198 write!(f, "TaskRequest::BroadcastPreConfirmations")
199 }
200 TaskRequest::GetSealedHeaders { .. } => {
201 write!(f, "TaskRequest::GetSealedHeaders")
202 }
203 TaskRequest::GetTransactions { .. } => {
204 write!(f, "TaskRequest::GetTransactions")
205 }
206 TaskRequest::GetTransactionsFromPeer { .. } => {
207 write!(f, "TaskRequest::GetTransactionsFromPeer")
208 }
209 TaskRequest::TxPoolGetAllTxIds { .. } => {
210 write!(f, "TaskRequest::TxPoolGetAllTxIds")
211 }
212 TaskRequest::TxPoolGetFullTransactions { .. } => {
213 write!(f, "TaskRequest::TxPoolGetFullTransactions")
214 }
215 TaskRequest::RespondWithGossipsubMessageReport(_) => {
216 write!(f, "TaskRequest::RespondWithGossipsubMessageReport")
217 }
218 TaskRequest::RespondWithPeerReport { .. } => {
219 write!(f, "TaskRequest::RespondWithPeerReport")
220 }
221 TaskRequest::GetAllPeerInfo { .. } => {
222 write!(f, "TaskRequest::GetPeerInfo")
223 }
224 TaskRequest::DatabaseTransactionsLookUp { .. } => {
225 write!(f, "TaskRequest::DatabaseTransactionsLookUp")
226 }
227 TaskRequest::DatabaseHeaderLookUp { .. } => {
228 write!(f, "TaskRequest::DatabaseHeaderLookUp")
229 }
230 TaskRequest::TxPoolAllTransactionsIds { .. } => {
231 write!(f, "TaskRequest::TxPoolAllTransactionsIds")
232 }
233 TaskRequest::TxPoolFullTransactions { .. } => {
234 write!(f, "TaskRequest::TxPoolFullTransactions")
235 }
236 }
237 }
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum HeartBeatPeerReportReason {
242 OldHeartBeat,
243 LowHeartBeatFrequency,
244}
245
246pub trait TaskP2PService: Send {
247 fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>;
248 fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId>;
249
250 fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>>;
251
252 fn publish_message(
253 &mut self,
254 message: GossipsubBroadcastRequest,
255 ) -> anyhow::Result<()>;
256
257 fn send_request_msg(
258 &mut self,
259 peer_id: Option<PeerId>,
260 request_msg: RequestMessage,
261 on_response: ResponseSender,
262 ) -> anyhow::Result<()>;
263
264 fn send_response_msg(
265 &mut self,
266 request_id: InboundRequestId,
267 message: V2ResponseMessage,
268 ) -> anyhow::Result<()>;
269
270 fn report_message(
271 &mut self,
272 message: GossipsubMessageInfo,
273 acceptance: GossipsubMessageAcceptance,
274 ) -> anyhow::Result<()>;
275
276 fn report_peer(
277 &mut self,
278 peer_id: PeerId,
279 score: AppScore,
280 reporting_service: &str,
281 ) -> anyhow::Result<()>;
282
283 fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;
284
285 fn update_metrics<T>(&self, update_fn: T)
286 where
287 T: FnOnce();
288}
289
290impl TaskP2PService for FuelP2PService {
291 fn update_metrics<T>(&self, update_fn: T)
292 where
293 T: FnOnce(),
294 {
295 FuelP2PService::update_metrics(self, update_fn)
296 }
297
298 fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
299 self.peer_manager().get_all_peers().collect()
300 }
301
302 fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId> {
303 self.peer_manager().get_peer_id_with_height(height)
304 }
305
306 fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>> {
307 Box::pin(self.next_event())
308 }
309
310 fn publish_message(
311 &mut self,
312 message: GossipsubBroadcastRequest,
313 ) -> anyhow::Result<()> {
314 let result = self.publish_message(message);
315
316 match result {
317 Ok(_) => Ok(()),
318 Err(e) => {
319 if matches!(&e, PublishError::InsufficientPeers) {
320 Ok(())
321 } else {
322 Err(anyhow!(e))
323 }
324 }
325 }
326 }
327
328 fn send_request_msg(
329 &mut self,
330 peer_id: Option<PeerId>,
331 request_msg: RequestMessage,
332 on_response: ResponseSender,
333 ) -> anyhow::Result<()> {
334 self.send_request_msg(peer_id, request_msg, on_response)?;
335 Ok(())
336 }
337
338 fn send_response_msg(
339 &mut self,
340 request_id: InboundRequestId,
341 message: V2ResponseMessage,
342 ) -> anyhow::Result<()> {
343 self.send_response_msg(request_id, message)?;
344 Ok(())
345 }
346
347 fn report_message(
348 &mut self,
349 message: GossipsubMessageInfo,
350 acceptance: GossipsubMessageAcceptance,
351 ) -> anyhow::Result<()> {
352 report_message(self, message, acceptance);
353 Ok(())
354 }
355
356 fn report_peer(
357 &mut self,
358 peer_id: PeerId,
359 score: AppScore,
360 reporting_service: &str,
361 ) -> anyhow::Result<()> {
362 self.report_peer(peer_id, score, reporting_service);
363 Ok(())
364 }
365
366 fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
367 self.update_block_height(height);
368 Ok(())
369 }
370}
371
372pub trait Broadcast: Send {
373 fn report_peer(
374 &self,
375 peer_id: FuelPeerId,
376 report: AppScore,
377 reporting_service: &'static str,
378 ) -> anyhow::Result<()>;
379
380 fn block_height_broadcast(
381 &self,
382 block_height_data: BlockHeightHeartbeatData,
383 ) -> anyhow::Result<()>;
384
385 fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>;
386
387 fn pre_confirmation_broadcast(
388 &self,
389 confirmations: P2PPreConfirmationGossipData,
390 ) -> anyhow::Result<()>;
391
392 fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>;
393}
394
395impl Broadcast for SharedState {
396 fn report_peer(
397 &self,
398 peer_id: FuelPeerId,
399 report: AppScore,
400 reporting_service: &'static str,
401 ) -> anyhow::Result<()> {
402 self.report_peer(peer_id, report, reporting_service)
403 }
404
405 fn block_height_broadcast(
406 &self,
407 block_height_data: BlockHeightHeartbeatData,
408 ) -> anyhow::Result<()> {
409 self.block_height_broadcast.send(block_height_data)?;
410 Ok(())
411 }
412
413 fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> {
414 self.tx_broadcast.send(transaction)?;
415 Ok(())
416 }
417
418 fn pre_confirmation_broadcast(
419 &self,
420 confirmations: P2PPreConfirmationGossipData,
421 ) -> anyhow::Result<()> {
422 self.pre_confirmations_broadcast.send(confirmations)?;
423 Ok(())
424 }
425
426 fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> {
427 self.new_tx_subscription_broadcast.send(peer_id)?;
428 Ok(())
429 }
430}
431
432pub struct UninitializedTask<V, B, T> {
434 chain_id: ChainId,
435 last_height: BlockHeight,
436 view_provider: V,
437 next_block_height: BoxStream<BlockHeight>,
438 request_receiver: mpsc::Receiver<TaskRequest>,
440 broadcast: B,
441 tx_pool: T,
442 config: Config<NotInitialized>,
443}
444
445pub struct Task<P, V, B, T> {
448 chain_id: ChainId,
449 response_timeout: Duration,
450 p2p_service: P,
451 view_provider: V,
452 next_block_height: BoxStream<BlockHeight>,
453 request_receiver: mpsc::Receiver<TaskRequest>,
455 request_sender: mpsc::Sender<TaskRequest>,
456 db_heavy_task_processor: SyncProcessor,
457 tx_pool_heavy_task_processor: AsyncProcessor,
458 broadcast: B,
459 tx_pool: T,
460 max_headers_per_request: usize,
461 max_txs_per_request: usize,
462 heartbeat_check_interval: Duration,
464 heartbeat_max_avg_interval: Duration,
465 heartbeat_max_time_since_last: Duration,
466 next_check_time: Instant,
467 heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
468 cached_view: Arc<CachedView>,
470}
471
472impl<P, V, B, T> Task<P, V, B, T>
473where
474 P: TaskP2PService,
475 B: Broadcast,
476{
477 pub(crate) fn broadcast_gossip_message(
478 &mut self,
479 message: GossipsubMessage,
480 message_id: MessageId,
481 peer_id: PeerId,
482 ) {
483 let message_id = message_id.0;
484
485 match message {
486 GossipsubMessage::NewTx(transaction) => {
487 let next_transaction = GossipData::new(transaction, peer_id, message_id);
488 let _ = self.broadcast.tx_broadcast(next_transaction);
489 }
490 GossipsubMessage::TxPreConfirmations(confirmations) => {
491 let data = GossipData::new(confirmations, peer_id, message_id);
492 let _ = self.broadcast.pre_confirmation_broadcast(data);
493 }
494 }
495 }
496}
497
498#[derive(Default, Clone)]
499pub struct HeartbeatPeerReputationConfig {
500 old_heartbeat_penalty: AppScore,
501 low_heartbeat_frequency_penalty: AppScore,
502}
503
504impl<V, T> UninitializedTask<V, SharedState, T> {
505 #[allow(clippy::too_many_arguments)]
506 pub fn new<B: BlockHeightImporter>(
507 chain_id: ChainId,
508 last_height: BlockHeight,
509 config: Config<NotInitialized>,
510 shared_state: SharedState,
511 request_receiver: Receiver<TaskRequest>,
512 view_provider: V,
513 block_importer: B,
514 tx_pool: T,
515 ) -> Self {
516 let next_block_height = block_importer.next_block_height();
517
518 Self {
519 chain_id,
520 last_height,
521 view_provider,
522 tx_pool,
523 next_block_height,
524 request_receiver,
525 broadcast: shared_state,
526 config,
527 }
528 }
529}
530
531impl<P, V, B, T> Task<P, V, B, T>
532where
533 P: TaskP2PService,
534 V: AtomicView,
535 B: Broadcast,
536{
537 fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> {
538 for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() {
539 if peer_info.heartbeat_data.duration_since_last_heartbeat()
540 > self.heartbeat_max_time_since_last
541 {
542 tracing::debug!("Peer {:?} has old heartbeat", peer_id);
543 let report = HeartBeatPeerReportReason::OldHeartBeat;
544 let peer_id = convert_peer_id(peer_id)?;
545 self.report_peer(peer_id, report)?;
546 } else if peer_info.heartbeat_data.average_time_between_heartbeats()
547 > self.heartbeat_max_avg_interval
548 {
549 tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id);
550 let report = HeartBeatPeerReportReason::LowHeartBeatFrequency;
551 let peer_id = convert_peer_id(peer_id)?;
552 self.report_peer(peer_id, report)?;
553 }
554 }
555 Ok(())
556 }
557
558 fn report_peer(
559 &self,
560 peer_id: FuelPeerId,
561 report: HeartBeatPeerReportReason,
562 ) -> anyhow::Result<()> {
563 let app_score = match report {
564 HeartBeatPeerReportReason::OldHeartBeat => {
565 self.heartbeat_peer_reputation_config.old_heartbeat_penalty
566 }
567 HeartBeatPeerReportReason::LowHeartBeatFrequency => {
568 self.heartbeat_peer_reputation_config
569 .low_heartbeat_frequency_penalty
570 }
571 };
572 let reporting_service = "p2p";
573 self.broadcast
574 .report_peer(peer_id, app_score, reporting_service)?;
575 Ok(())
576 }
577}
578
579impl<P, V, B, T> Task<P, V, B, T>
580where
581 P: TaskP2PService + 'static,
582 V: AtomicView + 'static,
583 V::LatestView: P2pDb,
584 T: TxPool + 'static,
585 B: Send,
586{
587 fn update_metrics<U>(&self, update_fn: U)
588 where
589 U: FnOnce(),
590 {
591 self.p2p_service.update_metrics(update_fn)
592 }
593
594 fn process_request(
595 &mut self,
596 request_message: RequestMessage,
597 request_id: InboundRequestId,
598 ) -> anyhow::Result<()> {
599 match request_message {
600 RequestMessage::Transactions(range) => {
601 self.handle_transactions_request(range, request_id)
602 }
603 RequestMessage::SealedHeaders(range) => {
604 self.handle_sealed_headers_request(range, request_id)
605 }
606 RequestMessage::TxPoolAllTransactionsIds => {
607 self.handle_all_transactions_ids_request(request_id)
608 }
609 RequestMessage::TxPoolFullTransactions(tx_ids) => {
610 self.handle_full_transactions_request(tx_ids, request_id)
611 }
612 }
613 }
614
615 fn handle_db_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
616 &mut self,
617 range: Range<u32>,
618 request_id: InboundRequestId,
619 response_sender: ResponseSenderFn,
620 db_lookup: DbLookUpFn,
621 task_request: TaskRequestFn,
622 max_len: usize,
623 ) -> anyhow::Result<()>
624 where
625 DbLookUpFn: Fn(&V::LatestView, &Arc<CachedView>, Range<u32>) -> anyhow::Result<Option<R>>
626 + Send
627 + 'static,
628 ResponseSenderFn:
629 Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
630 TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
631 + Send
632 + 'static,
633 R: Send + 'static,
634 {
635 let instant = Instant::now();
636 let timeout = self.response_timeout;
637 let response_channel = self.request_sender.clone();
638 let range_len = range.len();
639
640 self.update_metrics(|| set_blocks_requested(range_len));
641
642 if range_len > max_len {
643 tracing::error!(
644 requested_length = range.len(),
645 max_len,
646 "Requested range is too large"
647 );
648 let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge);
649 let _ = self
650 .p2p_service
651 .send_response_msg(request_id, response_sender(response));
652 return Ok(());
653 }
654
655 let view = self.view_provider.latest_view()?;
656 let result = self.db_heavy_task_processor.try_spawn({
657 let cached_view = self.cached_view.clone();
658 move || {
659 if instant.elapsed() > timeout {
660 tracing::warn!("Request timed out");
661 return;
662 }
663
664 let response = db_lookup(&view, &cached_view, range.clone())
665 .ok()
666 .flatten()
667 .ok_or(ResponseMessageErrorCode::Timeout);
668
669 let _ = response_channel
670 .try_send(task_request(response, request_id))
671 .trace_err("Failed to send response to the request channel");
672 }
673 });
674
675 if result.is_err() {
676 let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
677 let _ = self
678 .p2p_service
679 .send_response_msg(request_id, response_sender(err));
680 }
681
682 Ok(())
683 }
684
685 fn handle_transactions_request(
686 &mut self,
687 range: Range<u32>,
688 request_id: InboundRequestId,
689 ) -> anyhow::Result<()> {
690 self.handle_db_request(
691 range,
692 request_id,
693 V2ResponseMessage::Transactions,
694 |view, cached_view, range| {
695 cached_view
696 .get_transactions(view, range)
697 .map_err(anyhow::Error::from)
698 },
699 |response, request_id| TaskRequest::DatabaseTransactionsLookUp {
700 response,
701 request_id,
702 },
703 self.max_headers_per_request,
704 )
705 }
706
707 fn handle_sealed_headers_request(
708 &mut self,
709 range: Range<u32>,
710 request_id: InboundRequestId,
711 ) -> anyhow::Result<()> {
712 self.handle_db_request(
713 range,
714 request_id,
715 V2ResponseMessage::SealedHeaders,
716 |view, cached_view, range| {
717 cached_view
718 .get_sealed_headers(view, range)
719 .map_err(anyhow::Error::from)
720 },
721 |response, request_id| TaskRequest::DatabaseHeaderLookUp {
722 response,
723 request_id,
724 },
725 self.max_headers_per_request,
726 )
727 }
728
729 fn handle_txpool_request<F, ResponseSenderFn, TaskRequestFn, R>(
730 &mut self,
731 request_id: InboundRequestId,
732 txpool_function: F,
733 response_sender: ResponseSenderFn,
734 task_request: TaskRequestFn,
735 ) -> anyhow::Result<()>
736 where
737 ResponseSenderFn:
738 Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
739 TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
740 + Send
741 + 'static,
742 F: Future<Output = anyhow::Result<R>> + Send + 'static,
743 {
744 let instant = Instant::now();
745 let timeout = self.response_timeout;
746 let response_channel = self.request_sender.clone();
747 let result = self.tx_pool_heavy_task_processor.try_spawn(async move {
748 if instant.elapsed() > timeout {
749 tracing::warn!("Request timed out");
750 return;
751 }
752
753 let Ok(response) = txpool_function.await else {
754 warn!("Failed to get txpool data");
755 return;
756 };
757
758 let _ = response_channel
759 .try_send(task_request(Ok(response), request_id))
760 .trace_err("Failed to send response to the request channel");
761 });
762
763 if result.is_err() {
764 let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
765 let _ = self
766 .p2p_service
767 .send_response_msg(request_id, response_sender(res));
768 }
769
770 Ok(())
771 }
772
773 fn handle_all_transactions_ids_request(
774 &mut self,
775 request_id: InboundRequestId,
776 ) -> anyhow::Result<()> {
777 let max_txs = self.max_txs_per_request;
778 let tx_pool = self.tx_pool.clone();
779 self.handle_txpool_request(
780 request_id,
781 async move { tx_pool.get_tx_ids(max_txs).await },
782 V2ResponseMessage::TxPoolAllTransactionsIds,
783 |response, request_id| TaskRequest::TxPoolAllTransactionsIds {
784 response,
785 request_id,
786 },
787 )
788 }
789
790 fn handle_full_transactions_request(
791 &mut self,
792 tx_ids: Vec<TxId>,
793 request_id: InboundRequestId,
794 ) -> anyhow::Result<()> {
795 if tx_ids.len() > self.max_txs_per_request {
796 self.p2p_service.send_response_msg(
797 request_id,
798 V2ResponseMessage::TxPoolFullTransactions(Err(
799 ResponseMessageErrorCode::RequestedRangeTooLarge,
800 )),
801 )?;
802 return Ok(());
803 }
804 let tx_pool = self.tx_pool.clone();
805 self.handle_txpool_request(
806 request_id,
807 async move { tx_pool.get_full_txs(tx_ids).await },
808 V2ResponseMessage::TxPoolFullTransactions,
809 |response, request_id| TaskRequest::TxPoolFullTransactions {
810 response,
811 request_id,
812 },
813 )
814 }
815}
816
817fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
818 let inner = Vec::from(*peer_id);
819 Ok(FuelPeerId::from(inner))
820}
821
822#[async_trait::async_trait]
823impl<V, T> RunnableService for UninitializedTask<V, SharedState, T>
824where
825 V: AtomicView + 'static,
826 V::LatestView: P2pDb,
827 T: TxPool + 'static,
828{
829 const NAME: &'static str = "P2P";
830
831 type SharedData = SharedState;
832 type Task = Task<FuelP2PService, V, SharedState, T>;
833 type TaskParams = ();
834
835 fn shared_data(&self) -> Self::SharedData {
836 self.broadcast.clone()
837 }
838
839 async fn into_task(
840 mut self,
841 _: &StateWatcher,
842 _: Self::TaskParams,
843 ) -> anyhow::Result<Self::Task> {
844 let Self {
845 chain_id,
846 last_height,
847 view_provider,
848 next_block_height,
849 request_receiver,
850 broadcast,
851 tx_pool,
852 config,
853 } = self;
854
855 let view = view_provider.latest_view()?;
856 let genesis = view.get_genesis()?;
857 let config = config.init(genesis)?;
858 let Config {
859 max_block_size,
860 max_headers_per_request,
861 max_txs_per_request,
862 heartbeat_check_interval,
863 heartbeat_max_avg_interval,
864 heartbeat_max_time_since_last,
865 database_read_threads,
866 tx_pool_threads,
867 metrics,
868 ..
869 } = config;
870
871 let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
874 old_heartbeat_penalty: -5.,
875 low_heartbeat_frequency_penalty: -5.,
876 };
877
878 let response_timeout = config.set_request_timeout;
879 let mut p2p_service = FuelP2PService::new(
880 broadcast.reserved_peers_broadcast.clone(),
881 config,
882 GossipsubMessageHandler::new(),
883 RequestResponseMessageHandler::new(max_block_size),
884 )
885 .await?;
886 p2p_service.update_block_height(last_height);
887 p2p_service.start().await?;
888
889 let next_check_time =
890 Instant::now().checked_add(heartbeat_check_interval).expect(
891 "The heartbeat check interval should be small enough to do frequently",
892 );
893 let db_heavy_task_processor = SyncProcessor::new(
894 "P2P_DatabaseProcessor",
895 database_read_threads,
896 1024 * 10,
897 )?;
898 let tx_pool_heavy_task_processor =
899 AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
900 let request_sender = broadcast.request_sender.clone();
901
902 let task = Task {
903 chain_id,
904 response_timeout,
905 p2p_service,
906 view_provider,
907 request_receiver,
908 request_sender,
909 next_block_height,
910 broadcast,
911 tx_pool,
912 db_heavy_task_processor,
913 tx_pool_heavy_task_processor,
914 max_headers_per_request,
915 max_txs_per_request,
916 heartbeat_check_interval,
917 heartbeat_max_avg_interval,
918 heartbeat_max_time_since_last,
919 next_check_time,
920 heartbeat_peer_reputation_config,
921 cached_view: Arc::new(CachedView::new(614 * 10, metrics)),
922 };
923 Ok(task)
924 }
925}
926
927impl<P, V, B, T> RunnableTask for Task<P, V, B, T>
929where
930 P: TaskP2PService + 'static,
931 V: AtomicView + 'static,
932 V::LatestView: P2pDb,
933 B: Broadcast + 'static,
934 T: TxPool + 'static,
935{
936 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
937 tokio::select! {
938 biased;
939
940 _ = watcher.while_started() => {
941 TaskNextAction::Stop
942 },
943 latest_block_height = self.next_block_height.next() => {
944 if let Some(latest_block_height) = latest_block_height {
945 let _ = self.p2p_service.update_block_height(latest_block_height);
946 TaskNextAction::Continue
947 } else {
948 TaskNextAction::Stop
949 }
950 },
951 next_service_request = self.request_receiver.recv() => {
952 match next_service_request {
953 Some(TaskRequest::BroadcastTransaction(transaction)) => {
954 let tx_id = transaction.id(&self.chain_id);
955 let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
956 let result = self.p2p_service.publish_message(broadcast);
957 if let Err(e) = result {
958 tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
959 }
960 }
961 Some(TaskRequest::BroadcastPreConfirmations(pre_confirmation_message)) => {
962 let broadcast = GossipsubBroadcastRequest::TxPreConfirmations(pre_confirmation_message);
963 let result = self.p2p_service.publish_message(broadcast.clone());
964 if let Err(e) = result {
965 tracing::error!("Got an error during pre-confirmation message broadcasting {:?}: {}", broadcast, e);
966 }
967 }
968 Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => {
969 let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
972 let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
973 let _ = channel.send(Err(TaskError::NoPeerFound));
974 return TaskNextAction::Continue
975 };
976 let channel = ResponseSender::SealedHeaders(channel);
977 let request_msg = RequestMessage::SealedHeaders(block_height_range.clone());
978 self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
979 }
980 Some(TaskRequest::GetTransactions {block_height_range, channel }) => {
981 let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
982 let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
983 let _ = channel.send(Err(TaskError::NoPeerFound));
984 return TaskNextAction::Continue
985 };
986 let channel = ResponseSender::Transactions(channel);
987 let request_msg = RequestMessage::Transactions(block_height_range.clone());
988 self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
989 }
990 Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => {
991 let channel = ResponseSender::TransactionsFromPeer(channel);
992 let request_msg = RequestMessage::Transactions(block_height_range);
993 self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target");
994 }
995 Some(TaskRequest::TxPoolGetAllTxIds { from_peer, channel }) => {
996 let channel = ResponseSender::TxPoolAllTransactionsIds(channel);
997 let request_msg = RequestMessage::TxPoolAllTransactionsIds;
998 self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
999 }
1000 Some(TaskRequest::TxPoolGetFullTransactions { tx_ids, from_peer, channel }) => {
1001 let channel = ResponseSender::TxPoolFullTransactions(channel);
1002 let request_msg = RequestMessage::TxPoolFullTransactions(tx_ids);
1003 self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
1004 }
1005 Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => {
1006 let res = self.p2p_service.report_message(message, acceptance);
1007 if let Err(err) = res {
1008 return TaskNextAction::ErrorContinue(err)
1009 }
1010 }
1011 Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => {
1012 let _ = self.p2p_service.report_peer(peer_id, score, reporting_service);
1013 }
1014 Some(TaskRequest::GetAllPeerInfo { channel }) => {
1015 let peers = self.p2p_service.get_all_peer_info()
1016 .into_iter()
1017 .map(|(id, info)| (*id, info.clone()))
1018 .collect::<Vec<_>>();
1019 let _ = channel.send(peers);
1020 }
1021 Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => {
1022 let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response));
1023 }
1024 Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => {
1025 let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response));
1026 }
1027 Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => {
1028 let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response));
1029 }
1030 Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => {
1031 let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response));
1032 }
1033 None => {
1034 tracing::error!("The P2P `Task` should be holder of the `Sender`");
1035 return TaskNextAction::Stop
1036 }
1037 }
1038 TaskNextAction::Continue
1039 }
1040 p2p_event = self.p2p_service.next_event() => {
1041 match p2p_event {
1042 Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => {
1043 let peer_id: Vec<u8> = peer_id.into();
1044 let block_height_data = BlockHeightHeartbeatData {
1045 peer_id: peer_id.into(),
1046 block_height,
1047 };
1048
1049 let _ = self.broadcast.block_height_broadcast(block_height_data);
1050 }
1051 Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => {
1052 tracing::info!("Received gossip message from peer {:?}", peer_id);
1053 self.broadcast_gossip_message(message, message_id, peer_id);
1054 },
1055 Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => {
1056 let res = self.process_request(request_message, request_id);
1057 if let Err(err) = res {
1058 return TaskNextAction::ErrorContinue(err)
1059 }
1060 },
1061 Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => {
1062 if tag == GossipTopicTag::NewTx {
1063 let _ = self.broadcast.new_tx_subscription_broadcast(FuelPeerId::from(peer_id.to_bytes()));
1064 }
1065 },
1066 _ => (),
1067 }
1068 TaskNextAction::Continue
1069 },
1070 _ = tokio::time::sleep_until(self.next_check_time) => {
1071 let res = self.peer_heartbeat_reputation_checks();
1072 match res {
1073 Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"),
1074 Err(e) => {
1075 tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e);
1076 }
1077 }
1078
1079 if let Some(next_check_time) = self.next_check_time.checked_add(self.heartbeat_check_interval) {
1080 self.next_check_time = next_check_time;
1081 TaskNextAction::Continue
1082 } else {
1083 tracing::error!("Next check time overflowed");
1084 TaskNextAction::Stop
1085 }
1086 }
1087 }
1088 }
1089
1090 async fn shutdown(self) -> anyhow::Result<()> {
1091 Ok(())
1099 }
1100}
1101
1102#[derive(Clone)]
1103pub struct SharedState {
1104 new_tx_subscription_broadcast: broadcast::Sender<FuelPeerId>,
1106 tx_broadcast: broadcast::Sender<TransactionGossipData>,
1108 pre_confirmations_broadcast: broadcast::Sender<P2PPreConfirmationGossipData>,
1110 reserved_peers_broadcast: broadcast::Sender<usize>,
1112 request_sender: mpsc::Sender<TaskRequest>,
1114 block_height_broadcast: broadcast::Sender<BlockHeightHeartbeatData>,
1116 max_txs_per_request: usize,
1118}
1119
1120impl SharedState {
1121 pub fn notify_gossip_transaction_validity(
1122 &self,
1123 message_info: GossipsubMessageInfo,
1124 acceptance: GossipsubMessageAcceptance,
1125 ) -> anyhow::Result<()> {
1126 self.request_sender
1127 .try_send(TaskRequest::RespondWithGossipsubMessageReport((
1128 message_info,
1129 acceptance,
1130 )))?;
1131 Ok(())
1132 }
1133
1134 pub async fn get_sealed_block_headers(
1135 &self,
1136 block_height_range: Range<u32>,
1137 ) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
1138 let (sender, receiver) = oneshot::channel();
1139
1140 if block_height_range.is_empty() {
1141 return Err(anyhow!(
1142 "Cannot retrieve headers for an empty range of block heights"
1143 ));
1144 }
1145
1146 self.request_sender
1147 .send(TaskRequest::GetSealedHeaders {
1148 block_height_range,
1149 channel: sender,
1150 })
1151 .await?;
1152
1153 let (peer_id, response) = receiver
1154 .await
1155 .map_err(|e| anyhow!("{e}"))?
1156 .map_err(|e| anyhow!("{e}"))?;
1157
1158 let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1159 if let Err(ref response_error_code) = data {
1160 warn!(
1161 "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1162 );
1163 };
1164
1165 Ok((peer_id.to_bytes(), data.ok()))
1166 }
1167
1168 pub async fn get_transactions(
1169 &self,
1170 range: Range<u32>,
1171 ) -> anyhow::Result<(Vec<u8>, Option<Vec<Transactions>>)> {
1172 let (sender, receiver) = oneshot::channel();
1173
1174 if range.is_empty() {
1175 return Err(anyhow!(
1176 "Cannot retrieve transactions for an empty range of block heights"
1177 ));
1178 }
1179
1180 self.request_sender
1181 .send(TaskRequest::GetTransactions {
1182 block_height_range: range,
1183 channel: sender,
1184 })
1185 .await?;
1186
1187 let (peer_id, response) = receiver
1188 .await
1189 .map_err(|e| anyhow!("{e}"))?
1190 .map_err(|e| anyhow!("{e}"))?;
1191
1192 let data = match response {
1193 Err(request_response_protocol_error) => Err(anyhow!(
1194 "Invalid response from peer {request_response_protocol_error:?}"
1195 )),
1196 Ok(Err(response_error_code)) => {
1197 warn!(
1198 "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1199 );
1200 Ok(None)
1201 }
1202 Ok(Ok(headers)) => Ok(Some(headers)),
1203 };
1204 data.map(|data| (peer_id.to_bytes(), data))
1205 }
1206
1207 pub async fn get_transactions_from_peer(
1208 &self,
1209 peer_id: FuelPeerId,
1210 range: Range<u32>,
1211 ) -> anyhow::Result<Option<Vec<Transactions>>> {
1212 let (sender, receiver) = oneshot::channel();
1213 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1214
1215 let request = TaskRequest::GetTransactionsFromPeer {
1216 block_height_range: range,
1217 from_peer,
1218 channel: sender,
1219 };
1220 self.request_sender.send(request).await?;
1221
1222 let (response_from_peer, response) =
1223 receiver.await.map_err(|e| anyhow!("{e}"))?;
1224 assert_eq!(
1225 peer_id.as_ref(),
1226 response_from_peer.to_bytes(),
1227 "Bug: response from non-requested peer"
1228 );
1229
1230 match response {
1231 Err(request_response_protocol_error) => Err(anyhow!(
1232 "Invalid response from peer {request_response_protocol_error:?}"
1233 )),
1234 Ok(Err(response_error_code)) => {
1235 warn!(
1236 "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}"
1237 );
1238 Ok(None)
1239 }
1240 Ok(Ok(txs)) => Ok(Some(txs)),
1241 }
1242 }
1243
1244 pub async fn get_all_transactions_ids_from_peer(
1245 &self,
1246 peer_id: FuelPeerId,
1247 ) -> anyhow::Result<Vec<TxId>> {
1248 let (sender, receiver) = oneshot::channel();
1249 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1250 let request = TaskRequest::TxPoolGetAllTxIds {
1251 from_peer,
1252 channel: sender,
1253 };
1254 self.request_sender.try_send(request)?;
1255
1256 let (response_from_peer, response) =
1257 receiver.await.map_err(|e| anyhow!("{e}"))?;
1258
1259 debug_assert_eq!(
1260 peer_id.as_ref(),
1261 response_from_peer.to_bytes(),
1262 "Bug: response from non-requested peer"
1263 );
1264
1265 let response =
1266 response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1267
1268 let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default();
1269
1270 if txs.len() > self.max_txs_per_request {
1271 return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1272 }
1273 Ok(txs)
1274 }
1275
1276 pub async fn get_full_transactions_from_peer(
1277 &self,
1278 peer_id: FuelPeerId,
1279 tx_ids: Vec<TxId>,
1280 ) -> anyhow::Result<Vec<Option<Transaction>>> {
1281 let (sender, receiver) = oneshot::channel();
1282 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1283 let request = TaskRequest::TxPoolGetFullTransactions {
1284 tx_ids,
1285 from_peer,
1286 channel: sender,
1287 };
1288 self.request_sender.try_send(request)?;
1289
1290 let (response_from_peer, response) =
1291 receiver.await.map_err(|e| anyhow!("{e}"))?;
1292 debug_assert_eq!(
1293 peer_id.as_ref(),
1294 response_from_peer.to_bytes(),
1295 "Bug: response from non-requested peer"
1296 );
1297
1298 let response =
1299 response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1300 let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default();
1301
1302 if txs.len() > self.max_txs_per_request {
1303 return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1304 }
1305 txs.into_iter()
1306 .map(|tx| {
1307 tx.map(Transaction::try_from)
1308 .transpose()
1309 .map_err(|err| anyhow::anyhow!(err))
1310 })
1311 .collect()
1312 }
1313
1314 pub fn broadcast_transaction(
1315 &self,
1316 transaction: Arc<Transaction>,
1317 ) -> anyhow::Result<()> {
1318 self.request_sender
1319 .try_send(TaskRequest::BroadcastTransaction(transaction))?;
1320 Ok(())
1321 }
1322
1323 pub fn broadcast_preconfirmations(
1324 &self,
1325 preconfirmations: Arc<P2PPreConfirmationMessage>,
1326 ) -> anyhow::Result<()> {
1327 self.request_sender
1328 .try_send(TaskRequest::BroadcastPreConfirmations(preconfirmations))?;
1329 Ok(())
1330 }
1331
1332 pub async fn get_all_peers(&self) -> anyhow::Result<Vec<(PeerId, PeerInfo)>> {
1333 let (sender, receiver) = oneshot::channel();
1334
1335 self.request_sender
1336 .send(TaskRequest::GetAllPeerInfo { channel: sender })
1337 .await?;
1338
1339 receiver.await.map_err(|e| anyhow!("{}", e))
1340 }
1341
1342 pub fn subscribe_new_peers(&self) -> broadcast::Receiver<FuelPeerId> {
1343 self.new_tx_subscription_broadcast.subscribe()
1344 }
1345
1346 pub fn subscribe_tx(&self) -> broadcast::Receiver<TransactionGossipData> {
1347 self.tx_broadcast.subscribe()
1348 }
1349
1350 pub fn subscribe_preconfirmations(
1351 &self,
1352 ) -> broadcast::Receiver<P2PPreConfirmationGossipData> {
1353 self.pre_confirmations_broadcast.subscribe()
1354 }
1355
1356 pub fn subscribe_block_height(
1357 &self,
1358 ) -> broadcast::Receiver<BlockHeightHeartbeatData> {
1359 self.block_height_broadcast.subscribe()
1360 }
1361
1362 pub fn subscribe_reserved_peers_count(&self) -> broadcast::Receiver<usize> {
1363 self.reserved_peers_broadcast.subscribe()
1364 }
1365
1366 pub fn report_peer<T: PeerReport>(
1367 &self,
1368 peer_id: FuelPeerId,
1369 peer_report: T,
1370 reporting_service: &'static str,
1371 ) -> anyhow::Result<()> {
1372 match Vec::from(peer_id).try_into() {
1373 Ok(peer_id) => {
1374 let score = peer_report.get_score_from_report();
1375
1376 self.request_sender
1377 .try_send(TaskRequest::RespondWithPeerReport {
1378 peer_id,
1379 score,
1380 reporting_service,
1381 })?;
1382
1383 Ok(())
1384 }
1385 Err(e) => {
1386 warn!(target: "fuel-p2p", "Failed to read PeerId from {e:?}");
1387 Err(anyhow::anyhow!("Failed to read PeerId from {e:?}"))
1388 }
1389 }
1390 }
1391}
1392
1393pub fn build_shared_state(
1394 config: Config<NotInitialized>,
1395) -> (SharedState, Receiver<TaskRequest>) {
1396 let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE);
1397 let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1398 let (preconfirmations_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1399 let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1400 let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1401
1402 let (reserved_peers_broadcast, _) = broadcast::channel::<usize>(
1403 config
1404 .reserved_nodes
1405 .len()
1406 .saturating_mul(2)
1407 .saturating_add(1),
1408 );
1409
1410 (
1411 SharedState {
1412 request_sender,
1413 new_tx_subscription_broadcast,
1414 tx_broadcast,
1415 pre_confirmations_broadcast: preconfirmations_broadcast,
1416 reserved_peers_broadcast,
1417 block_height_broadcast,
1418 max_txs_per_request: config.max_txs_per_request,
1419 },
1420 request_receiver,
1421 )
1422}
1423
1424#[allow(clippy::too_many_arguments)]
1425pub fn new_service<V, B, T>(
1426 chain_id: ChainId,
1427 last_height: BlockHeight,
1428 p2p_config: Config<NotInitialized>,
1429 shared_state: SharedState,
1430 request_receiver: Receiver<TaskRequest>,
1431 view_provider: V,
1432 block_importer: B,
1433 tx_pool: T,
1434) -> Service<V, T>
1435where
1436 V: AtomicView + 'static,
1437 V::LatestView: P2pDb,
1438 B: BlockHeightImporter,
1439 T: TxPool,
1440{
1441 let task = UninitializedTask::new(
1442 chain_id,
1443 last_height,
1444 p2p_config,
1445 shared_state,
1446 request_receiver,
1447 view_provider,
1448 block_importer,
1449 tx_pool,
1450 );
1451 Service::new(task)
1452}
1453
1454pub fn to_message_acceptance(
1455 acceptance: &GossipsubMessageAcceptance,
1456) -> MessageAcceptance {
1457 match acceptance {
1458 GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
1459 GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
1460 GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
1461 }
1462}
1463
1464fn report_message(
1465 p2p_service: &mut FuelP2PService,
1466 message: GossipsubMessageInfo,
1467 acceptance: GossipsubMessageAcceptance,
1468) {
1469 let GossipsubMessageInfo {
1470 peer_id,
1471 message_id,
1472 } = message;
1473
1474 let msg_id = message_id.into();
1475 let peer_id: Vec<u8> = peer_id.into();
1476
1477 if let Ok(peer_id) = peer_id.try_into() {
1478 let acceptance = to_message_acceptance(&acceptance);
1479 p2p_service.report_message_validation_result(&msg_id, peer_id, acceptance);
1480 } else {
1481 warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
1482 }
1483}