Skip to main content

lumina_node/
p2p.rs

1//! Component responsible for the messaging and interacting with other nodes on the peer to peer layer.
2//!
3//! It is a high level integration of various p2p protocols used by Celestia nodes.
4//! Currently supporting:
5//! - libp2p-identify
6//! - libp2p-kad
7//! - libp2p-autonat
8//! - libp2p-ping
9//! - header-sub topic on libp2p-gossipsub
10//! - fraud-sub topic on libp2p-gossipsub
11//! - header-ex client
12//! - header-ex server
13//! - bitswap 1.2.0
14//! - shwap - celestia's data availability protocol on top of bitswap
15
16use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use blockstore::block::CidError;
24use celestia_proto::p2p::pb::{HeaderRequest, header_request};
25use celestia_types::fraud_proof::BadEncodingFraudProof;
26use celestia_types::hash::Hash;
27use celestia_types::namespace_data::NamespaceData;
28use celestia_types::nmt::Namespace;
29use celestia_types::row::{Row, RowId};
30use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
31use celestia_types::sample::{Sample, SampleId};
32use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, FraudProof};
33use cid::Cid;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51mod shrex;
52pub(crate) mod shwap;
53mod swarm;
54mod swarm_manager;
55mod utils;
56
57use crate::block_ranges::BlockRange;
58use crate::events::EventPublisher;
59use crate::p2p::header_session::HeaderSession;
60use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
61use crate::p2p::swarm_manager::SwarmManager;
62use crate::peer_tracker::PeerTracker;
63use crate::peer_tracker::PeerTrackerInfo;
64use crate::store::{Store, StoreError};
65use crate::utils::{
66    MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
67    celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
68};
69
70pub use crate::p2p::header_ex::HeaderExError;
71pub use crate::p2p::shrex::ShrExError;
72
73// Maximum size of a [`Multihash`].
// Used below as the size parameter of the bitswap behaviour.
74pub(crate) const MAX_MH_SIZE: usize = 64;
75
76// All fraud proofs for a height bigger than the head height by more than
77// this threshold will be ignored.
78const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
79
// `Result` alias of this module; the error type defaults to `P2pError`.
80pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
81
82/// Representation of all the errors that can occur in `P2p` component.
///
/// Use `P2pError::is_fatal` to tell unrecoverable errors apart from the ones
/// that only affect a single request.
83#[derive(Debug, thiserror::Error)]
84pub enum P2pError {
85    /// Failed to initialize gossipsub behaviour.
86    #[error("Failed to initialize gossipsub behaviour: {0}")]
87    GossipsubInit(String),
88
89    /// Failed to initialize TLS.
90    #[error("Failed to initialize TLS: {0}")]
91    TlsInit(String),
92
93    /// Failed to initialize noise protocol.
94    #[error("Failed to initialize noise: {0}")]
95    NoiseInit(String),
96
97    /// The worker has died.
98    #[error("Worker died")]
99    WorkerDied,
100
101    /// Channel closed unexpectedly.
102    #[error("Channel closed unexpectedly")]
103    ChannelClosedUnexpectedly,
104
105    /// An error propagated from the `header-ex`.
106    #[error("HeaderEx: {0}")]
107    HeaderEx(#[from] HeaderExError),
108
109    /// An error propagated from the `shr-ex`.
110    #[error("ShrEx: {0}")]
111    ShrEx(#[from] ShrExError),
112
113    /// Bootnode address is missing its peer ID.
114    #[error("Bootnode multiaddrs without peer ID: {0:?}")]
115    BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
116
117    /// An error propagated from [`beetswap::Behaviour`].
118    #[error("Bitswap: {0}")]
119    Bitswap(#[from] beetswap::Error),
120
121    /// Protobuf message failed to be decoded.
122    #[error("ProtoBuf decoding error: {0}")]
123    ProtoDecodeFailed(#[from] tendermint_proto::Error),
124
125    /// An error related to [`Cid`] handling, carried as a [`CidError`].
    ///
    /// There is no `#[from]` here; the conversion from `cid::Error` is implemented
    /// by hand in this module.
126    #[error("CID error: {0}")]
127    Cid(CidError),
128
129    /// Request timed out.
130    #[error("Request timed out")]
131    RequestTimedOut,
132
133    /// Shwap protocol error.
134    #[error("Shwap: {0}")]
135    Shwap(String),
136
137    /// An error propagated from [`celestia_types`].
138    #[error(transparent)]
139    CelestiaTypes(#[from] celestia_types::Error),
140
141    /// An error propagated from [`Store`].
142    #[error("Store error: {0}")]
143    Store(#[from] StoreError),
144
145    /// Header was pruned.
146    #[error("Header of {0} block was pruned because it is outside of retention period")]
147    HeaderPruned(u64),
148
149    /// Header not synced yet.
150    #[error("Header of {0} block is not synced yet")]
151    HeaderNotSynced(u64),
152}
153
154impl P2pError {
155    /// Returns `true` if an error is fatal in all possible scenarios.
156    ///
157    /// If unsure mark it as non-fatal error.
158    pub(crate) fn is_fatal(&self) -> bool {
159        match self {
160            P2pError::GossipsubInit(_)
161            | P2pError::NoiseInit(_)
162            | P2pError::TlsInit(_)
163            | P2pError::WorkerDied
164            | P2pError::ChannelClosedUnexpectedly
165            | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
166            P2pError::HeaderEx(_)
167            | P2pError::ShrEx(_)
168            | P2pError::Bitswap(_)
169            | P2pError::ProtoDecodeFailed(_)
170            | P2pError::Cid(_)
171            | P2pError::RequestTimedOut
172            | P2pError::Shwap(_)
173            | P2pError::CelestiaTypes(_)
174            | P2pError::HeaderPruned(_)
175            | P2pError::HeaderNotSynced(_) => false,
176            P2pError::Store(e) => e.is_fatal(),
177        }
178    }
179}
180
181impl From<oneshot::error::RecvError> for P2pError {
182    fn from(_value: oneshot::error::RecvError) -> Self {
183        P2pError::ChannelClosedUnexpectedly
184    }
185}
186
187impl From<prost::DecodeError> for P2pError {
188    fn from(value: prost::DecodeError) -> Self {
189        P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
190    }
191}
192
193impl From<cid::Error> for P2pError {
194    fn from(value: cid::Error) -> Self {
195        P2pError::Cid(CidError::InvalidCid(value.to_string()))
196    }
197}
198
199/// Component responsible for the peer to peer networking handling.
///
/// This is a handle to a spawned worker task: requests are delivered to the worker
/// as [`P2pCmd`] messages and the worker is cancelled when the handle is dropped.
200#[derive(Debug)]
201pub(crate) struct P2p {
    // Cancelled by `stop` (and therefore on drop); the worker receives a child token.
202    cancellation_token: CancellationToken,
    // Sending side of the command channel consumed by the worker.
203    cmd_tx: mpsc::Sender<P2pCmd>,
    // Handle of the spawned worker task, awaited by `join`.
204    join_handle: JoinHandle,
    // Watcher obtained from the `PeerTracker` that is handed to the worker.
205    peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
    // Peer ID derived from the local keypair's public key.
206    local_peer_id: PeerId,
207}
208
209/// Arguments used to configure the [`P2p`].
210pub struct P2pArgs<B, S>
211where
212    B: Blockstore,
213    S: Store,
214{
215    /// An id of the network to connect to.
216    pub network_id: String,
217    /// The keypair to be used as the identity.
218    pub local_keypair: Keypair,
219    /// List of bootstrap nodes to connect to and trust.
    ///
    /// The addresses are checked by `validate_bootnode_addrs` when [`P2p`] is started.
220    pub bootnodes: Vec<Multiaddr>,
221    /// List of the addresses on which to listen for incoming connections.
222    pub listen_on: Vec<Multiaddr>,
223    /// The blockstore handed to the bitswap behaviour.
224    pub blockstore: Arc<B>,
225    /// The store for headers.
226    pub store: Arc<S>,
227    /// Event publisher.
228    pub event_pub: EventPublisher,
229}
230
/// Commands sent from the [`P2p`] handle to its worker task.
///
/// Variants carrying a `respond_to` sender are answered through that channel.
231#[derive(Debug)]
232pub(crate) enum P2pCmd {
    /// Ask for the swarm's current [`NetworkInfo`].
233    NetworkInfo {
234        respond_to: oneshot::Sender<NetworkInfo>,
235    },
    /// Forward `request` to the `header-ex` behaviour.
236    HeaderExRequest {
237        request: HeaderRequest,
238        respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
239    },
    /// Ask for the addresses the swarm is listening on.
240    Listeners {
241        respond_to: oneshot::Sender<Vec<Multiaddr>>,
242    },
    /// Ask for the IDs of the peers that the peer tracker reports as connected.
243    ConnectedPeers {
244        respond_to: oneshot::Sender<Vec<PeerId>>,
245    },
    /// Initialize `header-sub` with the given `head`.
246    InitHeaderSub {
247        head: Box<ExtendedHeader>,
248        /// Any valid headers received by header-sub will be sent to this channel.
249        channel: mpsc::Sender<ExtendedHeader>,
250    },
    /// Change the trust status of `peer_id`.
251    SetPeerTrust {
252        peer_id: PeerId,
253        is_trusted: bool,
254    },
    /// Mark `peer_id` as archival (only available in tests / with `test-utils`).
255    #[cfg(any(test, feature = "test-utils"))]
256    MarkAsArchival {
257        peer_id: PeerId,
258    },
    /// Request the raw data behind a shwap `cid`.
259    GetShwapCid {
260        cid: Cid,
261        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
262    },
    /// Ask for a clone of the worker's "network compromised" token.
263    GetNetworkCompromisedToken {
264        respond_to: oneshot::Sender<Token>,
265    },
    /// Ask for the head known to `header-sub`; `None` if `header-sub` was not initialized.
266    GetNetworkHead {
267        respond_to: oneshot::Sender<Option<ExtendedHeader>>,
268    },
    /// Request a [`Row`] through the `shr-ex` behaviour.
269    // This is dead code because `get_row` still uses Bitswap.
270    // We can use this when celestia-node#4288 is merged.
271    #[allow(dead_code)]
272    GetRow {
273        row_index: u16,
274        block_height: u64,
275        respond_to: OneshotResultSender<Row, P2pError>,
276    },
    /// Request a [`Sample`] through the `shr-ex` behaviour.
277    // This is dead code because `get_sample` still uses Bitswap.
278    // We can use this when celestia-node#4288 is merged.
279    #[allow(dead_code)]
280    GetSample {
281        row_index: u16,
282        column_index: u16,
283        block_height: u64,
284        respond_to: OneshotResultSender<Sample, P2pError>,
285    },
    /// Request the [`NamespaceData`] of a block through the `shr-ex` behaviour.
286    GetNamespaceData {
287        namespace: Namespace,
288        block_height: u64,
289        respond_to: OneshotResultSender<NamespaceData, P2pError>,
290    },
    /// Request the [`ExtendedDataSquare`] of a block through the `shr-ex` behaviour.
291    GetEds {
292        block_height: u64,
293        respond_to: OneshotResultSender<ExtendedDataSquare, P2pError>,
294    },
295}
296
297impl P2p {
298    /// Creates and starts a new p2p handler.
299    pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
300    where
301        B: Blockstore + 'static,
302        S: Store + 'static,
303    {
304        validate_bootnode_addrs(&args.bootnodes)?;
305
306        let local_peer_id = PeerId::from(args.local_keypair.public());
307
308        let peer_tracker = PeerTracker::new(args.event_pub.clone());
309        let peer_tracker_info_watcher = peer_tracker.info_watcher();
310
311        let cancellation_token = CancellationToken::new();
312        let (cmd_tx, cmd_rx) = mpsc::channel(16);
313
314        let mut worker =
315            Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
316
317        let join_handle = spawn(async move {
318            worker.run().await;
319        });
320
321        Ok(P2p {
322            cancellation_token,
323            cmd_tx,
324            join_handle,
325            peer_tracker_info_watcher,
326            local_peer_id,
327        })
328    }
329
330    /// Creates and starts a new mocked p2p handler.
331    #[cfg(test)]
332    pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
333        let (cmd_tx, cmd_rx) = mpsc::channel(16);
334        let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
335        let cancellation_token = CancellationToken::new();
336
337        // Just a fake join_handle
338        let join_handle = spawn(async {});
339
340        let p2p = P2p {
341            cmd_tx: cmd_tx.clone(),
342            cancellation_token,
343            join_handle,
344            peer_tracker_info_watcher: peer_tracker_rx,
345            local_peer_id: PeerId::random(),
346        };
347
348        let handle = crate::test_utils::MockP2pHandle {
349            cmd_tx,
350            cmd_rx,
351            header_sub_tx: None,
352            peer_tracker_tx,
353        };
354
355        (p2p, handle)
356    }
357
358    /// Stop the worker.
359    pub fn stop(&self) {
360        // Signal the Worker to stop.
361        self.cancellation_token.cancel();
362    }
363
364    /// Wait until worker is completely stopped.
365    pub async fn join(&self) {
366        self.join_handle.join().await;
367    }
368
369    /// Local peer ID on the p2p network.
370    pub fn local_peer_id(&self) -> &PeerId {
371        &self.local_peer_id
372    }
373
374    async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
375        self.cmd_tx
376            .send(cmd)
377            .await
378            .map_err(|_| P2pError::WorkerDied)
379    }
380
381    /// Watcher for the current [`PeerTrackerInfo`].
382    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
383        self.peer_tracker_info_watcher.clone()
384    }
385
386    /// A reference to the current [`PeerTrackerInfo`].
387    pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
388        self.peer_tracker_info_watcher.borrow()
389    }
390
391    /// Initializes `header-sub` protocol with a given `subjective_head`.
392    pub async fn init_header_sub(
393        &self,
394        head: ExtendedHeader,
395        channel: mpsc::Sender<ExtendedHeader>,
396    ) -> Result<()> {
397        self.send_command(P2pCmd::InitHeaderSub {
398            head: Box::new(head),
399            channel,
400        })
401        .await
402    }
403
404    /// Wait until the node is connected to any peer.
405    pub async fn wait_connected(&self) -> Result<()> {
406        self.peer_tracker_info_watcher()
407            .wait_for(|info| info.num_connected_peers > 0)
408            .await
409            .map(drop)
410            .map_err(|_| P2pError::WorkerDied)
411    }
412
413    /// Wait until the node is connected to any trusted peer.
414    pub async fn wait_connected_trusted(&self) -> Result<()> {
415        self.peer_tracker_info_watcher()
416            .wait_for(|info| info.num_connected_trusted_peers > 0)
417            .await
418            .map(drop)
419            .map_err(|_| P2pError::WorkerDied)
420    }
421
422    /// Get current [`NetworkInfo`].
423    pub async fn network_info(&self) -> Result<NetworkInfo> {
424        let (tx, rx) = oneshot::channel();
425
426        self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
427            .await?;
428
429        Ok(rx.await?)
430    }
431
432    /// Send a request on the `header-ex` protocol.
433    pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
434        let (tx, rx) = oneshot::channel();
435
436        self.send_command(P2pCmd::HeaderExRequest {
437            request,
438            respond_to: tx,
439        })
440        .await?;
441
442        rx.await?
443    }
444
445    /// Request the head header on the `header-ex` protocol.
446    pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
447        self.get_header_by_height(0).await
448    }
449
450    /// Request the header by hash on the `header-ex` protocol.
451    pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
452        self.header_ex_request(HeaderRequest {
453            data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
454            amount: 1,
455        })
456        .await?
457        .into_iter()
458        .next()
459        .ok_or(HeaderExError::HeaderNotFound.into())
460    }
461
462    /// Request the header by height on the `header-ex` protocol.
463    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
464        self.header_ex_request(HeaderRequest {
465            data: Some(header_request::Data::Origin(height)),
466            amount: 1,
467        })
468        .await?
469        .into_iter()
470        .next()
471        .ok_or(HeaderExError::HeaderNotFound.into())
472    }
473
474    /// Request the headers following the one given with the `header-ex` protocol.
475    ///
476    /// First header from the requested range will be verified against the provided one,
477    /// then each subsequent is verified against the previous one.
478    pub async fn get_verified_headers_range(
479        &self,
480        from: &ExtendedHeader,
481        amount: u64,
482    ) -> Result<Vec<ExtendedHeader>> {
483        // User can give us a bad header, so validate it.
484        from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
485
486        let height = from.height() + 1;
487
488        let range = height..=height + amount - 1;
489
490        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
491        let headers = session.run().await?;
492
493        // `.validate()` is called on each header separately by `HeaderExClientHandler`.
494        //
495        // The last step is to verify that all headers are from the same chain
496        // and indeed connected with the next one.
497        from.verify_adjacent_range(&headers)
498            .map_err(|_| HeaderExError::InvalidResponse)?;
499
500        Ok(headers)
501    }
502
503    /// Request a list of ranges with the `header-ex` protocol
504    ///
505    /// For each of the ranges, headers are verified against each other, but it's the caller
506    /// responsibility to verify range edges against headers existing in the store.
507    pub(crate) async fn get_unverified_header_range(
508        &self,
509        range: BlockRange,
510    ) -> Result<Vec<ExtendedHeader>> {
511        if range.is_empty() {
512            return Err(HeaderExError::InvalidRequest.into());
513        }
514
515        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
516        let headers = session.run().await?;
517
518        let Some(head) = headers.first() else {
519            return Err(HeaderExError::InvalidResponse.into());
520        };
521
522        head.verify_adjacent_range(&headers[1..])
523            .map_err(|_| HeaderExError::InvalidResponse)?;
524
525        Ok(headers)
526    }
527
528    /// Request a [`Cid`] on bitswap protocol.
529    pub(crate) async fn get_shwap_cid(
530        &self,
531        cid: Cid,
532        timeout: Option<Duration>,
533    ) -> Result<Vec<u8>> {
534        let (tx, rx) = oneshot::channel();
535
536        self.send_command(P2pCmd::GetShwapCid {
537            cid,
538            respond_to: tx,
539        })
540        .await?;
541
542        let data = match timeout {
543            Some(dur) => time::timeout(dur, rx)
544                .await
545                .map_err(|_| P2pError::RequestTimedOut)???,
546            None => rx.await??,
547        };
548
549        get_block_container(&cid, &data)
550    }
551
552    /// Request a [`Row`] on bitswap protocol.
553    pub async fn get_row(
554        &self,
555        row_index: u16,
556        block_height: u64,
557        timeout: Option<Duration>,
558    ) -> Result<Row> {
559        let id = RowId::new(row_index, block_height)?;
560        let cid = convert_cid(&id.into())?;
561
562        let data = self.get_shwap_cid(cid, timeout).await?;
563        let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
564        Ok(row)
565    }
566
567    /// Request a [`Sample`] on bitswap protocol.
568    pub async fn get_sample(
569        &self,
570        row_index: u16,
571        column_index: u16,
572        block_height: u64,
573        timeout: Option<Duration>,
574    ) -> Result<Sample> {
575        let id = SampleId::new(row_index, column_index, block_height)?;
576        let cid = convert_cid(&id.into())?;
577
578        let data = self.get_shwap_cid(cid, timeout).await?;
579        let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
580        Ok(sample)
581    }
582
583    pub async fn get_eds(
584        &self,
585        block_height: u64,
586        timeout: Option<Duration>,
587    ) -> Result<ExtendedDataSquare> {
588        let (tx, rx) = oneshot::channel();
589
590        self.send_command(P2pCmd::GetEds {
591            block_height,
592            respond_to: tx,
593        })
594        .await?;
595
596        match timeout {
597            Some(dur) => time::timeout(dur, rx)
598                .await
599                .map_err(|_| P2pError::RequestTimedOut)??,
600            None => rx.await?,
601        }
602    }
603
604    /// Request a [`RowNamespaceData`] on bitswap protocol.
605    pub async fn get_row_namespace_data(
606        &self,
607        namespace: Namespace,
608        row_index: u16,
609        block_height: u64,
610        timeout: Option<Duration>,
611    ) -> Result<RowNamespaceData> {
612        let id = RowNamespaceDataId::new(namespace, row_index, block_height)?;
613        let cid = convert_cid(&id.into())?;
614
615        let data = self.get_shwap_cid(cid, timeout).await?;
616        let row_namespace_data =
617            RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
618        Ok(row_namespace_data)
619    }
620
621    pub async fn get_namespace_data(
622        &self,
623        namespace: Namespace,
624        block_height: u64,
625        timeout: Option<Duration>,
626    ) -> Result<NamespaceData> {
627        let (tx, rx) = oneshot::channel();
628
629        self.send_command(P2pCmd::GetNamespaceData {
630            namespace,
631            block_height,
632            respond_to: tx,
633        })
634        .await?;
635
636        match timeout {
637            Some(dur) => time::timeout(dur, rx)
638                .await
639                .map_err(|_| P2pError::RequestTimedOut)??,
640            None => rx.await?,
641        }
642    }
643
644    /// Request all blobs with provided namespace in the block corresponding to this header
645    /// using bitswap protocol.
646    pub async fn get_all_blobs(
647        &self,
648        namespace: Namespace,
649        block_height: u64,
650        timeout: Option<Duration>,
651    ) -> Result<Vec<Blob>> {
652        let namespace_data = self
653            .get_namespace_data(namespace, block_height, timeout)
654            .await?;
655
656        let shares = namespace_data
657            .rows()
658            .iter()
659            .flat_map(|row| row.shares.iter());
660
661        Ok(Blob::reconstruct_all(shares)?)
662    }
663
664    /// Get the addresses where [`P2p`] listens on for incoming connections.
665    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
666        let (tx, rx) = oneshot::channel();
667
668        self.send_command(P2pCmd::Listeners { respond_to: tx })
669            .await?;
670
671        Ok(rx.await?)
672    }
673
674    /// Get the list of connected peers.
675    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
676        let (tx, rx) = oneshot::channel();
677
678        self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
679            .await?;
680
681        Ok(rx.await?)
682    }
683
684    /// Alter the trust status for a given peer.
685    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
686        self.send_command(P2pCmd::SetPeerTrust {
687            peer_id,
688            is_trusted,
689        })
690        .await
691    }
692
693    #[cfg(any(test, feature = "test-utils"))]
694    pub(crate) async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
695        self.send_command(P2pCmd::MarkAsArchival { peer_id }).await
696    }
697
698    /// Get the cancellation token which will be cancelled when the network gets compromised.
699    ///
700    /// After this token is cancelled, the network should be treated as insincere
701    /// and should not be trusted.
702    pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
703        let (tx, rx) = oneshot::channel();
704
705        self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
706            .await?;
707
708        Ok(rx.await?)
709    }
710
711    /// Get the latest header announced on the network.
712    pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
713        let (tx, rx) = oneshot::channel();
714
715        self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
716            .await?;
717
718        Ok(rx.await?)
719    }
720}
721
722impl Drop for P2p {
723    fn drop(&mut self) {
724        self.stop();
725    }
726}
727
// Protocol behaviours driven by the worker; this struct is attached to the
// `SwarmManager` when the worker is created.
728#[derive(NetworkBehaviour)]
729struct Behaviour<B, S>
730where
731    B: Blockstore + 'static,
732    S: Store + 'static,
733{
    // Bitswap behaviour, parameterized by the multihash size and the blockstore type.
734    bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
    // `header-ex` behaviour.
735    header_ex: header_ex::Behaviour<S>,
    // `shr-ex` behaviour.
736    shr_ex: shrex::Behaviour<S>,
    // Gossipsub behaviour, initialized with the header-sub and fraud-sub topics.
737    gossipsub: gossipsub::Behaviour,
738}
739
// State of the background task that serves the commands sent by the `P2p` handle.
740struct Worker<B, S>
741where
742    B: Blockstore + 'static,
743    S: Store + 'static,
744{
    // Child of the handle's token; the event loop exits once it is cancelled.
745    cancellation_token: CancellationToken,
    // Swarm manager holding the attached `Behaviour`.
746    swarm: SwarmManager<Behaviour<B, S>>,
    // Hash of the header-sub gossipsub topic.
747    header_sub_topic_hash: TopicHash,
    // Hash of the bad-encoding fraud-sub gossipsub topic.
748    bad_encoding_fraud_sub_topic: TopicHash,
    // Receiving side of the command channel.
749    cmd_rx: mpsc::Receiver<P2pCmd>,
    // `None` until header-sub is initialized.
750    header_sub_state: Option<HeaderSubState>,
    // Response channels of the pending bitswap queries, keyed by query ID.
751    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
    // Token whose clones are handed out on `GetNetworkCompromisedToken`.
752    network_compromised_token: Token,
    // Header store.
753    store: Arc<S>,
754}
755
// State kept by the worker once header-sub was initialized.
756struct HeaderSubState {
    // Head reported in response to `P2pCmd::GetNetworkHead`.
757    known_head: ExtendedHeader,
    // Channel supplied with `P2pCmd::InitHeaderSub`.
758    channel: mpsc::Sender<ExtendedHeader>,
759}
760
761impl<B, S> Worker<B, S>
762where
763    B: Blockstore,
764    S: Store,
765{
766    async fn new(
767        args: P2pArgs<B, S>,
768        cancellation_token: CancellationToken,
769        cmd_rx: mpsc::Receiver<P2pCmd>,
770        peer_tracker: PeerTracker,
771    ) -> Result<Self, P2pError> {
772        let mut swarm = SwarmManager::new(
773            &args.network_id,
774            &args.local_keypair,
775            &args.bootnodes,
776            &args.listen_on,
777            peer_tracker,
778            args.event_pub.clone(),
779        )
780        .await?;
781
782        let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
783        let bad_encoding_fraud_sub_topic =
784            fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
785        let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
786
787        let bitswap = init_bitswap(
788            args.blockstore.clone(),
789            args.store.clone(),
790            &args.network_id,
791        )?;
792
793        let header_ex = header_ex::Behaviour::new(header_ex::Config {
794            network_id: &args.network_id,
795            header_store: args.store.clone(),
796        });
797
798        let shr_ex = shrex::Behaviour::new(shrex::Config {
799            network_id: &args.network_id,
800            local_keypair: &args.local_keypair,
801            header_store: args.store.clone(),
802            stream_ctrl: swarm.stream_control(),
803        })?;
804
805        swarm.attach_behaviour(Behaviour {
806            bitswap,
807            gossipsub,
808            header_ex,
809            shr_ex,
810        });
811
812        Ok(Worker {
813            cancellation_token,
814            swarm,
815            cmd_rx,
816            bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
817            header_sub_topic_hash: header_sub_topic.hash(),
818            header_sub_state: None,
819            bitswap_queries: HashMap::new(),
820            network_compromised_token: Token::new(),
821            store: args.store,
822        })
823    }
824
825    async fn run(&mut self) {
826        let mut report_interval = Interval::new(Duration::from_secs(60));
827
828        loop {
829            select! {
830                _ = self.cancellation_token.cancelled() => break,
831                _ = report_interval.tick() => {
832                    self.report();
833                }
834                _ = poll_closed(&mut self.bitswap_queries) => {
835                    self.prune_canceled_bitswap_queries();
836                }
837                res = self.swarm.poll() => {
838                    match res {
839                        Ok(ev) => {
840                            if let Err(e) = self.on_behaviour_event(ev).await {
841                                warn!("Failure while handling behaviour event: {e}");
842                            }
843                        }
844                        Err(e) => warn!("Failure while polling SwarmManager: {e}"),
845                    }
846                }
847                Some(cmd) = self.cmd_rx.recv() => {
848                    if let Err(e) = self.on_cmd(cmd).await {
849                        warn!("Failure while handling command. (error: {e})");
850                    }
851                }
852            }
853        }
854
855        self.swarm.context().behaviour.header_ex.stop();
856        self.swarm.context().behaviour.shr_ex.stop();
857        self.swarm.stop().await;
858    }
859
860    fn prune_canceled_bitswap_queries(&mut self) {
861        let mut cancelled = SmallVec::<[_; 16]>::new();
862
863        for (query_id, chan) in &self.bitswap_queries {
864            if chan.is_closed() {
865                cancelled.push(*query_id);
866            }
867        }
868
869        for query_id in cancelled {
870            self.bitswap_queries.remove(&query_id);
871            self.swarm.context().behaviour.bitswap.cancel(query_id);
872        }
873    }
874
875    async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
876        match ev {
877            BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
878            BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
879            BehaviourEvent::HeaderEx(ev) => self.on_header_ex_event(ev).await,
880            BehaviourEvent::ShrEx(ev) => self.on_shrex_event(ev).await,
881        }
882
883        Ok(())
884    }
885
886    async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
887        match cmd {
888            P2pCmd::NetworkInfo { respond_to } => {
889                respond_to.maybe_send(self.swarm.network_info());
890            }
891            P2pCmd::HeaderExRequest {
892                request,
893                respond_to,
894            } => {
895                self.swarm
896                    .context()
897                    .behaviour
898                    .header_ex
899                    .send_request(request, respond_to);
900            }
901            P2pCmd::Listeners { respond_to } => {
902                respond_to.maybe_send(self.swarm.listeners());
903            }
904            P2pCmd::ConnectedPeers { respond_to } => {
905                let peers = self
906                    .swarm
907                    .context()
908                    .peer_tracker
909                    .peers()
910                    .filter_map(|peer| {
911                        if peer.is_connected() {
912                            Some(*peer.id())
913                        } else {
914                            None
915                        }
916                    })
917                    .collect();
918                respond_to.maybe_send(peers);
919            }
920            P2pCmd::InitHeaderSub { head, channel } => {
921                self.on_init_header_sub(*head, channel);
922            }
923            P2pCmd::SetPeerTrust {
924                peer_id,
925                is_trusted,
926            } => {
927                self.swarm.set_peer_trust(&peer_id, is_trusted);
928            }
929            #[cfg(any(test, feature = "test-utils"))]
930            P2pCmd::MarkAsArchival { peer_id } => {
931                self.swarm.mark_as_archival(&peer_id);
932            }
933            P2pCmd::GetShwapCid { cid, respond_to } => {
934                self.on_get_shwap_cid(cid, respond_to);
935            }
936            P2pCmd::GetNetworkCompromisedToken { respond_to } => {
937                respond_to.maybe_send(self.network_compromised_token.clone())
938            }
939            P2pCmd::GetNetworkHead { respond_to } => {
940                let head = self
941                    .header_sub_state
942                    .as_ref()
943                    .map(|state| state.known_head.clone());
944                respond_to.maybe_send(head);
945            }
946            P2pCmd::GetRow {
947                row_index,
948                block_height,
949                respond_to,
950            } => {
951                self.swarm
952                    .context()
953                    .behaviour
954                    .shr_ex
955                    .get_row(block_height, row_index, respond_to)
956                    .await
957            }
958            P2pCmd::GetSample {
959                row_index,
960                column_index,
961                block_height,
962                respond_to,
963            } => {
964                self.swarm
965                    .context()
966                    .behaviour
967                    .shr_ex
968                    .get_sample(block_height, row_index, column_index, respond_to)
969                    .await
970            }
971            P2pCmd::GetNamespaceData {
972                namespace,
973                block_height,
974                respond_to,
975            } => {
976                self.swarm
977                    .context()
978                    .behaviour
979                    .shr_ex
980                    .get_namespace_data(block_height, namespace, respond_to)
981                    .await
982            }
983            P2pCmd::GetEds {
984                block_height,
985                respond_to,
986            } => {
987                self.swarm
988                    .context()
989                    .behaviour
990                    .shr_ex
991                    .get_eds(block_height, respond_to)
992                    .await
993            }
994        }
995
996        Ok(())
997    }
998
999    #[instrument(skip_all)]
1000    fn report(&mut self) {
1001        let tracker_info = self.swarm.context().peer_tracker.info();
1002
1003        info!(
1004            "peers: {}, trusted peers: {}",
1005            tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1006        );
1007    }
1008
1009    #[instrument(level = "trace", skip(self))]
1010    async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1011        match ev {
1012            gossipsub::Event::Message {
1013                message,
1014                message_id,
1015                ..
1016            } => {
1017                let Some(peer) = message.source else {
1018                    // Validation mode is `strict` so this will never happen
1019                    return;
1020                };
1021
1022                let acceptance = if message.topic == self.header_sub_topic_hash {
1023                    self.on_header_sub_message(&message.data[..])
1024                } else if message.topic == self.bad_encoding_fraud_sub_topic {
1025                    self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1026                        .await
1027                } else {
1028                    trace!("Unhandled gossipsub message");
1029                    gossipsub::MessageAcceptance::Ignore
1030                };
1031
1032                if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1033                    // We may have discovered a new peer
1034                    self.swarm.peer_maybe_discovered(&peer);
1035                }
1036
1037                let _ = self
1038                    .swarm
1039                    .context()
1040                    .behaviour
1041                    .gossipsub
1042                    .report_message_validation_result(&message_id, &peer, acceptance);
1043            }
1044            _ => trace!("Unhandled gossipsub event"),
1045        }
1046    }
1047
1048    #[instrument(level = "trace", skip_all)]
1049    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1050        trace!("Requesting CID {cid} from bitswap");
1051        let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
1052        self.bitswap_queries.insert(query_id, respond_to);
1053    }
1054
1055    #[instrument(level = "trace", skip(self))]
1056    async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1057        match ev {
1058            beetswap::Event::GetQueryResponse { query_id, data } => {
1059                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1060                    respond_to.maybe_send_ok(data);
1061                }
1062            }
1063            beetswap::Event::GetQueryError { query_id, error } => {
1064                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1065                    let error: P2pError = error.into();
1066                    respond_to.maybe_send_err(error);
1067                }
1068            }
1069        }
1070    }
1071
1072    #[instrument(level = "trace", skip(self))]
1073    async fn on_header_ex_event(&mut self, ev: header_ex::Event) {
1074        match ev {
1075            header_ex::Event::SchedulePendingRequests => {
1076                let ctx = self.swarm.context();
1077
1078                ctx.behaviour
1079                    .header_ex
1080                    .schedule_pending_requests(ctx.peer_tracker);
1081            }
1082            header_ex::Event::NeedTrustedPeers => {
1083                self.swarm.connect_to_bootnodes();
1084            }
1085            header_ex::Event::NeedArchivalPeers => {
1086                self.swarm.start_archival_node_kad_query();
1087            }
1088        }
1089    }
1090
1091    #[instrument(level = "trace", skip(self))]
1092    async fn on_shrex_event(&mut self, ev: shrex::Event) {
1093        match ev {
1094            shrex::Event::SchedulePendingRequests => {
1095                let ctx = self.swarm.context();
1096
1097                ctx.behaviour
1098                    .shr_ex
1099                    .schedule_pending_requests(ctx.peer_tracker);
1100            }
1101
1102            shrex::Event::AddPeers(peers) => {
1103                let added = peers
1104                    .iter()
1105                    .filter(|peer| self.swarm.peer_maybe_discovered(peer))
1106                    .count();
1107
1108                debug!("Added {added} peers discovered through shrex");
1109            }
1110
1111            shrex::Event::BlockPeers(peers) => {
1112                let blocked = peers
1113                    .into_iter()
1114                    .filter(|peer| self.swarm.blacklist_peer(peer))
1115                    .count();
1116
1117                debug!("Blocked {blocked} peers for shrex missbehaviour");
1118            }
1119        }
1120    }
1121
1122    #[instrument(skip_all, fields(header = %head))]
1123    fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1124        self.header_sub_state = Some(HeaderSubState {
1125            known_head: head,
1126            channel,
1127        });
1128        trace!("HeaderSub initialized");
1129    }
1130
1131    #[instrument(skip_all)]
1132    fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1133        let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1134            trace!("Malformed or invalid header from header-sub");
1135            return gossipsub::MessageAcceptance::Reject;
1136        };
1137
1138        trace!("Received header from header-sub ({header})");
1139
1140        let Some(ref mut state) = self.header_sub_state else {
1141            debug!("header-sub not initialized yet");
1142            return gossipsub::MessageAcceptance::Ignore;
1143        };
1144
1145        if state.known_head.verify(&header).is_err() {
1146            trace!("Failed to verify HeaderSub header. Ignoring {header}");
1147            return gossipsub::MessageAcceptance::Ignore;
1148        }
1149
1150        trace!("New header from header-sub ({header})");
1151
1152        state.known_head = header.clone();
1153        // We intentionally do not `send().await` to avoid blocking `P2p`
1154        // in case `Syncer` enters some weird state.
1155        let _ = state.channel.try_send(header);
1156
1157        gossipsub::MessageAcceptance::Accept
1158    }
1159
1160    #[instrument(skip_all)]
1161    async fn on_bad_encoding_fraud_sub_message(
1162        &mut self,
1163        data: &[u8],
1164        peer: &PeerId,
1165    ) -> gossipsub::MessageAcceptance {
1166        let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1167            trace!("Malformed bad encoding fraud proof from {peer}");
1168            self.swarm
1169                .context()
1170                .behaviour
1171                .gossipsub
1172                .blacklist_peer(peer);
1173            return gossipsub::MessageAcceptance::Reject;
1174        };
1175
1176        let height = befp.height();
1177
1178        let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1179            header_sub_state.known_head.height()
1180        } else if let Ok(local_head) = self.store.get_head().await {
1181            local_head.height()
1182        } else {
1183            // we aren't tracking the network and have uninitialized store
1184            return gossipsub::MessageAcceptance::Ignore;
1185        };
1186
1187        if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1188            // does this threshold make any sense if we're gonna ignore it anyway
1189            // since we won't have the header
1190            return gossipsub::MessageAcceptance::Ignore;
1191        }
1192
1193        let hash = befp.header_hash();
1194        let Ok(header) = self.store.get_by_hash(&hash).await else {
1195            // we can't verify the proof without a header
1196            // TODO: should we then store it and wait for the height? celestia doesn't
1197            return gossipsub::MessageAcceptance::Ignore;
1198        };
1199
1200        if let Err(e) = befp.validate(&header) {
1201            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1202            self.swarm
1203                .context()
1204                .behaviour
1205                .gossipsub
1206                .blacklist_peer(peer);
1207            return gossipsub::MessageAcceptance::Reject;
1208        }
1209
1210        warn!("Received a valid bad encoding fraud proof");
1211        // trigger cancellation for all services
1212        self.network_compromised_token.trigger();
1213
1214        gossipsub::MessageAcceptance::Accept
1215    }
1216}
1217
1218/// Awaits at least one channel from the `bitswap_queries` to close.
1219async fn poll_closed(
1220    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1221) {
1222    poll_fn(|cx| {
1223        for chan in bitswap_queries.values_mut() {
1224            match chan.poll_closed(cx) {
1225                Poll::Pending => continue,
1226                Poll::Ready(_) => return Poll::Ready(()),
1227            }
1228        }
1229
1230        Poll::Pending
1231    })
1232    .await
1233}
1234
1235fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1236    let mut invalid_addrs = Vec::new();
1237
1238    for addr in addrs {
1239        if addr.peer_id().is_none() {
1240            invalid_addrs.push(addr.to_owned());
1241        }
1242    }
1243
1244    if invalid_addrs.is_empty() {
1245        Ok(())
1246    } else {
1247        Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1248    }
1249}
1250
1251fn init_gossipsub<'a, B, S>(
1252    args: &'a P2pArgs<B, S>,
1253    topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1254) -> Result<gossipsub::Behaviour>
1255where
1256    B: Blockstore,
1257    S: Store,
1258{
1259    // Set the message authenticity - How we expect to publish messages
1260    // Here we expect the publisher to sign the message with their key.
1261    let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1262
1263    let config = gossipsub::ConfigBuilder::default()
1264        .validation_mode(gossipsub::ValidationMode::Strict)
1265        .validate_messages()
1266        .build()
1267        .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1268
1269    // build a gossipsub network behaviour
1270    let mut gossipsub: gossipsub::Behaviour =
1271        gossipsub::Behaviour::new(message_authenticity, config)
1272            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1273
1274    for topic in topics {
1275        gossipsub
1276            .subscribe(topic)
1277            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1278    }
1279
1280    Ok(gossipsub)
1281}
1282
1283fn init_bitswap<B, S>(
1284    blockstore: Arc<B>,
1285    store: Arc<S>,
1286    network_id: &str,
1287) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1288where
1289    B: Blockstore + 'static,
1290    S: Store + 'static,
1291{
1292    let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1293
1294    Ok(beetswap::Behaviour::builder(blockstore)
1295        .protocol_prefix(protocol_prefix.as_ref())?
1296        .register_multihasher(ShwapMultihasher::new(store))
1297        .client_set_send_dont_have(false)
1298        .build())
1299}