lumina_node/p2p.rs

//! Component responsible for messaging and interacting with other nodes on the peer-to-peer layer.
//!
//! It is a high level integration of various p2p protocols used by Celestia nodes.
//! Currently supporting:
//! - libp2p-identify
//! - libp2p-kad
//! - libp2p-autonat
//! - libp2p-ping
//! - header-sub topic on libp2p-gossipsub
//! - fraud-sub topic on libp2p-gossipsub
//! - header-ex client
//! - header-ex server
//! - bitswap 1.2.0
//! - shwap - Celestia's data availability protocol on top of bitswap
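//!
//! A rough usage sketch (not a complete example): it assumes an already populated
//! [`P2pArgs`] and a running async runtime, and only shows the typical call flow
//! of this component.
//!
//! ```ignore
//! // Start the p2p worker, wait for a trusted peer and fetch the current network head.
//! let p2p = P2p::start(args).await?;
//! p2p.wait_connected_trusted().await?;
//! let head = p2p.get_head_header().await?;
//! println!("network head at height {}", head.height());
//!
//! // Shut the worker down and wait for it to finish.
//! p2p.stop();
//! p2p.join().await;
//! ```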

use std::collections::HashMap;
use std::future::poll_fn;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use blockstore::Blockstore;
use celestia_proto::p2p::pb::{header_request, HeaderRequest};
use celestia_types::fraud_proof::BadEncodingFraudProof;
use celestia_types::hash::Hash;
use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
use celestia_types::row::{Row, RowId};
use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
use celestia_types::sample::{Sample, SampleId};
use celestia_types::{Blob, ExtendedHeader, FraudProof};
use cid::Cid;
use futures::stream::FuturesOrdered;
use futures::{StreamExt, TryStreamExt};
use libp2p::core::transport::ListenerId;
use libp2p::{
    autonat,
    core::{ConnectedPoint, Endpoint},
    gossipsub::{self, TopicHash},
    identify,
    identity::Keypair,
    kad,
    multiaddr::Protocol,
    ping,
    swarm::{
        dial_opts::{DialOpts, PeerCondition},
        ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent,
    },
    Multiaddr, PeerId,
};
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{self, Interval};
use lumina_utils::token::Token;
use smallvec::SmallVec;
use tendermint_proto::Protobuf;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn};

mod connection_control;
mod header_ex;
pub(crate) mod header_session;
pub(crate) mod shwap;
mod swarm;

use crate::block_ranges::BlockRange;
use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
use crate::p2p::header_session::HeaderSession;
use crate::p2p::shwap::{convert_cid, get_block_container, ShwapMultihasher};
use crate::p2p::swarm::new_swarm;
use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::{Store, StoreError};
use crate::utils::{
    celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
    OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
};

pub use crate::p2p::header_ex::HeaderExError;

// Minimum number of peers that we want to maintain connections to.
// If we have fewer peers than that, we will try to reconnect / discover
// more aggressively.
const MIN_CONNECTED_PEERS: u64 = 4;

// Maximum size of a [`Multihash`].
pub(crate) const MAX_MH_SIZE: usize = 64;

// All fraud proofs for heights that exceed the current head height by more
// than this threshold will be ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur in the `P2p` component.
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
    /// Failed to initialize gossipsub behaviour.
    #[error("Failed to initialize gossipsub behaviour: {0}")]
    GossipsubInit(String),

    /// Failed to initialize TLS.
    #[error("Failed to initialize TLS: {0}")]
    TlsInit(String),

    /// Failed to initialize noise protocol.
    #[error("Failed to initialize noise: {0}")]
    NoiseInit(String),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,

    /// Not connected to any peers.
    #[error("Not connected to any peers")]
    NoConnectedPeers,

    /// An error propagated from the `header-ex` protocol.
    #[error("HeaderEx: {0}")]
    HeaderEx(#[from] HeaderExError),

    /// Bootnode address is missing its peer ID.
    #[error("Bootnode multiaddrs without peer ID: {0:?}")]
    BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),

    /// An error propagated from [`beetswap::Behaviour`].
    #[error("Bitswap: {0}")]
    Bitswap(#[from] beetswap::Error),

    /// ProtoBuf message failed to be decoded.
    #[error("ProtoBuf decoding error: {0}")]
    ProtoDecodeFailed(#[from] tendermint_proto::Error),

    /// An error propagated from [`celestia_types`] that is related to [`Cid`].
    #[error("CID error: {0}")]
    Cid(celestia_types::Error),

    /// Bitswap query timed out.
    #[error("Bitswap query timed out")]
    BitswapQueryTimeout,

    /// Shwap protocol error.
    #[error("Shwap: {0}")]
    Shwap(String),

    /// An error propagated from [`celestia_types`].
    #[error(transparent)]
    CelestiaTypes(#[from] celestia_types::Error),

    /// An error propagated from [`Store`].
    #[error("Store error: {0}")]
    Store(#[from] StoreError),

    /// Header was pruned.
    #[error("Header of block {0} was pruned because it is outside of the retention period")]
    HeaderPruned(u64),

    /// Header not synced yet.
    #[error("Header of block {0} is not synced yet")]
    HeaderNotSynced(u64),
}

impl P2pError {
    /// Returns `true` if an error is fatal in all possible scenarios.
    ///
    /// If unsure, mark it as a non-fatal error.
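    ///
    /// A sketch of how a caller might use this classification (the surrounding
    /// error handling here is illustrative, not taken from the codebase):
    ///
    /// ```ignore
    /// if let Err(e) = p2p.get_head_header().await {
    ///     if e.is_fatal() {
    ///         // Fatal errors are propagated and tear the component down.
    ///         return Err(e);
    ///     }
    ///     // Non-fatal errors can be logged and retried.
    ///     warn!("transient p2p error: {e}");
    /// }
    /// ```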
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            P2pError::GossipsubInit(_)
            | P2pError::NoiseInit(_)
            | P2pError::TlsInit(_)
            | P2pError::WorkerDied
            | P2pError::ChannelClosedUnexpectedly
            | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
            P2pError::NoConnectedPeers
            | P2pError::HeaderEx(_)
            | P2pError::Bitswap(_)
            | P2pError::ProtoDecodeFailed(_)
            | P2pError::Cid(_)
            | P2pError::BitswapQueryTimeout
            | P2pError::Shwap(_)
            | P2pError::CelestiaTypes(_)
            | P2pError::HeaderPruned(_)
            | P2pError::HeaderNotSynced(_) => false,
            P2pError::Store(e) => e.is_fatal(),
        }
    }
}

impl From<oneshot::error::RecvError> for P2pError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        P2pError::ChannelClosedUnexpectedly
    }
}

impl From<prost::DecodeError> for P2pError {
    fn from(value: prost::DecodeError) -> Self {
        P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
    }
}

impl From<cid::Error> for P2pError {
    fn from(value: cid::Error) -> Self {
        P2pError::Cid(celestia_types::Error::CidError(
            blockstore::block::CidError::InvalidCid(value.to_string()),
        ))
    }
}

/// Component responsible for handling the peer-to-peer networking.
#[derive(Debug)]
pub(crate) struct P2p {
    cancellation_token: CancellationToken,
    cmd_tx: mpsc::Sender<P2pCmd>,
    join_handle: JoinHandle,
    peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
    local_peer_id: PeerId,
}

/// Arguments used to configure the [`P2p`].
pub struct P2pArgs<B, S>
where
    B: Blockstore,
    S: Store,
{
    /// An id of the network to connect to.
    pub network_id: String,
    /// The keypair to be used as the identity.
    pub local_keypair: Keypair,
    /// List of bootstrap nodes to connect to and trust.
    pub bootnodes: Vec<Multiaddr>,
    /// List of the addresses on which to listen for incoming connections.
    pub listen_on: Vec<Multiaddr>,
    /// The blockstore for bitswap data.
    pub blockstore: Arc<B>,
    /// The store for headers.
    pub store: Arc<S>,
    /// Event publisher.
    pub event_pub: EventPublisher,
}

#[derive(Debug)]
pub(crate) enum P2pCmd {
    NetworkInfo {
        respond_to: oneshot::Sender<NetworkInfo>,
    },
    HeaderExRequest {
        request: HeaderRequest,
        respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
    },
    Listeners {
        respond_to: oneshot::Sender<Vec<Multiaddr>>,
    },
    ConnectedPeers {
        respond_to: oneshot::Sender<Vec<PeerId>>,
    },
    InitHeaderSub {
        head: Box<ExtendedHeader>,
        /// Any valid headers received by header-sub will be sent to this channel.
        channel: mpsc::Sender<ExtendedHeader>,
    },
    SetPeerTrust {
        peer_id: PeerId,
        is_trusted: bool,
    },
    GetShwapCid {
        cid: Cid,
        respond_to: OneshotResultSender<Vec<u8>, P2pError>,
    },
    GetNetworkCompromisedToken {
        respond_to: oneshot::Sender<Token>,
    },
    GetNetworkHead {
        respond_to: oneshot::Sender<Option<ExtendedHeader>>,
    },
}

impl P2p {
    /// Creates and starts a new p2p handler.
    pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
    where
        B: Blockstore + 'static,
        S: Store + 'static,
    {
        validate_bootnode_addrs(&args.bootnodes)?;

        let local_peer_id = PeerId::from(args.local_keypair.public());

        let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
        let peer_tracker_info_watcher = peer_tracker.info_watcher();

        let cancellation_token = CancellationToken::new();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);

        let mut worker =
            Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;

        let join_handle = spawn(async move {
            worker.run().await;
        });

        Ok(P2p {
            cancellation_token,
            cmd_tx,
            join_handle,
            peer_tracker_info_watcher,
            local_peer_id,
        })
    }

    /// Creates and starts a new mocked p2p handler.
    #[cfg(test)]
    pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
        let cancellation_token = CancellationToken::new();

        // Just a fake join_handle
        let join_handle = spawn(async {});

        let p2p = P2p {
            cmd_tx: cmd_tx.clone(),
            cancellation_token,
            join_handle,
            peer_tracker_info_watcher: peer_tracker_rx,
            local_peer_id: PeerId::random(),
        };

        let handle = crate::test_utils::MockP2pHandle {
            cmd_tx,
            cmd_rx,
            header_sub_tx: None,
            peer_tracker_tx,
        };

        (p2p, handle)
    }

    /// Stop the worker.
    pub fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub async fn join(&self) {
        self.join_handle.join().await;
    }

    /// Local peer ID on the p2p network.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }

    async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| P2pError::WorkerDied)
    }

    /// Watcher for the current [`PeerTrackerInfo`].
    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
        self.peer_tracker_info_watcher.clone()
    }

    /// A reference to the current [`PeerTrackerInfo`].
    pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
        self.peer_tracker_info_watcher.borrow()
    }

    /// Initializes the `header-sub` protocol with a given subjective head.
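    ///
    /// A minimal sketch, assuming a `p2p` handle and a `subjective_head` obtained
    /// elsewhere (e.g. from a trusted peer):
    ///
    /// ```ignore
    /// let (tx, mut rx) = mpsc::channel(16);
    /// p2p.init_header_sub(subjective_head, tx).await?;
    ///
    /// // Every valid header announced on the header-sub topic is forwarded here.
    /// while let Some(header) = rx.recv().await {
    ///     println!("new network head candidate: {}", header.height());
    /// }
    /// ```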
    pub async fn init_header_sub(
        &self,
        head: ExtendedHeader,
        channel: mpsc::Sender<ExtendedHeader>,
    ) -> Result<()> {
        self.send_command(P2pCmd::InitHeaderSub {
            head: Box::new(head),
            channel,
        })
        .await
    }

    /// Wait until the node is connected to any peer.
    pub async fn wait_connected(&self) -> Result<()> {
        self.peer_tracker_info_watcher()
            .wait_for(|info| info.num_connected_peers > 0)
            .await
            .map(drop)
            .map_err(|_| P2pError::WorkerDied)
    }

    /// Wait until the node is connected to any trusted peer.
    pub async fn wait_connected_trusted(&self) -> Result<()> {
        self.peer_tracker_info_watcher()
            .wait_for(|info| info.num_connected_trusted_peers > 0)
            .await
            .map(drop)
            .map_err(|_| P2pError::WorkerDied)
    }

    /// Get current [`NetworkInfo`].
    pub async fn network_info(&self) -> Result<NetworkInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    /// Send a request on the `header-ex` protocol.
    pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::HeaderExRequest {
            request,
            respond_to: tx,
        })
        .await?;

        rx.await?
    }

    /// Request the head header on the `header-ex` protocol.
    pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
        self.get_header_by_height(0).await
    }

    /// Request the header by hash on the `header-ex` protocol.
    pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
        self.header_ex_request(HeaderRequest {
            data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
            amount: 1,
        })
        .await?
        .into_iter()
        .next()
        .ok_or(HeaderExError::HeaderNotFound.into())
    }

    /// Request the header by height on the `header-ex` protocol.
    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
        self.header_ex_request(HeaderRequest {
            data: Some(header_request::Data::Origin(height)),
            amount: 1,
        })
        .await?
        .into_iter()
        .next()
        .ok_or(HeaderExError::HeaderNotFound.into())
    }

    /// Request the headers following the given one with the `header-ex` protocol.
    ///
    /// The first header of the requested range is verified against the provided one,
    /// then each subsequent header is verified against the previous one.
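    ///
    /// A sketch of the intended use, assuming `trusted_head` is a header we already
    /// trust (e.g. the current store head):
    ///
    /// ```ignore
    /// // Fetch the headers directly following `trusted_head`, already verified
    /// // to form a chain rooted at it.
    /// let headers = p2p.get_verified_headers_range(&trusted_head, 10).await?;
    /// for header in &headers {
    ///     println!("verified header at height {}", header.height());
    /// }
    /// ```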
    pub async fn get_verified_headers_range(
        &self,
        from: &ExtendedHeader,
        amount: u64,
    ) -> Result<Vec<ExtendedHeader>> {
        // User can give us a bad header, so validate it.
        from.validate().map_err(|_| HeaderExError::InvalidRequest)?;

        let height = from.height().value() + 1;

        let range = height..=height + amount - 1;

        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
        let headers = session.run().await?;

        // `.validate()` is called on each header separately by `HeaderExClientHandler`.
        //
        // The last step is to verify that all headers are from the same chain
        // and indeed connected with the next one.
        from.verify_adjacent_range(&headers)
            .map_err(|_| HeaderExError::InvalidResponse)?;

        Ok(headers)
    }

    /// Request a range of headers with the `header-ex` protocol.
    ///
    /// Headers within the range are verified against each other, but it is the caller's
    /// responsibility to verify the range edges against headers already existing in the store.
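    ///
    /// A sketch of the verification left to the caller, assuming the header right
    /// before the requested range is already present in `store`:
    ///
    /// ```ignore
    /// let headers = p2p.get_unverified_header_range(100..=150).await?;
    ///
    /// // Anchor the range to a header we already trust.
    /// let anchor = store.get_by_height(99).await?;
    /// anchor.verify_adjacent_range(&headers)?;
    /// ```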
    pub(crate) async fn get_unverified_header_range(
        &self,
        range: BlockRange,
    ) -> Result<Vec<ExtendedHeader>> {
        if range.is_empty() {
            return Err(HeaderExError::InvalidRequest.into());
        }

        let mut session = HeaderSession::new(range, self.cmd_tx.clone());
        let headers = session.run().await?;

        let Some(head) = headers.first() else {
            return Err(HeaderExError::InvalidResponse.into());
        };

        head.verify_adjacent_range(&headers[1..])
            .map_err(|_| HeaderExError::InvalidResponse)?;

        Ok(headers)
    }

    /// Request a [`Cid`] on bitswap protocol.
    pub(crate) async fn get_shwap_cid(
        &self,
        cid: Cid,
        timeout: Option<Duration>,
    ) -> Result<Vec<u8>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::GetShwapCid {
            cid,
            respond_to: tx,
        })
        .await?;

        let data = match timeout {
            Some(dur) => time::timeout(dur, rx)
                .await
                .map_err(|_| P2pError::BitswapQueryTimeout)???,
            None => rx.await??,
        };

        get_block_container(&cid, &data)
    }

    /// Request a [`Row`] on bitswap protocol.
    pub async fn get_row(
        &self,
        row_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<Row> {
        let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
        let cid = convert_cid(&id.into())?;

        let data = self.get_shwap_cid(cid, timeout).await?;
        let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
        Ok(row)
    }

    /// Request a [`Sample`] on bitswap protocol.
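    ///
    /// A minimal sampling sketch, assuming `p2p` is connected and the square
    /// coordinates are valid for the given block:
    ///
    /// ```ignore
    /// // Fetch the share at row 0, column 0 of block 42, giving up after 10 seconds.
    /// let sample = p2p
    ///     .get_sample(0, 0, 42, Some(Duration::from_secs(10)))
    ///     .await?;
    /// ```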
    pub async fn get_sample(
        &self,
        row_index: u16,
        column_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<Sample> {
        let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
        let cid = convert_cid(&id.into())?;

        let data = self.get_shwap_cid(cid, timeout).await?;
        let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
        Ok(sample)
    }

    /// Request a [`RowNamespaceData`] on bitswap protocol.
    pub async fn get_row_namespace_data(
        &self,
        namespace: Namespace,
        row_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<RowNamespaceData> {
        let id =
            RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
        let cid = convert_cid(&id.into())?;

        let data = self.get_shwap_cid(cid, timeout).await?;
        let row_namespace_data =
            RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
        Ok(row_namespace_data)
    }

    /// Request all blobs with the provided namespace in the block at the given height
    /// using the bitswap protocol.
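    ///
    /// A sketch of the call, assuming `namespace` and `store` are defined elsewhere
    /// and the header for `block_height` is already synced into `store`:
    ///
    /// ```ignore
    /// let blobs = p2p
    ///     .get_all_blobs(namespace, block_height, Some(Duration::from_secs(10)), &store)
    ///     .await?;
    /// for blob in &blobs {
    ///     println!("blob with {} bytes of data", blob.data.len());
    /// }
    /// ```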
    pub async fn get_all_blobs<S>(
        &self,
        namespace: Namespace,
        block_height: u64,
        timeout: Option<Duration>,
        store: &S,
    ) -> Result<Vec<Blob>>
    where
        S: Store,
    {
        let header = match store.get_by_height(block_height).await {
            Ok(header) => header,
            Err(StoreError::NotFound) => {
                let pruned_ranges = store.get_pruned_ranges().await?;

                if pruned_ranges.contains(block_height) {
                    return Err(P2pError::HeaderPruned(block_height));
                } else {
                    return Err(P2pError::HeaderNotSynced(block_height));
                }
            }
            Err(e) => return Err(e.into()),
        };

        let app_version = header.app_version()?;
        let rows_to_fetch: Vec<_> = header
            .dah
            .row_roots()
            .iter()
            .enumerate()
            .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
            .map(|(n, _)| n as u16)
            .collect();

        let futs = rows_to_fetch
            .into_iter()
            .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, block_height, timeout))
            .collect::<FuturesOrdered<_>>();

        let rows: Vec<_> = match futs.try_collect().await {
            Ok(rows) => rows,
            Err(P2pError::BitswapQueryTimeout) if !store.has_at(block_height).await => {
                return Err(P2pError::HeaderPruned(block_height));
            }
            Err(e) => return Err(e),
        };

        let shares = rows.iter().flat_map(|row| row.shares.iter());

        Ok(Blob::reconstruct_all(shares, app_version)?)
    }

    /// Get the addresses on which [`P2p`] listens for incoming connections.
    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::Listeners { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    /// Get the list of connected peers.
    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    /// Alter the trust status for a given peer.
    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
        self.send_command(P2pCmd::SetPeerTrust {
            peer_id,
            is_trusted,
        })
        .await
    }

    /// Get the cancellation token which will be triggered when the network gets compromised.
    ///
    /// After this token is triggered, the network should be treated as compromised
    /// and must not be trusted.
    pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    /// Get the latest header announced on the network.
    pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }
}

impl Drop for P2p {
    fn drop(&mut self) {
        self.stop();
    }
}

/// Our network behaviour.
#[derive(NetworkBehaviour)]
struct Behaviour<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    connection_control: connection_control::Behaviour,
    autonat: autonat::Behaviour,
    bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
    ping: ping::Behaviour,
    identify: identify::Behaviour,
    header_ex: HeaderExBehaviour<S>,
    gossipsub: gossipsub::Behaviour,
    kademlia: kad::Behaviour<kad::store::MemoryStore>,
}

struct Worker<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    swarm: Swarm<Behaviour<B, S>>,
    listeners: SmallVec<[ListenerId; 1]>,
    header_sub_topic_hash: TopicHash,
    bad_encoding_fraud_sub_topic: TopicHash,
    cmd_rx: mpsc::Receiver<P2pCmd>,
    peer_tracker: Arc<PeerTracker>,
    header_sub_state: Option<HeaderSubState>,
    bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
    network_compromised_token: Token,
    store: Arc<S>,
    event_pub: EventPublisher,
    bootnodes: HashMap<PeerId, Vec<Multiaddr>>,
}

struct HeaderSubState {
    known_head: ExtendedHeader,
    channel: mpsc::Sender<ExtendedHeader>,
}

impl<B, S> Worker<B, S>
where
    B: Blockstore,
    S: Store,
{
    async fn new(
        args: P2pArgs<B, S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<P2pCmd>,
        peer_tracker: Arc<PeerTracker>,
    ) -> Result<Self, P2pError> {
        let local_peer_id = PeerId::from(args.local_keypair.public());

        let connection_control = connection_control::Behaviour::new();
        let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default());
        let ping = ping::Behaviour::new(ping::Config::default());

        let agent_version = format!("lumina/{}/{}", args.network_id, env!("CARGO_PKG_VERSION"));
        let identify = identify::Behaviour::new(
            identify::Config::new(String::new(), args.local_keypair.public())
                .with_agent_version(agent_version),
        );

        let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
        let bad_encoding_fraud_sub_topic =
            fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
        let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;

        let kademlia = init_kademlia(&args)?;
        let bitswap = init_bitswap(
            args.blockstore.clone(),
            args.store.clone(),
            &args.network_id,
        )?;

        let header_ex = HeaderExBehaviour::new(HeaderExConfig {
            network_id: &args.network_id,
            peer_tracker: peer_tracker.clone(),
            header_store: args.store.clone(),
        });

        let behaviour = Behaviour {
            connection_control,
            autonat,
            bitswap,
            ping,
            identify,
            gossipsub,
            header_ex,
            kademlia,
        };

        let mut swarm = new_swarm(args.local_keypair, behaviour).await?;
        let mut listeners = SmallVec::new();

        for addr in args.listen_on {
            match swarm.listen_on(addr.clone()) {
                Ok(id) => listeners.push(id),
                Err(e) => error!("Failed to listen on {addr}: {e}"),
            }
        }

        let mut bootnodes = HashMap::<_, Vec<_>>::new();

        for addr in args.bootnodes {
            let peer_id = addr.peer_id().expect("multiaddr already validated");
            bootnodes.entry(peer_id).or_default().push(addr);
        }

        for (peer_id, addrs) in bootnodes.iter_mut() {
            addrs.sort();
            addrs.dedup();
            addrs.shrink_to_fit();

            // Bootstrap peers are always trusted
            peer_tracker.set_trusted(*peer_id, true);
        }

        Ok(Worker {
            cancellation_token,
            cmd_rx,
            swarm,
            listeners,
            bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
            header_sub_topic_hash: header_sub_topic.hash(),
            peer_tracker,
            header_sub_state: None,
            bitswap_queries: HashMap::new(),
            network_compromised_token: Token::new(),
            store: args.store,
            event_pub: args.event_pub,
            bootnodes,
        })
    }

    async fn run(&mut self) {
        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut kademlia_interval = Interval::new(Duration::from_secs(30)).await;
        let mut peer_tracker_info_watcher = self.peer_tracker.info_watcher();

        // Initiate discovery
        self.bootstrap();

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => break,
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        self.bootstrap();
                    }
                }
                _ = report_interval.tick() => {
                    self.report();
                }
                _ = kademlia_interval.tick() => {
                    if self.peer_tracker.info().num_connected_peers < MIN_CONNECTED_PEERS
                    {
                        self.bootstrap();
                    }
                }
                _ = poll_closed(&mut self.bitswap_queries) => {
                    self.prune_canceled_bitswap_queries();
                }
                ev = self.swarm.select_next_some() => {
                    if let Err(e) = self.on_swarm_event(ev).await {
                        warn!("Failure while handling swarm event: {e}");
                    }
                },
                Some(cmd) = self.cmd_rx.recv() => {
                    if let Err(e) = self.on_cmd(cmd).await {
                        warn!("Failure while handling command. (error: {e})");
                    }
                }
            }
        }

        self.on_stop().await;
    }

    fn bootstrap(&mut self) {
        self.event_pub.send(NodeEvent::ConnectingToBootnodes);

        for (peer_id, addrs) in &self.bootnodes {
            let dial_opts = DialOpts::peer_id(*peer_id)
                .addresses(addrs.clone())
                // Tell Swarm not to dial if the peer is already connected or
                // there is an ongoing dialing attempt.
                .condition(PeerCondition::DisconnectedAndNotDialing)
                .build();

            if let Err(e) = self.swarm.dial(dial_opts) {
                if !matches!(e, DialError::DialPeerConditionFalse(_)) {
                    warn!("Failed to dial on {addrs:?}: {e}");
                }
            }
        }

        // trigger kademlia bootstrap
        if self.swarm.behaviour_mut().kademlia.bootstrap().is_err() {
            warn!("Can't run kademlia bootstrap, no known peers");
        }
    }

    fn prune_canceled_bitswap_queries(&mut self) {
        let mut cancelled = SmallVec::<[_; 16]>::new();

        for (query_id, chan) in &self.bitswap_queries {
            if chan.is_closed() {
                cancelled.push(*query_id);
            }
        }

        for query_id in cancelled {
            self.bitswap_queries.remove(&query_id);
            self.swarm.behaviour_mut().bitswap.cancel(query_id);
        }
    }

    async fn on_stop(&mut self) {
        self.swarm
            .behaviour_mut()
            .connection_control
            .set_stopping(true);
        self.swarm.behaviour_mut().header_ex.stop();

        for listener in self.listeners.drain(..) {
            self.swarm.remove_listener(listener);
        }

        for (_, ids) in self.peer_tracker.connections() {
            for id in ids {
                self.swarm.close_connection(id);
            }
        }

        // Wait until all established connections are closed.
        while self
            .swarm
            .network_info()
            .connection_counters()
            .num_established()
            > 0
        {
            match self.swarm.select_next_some().await {
                // We may receive this if a connection was established just before we triggered the stop.
                SwarmEvent::ConnectionEstablished { connection_id, .. } => {
                    // We immediately close the connection in this case.
                    self.swarm.close_connection(connection_id);
                }
                SwarmEvent::ConnectionClosed {
                    peer_id,
                    connection_id,
                    ..
                } => {
                    // This will generate the PeerDisconnected events.
                    self.on_peer_disconnected(peer_id, connection_id);
                }
                _ => {}
            }
        }
    }

    async fn on_swarm_event(&mut self, ev: SwarmEvent<BehaviourEvent<B, S>>) -> Result<()> {
        match ev {
            SwarmEvent::Behaviour(ev) => match ev {
                BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
                BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
                BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
                BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
                BehaviourEvent::Ping(ev) => self.on_ping_event(ev).await,
                BehaviourEvent::Autonat(_)
                | BehaviourEvent::ConnectionControl(_)
                | BehaviourEvent::HeaderEx(_) => {}
            },
            SwarmEvent::ConnectionEstablished {
                peer_id,
                connection_id,
                endpoint,
                ..
            } => {
                self.on_peer_connected(peer_id, connection_id, endpoint);
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                connection_id,
                ..
            } => {
                self.on_peer_disconnected(peer_id, connection_id);
            }
            _ => {}
        }

        Ok(())
    }

    async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
        match cmd {
            P2pCmd::NetworkInfo { respond_to } => {
                respond_to.maybe_send(self.swarm.network_info());
            }
            P2pCmd::HeaderExRequest {
                request,
                respond_to,
            } => {
                self.swarm
                    .behaviour_mut()
                    .header_ex
                    .send_request(request, respond_to);
            }
            P2pCmd::Listeners { respond_to } => {
                let local_peer_id = self.swarm.local_peer_id().to_owned();
                let listeners = self
                    .swarm
                    .listeners()
                    .cloned()
                    .map(|mut ma| {
                        if !ma.protocol_stack().any(|protocol| protocol == "p2p") {
                            ma.push(Protocol::P2p(local_peer_id))
                        }
                        ma
                    })
                    .collect();

                respond_to.maybe_send(listeners);
            }
            P2pCmd::ConnectedPeers { respond_to } => {
                respond_to.maybe_send(self.peer_tracker.connected_peers());
            }
            P2pCmd::InitHeaderSub { head, channel } => {
                self.on_init_header_sub(*head, channel);
            }
            P2pCmd::SetPeerTrust {
                peer_id,
                is_trusted,
            } => {
                if *self.swarm.local_peer_id() != peer_id {
                    self.peer_tracker.set_trusted(peer_id, is_trusted);
                }
            }
            P2pCmd::GetShwapCid { cid, respond_to } => {
                self.on_get_shwap_cid(cid, respond_to);
            }
            P2pCmd::GetNetworkCompromisedToken { respond_to } => {
                respond_to.maybe_send(self.network_compromised_token.clone())
            }
            P2pCmd::GetNetworkHead { respond_to } => {
                let head = self
                    .header_sub_state
                    .as_ref()
                    .map(|state| state.known_head.clone());
                respond_to.maybe_send(head);
            }
        }

        Ok(())
    }

    #[instrument(skip_all)]
    fn report(&mut self) {
        let tracker_info = self.peer_tracker.info();

        info!(
            "peers: {}, trusted peers: {}",
            tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
        );
    }

    #[instrument(level = "trace", skip(self))]
    async fn on_identify_event(&mut self, ev: identify::Event) -> Result<()> {
        match ev {
            identify::Event::Received { peer_id, info, .. } => {
                // Inform Kademlia about the listening addresses
                // TODO: Remove this when rust-libp2p#5103 is implemented
                for addr in info.listen_addrs {
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, addr);
                }
            }
            _ => trace!("Unhandled identify event"),
        }

        Ok(())
    }

    #[instrument(level = "trace", skip(self))]
    async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
        match ev {
            gossipsub::Event::Message {
                message,
                message_id,
                ..
            } => {
                let Some(peer) = message.source else {
                    // Validation mode is `strict` so this will never happen
                    return;
                };

                let acceptance = if message.topic == self.header_sub_topic_hash {
                    self.on_header_sub_message(&message.data[..])
                } else if message.topic == self.bad_encoding_fraud_sub_topic {
                    self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
                        .await
                } else {
                    trace!("Unhandled gossipsub message");
                    gossipsub::MessageAcceptance::Ignore
                };

                if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
                    // We may have discovered a new peer
                    self.peer_maybe_discovered(peer);
                }

                let _ = self
                    .swarm
                    .behaviour_mut()
                    .gossipsub
                    .report_message_validation_result(&message_id, &peer, acceptance);
            }
            _ => trace!("Unhandled gossipsub event"),
        }
    }

    #[instrument(level = "trace", skip(self))]
    async fn on_kademlia_event(&mut self, ev: kad::Event) -> Result<()> {
        match ev {
            kad::Event::RoutingUpdated {
                peer, addresses, ..
            } => {
                self.peer_tracker.add_addresses(peer, addresses.iter());
            }
            _ => trace!("Unhandled Kademlia event"),
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all)]
    fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
        trace!("Requesting CID {cid} from bitswap");
        let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
        self.bitswap_queries.insert(query_id, respond_to);
    }

    #[instrument(level = "trace", skip(self))]
    async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
        match ev {
            beetswap::Event::GetQueryResponse { query_id, data } => {
                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
                    respond_to.maybe_send_ok(data);
                }
            }
            beetswap::Event::GetQueryError { query_id, error } => {
                if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
                    let error: P2pError = error.into();
                    respond_to.maybe_send_err(error);
                }
            }
        }
    }

    #[instrument(level = "debug", skip_all)]
    async fn on_ping_event(&mut self, ev: ping::Event) {
        match ev.result {
            Ok(dur) => debug!(
                "Ping success: peer: {}, connection_id: {}, time: {:?}",
                ev.peer, ev.connection, dur
            ),
            Err(e) => {
                debug!(
                    "Ping failure: peer: {}, connection_id: {}, error: {}",
                    &ev.peer, &ev.connection, e
                );
                self.swarm.close_connection(ev.connection);
            }
        }
    }

    #[instrument(skip_all, fields(peer_id = %peer_id))]
    fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
        if !self.peer_tracker.set_maybe_discovered(peer_id) {
            return;
        }

        debug!("Peer discovered");
    }

    #[instrument(skip_all, fields(peer_id = %peer_id))]
    fn on_peer_connected(
        &mut self,
        peer_id: PeerId,
        connection_id: ConnectionId,
        endpoint: ConnectedPoint,
    ) {
        debug!("Peer connected");

        // Inform PeerTracker about the dialed address.
        //
        // We do this because Kademlia sends dial commands directly to Swarm,
        // so we may not have that address in PeerTracker yet.
        let dialed_addr = match endpoint {
            ConnectedPoint::Dialer {
                address,
                role_override: Endpoint::Dialer,
                ..
            } => Some(address),
            _ => None,
        };

        self.peer_tracker
            .set_connected(peer_id, connection_id, dialed_addr);
    }

    #[instrument(skip_all, fields(peer_id = %peer_id))]
    fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
        if self
            .peer_tracker
            .set_maybe_disconnected(peer_id, connection_id)
        {
            debug!("Peer disconnected");
        }
    }

    #[instrument(skip_all, fields(header = %head))]
    fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
        self.header_sub_state = Some(HeaderSubState {
            known_head: head,
            channel,
        });
        trace!("HeaderSub initialized");
    }

    #[instrument(skip_all)]
    fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
        let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
            trace!("Malformed or invalid header from header-sub");
            return gossipsub::MessageAcceptance::Reject;
        };

        trace!("Received header from header-sub ({header})");

        let Some(ref mut state) = self.header_sub_state else {
            debug!("header-sub not initialized yet");
            return gossipsub::MessageAcceptance::Ignore;
        };

        if state.known_head.verify(&header).is_err() {
            trace!("Failed to verify HeaderSub header. Ignoring {header}");
            return gossipsub::MessageAcceptance::Ignore;
        }

        trace!("New header from header-sub ({header})");

        state.known_head = header.clone();
        // We intentionally do not `send().await` to avoid blocking `P2p`
        // in case `Syncer` enters some weird state.
        let _ = state.channel.try_send(header);

        gossipsub::MessageAcceptance::Accept
    }

    #[instrument(skip_all)]
    async fn on_bad_encoding_fraud_sub_message(
        &mut self,
        data: &[u8],
        peer: &PeerId,
    ) -> gossipsub::MessageAcceptance {
        let Ok(befp) = BadEncodingFraudProof::decode(data) else {
            trace!("Malformed bad encoding fraud proof from {peer}");
            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
            return gossipsub::MessageAcceptance::Reject;
        };

        let height = befp.height().value();

        let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
            header_sub_state.known_head.height().value()
        } else if let Ok(local_head) = self.store.get_head().await {
            local_head.height().value()
        } else {
            // We aren't tracking the network and have an uninitialized store.
            return gossipsub::MessageAcceptance::Ignore;
        };

        if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
            // does this threshold make any sense if we're gonna ignore it anyway
            // since we won't have the header
            return gossipsub::MessageAcceptance::Ignore;
        }

        let hash = befp.header_hash();
        let Ok(header) = self.store.get_by_hash(&hash).await else {
            // we can't verify the proof without a header
            // TODO: should we then store it and wait for the height? celestia doesn't
            return gossipsub::MessageAcceptance::Ignore;
        };

        if let Err(e) = befp.validate(&header) {
            trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
            self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
            return gossipsub::MessageAcceptance::Reject;
        }

        warn!("Received a valid bad encoding fraud proof");
        // trigger cancellation for all services
        self.network_compromised_token.trigger();

        gossipsub::MessageAcceptance::Accept
    }
}

/// Waits until at least one of the `bitswap_queries` channels is closed.
async fn poll_closed(
    bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
) {
    poll_fn(|cx| {
        for chan in bitswap_queries.values_mut() {
            match chan.poll_closed(cx) {
                Poll::Pending => continue,
                Poll::Ready(_) => return Poll::Ready(()),
            }
        }

        Poll::Pending
    })
    .await
}

fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
    let mut invalid_addrs = Vec::new();

    for addr in addrs {
        if addr.peer_id().is_none() {
            invalid_addrs.push(addr.to_owned());
        }
    }

    if invalid_addrs.is_empty() {
        Ok(())
    } else {
        Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
    }
}

fn init_gossipsub<'a, B, S>(
    args: &'a P2pArgs<B, S>,
    topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
) -> Result<gossipsub::Behaviour>
where
    B: Blockstore,
    S: Store,
{
    // Set the message authenticity: we expect publishers to sign messages
    // with their own key.
    let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());

    let config = gossipsub::ConfigBuilder::default()
        .validation_mode(gossipsub::ValidationMode::Strict)
        .validate_messages()
        .build()
        .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;

    // build a gossipsub network behaviour
    let mut gossipsub: gossipsub::Behaviour =
        gossipsub::Behaviour::new(message_authenticity, config)
            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;

    for topic in topics {
        gossipsub
            .subscribe(topic)
            .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
    }

    Ok(gossipsub)
}

fn init_kademlia<B, S>(args: &P2pArgs<B, S>) -> Result<kad::Behaviour<kad::store::MemoryStore>>
where
    B: Blockstore,
    S: Store,
{
    let local_peer_id = PeerId::from(args.local_keypair.public());
    let store = kad::store::MemoryStore::new(local_peer_id);

    let protocol_id = celestia_protocol_id(&args.network_id, "/kad/1.0.0");
    let config = kad::Config::new(protocol_id);

    let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, config);

    for addr in &args.bootnodes {
        if let Some(peer_id) = addr.peer_id() {
            kademlia.add_address(&peer_id, addr.to_owned());
        }
    }

    if !args.listen_on.is_empty() {
        kademlia.set_mode(Some(kad::Mode::Server));
    }

    Ok(kademlia)
}

fn init_bitswap<B, S>(
    blockstore: Arc<B>,
    store: Arc<S>,
    network_id: &str,
) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    let protocol_prefix = celestia_protocol_id(network_id, "shwap");

    Ok(beetswap::Behaviour::builder(blockstore)
        .protocol_prefix(protocol_prefix.as_ref())?
        .register_multihasher(ShwapMultihasher::new(store))
        .client_set_send_dont_have(false)
        .build())
}