lumina_node/
p2p.rs

//! Component responsible for messaging and interacting with other nodes on the peer-to-peer layer.
//!
//! It is a high-level integration of various p2p protocols used by Celestia nodes.
//! Currently supported:
//! - libp2p-identify
//! - libp2p-kad
//! - libp2p-autonat
//! - libp2p-ping
//! - header-sub topic on libp2p-gossipsub
//! - fraud-sub topic on libp2p-gossipsub
//! - header-ex client
//! - header-ex server
//! - bitswap 1.2.0
//! - shwap - Celestia's data availability protocol on top of bitswap
15
16use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use celestia_proto::p2p::pb::{header_request, HeaderRequest};
24use celestia_types::fraud_proof::BadEncodingFraudProof;
25use celestia_types::hash::Hash;
26use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
27use celestia_types::row::{Row, RowId};
28use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
29use celestia_types::sample::{Sample, SampleId};
30use celestia_types::{Blob, ExtendedHeader, FraudProof};
31use cid::Cid;
32use futures::stream::FuturesOrdered;
33use futures::{StreamExt, TryStreamExt};
34use libp2p::core::transport::ListenerId;
35use libp2p::{
36    autonat,
37    core::{ConnectedPoint, Endpoint},
38    gossipsub::{self, TopicHash},
39    identify,
40    identity::Keypair,
41    kad,
42    multiaddr::Protocol,
43    ping,
44    swarm::{
45        dial_opts::{DialOpts, PeerCondition},
46        ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent,
47    },
48    Multiaddr, PeerId,
49};
50use lumina_utils::executor::{spawn, JoinHandle};
51use lumina_utils::time::{self, Interval};
52use lumina_utils::token::Token;
53use smallvec::SmallVec;
54use tendermint_proto::Protobuf;
55use tokio::select;
56use tokio::sync::{mpsc, oneshot, watch};
57use tokio_util::sync::CancellationToken;
58use tracing::{debug, error, info, instrument, trace, warn};
59
60mod connection_control;
61mod header_ex;
62pub(crate) mod header_session;
63pub(crate) mod shwap;
64mod swarm;
65
66use crate::block_ranges::BlockRange;
67use crate::events::{EventPublisher, NodeEvent};
68use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
69use crate::p2p::header_session::HeaderSession;
70use crate::p2p::shwap::{convert_cid, get_block_container, ShwapMultihasher};
71use crate::p2p::swarm::new_swarm;
72use crate::peer_tracker::PeerTracker;
73use crate::peer_tracker::PeerTrackerInfo;
74use crate::store::Store;
75use crate::utils::{
76    celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
77    OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
78};
79
80pub use crate::p2p::header_ex::HeaderExError;
81
/// Minimal number of peers that we want to maintain connections to.
///
/// If we have fewer peers than that, we will try to reconnect / discover
/// more aggressively.
const MIN_CONNECTED_PEERS: u64 = 4;

/// Maximum size of a `Multihash`.
///
/// Passed to `beetswap::Behaviour` as its multihash size parameter.
pub(crate) const MAX_MH_SIZE: usize = 64;

/// Fraud proofs whose height exceeds the current head height by this
/// threshold will be ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
93
/// Result type used throughout the p2p component; the error type defaults to
/// [`P2pError`].
pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur in the `P2p` component.
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
    /// Failed to initialize gossipsub behaviour.
    #[error("Failed to initialize gossipsub behaviour: {0}")]
    GossipsubInit(String),

    /// Failed to initialize TLS.
    #[error("Failed to initialize TLS: {0}")]
    TlsInit(String),

    /// Failed to initialize noise protocol.
    #[error("Failed to initialize noise: {0}")]
    NoiseInit(String),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    ///
    /// This is also the conversion target of a oneshot `RecvError`.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,

    /// Not connected to any peers.
    #[error("Not connected to any peers")]
    NoConnectedPeers,

    /// An error propagated from the `header-ex`.
    #[error("HeaderEx: {0}")]
    HeaderEx(#[from] HeaderExError),

    /// Bootnode address is missing its peer ID.
    ///
    /// Carries the rejected addresses.
    #[error("Bootnode multiaddrs without peer ID: {0:?}")]
    BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),

    /// An error propagated from [`beetswap::Behaviour`].
    #[error("Bitswap: {0}")]
    Bitswap(#[from] beetswap::Error),

    /// ProtoBuf message failed to be decoded.
    ///
    /// A raw `prost::DecodeError` is converted into this variant as well.
    #[error("ProtoBuf decoding error: {0}")]
    ProtoDecodeFailed(#[from] tendermint_proto::Error),

    /// An error propagated from [`celestia_types`] that is related to [`Cid`].
    ///
    /// A `cid::Error` is converted into this variant. It has no `#[from]`, so a
    /// bare [`celestia_types::Error`] converts into `CelestiaTypes` instead.
    #[error("CID error: {0}")]
    Cid(celestia_types::Error),

    /// Bitswap query timed out.
    ///
    /// Returned when the timeout given to a shwap request elapses.
    #[error("Bitswap query timed out")]
    BitswapQueryTimeout,

    /// Shwap protocol error.
    #[error("Shwap: {0}")]
    Shwap(String),

    /// An error propagated from [`celestia_types`].
    #[error(transparent)]
    CelestiaTypes(#[from] celestia_types::Error),
}
155
156impl P2pError {
157    /// Returns `true` if an error is fatal in all possible scenarios.
158    ///
159    /// If unsure mark it as non-fatal error.
160    pub(crate) fn is_fatal(&self) -> bool {
161        match self {
162            P2pError::GossipsubInit(_)
163            | P2pError::NoiseInit(_)
164            | P2pError::TlsInit(_)
165            | P2pError::WorkerDied
166            | P2pError::ChannelClosedUnexpectedly
167            | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
168            P2pError::NoConnectedPeers
169            | P2pError::HeaderEx(_)
170            | P2pError::Bitswap(_)
171            | P2pError::ProtoDecodeFailed(_)
172            | P2pError::Cid(_)
173            | P2pError::BitswapQueryTimeout
174            | P2pError::Shwap(_)
175            | P2pError::CelestiaTypes(_) => false,
176        }
177    }
178}
179
180impl From<oneshot::error::RecvError> for P2pError {
181    fn from(_value: oneshot::error::RecvError) -> Self {
182        P2pError::ChannelClosedUnexpectedly
183    }
184}
185
186impl From<prost::DecodeError> for P2pError {
187    fn from(value: prost::DecodeError) -> Self {
188        P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
189    }
190}
191
192impl From<cid::Error> for P2pError {
193    fn from(value: cid::Error) -> Self {
194        P2pError::Cid(celestia_types::Error::CidError(
195            blockstore::block::CidError::InvalidCid(value.to_string()),
196        ))
197    }
198}
199
/// Component responsible for the peer to peer networking handling.
///
/// This is a handle to a background worker: commands are forwarded to it over
/// `cmd_tx`, and dropping the handle cancels the worker.
#[derive(Debug)]
pub(crate) struct P2p {
    /// Cancelled by `stop` to signal the worker to shut down.
    cancellation_token: CancellationToken,
    /// Sending half of the command channel consumed by the worker.
    cmd_tx: mpsc::Sender<P2pCmd>,
    /// Handle of the spawned worker task, awaited by `join`.
    join_handle: JoinHandle,
    /// Receives updates of the peer tracker state.
    peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
    /// Local peer ID on the p2p network.
    local_peer_id: PeerId,
}
209
210/// Arguments used to configure the [`P2p`].
211pub struct P2pArgs<B, S>
212where
213    B: Blockstore,
214    S: Store,
215{
216    /// An id of the network to connect to.
217    pub network_id: String,
218    /// The keypair to be used as the identity.
219    pub local_keypair: Keypair,
220    /// List of bootstrap nodes to connect to and trust.
221    pub bootnodes: Vec<Multiaddr>,
222    /// List of the addresses on which to listen for incoming connections.
223    pub listen_on: Vec<Multiaddr>,
224    /// The store for headers.
225    pub blockstore: Arc<B>,
226    /// The store for headers.
227    pub store: Arc<S>,
228    /// Event publisher.
229    pub event_pub: EventPublisher,
230}
231
/// Commands processed by the p2p worker.
#[derive(Debug)]
pub(crate) enum P2pCmd {
    /// Query the current [`NetworkInfo`].
    NetworkInfo {
        respond_to: oneshot::Sender<NetworkInfo>,
    },
    /// Send a request on the `header-ex` protocol; its outcome is delivered to `respond_to`.
    HeaderExRequest {
        request: HeaderRequest,
        respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
    },
    /// Get the addresses the node listens on for incoming connections.
    Listeners {
        respond_to: oneshot::Sender<Vec<Multiaddr>>,
    },
    /// Get the list of connected peers.
    ConnectedPeers {
        respond_to: oneshot::Sender<Vec<PeerId>>,
    },
    /// Initialize the `header-sub` protocol with the given head.
    InitHeaderSub {
        head: Box<ExtendedHeader>,
        /// Any valid headers received by header-sub will be sent to this channel.
        channel: mpsc::Sender<ExtendedHeader>,
    },
    /// Alter the trust status of a peer.
    SetPeerTrust {
        peer_id: PeerId,
        is_trusted: bool,
    },
    /// Request a shwap [`Cid`] on the bitswap protocol.
    GetShwapCid {
        cid: Cid,
        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
    },
    /// Get the token which is cancelled when the network gets compromised.
    GetNetworkCompromisedToken {
        respond_to: oneshot::Sender<Token>,
    },
    /// Get the latest header announced on the network, if any.
    GetNetworkHead {
        respond_to: oneshot::Sender<Option<ExtendedHeader>>,
    },
}
267
268impl P2p {
269    /// Creates and starts a new p2p handler.
270    pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
271    where
272        B: Blockstore + 'static,
273        S: Store + 'static,
274    {
275        validate_bootnode_addrs(&args.bootnodes)?;
276
277        let local_peer_id = PeerId::from(args.local_keypair.public());
278
279        let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
280        let peer_tracker_info_watcher = peer_tracker.info_watcher();
281
282        let cancellation_token = CancellationToken::new();
283        let (cmd_tx, cmd_rx) = mpsc::channel(16);
284
285        let mut worker =
286            Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
287
288        let join_handle = spawn(async move {
289            worker.run().await;
290        });
291
292        Ok(P2p {
293            cancellation_token,
294            cmd_tx,
295            join_handle,
296            peer_tracker_info_watcher,
297            local_peer_id,
298        })
299    }
300
301    /// Creates and starts a new mocked p2p handler.
302    #[cfg(test)]
303    pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
304        let (cmd_tx, cmd_rx) = mpsc::channel(16);
305        let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
306        let cancellation_token = CancellationToken::new();
307
308        // Just a fake join_handle
309        let join_handle = spawn(async {});
310
311        let p2p = P2p {
312            cmd_tx: cmd_tx.clone(),
313            cancellation_token,
314            join_handle,
315            peer_tracker_info_watcher: peer_tracker_rx,
316            local_peer_id: PeerId::random(),
317        };
318
319        let handle = crate::test_utils::MockP2pHandle {
320            cmd_tx,
321            cmd_rx,
322            header_sub_tx: None,
323            peer_tracker_tx,
324        };
325
326        (p2p, handle)
327    }
328
329    /// Stop the worker.
330    pub fn stop(&self) {
331        // Singal the Worker to stop.
332        self.cancellation_token.cancel();
333    }
334
335    /// Wait until worker is completely stopped.
336    pub async fn join(&self) {
337        self.join_handle.join().await;
338    }
339
340    /// Local peer ID on the p2p network.
341    pub fn local_peer_id(&self) -> &PeerId {
342        &self.local_peer_id
343    }
344
345    async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
346        self.cmd_tx
347            .send(cmd)
348            .await
349            .map_err(|_| P2pError::WorkerDied)
350    }
351
352    /// Watcher for the current [`PeerTrackerInfo`].
353    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
354        self.peer_tracker_info_watcher.clone()
355    }
356
357    /// A reference to the current [`PeerTrackerInfo`].
358    pub fn peer_tracker_info(&self) -> watch::Ref<PeerTrackerInfo> {
359        self.peer_tracker_info_watcher.borrow()
360    }
361
362    /// Initializes `header-sub` protocol with a given `subjective_head`.
363    pub async fn init_header_sub(
364        &self,
365        head: ExtendedHeader,
366        channel: mpsc::Sender<ExtendedHeader>,
367    ) -> Result<()> {
368        self.send_command(P2pCmd::InitHeaderSub {
369            head: Box::new(head),
370            channel,
371        })
372        .await
373    }
374
375    /// Wait until the node is connected to any peer.
376    pub async fn wait_connected(&self) -> Result<()> {
377        self.peer_tracker_info_watcher()
378            .wait_for(|info| info.num_connected_peers > 0)
379            .await
380            .map(drop)
381            .map_err(|_| P2pError::WorkerDied)
382    }
383
384    /// Wait until the node is connected to any trusted peer.
385    pub async fn wait_connected_trusted(&self) -> Result<()> {
386        self.peer_tracker_info_watcher()
387            .wait_for(|info| info.num_connected_trusted_peers > 0)
388            .await
389            .map(drop)
390            .map_err(|_| P2pError::WorkerDied)
391    }
392
393    /// Get current [`NetworkInfo`].
394    pub async fn network_info(&self) -> Result<NetworkInfo> {
395        let (tx, rx) = oneshot::channel();
396
397        self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
398            .await?;
399
400        Ok(rx.await?)
401    }
402
403    /// Send a request on the `header-ex` protocol.
404    pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
405        let (tx, rx) = oneshot::channel();
406
407        self.send_command(P2pCmd::HeaderExRequest {
408            request,
409            respond_to: tx,
410        })
411        .await?;
412
413        rx.await?
414    }
415
416    /// Request the head header on the `header-ex` protocol.
417    pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
418        self.get_header_by_height(0).await
419    }
420
421    /// Request the header by hash on the `header-ex` protocol.
422    pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
423        self.header_ex_request(HeaderRequest {
424            data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
425            amount: 1,
426        })
427        .await?
428        .into_iter()
429        .next()
430        .ok_or(HeaderExError::HeaderNotFound.into())
431    }
432
433    /// Request the header by height on the `header-ex` protocol.
434    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
435        self.header_ex_request(HeaderRequest {
436            data: Some(header_request::Data::Origin(height)),
437            amount: 1,
438        })
439        .await?
440        .into_iter()
441        .next()
442        .ok_or(HeaderExError::HeaderNotFound.into())
443    }
444
445    /// Request the headers following the one given with the `header-ex` protocol.
446    ///
447    /// First header from the requested range will be verified against the provided one,
448    /// then each subsequent is verified against the previous one.
449    pub async fn get_verified_headers_range(
450        &self,
451        from: &ExtendedHeader,
452        amount: u64,
453    ) -> Result<Vec<ExtendedHeader>> {
454        // User can give us a bad header, so validate it.
455        from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
456
457        let height = from.height().value() + 1;
458
459        let range = height..=height + amount - 1;
460
461        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
462        let headers = session.run().await?;
463
464        // `.validate()` is called on each header separately by `HeaderExClientHandler`.
465        //
466        // The last step is to verify that all headers are from the same chain
467        // and indeed connected with the next one.
468        from.verify_adjacent_range(&headers)
469            .map_err(|_| HeaderExError::InvalidResponse)?;
470
471        Ok(headers)
472    }
473
474    /// Request a list of ranges with the `header-ex` protocol
475    ///
476    /// For each of the ranges, headers are verified against each other, but it's the caller
477    /// responsibility to verify range edges against headers existing in the store.
478    pub(crate) async fn get_unverified_header_range(
479        &self,
480        range: BlockRange,
481    ) -> Result<Vec<ExtendedHeader>> {
482        if range.is_empty() {
483            return Err(HeaderExError::InvalidRequest.into());
484        }
485
486        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
487        let headers = session.run().await?;
488
489        let Some(head) = headers.first() else {
490            return Err(HeaderExError::InvalidResponse.into());
491        };
492
493        head.verify_adjacent_range(&headers[1..])
494            .map_err(|_| HeaderExError::InvalidResponse)?;
495
496        Ok(headers)
497    }
498
499    /// Request a [`Cid`] on bitswap protocol.
500    pub(crate) async fn get_shwap_cid(
501        &self,
502        cid: Cid,
503        timeout: Option<Duration>,
504    ) -> Result<Vec<u8>> {
505        let (tx, rx) = oneshot::channel();
506
507        self.send_command(P2pCmd::GetShwapCid {
508            cid,
509            respond_to: tx,
510        })
511        .await?;
512
513        let data = match timeout {
514            Some(dur) => time::timeout(dur, rx)
515                .await
516                .map_err(|_| P2pError::BitswapQueryTimeout)???,
517            None => rx.await??,
518        };
519
520        get_block_container(&cid, &data)
521    }
522
523    /// Request a [`Row`] on bitswap protocol.
524    pub async fn get_row(
525        &self,
526        row_index: u16,
527        block_height: u64,
528        timeout: Option<Duration>,
529    ) -> Result<Row> {
530        let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
531        let cid = convert_cid(&id.into())?;
532
533        let data = self.get_shwap_cid(cid, timeout).await?;
534        let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
535        Ok(row)
536    }
537
538    /// Request a [`Sample`] on bitswap protocol.
539    pub async fn get_sample(
540        &self,
541        row_index: u16,
542        column_index: u16,
543        block_height: u64,
544        timeout: Option<Duration>,
545    ) -> Result<Sample> {
546        let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
547        let cid = convert_cid(&id.into())?;
548
549        let data = self.get_shwap_cid(cid, timeout).await?;
550        let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
551        Ok(sample)
552    }
553
554    /// Request a [`RowNamespaceData`] on bitswap protocol.
555    pub async fn get_row_namespace_data(
556        &self,
557        namespace: Namespace,
558        row_index: u16,
559        block_height: u64,
560        timeout: Option<Duration>,
561    ) -> Result<RowNamespaceData> {
562        let id =
563            RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
564        let cid = convert_cid(&id.into())?;
565
566        let data = self.get_shwap_cid(cid, timeout).await?;
567        let row_namespace_data =
568            RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
569        Ok(row_namespace_data)
570    }
571
572    /// Request all blobs with provided namespace in the block corresponding to this header
573    /// using bitswap protocol.
574    pub async fn get_all_blobs(
575        &self,
576        header: &ExtendedHeader,
577        namespace: Namespace,
578        timeout: Option<Duration>,
579    ) -> Result<Vec<Blob>> {
580        let height = header.height().value();
581        let app_version = header.app_version()?;
582        let rows_to_fetch: Vec<_> = header
583            .dah
584            .row_roots()
585            .iter()
586            .enumerate()
587            .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
588            .map(|(n, _)| n as u16)
589            .collect();
590
591        let futs = rows_to_fetch
592            .into_iter()
593            .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, height, timeout))
594            .collect::<FuturesOrdered<_>>();
595        let rows: Vec<_> = futs.try_collect().await?;
596        let shares = rows.iter().flat_map(|row| row.shares.iter());
597
598        Ok(Blob::reconstruct_all(shares, app_version)?)
599    }
600
601    /// Get the addresses where [`P2p`] listens on for incoming connections.
602    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
603        let (tx, rx) = oneshot::channel();
604
605        self.send_command(P2pCmd::Listeners { respond_to: tx })
606            .await?;
607
608        Ok(rx.await?)
609    }
610
611    /// Get the list of connected peers.
612    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
613        let (tx, rx) = oneshot::channel();
614
615        self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
616            .await?;
617
618        Ok(rx.await?)
619    }
620
621    /// Alter the trust status for a given peer.
622    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
623        self.send_command(P2pCmd::SetPeerTrust {
624            peer_id,
625            is_trusted,
626        })
627        .await
628    }
629
630    /// Get the cancellation token which will be cancelled when the network gets compromised.
631    ///
632    /// After this token is cancelled, the network should be treated as insincere
633    /// and should not be trusted.
634    pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
635        let (tx, rx) = oneshot::channel();
636
637        self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
638            .await?;
639
640        Ok(rx.await?)
641    }
642
643    /// Get the latest header announced on the network.
644    pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
645        let (tx, rx) = oneshot::channel();
646
647        self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
648            .await?;
649
650        Ok(rx.await?)
651    }
652}
653
654impl Drop for P2p {
655    fn drop(&mut self) {
656        self.stop();
657    }
658}
659
/// Our network behaviour.
///
/// Combines every protocol used by the node into a single libp2p
/// [`NetworkBehaviour`]. The derive also generates the `BehaviourEvent` enum,
/// with one variant per field, which is handled in `Worker::on_swarm_event`.
///
/// NOTE(review): the derive presumably delegates to the fields in declaration
/// order. Keep the field order stable unless that is confirmed irrelevant.
#[derive(NetworkBehaviour)]
struct Behaviour<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// Local connection control; switched into stopping mode on shutdown.
    connection_control: connection_control::Behaviour,
    /// libp2p AutoNAT.
    autonat: autonat::Behaviour,
    /// Bitswap, used to serve shwap requests.
    bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
    /// libp2p ping.
    ping: ping::Behaviour,
    /// libp2p identify.
    identify: identify::Behaviour,
    /// `header-ex` client and server.
    header_ex: HeaderExBehaviour<S>,
    /// Gossipsub carrying the header-sub and fraud-sub topics.
    gossipsub: gossipsub::Behaviour,
    /// Kademlia DHT used for peer discovery.
    kademlia: kad::Behaviour<kad::store::MemoryStore>,
}
676
/// Background task that owns the libp2p [`Swarm`] and processes [`P2pCmd`]s.
struct Worker<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// When cancelled, the run loop exits and shutdown begins.
    cancellation_token: CancellationToken,
    /// The swarm driving all protocols of [`Behaviour`].
    swarm: Swarm<Behaviour<B, S>>,
    /// Listeners registered in `new`; removed again in `on_stop`.
    listeners: SmallVec<[ListenerId; 1]>,
    /// Hash of the `header-sub` gossipsub topic.
    header_sub_topic_hash: TopicHash,
    /// Hash of the bad encoding fraud proof gossipsub topic.
    bad_encoding_fraud_sub_topic: TopicHash,
    /// Receiving half of the command channel.
    cmd_rx: mpsc::Receiver<P2pCmd>,
    /// Peer tracker shared with `header-ex` and the [`P2p`] handle.
    peer_tracker: Arc<PeerTracker>,
    /// `header-sub` state; `None` until `header-sub` is initialized.
    header_sub_state: Option<HeaderSubState>,
    /// In-flight bitswap queries and the channels awaiting their results.
    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
    /// Token to be cancelled when the network gets compromised.
    network_compromised_token: Token,
    /// The header store.
    store: Arc<S>,
    /// Publisher of node events.
    event_pub: EventPublisher,
    /// Bootstrap peers mapped to their sorted, deduplicated addresses.
    bootnodes: HashMap<PeerId, Vec<Multiaddr>>,
}
696
/// State of an initialized `header-sub`.
struct HeaderSubState {
    /// Head known to `header-sub`. It presumably starts as the head passed to
    /// `P2pCmd::InitHeaderSub` and advances with newer headers (confirm in `on_cmd`).
    known_head: ExtendedHeader,
    /// Channel to which valid headers received by header-sub are forwarded.
    channel: mpsc::Sender<ExtendedHeader>,
}
701
702impl<B, S> Worker<B, S>
703where
704    B: Blockstore,
705    S: Store,
706{
707    async fn new(
708        args: P2pArgs<B, S>,
709        cancellation_token: CancellationToken,
710        cmd_rx: mpsc::Receiver<P2pCmd>,
711        peer_tracker: Arc<PeerTracker>,
712    ) -> Result<Self, P2pError> {
713        let local_peer_id = PeerId::from(args.local_keypair.public());
714
715        let connection_control = connection_control::Behaviour::new();
716        let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default());
717        let ping = ping::Behaviour::new(ping::Config::default());
718
719        let agent_version = format!("lumina/{}/{}", args.network_id, env!("CARGO_PKG_VERSION"));
720        let identify = identify::Behaviour::new(
721            identify::Config::new(String::new(), args.local_keypair.public())
722                .with_agent_version(agent_version),
723        );
724
725        let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
726        let bad_encoding_fraud_sub_topic =
727            fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
728        let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
729
730        let kademlia = init_kademlia(&args)?;
731        let bitswap = init_bitswap(
732            args.blockstore.clone(),
733            args.store.clone(),
734            &args.network_id,
735        )?;
736
737        let header_ex = HeaderExBehaviour::new(HeaderExConfig {
738            network_id: &args.network_id,
739            peer_tracker: peer_tracker.clone(),
740            header_store: args.store.clone(),
741        });
742
743        let behaviour = Behaviour {
744            connection_control,
745            autonat,
746            bitswap,
747            ping,
748            identify,
749            gossipsub,
750            header_ex,
751            kademlia,
752        };
753
754        let mut swarm = new_swarm(args.local_keypair, behaviour).await?;
755        let mut listeners = SmallVec::new();
756
757        for addr in args.listen_on {
758            match swarm.listen_on(addr.clone()) {
759                Ok(id) => listeners.push(id),
760                Err(e) => error!("Failed to listen on {addr}: {e}"),
761            }
762        }
763
764        let mut bootnodes = HashMap::<_, Vec<_>>::new();
765
766        for addr in args.bootnodes {
767            let peer_id = addr.peer_id().expect("multiaddr already validated");
768            bootnodes.entry(peer_id).or_default().push(addr);
769        }
770
771        for (peer_id, addrs) in bootnodes.iter_mut() {
772            addrs.sort();
773            addrs.dedup();
774            addrs.shrink_to_fit();
775
776            // Bootstrap peers are always trusted
777            peer_tracker.set_trusted(*peer_id, true);
778        }
779
780        Ok(Worker {
781            cancellation_token,
782            cmd_rx,
783            swarm,
784            listeners,
785            bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
786            header_sub_topic_hash: header_sub_topic.hash(),
787            peer_tracker,
788            header_sub_state: None,
789            bitswap_queries: HashMap::new(),
790            network_compromised_token: Token::new(),
791            store: args.store,
792            event_pub: args.event_pub,
793            bootnodes,
794        })
795    }
796
797    async fn run(&mut self) {
798        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
799        let mut kademlia_interval = Interval::new(Duration::from_secs(30)).await;
800        let mut peer_tracker_info_watcher = self.peer_tracker.info_watcher();
801
802        // Initiate discovery
803        self.bootstrap();
804
805        loop {
806            select! {
807                _ = self.cancellation_token.cancelled() => break,
808                _ = peer_tracker_info_watcher.changed() => {
809                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
810                        warn!("All peers disconnected");
811                        self.bootstrap();
812                    }
813                }
814                _ = report_interval.tick() => {
815                    self.report();
816                }
817                _ = kademlia_interval.tick() => {
818                    if self.peer_tracker.info().num_connected_peers < MIN_CONNECTED_PEERS
819                    {
820                        self.bootstrap();
821                    }
822                }
823                _ = poll_closed(&mut self.bitswap_queries) => {
824                    self.prune_canceled_bitswap_queries();
825                }
826                ev = self.swarm.select_next_some() => {
827                    if let Err(e) = self.on_swarm_event(ev).await {
828                        warn!("Failure while handling swarm event: {e}");
829                    }
830                },
831                Some(cmd) = self.cmd_rx.recv() => {
832                    if let Err(e) = self.on_cmd(cmd).await {
833                        warn!("Failure while handling command. (error: {e})");
834                    }
835                }
836            }
837        }
838
839        self.on_stop().await;
840    }
841
842    fn bootstrap(&mut self) {
843        self.event_pub.send(NodeEvent::ConnectingToBootnodes);
844
845        for (peer_id, addrs) in &self.bootnodes {
846            let dial_opts = DialOpts::peer_id(*peer_id)
847                .addresses(addrs.clone())
848                // Tell Swarm not to dial if peer is already connected or there
849                // is an ongoing dialing.
850                .condition(PeerCondition::DisconnectedAndNotDialing)
851                .build();
852
853            if let Err(e) = self.swarm.dial(dial_opts) {
854                if !matches!(e, DialError::DialPeerConditionFalse(_)) {
855                    warn!("Failed to dial on {addrs:?}: {e}");
856                }
857            }
858        }
859
860        // trigger kademlia bootstrap
861        if self.swarm.behaviour_mut().kademlia.bootstrap().is_err() {
862            warn!("Can't run kademlia bootstrap, no known peers");
863        }
864    }
865
866    fn prune_canceled_bitswap_queries(&mut self) {
867        let mut cancelled = SmallVec::<[_; 16]>::new();
868
869        for (query_id, chan) in &self.bitswap_queries {
870            if chan.is_closed() {
871                cancelled.push(*query_id);
872            }
873        }
874
875        for query_id in cancelled {
876            self.bitswap_queries.remove(&query_id);
877            self.swarm.behaviour_mut().bitswap.cancel(query_id);
878        }
879    }
880
    /// Shuts down the networking side of the worker.
    ///
    /// Steps, in order:
    /// 1. flag `connection_control` as stopping and stop the header-ex behaviour;
    /// 2. remove every listener recorded in `self.listeners`;
    /// 3. ask the swarm to close every connection known to the peer tracker;
    /// 4. keep driving the swarm until it reports zero established connections.
    ///
    /// While draining, connections that get established late are closed immediately. Each
    /// closed connection goes through `on_peer_disconnected`, so the peer tracker stays
    /// consistent. All other swarm events are dropped.
    async fn on_stop(&mut self) {
        // NOTE(review): presumably makes `connection_control` refuse new connections during
        // shutdown — confirm against its implementation.
        self.swarm
            .behaviour_mut()
            .connection_control
            .set_stopping(true);
        self.swarm.behaviour_mut().header_ex.stop();

        for listener in self.listeners.drain(..) {
            self.swarm.remove_listener(listener);
        }

        // `close_connection` only requests the close; completion arrives later as
        // `SwarmEvent::ConnectionClosed`, which the loop below waits for.
        for (_, ids) in self.peer_tracker.connections() {
            for id in ids {
                self.swarm.close_connection(id);
            }
        }

        // Waiting until all established connections closed.
        while self
            .swarm
            .network_info()
            .connection_counters()
            .num_established()
            > 0
        {
            match self.swarm.select_next_some().await {
                // We may receive this if connection was established just before we trigger stop.
                SwarmEvent::ConnectionEstablished { connection_id, .. } => {
                    // We immediately close the connection in this case.
                    self.swarm.close_connection(connection_id);
                }
                SwarmEvent::ConnectionClosed {
                    peer_id,
                    connection_id,
                    ..
                } => {
                    // This will generate the PeerDisconnected events.
                    self.on_peer_disconnected(peer_id, connection_id);
                }
                _ => {}
            }
        }
    }
924
925    async fn on_swarm_event(&mut self, ev: SwarmEvent<BehaviourEvent<B, S>>) -> Result<()> {
926        match ev {
927            SwarmEvent::Behaviour(ev) => match ev {
928                BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
929                BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
930                BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
931                BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
932                BehaviourEvent::Ping(ev) => self.on_ping_event(ev).await,
933                BehaviourEvent::Autonat(_)
934                | BehaviourEvent::ConnectionControl(_)
935                | BehaviourEvent::HeaderEx(_) => {}
936            },
937            SwarmEvent::ConnectionEstablished {
938                peer_id,
939                connection_id,
940                endpoint,
941                ..
942            } => {
943                self.on_peer_connected(peer_id, connection_id, endpoint);
944            }
945            SwarmEvent::ConnectionClosed {
946                peer_id,
947                connection_id,
948                ..
949            } => {
950                self.on_peer_disconnected(peer_id, connection_id);
951            }
952            _ => {}
953        }
954
955        Ok(())
956    }
957
958    async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
959        match cmd {
960            P2pCmd::NetworkInfo { respond_to } => {
961                respond_to.maybe_send(self.swarm.network_info());
962            }
963            P2pCmd::HeaderExRequest {
964                request,
965                respond_to,
966            } => {
967                self.swarm
968                    .behaviour_mut()
969                    .header_ex
970                    .send_request(request, respond_to);
971            }
972            P2pCmd::Listeners { respond_to } => {
973                let local_peer_id = self.swarm.local_peer_id().to_owned();
974                let listeners = self
975                    .swarm
976                    .listeners()
977                    .cloned()
978                    .map(|mut ma| {
979                        if !ma.protocol_stack().any(|protocol| protocol == "p2p") {
980                            ma.push(Protocol::P2p(local_peer_id))
981                        }
982                        ma
983                    })
984                    .collect();
985
986                respond_to.maybe_send(listeners);
987            }
988            P2pCmd::ConnectedPeers { respond_to } => {
989                respond_to.maybe_send(self.peer_tracker.connected_peers());
990            }
991            P2pCmd::InitHeaderSub { head, channel } => {
992                self.on_init_header_sub(*head, channel);
993            }
994            P2pCmd::SetPeerTrust {
995                peer_id,
996                is_trusted,
997            } => {
998                if *self.swarm.local_peer_id() != peer_id {
999                    self.peer_tracker.set_trusted(peer_id, is_trusted);
1000                }
1001            }
1002            P2pCmd::GetShwapCid { cid, respond_to } => {
1003                self.on_get_shwap_cid(cid, respond_to);
1004            }
1005            P2pCmd::GetNetworkCompromisedToken { respond_to } => {
1006                respond_to.maybe_send(self.network_compromised_token.clone())
1007            }
1008            P2pCmd::GetNetworkHead { respond_to } => {
1009                let head = self
1010                    .header_sub_state
1011                    .as_ref()
1012                    .map(|state| state.known_head.clone());
1013                respond_to.maybe_send(head);
1014            }
1015        }
1016
1017        Ok(())
1018    }
1019
1020    #[instrument(skip_all)]
1021    fn report(&mut self) {
1022        let tracker_info = self.peer_tracker.info();
1023
1024        info!(
1025            "peers: {}, trusted peers: {}",
1026            tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1027        );
1028    }
1029
1030    #[instrument(level = "trace", skip(self))]
1031    async fn on_identify_event(&mut self, ev: identify::Event) -> Result<()> {
1032        match ev {
1033            identify::Event::Received { peer_id, info, .. } => {
1034                // Inform Kademlia about the listening addresses
1035                // TODO: Remove this when rust-libp2p#5103 is implemented
1036                for addr in info.listen_addrs {
1037                    self.swarm
1038                        .behaviour_mut()
1039                        .kademlia
1040                        .add_address(&peer_id, addr);
1041                }
1042            }
1043            _ => trace!("Unhandled identify event"),
1044        }
1045
1046        Ok(())
1047    }
1048
1049    #[instrument(level = "trace", skip(self))]
1050    async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1051        match ev {
1052            gossipsub::Event::Message {
1053                message,
1054                message_id,
1055                ..
1056            } => {
1057                let Some(peer) = message.source else {
1058                    // Validation mode is `strict` so this will never happen
1059                    return;
1060                };
1061
1062                let acceptance = if message.topic == self.header_sub_topic_hash {
1063                    self.on_header_sub_message(&message.data[..])
1064                } else if message.topic == self.bad_encoding_fraud_sub_topic {
1065                    self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1066                        .await
1067                } else {
1068                    trace!("Unhandled gossipsub message");
1069                    gossipsub::MessageAcceptance::Ignore
1070                };
1071
1072                if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1073                    // We may have discovered a new peer
1074                    self.peer_maybe_discovered(peer);
1075                }
1076
1077                let _ = self
1078                    .swarm
1079                    .behaviour_mut()
1080                    .gossipsub
1081                    .report_message_validation_result(&message_id, &peer, acceptance);
1082            }
1083            _ => trace!("Unhandled gossipsub event"),
1084        }
1085    }
1086
1087    #[instrument(level = "trace", skip(self))]
1088    async fn on_kademlia_event(&mut self, ev: kad::Event) -> Result<()> {
1089        match ev {
1090            kad::Event::RoutingUpdated {
1091                peer, addresses, ..
1092            } => {
1093                self.peer_tracker.add_addresses(peer, addresses.iter());
1094            }
1095            _ => trace!("Unhandled Kademlia event"),
1096        }
1097
1098        Ok(())
1099    }
1100
1101    #[instrument(level = "trace", skip_all)]
1102    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1103        trace!("Requesting CID {cid} from bitswap");
1104        let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
1105        self.bitswap_queries.insert(query_id, respond_to);
1106    }
1107
1108    #[instrument(level = "trace", skip(self))]
1109    async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1110        match ev {
1111            beetswap::Event::GetQueryResponse { query_id, data } => {
1112                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1113                    respond_to.maybe_send_ok(data);
1114                }
1115            }
1116            beetswap::Event::GetQueryError { query_id, error } => {
1117                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1118                    let error: P2pError = error.into();
1119                    respond_to.maybe_send_err(error);
1120                }
1121            }
1122        }
1123    }
1124
1125    #[instrument(level = "debug", skip_all)]
1126    async fn on_ping_event(&mut self, ev: ping::Event) {
1127        match ev.result {
1128            Ok(dur) => debug!(
1129                "Ping success: peer: {}, connection_id: {}, time: {:?}",
1130                ev.peer, ev.connection, dur
1131            ),
1132            Err(e) => {
1133                debug!(
1134                    "Ping failure: peer: {}, connection_id: {}, error: {}",
1135                    &ev.peer, &ev.connection, e
1136                );
1137                self.swarm.close_connection(ev.connection);
1138            }
1139        }
1140    }
1141
1142    #[instrument(skip_all, fields(peer_id = %peer_id))]
1143    fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
1144        if !self.peer_tracker.set_maybe_discovered(peer_id) {
1145            return;
1146        }
1147
1148        debug!("Peer discovered");
1149    }
1150
1151    #[instrument(skip_all, fields(peer_id = %peer_id))]
1152    fn on_peer_connected(
1153        &mut self,
1154        peer_id: PeerId,
1155        connection_id: ConnectionId,
1156        endpoint: ConnectedPoint,
1157    ) {
1158        debug!("Peer connected");
1159
1160        // Inform PeerTracker about the dialed address.
1161        //
1162        // We do this because Kademlia send commands to Swarm
1163        // for dialing a peer and we may not have that address
1164        // in PeerTracker.
1165        let dialed_addr = match endpoint {
1166            ConnectedPoint::Dialer {
1167                address,
1168                role_override: Endpoint::Dialer,
1169                ..
1170            } => Some(address),
1171            _ => None,
1172        };
1173
1174        self.peer_tracker
1175            .set_connected(peer_id, connection_id, dialed_addr);
1176    }
1177
1178    #[instrument(skip_all, fields(peer_id = %peer_id))]
1179    fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
1180        if self
1181            .peer_tracker
1182            .set_maybe_disconnected(peer_id, connection_id)
1183        {
1184            debug!("Peer disconnected");
1185        }
1186    }
1187
1188    #[instrument(skip_all, fields(header = %head))]
1189    fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1190        self.header_sub_state = Some(HeaderSubState {
1191            known_head: head,
1192            channel,
1193        });
1194        trace!("HeaderSub initialized");
1195    }
1196
1197    #[instrument(skip_all)]
1198    fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1199        let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1200            trace!("Malformed or invalid header from header-sub");
1201            return gossipsub::MessageAcceptance::Reject;
1202        };
1203
1204        trace!("Received header from header-sub ({header})");
1205
1206        let Some(ref mut state) = self.header_sub_state else {
1207            debug!("header-sub not initialized yet");
1208            return gossipsub::MessageAcceptance::Ignore;
1209        };
1210
1211        if state.known_head.verify(&header).is_err() {
1212            trace!("Failed to verify HeaderSub header. Ignoring {header}");
1213            return gossipsub::MessageAcceptance::Ignore;
1214        }
1215
1216        trace!("New header from header-sub ({header})");
1217
1218        state.known_head = header.clone();
1219        // We intentionally do not `send().await` to avoid blocking `P2p`
1220        // in case `Syncer` enters some weird state.
1221        let _ = state.channel.try_send(header);
1222
1223        gossipsub::MessageAcceptance::Accept
1224    }
1225
1226    #[instrument(skip_all)]
1227    async fn on_bad_encoding_fraud_sub_message(
1228        &mut self,
1229        data: &[u8],
1230        peer: &PeerId,
1231    ) -> gossipsub::MessageAcceptance {
1232        let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1233            trace!("Malformed bad encoding fraud proof from {peer}");
1234            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1235            return gossipsub::MessageAcceptance::Reject;
1236        };
1237
1238        let height = befp.height().value();
1239
1240        let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1241            header_sub_state.known_head.height().value()
1242        } else if let Ok(local_head) = self.store.get_head().await {
1243            local_head.height().value()
1244        } else {
1245            // we aren't tracking the network and have uninitialized store
1246            return gossipsub::MessageAcceptance::Ignore;
1247        };
1248
1249        if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1250            // does this threshold make any sense if we're gonna ignore it anyway
1251            // since we won't have the header
1252            return gossipsub::MessageAcceptance::Ignore;
1253        }
1254
1255        let hash = befp.header_hash();
1256        let Ok(header) = self.store.get_by_hash(&hash).await else {
1257            // we can't verify the proof without a header
1258            // TODO: should we then store it and wait for the height? celestia doesn't
1259            return gossipsub::MessageAcceptance::Ignore;
1260        };
1261
1262        if let Err(e) = befp.validate(&header) {
1263            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1264            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1265            return gossipsub::MessageAcceptance::Reject;
1266        }
1267
1268        warn!("Received a valid bad encoding fraud proof");
1269        // trigger cancellation for all services
1270        self.network_compromised_token.trigger();
1271
1272        gossipsub::MessageAcceptance::Accept
1273    }
1274}
1275
1276/// Awaits at least one channel from the `bitswap_queries` to close.
1277async fn poll_closed(
1278    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1279) {
1280    poll_fn(|cx| {
1281        for chan in bitswap_queries.values_mut() {
1282            match chan.poll_closed(cx) {
1283                Poll::Pending => continue,
1284                Poll::Ready(_) => return Poll::Ready(()),
1285            }
1286        }
1287
1288        Poll::Pending
1289    })
1290    .await
1291}
1292
1293fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1294    let mut invalid_addrs = Vec::new();
1295
1296    for addr in addrs {
1297        if addr.peer_id().is_none() {
1298            invalid_addrs.push(addr.to_owned());
1299        }
1300    }
1301
1302    if invalid_addrs.is_empty() {
1303        Ok(())
1304    } else {
1305        Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1306    }
1307}
1308
1309fn init_gossipsub<'a, B, S>(
1310    args: &'a P2pArgs<B, S>,
1311    topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1312) -> Result<gossipsub::Behaviour>
1313where
1314    B: Blockstore,
1315    S: Store,
1316{
1317    // Set the message authenticity - How we expect to publish messages
1318    // Here we expect the publisher to sign the message with their key.
1319    let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1320
1321    let config = gossipsub::ConfigBuilder::default()
1322        .validation_mode(gossipsub::ValidationMode::Strict)
1323        .validate_messages()
1324        .build()
1325        .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1326
1327    // build a gossipsub network behaviour
1328    let mut gossipsub: gossipsub::Behaviour =
1329        gossipsub::Behaviour::new(message_authenticity, config)
1330            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1331
1332    for topic in topics {
1333        gossipsub
1334            .subscribe(topic)
1335            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1336    }
1337
1338    Ok(gossipsub)
1339}
1340
1341fn init_kademlia<B, S>(args: &P2pArgs<B, S>) -> Result<kad::Behaviour<kad::store::MemoryStore>>
1342where
1343    B: Blockstore,
1344    S: Store,
1345{
1346    let local_peer_id = PeerId::from(args.local_keypair.public());
1347    let store = kad::store::MemoryStore::new(local_peer_id);
1348
1349    let protocol_id = celestia_protocol_id(&args.network_id, "/kad/1.0.0");
1350    let config = kad::Config::new(protocol_id);
1351
1352    let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, config);
1353
1354    for addr in &args.bootnodes {
1355        if let Some(peer_id) = addr.peer_id() {
1356            kademlia.add_address(&peer_id, addr.to_owned());
1357        }
1358    }
1359
1360    if !args.listen_on.is_empty() {
1361        kademlia.set_mode(Some(kad::Mode::Server));
1362    }
1363
1364    Ok(kademlia)
1365}
1366
1367fn init_bitswap<B, S>(
1368    blockstore: Arc<B>,
1369    store: Arc<S>,
1370    network_id: &str,
1371) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1372where
1373    B: Blockstore + 'static,
1374    S: Store + 'static,
1375{
1376    let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1377
1378    Ok(beetswap::Behaviour::builder(blockstore)
1379        .protocol_prefix(protocol_prefix.as_ref())?
1380        .register_multihasher(ShwapMultihasher::new(store))
1381        .client_set_send_dont_have(false)
1382        .build())
1383}