lumina_node/
p2p.rs

//! Component responsible for messaging and interacting with other nodes on the peer-to-peer layer.
//!
//! It is a high-level integration of various p2p protocols used by Celestia nodes.
//! Currently supported:
//! - libp2p-identify
//! - libp2p-kad
//! - libp2p-autonat
//! - libp2p-ping
//! - header-sub topic on libp2p-gossipsub
//! - fraud-sub topic on libp2p-gossipsub
//! - header-ex client
//! - header-ex server
//! - bitswap 1.2.0
//! - shwap - Celestia's data availability protocol on top of bitswap
15
16use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use celestia_proto::p2p::pb::{HeaderRequest, header_request};
24use celestia_types::fraud_proof::BadEncodingFraudProof;
25use celestia_types::hash::Hash;
26use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
27use celestia_types::row::{Row, RowId};
28use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
29use celestia_types::sample::{Sample, SampleId};
30use celestia_types::{Blob, ExtendedHeader, FraudProof};
31use cid::Cid;
32use futures::TryStreamExt;
33use futures::stream::FuturesOrdered;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51pub(crate) mod shwap;
52mod swarm;
53mod swarm_manager;
54
55use crate::block_ranges::BlockRange;
56use crate::events::EventPublisher;
57use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
58use crate::p2p::header_session::HeaderSession;
59use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
60use crate::p2p::swarm_manager::SwarmManager;
61use crate::peer_tracker::PeerTracker;
62use crate::peer_tracker::PeerTrackerInfo;
63use crate::store::{Store, StoreError};
64use crate::utils::{
65    MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
66    celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
67};
68
69pub use crate::p2p::header_ex::HeaderExError;
70
// Upper bound on the size of a [`Multihash`]; used as the multihash size
// parameter of the bitswap behaviour.
pub(crate) const MAX_MH_SIZE: usize = 64;

// Fraud proofs for a height that is ahead of the current head height by this
// threshold are ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
77
78pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
79
80/// Representation of all the errors that can occur in `P2p` component.
81#[derive(Debug, thiserror::Error)]
82pub enum P2pError {
83    /// Failed to initialize gossipsub behaviour.
84    #[error("Failed to initialize gossipsub behaviour: {0}")]
85    GossipsubInit(String),
86
87    /// Failed to initialize TLS.
88    #[error("Failed to initialize TLS: {0}")]
89    TlsInit(String),
90
91    /// Failed to initialize noise protocol.
92    #[error("Failed to initialize noise: {0}")]
93    NoiseInit(String),
94
95    /// The worker has died.
96    #[error("Worker died")]
97    WorkerDied,
98
99    /// Channel closed unexpectedly.
100    #[error("Channel closed unexpectedly")]
101    ChannelClosedUnexpectedly,
102
103    /// Not connected to any peers.
104    #[error("Not connected to any peers")]
105    NoConnectedPeers,
106
107    /// An error propagated from the `header-ex`.
108    #[error("HeaderEx: {0}")]
109    HeaderEx(#[from] HeaderExError),
110
111    /// Bootnode address is missing its peer ID.
112    #[error("Bootnode multiaddrs without peer ID: {0:?}")]
113    BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
114
115    /// An error propagated from [`beetswap::Behaviour`].
116    #[error("Bitswap: {0}")]
117    Bitswap(#[from] beetswap::Error),
118
119    /// ProtoBuf message failed to be decoded.
120    #[error("ProtoBuf decoding error: {0}")]
121    ProtoDecodeFailed(#[from] tendermint_proto::Error),
122
123    /// An error propagated from [`celestia_types`] that is related to [`Cid`].
124    #[error("CID error: {0}")]
125    Cid(celestia_types::Error),
126
127    /// Bitswap query timed out.
128    #[error("Bitswap query timed out")]
129    BitswapQueryTimeout,
130
131    /// Shwap protocol error.
132    #[error("Shwap: {0}")]
133    Shwap(String),
134
135    /// An error propagated from [`celestia_types`].
136    #[error(transparent)]
137    CelestiaTypes(#[from] celestia_types::Error),
138
139    /// An error propagated from [`Store`].
140    #[error("Store error: {0}")]
141    Store(#[from] StoreError),
142
143    /// Header was pruned.
144    #[error("Header of {0} block was pruned because it is outside of retention period")]
145    HeaderPruned(u64),
146
147    /// Header not synced yet.
148    #[error("Header of {0} block is not synced yet")]
149    HeaderNotSynced(u64),
150}
151
152impl P2pError {
153    /// Returns `true` if an error is fatal in all possible scenarios.
154    ///
155    /// If unsure mark it as non-fatal error.
156    pub(crate) fn is_fatal(&self) -> bool {
157        match self {
158            P2pError::GossipsubInit(_)
159            | P2pError::NoiseInit(_)
160            | P2pError::TlsInit(_)
161            | P2pError::WorkerDied
162            | P2pError::ChannelClosedUnexpectedly
163            | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
164            P2pError::NoConnectedPeers
165            | P2pError::HeaderEx(_)
166            | P2pError::Bitswap(_)
167            | P2pError::ProtoDecodeFailed(_)
168            | P2pError::Cid(_)
169            | P2pError::BitswapQueryTimeout
170            | P2pError::Shwap(_)
171            | P2pError::CelestiaTypes(_)
172            | P2pError::HeaderPruned(_)
173            | P2pError::HeaderNotSynced(_) => false,
174            P2pError::Store(e) => e.is_fatal(),
175        }
176    }
177}
178
179impl From<oneshot::error::RecvError> for P2pError {
180    fn from(_value: oneshot::error::RecvError) -> Self {
181        P2pError::ChannelClosedUnexpectedly
182    }
183}
184
185impl From<prost::DecodeError> for P2pError {
186    fn from(value: prost::DecodeError) -> Self {
187        P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
188    }
189}
190
191impl From<cid::Error> for P2pError {
192    fn from(value: cid::Error) -> Self {
193        P2pError::Cid(celestia_types::Error::CidError(
194            blockstore::block::CidError::InvalidCid(value.to_string()),
195        ))
196    }
197}
198
199/// Component responsible for the peer to peer networking handling.
200#[derive(Debug)]
201pub(crate) struct P2p {
202    cancellation_token: CancellationToken,
203    cmd_tx: mpsc::Sender<P2pCmd>,
204    join_handle: JoinHandle,
205    peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
206    local_peer_id: PeerId,
207}
208
209/// Arguments used to configure the [`P2p`].
210pub struct P2pArgs<B, S>
211where
212    B: Blockstore,
213    S: Store,
214{
215    /// An id of the network to connect to.
216    pub network_id: String,
217    /// The keypair to be used as the identity.
218    pub local_keypair: Keypair,
219    /// List of bootstrap nodes to connect to and trust.
220    pub bootnodes: Vec<Multiaddr>,
221    /// List of the addresses on which to listen for incoming connections.
222    pub listen_on: Vec<Multiaddr>,
223    /// The store for headers.
224    pub blockstore: Arc<B>,
225    /// The store for headers.
226    pub store: Arc<S>,
227    /// Event publisher.
228    pub event_pub: EventPublisher,
229}
230
231#[derive(Debug)]
232pub(crate) enum P2pCmd {
233    NetworkInfo {
234        respond_to: oneshot::Sender<NetworkInfo>,
235    },
236    HeaderExRequest {
237        request: HeaderRequest,
238        respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
239    },
240    Listeners {
241        respond_to: oneshot::Sender<Vec<Multiaddr>>,
242    },
243    ConnectedPeers {
244        respond_to: oneshot::Sender<Vec<PeerId>>,
245    },
246    InitHeaderSub {
247        head: Box<ExtendedHeader>,
248        /// Any valid headers received by header-sub will be send to this channel.
249        channel: mpsc::Sender<ExtendedHeader>,
250    },
251    SetPeerTrust {
252        peer_id: PeerId,
253        is_trusted: bool,
254    },
255    GetShwapCid {
256        cid: Cid,
257        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
258    },
259    GetNetworkCompromisedToken {
260        respond_to: oneshot::Sender<Token>,
261    },
262    GetNetworkHead {
263        respond_to: oneshot::Sender<Option<ExtendedHeader>>,
264    },
265}
266
267impl P2p {
268    /// Creates and starts a new p2p handler.
269    pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
270    where
271        B: Blockstore + 'static,
272        S: Store + 'static,
273    {
274        validate_bootnode_addrs(&args.bootnodes)?;
275
276        let local_peer_id = PeerId::from(args.local_keypair.public());
277
278        let peer_tracker = PeerTracker::new(args.event_pub.clone());
279        let peer_tracker_info_watcher = peer_tracker.info_watcher();
280
281        let cancellation_token = CancellationToken::new();
282        let (cmd_tx, cmd_rx) = mpsc::channel(16);
283
284        let mut worker =
285            Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
286
287        let join_handle = spawn(async move {
288            worker.run().await;
289        });
290
291        Ok(P2p {
292            cancellation_token,
293            cmd_tx,
294            join_handle,
295            peer_tracker_info_watcher,
296            local_peer_id,
297        })
298    }
299
300    /// Creates and starts a new mocked p2p handler.
301    #[cfg(test)]
302    pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
303        let (cmd_tx, cmd_rx) = mpsc::channel(16);
304        let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
305        let cancellation_token = CancellationToken::new();
306
307        // Just a fake join_handle
308        let join_handle = spawn(async {});
309
310        let p2p = P2p {
311            cmd_tx: cmd_tx.clone(),
312            cancellation_token,
313            join_handle,
314            peer_tracker_info_watcher: peer_tracker_rx,
315            local_peer_id: PeerId::random(),
316        };
317
318        let handle = crate::test_utils::MockP2pHandle {
319            cmd_tx,
320            cmd_rx,
321            header_sub_tx: None,
322            peer_tracker_tx,
323        };
324
325        (p2p, handle)
326    }
327
328    /// Stop the worker.
329    pub fn stop(&self) {
330        // Singal the Worker to stop.
331        self.cancellation_token.cancel();
332    }
333
334    /// Wait until worker is completely stopped.
335    pub async fn join(&self) {
336        self.join_handle.join().await;
337    }
338
339    /// Local peer ID on the p2p network.
340    pub fn local_peer_id(&self) -> &PeerId {
341        &self.local_peer_id
342    }
343
344    async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
345        self.cmd_tx
346            .send(cmd)
347            .await
348            .map_err(|_| P2pError::WorkerDied)
349    }
350
351    /// Watcher for the current [`PeerTrackerInfo`].
352    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
353        self.peer_tracker_info_watcher.clone()
354    }
355
356    /// A reference to the current [`PeerTrackerInfo`].
357    pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
358        self.peer_tracker_info_watcher.borrow()
359    }
360
361    /// Initializes `header-sub` protocol with a given `subjective_head`.
362    pub async fn init_header_sub(
363        &self,
364        head: ExtendedHeader,
365        channel: mpsc::Sender<ExtendedHeader>,
366    ) -> Result<()> {
367        self.send_command(P2pCmd::InitHeaderSub {
368            head: Box::new(head),
369            channel,
370        })
371        .await
372    }
373
374    /// Wait until the node is connected to any peer.
375    pub async fn wait_connected(&self) -> Result<()> {
376        self.peer_tracker_info_watcher()
377            .wait_for(|info| info.num_connected_peers > 0)
378            .await
379            .map(drop)
380            .map_err(|_| P2pError::WorkerDied)
381    }
382
383    /// Wait until the node is connected to any trusted peer.
384    pub async fn wait_connected_trusted(&self) -> Result<()> {
385        self.peer_tracker_info_watcher()
386            .wait_for(|info| info.num_connected_trusted_peers > 0)
387            .await
388            .map(drop)
389            .map_err(|_| P2pError::WorkerDied)
390    }
391
392    /// Get current [`NetworkInfo`].
393    pub async fn network_info(&self) -> Result<NetworkInfo> {
394        let (tx, rx) = oneshot::channel();
395
396        self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
397            .await?;
398
399        Ok(rx.await?)
400    }
401
402    /// Send a request on the `header-ex` protocol.
403    pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
404        let (tx, rx) = oneshot::channel();
405
406        self.send_command(P2pCmd::HeaderExRequest {
407            request,
408            respond_to: tx,
409        })
410        .await?;
411
412        rx.await?
413    }
414
415    /// Request the head header on the `header-ex` protocol.
416    pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
417        self.get_header_by_height(0).await
418    }
419
420    /// Request the header by hash on the `header-ex` protocol.
421    pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
422        self.header_ex_request(HeaderRequest {
423            data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
424            amount: 1,
425        })
426        .await?
427        .into_iter()
428        .next()
429        .ok_or(HeaderExError::HeaderNotFound.into())
430    }
431
432    /// Request the header by height on the `header-ex` protocol.
433    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
434        self.header_ex_request(HeaderRequest {
435            data: Some(header_request::Data::Origin(height)),
436            amount: 1,
437        })
438        .await?
439        .into_iter()
440        .next()
441        .ok_or(HeaderExError::HeaderNotFound.into())
442    }
443
444    /// Request the headers following the one given with the `header-ex` protocol.
445    ///
446    /// First header from the requested range will be verified against the provided one,
447    /// then each subsequent is verified against the previous one.
448    pub async fn get_verified_headers_range(
449        &self,
450        from: &ExtendedHeader,
451        amount: u64,
452    ) -> Result<Vec<ExtendedHeader>> {
453        // User can give us a bad header, so validate it.
454        from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
455
456        let height = from.height().value() + 1;
457
458        let range = height..=height + amount - 1;
459
460        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
461        let headers = session.run().await?;
462
463        // `.validate()` is called on each header separately by `HeaderExClientHandler`.
464        //
465        // The last step is to verify that all headers are from the same chain
466        // and indeed connected with the next one.
467        from.verify_adjacent_range(&headers)
468            .map_err(|_| HeaderExError::InvalidResponse)?;
469
470        Ok(headers)
471    }
472
473    /// Request a list of ranges with the `header-ex` protocol
474    ///
475    /// For each of the ranges, headers are verified against each other, but it's the caller
476    /// responsibility to verify range edges against headers existing in the store.
477    pub(crate) async fn get_unverified_header_range(
478        &self,
479        range: BlockRange,
480    ) -> Result<Vec<ExtendedHeader>> {
481        if range.is_empty() {
482            return Err(HeaderExError::InvalidRequest.into());
483        }
484
485        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
486        let headers = session.run().await?;
487
488        let Some(head) = headers.first() else {
489            return Err(HeaderExError::InvalidResponse.into());
490        };
491
492        head.verify_adjacent_range(&headers[1..])
493            .map_err(|_| HeaderExError::InvalidResponse)?;
494
495        Ok(headers)
496    }
497
498    /// Request a [`Cid`] on bitswap protocol.
499    pub(crate) async fn get_shwap_cid(
500        &self,
501        cid: Cid,
502        timeout: Option<Duration>,
503    ) -> Result<Vec<u8>> {
504        let (tx, rx) = oneshot::channel();
505
506        self.send_command(P2pCmd::GetShwapCid {
507            cid,
508            respond_to: tx,
509        })
510        .await?;
511
512        let data = match timeout {
513            Some(dur) => time::timeout(dur, rx)
514                .await
515                .map_err(|_| P2pError::BitswapQueryTimeout)???,
516            None => rx.await??,
517        };
518
519        get_block_container(&cid, &data)
520    }
521
522    /// Request a [`Row`] on bitswap protocol.
523    pub async fn get_row(
524        &self,
525        row_index: u16,
526        block_height: u64,
527        timeout: Option<Duration>,
528    ) -> Result<Row> {
529        let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
530        let cid = convert_cid(&id.into())?;
531
532        let data = self.get_shwap_cid(cid, timeout).await?;
533        let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
534        Ok(row)
535    }
536
537    /// Request a [`Sample`] on bitswap protocol.
538    pub async fn get_sample(
539        &self,
540        row_index: u16,
541        column_index: u16,
542        block_height: u64,
543        timeout: Option<Duration>,
544    ) -> Result<Sample> {
545        let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
546        let cid = convert_cid(&id.into())?;
547
548        let data = self.get_shwap_cid(cid, timeout).await?;
549        let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
550        Ok(sample)
551    }
552
553    /// Request a [`RowNamespaceData`] on bitswap protocol.
554    pub async fn get_row_namespace_data(
555        &self,
556        namespace: Namespace,
557        row_index: u16,
558        block_height: u64,
559        timeout: Option<Duration>,
560    ) -> Result<RowNamespaceData> {
561        let id =
562            RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
563        let cid = convert_cid(&id.into())?;
564
565        let data = self.get_shwap_cid(cid, timeout).await?;
566        let row_namespace_data =
567            RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
568        Ok(row_namespace_data)
569    }
570
571    /// Request all blobs with provided namespace in the block corresponding to this header
572    /// using bitswap protocol.
573    pub async fn get_all_blobs<S>(
574        &self,
575        namespace: Namespace,
576        block_height: u64,
577        timeout: Option<Duration>,
578        store: &S,
579    ) -> Result<Vec<Blob>>
580    where
581        S: Store,
582    {
583        let header = match store.get_by_height(block_height).await {
584            Ok(header) => header,
585            Err(StoreError::NotFound) => {
586                let pruned_ranges = store.get_pruned_ranges().await?;
587
588                if pruned_ranges.contains(block_height) {
589                    return Err(P2pError::HeaderPruned(block_height));
590                } else {
591                    return Err(P2pError::HeaderNotSynced(block_height));
592                }
593            }
594            Err(e) => return Err(e.into()),
595        };
596
597        let app_version = header.app_version()?;
598        let rows_to_fetch: Vec<_> = header
599            .dah
600            .row_roots()
601            .iter()
602            .enumerate()
603            .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
604            .map(|(n, _)| n as u16)
605            .collect();
606
607        let futs = rows_to_fetch
608            .into_iter()
609            .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, block_height, timeout))
610            .collect::<FuturesOrdered<_>>();
611
612        let rows: Vec<_> = match futs.try_collect().await {
613            Ok(rows) => rows,
614            Err(P2pError::BitswapQueryTimeout) if !store.has_at(block_height).await => {
615                return Err(P2pError::HeaderPruned(block_height));
616            }
617            Err(e) => return Err(e),
618        };
619
620        let shares = rows.iter().flat_map(|row| row.shares.iter());
621
622        Ok(Blob::reconstruct_all(shares, app_version)?)
623    }
624
625    /// Get the addresses where [`P2p`] listens on for incoming connections.
626    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
627        let (tx, rx) = oneshot::channel();
628
629        self.send_command(P2pCmd::Listeners { respond_to: tx })
630            .await?;
631
632        Ok(rx.await?)
633    }
634
635    /// Get the list of connected peers.
636    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
637        let (tx, rx) = oneshot::channel();
638
639        self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
640            .await?;
641
642        Ok(rx.await?)
643    }
644
645    /// Alter the trust status for a given peer.
646    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
647        self.send_command(P2pCmd::SetPeerTrust {
648            peer_id,
649            is_trusted,
650        })
651        .await
652    }
653
654    /// Get the cancellation token which will be cancelled when the network gets compromised.
655    ///
656    /// After this token is cancelled, the network should be treated as insincere
657    /// and should not be trusted.
658    pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
659        let (tx, rx) = oneshot::channel();
660
661        self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
662            .await?;
663
664        Ok(rx.await?)
665    }
666
667    /// Get the latest header announced on the network.
668    pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
669        let (tx, rx) = oneshot::channel();
670
671        self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
672            .await?;
673
674        Ok(rx.await?)
675    }
676}
677
678impl Drop for P2p {
679    fn drop(&mut self) {
680        self.stop();
681    }
682}
683
684#[derive(NetworkBehaviour)]
685struct Behaviour<B, S>
686where
687    B: Blockstore + 'static,
688    S: Store + 'static,
689{
690    bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
691    header_ex: HeaderExBehaviour<S>,
692    gossipsub: gossipsub::Behaviour,
693}
694
695struct Worker<B, S>
696where
697    B: Blockstore + 'static,
698    S: Store + 'static,
699{
700    cancellation_token: CancellationToken,
701    swarm: SwarmManager<Behaviour<B, S>>,
702    header_sub_topic_hash: TopicHash,
703    bad_encoding_fraud_sub_topic: TopicHash,
704    cmd_rx: mpsc::Receiver<P2pCmd>,
705    header_sub_state: Option<HeaderSubState>,
706    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
707    network_compromised_token: Token,
708    store: Arc<S>,
709}
710
711struct HeaderSubState {
712    known_head: ExtendedHeader,
713    channel: mpsc::Sender<ExtendedHeader>,
714}
715
716impl<B, S> Worker<B, S>
717where
718    B: Blockstore,
719    S: Store,
720{
721    async fn new(
722        args: P2pArgs<B, S>,
723        cancellation_token: CancellationToken,
724        cmd_rx: mpsc::Receiver<P2pCmd>,
725        peer_tracker: PeerTracker,
726    ) -> Result<Self, P2pError> {
727        let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
728        let bad_encoding_fraud_sub_topic =
729            fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
730        let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
731
732        let bitswap = init_bitswap(
733            args.blockstore.clone(),
734            args.store.clone(),
735            &args.network_id,
736        )?;
737
738        let header_ex = HeaderExBehaviour::new(HeaderExConfig {
739            network_id: &args.network_id,
740            header_store: args.store.clone(),
741        });
742
743        let behaviour = Behaviour {
744            bitswap,
745            gossipsub,
746            header_ex,
747        };
748
749        let swarm = SwarmManager::new(
750            &args.network_id,
751            &args.local_keypair,
752            &args.bootnodes,
753            &args.listen_on,
754            peer_tracker,
755            args.event_pub,
756            behaviour,
757        )
758        .await?;
759
760        Ok(Worker {
761            cancellation_token,
762            swarm,
763            cmd_rx,
764            bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
765            header_sub_topic_hash: header_sub_topic.hash(),
766            header_sub_state: None,
767            bitswap_queries: HashMap::new(),
768            network_compromised_token: Token::new(),
769            store: args.store,
770        })
771    }
772
773    async fn run(&mut self) {
774        let mut report_interval = Interval::new(Duration::from_secs(60));
775
776        loop {
777            select! {
778                _ = self.cancellation_token.cancelled() => break,
779                _ = report_interval.tick() => {
780                    self.report();
781                }
782                _ = poll_closed(&mut self.bitswap_queries) => {
783                    self.prune_canceled_bitswap_queries();
784                }
785                res = self.swarm.poll() => {
786                    match res {
787                        Ok(ev) => {
788                            if let Err(e) = self.on_behaviour_event(ev).await {
789                                warn!("Failure while handling behaviour event: {e}");
790                            }
791                        }
792                        Err(e) => warn!("Failure while polling SwarmManager: {e}"),
793                    }
794                }
795                Some(cmd) = self.cmd_rx.recv() => {
796                    if let Err(e) = self.on_cmd(cmd).await {
797                        warn!("Failure while handling command. (error: {e})");
798                    }
799                }
800            }
801        }
802
803        self.swarm.context().behaviour.header_ex.stop();
804        self.swarm.stop().await;
805    }
806
807    fn prune_canceled_bitswap_queries(&mut self) {
808        let mut cancelled = SmallVec::<[_; 16]>::new();
809
810        for (query_id, chan) in &self.bitswap_queries {
811            if chan.is_closed() {
812                cancelled.push(*query_id);
813            }
814        }
815
816        for query_id in cancelled {
817            self.bitswap_queries.remove(&query_id);
818            self.swarm.context().behaviour.bitswap.cancel(query_id);
819        }
820    }
821
822    async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
823        match ev {
824            BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
825            BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
826            BehaviourEvent::HeaderEx(_) => {}
827        }
828
829        Ok(())
830    }
831
832    async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
833        match cmd {
834            P2pCmd::NetworkInfo { respond_to } => {
835                respond_to.maybe_send(self.swarm.network_info());
836            }
837            P2pCmd::HeaderExRequest {
838                request,
839                respond_to,
840            } => {
841                let ctx = self.swarm.context();
842                ctx.behaviour
843                    .header_ex
844                    .send_request(request, respond_to, ctx.peer_tracker);
845            }
846            P2pCmd::Listeners { respond_to } => {
847                respond_to.maybe_send(self.swarm.listeners());
848            }
849            P2pCmd::ConnectedPeers { respond_to } => {
850                let peers = self
851                    .swarm
852                    .context()
853                    .peer_tracker
854                    .peers()
855                    .filter_map(|peer| {
856                        if peer.is_connected() {
857                            Some(*peer.id())
858                        } else {
859                            None
860                        }
861                    })
862                    .collect();
863                respond_to.maybe_send(peers);
864            }
865            P2pCmd::InitHeaderSub { head, channel } => {
866                self.on_init_header_sub(*head, channel);
867            }
868            P2pCmd::SetPeerTrust {
869                peer_id,
870                is_trusted,
871            } => {
872                self.swarm.set_peer_trust(&peer_id, is_trusted);
873            }
874            P2pCmd::GetShwapCid { cid, respond_to } => {
875                self.on_get_shwap_cid(cid, respond_to);
876            }
877            P2pCmd::GetNetworkCompromisedToken { respond_to } => {
878                respond_to.maybe_send(self.network_compromised_token.clone())
879            }
880            P2pCmd::GetNetworkHead { respond_to } => {
881                let head = self
882                    .header_sub_state
883                    .as_ref()
884                    .map(|state| state.known_head.clone());
885                respond_to.maybe_send(head);
886            }
887        }
888
889        Ok(())
890    }
891
892    #[instrument(skip_all)]
893    fn report(&mut self) {
894        let tracker_info = self.swarm.context().peer_tracker.info();
895
896        info!(
897            "peers: {}, trusted peers: {}",
898            tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
899        );
900    }
901
902    #[instrument(level = "trace", skip(self))]
903    async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
904        match ev {
905            gossipsub::Event::Message {
906                message,
907                message_id,
908                ..
909            } => {
910                let Some(peer) = message.source else {
911                    // Validation mode is `strict` so this will never happen
912                    return;
913                };
914
915                let acceptance = if message.topic == self.header_sub_topic_hash {
916                    self.on_header_sub_message(&message.data[..])
917                } else if message.topic == self.bad_encoding_fraud_sub_topic {
918                    self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
919                        .await
920                } else {
921                    trace!("Unhandled gossipsub message");
922                    gossipsub::MessageAcceptance::Ignore
923                };
924
925                if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
926                    // We may have discovered a new peer
927                    self.swarm.peer_maybe_discovered(&peer);
928                }
929
930                let _ = self
931                    .swarm
932                    .context()
933                    .behaviour
934                    .gossipsub
935                    .report_message_validation_result(&message_id, &peer, acceptance);
936            }
937            _ => trace!("Unhandled gossipsub event"),
938        }
939    }
940
941    #[instrument(level = "trace", skip_all)]
942    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
943        trace!("Requesting CID {cid} from bitswap");
944        let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
945        self.bitswap_queries.insert(query_id, respond_to);
946    }
947
948    #[instrument(level = "trace", skip(self))]
949    async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
950        match ev {
951            beetswap::Event::GetQueryResponse { query_id, data } => {
952                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
953                    respond_to.maybe_send_ok(data);
954                }
955            }
956            beetswap::Event::GetQueryError { query_id, error } => {
957                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
958                    let error: P2pError = error.into();
959                    respond_to.maybe_send_err(error);
960                }
961            }
962        }
963    }
964
965    #[instrument(skip_all, fields(header = %head))]
966    fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
967        self.header_sub_state = Some(HeaderSubState {
968            known_head: head,
969            channel,
970        });
971        trace!("HeaderSub initialized");
972    }
973
974    #[instrument(skip_all)]
975    fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
976        let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
977            trace!("Malformed or invalid header from header-sub");
978            return gossipsub::MessageAcceptance::Reject;
979        };
980
981        trace!("Received header from header-sub ({header})");
982
983        let Some(ref mut state) = self.header_sub_state else {
984            debug!("header-sub not initialized yet");
985            return gossipsub::MessageAcceptance::Ignore;
986        };
987
988        if state.known_head.verify(&header).is_err() {
989            trace!("Failed to verify HeaderSub header. Ignoring {header}");
990            return gossipsub::MessageAcceptance::Ignore;
991        }
992
993        trace!("New header from header-sub ({header})");
994
995        state.known_head = header.clone();
996        // We intentionally do not `send().await` to avoid blocking `P2p`
997        // in case `Syncer` enters some weird state.
998        let _ = state.channel.try_send(header);
999
1000        gossipsub::MessageAcceptance::Accept
1001    }
1002
1003    #[instrument(skip_all)]
1004    async fn on_bad_encoding_fraud_sub_message(
1005        &mut self,
1006        data: &[u8],
1007        peer: &PeerId,
1008    ) -> gossipsub::MessageAcceptance {
1009        let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1010            trace!("Malformed bad encoding fraud proof from {peer}");
1011            self.swarm
1012                .context()
1013                .behaviour
1014                .gossipsub
1015                .blacklist_peer(peer);
1016            return gossipsub::MessageAcceptance::Reject;
1017        };
1018
1019        let height = befp.height().value();
1020
1021        let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1022            header_sub_state.known_head.height().value()
1023        } else if let Ok(local_head) = self.store.get_head().await {
1024            local_head.height().value()
1025        } else {
1026            // we aren't tracking the network and have uninitialized store
1027            return gossipsub::MessageAcceptance::Ignore;
1028        };
1029
1030        if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1031            // does this threshold make any sense if we're gonna ignore it anyway
1032            // since we won't have the header
1033            return gossipsub::MessageAcceptance::Ignore;
1034        }
1035
1036        let hash = befp.header_hash();
1037        let Ok(header) = self.store.get_by_hash(&hash).await else {
1038            // we can't verify the proof without a header
1039            // TODO: should we then store it and wait for the height? celestia doesn't
1040            return gossipsub::MessageAcceptance::Ignore;
1041        };
1042
1043        if let Err(e) = befp.validate(&header) {
1044            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1045            self.swarm
1046                .context()
1047                .behaviour
1048                .gossipsub
1049                .blacklist_peer(peer);
1050            return gossipsub::MessageAcceptance::Reject;
1051        }
1052
1053        warn!("Received a valid bad encoding fraud proof");
1054        // trigger cancellation for all services
1055        self.network_compromised_token.trigger();
1056
1057        gossipsub::MessageAcceptance::Accept
1058    }
1059}
1060
1061/// Awaits at least one channel from the `bitswap_queries` to close.
1062async fn poll_closed(
1063    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1064) {
1065    poll_fn(|cx| {
1066        for chan in bitswap_queries.values_mut() {
1067            match chan.poll_closed(cx) {
1068                Poll::Pending => continue,
1069                Poll::Ready(_) => return Poll::Ready(()),
1070            }
1071        }
1072
1073        Poll::Pending
1074    })
1075    .await
1076}
1077
1078fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1079    let mut invalid_addrs = Vec::new();
1080
1081    for addr in addrs {
1082        if addr.peer_id().is_none() {
1083            invalid_addrs.push(addr.to_owned());
1084        }
1085    }
1086
1087    if invalid_addrs.is_empty() {
1088        Ok(())
1089    } else {
1090        Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1091    }
1092}
1093
1094fn init_gossipsub<'a, B, S>(
1095    args: &'a P2pArgs<B, S>,
1096    topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1097) -> Result<gossipsub::Behaviour>
1098where
1099    B: Blockstore,
1100    S: Store,
1101{
1102    // Set the message authenticity - How we expect to publish messages
1103    // Here we expect the publisher to sign the message with their key.
1104    let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1105
1106    let config = gossipsub::ConfigBuilder::default()
1107        .validation_mode(gossipsub::ValidationMode::Strict)
1108        .validate_messages()
1109        .build()
1110        .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1111
1112    // build a gossipsub network behaviour
1113    let mut gossipsub: gossipsub::Behaviour =
1114        gossipsub::Behaviour::new(message_authenticity, config)
1115            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1116
1117    for topic in topics {
1118        gossipsub
1119            .subscribe(topic)
1120            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1121    }
1122
1123    Ok(gossipsub)
1124}
1125
1126fn init_bitswap<B, S>(
1127    blockstore: Arc<B>,
1128    store: Arc<S>,
1129    network_id: &str,
1130) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1131where
1132    B: Blockstore + 'static,
1133    S: Store + 'static,
1134{
1135    let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1136
1137    Ok(beetswap::Behaviour::builder(blockstore)
1138        .protocol_prefix(protocol_prefix.as_ref())?
1139        .register_multihasher(ShwapMultihasher::new(store))
1140        .client_set_send_dont_have(false)
1141        .build())
1142}