lumina_node/
p2p.rs

//! Component responsible for messaging and interacting with other nodes on the peer-to-peer layer.
2//!
3//! It is a high level integration of various p2p protocols used by Celestia nodes.
4//! Currently supporting:
5//! - libp2p-identify
6//! - libp2p-kad
7//! - libp2p-autonat
8//! - libp2p-ping
9//! - header-sub topic on libp2p-gossipsub
10//! - fraud-sub topic on libp2p-gossipsub
11//! - header-ex client
12//! - header-ex server
13//! - bitswap 1.2.0
14//! - shwap - celestia's data availability protocol on top of bitswap
15
16use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use blockstore::block::CidError;
24use celestia_proto::p2p::pb::{HeaderRequest, header_request};
25use celestia_types::fraud_proof::BadEncodingFraudProof;
26use celestia_types::hash::Hash;
27use celestia_types::namespace_data::NamespaceData;
28use celestia_types::nmt::Namespace;
29use celestia_types::row::{Row, RowId};
30use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
31use celestia_types::sample::{Sample, SampleId};
32use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, FraudProof};
33use cid::Cid;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51mod shrex;
52pub(crate) mod shwap;
53mod swarm;
54mod swarm_manager;
55mod utils;
56
57use crate::block_ranges::BlockRange;
58use crate::events::EventPublisher;
59use crate::p2p::header_session::HeaderSession;
60use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
61use crate::p2p::swarm_manager::SwarmManager;
62use crate::peer_tracker::PeerTracker;
63use crate::peer_tracker::PeerTrackerInfo;
64use crate::store::{Store, StoreError};
65use crate::utils::{
66    MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
67    celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
68};
69
70pub use crate::p2p::header_ex::HeaderExError;
71pub use crate::p2p::shrex::ShrExError;
72
// Maximum size of a [`Multihash`].
//
// Used as the const generic parameter of `beetswap::Behaviour` in this module.
pub(crate) const MAX_MH_SIZE: usize = 64;

// All fraud proofs for a height bigger than the head height by this threshold
// will be ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

// Result alias of this module; the error type defaults to [`P2pError`].
pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
81
/// Representation of all the errors that can occur in `P2p` component.
///
/// Use [`P2pError::is_fatal`] to tell whether an error is unrecoverable.
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
    /// Failed to initialize gossipsub behaviour.
    #[error("Failed to initialize gossipsub behaviour: {0}")]
    GossipsubInit(String),

    /// Failed to initialize TLS.
    #[error("Failed to initialize TLS: {0}")]
    TlsInit(String),

    /// Failed to initialize noise protocol.
    #[error("Failed to initialize noise: {0}")]
    NoiseInit(String),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,

    /// An error propagated from the `header-ex`.
    #[error("HeaderEx: {0}")]
    HeaderEx(#[from] HeaderExError),

    /// An error propagated from the `shr-ex`.
    #[error("ShrEx: {0}")]
    ShrEx(#[from] ShrExError),

    /// Bootnode address is missing its peer ID.
    #[error("Bootnode multiaddrs without peer ID: {0:?}")]
    BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),

    /// An error propagated from [`beetswap::Behaviour`].
    #[error("Bitswap: {0}")]
    Bitswap(#[from] beetswap::Error),

    /// Protobuf message failed to be decoded.
    #[error("ProtoBuf decoding error: {0}")]
    ProtoDecodeFailed(#[from] tendermint_proto::Error),

    /// An error related to a [`Cid`], carried as a `blockstore::block::CidError`.
    ///
    /// There is no automatic `#[from]` conversion for this variant; a
    /// [`cid::Error`] is converted into it through a dedicated `From` impl.
    #[error("CID error: {0}")]
    Cid(CidError),

    /// Request timed out.
    #[error("Request timed out")]
    RequestTimedOut,

    /// Shwap protocol error.
    #[error("Shwap: {0}")]
    Shwap(String),

    /// An error propagated from [`celestia_types`].
    #[error(transparent)]
    CelestiaTypes(#[from] celestia_types::Error),

    /// An error propagated from [`Store`].
    #[error("Store error: {0}")]
    Store(#[from] StoreError),

    /// Header was pruned.
    #[error("Header of {0} block was pruned because it is outside of retention period")]
    HeaderPruned(u64),

    /// Header not synced yet.
    #[error("Header of {0} block is not synced yet")]
    HeaderNotSynced(u64),
}
153
154impl P2pError {
155    /// Returns `true` if an error is fatal in all possible scenarios.
156    ///
157    /// If unsure mark it as non-fatal error.
158    pub(crate) fn is_fatal(&self) -> bool {
159        match self {
160            P2pError::GossipsubInit(_)
161            | P2pError::NoiseInit(_)
162            | P2pError::TlsInit(_)
163            | P2pError::WorkerDied
164            | P2pError::ChannelClosedUnexpectedly
165            | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
166            P2pError::HeaderEx(_)
167            | P2pError::ShrEx(_)
168            | P2pError::Bitswap(_)
169            | P2pError::ProtoDecodeFailed(_)
170            | P2pError::Cid(_)
171            | P2pError::RequestTimedOut
172            | P2pError::Shwap(_)
173            | P2pError::CelestiaTypes(_)
174            | P2pError::HeaderPruned(_)
175            | P2pError::HeaderNotSynced(_) => false,
176            P2pError::Store(e) => e.is_fatal(),
177        }
178    }
179}
180
181impl From<oneshot::error::RecvError> for P2pError {
182    fn from(_value: oneshot::error::RecvError) -> Self {
183        P2pError::ChannelClosedUnexpectedly
184    }
185}
186
187impl From<prost::DecodeError> for P2pError {
188    fn from(value: prost::DecodeError) -> Self {
189        P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
190    }
191}
192
193impl From<cid::Error> for P2pError {
194    fn from(value: cid::Error) -> Self {
195        P2pError::Cid(CidError::InvalidCid(value.to_string()))
196    }
197}
198
/// Component responsible for the peer to peer networking handling.
///
/// This type is a handle: [`P2p::start`] spawns a `Worker` task and the
/// methods of this type talk to it by sending [`P2pCmd`]s over `cmd_tx`.
#[derive(Debug)]
pub(crate) struct P2p {
    /// Token cancelled by [`P2p::stop`]; the worker is given a child of it.
    cancellation_token: CancellationToken,
    /// Sending half of the command channel consumed by the worker.
    cmd_tx: mpsc::Sender<P2pCmd>,
    /// Handle of the spawned worker task, awaited by [`P2p::join`].
    join_handle: JoinHandle,
    /// Watcher of the peer tracker's [`PeerTrackerInfo`].
    peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
    /// Local peer ID; [`P2p::start`] derives it from the local keypair.
    local_peer_id: PeerId,
}
208
/// Arguments used to configure the [`P2p`].
pub struct P2pArgs<B, S>
where
    B: Blockstore,
    S: Store,
{
    /// An id of the network to connect to.
    pub network_id: String,
    /// The keypair to be used as the identity.
    pub local_keypair: Keypair,
    /// List of bootstrap nodes to connect to and trust.
    pub bootnodes: Vec<Multiaddr>,
    /// List of the addresses on which to listen for incoming connections.
    pub listen_on: Vec<Multiaddr>,
    /// The blockstore handed to the bitswap behaviour.
    pub blockstore: Arc<B>,
    /// The store for headers.
    pub store: Arc<S>,
    /// Event publisher.
    pub event_pub: EventPublisher,
}
230
/// Commands sent from the [`P2p`] handle to its worker task.
///
/// Variants carrying a `respond_to` sender expect the worker to deliver the
/// result through it.
#[derive(Debug)]
pub(crate) enum P2pCmd {
    /// Ask for the swarm's [`NetworkInfo`].
    NetworkInfo {
        respond_to: oneshot::Sender<NetworkInfo>,
    },
    /// Send a request through the `header-ex` behaviour.
    HeaderExRequest {
        request: HeaderRequest,
        respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
    },
    /// Ask for the swarm's listeners.
    Listeners {
        respond_to: oneshot::Sender<Vec<Multiaddr>>,
    },
    /// Ask for the IDs of the peers the peer tracker reports as connected.
    ConnectedPeers {
        respond_to: oneshot::Sender<Vec<PeerId>>,
    },
    /// Initialize `header-sub` with the given head.
    InitHeaderSub {
        head: Box<ExtendedHeader>,
        /// Any valid headers received by header-sub will be sent to this channel.
        channel: mpsc::Sender<ExtendedHeader>,
    },
    /// Change the trust status of a peer.
    SetPeerTrust {
        peer_id: PeerId,
        is_trusted: bool,
    },
    /// Mark a peer as archival. Only available in tests or with `test-utils`.
    #[cfg(any(test, feature = "test-utils"))]
    MarkAsArchival {
        peer_id: PeerId,
    },
    /// Request the raw data behind a Shwap [`Cid`].
    GetShwapCid {
        cid: Cid,
        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
    },
    /// Ask for a clone of the worker's "network compromised" [`Token`].
    GetNetworkCompromisedToken {
        respond_to: oneshot::Sender<Token>,
    },
    /// Ask for the head known to `header-sub`; `None` while there is no
    /// `header-sub` state.
    GetNetworkHead {
        respond_to: oneshot::Sender<Option<ExtendedHeader>>,
    },
    // This is dead code because `get_row` still uses Bitswap.
    // We can use this when celestia-node#4288 is merged.
    /// Request a [`Row`] through the `shr-ex` behaviour.
    #[allow(dead_code)]
    GetRow {
        row_index: u16,
        block_height: u64,
        respond_to: OneshotResultSender<Row, P2pError>,
    },
    // This is dead code because `get_sample` still uses Bitswap.
    // We can use this when celestia-node#4288 is merged.
    /// Request a [`Sample`] through the `shr-ex` behaviour.
    #[allow(dead_code)]
    GetSample {
        row_index: u16,
        column_index: u16,
        block_height: u64,
        respond_to: OneshotResultSender<Sample, P2pError>,
    },
    /// Request the [`NamespaceData`] of a namespace at the given block height.
    GetNamespaceData {
        namespace: Namespace,
        block_height: u64,
        respond_to: OneshotResultSender<NamespaceData, P2pError>,
    },
    /// Request the [`ExtendedDataSquare`] of the block at the given height.
    GetEds {
        block_height: u64,
        respond_to: OneshotResultSender<ExtendedDataSquare, P2pError>,
    },
}
296
297impl P2p {
298    /// Creates and starts a new p2p handler.
299    pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
300    where
301        B: Blockstore + 'static,
302        S: Store + 'static,
303    {
304        validate_bootnode_addrs(&args.bootnodes)?;
305
306        let local_peer_id = PeerId::from(args.local_keypair.public());
307
308        let peer_tracker = PeerTracker::new(args.event_pub.clone());
309        let peer_tracker_info_watcher = peer_tracker.info_watcher();
310
311        let cancellation_token = CancellationToken::new();
312        let (cmd_tx, cmd_rx) = mpsc::channel(16);
313
314        let mut worker =
315            Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
316
317        let join_handle = spawn(async move {
318            worker.run().await;
319        });
320
321        Ok(P2p {
322            cancellation_token,
323            cmd_tx,
324            join_handle,
325            peer_tracker_info_watcher,
326            local_peer_id,
327        })
328    }
329
330    /// Creates and starts a new mocked p2p handler.
331    #[cfg(test)]
332    pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
333        let (cmd_tx, cmd_rx) = mpsc::channel(16);
334        let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
335        let cancellation_token = CancellationToken::new();
336
337        // Just a fake join_handle
338        let join_handle = spawn(async {});
339
340        let p2p = P2p {
341            cmd_tx: cmd_tx.clone(),
342            cancellation_token,
343            join_handle,
344            peer_tracker_info_watcher: peer_tracker_rx,
345            local_peer_id: PeerId::random(),
346        };
347
348        let handle = crate::test_utils::MockP2pHandle {
349            cmd_tx,
350            cmd_rx,
351            header_sub_tx: None,
352            peer_tracker_tx,
353        };
354
355        (p2p, handle)
356    }
357
    /// Stop the worker.
    ///
    /// This only cancels the cancellation token and returns immediately;
    /// use [`P2p::join`] to wait for the worker to finish.
    pub fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }
363
    /// Wait until worker is completely stopped.
    ///
    /// This does not request the stop by itself, see [`P2p::stop`].
    pub async fn join(&self) {
        self.join_handle.join().await;
    }
368
    /// Local peer ID on the p2p network.
    ///
    /// Returns the value stored when this handle was created.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
373
374    async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
375        self.cmd_tx
376            .send(cmd)
377            .await
378            .map_err(|_| P2pError::WorkerDied)
379    }
380
    /// Watcher for the current [`PeerTrackerInfo`].
    ///
    /// Returns a clone of the receiver held by this handle.
    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
        self.peer_tracker_info_watcher.clone()
    }
385
    /// A reference to the current [`PeerTrackerInfo`].
    ///
    /// The returned [`watch::Ref`] borrows from the watch channel, so it
    /// should not be held for long.
    pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
        self.peer_tracker_info_watcher.borrow()
    }
390
391    /// Initializes `header-sub` protocol with a given `subjective_head`.
392    pub async fn init_header_sub(
393        &self,
394        head: ExtendedHeader,
395        channel: mpsc::Sender<ExtendedHeader>,
396    ) -> Result<()> {
397        self.send_command(P2pCmd::InitHeaderSub {
398            head: Box::new(head),
399            channel,
400        })
401        .await
402    }
403
404    /// Wait until the node is connected to any peer.
405    pub async fn wait_connected(&self) -> Result<()> {
406        self.peer_tracker_info_watcher()
407            .wait_for(|info| info.num_connected_peers > 0)
408            .await
409            .map(drop)
410            .map_err(|_| P2pError::WorkerDied)
411    }
412
413    /// Wait until the node is connected to any trusted peer.
414    pub async fn wait_connected_trusted(&self) -> Result<()> {
415        self.peer_tracker_info_watcher()
416            .wait_for(|info| info.num_connected_trusted_peers > 0)
417            .await
418            .map(drop)
419            .map_err(|_| P2pError::WorkerDied)
420    }
421
422    /// Get current [`NetworkInfo`].
423    pub async fn network_info(&self) -> Result<NetworkInfo> {
424        let (tx, rx) = oneshot::channel();
425
426        self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
427            .await?;
428
429        Ok(rx.await?)
430    }
431
432    /// Send a request on the `header-ex` protocol.
433    pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
434        let (tx, rx) = oneshot::channel();
435
436        self.send_command(P2pCmd::HeaderExRequest {
437            request,
438            respond_to: tx,
439        })
440        .await?;
441
442        rx.await?
443    }
444
    /// Request the head header on the `header-ex` protocol.
    ///
    /// Implemented as a request for height `0` through
    /// [`P2p::get_header_by_height`].
    pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
        self.get_header_by_height(0).await
    }
449
450    /// Request the header by hash on the `header-ex` protocol.
451    pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
452        self.header_ex_request(HeaderRequest {
453            data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
454            amount: 1,
455        })
456        .await?
457        .into_iter()
458        .next()
459        .ok_or(HeaderExError::HeaderNotFound.into())
460    }
461
462    /// Request the header by height on the `header-ex` protocol.
463    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
464        self.header_ex_request(HeaderRequest {
465            data: Some(header_request::Data::Origin(height)),
466            amount: 1,
467        })
468        .await?
469        .into_iter()
470        .next()
471        .ok_or(HeaderExError::HeaderNotFound.into())
472    }
473
474    /// Request the headers following the one given with the `header-ex` protocol.
475    ///
476    /// First header from the requested range will be verified against the provided one,
477    /// then each subsequent is verified against the previous one.
478    pub async fn get_verified_headers_range(
479        &self,
480        from: &ExtendedHeader,
481        amount: u64,
482    ) -> Result<Vec<ExtendedHeader>> {
483        // User can give us a bad header, so validate it.
484        from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
485
486        let height = from.height() + 1;
487
488        let range = height..=height + amount - 1;
489
490        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
491        let headers = session.run().await?;
492
493        // `.validate()` is called on each header separately by `HeaderExClientHandler`.
494        //
495        // The last step is to verify that all headers are from the same chain
496        // and indeed connected with the next one.
497        from.verify_adjacent_range(&headers)
498            .map_err(|_| HeaderExError::InvalidResponse)?;
499
500        Ok(headers)
501    }
502
503    /// Request a list of ranges with the `header-ex` protocol
504    ///
505    /// For each of the ranges, headers are verified against each other, but it's the caller
506    /// responsibility to verify range edges against headers existing in the store.
507    pub(crate) async fn get_unverified_header_range(
508        &self,
509        range: BlockRange,
510    ) -> Result<Vec<ExtendedHeader>> {
511        if range.is_empty() {
512            return Err(HeaderExError::InvalidRequest.into());
513        }
514
515        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
516        let headers = session.run().await?;
517
518        let Some(head) = headers.first() else {
519            return Err(HeaderExError::InvalidResponse.into());
520        };
521
522        head.verify_adjacent_range(&headers[1..])
523            .map_err(|_| HeaderExError::InvalidResponse)?;
524
525        Ok(headers)
526    }
527
528    /// Request a [`Cid`] on bitswap protocol.
529    pub(crate) async fn get_shwap_cid(
530        &self,
531        cid: Cid,
532        timeout: Option<Duration>,
533    ) -> Result<Vec<u8>> {
534        let (tx, rx) = oneshot::channel();
535
536        self.send_command(P2pCmd::GetShwapCid {
537            cid,
538            respond_to: tx,
539        })
540        .await?;
541
542        let data = match timeout {
543            Some(dur) => time::timeout(dur, rx)
544                .await
545                .map_err(|_| P2pError::RequestTimedOut)???,
546            None => rx.await??,
547        };
548
549        get_block_container(&cid, &data)
550    }
551
552    /// Request a [`Row`] on bitswap protocol.
553    pub async fn get_row(
554        &self,
555        row_index: u16,
556        block_height: u64,
557        timeout: Option<Duration>,
558    ) -> Result<Row> {
559        let id = RowId::new(row_index, block_height)?;
560        let cid = convert_cid(&id.into())?;
561
562        let data = self.get_shwap_cid(cid, timeout).await?;
563        let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
564        Ok(row)
565    }
566
567    /// Request a [`Sample`] on bitswap protocol.
568    pub async fn get_sample(
569        &self,
570        row_index: u16,
571        column_index: u16,
572        block_height: u64,
573        timeout: Option<Duration>,
574    ) -> Result<Sample> {
575        let id = SampleId::new(row_index, column_index, block_height)?;
576        let cid = convert_cid(&id.into())?;
577
578        let data = self.get_shwap_cid(cid, timeout).await?;
579        let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
580        Ok(sample)
581    }
582
583    pub async fn get_eds(
584        &self,
585        block_height: u64,
586        timeout: Option<Duration>,
587    ) -> Result<ExtendedDataSquare> {
588        let (tx, rx) = oneshot::channel();
589
590        self.send_command(P2pCmd::GetEds {
591            block_height,
592            respond_to: tx,
593        })
594        .await?;
595
596        match timeout {
597            Some(dur) => time::timeout(dur, rx)
598                .await
599                .map_err(|_| P2pError::RequestTimedOut)??,
600            None => rx.await?,
601        }
602    }
603
604    /// Request a [`RowNamespaceData`] on bitswap protocol.
605    pub async fn get_row_namespace_data(
606        &self,
607        namespace: Namespace,
608        row_index: u16,
609        block_height: u64,
610        timeout: Option<Duration>,
611    ) -> Result<RowNamespaceData> {
612        let id = RowNamespaceDataId::new(namespace, row_index, block_height)?;
613        let cid = convert_cid(&id.into())?;
614
615        let data = self.get_shwap_cid(cid, timeout).await?;
616        let row_namespace_data =
617            RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
618        Ok(row_namespace_data)
619    }
620
621    pub async fn get_namespace_data(
622        &self,
623        namespace: Namespace,
624        block_height: u64,
625        timeout: Option<Duration>,
626    ) -> Result<NamespaceData> {
627        let (tx, rx) = oneshot::channel();
628
629        self.send_command(P2pCmd::GetNamespaceData {
630            namespace,
631            block_height,
632            respond_to: tx,
633        })
634        .await?;
635
636        match timeout {
637            Some(dur) => time::timeout(dur, rx)
638                .await
639                .map_err(|_| P2pError::RequestTimedOut)??,
640            None => rx.await?,
641        }
642    }
643
644    /// Request all blobs with provided namespace in the block corresponding to this header
645    /// using bitswap protocol.
646    pub async fn get_all_blobs<S>(
647        &self,
648        namespace: Namespace,
649        block_height: u64,
650        timeout: Option<Duration>,
651        store: &S,
652    ) -> Result<Vec<Blob>>
653    where
654        S: Store,
655    {
656        // TODO: Figure out app_version based on network id and height instead
657        // of retrieving the header.
658        let app_version = match store.get_by_height(block_height).await {
659            Ok(header) => header.app_version(),
660            Err(StoreError::NotFound) => {
661                let pruned_ranges = store.get_pruned_ranges().await?;
662
663                if pruned_ranges.contains(block_height) {
664                    return Err(P2pError::HeaderPruned(block_height));
665                } else {
666                    return Err(P2pError::HeaderNotSynced(block_height));
667                }
668            }
669            Err(e) => return Err(e.into()),
670        };
671
672        let namespace_data = self
673            .get_namespace_data(namespace, block_height, timeout)
674            .await?;
675
676        let shares = namespace_data
677            .rows()
678            .iter()
679            .flat_map(|row| row.shares.iter());
680
681        Ok(Blob::reconstruct_all(shares, app_version)?)
682    }
683
684    /// Get the addresses where [`P2p`] listens on for incoming connections.
685    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
686        let (tx, rx) = oneshot::channel();
687
688        self.send_command(P2pCmd::Listeners { respond_to: tx })
689            .await?;
690
691        Ok(rx.await?)
692    }
693
694    /// Get the list of connected peers.
695    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
696        let (tx, rx) = oneshot::channel();
697
698        self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
699            .await?;
700
701        Ok(rx.await?)
702    }
703
704    /// Alter the trust status for a given peer.
705    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
706        self.send_command(P2pCmd::SetPeerTrust {
707            peer_id,
708            is_trusted,
709        })
710        .await
711    }
712
713    #[cfg(any(test, feature = "test-utils"))]
714    pub(crate) async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
715        self.send_command(P2pCmd::MarkAsArchival { peer_id }).await
716    }
717
718    /// Get the cancellation token which will be cancelled when the network gets compromised.
719    ///
720    /// After this token is cancelled, the network should be treated as insincere
721    /// and should not be trusted.
722    pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
723        let (tx, rx) = oneshot::channel();
724
725        self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
726            .await?;
727
728        Ok(rx.await?)
729    }
730
731    /// Get the latest header announced on the network.
732    pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
733        let (tx, rx) = oneshot::channel();
734
735        self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
736            .await?;
737
738        Ok(rx.await?)
739    }
740}
741
/// Dropping the handle requests the worker to stop.
impl Drop for P2p {
    fn drop(&mut self) {
        // Only signals the cancellation; it does not wait for the worker to finish.
        self.stop();
    }
}
747
/// Network behaviour combining the protocol behaviours driven by the worker.
#[derive(NetworkBehaviour)]
struct Behaviour<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    // Bitswap behaviour backed by the blockstore `B`.
    bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
    // `header-ex` behaviour backed by the header store `S`.
    header_ex: header_ex::Behaviour<S>,
    // `shr-ex` behaviour backed by the header store `S`.
    shr_ex: shrex::Behaviour<S>,
    // Gossipsub behaviour, set up with the header-sub and fraud-sub topics.
    gossipsub: gossipsub::Behaviour,
}
759
/// Background task state behind a [`P2p`] handle.
///
/// It owns the swarm and processes [`P2pCmd`]s received over `cmd_rx`.
struct Worker<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// Cancelling this token makes [`Worker::run`] leave its loop.
    cancellation_token: CancellationToken,
    /// Swarm manager holding the [`Behaviour`].
    swarm: SwarmManager<Behaviour<B, S>>,
    /// Hash of the `header-sub` gossipsub topic.
    header_sub_topic_hash: TopicHash,
    /// Hash of the bad encoding fraud proof gossipsub topic.
    bad_encoding_fraud_sub_topic: TopicHash,
    /// Receiving half of the command channel.
    cmd_rx: mpsc::Receiver<P2pCmd>,
    /// State of `header-sub`; starts as `None`.
    header_sub_state: Option<HeaderSubState>,
    /// Response senders of bitswap queries, keyed by query ID.
    /// Entries whose channel got closed are pruned and their query cancelled.
    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
    /// Token handed out on `P2pCmd::GetNetworkCompromisedToken`.
    network_compromised_token: Token,
    /// The store for headers.
    store: Arc<S>,
}
775
/// State kept by the worker for `header-sub`.
struct HeaderSubState {
    /// Head reported in response to `P2pCmd::GetNetworkHead`.
    known_head: ExtendedHeader,
    /// Channel received with `P2pCmd::InitHeaderSub`.
    channel: mpsc::Sender<ExtendedHeader>,
}
780
781impl<B, S> Worker<B, S>
782where
783    B: Blockstore,
784    S: Store,
785{
    /// Creates the worker: builds the swarm manager, initializes every
    /// protocol behaviour and attaches them to the swarm.
    ///
    /// Fails if the swarm manager, gossipsub, bitswap or `shr-ex` cannot be
    /// initialized.
    async fn new(
        args: P2pArgs<B, S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<P2pCmd>,
        peer_tracker: PeerTracker,
    ) -> Result<Self, P2pError> {
        let mut swarm = SwarmManager::new(
            &args.network_id,
            &args.local_keypair,
            &args.bootnodes,
            &args.listen_on,
            peer_tracker,
            args.event_pub.clone(),
        )
        .await?;

        // Gossipsub is initialized with both topics: header-sub and fraud-sub.
        let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
        let bad_encoding_fraud_sub_topic =
            fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
        let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;

        let bitswap = init_bitswap(
            args.blockstore.clone(),
            args.store.clone(),
            &args.network_id,
        )?;

        let header_ex = header_ex::Behaviour::new(header_ex::Config {
            network_id: &args.network_id,
            header_store: args.store.clone(),
        });

        // `shr-ex` needs the stream control of the swarm, so the swarm
        // manager has to exist before this behaviour is created.
        let shr_ex = shrex::Behaviour::new(shrex::Config {
            network_id: &args.network_id,
            local_keypair: &args.local_keypair,
            header_store: args.store.clone(),
            stream_ctrl: swarm.stream_control(),
        })?;

        swarm.attach_behaviour(Behaviour {
            bitswap,
            gossipsub,
            header_ex,
            shr_ex,
        });

        Ok(Worker {
            cancellation_token,
            swarm,
            cmd_rx,
            // Only the hashes of the topics are kept.
            bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
            header_sub_topic_hash: header_sub_topic.hash(),
            header_sub_state: None,
            bitswap_queries: HashMap::new(),
            network_compromised_token: Token::new(),
            store: args.store,
        })
    }
844
    /// Main loop of the worker.
    ///
    /// Runs until the cancellation token is cancelled, then stops the
    /// `header-ex` and `shr-ex` behaviours and the swarm. Failures while
    /// handling events or commands are logged and do not end the loop.
    async fn run(&mut self) {
        // Interval between calls to `report`, created with a 60 seconds duration.
        let mut report_interval = Interval::new(Duration::from_secs(60));

        loop {
            select! {
                // Stop requested.
                _ = self.cancellation_token.cancelled() => break,
                _ = report_interval.tick() => {
                    self.report();
                }
                // NOTE(review): `poll_closed` is defined elsewhere; presumably it
                // resolves when a response channel of a bitswap query is closed.
                _ = poll_closed(&mut self.bitswap_queries) => {
                    self.prune_canceled_bitswap_queries();
                }
                res = self.swarm.poll() => {
                    match res {
                        Ok(ev) => {
                            if let Err(e) = self.on_behaviour_event(ev).await {
                                warn!("Failure while handling behaviour event: {e}");
                            }
                        }
                        Err(e) => warn!("Failure while polling SwarmManager: {e}"),
                    }
                }
                // Branch matches only while the command channel yields commands.
                Some(cmd) = self.cmd_rx.recv() => {
                    if let Err(e) = self.on_cmd(cmd).await {
                        warn!("Failure while handling command. (error: {e})");
                    }
                }
            }
        }

        // Shut down: behaviours first, then the swarm itself.
        self.swarm.context().behaviour.header_ex.stop();
        self.swarm.context().behaviour.shr_ex.stop();
        self.swarm.stop().await;
    }
879
880    fn prune_canceled_bitswap_queries(&mut self) {
881        let mut cancelled = SmallVec::<[_; 16]>::new();
882
883        for (query_id, chan) in &self.bitswap_queries {
884            if chan.is_closed() {
885                cancelled.push(*query_id);
886            }
887        }
888
889        for query_id in cancelled {
890            self.bitswap_queries.remove(&query_id);
891            self.swarm.context().behaviour.bitswap.cancel(query_id);
892        }
893    }
894
895    async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
896        match ev {
897            BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
898            BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
899            BehaviourEvent::HeaderEx(ev) => self.on_header_ex_event(ev).await,
900            BehaviourEvent::ShrEx(ev) => self.on_shrex_event(ev).await,
901        }
902
903        Ok(())
904    }
905
906    async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
907        match cmd {
908            P2pCmd::NetworkInfo { respond_to } => {
909                respond_to.maybe_send(self.swarm.network_info());
910            }
911            P2pCmd::HeaderExRequest {
912                request,
913                respond_to,
914            } => {
915                self.swarm
916                    .context()
917                    .behaviour
918                    .header_ex
919                    .send_request(request, respond_to);
920            }
921            P2pCmd::Listeners { respond_to } => {
922                respond_to.maybe_send(self.swarm.listeners());
923            }
924            P2pCmd::ConnectedPeers { respond_to } => {
925                let peers = self
926                    .swarm
927                    .context()
928                    .peer_tracker
929                    .peers()
930                    .filter_map(|peer| {
931                        if peer.is_connected() {
932                            Some(*peer.id())
933                        } else {
934                            None
935                        }
936                    })
937                    .collect();
938                respond_to.maybe_send(peers);
939            }
940            P2pCmd::InitHeaderSub { head, channel } => {
941                self.on_init_header_sub(*head, channel);
942            }
943            P2pCmd::SetPeerTrust {
944                peer_id,
945                is_trusted,
946            } => {
947                self.swarm.set_peer_trust(&peer_id, is_trusted);
948            }
949            #[cfg(any(test, feature = "test-utils"))]
950            P2pCmd::MarkAsArchival { peer_id } => {
951                self.swarm.mark_as_archival(&peer_id);
952            }
953            P2pCmd::GetShwapCid { cid, respond_to } => {
954                self.on_get_shwap_cid(cid, respond_to);
955            }
956            P2pCmd::GetNetworkCompromisedToken { respond_to } => {
957                respond_to.maybe_send(self.network_compromised_token.clone())
958            }
959            P2pCmd::GetNetworkHead { respond_to } => {
960                let head = self
961                    .header_sub_state
962                    .as_ref()
963                    .map(|state| state.known_head.clone());
964                respond_to.maybe_send(head);
965            }
966            P2pCmd::GetRow {
967                row_index,
968                block_height,
969                respond_to,
970            } => {
971                self.swarm
972                    .context()
973                    .behaviour
974                    .shr_ex
975                    .get_row(block_height, row_index, respond_to)
976                    .await
977            }
978            P2pCmd::GetSample {
979                row_index,
980                column_index,
981                block_height,
982                respond_to,
983            } => {
984                self.swarm
985                    .context()
986                    .behaviour
987                    .shr_ex
988                    .get_sample(block_height, row_index, column_index, respond_to)
989                    .await
990            }
991            P2pCmd::GetNamespaceData {
992                namespace,
993                block_height,
994                respond_to,
995            } => {
996                self.swarm
997                    .context()
998                    .behaviour
999                    .shr_ex
1000                    .get_namespace_data(block_height, namespace, respond_to)
1001                    .await
1002            }
1003            P2pCmd::GetEds {
1004                block_height,
1005                respond_to,
1006            } => {
1007                self.swarm
1008                    .context()
1009                    .behaviour
1010                    .shr_ex
1011                    .get_eds(block_height, respond_to)
1012                    .await
1013            }
1014        }
1015
1016        Ok(())
1017    }
1018
1019    #[instrument(skip_all)]
1020    fn report(&mut self) {
1021        let tracker_info = self.swarm.context().peer_tracker.info();
1022
1023        info!(
1024            "peers: {}, trusted peers: {}",
1025            tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1026        );
1027    }
1028
1029    #[instrument(level = "trace", skip(self))]
1030    async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1031        match ev {
1032            gossipsub::Event::Message {
1033                message,
1034                message_id,
1035                ..
1036            } => {
1037                let Some(peer) = message.source else {
1038                    // Validation mode is `strict` so this will never happen
1039                    return;
1040                };
1041
1042                let acceptance = if message.topic == self.header_sub_topic_hash {
1043                    self.on_header_sub_message(&message.data[..])
1044                } else if message.topic == self.bad_encoding_fraud_sub_topic {
1045                    self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1046                        .await
1047                } else {
1048                    trace!("Unhandled gossipsub message");
1049                    gossipsub::MessageAcceptance::Ignore
1050                };
1051
1052                if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1053                    // We may have discovered a new peer
1054                    self.swarm.peer_maybe_discovered(&peer);
1055                }
1056
1057                let _ = self
1058                    .swarm
1059                    .context()
1060                    .behaviour
1061                    .gossipsub
1062                    .report_message_validation_result(&message_id, &peer, acceptance);
1063            }
1064            _ => trace!("Unhandled gossipsub event"),
1065        }
1066    }
1067
1068    #[instrument(level = "trace", skip_all)]
1069    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1070        trace!("Requesting CID {cid} from bitswap");
1071        let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
1072        self.bitswap_queries.insert(query_id, respond_to);
1073    }
1074
1075    #[instrument(level = "trace", skip(self))]
1076    async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1077        match ev {
1078            beetswap::Event::GetQueryResponse { query_id, data } => {
1079                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1080                    respond_to.maybe_send_ok(data);
1081                }
1082            }
1083            beetswap::Event::GetQueryError { query_id, error } => {
1084                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1085                    let error: P2pError = error.into();
1086                    respond_to.maybe_send_err(error);
1087                }
1088            }
1089        }
1090    }
1091
1092    #[instrument(level = "trace", skip(self))]
1093    async fn on_header_ex_event(&mut self, ev: header_ex::Event) {
1094        match ev {
1095            header_ex::Event::SchedulePendingRequests => {
1096                let ctx = self.swarm.context();
1097
1098                ctx.behaviour
1099                    .header_ex
1100                    .schedule_pending_requests(ctx.peer_tracker);
1101            }
1102            header_ex::Event::NeedTrustedPeers => {
1103                self.swarm.connect_to_bootnodes();
1104            }
1105            header_ex::Event::NeedArchivalPeers => {
1106                self.swarm.start_archival_node_kad_query();
1107            }
1108        }
1109    }
1110
1111    #[instrument(level = "trace", skip(self))]
1112    async fn on_shrex_event(&mut self, ev: shrex::Event) {
1113        match ev {
1114            shrex::Event::SchedulePendingRequests => {
1115                let ctx = self.swarm.context();
1116
1117                ctx.behaviour
1118                    .shr_ex
1119                    .schedule_pending_requests(ctx.peer_tracker);
1120            }
1121
1122            shrex::Event::AddPeers(peers) => {
1123                let added = peers
1124                    .iter()
1125                    .filter(|peer| self.swarm.peer_maybe_discovered(peer))
1126                    .count();
1127
1128                debug!("Added {added} peers discovered through shrex");
1129            }
1130
1131            shrex::Event::BlockPeers(peers) => {
1132                let blocked = peers
1133                    .into_iter()
1134                    .filter(|peer| self.swarm.blacklist_peer(peer))
1135                    .count();
1136
1137                debug!("Blocked {blocked} peers for shrex missbehaviour");
1138            }
1139        }
1140    }
1141
1142    #[instrument(skip_all, fields(header = %head))]
1143    fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1144        self.header_sub_state = Some(HeaderSubState {
1145            known_head: head,
1146            channel,
1147        });
1148        trace!("HeaderSub initialized");
1149    }
1150
1151    #[instrument(skip_all)]
1152    fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1153        let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1154            trace!("Malformed or invalid header from header-sub");
1155            return gossipsub::MessageAcceptance::Reject;
1156        };
1157
1158        trace!("Received header from header-sub ({header})");
1159
1160        let Some(ref mut state) = self.header_sub_state else {
1161            debug!("header-sub not initialized yet");
1162            return gossipsub::MessageAcceptance::Ignore;
1163        };
1164
1165        if state.known_head.verify(&header).is_err() {
1166            trace!("Failed to verify HeaderSub header. Ignoring {header}");
1167            return gossipsub::MessageAcceptance::Ignore;
1168        }
1169
1170        trace!("New header from header-sub ({header})");
1171
1172        state.known_head = header.clone();
1173        // We intentionally do not `send().await` to avoid blocking `P2p`
1174        // in case `Syncer` enters some weird state.
1175        let _ = state.channel.try_send(header);
1176
1177        gossipsub::MessageAcceptance::Accept
1178    }
1179
1180    #[instrument(skip_all)]
1181    async fn on_bad_encoding_fraud_sub_message(
1182        &mut self,
1183        data: &[u8],
1184        peer: &PeerId,
1185    ) -> gossipsub::MessageAcceptance {
1186        let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1187            trace!("Malformed bad encoding fraud proof from {peer}");
1188            self.swarm
1189                .context()
1190                .behaviour
1191                .gossipsub
1192                .blacklist_peer(peer);
1193            return gossipsub::MessageAcceptance::Reject;
1194        };
1195
1196        let height = befp.height();
1197
1198        let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1199            header_sub_state.known_head.height()
1200        } else if let Ok(local_head) = self.store.get_head().await {
1201            local_head.height()
1202        } else {
1203            // we aren't tracking the network and have uninitialized store
1204            return gossipsub::MessageAcceptance::Ignore;
1205        };
1206
1207        if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1208            // does this threshold make any sense if we're gonna ignore it anyway
1209            // since we won't have the header
1210            return gossipsub::MessageAcceptance::Ignore;
1211        }
1212
1213        let hash = befp.header_hash();
1214        let Ok(header) = self.store.get_by_hash(&hash).await else {
1215            // we can't verify the proof without a header
1216            // TODO: should we then store it and wait for the height? celestia doesn't
1217            return gossipsub::MessageAcceptance::Ignore;
1218        };
1219
1220        if let Err(e) = befp.validate(&header) {
1221            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1222            self.swarm
1223                .context()
1224                .behaviour
1225                .gossipsub
1226                .blacklist_peer(peer);
1227            return gossipsub::MessageAcceptance::Reject;
1228        }
1229
1230        warn!("Received a valid bad encoding fraud proof");
1231        // trigger cancellation for all services
1232        self.network_compromised_token.trigger();
1233
1234        gossipsub::MessageAcceptance::Accept
1235    }
1236}
1237
1238/// Awaits at least one channel from the `bitswap_queries` to close.
1239async fn poll_closed(
1240    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1241) {
1242    poll_fn(|cx| {
1243        for chan in bitswap_queries.values_mut() {
1244            match chan.poll_closed(cx) {
1245                Poll::Pending => continue,
1246                Poll::Ready(_) => return Poll::Ready(()),
1247            }
1248        }
1249
1250        Poll::Pending
1251    })
1252    .await
1253}
1254
1255fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1256    let mut invalid_addrs = Vec::new();
1257
1258    for addr in addrs {
1259        if addr.peer_id().is_none() {
1260            invalid_addrs.push(addr.to_owned());
1261        }
1262    }
1263
1264    if invalid_addrs.is_empty() {
1265        Ok(())
1266    } else {
1267        Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1268    }
1269}
1270
1271fn init_gossipsub<'a, B, S>(
1272    args: &'a P2pArgs<B, S>,
1273    topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1274) -> Result<gossipsub::Behaviour>
1275where
1276    B: Blockstore,
1277    S: Store,
1278{
1279    // Set the message authenticity - How we expect to publish messages
1280    // Here we expect the publisher to sign the message with their key.
1281    let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1282
1283    let config = gossipsub::ConfigBuilder::default()
1284        .validation_mode(gossipsub::ValidationMode::Strict)
1285        .validate_messages()
1286        .build()
1287        .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1288
1289    // build a gossipsub network behaviour
1290    let mut gossipsub: gossipsub::Behaviour =
1291        gossipsub::Behaviour::new(message_authenticity, config)
1292            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1293
1294    for topic in topics {
1295        gossipsub
1296            .subscribe(topic)
1297            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1298    }
1299
1300    Ok(gossipsub)
1301}
1302
1303fn init_bitswap<B, S>(
1304    blockstore: Arc<B>,
1305    store: Arc<S>,
1306    network_id: &str,
1307) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1308where
1309    B: Blockstore + 'static,
1310    S: Store + 'static,
1311{
1312    let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1313
1314    Ok(beetswap::Behaviour::builder(blockstore)
1315        .protocol_prefix(protocol_prefix.as_ref())?
1316        .register_multihasher(ShwapMultihasher::new(store))
1317        .client_set_send_dont_have(false)
1318        .build())
1319}