fuel_core_p2p/
service.rs

1use crate::{
2    cached_view::CachedView,
3    codecs::{
4        gossipsub::GossipsubMessageHandler,
5        request_response::RequestResponseMessageHandler,
6    },
7    config::{
8        Config,
9        NotInitialized,
10    },
11    gossipsub::messages::{
12        GossipTopicTag,
13        GossipsubBroadcastRequest,
14        GossipsubMessage,
15    },
16    p2p_service::{
17        FuelP2PEvent,
18        FuelP2PService,
19    },
20    peer_manager::PeerInfo,
21    ports::{
22        BlockHeightImporter,
23        P2PPreConfirmationGossipData,
24        P2PPreConfirmationMessage,
25        P2pDb,
26        TxPool,
27    },
28    request_response::messages::{
29        OnResponse,
30        OnResponseWithPeerSelection,
31        RequestMessage,
32        ResponseMessageErrorCode,
33        ResponseSender,
34        V2ResponseMessage,
35    },
36};
37use anyhow::anyhow;
38use fuel_core_metrics::p2p_metrics::set_blocks_requested;
39use fuel_core_services::{
40    AsyncProcessor,
41    RunnableService,
42    RunnableTask,
43    ServiceRunner,
44    StateWatcher,
45    SyncProcessor,
46    TaskNextAction,
47    TraceErr,
48    stream::BoxStream,
49};
50use fuel_core_storage::transactional::AtomicView;
51use fuel_core_types::{
52    blockchain::SealedBlockHeader,
53    fuel_tx::{
54        Transaction,
55        TxId,
56        UniqueIdentifier,
57    },
58    fuel_types::{
59        BlockHeight,
60        ChainId,
61    },
62    services::p2p::{
63        BlockHeightHeartbeatData,
64        GossipData,
65        GossipsubMessageAcceptance,
66        GossipsubMessageInfo,
67        NetworkableTransactionPool,
68        PeerId as FuelPeerId,
69        TransactionGossipData,
70        Transactions,
71        peer_reputation::{
72            AppScore,
73            PeerReport,
74        },
75    },
76};
77use futures::{
78    StreamExt,
79    future::BoxFuture,
80};
81use libp2p::{
82    PeerId,
83    gossipsub::{
84        MessageAcceptance,
85        MessageId,
86        PublishError,
87    },
88    request_response::InboundRequestId,
89};
90use std::{
91    fmt::Debug,
92    future::Future,
93    ops::Range,
94    sync::Arc,
95};
96use thiserror::Error;
97use tokio::{
98    sync::{
99        broadcast,
100        mpsc::{
101            self,
102            Receiver,
103        },
104        oneshot,
105    },
106    time::{
107        Duration,
108        Instant,
109    },
110};
111use tracing::warn;
112
113#[cfg(test)]
114pub mod broadcast_tests;
115#[cfg(test)]
116pub mod task_tests;
117
// Buffer size constant (10 * 1024 entries).
// NOTE(review): presumably the capacity of the channels created for the
// service; the use sites are not visible in this part of the file — confirm.
const CHANNEL_SIZE: usize = 1024 * 10;
119
/// The p2p service: a [`ServiceRunner`] driving an [`UninitializedTask`] that
/// uses [`SharedState`] as its broadcast handle.
///
/// `V` is the view provider type and `T` is the transaction pool type.
pub type Service<V, T> = ServiceRunner<UninitializedTask<V, SharedState, T>>;
121
/// Errors that the p2p task reports back to the issuer of a [`TaskRequest`].
#[derive(Debug, Error)]
pub enum TaskError {
    /// The task could not select a peer to send the request to, e.g. no
    /// known peer has reached the requested block height.
    #[error("No peer found to send request to")]
    NoPeerFound,
}
127
/// Requests handled by the p2p [`Task`] through its internal request channel.
///
/// The variants fall into three groups: outbound broadcasts and requests
/// issued by other services, reports that are forwarded to the p2p network,
/// and results of background look-ups that the task itself produced while
/// serving an inbound request from a remote peer.
pub enum TaskRequest {
    // Broadcast requests to p2p network
    /// Publish a transaction to the network as a gossipsub message.
    BroadcastTransaction(Arc<Transaction>),
    // Broadcast Preconfirmations to p2p network
    /// Publish a pre-confirmation message to the network as a gossipsub message.
    BroadcastPreConfirmations(Arc<P2PPreConfirmationMessage>),
    // Request to get information about all connected peers
    /// Ask for the information about all peers; the answer is delivered
    /// through `channel`.
    GetAllPeerInfo {
        channel: oneshot::Sender<Vec<(PeerId, PeerInfo)>>,
    },
    /// Request sealed block headers for `block_height_range`. The task picks
    /// the target peer itself (one that is known to have the last height of
    /// the range) and answers through `channel`.
    GetSealedHeaders {
        block_height_range: Range<u32>,
        channel: OnResponseWithPeerSelection<
            Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        >,
    },
    /// Request the transactions for `block_height_range`. The task picks the
    /// target peer itself and answers through `channel`.
    GetTransactions {
        block_height_range: Range<u32>,
        channel: OnResponseWithPeerSelection<
            Result<Vec<Transactions>, ResponseMessageErrorCode>,
        >,
    },
    /// Request the transactions for `block_height_range` from the explicitly
    /// given peer `from_peer`.
    GetTransactionsFromPeer {
        block_height_range: Range<u32>,
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
    },
    /// Request the transaction ids of the transaction pool of `from_peer`.
    TxPoolGetAllTxIds {
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>,
    },
    /// Request the full transactions identified by `tx_ids` from the
    /// transaction pool of `from_peer`.
    TxPoolGetFullTransactions {
        tx_ids: Vec<TxId>,
        from_peer: PeerId,
        channel: OnResponse<
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        >,
    },
    // Responds back to the p2p network
    /// Report the acceptance verdict for a previously received gossipsub message.
    RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)),
    /// Report a score change for `peer_id`; `reporting_service` names the
    /// service that issued the report.
    RespondWithPeerReport {
        peer_id: PeerId,
        score: AppScore,
        reporting_service: &'static str,
    },
    /// Result of the background database look-up made for an inbound
    /// transactions request; `request_id` identifies the request to answer.
    DatabaseTransactionsLookUp {
        response: Result<Vec<Transactions>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of the background database look-up made for an inbound sealed
    /// headers request; `request_id` identifies the request to answer.
    DatabaseHeaderLookUp {
        response: Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of the background transaction pool look-up made for an inbound
    /// "all transaction ids" request.
    TxPoolAllTransactionsIds {
        response: Result<Vec<TxId>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of the background transaction pool look-up made for an inbound
    /// "full transactions" request.
    TxPoolFullTransactions {
        response:
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
}
190
191impl Debug for TaskRequest {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        match self {
194            TaskRequest::BroadcastTransaction(_) => {
195                write!(f, "TaskRequest::BroadcastTransaction")
196            }
197            TaskRequest::BroadcastPreConfirmations(_) => {
198                write!(f, "TaskRequest::BroadcastPreConfirmations")
199            }
200            TaskRequest::GetSealedHeaders { .. } => {
201                write!(f, "TaskRequest::GetSealedHeaders")
202            }
203            TaskRequest::GetTransactions { .. } => {
204                write!(f, "TaskRequest::GetTransactions")
205            }
206            TaskRequest::GetTransactionsFromPeer { .. } => {
207                write!(f, "TaskRequest::GetTransactionsFromPeer")
208            }
209            TaskRequest::TxPoolGetAllTxIds { .. } => {
210                write!(f, "TaskRequest::TxPoolGetAllTxIds")
211            }
212            TaskRequest::TxPoolGetFullTransactions { .. } => {
213                write!(f, "TaskRequest::TxPoolGetFullTransactions")
214            }
215            TaskRequest::RespondWithGossipsubMessageReport(_) => {
216                write!(f, "TaskRequest::RespondWithGossipsubMessageReport")
217            }
218            TaskRequest::RespondWithPeerReport { .. } => {
219                write!(f, "TaskRequest::RespondWithPeerReport")
220            }
221            TaskRequest::GetAllPeerInfo { .. } => {
222                write!(f, "TaskRequest::GetPeerInfo")
223            }
224            TaskRequest::DatabaseTransactionsLookUp { .. } => {
225                write!(f, "TaskRequest::DatabaseTransactionsLookUp")
226            }
227            TaskRequest::DatabaseHeaderLookUp { .. } => {
228                write!(f, "TaskRequest::DatabaseHeaderLookUp")
229            }
230            TaskRequest::TxPoolAllTransactionsIds { .. } => {
231                write!(f, "TaskRequest::TxPoolAllTransactionsIds")
232            }
233            TaskRequest::TxPoolFullTransactions { .. } => {
234                write!(f, "TaskRequest::TxPoolFullTransactions")
235            }
236        }
237    }
238}
239
/// Reason why a peer is reported during the periodic heartbeat reputation checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeartBeatPeerReportReason {
    /// The time since the peer's last heartbeat exceeds the configured maximum.
    OldHeartBeat,
    /// The average time between the peer's heartbeats exceeds the configured maximum.
    LowHeartBeatFrequency,
}
245
/// The operations the [`Task`] needs from the underlying p2p service.
///
/// Keeping them behind a trait lets the task be generic over the service
/// type; [`FuelP2PService`] is the implementation provided in this file.
pub trait TaskP2PService: Send {
    /// Returns the id and the stored information of every peer known to the service.
    fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>;
    /// Returns a peer for the given block height, or `None` if there is no
    /// such peer.
    fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId>;

    /// Returns a future resolving to the next event of the p2p service, if any.
    fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>>;

    /// Publishes a gossipsub message to the network.
    fn publish_message(
        &mut self,
        message: GossipsubBroadcastRequest,
    ) -> anyhow::Result<()>;

    /// Sends `request_msg` to `peer_id` (or lets the service choose the
    /// target when `None`); the response is delivered through `on_response`.
    fn send_request_msg(
        &mut self,
        peer_id: Option<PeerId>,
        request_msg: RequestMessage,
        on_response: ResponseSender,
    ) -> anyhow::Result<()>;

    /// Answers the inbound request identified by `request_id` with `message`.
    fn send_response_msg(
        &mut self,
        request_id: InboundRequestId,
        message: V2ResponseMessage,
    ) -> anyhow::Result<()>;

    /// Reports the acceptance verdict for a received gossipsub message.
    fn report_message(
        &mut self,
        message: GossipsubMessageInfo,
        acceptance: GossipsubMessageAcceptance,
    ) -> anyhow::Result<()>;

    /// Applies `score` to the reputation of `peer_id`; `reporting_service`
    /// names the service that issued the report.
    fn report_peer(
        &mut self,
        peer_id: PeerId,
        score: AppScore,
        reporting_service: &str,
    ) -> anyhow::Result<()>;

    /// Informs the service about the latest block height of the local node.
    fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;

    /// Hands `update_fn`, a closure that records metrics, over to the service.
    fn update_metrics<T>(&self, update_fn: T)
    where
        T: FnOnce();
}
289
290impl TaskP2PService for FuelP2PService {
291    fn update_metrics<T>(&self, update_fn: T)
292    where
293        T: FnOnce(),
294    {
295        FuelP2PService::update_metrics(self, update_fn)
296    }
297
298    fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
299        self.peer_manager().get_all_peers().collect()
300    }
301
302    fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId> {
303        self.peer_manager().get_peer_id_with_height(height)
304    }
305
306    fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>> {
307        Box::pin(self.next_event())
308    }
309
310    fn publish_message(
311        &mut self,
312        message: GossipsubBroadcastRequest,
313    ) -> anyhow::Result<()> {
314        let result = self.publish_message(message);
315
316        match result {
317            Ok(_) => Ok(()),
318            Err(e) => {
319                if matches!(&e, PublishError::InsufficientPeers) {
320                    Ok(())
321                } else {
322                    Err(anyhow!(e))
323                }
324            }
325        }
326    }
327
328    fn send_request_msg(
329        &mut self,
330        peer_id: Option<PeerId>,
331        request_msg: RequestMessage,
332        on_response: ResponseSender,
333    ) -> anyhow::Result<()> {
334        self.send_request_msg(peer_id, request_msg, on_response)?;
335        Ok(())
336    }
337
338    fn send_response_msg(
339        &mut self,
340        request_id: InboundRequestId,
341        message: V2ResponseMessage,
342    ) -> anyhow::Result<()> {
343        self.send_response_msg(request_id, message)?;
344        Ok(())
345    }
346
347    fn report_message(
348        &mut self,
349        message: GossipsubMessageInfo,
350        acceptance: GossipsubMessageAcceptance,
351    ) -> anyhow::Result<()> {
352        report_message(self, message, acceptance);
353        Ok(())
354    }
355
356    fn report_peer(
357        &mut self,
358        peer_id: PeerId,
359        score: AppScore,
360        reporting_service: &str,
361    ) -> anyhow::Result<()> {
362        self.report_peer(peer_id, score, reporting_service);
363        Ok(())
364    }
365
366    fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
367        self.update_block_height(height);
368        Ok(())
369    }
370}
371
/// The outbound notifications the [`Task`] emits towards the rest of the node.
///
/// [`SharedState`] is the implementation provided in this file.
pub trait Broadcast: Send {
    /// Reports `report` as a score change for `peer_id`; `reporting_service`
    /// names the service that issued the report.
    fn report_peer(
        &self,
        peer_id: FuelPeerId,
        report: AppScore,
        reporting_service: &'static str,
    ) -> anyhow::Result<()>;

    /// Broadcasts block height heartbeat data.
    fn block_height_broadcast(
        &self,
        block_height_data: BlockHeightHeartbeatData,
    ) -> anyhow::Result<()>;

    /// Broadcasts a transaction received through gossip.
    fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>;

    /// Broadcasts pre-confirmations received through gossip.
    fn pre_confirmation_broadcast(
        &self,
        confirmations: P2PPreConfirmationGossipData,
    ) -> anyhow::Result<()>;

    /// Broadcasts the id of a peer in connection with a new transaction
    /// subscription.
    fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>;
}
394
395impl Broadcast for SharedState {
396    fn report_peer(
397        &self,
398        peer_id: FuelPeerId,
399        report: AppScore,
400        reporting_service: &'static str,
401    ) -> anyhow::Result<()> {
402        self.report_peer(peer_id, report, reporting_service)
403    }
404
405    fn block_height_broadcast(
406        &self,
407        block_height_data: BlockHeightHeartbeatData,
408    ) -> anyhow::Result<()> {
409        self.block_height_broadcast.send(block_height_data)?;
410        Ok(())
411    }
412
413    fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> {
414        self.tx_broadcast.send(transaction)?;
415        Ok(())
416    }
417
418    fn pre_confirmation_broadcast(
419        &self,
420        confirmations: P2PPreConfirmationGossipData,
421    ) -> anyhow::Result<()> {
422        self.pre_confirmations_broadcast.send(confirmations)?;
423        Ok(())
424    }
425
426    fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> {
427        self.new_tx_subscription_broadcast.send(peer_id)?;
428        Ok(())
429    }
430}
431
/// Uninitialized task for the p2p that can be upgraded later into [`Task`].
///
/// It only stores what is needed to build the task; the p2p service itself is
/// created and started when the task is built.
pub struct UninitializedTask<V, B, T> {
    /// Id of the chain; handed over to the [`Task`].
    chain_id: ChainId,
    /// Block height reported to the p2p service before it is started.
    last_height: BlockHeight,
    /// Provider of the database views.
    view_provider: V,
    /// Stream of block heights obtained from the block importer.
    next_block_height: BoxStream<BlockHeight>,
    /// Receive internal Task Requests
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Handle used to broadcast data to the rest of the node.
    broadcast: B,
    /// Handle to the transaction pool.
    tx_pool: T,
    /// Configuration that still has to be initialized with the genesis data.
    config: Config<NotInitialized>,
}
444
/// Orchestrates various p2p-related events between the inner `P2pService`
/// and the top level `NetworkService`.
pub struct Task<P, V, B, T> {
    /// Id of the chain; used to compute the id of a broadcast transaction.
    chain_id: ChainId,
    /// Inbound requests that waited longer than this before their background
    /// job started are dropped without a look-up.
    response_timeout: Duration,
    /// The underlying p2p service.
    p2p_service: P,
    /// Provider of the database views used to serve inbound requests.
    view_provider: V,
    /// Stream of the block heights of the local node.
    next_block_height: BoxStream<BlockHeight>,
    /// Receive internal Task Requests
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Sender side of the task's own request channel; background jobs use it
    /// to hand their look-up results back to the task.
    request_sender: mpsc::Sender<TaskRequest>,
    /// Processor running the database look-ups for inbound requests.
    db_heavy_task_processor: SyncProcessor,
    /// Processor running the transaction pool look-ups for inbound requests.
    tx_pool_heavy_task_processor: AsyncProcessor,
    /// Handle used to broadcast data to the rest of the node.
    broadcast: B,
    /// Handle to the transaction pool.
    tx_pool: T,
    /// Maximum length of a block height range accepted in an inbound request
    /// (applied to both sealed headers and transactions requests).
    max_headers_per_request: usize,
    /// Maximum number of transactions / transaction ids per inbound
    /// transaction pool request.
    max_txs_per_request: usize,
    // wait time between peer heartbeat reputation checks
    heartbeat_check_interval: Duration,
    /// Maximum tolerated average time between the heartbeats of a peer.
    heartbeat_max_avg_interval: Duration,
    /// Maximum tolerated time since the last heartbeat of a peer.
    heartbeat_max_time_since_last: Duration,
    /// Instant of the next heartbeat reputation check; initially "now" plus
    /// `heartbeat_check_interval`.
    next_check_time: Instant,
    /// Penalties applied by the heartbeat reputation checks.
    heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
    // cached view
    cached_view: Arc<CachedView>,
}
471
472impl<P, V, B, T> Task<P, V, B, T>
473where
474    P: TaskP2PService,
475    B: Broadcast,
476{
477    pub(crate) fn broadcast_gossip_message(
478        &mut self,
479        message: GossipsubMessage,
480        message_id: MessageId,
481        peer_id: PeerId,
482    ) {
483        let message_id = message_id.0;
484
485        match message {
486            GossipsubMessage::NewTx(transaction) => {
487                let next_transaction = GossipData::new(transaction, peer_id, message_id);
488                let _ = self.broadcast.tx_broadcast(next_transaction);
489            }
490            GossipsubMessage::TxPreConfirmations(confirmations) => {
491                let data = GossipData::new(confirmations, peer_id, message_id);
492                let _ = self.broadcast.pre_confirmation_broadcast(data);
493            }
494        }
495    }
496}
497
/// Score penalties applied to peers by the heartbeat reputation checks.
#[derive(Default, Clone)]
pub struct HeartbeatPeerReputationConfig {
    /// Penalty for a peer whose last heartbeat is too old.
    old_heartbeat_penalty: AppScore,
    /// Penalty for a peer whose heartbeats arrive too rarely on average.
    low_heartbeat_frequency_penalty: AppScore,
}
503
504impl<V, T> UninitializedTask<V, SharedState, T> {
505    #[allow(clippy::too_many_arguments)]
506    pub fn new<B: BlockHeightImporter>(
507        chain_id: ChainId,
508        last_height: BlockHeight,
509        config: Config<NotInitialized>,
510        shared_state: SharedState,
511        request_receiver: Receiver<TaskRequest>,
512        view_provider: V,
513        block_importer: B,
514        tx_pool: T,
515    ) -> Self {
516        let next_block_height = block_importer.next_block_height();
517
518        Self {
519            chain_id,
520            last_height,
521            view_provider,
522            tx_pool,
523            next_block_height,
524            request_receiver,
525            broadcast: shared_state,
526            config,
527        }
528    }
529}
530
531impl<P, V, B, T> Task<P, V, B, T>
532where
533    P: TaskP2PService,
534    V: AtomicView,
535    B: Broadcast,
536{
537    fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> {
538        for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() {
539            if peer_info.heartbeat_data.duration_since_last_heartbeat()
540                > self.heartbeat_max_time_since_last
541            {
542                tracing::debug!("Peer {:?} has old heartbeat", peer_id);
543                let report = HeartBeatPeerReportReason::OldHeartBeat;
544                let peer_id = convert_peer_id(peer_id)?;
545                self.report_peer(peer_id, report)?;
546            } else if peer_info.heartbeat_data.average_time_between_heartbeats()
547                > self.heartbeat_max_avg_interval
548            {
549                tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id);
550                let report = HeartBeatPeerReportReason::LowHeartBeatFrequency;
551                let peer_id = convert_peer_id(peer_id)?;
552                self.report_peer(peer_id, report)?;
553            }
554        }
555        Ok(())
556    }
557
558    fn report_peer(
559        &self,
560        peer_id: FuelPeerId,
561        report: HeartBeatPeerReportReason,
562    ) -> anyhow::Result<()> {
563        let app_score = match report {
564            HeartBeatPeerReportReason::OldHeartBeat => {
565                self.heartbeat_peer_reputation_config.old_heartbeat_penalty
566            }
567            HeartBeatPeerReportReason::LowHeartBeatFrequency => {
568                self.heartbeat_peer_reputation_config
569                    .low_heartbeat_frequency_penalty
570            }
571        };
572        let reporting_service = "p2p";
573        self.broadcast
574            .report_peer(peer_id, app_score, reporting_service)?;
575        Ok(())
576    }
577}
578
579impl<P, V, B, T> Task<P, V, B, T>
580where
581    P: TaskP2PService + 'static,
582    V: AtomicView + 'static,
583    V::LatestView: P2pDb,
584    T: TxPool + 'static,
585    B: Send,
586{
587    fn update_metrics<U>(&self, update_fn: U)
588    where
589        U: FnOnce(),
590    {
591        self.p2p_service.update_metrics(update_fn)
592    }
593
594    fn process_request(
595        &mut self,
596        request_message: RequestMessage,
597        request_id: InboundRequestId,
598    ) -> anyhow::Result<()> {
599        match request_message {
600            RequestMessage::Transactions(range) => {
601                self.handle_transactions_request(range, request_id)
602            }
603            RequestMessage::SealedHeaders(range) => {
604                self.handle_sealed_headers_request(range, request_id)
605            }
606            RequestMessage::TxPoolAllTransactionsIds => {
607                self.handle_all_transactions_ids_request(request_id)
608            }
609            RequestMessage::TxPoolFullTransactions(tx_ids) => {
610                self.handle_full_transactions_request(tx_ids, request_id)
611            }
612        }
613    }
614
615    fn handle_db_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
616        &mut self,
617        range: Range<u32>,
618        request_id: InboundRequestId,
619        response_sender: ResponseSenderFn,
620        db_lookup: DbLookUpFn,
621        task_request: TaskRequestFn,
622        max_len: usize,
623    ) -> anyhow::Result<()>
624    where
625        DbLookUpFn: Fn(&V::LatestView, &Arc<CachedView>, Range<u32>) -> anyhow::Result<Option<R>>
626            + Send
627            + 'static,
628        ResponseSenderFn:
629            Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
630        TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
631            + Send
632            + 'static,
633        R: Send + 'static,
634    {
635        let instant = Instant::now();
636        let timeout = self.response_timeout;
637        let response_channel = self.request_sender.clone();
638        let range_len = range.len();
639
640        self.update_metrics(|| set_blocks_requested(range_len));
641
642        if range_len > max_len {
643            tracing::error!(
644                requested_length = range.len(),
645                max_len,
646                "Requested range is too large"
647            );
648            let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge);
649            let _ = self
650                .p2p_service
651                .send_response_msg(request_id, response_sender(response));
652            return Ok(());
653        }
654
655        let view = self.view_provider.latest_view()?;
656        let result = self.db_heavy_task_processor.try_spawn({
657            let cached_view = self.cached_view.clone();
658            move || {
659                if instant.elapsed() > timeout {
660                    tracing::warn!("Request timed out");
661                    return;
662                }
663
664                let response = db_lookup(&view, &cached_view, range.clone())
665                    .ok()
666                    .flatten()
667                    .ok_or(ResponseMessageErrorCode::Timeout);
668
669                let _ = response_channel
670                    .try_send(task_request(response, request_id))
671                    .trace_err("Failed to send response to the request channel");
672            }
673        });
674
675        if result.is_err() {
676            let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
677            let _ = self
678                .p2p_service
679                .send_response_msg(request_id, response_sender(err));
680        }
681
682        Ok(())
683    }
684
685    fn handle_transactions_request(
686        &mut self,
687        range: Range<u32>,
688        request_id: InboundRequestId,
689    ) -> anyhow::Result<()> {
690        self.handle_db_request(
691            range,
692            request_id,
693            V2ResponseMessage::Transactions,
694            |view, cached_view, range| {
695                cached_view
696                    .get_transactions(view, range)
697                    .map_err(anyhow::Error::from)
698            },
699            |response, request_id| TaskRequest::DatabaseTransactionsLookUp {
700                response,
701                request_id,
702            },
703            self.max_headers_per_request,
704        )
705    }
706
707    fn handle_sealed_headers_request(
708        &mut self,
709        range: Range<u32>,
710        request_id: InboundRequestId,
711    ) -> anyhow::Result<()> {
712        self.handle_db_request(
713            range,
714            request_id,
715            V2ResponseMessage::SealedHeaders,
716            |view, cached_view, range| {
717                cached_view
718                    .get_sealed_headers(view, range)
719                    .map_err(anyhow::Error::from)
720            },
721            |response, request_id| TaskRequest::DatabaseHeaderLookUp {
722                response,
723                request_id,
724            },
725            self.max_headers_per_request,
726        )
727    }
728
729    fn handle_txpool_request<F, ResponseSenderFn, TaskRequestFn, R>(
730        &mut self,
731        request_id: InboundRequestId,
732        txpool_function: F,
733        response_sender: ResponseSenderFn,
734        task_request: TaskRequestFn,
735    ) -> anyhow::Result<()>
736    where
737        ResponseSenderFn:
738            Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
739        TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
740            + Send
741            + 'static,
742        F: Future<Output = anyhow::Result<R>> + Send + 'static,
743    {
744        let instant = Instant::now();
745        let timeout = self.response_timeout;
746        let response_channel = self.request_sender.clone();
747        let result = self.tx_pool_heavy_task_processor.try_spawn(async move {
748            if instant.elapsed() > timeout {
749                tracing::warn!("Request timed out");
750                return;
751            }
752
753            let Ok(response) = txpool_function.await else {
754                warn!("Failed to get txpool data");
755                return;
756            };
757
758            let _ = response_channel
759                .try_send(task_request(Ok(response), request_id))
760                .trace_err("Failed to send response to the request channel");
761        });
762
763        if result.is_err() {
764            let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
765            let _ = self
766                .p2p_service
767                .send_response_msg(request_id, response_sender(res));
768        }
769
770        Ok(())
771    }
772
773    fn handle_all_transactions_ids_request(
774        &mut self,
775        request_id: InboundRequestId,
776    ) -> anyhow::Result<()> {
777        let max_txs = self.max_txs_per_request;
778        let tx_pool = self.tx_pool.clone();
779        self.handle_txpool_request(
780            request_id,
781            async move { tx_pool.get_tx_ids(max_txs).await },
782            V2ResponseMessage::TxPoolAllTransactionsIds,
783            |response, request_id| TaskRequest::TxPoolAllTransactionsIds {
784                response,
785                request_id,
786            },
787        )
788    }
789
790    fn handle_full_transactions_request(
791        &mut self,
792        tx_ids: Vec<TxId>,
793        request_id: InboundRequestId,
794    ) -> anyhow::Result<()> {
795        if tx_ids.len() > self.max_txs_per_request {
796            self.p2p_service.send_response_msg(
797                request_id,
798                V2ResponseMessage::TxPoolFullTransactions(Err(
799                    ResponseMessageErrorCode::RequestedRangeTooLarge,
800                )),
801            )?;
802            return Ok(());
803        }
804        let tx_pool = self.tx_pool.clone();
805        self.handle_txpool_request(
806            request_id,
807            async move { tx_pool.get_full_txs(tx_ids).await },
808            V2ResponseMessage::TxPoolFullTransactions,
809            |response, request_id| TaskRequest::TxPoolFullTransactions {
810                response,
811                request_id,
812            },
813        )
814    }
815}
816
817fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
818    let inner = Vec::from(*peer_id);
819    Ok(FuelPeerId::from(inner))
820}
821
822#[async_trait::async_trait]
823impl<V, T> RunnableService for UninitializedTask<V, SharedState, T>
824where
825    V: AtomicView + 'static,
826    V::LatestView: P2pDb,
827    T: TxPool + 'static,
828{
829    const NAME: &'static str = "P2P";
830
831    type SharedData = SharedState;
832    type Task = Task<FuelP2PService, V, SharedState, T>;
833    type TaskParams = ();
834
835    fn shared_data(&self) -> Self::SharedData {
836        self.broadcast.clone()
837    }
838
839    async fn into_task(
840        mut self,
841        _: &StateWatcher,
842        _: Self::TaskParams,
843    ) -> anyhow::Result<Self::Task> {
844        let Self {
845            chain_id,
846            last_height,
847            view_provider,
848            next_block_height,
849            request_receiver,
850            broadcast,
851            tx_pool,
852            config,
853        } = self;
854
855        let view = view_provider.latest_view()?;
856        let genesis = view.get_genesis()?;
857        let config = config.init(genesis)?;
858        let Config {
859            max_block_size,
860            max_headers_per_request,
861            max_txs_per_request,
862            heartbeat_check_interval,
863            heartbeat_max_avg_interval,
864            heartbeat_max_time_since_last,
865            database_read_threads,
866            tx_pool_threads,
867            metrics,
868            ..
869        } = config;
870
871        // Hardcoded for now, but left here to be configurable in the future.
872        // TODO: https://github.com/FuelLabs/fuel-core/issues/1340
873        let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
874            old_heartbeat_penalty: -5.,
875            low_heartbeat_frequency_penalty: -5.,
876        };
877
878        let response_timeout = config.set_request_timeout;
879        let mut p2p_service = FuelP2PService::new(
880            broadcast.reserved_peers_broadcast.clone(),
881            config,
882            GossipsubMessageHandler::new(),
883            RequestResponseMessageHandler::new(max_block_size),
884        )
885        .await?;
886        p2p_service.update_block_height(last_height);
887        p2p_service.start().await?;
888
889        let next_check_time =
890            Instant::now().checked_add(heartbeat_check_interval).expect(
891                "The heartbeat check interval should be small enough to do frequently",
892            );
893        let db_heavy_task_processor = SyncProcessor::new(
894            "P2P_DatabaseProcessor",
895            database_read_threads,
896            1024 * 10,
897        )?;
898        let tx_pool_heavy_task_processor =
899            AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
900        let request_sender = broadcast.request_sender.clone();
901
902        let task = Task {
903            chain_id,
904            response_timeout,
905            p2p_service,
906            view_provider,
907            request_receiver,
908            request_sender,
909            next_block_height,
910            broadcast,
911            tx_pool,
912            db_heavy_task_processor,
913            tx_pool_heavy_task_processor,
914            max_headers_per_request,
915            max_txs_per_request,
916            heartbeat_check_interval,
917            heartbeat_max_avg_interval,
918            heartbeat_max_time_since_last,
919            next_check_time,
920            heartbeat_peer_reputation_config,
921            cached_view: Arc::new(CachedView::new(614 * 10, metrics)),
922        };
923        Ok(task)
924    }
925}
926
927// TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275
928impl<P, V, B, T> RunnableTask for Task<P, V, B, T>
929where
930    P: TaskP2PService + 'static,
931    V: AtomicView + 'static,
932    V::LatestView: P2pDb,
933    B: Broadcast + 'static,
934    T: TxPool + 'static,
935{
936    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
937        tokio::select! {
938            biased;
939
940            _ = watcher.while_started() => {
941                TaskNextAction::Stop
942            },
943            latest_block_height = self.next_block_height.next() => {
944                if let Some(latest_block_height) = latest_block_height {
945                    let _ = self.p2p_service.update_block_height(latest_block_height);
946                    TaskNextAction::Continue
947                } else {
948                    TaskNextAction::Stop
949                }
950            },
951            next_service_request = self.request_receiver.recv() => {
952                match next_service_request {
953                    Some(TaskRequest::BroadcastTransaction(transaction)) => {
954                        let tx_id = transaction.id(&self.chain_id);
955                        let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
956                        let result = self.p2p_service.publish_message(broadcast);
957                        if let Err(e) = result {
958                            tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
959                        }
960                    }
961                    Some(TaskRequest::BroadcastPreConfirmations(pre_confirmation_message)) => {
962                        let broadcast = GossipsubBroadcastRequest::TxPreConfirmations(pre_confirmation_message);
963                        let result = self.p2p_service.publish_message(broadcast.clone());
964                        if let Err(e) = result {
965                            tracing::error!("Got an error during pre-confirmation message broadcasting {:?}: {}", broadcast, e);
966                        }
967                    }
968                    Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => {
969                        // Note: this range has already been checked for
970                        // validity in `SharedState::get_sealed_block_headers`.
971                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
972                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
973                            let _ = channel.send(Err(TaskError::NoPeerFound));
974                            return TaskNextAction::Continue
975                        };
976                        let channel = ResponseSender::SealedHeaders(channel);
977                        let request_msg = RequestMessage::SealedHeaders(block_height_range.clone());
978                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
979                    }
980                    Some(TaskRequest::GetTransactions {block_height_range, channel }) => {
981                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
982                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
983                            let _ = channel.send(Err(TaskError::NoPeerFound));
984                            return TaskNextAction::Continue
985                        };
986                        let channel = ResponseSender::Transactions(channel);
987                        let request_msg = RequestMessage::Transactions(block_height_range.clone());
988                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
989                    }
990                    Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => {
991                        let channel = ResponseSender::TransactionsFromPeer(channel);
992                        let request_msg = RequestMessage::Transactions(block_height_range);
993                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target");
994                    }
995                    Some(TaskRequest::TxPoolGetAllTxIds { from_peer, channel }) => {
996                        let channel = ResponseSender::TxPoolAllTransactionsIds(channel);
997                        let request_msg = RequestMessage::TxPoolAllTransactionsIds;
998                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
999                    }
1000                    Some(TaskRequest::TxPoolGetFullTransactions { tx_ids, from_peer, channel }) => {
1001                        let channel = ResponseSender::TxPoolFullTransactions(channel);
1002                        let request_msg = RequestMessage::TxPoolFullTransactions(tx_ids);
1003                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
1004                    }
1005                    Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => {
1006                        let res = self.p2p_service.report_message(message, acceptance);
1007                        if let Err(err) = res {
1008                            return TaskNextAction::ErrorContinue(err)
1009                        }
1010                    }
1011                    Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => {
1012                        let _ = self.p2p_service.report_peer(peer_id, score, reporting_service);
1013                    }
1014                    Some(TaskRequest::GetAllPeerInfo { channel }) => {
1015                        let peers = self.p2p_service.get_all_peer_info()
1016                            .into_iter()
1017                            .map(|(id, info)| (*id, info.clone()))
1018                            .collect::<Vec<_>>();
1019                        let _ = channel.send(peers);
1020                    }
1021                    Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => {
1022                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response));
1023                    }
1024                    Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => {
1025                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response));
1026                    }
1027                    Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => {
1028                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response));
1029                    }
1030                    Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => {
1031                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response));
1032                    }
1033                    None => {
1034                        tracing::error!("The P2P `Task` should be holder of the `Sender`");
1035                        return TaskNextAction::Stop
1036                    }
1037                }
1038                    TaskNextAction::Continue
1039            }
1040            p2p_event = self.p2p_service.next_event() => {
1041                match p2p_event {
1042                    Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => {
1043                        let peer_id: Vec<u8> = peer_id.into();
1044                        let block_height_data = BlockHeightHeartbeatData {
1045                            peer_id: peer_id.into(),
1046                            block_height,
1047                        };
1048
1049                        let _ = self.broadcast.block_height_broadcast(block_height_data);
1050                    }
1051                    Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => {
1052                        tracing::info!("Received gossip message from peer {:?}", peer_id);
1053                        self.broadcast_gossip_message(message, message_id, peer_id);
1054                    },
1055                    Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => {
1056                        let res = self.process_request(request_message, request_id);
1057                        if let Err(err) = res {
1058                            return TaskNextAction::ErrorContinue(err)
1059                        }
1060                    },
1061                    Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => {
1062                        if tag == GossipTopicTag::NewTx {
1063                            let _ = self.broadcast.new_tx_subscription_broadcast(FuelPeerId::from(peer_id.to_bytes()));
1064                        }
1065                    },
1066                    _ => (),
1067                }
1068                TaskNextAction::Continue
1069            },
1070            _  = tokio::time::sleep_until(self.next_check_time) => {
1071                let res = self.peer_heartbeat_reputation_checks();
1072                match res {
1073                    Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"),
1074                    Err(e) => {
1075                        tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e);
1076                    }
1077                }
1078
1079                if let Some(next_check_time) = self.next_check_time.checked_add(self.heartbeat_check_interval) {
1080                    self.next_check_time = next_check_time;
1081                    TaskNextAction::Continue
1082                } else {
1083                    tracing::error!("Next check time overflowed");
1084                    TaskNextAction::Stop
1085                }
1086            }
1087        }
1088    }
1089
1090    async fn shutdown(self) -> anyhow::Result<()> {
1091        // Nothing to shut down because we don't have any temporary state that should be dumped,
1092        // and we don't spawn any sub-tasks that we need to finish or await.
1093
1094        // `FuelP2PService` doesn't support graceful shutdown(with informing of connected peers).
1095        // https://github.com/libp2p/specs/blob/master/ROADMAP.md#%EF%B8%8F-polite-peering
1096        // Dropping of the `FuelP2PService` will close all connections.
1097
1098        Ok(())
1099    }
1100}
1101
/// Cloneable handle used to talk to the P2P `Task`: it sends `TaskRequest`s
/// over `request_sender` and hands out receivers for the broadcast channels.
#[derive(Clone)]
pub struct SharedState {
    /// Sender of the ids (`FuelPeerId`) of newly subscribed peers; receivers are
    /// handed out by `subscribe_new_peers`.
    new_tx_subscription_broadcast: broadcast::Sender<FuelPeerId>,
    /// Sender of p2p transaction used for subscribing.
    tx_broadcast: broadcast::Sender<TransactionGossipData>,
    /// Sender of p2p transaction preconfirmations used for subscribing.
    pre_confirmations_broadcast: broadcast::Sender<P2PPreConfirmationGossipData>,
    /// Sender of reserved peers connection updates.
    reserved_peers_broadcast: broadcast::Sender<usize>,
    /// Used for communicating with the `Task`.
    request_sender: mpsc::Sender<TaskRequest>,
    /// Sender of p2p block height data
    block_height_broadcast: broadcast::Sender<BlockHeightHeartbeatData>,
    /// Upper bound on the number of transactions (or transaction ids) accepted
    /// in a single peer response by the tx pool request methods.
    max_txs_per_request: usize,
}
1119
1120impl SharedState {
1121    pub fn notify_gossip_transaction_validity(
1122        &self,
1123        message_info: GossipsubMessageInfo,
1124        acceptance: GossipsubMessageAcceptance,
1125    ) -> anyhow::Result<()> {
1126        self.request_sender
1127            .try_send(TaskRequest::RespondWithGossipsubMessageReport((
1128                message_info,
1129                acceptance,
1130            )))?;
1131        Ok(())
1132    }
1133
1134    pub async fn get_sealed_block_headers(
1135        &self,
1136        block_height_range: Range<u32>,
1137    ) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
1138        let (sender, receiver) = oneshot::channel();
1139
1140        if block_height_range.is_empty() {
1141            return Err(anyhow!(
1142                "Cannot retrieve headers for an empty range of block heights"
1143            ));
1144        }
1145
1146        self.request_sender
1147            .send(TaskRequest::GetSealedHeaders {
1148                block_height_range,
1149                channel: sender,
1150            })
1151            .await?;
1152
1153        let (peer_id, response) = receiver
1154            .await
1155            .map_err(|e| anyhow!("{e}"))?
1156            .map_err(|e| anyhow!("{e}"))?;
1157
1158        let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1159        if let Err(ref response_error_code) = data {
1160            warn!(
1161                "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1162            );
1163        };
1164
1165        Ok((peer_id.to_bytes(), data.ok()))
1166    }
1167
1168    pub async fn get_transactions(
1169        &self,
1170        range: Range<u32>,
1171    ) -> anyhow::Result<(Vec<u8>, Option<Vec<Transactions>>)> {
1172        let (sender, receiver) = oneshot::channel();
1173
1174        if range.is_empty() {
1175            return Err(anyhow!(
1176                "Cannot retrieve transactions for an empty range of block heights"
1177            ));
1178        }
1179
1180        self.request_sender
1181            .send(TaskRequest::GetTransactions {
1182                block_height_range: range,
1183                channel: sender,
1184            })
1185            .await?;
1186
1187        let (peer_id, response) = receiver
1188            .await
1189            .map_err(|e| anyhow!("{e}"))?
1190            .map_err(|e| anyhow!("{e}"))?;
1191
1192        let data = match response {
1193            Err(request_response_protocol_error) => Err(anyhow!(
1194                "Invalid response from peer {request_response_protocol_error:?}"
1195            )),
1196            Ok(Err(response_error_code)) => {
1197                warn!(
1198                    "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1199                );
1200                Ok(None)
1201            }
1202            Ok(Ok(headers)) => Ok(Some(headers)),
1203        };
1204        data.map(|data| (peer_id.to_bytes(), data))
1205    }
1206
1207    pub async fn get_transactions_from_peer(
1208        &self,
1209        peer_id: FuelPeerId,
1210        range: Range<u32>,
1211    ) -> anyhow::Result<Option<Vec<Transactions>>> {
1212        let (sender, receiver) = oneshot::channel();
1213        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1214
1215        let request = TaskRequest::GetTransactionsFromPeer {
1216            block_height_range: range,
1217            from_peer,
1218            channel: sender,
1219        };
1220        self.request_sender.send(request).await?;
1221
1222        let (response_from_peer, response) =
1223            receiver.await.map_err(|e| anyhow!("{e}"))?;
1224        assert_eq!(
1225            peer_id.as_ref(),
1226            response_from_peer.to_bytes(),
1227            "Bug: response from non-requested peer"
1228        );
1229
1230        match response {
1231            Err(request_response_protocol_error) => Err(anyhow!(
1232                "Invalid response from peer {request_response_protocol_error:?}"
1233            )),
1234            Ok(Err(response_error_code)) => {
1235                warn!(
1236                    "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}"
1237                );
1238                Ok(None)
1239            }
1240            Ok(Ok(txs)) => Ok(Some(txs)),
1241        }
1242    }
1243
1244    pub async fn get_all_transactions_ids_from_peer(
1245        &self,
1246        peer_id: FuelPeerId,
1247    ) -> anyhow::Result<Vec<TxId>> {
1248        let (sender, receiver) = oneshot::channel();
1249        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1250        let request = TaskRequest::TxPoolGetAllTxIds {
1251            from_peer,
1252            channel: sender,
1253        };
1254        self.request_sender.try_send(request)?;
1255
1256        let (response_from_peer, response) =
1257            receiver.await.map_err(|e| anyhow!("{e}"))?;
1258
1259        debug_assert_eq!(
1260            peer_id.as_ref(),
1261            response_from_peer.to_bytes(),
1262            "Bug: response from non-requested peer"
1263        );
1264
1265        let response =
1266            response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1267
1268        let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default();
1269
1270        if txs.len() > self.max_txs_per_request {
1271            return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1272        }
1273        Ok(txs)
1274    }
1275
1276    pub async fn get_full_transactions_from_peer(
1277        &self,
1278        peer_id: FuelPeerId,
1279        tx_ids: Vec<TxId>,
1280    ) -> anyhow::Result<Vec<Option<Transaction>>> {
1281        let (sender, receiver) = oneshot::channel();
1282        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1283        let request = TaskRequest::TxPoolGetFullTransactions {
1284            tx_ids,
1285            from_peer,
1286            channel: sender,
1287        };
1288        self.request_sender.try_send(request)?;
1289
1290        let (response_from_peer, response) =
1291            receiver.await.map_err(|e| anyhow!("{e}"))?;
1292        debug_assert_eq!(
1293            peer_id.as_ref(),
1294            response_from_peer.to_bytes(),
1295            "Bug: response from non-requested peer"
1296        );
1297
1298        let response =
1299            response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1300        let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default();
1301
1302        if txs.len() > self.max_txs_per_request {
1303            return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1304        }
1305        txs.into_iter()
1306            .map(|tx| {
1307                tx.map(Transaction::try_from)
1308                    .transpose()
1309                    .map_err(|err| anyhow::anyhow!(err))
1310            })
1311            .collect()
1312    }
1313
1314    pub fn broadcast_transaction(
1315        &self,
1316        transaction: Arc<Transaction>,
1317    ) -> anyhow::Result<()> {
1318        self.request_sender
1319            .try_send(TaskRequest::BroadcastTransaction(transaction))?;
1320        Ok(())
1321    }
1322
1323    pub fn broadcast_preconfirmations(
1324        &self,
1325        preconfirmations: Arc<P2PPreConfirmationMessage>,
1326    ) -> anyhow::Result<()> {
1327        self.request_sender
1328            .try_send(TaskRequest::BroadcastPreConfirmations(preconfirmations))?;
1329        Ok(())
1330    }
1331
1332    pub async fn get_all_peers(&self) -> anyhow::Result<Vec<(PeerId, PeerInfo)>> {
1333        let (sender, receiver) = oneshot::channel();
1334
1335        self.request_sender
1336            .send(TaskRequest::GetAllPeerInfo { channel: sender })
1337            .await?;
1338
1339        receiver.await.map_err(|e| anyhow!("{}", e))
1340    }
1341
1342    pub fn subscribe_new_peers(&self) -> broadcast::Receiver<FuelPeerId> {
1343        self.new_tx_subscription_broadcast.subscribe()
1344    }
1345
1346    pub fn subscribe_tx(&self) -> broadcast::Receiver<TransactionGossipData> {
1347        self.tx_broadcast.subscribe()
1348    }
1349
1350    pub fn subscribe_preconfirmations(
1351        &self,
1352    ) -> broadcast::Receiver<P2PPreConfirmationGossipData> {
1353        self.pre_confirmations_broadcast.subscribe()
1354    }
1355
1356    pub fn subscribe_block_height(
1357        &self,
1358    ) -> broadcast::Receiver<BlockHeightHeartbeatData> {
1359        self.block_height_broadcast.subscribe()
1360    }
1361
1362    pub fn subscribe_reserved_peers_count(&self) -> broadcast::Receiver<usize> {
1363        self.reserved_peers_broadcast.subscribe()
1364    }
1365
1366    pub fn report_peer<T: PeerReport>(
1367        &self,
1368        peer_id: FuelPeerId,
1369        peer_report: T,
1370        reporting_service: &'static str,
1371    ) -> anyhow::Result<()> {
1372        match Vec::from(peer_id).try_into() {
1373            Ok(peer_id) => {
1374                let score = peer_report.get_score_from_report();
1375
1376                self.request_sender
1377                    .try_send(TaskRequest::RespondWithPeerReport {
1378                        peer_id,
1379                        score,
1380                        reporting_service,
1381                    })?;
1382
1383                Ok(())
1384            }
1385            Err(e) => {
1386                warn!(target: "fuel-p2p", "Failed to read PeerId from {e:?}");
1387                Err(anyhow::anyhow!("Failed to read PeerId from {e:?}"))
1388            }
1389        }
1390    }
1391}
1392
1393pub fn build_shared_state(
1394    config: Config<NotInitialized>,
1395) -> (SharedState, Receiver<TaskRequest>) {
1396    let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE);
1397    let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1398    let (preconfirmations_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1399    let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1400    let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1401
1402    let (reserved_peers_broadcast, _) = broadcast::channel::<usize>(
1403        config
1404            .reserved_nodes
1405            .len()
1406            .saturating_mul(2)
1407            .saturating_add(1),
1408    );
1409
1410    (
1411        SharedState {
1412            request_sender,
1413            new_tx_subscription_broadcast,
1414            tx_broadcast,
1415            pre_confirmations_broadcast: preconfirmations_broadcast,
1416            reserved_peers_broadcast,
1417            block_height_broadcast,
1418            max_txs_per_request: config.max_txs_per_request,
1419        },
1420        request_receiver,
1421    )
1422}
1423
1424#[allow(clippy::too_many_arguments)]
1425pub fn new_service<V, B, T>(
1426    chain_id: ChainId,
1427    last_height: BlockHeight,
1428    p2p_config: Config<NotInitialized>,
1429    shared_state: SharedState,
1430    request_receiver: Receiver<TaskRequest>,
1431    view_provider: V,
1432    block_importer: B,
1433    tx_pool: T,
1434) -> Service<V, T>
1435where
1436    V: AtomicView + 'static,
1437    V::LatestView: P2pDb,
1438    B: BlockHeightImporter,
1439    T: TxPool,
1440{
1441    let task = UninitializedTask::new(
1442        chain_id,
1443        last_height,
1444        p2p_config,
1445        shared_state,
1446        request_receiver,
1447        view_provider,
1448        block_importer,
1449        tx_pool,
1450    );
1451    Service::new(task)
1452}
1453
1454pub fn to_message_acceptance(
1455    acceptance: &GossipsubMessageAcceptance,
1456) -> MessageAcceptance {
1457    match acceptance {
1458        GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
1459        GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
1460        GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
1461    }
1462}
1463
1464fn report_message(
1465    p2p_service: &mut FuelP2PService,
1466    message: GossipsubMessageInfo,
1467    acceptance: GossipsubMessageAcceptance,
1468) {
1469    let GossipsubMessageInfo {
1470        peer_id,
1471        message_id,
1472    } = message;
1473
1474    let msg_id = message_id.into();
1475    let peer_id: Vec<u8> = peer_id.into();
1476
1477    if let Ok(peer_id) = peer_id.try_into() {
1478        let acceptance = to_message_acceptance(&acceptance);
1479        p2p_service.report_message_validation_result(&msg_id, peer_id, acceptance);
1480    } else {
1481        warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
1482    }
1483}