Skip to main content

fuel_core_p2p/
service.rs

1use crate::{
2    cached_view::CachedView,
3    codecs::{
4        gossipsub::GossipsubMessageHandler,
5        request_response::RequestResponseMessageHandler,
6    },
7    config::{
8        Config,
9        NotInitialized,
10    },
11    gossipsub::messages::{
12        GossipTopicTag,
13        GossipsubBroadcastRequest,
14        GossipsubMessage,
15    },
16    p2p_service::{
17        FuelP2PEvent,
18        FuelP2PService,
19    },
20    peer_manager::PeerInfo,
21    ports::{
22        BlockHeightImporter,
23        P2PPreConfirmationGossipData,
24        P2PPreConfirmationMessage,
25        P2pDb,
26        TxPool,
27    },
28    request_response::messages::{
29        OnResponse,
30        OnResponseWithPeerSelection,
31        RequestMessage,
32        ResponseMessageErrorCode,
33        ResponseSender,
34        V2ResponseMessage,
35    },
36};
37use anyhow::anyhow;
38use fuel_core_metrics::p2p_metrics::set_blocks_requested;
39use fuel_core_services::{
40    AsyncProcessor,
41    RunnableService,
42    RunnableTask,
43    ServiceRunner,
44    StateWatcher,
45    SyncProcessor,
46    TaskNextAction,
47    TraceErr,
48    stream::BoxStream,
49};
50use fuel_core_storage::transactional::AtomicView;
51use fuel_core_types::{
52    blockchain::SealedBlockHeader,
53    fuel_tx::{
54        Transaction,
55        TxId,
56        UniqueIdentifier,
57    },
58    fuel_types::{
59        BlockHeight,
60        ChainId,
61    },
62    services::p2p::{
63        BlockHeightHeartbeatData,
64        GossipData,
65        GossipsubMessageAcceptance,
66        GossipsubMessageInfo,
67        NetworkableTransactionPool,
68        PeerId as FuelPeerId,
69        TransactionGossipData,
70        Transactions,
71        peer_reputation::{
72            AppScore,
73            PeerReport,
74        },
75    },
76};
77use futures::{
78    StreamExt,
79    future::BoxFuture,
80};
81use libp2p::{
82    PeerId,
83    gossipsub::{
84        MessageAcceptance,
85        MessageId,
86        PublishError,
87    },
88    request_response::InboundRequestId,
89};
90use std::{
91    fmt::Debug,
92    future::Future,
93    ops::Range,
94    sync::Arc,
95};
96use thiserror::Error;
97use tokio::{
98    sync::{
99        broadcast,
100        mpsc::{
101            self,
102            Receiver,
103        },
104        oneshot,
105    },
106    time::{
107        Duration,
108        Instant,
109    },
110};
111use tracing::warn;
112
113#[cfg(test)]
114pub mod broadcast_tests;
115#[cfg(test)]
116pub mod task_tests;
117
/// Capacity value of `10 * 1024` (10_240) entries.
// NOTE(review): the use site is outside this part of the file — presumably this sizes the
// channels created for the p2p service; confirm against the constructor.
const CHANNEL_SIZE: usize = 1024 * 10;

/// The p2p service: a [`ServiceRunner`] driving an [`UninitializedTask`] whose broadcast
/// half is `SharedState`.
pub type Service<V, T> = ServiceRunner<UninitializedTask<V, SharedState, T>>;
121
/// Errors the p2p task reports back to the originator of a request.
#[derive(Debug, Error)]
pub enum TaskError {
    /// No peer could be selected as the target of an outgoing request.
    #[error("No peer found to send request to")]
    NoPeerFound,
}
127
/// Requests delivered to the p2p [`Task`] over its internal request channel.
///
/// The variants fall into three groups: outgoing broadcasts/requests towards the network,
/// reports about messages and peers, and results of locally executed lookups that must be
/// sent back as a response to an inbound request (`request_id`).
pub enum TaskRequest {
    /// Broadcast a transaction to the p2p network.
    BroadcastTransaction(Arc<Transaction>),
    /// Broadcast pre-confirmations to the p2p network.
    BroadcastPreConfirmations(Arc<P2PPreConfirmationMessage>),
    /// Request information about all connected peers; the answer is delivered on `channel`.
    GetAllPeerInfo {
        channel: oneshot::Sender<Vec<(PeerId, PeerInfo)>>,
    },
    /// Request the sealed block headers in `block_height_range`.
    ///
    /// The task itself selects the peer to ask; `channel` receives
    /// `TaskError::NoPeerFound` when no suitable peer exists.
    GetSealedHeaders {
        block_height_range: Range<u32>,
        channel: OnResponseWithPeerSelection<
            Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        >,
    },
    /// Request the transactions of the blocks in `block_height_range`.
    // NOTE(review): the handler is outside this view; the channel type suggests the task
    // selects the peer, as for `GetSealedHeaders` — confirm.
    GetTransactions {
        block_height_range: Range<u32>,
        channel: OnResponseWithPeerSelection<
            Result<Vec<Transactions>, ResponseMessageErrorCode>,
        >,
    },
    /// Request the transactions of the blocks in `block_height_range` from the given
    /// peer `from_peer`.
    GetTransactionsFromPeer {
        block_height_range: Range<u32>,
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
    },
    /// Request transaction ids from the transaction pool of `from_peer`.
    TxPoolGetAllTxIds {
        from_peer: PeerId,
        channel: OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>,
    },
    /// Request the full transactions identified by `tx_ids` from the transaction pool of
    /// `from_peer`.
    TxPoolGetFullTransactions {
        tx_ids: Vec<TxId>,
        from_peer: PeerId,
        channel: OnResponse<
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        >,
    },
    /// Report back to the p2p network how a received gossipsub message was judged.
    RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)),
    /// Report a score for `peer_id` on behalf of `reporting_service`.
    RespondWithPeerReport {
        peer_id: PeerId,
        score: AppScore,
        reporting_service: &'static str,
    },
    /// Result of a database transactions lookup executed for the inbound request
    /// `request_id`.
    DatabaseTransactionsLookUp {
        response: Result<Vec<Transactions>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a database sealed-header lookup executed for the inbound request
    /// `request_id`.
    DatabaseHeaderLookUp {
        response: Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a transaction-pool id lookup executed for the inbound request
    /// `request_id`.
    TxPoolAllTransactionsIds {
        response: Result<Vec<TxId>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
    /// Result of a transaction-pool full-transaction lookup executed for the inbound
    /// request `request_id`.
    TxPoolFullTransactions {
        response:
            Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
        request_id: InboundRequestId,
    },
}
190
191impl Debug for TaskRequest {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        match self {
194            TaskRequest::BroadcastTransaction(_) => {
195                write!(f, "TaskRequest::BroadcastTransaction")
196            }
197            TaskRequest::BroadcastPreConfirmations(_) => {
198                write!(f, "TaskRequest::BroadcastPreConfirmations")
199            }
200            TaskRequest::GetSealedHeaders { .. } => {
201                write!(f, "TaskRequest::GetSealedHeaders")
202            }
203            TaskRequest::GetTransactions { .. } => {
204                write!(f, "TaskRequest::GetTransactions")
205            }
206            TaskRequest::GetTransactionsFromPeer { .. } => {
207                write!(f, "TaskRequest::GetTransactionsFromPeer")
208            }
209            TaskRequest::TxPoolGetAllTxIds { .. } => {
210                write!(f, "TaskRequest::TxPoolGetAllTxIds")
211            }
212            TaskRequest::TxPoolGetFullTransactions { .. } => {
213                write!(f, "TaskRequest::TxPoolGetFullTransactions")
214            }
215            TaskRequest::RespondWithGossipsubMessageReport(_) => {
216                write!(f, "TaskRequest::RespondWithGossipsubMessageReport")
217            }
218            TaskRequest::RespondWithPeerReport { .. } => {
219                write!(f, "TaskRequest::RespondWithPeerReport")
220            }
221            TaskRequest::GetAllPeerInfo { .. } => {
222                write!(f, "TaskRequest::GetPeerInfo")
223            }
224            TaskRequest::DatabaseTransactionsLookUp { .. } => {
225                write!(f, "TaskRequest::DatabaseTransactionsLookUp")
226            }
227            TaskRequest::DatabaseHeaderLookUp { .. } => {
228                write!(f, "TaskRequest::DatabaseHeaderLookUp")
229            }
230            TaskRequest::TxPoolAllTransactionsIds { .. } => {
231                write!(f, "TaskRequest::TxPoolAllTransactionsIds")
232            }
233            TaskRequest::TxPoolFullTransactions { .. } => {
234                write!(f, "TaskRequest::TxPoolFullTransactions")
235            }
236        }
237    }
238}
239
/// Reason for which a peer is reported during the heartbeat reputation checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeartBeatPeerReportReason {
    /// The time since the peer's last heartbeat exceeds the configured maximum.
    OldHeartBeat,
    /// The peer's average time between heartbeats exceeds the configured maximum.
    LowHeartBeatFrequency,
}
245
/// The operations the p2p [`Task`] needs from the underlying p2p service.
///
/// Implemented for [`FuelP2PService`]; the indirection lets the task be generic over
/// the service type.
pub trait TaskP2PService: Send {
    /// Returns all known peers together with their info.
    fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>;
    /// Returns the id of a peer selected for `height`, or `None` when there is none.
    /// The selection criteria are defined by the implementor.
    fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId>;

    /// Returns a future resolving to the next event of the p2p service, if any.
    fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>>;

    /// Publishes a gossipsub broadcast request to the network.
    fn publish_message(
        &mut self,
        message: GossipsubBroadcastRequest,
    ) -> anyhow::Result<()>;

    /// Sends `request_msg` to `peer_id` (or to a peer chosen by the implementor when
    /// `None`); the response is routed to `on_response`.
    fn send_request_msg(
        &mut self,
        peer_id: Option<PeerId>,
        request_msg: RequestMessage,
        on_response: ResponseSender,
    ) -> anyhow::Result<()>;

    /// Sends `message` as the response to the inbound request `request_id`.
    fn send_response_msg(
        &mut self,
        request_id: InboundRequestId,
        message: V2ResponseMessage,
    ) -> anyhow::Result<()>;

    /// Reports the acceptance verdict for a received gossipsub message.
    fn report_message(
        &mut self,
        message: GossipsubMessageInfo,
        acceptance: GossipsubMessageAcceptance,
    ) -> anyhow::Result<()>;

    /// Reports `score` for `peer_id` on behalf of `reporting_service`.
    fn report_peer(
        &mut self,
        peer_id: PeerId,
        score: AppScore,
        reporting_service: &str,
    ) -> anyhow::Result<()>;

    /// Informs the service about the latest block height.
    fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()>;

    /// Passes `update_fn`, a metrics-updating closure, to the service.
    // NOTE(review): whether the closure is always invoked is up to the implementor
    // (e.g. it may depend on metrics being enabled) — confirm.
    fn update_metrics<T>(&self, update_fn: T)
    where
        T: FnOnce();
}
289
290impl TaskP2PService for FuelP2PService {
291    fn update_metrics<T>(&self, update_fn: T)
292    where
293        T: FnOnce(),
294    {
295        FuelP2PService::update_metrics(self, update_fn)
296    }
297
298    fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> {
299        self.peer_manager().get_all_peers().collect()
300    }
301
302    fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId> {
303        self.peer_manager().get_peer_id_with_height(height)
304    }
305
306    fn next_event(&mut self) -> BoxFuture<'_, Option<FuelP2PEvent>> {
307        Box::pin(self.next_event())
308    }
309
310    fn publish_message(
311        &mut self,
312        message: GossipsubBroadcastRequest,
313    ) -> anyhow::Result<()> {
314        let result = self.publish_message(message);
315
316        match result {
317            Ok(_) => Ok(()),
318            Err(e) => {
319                if matches!(&e, PublishError::InsufficientPeers) {
320                    Ok(())
321                } else {
322                    Err(anyhow!(e))
323                }
324            }
325        }
326    }
327
328    fn send_request_msg(
329        &mut self,
330        peer_id: Option<PeerId>,
331        request_msg: RequestMessage,
332        on_response: ResponseSender,
333    ) -> anyhow::Result<()> {
334        self.send_request_msg(peer_id, request_msg, on_response)?;
335        Ok(())
336    }
337
338    fn send_response_msg(
339        &mut self,
340        request_id: InboundRequestId,
341        message: V2ResponseMessage,
342    ) -> anyhow::Result<()> {
343        self.send_response_msg(request_id, message)?;
344        Ok(())
345    }
346
347    fn report_message(
348        &mut self,
349        message: GossipsubMessageInfo,
350        acceptance: GossipsubMessageAcceptance,
351    ) -> anyhow::Result<()> {
352        report_message(self, message, acceptance);
353        Ok(())
354    }
355
356    fn report_peer(
357        &mut self,
358        peer_id: PeerId,
359        score: AppScore,
360        reporting_service: &str,
361    ) -> anyhow::Result<()> {
362        self.report_peer(peer_id, score, reporting_service);
363        Ok(())
364    }
365
366    fn update_block_height(&mut self, height: BlockHeight) -> anyhow::Result<()> {
367        self.update_block_height(height);
368        Ok(())
369    }
370}
371
/// Outgoing notifications the p2p [`Task`] emits towards the rest of the node.
///
/// Implemented for `SharedState`.
pub trait Broadcast: Send {
    /// Reports `report` as the score for `peer_id` on behalf of `reporting_service`.
    fn report_peer(
        &self,
        peer_id: FuelPeerId,
        report: AppScore,
        reporting_service: &'static str,
    ) -> anyhow::Result<()>;

    /// Broadcasts block-height heartbeat data.
    fn block_height_broadcast(
        &self,
        block_height_data: BlockHeightHeartbeatData,
    ) -> anyhow::Result<()>;

    /// Broadcasts a transaction received via gossip.
    fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>;

    /// Broadcasts pre-confirmations received via gossip.
    fn pre_confirmation_broadcast(
        &self,
        confirmations: P2PPreConfirmationGossipData,
    ) -> anyhow::Result<()>;

    /// Broadcasts the id of a peer in connection with a new transaction subscription.
    // NOTE(review): the trigger for this notification is outside this view — confirm.
    fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>;
}
394
395impl Broadcast for SharedState {
396    fn report_peer(
397        &self,
398        peer_id: FuelPeerId,
399        report: AppScore,
400        reporting_service: &'static str,
401    ) -> anyhow::Result<()> {
402        self.report_peer(peer_id, report, reporting_service)
403    }
404
405    fn block_height_broadcast(
406        &self,
407        block_height_data: BlockHeightHeartbeatData,
408    ) -> anyhow::Result<()> {
409        self.block_height_broadcast.send(block_height_data)?;
410        Ok(())
411    }
412
413    fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()> {
414        self.tx_broadcast.send(transaction)?;
415        Ok(())
416    }
417
418    fn pre_confirmation_broadcast(
419        &self,
420        confirmations: P2PPreConfirmationGossipData,
421    ) -> anyhow::Result<()> {
422        self.pre_confirmations_broadcast.send(confirmations)?;
423        Ok(())
424    }
425
426    fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> {
427        self.new_tx_subscription_broadcast.send(peer_id)?;
428        Ok(())
429    }
430}
431
/// Uninitialized task for the p2p that can be upgraded later into [`Task`].
///
/// It only stores the ingredients; the actual p2p service is created and started when the
/// task is converted via `RunnableService::into_task`.
pub struct UninitializedTask<V, B, T> {
    /// Chain id, handed over to the [`Task`].
    chain_id: ChainId,
    /// Block height the p2p service is initialised with on start-up.
    last_height: BlockHeight,
    /// Provider of database views; also used to read the genesis on start-up.
    view_provider: V,
    /// Stream of block heights obtained from the block importer.
    next_block_height: BoxStream<BlockHeight>,
    /// Receive internal Task Requests
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Broadcast half (the shared state for the concrete service).
    broadcast: B,
    /// Transaction pool handle, handed over to the [`Task`].
    tx_pool: T,
    /// Not yet initialized configuration; initialized with the genesis on start-up.
    config: Config<NotInitialized>,
}
444
/// Orchestrates various p2p-related events between the inner `P2pService`
/// and the top level `NetworkService`.
pub struct Task<P, V, B, T> {
    /// Chain id, used to compute transaction ids when broadcasting transactions.
    chain_id: ChainId,
    /// Inbound requests that waited longer than this before a worker picks them up are
    /// dropped instead of being processed.
    response_timeout: Duration,
    /// The underlying p2p service.
    p2p_service: P,
    /// Provider of the latest database view for inbound database lookups.
    view_provider: V,
    /// Stream of new block heights; each one is forwarded to the p2p service.
    next_block_height: BoxStream<BlockHeight>,
    /// Receive internal Task Requests
    request_receiver: mpsc::Receiver<TaskRequest>,
    /// Sender for the same request channel; cloned into workers so they can hand their
    /// results back to the task.
    request_sender: mpsc::Sender<TaskRequest>,
    /// Processor running database lookups for inbound requests.
    db_heavy_task_processor: SyncProcessor,
    /// Processor running transaction-pool lookups for inbound requests.
    tx_pool_heavy_task_processor: AsyncProcessor,
    /// Outgoing notifications towards the rest of the node.
    broadcast: B,
    /// Transaction pool handle used to answer inbound transaction-pool requests.
    tx_pool: T,
    /// Maximum length of a block-height range accepted by inbound range requests.
    max_headers_per_request: usize,
    /// Maximum number of transactions / transaction ids per transaction-pool request.
    max_txs_per_request: usize,
    // wait time between peer heartbeat reputation checks
    heartbeat_check_interval: Duration,
    /// A peer whose average time between heartbeats exceeds this value is reported.
    heartbeat_max_avg_interval: Duration,
    /// A peer whose last heartbeat is older than this value is reported.
    heartbeat_max_time_since_last: Duration,
    /// Point in time of the next heartbeat reputation check.
    next_check_time: Instant,
    /// Penalties applied by the heartbeat reputation checks.
    heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
    // cached view
    cached_view: Arc<CachedView>,
}
471
472impl<P, V, B, T> Task<P, V, B, T>
473where
474    P: TaskP2PService,
475    B: Broadcast,
476{
477    pub(crate) fn broadcast_gossip_message(
478        &mut self,
479        message: GossipsubMessage,
480        message_id: MessageId,
481        peer_id: PeerId,
482    ) {
483        let message_id = message_id.0;
484
485        match message {
486            GossipsubMessage::NewTx(transaction) => {
487                let next_transaction = GossipData::new(transaction, peer_id, message_id);
488                let _ = self.broadcast.tx_broadcast(next_transaction);
489            }
490            GossipsubMessage::TxPreConfirmations(confirmations) => {
491                let data = GossipData::new(confirmations, peer_id, message_id);
492                let _ = self.broadcast.pre_confirmation_broadcast(data);
493            }
494        }
495    }
496}
497
498#[derive(Default, Clone)]
499pub struct HeartbeatPeerReputationConfig {
500    old_heartbeat_penalty: AppScore,
501    low_heartbeat_frequency_penalty: AppScore,
502}
503
504impl<V, T> UninitializedTask<V, SharedState, T> {
505    #[allow(clippy::too_many_arguments)]
506    pub fn new<B: BlockHeightImporter>(
507        chain_id: ChainId,
508        last_height: BlockHeight,
509        config: Config<NotInitialized>,
510        shared_state: SharedState,
511        request_receiver: Receiver<TaskRequest>,
512        view_provider: V,
513        block_importer: B,
514        tx_pool: T,
515    ) -> Self {
516        let next_block_height = block_importer.next_block_height();
517
518        Self {
519            chain_id,
520            last_height,
521            view_provider,
522            tx_pool,
523            next_block_height,
524            request_receiver,
525            broadcast: shared_state,
526            config,
527        }
528    }
529}
530
531impl<P, V, B, T> Task<P, V, B, T>
532where
533    P: TaskP2PService,
534    V: AtomicView,
535    B: Broadcast,
536{
537    fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> {
538        for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() {
539            if peer_info.heartbeat_data.duration_since_last_heartbeat()
540                > self.heartbeat_max_time_since_last
541            {
542                tracing::debug!("Peer {:?} has old heartbeat", peer_id);
543                let report = HeartBeatPeerReportReason::OldHeartBeat;
544                let peer_id = convert_peer_id(peer_id)?;
545                self.report_peer(peer_id, report)?;
546            } else if peer_info.heartbeat_data.average_time_between_heartbeats()
547                > self.heartbeat_max_avg_interval
548            {
549                tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id);
550                let report = HeartBeatPeerReportReason::LowHeartBeatFrequency;
551                let peer_id = convert_peer_id(peer_id)?;
552                self.report_peer(peer_id, report)?;
553            }
554        }
555        Ok(())
556    }
557
558    fn report_peer(
559        &self,
560        peer_id: FuelPeerId,
561        report: HeartBeatPeerReportReason,
562    ) -> anyhow::Result<()> {
563        let app_score = match report {
564            HeartBeatPeerReportReason::OldHeartBeat => {
565                self.heartbeat_peer_reputation_config.old_heartbeat_penalty
566            }
567            HeartBeatPeerReportReason::LowHeartBeatFrequency => {
568                self.heartbeat_peer_reputation_config
569                    .low_heartbeat_frequency_penalty
570            }
571        };
572        let reporting_service = "p2p";
573        self.broadcast
574            .report_peer(peer_id, app_score, reporting_service)?;
575        Ok(())
576    }
577}
578
/// Handling of inbound requests received from other peers.
///
/// Lookups are not executed on the task itself: database lookups are offloaded to
/// `db_heavy_task_processor`, transaction-pool lookups to `tx_pool_heavy_task_processor`.
/// Workers hand their result back as a [`TaskRequest`] over the task's request channel.
impl<P, V, B, T> Task<P, V, B, T>
where
    P: TaskP2PService + 'static,
    V: AtomicView + 'static,
    V::LatestView: P2pDb,
    T: TxPool + 'static,
    B: Send,
{
    /// Forwards a metrics-updating closure to the p2p service.
    fn update_metrics<U>(&self, update_fn: U)
    where
        U: FnOnce(),
    {
        self.p2p_service.update_metrics(update_fn)
    }

    /// Dispatches an inbound request to the handler matching its kind.
    fn process_request(
        &mut self,
        request_message: RequestMessage,
        request_id: InboundRequestId,
    ) -> anyhow::Result<()> {
        match request_message {
            RequestMessage::Transactions(range) => {
                self.handle_transactions_request(range, request_id)
            }
            RequestMessage::SealedHeaders(range) => {
                self.handle_sealed_headers_request(range, request_id)
            }
            RequestMessage::TxPoolAllTransactionsIds => {
                self.handle_all_transactions_ids_request(request_id)
            }
            RequestMessage::TxPoolFullTransactions(tx_ids) => {
                self.handle_full_transactions_request(tx_ids, request_id)
            }
        }
    }

    /// Shared implementation of the range-based database requests.
    ///
    /// * `range` - requested block-height range; rejected when longer than `max_len`.
    /// * `request_id` - id of the inbound request being answered.
    /// * `response_sender` - wraps a result into the response message used when the
    ///   request is answered immediately (range too large, processor out of capacity).
    /// * `db_lookup` - the lookup executed on the database processor.
    /// * `task_request` - wraps the lookup result into the [`TaskRequest`] that is sent
    ///   back over the request channel.
    ///
    /// Returns an error only when the latest view cannot be obtained; failures to send an
    /// immediate response are ignored.
    fn handle_db_request<DbLookUpFn, ResponseSenderFn, TaskRequestFn, R>(
        &mut self,
        range: Range<u32>,
        request_id: InboundRequestId,
        response_sender: ResponseSenderFn,
        db_lookup: DbLookUpFn,
        task_request: TaskRequestFn,
        max_len: usize,
    ) -> anyhow::Result<()>
    where
        DbLookUpFn: Fn(&V::LatestView, &Arc<CachedView>, Range<u32>) -> anyhow::Result<Option<R>>
            + Send
            + 'static,
        ResponseSenderFn:
            Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
        TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
            + Send
            + 'static,
        R: Send + 'static,
    {
        // Taken on arrival so the worker can tell how long the request waited in the queue.
        let instant = Instant::now();
        let timeout = self.response_timeout;
        let response_channel = self.request_sender.clone();
        let range_len = range.len();

        // The metric is recorded before the length check, i.e. also for rejected requests.
        self.update_metrics(|| set_blocks_requested(range_len));

        if range_len > max_len {
            tracing::error!(
                requested_length = range.len(),
                max_len,
                "Requested range is too large"
            );
            let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge);
            let _ = self
                .p2p_service
                .send_response_msg(request_id, response_sender(response));
            return Ok(());
        }

        // The view is obtained here, before the lookup is handed to the processor.
        let view = self.view_provider.latest_view()?;
        let result = self.db_heavy_task_processor.try_spawn({
            let cached_view = self.cached_view.clone();
            move || {
                // The request waited too long before being picked up: drop it without
                // sending any response.
                if instant.elapsed() > timeout {
                    tracing::warn!("Request timed out");
                    return;
                }

                // NOTE(review): both a lookup error and an empty (`None`) result are
                // reported to the requester as `Timeout` — confirm this is intended.
                let response = db_lookup(&view, &cached_view, range.clone())
                    .ok()
                    .flatten()
                    .ok_or(ResponseMessageErrorCode::Timeout);

                let _ = response_channel
                    .try_send(task_request(response, request_id))
                    .trace_err("Failed to send response to the request channel");
            }
        });

        // The processor did not accept the job: answer immediately with an error code.
        if result.is_err() {
            let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
            let _ = self
                .p2p_service
                .send_response_msg(request_id, response_sender(err));
        }

        Ok(())
    }

    /// Answers a request for the transactions of the blocks in `range`.
    // NOTE(review): the range limit passed here is `max_headers_per_request`, the same as
    // for sealed headers (the range counts blocks) — confirm this is intended rather than
    // a transaction-specific limit.
    fn handle_transactions_request(
        &mut self,
        range: Range<u32>,
        request_id: InboundRequestId,
    ) -> anyhow::Result<()> {
        self.handle_db_request(
            range,
            request_id,
            V2ResponseMessage::Transactions,
            |view, cached_view, range| {
                cached_view
                    .get_transactions(view, range)
                    .map_err(anyhow::Error::from)
            },
            |response, request_id| TaskRequest::DatabaseTransactionsLookUp {
                response,
                request_id,
            },
            self.max_headers_per_request,
        )
    }

    /// Answers a request for the sealed block headers in `range`.
    fn handle_sealed_headers_request(
        &mut self,
        range: Range<u32>,
        request_id: InboundRequestId,
    ) -> anyhow::Result<()> {
        self.handle_db_request(
            range,
            request_id,
            V2ResponseMessage::SealedHeaders,
            |view, cached_view, range| {
                cached_view
                    .get_sealed_headers(view, range)
                    .map_err(anyhow::Error::from)
            },
            |response, request_id| TaskRequest::DatabaseHeaderLookUp {
                response,
                request_id,
            },
            self.max_headers_per_request,
        )
    }

    /// Shared implementation of the transaction-pool requests.
    ///
    /// * `request_id` - id of the inbound request being answered.
    /// * `txpool_function` - future producing the transaction-pool data.
    /// * `response_sender` - wraps a result into the response message used when the
    ///   processor is out of capacity.
    /// * `task_request` - wraps the successful result into the [`TaskRequest`] that is
    ///   sent back over the request channel.
    ///
    /// Always returns `Ok`.
    fn handle_txpool_request<F, ResponseSenderFn, TaskRequestFn, R>(
        &mut self,
        request_id: InboundRequestId,
        txpool_function: F,
        response_sender: ResponseSenderFn,
        task_request: TaskRequestFn,
    ) -> anyhow::Result<()>
    where
        ResponseSenderFn:
            Fn(Result<R, ResponseMessageErrorCode>) -> V2ResponseMessage + Send + 'static,
        TaskRequestFn: Fn(Result<R, ResponseMessageErrorCode>, InboundRequestId) -> TaskRequest
            + Send
            + 'static,
        F: Future<Output = anyhow::Result<R>> + Send + 'static,
    {
        // Taken on arrival so the worker can tell how long the request waited in the queue.
        let instant = Instant::now();
        let timeout = self.response_timeout;
        let response_channel = self.request_sender.clone();
        let result = self.tx_pool_heavy_task_processor.try_spawn(async move {
            // The request waited too long before being picked up: drop it without
            // sending any response.
            if instant.elapsed() > timeout {
                tracing::warn!("Request timed out");
                return;
            }

            // A failed transaction-pool lookup is only logged; no response is sent.
            let Ok(response) = txpool_function.await else {
                warn!("Failed to get txpool data");
                return;
            };

            let _ = response_channel
                .try_send(task_request(Ok(response), request_id))
                .trace_err("Failed to send response to the request channel");
        });

        // The processor did not accept the job: answer immediately with an error code.
        if result.is_err() {
            let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity);
            let _ = self
                .p2p_service
                .send_response_msg(request_id, response_sender(res));
        }

        Ok(())
    }

    /// Answers a request for transaction ids from the local transaction pool, asking
    /// the pool with `max_txs_per_request` as the limit.
    fn handle_all_transactions_ids_request(
        &mut self,
        request_id: InboundRequestId,
    ) -> anyhow::Result<()> {
        let max_txs = self.max_txs_per_request;
        let tx_pool = self.tx_pool.clone();
        self.handle_txpool_request(
            request_id,
            async move { tx_pool.get_tx_ids(max_txs).await },
            V2ResponseMessage::TxPoolAllTransactionsIds,
            |response, request_id| TaskRequest::TxPoolAllTransactionsIds {
                response,
                request_id,
            },
        )
    }

    /// Answers a request for the full transactions identified by `tx_ids`.
    ///
    /// Requests for more than `max_txs_per_request` ids are rejected with
    /// `RequestedRangeTooLarge`; unlike the other handlers, a failure to send that
    /// rejection is propagated to the caller.
    fn handle_full_transactions_request(
        &mut self,
        tx_ids: Vec<TxId>,
        request_id: InboundRequestId,
    ) -> anyhow::Result<()> {
        if tx_ids.len() > self.max_txs_per_request {
            self.p2p_service.send_response_msg(
                request_id,
                V2ResponseMessage::TxPoolFullTransactions(Err(
                    ResponseMessageErrorCode::RequestedRangeTooLarge,
                )),
            )?;
            return Ok(());
        }
        let tx_pool = self.tx_pool.clone();
        self.handle_txpool_request(
            request_id,
            async move { tx_pool.get_full_txs(tx_ids).await },
            V2ResponseMessage::TxPoolFullTransactions,
            |response, request_id| TaskRequest::TxPoolFullTransactions {
                response,
                request_id,
            },
        )
    }
}
816
817fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
818    let inner = Vec::from(*peer_id);
819    Ok(FuelPeerId::from(inner))
820}
821
/// Start-up of the p2p service: turns an [`UninitializedTask`] into a running [`Task`]
/// backed by a started [`FuelP2PService`].
#[async_trait::async_trait]
impl<V, T> RunnableService for UninitializedTask<V, SharedState, T>
where
    V: AtomicView + 'static,
    V::LatestView: P2pDb,
    T: TxPool + 'static,
{
    const NAME: &'static str = "P2P";

    type SharedData = SharedState;
    type Task = Task<FuelP2PService, V, SharedState, T>;
    type TaskParams = ();

    /// Returns a clone of the shared state stored as the broadcast half.
    fn shared_data(&self) -> Self::SharedData {
        self.broadcast.clone()
    }

    /// Initializes the configuration with the genesis, creates and starts the p2p
    /// service, and assembles the [`Task`].
    ///
    /// # Errors
    ///
    /// Fails when the latest view or the genesis cannot be read, when the configuration
    /// cannot be initialized, when the p2p service cannot be created or started, or when
    /// one of the task processors cannot be created.
    ///
    /// # Panics
    ///
    /// Panics if adding `heartbeat_check_interval` to the current instant overflows.
    async fn into_task(
        mut self,
        _: &StateWatcher,
        _: Self::TaskParams,
    ) -> anyhow::Result<Self::Task> {
        let Self {
            chain_id,
            last_height,
            view_provider,
            next_block_height,
            request_receiver,
            broadcast,
            tx_pool,
            config,
        } = self;

        // The configuration can only be initialized once the genesis is available.
        let view = view_provider.latest_view()?;
        let genesis = view.get_genesis()?;
        let config = config.init(genesis)?;
        // Pull out the values the task needs; `config` itself is still used below.
        let Config {
            max_block_size,
            max_headers_per_request,
            max_txs_per_request,
            heartbeat_check_interval,
            heartbeat_max_avg_interval,
            heartbeat_max_time_since_last,
            database_read_threads,
            tx_pool_threads,
            metrics,
            cache_size,
            ..
        } = config;

        // Hardcoded for now, but left here to be configurable in the future.
        // TODO: https://github.com/FuelLabs/fuel-core/issues/1340
        let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
            old_heartbeat_penalty: -5.,
            low_heartbeat_frequency_penalty: -5.,
        };

        // Read before `config` is moved into the service.
        let response_timeout = config.set_request_timeout;
        let mut p2p_service = FuelP2PService::new(
            broadcast.reserved_peers_broadcast.clone(),
            config,
            GossipsubMessageHandler::new(),
            RequestResponseMessageHandler::new(max_block_size),
        )
        .await?;
        // The height is set before the service is started.
        p2p_service.update_block_height(last_height);
        p2p_service.start().await?;

        let next_check_time =
            Instant::now().checked_add(heartbeat_check_interval).expect(
                "The heartbeat check interval should be small enough to do frequently",
            );
        // NOTE(review): the last argument of both processors (`1024 * 10` and `32`) is
        // presumably the number of pending jobs they accept — confirm against
        // `SyncProcessor::new` / `AsyncProcessor::new`.
        let db_heavy_task_processor = SyncProcessor::new(
            "P2P_DatabaseProcessor",
            database_read_threads,
            1024 * 10,
        )?;
        let tx_pool_heavy_task_processor =
            AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
        // Lets workers send their results back over the task's own request channel.
        let request_sender = broadcast.request_sender.clone();

        let task = Task {
            chain_id,
            response_timeout,
            p2p_service,
            view_provider,
            request_receiver,
            request_sender,
            next_block_height,
            broadcast,
            tx_pool,
            db_heavy_task_processor,
            tx_pool_heavy_task_processor,
            max_headers_per_request,
            max_txs_per_request,
            heartbeat_check_interval,
            heartbeat_max_avg_interval,
            heartbeat_max_time_since_last,
            next_check_time,
            heartbeat_peer_reputation_config,
            // Falls back to a cache size of 1_535 when none is configured.
            cached_view: Arc::new(CachedView::new(
                cache_size
                    .map(|cache_size| cache_size.into())
                    .unwrap_or(1_535),
                metrics,
            )),
        };
        Ok(task)
    }
}
932
933// TODO: Add tests https://github.com/FuelLabs/fuel-core/issues/1275
934impl<P, V, B, T> RunnableTask for Task<P, V, B, T>
935where
936    P: TaskP2PService + 'static,
937    V: AtomicView + 'static,
938    V::LatestView: P2pDb,
939    B: Broadcast + 'static,
940    T: TxPool + 'static,
941{
942    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
943        tokio::select! {
944            biased;
945
946            _ = watcher.while_started() => {
947                TaskNextAction::Stop
948            },
949            latest_block_height = self.next_block_height.next() => {
950                if let Some(latest_block_height) = latest_block_height {
951                    let _ = self.p2p_service.update_block_height(latest_block_height);
952                    TaskNextAction::Continue
953                } else {
954                    TaskNextAction::Stop
955                }
956            },
957            next_service_request = self.request_receiver.recv() => {
958                match next_service_request {
959                    Some(TaskRequest::BroadcastTransaction(transaction)) => {
960                        let tx_id = transaction.id(&self.chain_id);
961                        let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
962                        let result = self.p2p_service.publish_message(broadcast);
963                        if let Err(e) = result {
964                            tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
965                        }
966                    }
967                    Some(TaskRequest::BroadcastPreConfirmations(pre_confirmation_message)) => {
968                        let broadcast = GossipsubBroadcastRequest::TxPreConfirmations(pre_confirmation_message);
969                        let result = self.p2p_service.publish_message(broadcast.clone());
970                        if let Err(e) = result {
971                            tracing::error!("Got an error during pre-confirmation message broadcasting {:?}: {}", broadcast, e);
972                        }
973                    }
974                    Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => {
975                        // Note: this range has already been checked for
976                        // validity in `SharedState::get_sealed_block_headers`.
977                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
978                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
979                            let _ = channel.send(Err(TaskError::NoPeerFound));
980                            return TaskNextAction::Continue
981                        };
982                        let channel = ResponseSender::SealedHeaders(channel);
983                        let request_msg = RequestMessage::SealedHeaders(block_height_range.clone());
984                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
985                    }
986                    Some(TaskRequest::GetTransactions {block_height_range, channel }) => {
987                        let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
988                        let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else {
989                            let _ = channel.send(Err(TaskError::NoPeerFound));
990                            return TaskNextAction::Continue
991                        };
992                        let channel = ResponseSender::Transactions(channel);
993                        let request_msg = RequestMessage::Transactions(block_height_range.clone());
994                        self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target");
995                    }
996                    Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => {
997                        let channel = ResponseSender::TransactionsFromPeer(channel);
998                        let request_msg = RequestMessage::Transactions(block_height_range);
999                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target");
1000                    }
1001                    Some(TaskRequest::TxPoolGetAllTxIds { from_peer, channel }) => {
1002                        let channel = ResponseSender::TxPoolAllTransactionsIds(channel);
1003                        let request_msg = RequestMessage::TxPoolAllTransactionsIds;
1004                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
1005                    }
1006                    Some(TaskRequest::TxPoolGetFullTransactions { tx_ids, from_peer, channel }) => {
1007                        let channel = ResponseSender::TxPoolFullTransactions(channel);
1008                        let request_msg = RequestMessage::TxPoolFullTransactions(tx_ids);
1009                        self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target");
1010                    }
1011                    Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => {
1012                        let res = self.p2p_service.report_message(message, acceptance);
1013                        if let Err(err) = res {
1014                            return TaskNextAction::ErrorContinue(err)
1015                        }
1016                    }
1017                    Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => {
1018                        let _ = self.p2p_service.report_peer(peer_id, score, reporting_service);
1019                    }
1020                    Some(TaskRequest::GetAllPeerInfo { channel }) => {
1021                        let peers = self.p2p_service.get_all_peer_info()
1022                            .into_iter()
1023                            .map(|(id, info)| (*id, info.clone()))
1024                            .collect::<Vec<_>>();
1025                        let _ = channel.send(peers);
1026                    }
1027                    Some(TaskRequest::DatabaseTransactionsLookUp { response, request_id }) => {
1028                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::Transactions(response));
1029                    }
1030                    Some(TaskRequest::DatabaseHeaderLookUp { response, request_id }) => {
1031                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::SealedHeaders(response));
1032                    }
1033                    Some(TaskRequest::TxPoolAllTransactionsIds { response, request_id }) => {
1034                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolAllTransactionsIds(response));
1035                    }
1036                    Some(TaskRequest::TxPoolFullTransactions { response, request_id }) => {
1037                        let _ = self.p2p_service.send_response_msg(request_id, V2ResponseMessage::TxPoolFullTransactions(response));
1038                    }
1039                    None => {
1040                        tracing::error!("The P2P `Task` should be holder of the `Sender`");
1041                        return TaskNextAction::Stop
1042                    }
1043                }
1044                    TaskNextAction::Continue
1045            }
1046            p2p_event = self.p2p_service.next_event() => {
1047                match p2p_event {
1048                    Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => {
1049                        let peer_id: Vec<u8> = peer_id.into();
1050                        let block_height_data = BlockHeightHeartbeatData {
1051                            peer_id: peer_id.into(),
1052                            block_height,
1053                        };
1054
1055                        let _ = self.broadcast.block_height_broadcast(block_height_data);
1056                    }
1057                    Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => {
1058                        tracing::debug!("Received gossip message from peer {:?}", peer_id);
1059                        self.broadcast_gossip_message(message, message_id, peer_id);
1060                    },
1061                    Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => {
1062                        let res = self.process_request(request_message, request_id);
1063                        if let Err(err) = res {
1064                            return TaskNextAction::ErrorContinue(err)
1065                        }
1066                    },
1067                    Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => {
1068                        if tag == GossipTopicTag::NewTx {
1069                            let _ = self.broadcast.new_tx_subscription_broadcast(FuelPeerId::from(peer_id.to_bytes()));
1070                        }
1071                    },
1072                    _ => (),
1073                }
1074                TaskNextAction::Continue
1075            },
1076            _  = tokio::time::sleep_until(self.next_check_time) => {
1077                let res = self.peer_heartbeat_reputation_checks();
1078                match res {
1079                    Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"),
1080                    Err(e) => {
1081                        tracing::error!("Failed to perform peer heartbeat reputation checks: {:?}", e);
1082                    }
1083                }
1084
1085                if let Some(next_check_time) = self.next_check_time.checked_add(self.heartbeat_check_interval) {
1086                    self.next_check_time = next_check_time;
1087                    TaskNextAction::Continue
1088                } else {
1089                    tracing::error!("Next check time overflowed");
1090                    TaskNextAction::Stop
1091                }
1092            }
1093        }
1094    }
1095
1096    async fn shutdown(self) -> anyhow::Result<()> {
1097        // Nothing to shut down because we don't have any temporary state that should be dumped,
1098        // and we don't spawn any sub-tasks that we need to finish or await.
1099
1100        // `FuelP2PService` doesn't support graceful shutdown(with informing of connected peers).
1101        // https://github.com/libp2p/specs/blob/master/ROADMAP.md#%EF%B8%8F-polite-peering
1102        // Dropping of the `FuelP2PService` will close all connections.
1103
1104        Ok(())
1105    }
1106}
1107
1108#[derive(Clone)]
1109pub struct SharedState {
1110    /// Sender of p2p with peer gossip subscription (`Vec<u8>` represent the peer_id)
1111    new_tx_subscription_broadcast: broadcast::Sender<FuelPeerId>,
1112    /// Sender of p2p transaction used for subscribing.
1113    tx_broadcast: broadcast::Sender<TransactionGossipData>,
1114    /// Sender of p2p transaction preconfirmations used for subscribing.
1115    pre_confirmations_broadcast: broadcast::Sender<P2PPreConfirmationGossipData>,
1116    /// Sender of reserved peers connection updates.
1117    reserved_peers_broadcast: broadcast::Sender<usize>,
1118    /// Used for communicating with the `Task`.
1119    request_sender: mpsc::Sender<TaskRequest>,
1120    /// Sender of p2p block height data
1121    block_height_broadcast: broadcast::Sender<BlockHeightHeartbeatData>,
1122    /// Max txs per request
1123    max_txs_per_request: usize,
1124}
1125
1126impl SharedState {
1127    pub fn notify_gossip_transaction_validity(
1128        &self,
1129        message_info: GossipsubMessageInfo,
1130        acceptance: GossipsubMessageAcceptance,
1131    ) -> anyhow::Result<()> {
1132        self.request_sender
1133            .try_send(TaskRequest::RespondWithGossipsubMessageReport((
1134                message_info,
1135                acceptance,
1136            )))?;
1137        Ok(())
1138    }
1139
1140    pub async fn get_sealed_block_headers(
1141        &self,
1142        block_height_range: Range<u32>,
1143    ) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
1144        let (sender, receiver) = oneshot::channel();
1145
1146        if block_height_range.is_empty() {
1147            return Err(anyhow!(
1148                "Cannot retrieve headers for an empty range of block heights"
1149            ));
1150        }
1151
1152        self.request_sender
1153            .send(TaskRequest::GetSealedHeaders {
1154                block_height_range,
1155                channel: sender,
1156            })
1157            .await?;
1158
1159        let (peer_id, response) = receiver
1160            .await
1161            .map_err(|e| anyhow!("{e}"))?
1162            .map_err(|e| anyhow!("{e}"))?;
1163
1164        let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1165        if let Err(ref response_error_code) = data {
1166            warn!(
1167                "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1168            );
1169        };
1170
1171        Ok((peer_id.to_bytes(), data.ok()))
1172    }
1173
1174    pub async fn get_transactions(
1175        &self,
1176        range: Range<u32>,
1177    ) -> anyhow::Result<(Vec<u8>, Option<Vec<Transactions>>)> {
1178        let (sender, receiver) = oneshot::channel();
1179
1180        if range.is_empty() {
1181            return Err(anyhow!(
1182                "Cannot retrieve transactions for an empty range of block heights"
1183            ));
1184        }
1185
1186        self.request_sender
1187            .send(TaskRequest::GetTransactions {
1188                block_height_range: range,
1189                channel: sender,
1190            })
1191            .await?;
1192
1193        let (peer_id, response) = receiver
1194            .await
1195            .map_err(|e| anyhow!("{e}"))?
1196            .map_err(|e| anyhow!("{e}"))?;
1197
1198        let data = match response {
1199            Err(request_response_protocol_error) => Err(anyhow!(
1200                "Invalid response from peer {request_response_protocol_error:?}"
1201            )),
1202            Ok(Err(response_error_code)) => {
1203                warn!(
1204                    "Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"
1205                );
1206                Ok(None)
1207            }
1208            Ok(Ok(headers)) => Ok(Some(headers)),
1209        };
1210        data.map(|data| (peer_id.to_bytes(), data))
1211    }
1212
1213    pub async fn get_transactions_from_peer(
1214        &self,
1215        peer_id: FuelPeerId,
1216        range: Range<u32>,
1217    ) -> anyhow::Result<Option<Vec<Transactions>>> {
1218        let (sender, receiver) = oneshot::channel();
1219        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1220
1221        let request = TaskRequest::GetTransactionsFromPeer {
1222            block_height_range: range,
1223            from_peer,
1224            channel: sender,
1225        };
1226        self.request_sender.send(request).await?;
1227
1228        let (response_from_peer, response) =
1229            receiver.await.map_err(|e| anyhow!("{e}"))?;
1230        assert_eq!(
1231            peer_id.as_ref(),
1232            response_from_peer.to_bytes(),
1233            "Bug: response from non-requested peer"
1234        );
1235
1236        match response {
1237            Err(request_response_protocol_error) => Err(anyhow!(
1238                "Invalid response from peer {request_response_protocol_error:?}"
1239            )),
1240            Ok(Err(response_error_code)) => {
1241                warn!(
1242                    "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}"
1243                );
1244                Ok(None)
1245            }
1246            Ok(Ok(txs)) => Ok(Some(txs)),
1247        }
1248    }
1249
1250    pub async fn get_all_transactions_ids_from_peer(
1251        &self,
1252        peer_id: FuelPeerId,
1253    ) -> anyhow::Result<Vec<TxId>> {
1254        let (sender, receiver) = oneshot::channel();
1255        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1256        let request = TaskRequest::TxPoolGetAllTxIds {
1257            from_peer,
1258            channel: sender,
1259        };
1260        self.request_sender.try_send(request)?;
1261
1262        let (response_from_peer, response) =
1263            receiver.await.map_err(|e| anyhow!("{e}"))?;
1264
1265        debug_assert_eq!(
1266            peer_id.as_ref(),
1267            response_from_peer.to_bytes(),
1268            "Bug: response from non-requested peer"
1269        );
1270
1271        let response =
1272            response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1273
1274        let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default();
1275
1276        if txs.len() > self.max_txs_per_request {
1277            return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1278        }
1279        Ok(txs)
1280    }
1281
1282    pub async fn get_full_transactions_from_peer(
1283        &self,
1284        peer_id: FuelPeerId,
1285        tx_ids: Vec<TxId>,
1286    ) -> anyhow::Result<Vec<Option<Transaction>>> {
1287        let (sender, receiver) = oneshot::channel();
1288        let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");
1289        let request = TaskRequest::TxPoolGetFullTransactions {
1290            tx_ids,
1291            from_peer,
1292            channel: sender,
1293        };
1294        self.request_sender.try_send(request)?;
1295
1296        let (response_from_peer, response) =
1297            receiver.await.map_err(|e| anyhow!("{e}"))?;
1298        debug_assert_eq!(
1299            peer_id.as_ref(),
1300            response_from_peer.to_bytes(),
1301            "Bug: response from non-requested peer"
1302        );
1303
1304        let response =
1305            response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
1306        let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default();
1307
1308        if txs.len() > self.max_txs_per_request {
1309            return Err(anyhow!("Too many transactions requested: {}", txs.len()));
1310        }
1311        txs.into_iter()
1312            .map(|tx| {
1313                tx.map(Transaction::try_from)
1314                    .transpose()
1315                    .map_err(|err| anyhow::anyhow!(err))
1316            })
1317            .collect()
1318    }
1319
1320    pub fn broadcast_transaction(
1321        &self,
1322        transaction: Arc<Transaction>,
1323    ) -> anyhow::Result<()> {
1324        self.request_sender
1325            .try_send(TaskRequest::BroadcastTransaction(transaction))?;
1326        Ok(())
1327    }
1328
1329    pub fn broadcast_preconfirmations(
1330        &self,
1331        preconfirmations: Arc<P2PPreConfirmationMessage>,
1332    ) -> anyhow::Result<()> {
1333        self.request_sender
1334            .try_send(TaskRequest::BroadcastPreConfirmations(preconfirmations))?;
1335        Ok(())
1336    }
1337
1338    pub async fn get_all_peers(&self) -> anyhow::Result<Vec<(PeerId, PeerInfo)>> {
1339        let (sender, receiver) = oneshot::channel();
1340
1341        self.request_sender
1342            .send(TaskRequest::GetAllPeerInfo { channel: sender })
1343            .await?;
1344
1345        receiver.await.map_err(|e| anyhow!("{}", e))
1346    }
1347
1348    pub fn subscribe_new_peers(&self) -> broadcast::Receiver<FuelPeerId> {
1349        self.new_tx_subscription_broadcast.subscribe()
1350    }
1351
1352    pub fn subscribe_tx(&self) -> broadcast::Receiver<TransactionGossipData> {
1353        self.tx_broadcast.subscribe()
1354    }
1355
1356    pub fn subscribe_preconfirmations(
1357        &self,
1358    ) -> broadcast::Receiver<P2PPreConfirmationGossipData> {
1359        self.pre_confirmations_broadcast.subscribe()
1360    }
1361
1362    pub fn subscribe_block_height(
1363        &self,
1364    ) -> broadcast::Receiver<BlockHeightHeartbeatData> {
1365        self.block_height_broadcast.subscribe()
1366    }
1367
1368    pub fn subscribe_reserved_peers_count(&self) -> broadcast::Receiver<usize> {
1369        self.reserved_peers_broadcast.subscribe()
1370    }
1371
1372    pub fn report_peer<T: PeerReport>(
1373        &self,
1374        peer_id: FuelPeerId,
1375        peer_report: T,
1376        reporting_service: &'static str,
1377    ) -> anyhow::Result<()> {
1378        match Vec::from(peer_id).try_into() {
1379            Ok(peer_id) => {
1380                let score = peer_report.get_score_from_report();
1381
1382                self.request_sender
1383                    .try_send(TaskRequest::RespondWithPeerReport {
1384                        peer_id,
1385                        score,
1386                        reporting_service,
1387                    })?;
1388
1389                Ok(())
1390            }
1391            Err(e) => {
1392                warn!(target: "fuel-p2p", "Failed to read PeerId from {e:?}");
1393                Err(anyhow::anyhow!("Failed to read PeerId from {e:?}"))
1394            }
1395        }
1396    }
1397}
1398
1399pub fn build_shared_state(
1400    config: Config<NotInitialized>,
1401) -> (SharedState, Receiver<TaskRequest>) {
1402    let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE);
1403    let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1404    let (preconfirmations_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1405    let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1406    let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE);
1407
1408    let (reserved_peers_broadcast, _) = broadcast::channel::<usize>(
1409        config
1410            .reserved_nodes
1411            .len()
1412            .saturating_mul(2)
1413            .saturating_add(1),
1414    );
1415
1416    (
1417        SharedState {
1418            request_sender,
1419            new_tx_subscription_broadcast,
1420            tx_broadcast,
1421            pre_confirmations_broadcast: preconfirmations_broadcast,
1422            reserved_peers_broadcast,
1423            block_height_broadcast,
1424            max_txs_per_request: config.max_txs_per_request,
1425        },
1426        request_receiver,
1427    )
1428}
1429
1430#[allow(clippy::too_many_arguments)]
1431pub fn new_service<V, B, T>(
1432    chain_id: ChainId,
1433    last_height: BlockHeight,
1434    p2p_config: Config<NotInitialized>,
1435    shared_state: SharedState,
1436    request_receiver: Receiver<TaskRequest>,
1437    view_provider: V,
1438    block_importer: B,
1439    tx_pool: T,
1440) -> Service<V, T>
1441where
1442    V: AtomicView + 'static,
1443    V::LatestView: P2pDb,
1444    B: BlockHeightImporter,
1445    T: TxPool,
1446{
1447    let task = UninitializedTask::new(
1448        chain_id,
1449        last_height,
1450        p2p_config,
1451        shared_state,
1452        request_receiver,
1453        view_provider,
1454        block_importer,
1455        tx_pool,
1456    );
1457    Service::new(task)
1458}
1459
1460pub fn to_message_acceptance(
1461    acceptance: &GossipsubMessageAcceptance,
1462) -> MessageAcceptance {
1463    match acceptance {
1464        GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
1465        GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
1466        GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
1467    }
1468}
1469
1470fn report_message(
1471    p2p_service: &mut FuelP2PService,
1472    message: GossipsubMessageInfo,
1473    acceptance: GossipsubMessageAcceptance,
1474) {
1475    let GossipsubMessageInfo {
1476        peer_id,
1477        message_id,
1478    } = message;
1479
1480    let msg_id = message_id.into();
1481    let peer_id: Vec<u8> = peer_id.into();
1482
1483    if let Ok(peer_id) = peer_id.try_into() {
1484        let acceptance = to_message_acceptance(&acceptance);
1485        p2p_service.report_message_validation_result(&msg_id, peer_id, acceptance);
1486    } else {
1487        warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
1488    }
1489}