1use crate::{
2 cached_view::CachedView,
3 codecs::{
4 gossipsub::GossipsubMessageHandler,
5 request_response::RequestResponseMessageHandler,
6 },
7 config::{
8 Config,
9 NotInitialized,
10 },
11 gossipsub::messages::{
12 GossipTopicTag,
13 GossipsubBroadcastRequest,
14 GossipsubMessage,
15 },
16 p2p_service::{
17 FuelP2PEvent,
18 FuelP2PService,
19 },
20 peer_manager::PeerInfo,
21 ports::{
22 BlockHeightImporter,
23 P2PPreConfirmationGossipData,
24 P2PPreConfirmationMessage,
25 P2pDb,
26 TxPool,
27 },
28 request_response::messages::{
29 OnResponse,
30 OnResponseWithPeerSelection,
31 RequestMessage,
32 ResponseMessageErrorCode,
33 ResponseSender,
34 V2ResponseMessage,
35 },
36};
37use anyhow::anyhow;
38use fuel_core_metrics::p2p_metrics::set_blocks_requested;
39use fuel_core_services::{
40 AsyncProcessor,
41 RunnableService,
42 RunnableTask,
43 ServiceRunner,
44 StateWatcher,
45 SyncProcessor,
46 TaskNextAction,
47 TraceErr,
48 stream::BoxStream,
49};
50use fuel_core_storage::transactional::AtomicView;
51use fuel_core_types::{
52 blockchain::SealedBlockHeader,
53 fuel_tx::{
54 Transaction,
55 TxId,
56 UniqueIdentifier,
57 },
58 fuel_types::{
59 BlockHeight,
60 ChainId,
61 },
62 services::p2p::{
63 BlockHeightHeartbeatData,
64 GossipData,
65 GossipsubMessageAcceptance,
66 GossipsubMessageInfo,
67 NetworkableTransactionPool,
68 PeerId as FuelPeerId,
69 TransactionGossipData,
70 Transactions,
71 peer_reputation::{
72 AppScore,
73 PeerReport,
74 },
75 },
76};
77use futures::{
78 StreamExt,
79 future::BoxFuture,
80};
81use libp2p::{
82 PeerId,
83 gossipsub::{
84 MessageAcceptance,
85 MessageId,
86 PublishError,
87 },
88 request_response::InboundRequestId,
89};
90use std::{
91 fmt::Debug,
92 future::Future,
93 ops::Range,
94 sync::Arc,
95};
96use thiserror::Error;
97use tokio::{
98 sync::{
99 broadcast,
100 mpsc::{
101 self,
102 Receiver,
103 },
104 oneshot,
105 },
106 time::{
107 Duration,
108 Instant,
109 },
110};
111use tracing::warn;
112
// Test-only submodules; both are compiled exclusively for test builds.
#[cfg(test)]
pub mod broadcast_tests;
#[cfg(test)]
pub mod task_tests;

/// Channel capacity constant (10 * 1024 entries).
// NOTE(review): the consumers of this constant are not visible in this part
// of the file — confirm which channels it sizes.
const CHANNEL_SIZE: usize = 1024 * 10;

/// The P2P service: a [`ServiceRunner`] wrapping an [`UninitializedTask`]
/// whose broadcast handle is the [`SharedState`].
pub type Service<V, T> = ServiceRunner<UninitializedTask<V, SharedState, T>>;
121
/// Errors the P2P task reports back to a requester through its response
/// channel.
#[derive(Debug, Error)]
pub enum TaskError {
    /// Returned when the task could not find a peer at the requested block
    /// height to forward the request to.
    #[error("No peer found to send request to")]
    NoPeerFound,
}
127
/// Messages consumed by the P2P task's request channel.
///
/// The variants fall into three groups: outbound broadcasts and requests
/// issued by other services, reports about messages/peers, and the results of
/// background look-ups that the task sends to itself so that it can answer an
/// inbound peer request.
pub enum TaskRequest {
    /// Publish a transaction to the network via gossipsub.
    BroadcastTransaction(Arc<Transaction>),
    /// Publish a pre-confirmation message to the network via gossipsub.
    BroadcastPreConfirmations(Arc<P2PPreConfirmationMessage>),
    /// Ask for a snapshot of all known peers and their info.
    GetAllPeerInfo {
        /// Receives the cloned `(PeerId, PeerInfo)` pairs.
        channel: oneshot::Sender<Vec<(PeerId, PeerInfo)>>,
    },
    /// Request sealed block headers for a range of heights; the task picks
    /// the peer.
    GetSealedHeaders {
        block_height_range: Range<u32>,
        /// Receives the response, or `TaskError::NoPeerFound` if no peer
        /// could be selected.
        channel: OnResponseWithPeerSelection<
            Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        >,
    },
    /// Request transactions for a range of block heights; the task picks
    /// the peer.
    GetTransactions {
        block_height_range: Range<u32>,
        /// Receives the response, or `TaskError::NoPeerFound` if no peer
        /// could be selected.
        channel: OnResponseWithPeerSelection<
            Result<Vec<Transactions>, ResponseMessageErrorCode>,
        >,
    },
    /// Request transactions for a range of block heights from a specific
    /// peer.
    GetTransactionsFromPeer {
        block_height_range: Range<u32>,
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
    },
    /// Ask a specific peer for the ids of the transactions in its pool.
    TxPoolGetAllTxIds {
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>,
    },
    /// Ask a specific peer for the full transactions behind the given ids.
    TxPoolGetFullTransactions {
        tx_ids: Vec<TxId>,
        from_peer: PeerId,
        channel: OnResponse<
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        >,
    },
    /// Report the validation outcome of a previously received gossip message.
    RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)),
    /// Report a peer with the given application score.
    RespondWithPeerReport {
        peer_id: PeerId,
        score: AppScore,
        /// Name of the service that issued the report.
        reporting_service: &'static str,
    },
    /// Result of a background database look-up of transactions, to be sent
    /// back as the response to the inbound request `request_id`.
    DatabaseTransactionsLookUp {
        response: Result<Vec<Transactions>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a background database look-up of sealed headers, to be sent
    /// back as the response to the inbound request `request_id`.
    DatabaseHeaderLookUp {
        response: Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a background transaction-pool id look-up, to be sent back as
    /// the response to the inbound request `request_id`.
    TxPoolAllTransactionsIds {
        response: Result<Vec<TxId>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a background transaction-pool look-up of full transactions,
    /// to be sent back as the response to the inbound request `request_id`.
    TxPoolFullTransactions {
        response:
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
}
190
191impl Debug for TaskRequest {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 match self {
194 TaskRequest::BroadcastTransaction(_) => {
195 write!(f, "TaskRequest::BroadcastTransaction")
196 }
197 TaskRequest::BroadcastPreConfirmations(_) => {
198 write!(f, "TaskRequest::BroadcastPreConfirmations")
199 }
200 TaskRequest::GetSealedHeaders { .. } => {
201 write!(f, "TaskRequest::GetSealedHeaders")
202 }
203 TaskRequest::GetTransactions { .. } => {
204 write!(f, "TaskRequest::GetTransactions")
205 }
206 TaskRequest::GetTransactionsFromPeer { .. } => {
207 write!(f, "TaskRequest::GetTransactionsFromPeer")
208 }
209 TaskRequest::TxPoolGetAllTxIds { .. } => {
210 write!(f, "TaskRequest::TxPoolGetAllTxIds")
211 }
212 TaskRequest::TxPoolGetFullTransactions { .. } => {
213 write!(f, "TaskRequest::TxPoolGetFullTransactions")
214 }
215 TaskRequest::RespondWithGossipsubMessageReport(_) => {
216 write!(f, "TaskRequest::RespondWithGossipsubMessageReport")
217 }
218 TaskRequest::RespondWithPeerReport { .. } => {
219 write!(f, "TaskRequest::RespondWithPeerReport")
220 }
221 TaskRequest::GetAllPeerInfo { .. } => {
222 write!(f, "TaskRequest::GetPeerInfo")
223 }
224 TaskRequest::DatabaseTransactionsLookUp { .. } => {
225 write!(f, "TaskRequest::DatabaseTransactionsLookUp")
226 }
227 TaskRequest::DatabaseHeaderLookUp { .. } => {
228 write!(f, "TaskRequest::DatabaseHeaderLookUp")
229 }
230 TaskRequest::TxPoolAllTransactionsIds { .. } => {
231 write!(f, "TaskRequest::TxPoolAllTransactionsIds")
232 }
233 TaskRequest::TxPoolFullTransactions { .. } => {
234 write!(f, "TaskRequest::TxPoolFullTransactions")
235 }
236 }
237 }
238}
239
/// Why a peer is being reported by the periodic heartbeat check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeartBeatPeerReportReason {
    /// The time since the peer's last heartbeat exceeds the configured
    /// maximum.
    OldHeartBeat,
    /// The peer's average time between heartbeats exceeds the configured
    /// maximum.
    LowHeartBeatFrequency,
}
245
/// The networking operations the P2P task needs from the underlying service.
///
/// Implemented in this file for `FuelP2PService`; the task itself is generic
/// over this trait.
pub trait TaskP2PService: Send {
    /// Returns every known peer together with its info.
    fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>;
    /// Returns a peer for the given block height, or `None` if there is none.
    // NOTE(review): the exact selection rule lives in the peer manager, which
    // is not visible here.
    fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId>;

    /// Resolves to the next event produced by the service, if any.
    fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>>;

    /// Publishes a gossipsub message to the network.
    fn publish_message(
        &mut self,
        message: GossipsubBroadcastRequest,
    ) -> anyhow::Result<()>;

    /// Sends a request to `peer_id`, delivering the eventual response through
    /// `on_response`.
    // NOTE(review): behaviour for `peer_id == None` is defined by the
    // implementation — confirm there.
    fn send_request_msg(
        &mut self,
        peer_id: Option<PeerId>,
        request_msg: RequestMessage,
        on_response: ResponseSender,
    ) -> anyhow::Result<()>;

    /// Sends `message` as the response to the inbound request `request_id`.
    fn send_response_msg(
        &mut self,
        request_id: InboundRequestId,
        message: V2ResponseMessage,
    ) -> anyhow::Result<()>;

    /// Reports the validation outcome (`acceptance`) of a gossip message.
    fn report_message(
        &mut self,
        message: GossipsubMessageInfo,
        acceptance: GossipsubMessageAcceptance,
    ) -> anyhow::Result<()>;

    /// Reports a peer with the given application score on behalf of
    /// `reporting_service`.
    fn report_peer(
        &mut self,
        peer_id: PeerId,
        score: AppScore,
        reporting_service: &str,
    ) -> anyhow::Result<()>;

    /// Informs the service about the latest local block height.
    fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;

    /// Hands `update_fn` to the service's metrics-update hook.
    // NOTE(review): whether the closure runs unconditionally or only when
    // metrics are enabled depends on the implementation.
    fn update_metrics<T>(&self, update_fn: T)
    where
        T: FnOnce();
}
289
290impl TaskP2PService for FuelP2PService {
291 fn update_metrics<T>(&self, update_fn: T)
292 where
293 T: FnOnce(),
294 {
295 FuelP2PService::update_metrics(self, update_fn)
296 }
297
298 fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
299 self.peer_manager().get_all_peers().collect()
300 }
301
302 fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId> {
303 self.peer_manager().get_peer_id_with_height(height)
304 }
305
306 fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>> {
307 Box::pin(self.next_event())
308 }
309
310 fn publish_message(
311 &mut self,
312 message: GossipsubBroadcastRequest,
313 ) -> anyhow::Result<()> {
314 let result = self.publish_message(message);
315
316 match result {
317 Ok(_) => Ok(()),
318 Err(e) => {
319 if matches!(&e, PublishError::InsufficientPeers) {
320 Ok(())
321 } else {
322 Err(anyhow!(e))
323 }
324 }
325 }
326 }
327
328 fn send_request_msg(
329 &mut self,
330 peer_id: Option<PeerId>,
331 request_msg: RequestMessage,
332 on_response: ResponseSender,
333 ) -> anyhow::Result<()> {
334 self.send_request_msg(peer_id, request_msg, on_response)?;
335 Ok(())
336 }
337
338 fn send_response_msg(
339 &mut self,
340 request_id: InboundRequestId,
341 message: V2ResponseMessage,
342 ) -> anyhow::Result<()> {
343 self.send_response_msg(request_id, message)?;
344 Ok(())
345 }
346
347 fn report_message(
348 &mut self,
349 message: GossipsubMessageInfo,
350 acceptance: GossipsubMessageAcceptance,
351 ) -> anyhow::Result<()> {
352 report_message(self, message, acceptance);
353 Ok(())
354 }
355
356 fn report_peer(
357 &mut self,
358 peer_id: PeerId,
359 score: AppScore,
360 reporting_service: &str,
361 ) -> anyhow::Result<()> {
362 self.report_peer(peer_id, score, reporting_service);
363 Ok(())
364 }
365
366 fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
367 self.update_block_height(height);
368 Ok(())
369 }
370}
371
/// Outbound notifications the P2P task emits towards the rest of the node.
pub trait Broadcast: Send {
    /// Reports a peer with the given application score on behalf of
    /// `reporting_service`.
    fn report_peer(
        &self,
        peer_id: FuelPeerId,
        report: AppScore,
        reporting_service: &'static str,
    ) -> anyhow::Result<()>;

    /// Announces a peer's block height heartbeat.
    fn block_height_broadcast(
        &self,
        block_height_data: BlockHeightHeartbeatData,
    ) -> anyhow::Result<()>;

    /// Announces a transaction received via gossip.
    fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>;

    /// Announces pre-confirmations received via gossip.
    fn pre_confirmation_broadcast(
        &self,
        confirmations: P2PPreConfirmationGossipData,
    ) -> anyhow::Result<()>;

    /// Announces that `peer_id` subscribed to the new-transaction topic.
    fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>;
}
394
395impl Broadcast for SharedState {
396 fn report_peer(
397 &self,
398 peer_id: FuelPeerId,
399 report: AppScore,
400 reporting_service: &'static str,
401 ) -> anyhow::Result<()> {
402 self.report_peer(peer_id, report, reporting_service)
403 }
404
405 fn block_height_broadcast(
406 &self,
407 block_height_data: BlockHeightHeartbeatData,
408 ) -> anyhow::Result<()> {
409 self.block_height_broadcast.send(block_height_data)?;
410 Ok(())
411 }
412
413 fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> {
414 self.tx_broadcast.send(transaction)?;
415 Ok(())
416 }
417
418 fn pre_confirmation_broadcast(
419 &self,
420 confirmations: P2PPreConfirmationGossipData,
421 ) -> anyhow::Result<()> {
422 self.pre_confirmations_broadcast.send(confirmations)?;
423 Ok(())
424 }
425
426 fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> {
427 self.new_tx_subscription_broadcast.send(peer_id)?;
428 Ok(())
429 }
430}
431
/// The P2P task before the network service has been created.
///
/// Holds everything required to build a `Task`; `config` is still in the
/// `NotInitialized` state.
pub struct UninitializedTask<V, B, T> {
    /// Chain id handed over to the running task.
    chain_id: ChainId,
    /// Block height the network service is started with.
    last_height: BlockHeight,
    /// Provider of database views.
    view_provider: V,
    /// Stream of newly imported block heights.
    next_block_height: BoxStream<BlockHeight>,
    /// Receiving end of the task's request channel.
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Handle used to notify the rest of the node.
    broadcast: B,
    /// Transaction pool handle.
    tx_pool: T,
    /// Configuration that still has to be initialized.
    config: Config<NotInitialized>,
}
444
/// The running P2P task: owns the network service and serves both local
/// requests and inbound peer requests.
pub struct Task<P, V, B, T> {
    /// Chain id used to compute transaction ids when broadcasting.
    chain_id: ChainId,
    /// Background look-ups that start later than this after the inbound
    /// request arrived are dropped.
    response_timeout: Duration,
    /// The underlying network service.
    p2p_service: P,
    /// Provider of database views for inbound look-ups.
    view_provider: V,
    /// Stream of newly imported block heights.
    next_block_height: BoxStream<BlockHeight>,
    /// Receiving end of the task's request channel.
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Sender into the task's own request channel; background jobs use it to
    /// hand their results back to the task.
    request_sender: mpsc::Sender<TaskRequest>,
    /// Processor running database look-ups off the task loop.
    db_heavy_task_processor: SyncProcessor,
    /// Processor running transaction-pool look-ups off the task loop.
    tx_pool_heavy_task_processor: AsyncProcessor,
    /// Handle used to notify the rest of the node.
    broadcast: B,
    /// Transaction pool handle.
    tx_pool: T,
    /// Maximum length of a block range accepted from a peer in one request.
    max_headers_per_request: usize,
    /// Maximum number of transactions handled in one transaction-pool request.
    max_txs_per_request: usize,
    /// Interval between two peer heartbeat checks.
    heartbeat_check_interval: Duration,
    /// Peers whose average heartbeat interval exceeds this are reported.
    heartbeat_max_avg_interval: Duration,
    /// Peers whose last heartbeat is older than this are reported.
    heartbeat_max_time_since_last: Duration,
    /// Point in time of the next heartbeat check.
    next_check_time: Instant,
    /// Penalties applied by the heartbeat check.
    heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
    /// Cache in front of the database for header/transaction look-ups.
    cached_view: Arc<CachedView>,
}
471
472impl<P, V, B, T> Task<P, V, B, T>
473where
474 P: TaskP2PService,
475 B: Broadcast,
476{
477 pub(crate) fn broadcast_gossip_message(
478 &mut self,
479 message: GossipsubMessage,
480 message_id: MessageId,
481 peer_id: PeerId,
482 ) {
483 let message_id = message_id.0;
484
485 match message {
486 GossipsubMessage::NewTx(transaction) => {
487 let next_transaction = GossipData::new(transaction, peer_id, message_id);
488 let _ = self.broadcast.tx_broadcast(next_transaction);
489 }
490 GossipsubMessage::TxPreConfirmations(confirmations) => {
491 let data = GossipData::new(confirmations, peer_id, message_id);
492 let _ = self.broadcast.pre_confirmation_broadcast(data);
493 }
494 }
495 }
496}
497
/// Application-score penalties used by the periodic heartbeat check.
#[derive(Default, Clone)]
pub struct HeartbeatPeerReputationConfig {
    /// Score reported for a peer whose last heartbeat is too old.
    old_heartbeat_penalty: AppScore,
    /// Score reported for a peer whose heartbeats arrive too infrequently.
    low_heartbeat_frequency_penalty: AppScore,
}
503
504impl<V, T> UninitializedTask<V, SharedState, T> {
505 #[allow(clippy::too_many_arguments)]
506 pub fn new<B: BlockHeightImporter>(
507 chain_id: ChainId,
508 last_height: BlockHeight,
509 config: Config<NotInitialized>,
510 shared_state: SharedState,
511 request_receiver: Receiver<TaskRequest>,
512 view_provider: V,
513 block_importer: B,
514 tx_pool: T,
515 ) -> Self {
516 let next_block_height = block_importer.next_block_height();
517
518 Self {
519 chain_id,
520 last_height,
521 view_provider,
522 tx_pool,
523 next_block_height,
524 request_receiver,
525 broadcast: shared_state,
526 config,
527 }
528 }
529}
530
531impl<P, V, B, T> Task<P, V, B, T>
532where
533 P: TaskP2PService,
534 V: AtomicView,
535 B: Broadcast,
536{
537 fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> {
538 for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() {
539 if peer_info.heartbeat_data.duration_since_last_heartbeat()
540 > self.heartbeat_max_time_since_last
541 {
542 tracing::debug!("Peer {:?} has old heartbeat", peer_id);
543 let report = HeartBeatPeerReportReason::OldHeartBeat;
544 let peer_id = convert_peer_id(peer_id)?;
545 self.report_peer(peer_id, report)?;
546 } else if peer_info.heartbeat_data.average_time_between_heartbeats()
547 > self.heartbeat_max_avg_interval
548 {
549 tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id);
550 let report = HeartBeatPeerReportReason::LowHeartBeatFrequency;
551 let peer_id = convert_peer_id(peer_id)?;
552 self.report_peer(peer_id, report)?;
553 }
554 }
555 Ok(())
556 }
557
558 fn report_peer(
559 &self,
560 peer_id: FuelPeerId,
561 report: HeartBeatPeerReportReason,
562 ) -> anyhow::Result<()> {
563 let app_score = match report {
564 HeartBeatPeerReportReason::OldHeartBeat => {
565 self.heartbeat_peer_reputation_config.old_heartbeat_penalty
566 }
567 HeartBeatPeerReportReason::LowHeartBeatFrequency => {
568 self.heartbeat_peer_reputation_config
569 .low_heartbeat_frequency_penalty
570 }
571 };
572 let reporting_service = "p2p";
573 self.broadcast
574 .report_peer(peer_id, app_score, reporting_service)?;
575 Ok(())
576 }
577}
578
579impl<P, V, B, T> Task<P, V, B, T>
580where
581 P: TaskP2PService + 'static,
582 V: AtomicView + 'static,
583 V::LatestView: P2pDb,
584 T: TxPool + 'static,
585 B: Send,
586{
587 fn update_metrics<U>(&self, update_fn: U)
588 where
589 U: FnOnce(),
590 {
591 self.p2p_service.update_metrics(update_fn)
592 }
593
594 fn process_request(
595 &mut self,
596 request_message: RequestMessage,
597 request_id: InboundRequestId,
598 ) -> anyhow::Result<()> {
599 match request_message {
600 RequestMessage::Transactions(range) => {
601 self.handle_transactions_request(range, request_id)
602 }
603 RequestMessage::SealedHeaders(range) => {
604 self.handle_sealed_headers_request(range, request_id)
605 }
606 RequestMessage::TxPoolAllTransactionsIds => {
607 self.handle_all_transactions_ids_request(request_id)
608 }
609 RequestMessage::TxPoolFullTransactions(tx_ids) => {
610 self.handle_full_transactions_request(tx_ids, request_id)
611 }
612 }
613 }
614
615 fn handle_db_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
616 &mut self,
617 range: Range<u32>,
618 request_id: InboundRequestId,
619 response_sender: ResponseSenderFn,
620 db_lookup: DbLookUpFn,
621 task_request: TaskRequestFn,
622 max_len: usize,
623 ) -> anyhow::Result<()>
624 where
625 DbLookUpFn: Fn(&V::LatestView, &Arc<CachedView>, Range<u32>) -> anyhow::Result<Option<R>>
626 + Send
627 + 'static,
628 ResponseSenderFn:
629 Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
630 TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
631 + Send
632 + 'static,
633 R: Send + 'static,
634 {
635 let instant = Instant::now();
636 let timeout = self.response_timeout;
637 let response_channel = self.request_sender.clone();
638 let range_len = range.len();
639
640 self.update_metrics(|| set_blocks_requested(range_len));
641
642 if range_len > max_len {
643 tracing::error!(
644 requested_length = range.len(),
645 max_len,
646 "Requested range is too large"
647 );
648 let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge);
649 let _ = self
650 .p2p_service
651 .send_response_msg(request_id, response_sender(response));
652 return Ok(());
653 }
654
655 let view = self.view_provider.latest_view()?;
656 let result = self.db_heavy_task_processor.try_spawn({
657 let cached_view = self.cached_view.clone();
658 move || {
659 if instant.elapsed() > timeout {
660 tracing::warn!("Request timed out");
661 return;
662 }
663
664 let response = db_lookup(&view, &cached_view, range.clone())
665 .ok()
666 .flatten()
667 .ok_or(ResponseMessageErrorCode::Timeout);
668
669 let _ = response_channel
670 .try_send(task_request(response, request_id))
671 .trace_err("Failed to send response to the request channel");
672 }
673 });
674
675 if result.is_err() {
676 let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
677 let _ = self
678 .p2p_service
679 .send_response_msg(request_id, response_sender(err));
680 }
681
682 Ok(())
683 }
684
685 fn handle_transactions_request(
686 &mut self,
687 range: Range<u32>,
688 request_id: InboundRequestId,
689 ) -> anyhow::Result<()> {
690 self.handle_db_request(
691 range,
692 request_id,
693 V2ResponseMessage::Transactions,
694 |view, cached_view, range| {
695 cached_view
696 .get_transactions(view, range)
697 .map_err(anyhow::Error::from)
698 },
699 |response, request_id| TaskRequest::DatabaseTransactionsLookUp {
700 response,
701 request_id,
702 },
703 self.max_headers_per_request,
704 )
705 }
706
707 fn handle_sealed_headers_request(
708 &mut self,
709 range: Range<u32>,
710 request_id: InboundRequestId,
711 ) -> anyhow::Result<()> {
712 self.handle_db_request(
713 range,
714 request_id,
715 V2ResponseMessage::SealedHeaders,
716 |view, cached_view, range| {
717 cached_view
718 .get_sealed_headers(view, range)
719 .map_err(anyhow::Error::from)
720 },
721 |response, request_id| TaskRequest::DatabaseHeaderLookUp {
722 response,
723 request_id,
724 },
725 self.max_headers_per_request,
726 )
727 }
728
729 fn handle_txpool_request<F, ResponseSenderFn, TaskRequestFn, R>(
730 &mut self,
731 request_id: InboundRequestId,
732 txpool_function: F,
733 response_sender: ResponseSenderFn,
734 task_request: TaskRequestFn,
735 ) -> anyhow::Result<()>
736 where
737 ResponseSenderFn:
738 Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
739 TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
740 + Send
741 + 'static,
742 F: Future<Output = anyhow::Result<R>> + Send + 'static,
743 {
744 let instant = Instant::now();
745 let timeout = self.response_timeout;
746 let response_channel = self.request_sender.clone();
747 let result = self.tx_pool_heavy_task_processor.try_spawn(async move {
748 if instant.elapsed() > timeout {
749 tracing::warn!("Request timed out");
750 return;
751 }
752
753 let Ok(response) = txpool_function.await else {
754 warn!("Failed to get txpool data");
755 return;
756 };
757
758 let _ = response_channel
759 .try_send(task_request(Ok(response), request_id))
760 .trace_err("Failed to send response to the request channel");
761 });
762
763 if result.is_err() {
764 let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
765 let _ = self
766 .p2p_service
767 .send_response_msg(request_id, response_sender(res));
768 }
769
770 Ok(())
771 }
772
773 fn handle_all_transactions_ids_request(
774 &mut self,
775 request_id: InboundRequestId,
776 ) -> anyhow::Result<()> {
777 let max_txs = self.max_txs_per_request;
778 let tx_pool = self.tx_pool.clone();
779 self.handle_txpool_request(
780 request_id,
781 async move { tx_pool.get_tx_ids(max_txs).await },
782 V2ResponseMessage::TxPoolAllTransactionsIds,
783 |response, request_id| TaskRequest::TxPoolAllTransactionsIds {
784 response,
785 request_id,
786 },
787 )
788 }
789
790 fn handle_full_transactions_request(
791 &mut self,
792 tx_ids: Vec<TxId>,
793 request_id: InboundRequestId,
794 ) -> anyhow::Result<()> {
795 if tx_ids.len() > self.max_txs_per_request {
796 self.p2p_service.send_response_msg(
797 request_id,
798 V2ResponseMessage::TxPoolFullTransactions(Err(
799 ResponseMessageErrorCode::RequestedRangeTooLarge,
800 )),
801 )?;
802 return Ok(());
803 }
804 let tx_pool = self.tx_pool.clone();
805 self.handle_txpool_request(
806 request_id,
807 async move { tx_pool.get_full_txs(tx_ids).await },
808 V2ResponseMessage::TxPoolFullTransactions,
809 |response, request_id| TaskRequest::TxPoolFullTransactions {
810 response,
811 request_id,
812 },
813 )
814 }
815}
816
817fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
818 let inner = Vec::from(*peer_id);
819 Ok(FuelPeerId::from(inner))
820}
821
822#[async_trait::async_trait]
823impl<V, T> RunnableService for UninitializedTask<V, SharedState, T>
824where
825 V: AtomicView + 'static,
826 V::LatestView: P2pDb,
827 T: TxPool + 'static,
828{
829 const NAME: &'static str = "P2P";
830
831 type SharedData = SharedState;
832 type Task = Task<FuelP2PService, V, SharedState, T>;
833 type TaskParams = ();
834
835 fn shared_data(&self) -> Self::SharedData {
836 self.broadcast.clone()
837 }
838
839 async fn into_task(
840 mut self,
841 _: &StateWatcher,
842 _: Self::TaskParams,
843 ) -> anyhow::Result<Self::Task> {
844 let Self {
845 chain_id,
846 last_height,
847 view_provider,
848 next_block_height,
849 request_receiver,
850 broadcast,
851 tx_pool,
852 config,
853 } = self;
854
855 let view = view_provider.latest_view()?;
856 let genesis = view.get_genesis()?;
857 let config = config.init(genesis)?;
858 let Config {
859 max_block_size,
860 max_headers_per_request,
861 max_txs_per_request,
862 heartbeat_check_interval,
863 heartbeat_max_avg_interval,
864 heartbeat_max_time_since_last,
865 database_read_threads,
866 tx_pool_threads,
867 metrics,
868 cache_size,
869 ..
870 } = config;
871
872 let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
875 old_heartbeat_penalty: -5.,
876 low_heartbeat_frequency_penalty: -5.,
877 };
878
879 let response_timeout = config.set_request_timeout;
880 let mut p2p_service = FuelP2PService::new(
881 broadcast.reserved_peers_broadcast.clone(),
882 config,
883 GossipsubMessageHandler::new(),
884 RequestResponseMessageHandler::new(max_block_size),
885 )
886 .await?;
887 p2p_service.update_block_height(last_height);
888 p2p_service.start().await?;
889
890 let next_check_time =
891 Instant::now().checked_add(heartbeat_check_interval).expect(
892 "The heartbeat check interval should be small enough to do frequently",
893 );
894 let db_heavy_task_processor = SyncProcessor::new(
895 "P2P_DatabaseProcessor",
896 database_read_threads,
897 1024 * 10,
898 )?;
899 let tx_pool_heavy_task_processor =
900 AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
901 let request_sender = broadcast.request_sender.clone();
902
903 let task = Task {
904 chain_id,
905 response_timeout,
906 p2p_service,
907 view_provider,
908 request_receiver,
909 request_sender,
910 next_block_height,
911 broadcast,
912 tx_pool,
913 db_heavy_task_processor,
914 tx_pool_heavy_task_processor,
915 max_headers_per_request,
916 max_txs_per_request,
917 heartbeat_check_interval,
918 heartbeat_max_avg_interval,
919 heartbeat_max_time_since_last,
920 next_check_time,
921 heartbeat_peer_reputation_config,
922 cached_view: Arc::new(CachedView::new(
923 cache_size
924 .map(|cache_size| cache_size.into())
925 .unwrap_or(1_535),
926 metrics,
927 )),
928 };
929 Ok(task)
930 }
931}
932
/// Event loop of the P2P task.
///
/// Each call to `run` waits for exactly one of: a stop signal, a new block
/// height, a local request, a network event, or the heartbeat timer. The
/// `select!` is `biased`, so the arms are polled in the order written.
impl<P, V, B, T> RunnableTask for Task<P, V, B, T>
where
    P: TaskP2PService + 'static,
    V: AtomicView + 'static,
    V::LatestView: P2pDb,
    B: Broadcast + 'static,
    T: TxPool + 'static,
{
    /// Processes a single event and tells the runner whether to continue.
    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
        tokio::select! {
            biased;

            // The service is no longer in the started state.
            _ = watcher.while_started() => {
                TaskNextAction::Stop
            },
            // A new block height: forward it to the network service. The task
            // stops once the stream ends.
            latest_block_height = self.next_block_height.next() => {
                if let Some(latest_block_height) = latest_block_height {
                    let _ = self.p2p_service.update_block_height(latest_block_height);
                    TaskNextAction::Continue
                } else {
                    TaskNextAction::Stop
                }
            },
            // A request from another service, or a look-up result this task
            // sent to itself.
            next_service_request = self.request_receiver.recv() => {
                match next_service_request {
                    Some(TaskRequest::BroadcastTransaction(transaction)) => {
                        let tx_id = transaction.id(&self.chain_id);
                        let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
                        let result = self.p2p_service.publish_message(broadcast);
                        if let Err(e) = result {
                            tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
                        }
                    }
                    Some(TaskRequest::BroadcastPreConfirmations(pre_confirmation_message)) => {
                        let broadcast = GossipsubBroadcastRequest::TxPreConfirmations(pre_confirmation_message);
                        // A clone is published so the request can still be logged on failure.
                        let result = self.p2p_service.publish_message(broadcast.clone());
                        if let Err(e) = result {
                            tracing::error!("Got an error during pre-confirmation message broadcasting {:?}: {}", broadcast, e);
                        }
                    }
                    Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => {
                        // The range end is exclusive, so the highest requested height is `end - 1`.
                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
                            let _ = channel.send(Err(TaskError::NoPeerFound));
                            return TaskNextAction::Continue
                        };
                        let channel = ResponseSender::SealedHeaders(channel);
                        let request_msg = RequestMessage::SealedHeaders(block_height_range.clone());
                        // NOTE: panics if the service rejects the request.
                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
                    }
                    Some(TaskRequest::GetTransactions {block_height_range, channel }) => {
                        // The range end is exclusive, so the highest requested height is `end - 1`.
                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
                            let _ = channel.send(Err(TaskError::NoPeerFound));
                            return TaskNextAction::Continue
                        };
                        let channel = ResponseSender::Transactions(channel);
                        let request_msg = RequestMessage::Transactions(block_height_range.clone());
                        // NOTE: panics if the service rejects the request.
                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
                    }
                    Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => {
                        let channel = ResponseSender::TransactionsFromPeer(channel);
                        let request_msg = RequestMessage::Transactions(block_height_range);
                        // NOTE: panics if the service rejects the request.
                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target");
                    }
                    Some(TaskRequest::TxPoolGetAllTxIds { from_peer, channel }) => {
                        let channel = ResponseSender::TxPoolAllTransactionsIds(channel);
                        let request_msg = RequestMessage::TxPoolAllTransactionsIds;
                        // NOTE: panics if the service rejects the request.
                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
                    }
                    Some(TaskRequest::TxPoolGetFullTransactions { tx_ids, from_peer, channel }) => {
                        let channel = ResponseSender::TxPoolFullTransactions(channel);
                        let request_msg = RequestMessage::TxPoolFullTransactions(tx_ids);
                        // NOTE: panics if the service rejects the request.
                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
                    }
                    Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => {
                        let res = self.p2p_service.report_message(message, acceptance);
                        if let Err(err) = res {
                            return TaskNextAction::ErrorContinue(err)
                        }
                    }
                    Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => {
                        let _ = self.p2p_service.report_peer(peer_id, score, reporting_service);
                    }
                    Some(TaskRequest::GetAllPeerInfo { channel }) => {
                        // Clone the borrowed peer data so it can be sent through the channel.
                        let peers = self.p2p_service.get_all_peer_info()
                            .into_iter()
                            .map(|(id, info)| (*id, info.clone()))
                            .collect::<Vec<_>>();
                        let _ = channel.send(peers);
                    }
                    // The four variants below carry the result of a background
                    // look-up; it is sent to the peer as the response to its request.
                    Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => {
                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response));
                    }
                    Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => {
                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response));
                    }
                    Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => {
                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response));
                    }
                    Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => {
                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response));
                    }
                    // The request channel is closed: every sender is gone.
                    None => {
                        tracing::error!("The P2P `Task` should be holder of the `Sender`");
                        return TaskNextAction::Stop
                    }
                }
                TaskNextAction::Continue
            }
            // An event from the network service.
            p2p_event = self.p2p_service.next_event() => {
                match p2p_event {
                    Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => {
                        let peer_id: Vec<u8> = peer_id.into();
                        let block_height_data = BlockHeightHeartbeatData {
                            peer_id: peer_id.into(),
                            block_height,
                        };

                        let _ = self.broadcast.block_height_broadcast(block_height_data);
                    }
                    Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => {
                        tracing::debug!("Received gossip message from peer {:?}", peer_id);
                        self.broadcast_gossip_message(message, message_id, peer_id);
                    },
                    Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => {
                        let res = self.process_request(request_message, request_id);
                        if let Err(err) = res {
                            return TaskNextAction::ErrorContinue(err)
                        }
                    },
                    Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => {
                        // Only subscriptions to the new-transaction topic are announced.
                        if tag == GossipTopicTag::NewTx {
                            let _ = self.broadcast.new_tx_subscription_broadcast(FuelPeerId::from(peer_id.to_bytes()));
                        }
                    },
                    // Every other event, including `None`, is ignored.
                    _ => (),
                }
                TaskNextAction::Continue
            },
            // Heartbeat timer: check peer reputations and schedule the next check.
            _ = tokio::time::sleep_until(self.next_check_time) => {
                let res = self.peer_heartbeat_reputation_checks();
                match res {
                    Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"),
                    Err(e) => {
                        tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e);
                    }
                }

                // The next deadline is relative to the previous deadline, not to "now".
                if let Some(next_check_time) = self.next_check_time.checked_add(self.heartbeat_check_interval) {
                    self.next_check_time = next_check_time;
                    TaskNextAction::Continue
                } else {
                    tracing::error!("Next check time overflowed");
                    TaskNextAction::Stop
                }
            }
        }
    }

    /// Performs no explicit shutdown work; always returns `Ok`.
    async fn shutdown(self) -> anyhow::Result<()> {
        Ok(())
    }
}
1107
/// Cloneable handle through which other services talk to the P2P task and
/// through which the task notifies them.
#[derive(Clone)]
pub struct SharedState {
    /// Announces peers that subscribed to the new-transaction topic.
    new_tx_subscription_broadcast: broadcast::Sender<FuelPeerId>,
    /// Announces transactions received via gossip.
    tx_broadcast: broadcast::Sender<TransactionGossipData>,
    /// Announces pre-confirmations received via gossip.
    pre_confirmations_broadcast: broadcast::Sender<P2PPreConfirmationGossipData>,
    /// Handed to the network service when it is created.
    // NOTE(review): presumably carries the number of connected reserved
    // peers — confirm against the service.
    reserved_peers_broadcast: broadcast::Sender<usize>,
    /// Sender into the P2P task's request channel.
    request_sender: mpsc::Sender<TaskRequest>,
    /// Announces block height heartbeats of peers.
    block_height_broadcast: broadcast::Sender<BlockHeightHeartbeatData>,
    /// Limit on the number of transactions per request.
    // NOTE(review): its use is not visible in this part of the file.
    max_txs_per_request: usize,
}
1125
1126impl SharedState {
1127 pub fn notify_gossip_transaction_validity(
1128 &self,
1129 message_info: GossipsubMessageInfo,
1130 acceptance: GossipsubMessageAcceptance,
1131 ) -> anyhow::Result<()> {
1132 self.request_sender
1133 .try_send(TaskRequest::RespondWithGossipsubMessageReport((
1134 message_info,
1135 acceptance,
1136 )))?;
1137 Ok(())
1138 }
1139
1140 pub async fn get_sealed_block_headers(
1141 &self,
1142 block_height_range: Range<u32>,
1143 ) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
1144 let (sender, receiver) = oneshot::channel();
1145
1146 if block_height_range.is_empty() {
1147 return Err(anyhow!(
1148 "Cannot retrieve headers for an empty range of block heights"
1149 ));
1150 }
1151
1152 self.request_sender
1153 .send(TaskRequest::GetSealedHeaders {
1154 block_height_range,
1155 channel: sender,
1156 })
1157 .await?;
1158
1159 let (peer_id, response) = receiver
1160 .await
1161 .map_err(|e| anyhow!("{e}"))?
1162 .map_err(|e| anyhow!("{e}"))?;
1163
1164 let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1165 if let Err(ref response_error_code) = data {
1166 warn!(
1167 "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1168 );
1169 };
1170
1171 Ok((peer_id.to_bytes(), data.ok()))
1172 }
1173
1174 pub async fn get_transactions(
1175 &self,
1176 range: Range<u32>,
1177 ) -> anyhow::Result<(Vec<u8>, Option<Vec<Transactions>>)> {
1178 let (sender, receiver) = oneshot::channel();
1179
1180 if range.is_empty() {
1181 return Err(anyhow!(
1182 "Cannot retrieve transactions for an empty range of block heights"
1183 ));
1184 }
1185
1186 self.request_sender
1187 .send(TaskRequest::GetTransactions {
1188 block_height_range: range,
1189 channel: sender,
1190 })
1191 .await?;
1192
1193 let (peer_id, response) = receiver
1194 .await
1195 .map_err(|e| anyhow!("{e}"))?
1196 .map_err(|e| anyhow!("{e}"))?;
1197
1198 let data = match response {
1199 Err(request_response_protocol_error) => Err(anyhow!(
1200 "Invalid response from peer {request_response_protocol_error:?}"
1201 )),
1202 Ok(Err(response_error_code)) => {
1203 warn!(
1204 "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1205 );
1206 Ok(None)
1207 }
1208 Ok(Ok(headers)) => Ok(Some(headers)),
1209 };
1210 data.map(|data| (peer_id.to_bytes(), data))
1211 }
1212
1213 pub async fn get_transactions_from_peer(
1214 &self,
1215 peer_id: FuelPeerId,
1216 range: Range<u32>,
1217 ) -> anyhow::Result<Option<Vec<Transactions>>> {
1218 let (sender, receiver) = oneshot::channel();
1219 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1220
1221 let request = TaskRequest::GetTransactionsFromPeer {
1222 block_height_range: range,
1223 from_peer,
1224 channel: sender,
1225 };
1226 self.request_sender.send(request).await?;
1227
1228 let (response_from_peer, response) =
1229 receiver.await.map_err(|e| anyhow!("{e}"))?;
1230 assert_eq!(
1231 peer_id.as_ref(),
1232 response_from_peer.to_bytes(),
1233 "Bug: response from non-requested peer"
1234 );
1235
1236 match response {
1237 Err(request_response_protocol_error) => Err(anyhow!(
1238 "Invalid response from peer {request_response_protocol_error:?}"
1239 )),
1240 Ok(Err(response_error_code)) => {
1241 warn!(
1242 "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}"
1243 );
1244 Ok(None)
1245 }
1246 Ok(Ok(txs)) => Ok(Some(txs)),
1247 }
1248 }
1249
1250 pub async fn get_all_transactions_ids_from_peer(
1251 &self,
1252 peer_id: FuelPeerId,
1253 ) -> anyhow::Result<Vec<TxId>> {
1254 let (sender, receiver) = oneshot::channel();
1255 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1256 let request = TaskRequest::TxPoolGetAllTxIds {
1257 from_peer,
1258 channel: sender,
1259 };
1260 self.request_sender.try_send(request)?;
1261
1262 let (response_from_peer, response) =
1263 receiver.await.map_err(|e| anyhow!("{e}"))?;
1264
1265 debug_assert_eq!(
1266 peer_id.as_ref(),
1267 response_from_peer.to_bytes(),
1268 "Bug: response from non-requested peer"
1269 );
1270
1271 let response =
1272 response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1273
1274 let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default();
1275
1276 if txs.len() > self.max_txs_per_request {
1277 return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1278 }
1279 Ok(txs)
1280 }
1281
1282 pub async fn get_full_transactions_from_peer(
1283 &self,
1284 peer_id: FuelPeerId,
1285 tx_ids: Vec<TxId>,
1286 ) -> anyhow::Result<Vec<Option<Transaction>>> {
1287 let (sender, receiver) = oneshot::channel();
1288 let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1289 let request = TaskRequest::TxPoolGetFullTransactions {
1290 tx_ids,
1291 from_peer,
1292 channel: sender,
1293 };
1294 self.request_sender.try_send(request)?;
1295
1296 let (response_from_peer, response) =
1297 receiver.await.map_err(|e| anyhow!("{e}"))?;
1298 debug_assert_eq!(
1299 peer_id.as_ref(),
1300 response_from_peer.to_bytes(),
1301 "Bug: response from non-requested peer"
1302 );
1303
1304 let response =
1305 response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1306 let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default();
1307
1308 if txs.len() > self.max_txs_per_request {
1309 return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1310 }
1311 txs.into_iter()
1312 .map(|tx| {
1313 tx.map(Transaction::try_from)
1314 .transpose()
1315 .map_err(|err| anyhow::anyhow!(err))
1316 })
1317 .collect()
1318 }
1319
1320 pub fn broadcast_transaction(
1321 &self,
1322 transaction: Arc<Transaction>,
1323 ) -> anyhow::Result<()> {
1324 self.request_sender
1325 .try_send(TaskRequest::BroadcastTransaction(transaction))?;
1326 Ok(())
1327 }
1328
1329 pub fn broadcast_preconfirmations(
1330 &self,
1331 preconfirmations: Arc<P2PPreConfirmationMessage>,
1332 ) -> anyhow::Result<()> {
1333 self.request_sender
1334 .try_send(TaskRequest::BroadcastPreConfirmations(preconfirmations))?;
1335 Ok(())
1336 }
1337
1338 pub async fn get_all_peers(&self) -> anyhow::Result<Vec<(PeerId, PeerInfo)>> {
1339 let (sender, receiver) = oneshot::channel();
1340
1341 self.request_sender
1342 .send(TaskRequest::GetAllPeerInfo { channel: sender })
1343 .await?;
1344
1345 receiver.await.map_err(|e| anyhow!("{}", e))
1346 }
1347
    /// Returns a new receiver on the `new_tx_subscription_broadcast` channel,
    /// which carries `FuelPeerId` values.
    ///
    /// NOTE(review): despite the method name, the underlying channel is the
    /// new-tx-subscription one; presumably it yields peers that subscribed to
    /// the new-transaction gossip topic — confirm against the sending side.
    pub fn subscribe_new_peers(&self) -> broadcast::Receiver<FuelPeerId> {
        self.new_tx_subscription_broadcast.subscribe()
    }
1351
    /// Returns a new receiver on the `tx_broadcast` channel, which carries
    /// `TransactionGossipData` values.
    pub fn subscribe_tx(&self) -> broadcast::Receiver<TransactionGossipData> {
        self.tx_broadcast.subscribe()
    }
1355
    /// Returns a new receiver on the `pre_confirmations_broadcast` channel,
    /// which carries `P2PPreConfirmationGossipData` values.
    pub fn subscribe_preconfirmations(
        &self,
    ) -> broadcast::Receiver<P2PPreConfirmationGossipData> {
        self.pre_confirmations_broadcast.subscribe()
    }
1361
    /// Returns a new receiver on the `block_height_broadcast` channel, which
    /// carries `BlockHeightHeartbeatData` values.
    pub fn subscribe_block_height(
        &self,
    ) -> broadcast::Receiver<BlockHeightHeartbeatData> {
        self.block_height_broadcast.subscribe()
    }
1367
    /// Returns a new receiver on the `reserved_peers_broadcast` channel,
    /// which carries `usize` counts.
    pub fn subscribe_reserved_peers_count(&self) -> broadcast::Receiver<usize> {
        self.reserved_peers_broadcast.subscribe()
    }
1371
1372 pub fn report_peer<T: PeerReport>(
1373 &self,
1374 peer_id: FuelPeerId,
1375 peer_report: T,
1376 reporting_service: &'static str,
1377 ) -> anyhow::Result<()> {
1378 match Vec::from(peer_id).try_into() {
1379 Ok(peer_id) => {
1380 let score = peer_report.get_score_from_report();
1381
1382 self.request_sender
1383 .try_send(TaskRequest::RespondWithPeerReport {
1384 peer_id,
1385 score,
1386 reporting_service,
1387 })?;
1388
1389 Ok(())
1390 }
1391 Err(e) => {
1392 warn!(target: "fuel-p2p", "Failed to read PeerId from {e:?}");
1393 Err(anyhow::anyhow!("Failed to read PeerId from {e:?}"))
1394 }
1395 }
1396 }
1397}
1398
1399pub fn build_shared_state(
1400 config: Config<NotInitialized>,
1401) -> (SharedState, Receiver<TaskRequest>) {
1402 let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE);
1403 let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1404 let (preconfirmations_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1405 let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1406 let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1407
1408 let (reserved_peers_broadcast, _) = broadcast::channel::<usize>(
1409 config
1410 .reserved_nodes
1411 .len()
1412 .saturating_mul(2)
1413 .saturating_add(1),
1414 );
1415
1416 (
1417 SharedState {
1418 request_sender,
1419 new_tx_subscription_broadcast,
1420 tx_broadcast,
1421 pre_confirmations_broadcast: preconfirmations_broadcast,
1422 reserved_peers_broadcast,
1423 block_height_broadcast,
1424 max_txs_per_request: config.max_txs_per_request,
1425 },
1426 request_receiver,
1427 )
1428}
1429
1430#[allow(clippy::too_many_arguments)]
1431pub fn new_service<V, B, T>(
1432 chain_id: ChainId,
1433 last_height: BlockHeight,
1434 p2p_config: Config<NotInitialized>,
1435 shared_state: SharedState,
1436 request_receiver: Receiver<TaskRequest>,
1437 view_provider: V,
1438 block_importer: B,
1439 tx_pool: T,
1440) -> Service<V, T>
1441where
1442 V: AtomicView + 'static,
1443 V::LatestView: P2pDb,
1444 B: BlockHeightImporter,
1445 T: TxPool,
1446{
1447 let task = UninitializedTask::new(
1448 chain_id,
1449 last_height,
1450 p2p_config,
1451 shared_state,
1452 request_receiver,
1453 view_provider,
1454 block_importer,
1455 tx_pool,
1456 );
1457 Service::new(task)
1458}
1459
1460pub fn to_message_acceptance(
1461 acceptance: &GossipsubMessageAcceptance,
1462) -> MessageAcceptance {
1463 match acceptance {
1464 GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
1465 GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
1466 GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
1467 }
1468}
1469
1470fn report_message(
1471 p2p_service: &mut FuelP2PService,
1472 message: GossipsubMessageInfo,
1473 acceptance: GossipsubMessageAcceptance,
1474) {
1475 let GossipsubMessageInfo {
1476 peer_id,
1477 message_id,
1478 } = message;
1479
1480 let msg_id = message_id.into();
1481 let peer_id: Vec<u8> = peer_id.into();
1482
1483 if let Ok(peer_id) = peer_id.try_into() {
1484 let acceptance = to_message_acceptance(&acceptance);
1485 p2p_service.report_message_validation_result(&msg_id, peer_id, acceptance);
1486 } else {
1487 warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
1488 }
1489}