1use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use celestia_proto::p2p::pb::{HeaderRequest, header_request};
24use celestia_types::fraud_proof::BadEncodingFraudProof;
25use celestia_types::hash::Hash;
26use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
27use celestia_types::row::{Row, RowId};
28use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
29use celestia_types::sample::{Sample, SampleId};
30use celestia_types::{Blob, ExtendedHeader, FraudProof};
31use cid::Cid;
32use futures::TryStreamExt;
33use futures::stream::FuturesOrdered;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51pub(crate) mod shwap;
52mod swarm;
53mod swarm_manager;
54
55use crate::block_ranges::BlockRange;
56use crate::events::EventPublisher;
57use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
58use crate::p2p::header_session::HeaderSession;
59use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
60use crate::p2p::swarm_manager::SwarmManager;
61use crate::peer_tracker::PeerTracker;
62use crate::peer_tracker::PeerTrackerInfo;
63use crate::store::{Store, StoreError};
64use crate::utils::{
65 MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
66 celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
67};
68
69pub use crate::p2p::header_ex::HeaderExError;
70
71pub(crate) const MAX_MH_SIZE: usize = 64;
73
74const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
77
78pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
79
80#[derive(Debug, thiserror::Error)]
82pub enum P2pError {
83 #[error("Failed to initialize gossipsub behaviour: {0}")]
85 GossipsubInit(String),
86
87 #[error("Failed to initialize TLS: {0}")]
89 TlsInit(String),
90
91 #[error("Failed to initialize noise: {0}")]
93 NoiseInit(String),
94
95 #[error("Worker died")]
97 WorkerDied,
98
99 #[error("Channel closed unexpectedly")]
101 ChannelClosedUnexpectedly,
102
103 #[error("Not connected to any peers")]
105 NoConnectedPeers,
106
107 #[error("HeaderEx: {0}")]
109 HeaderEx(#[from] HeaderExError),
110
111 #[error("Bootnode multiaddrs without peer ID: {0:?}")]
113 BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
114
115 #[error("Bitswap: {0}")]
117 Bitswap(#[from] beetswap::Error),
118
119 #[error("ProtoBuf decoding error: {0}")]
121 ProtoDecodeFailed(#[from] tendermint_proto::Error),
122
123 #[error("CID error: {0}")]
125 Cid(celestia_types::Error),
126
127 #[error("Bitswap query timed out")]
129 BitswapQueryTimeout,
130
131 #[error("Shwap: {0}")]
133 Shwap(String),
134
135 #[error(transparent)]
137 CelestiaTypes(#[from] celestia_types::Error),
138
139 #[error("Store error: {0}")]
141 Store(#[from] StoreError),
142
143 #[error("Header of {0} block was pruned because it is outside of retention period")]
145 HeaderPruned(u64),
146
147 #[error("Header of {0} block is not synced yet")]
149 HeaderNotSynced(u64),
150}
151
152impl P2pError {
153 pub(crate) fn is_fatal(&self) -> bool {
157 match self {
158 P2pError::GossipsubInit(_)
159 | P2pError::NoiseInit(_)
160 | P2pError::TlsInit(_)
161 | P2pError::WorkerDied
162 | P2pError::ChannelClosedUnexpectedly
163 | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
164 P2pError::NoConnectedPeers
165 | P2pError::HeaderEx(_)
166 | P2pError::Bitswap(_)
167 | P2pError::ProtoDecodeFailed(_)
168 | P2pError::Cid(_)
169 | P2pError::BitswapQueryTimeout
170 | P2pError::Shwap(_)
171 | P2pError::CelestiaTypes(_)
172 | P2pError::HeaderPruned(_)
173 | P2pError::HeaderNotSynced(_) => false,
174 P2pError::Store(e) => e.is_fatal(),
175 }
176 }
177}
178
179impl From<oneshot::error::RecvError> for P2pError {
180 fn from(_value: oneshot::error::RecvError) -> Self {
181 P2pError::ChannelClosedUnexpectedly
182 }
183}
184
185impl From<prost::DecodeError> for P2pError {
186 fn from(value: prost::DecodeError) -> Self {
187 P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
188 }
189}
190
191impl From<cid::Error> for P2pError {
192 fn from(value: cid::Error) -> Self {
193 P2pError::Cid(celestia_types::Error::CidError(
194 blockstore::block::CidError::InvalidCid(value.to_string()),
195 ))
196 }
197}
198
199#[derive(Debug)]
201pub(crate) struct P2p {
202 cancellation_token: CancellationToken,
203 cmd_tx: mpsc::Sender<P2pCmd>,
204 join_handle: JoinHandle,
205 peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
206 local_peer_id: PeerId,
207}
208
209pub struct P2pArgs<B, S>
211where
212 B: Blockstore,
213 S: Store,
214{
215 pub network_id: String,
217 pub local_keypair: Keypair,
219 pub bootnodes: Vec<Multiaddr>,
221 pub listen_on: Vec<Multiaddr>,
223 pub blockstore: Arc<B>,
225 pub store: Arc<S>,
227 pub event_pub: EventPublisher,
229}
230
231#[derive(Debug)]
232pub(crate) enum P2pCmd {
233 NetworkInfo {
234 respond_to: oneshot::Sender<NetworkInfo>,
235 },
236 HeaderExRequest {
237 request: HeaderRequest,
238 respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
239 },
240 Listeners {
241 respond_to: oneshot::Sender<Vec<Multiaddr>>,
242 },
243 ConnectedPeers {
244 respond_to: oneshot::Sender<Vec<PeerId>>,
245 },
246 InitHeaderSub {
247 head: Box<ExtendedHeader>,
248 channel: mpsc::Sender<ExtendedHeader>,
250 },
251 SetPeerTrust {
252 peer_id: PeerId,
253 is_trusted: bool,
254 },
255 GetShwapCid {
256 cid: Cid,
257 respond_to: OneshotResultSender<Vec<u8>, P2pError>,
258 },
259 GetNetworkCompromisedToken {
260 respond_to: oneshot::Sender<Token>,
261 },
262 GetNetworkHead {
263 respond_to: oneshot::Sender<Option<ExtendedHeader>>,
264 },
265}
266
267impl P2p {
268 pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
270 where
271 B: Blockstore + 'static,
272 S: Store + 'static,
273 {
274 validate_bootnode_addrs(&args.bootnodes)?;
275
276 let local_peer_id = PeerId::from(args.local_keypair.public());
277
278 let peer_tracker = PeerTracker::new(args.event_pub.clone());
279 let peer_tracker_info_watcher = peer_tracker.info_watcher();
280
281 let cancellation_token = CancellationToken::new();
282 let (cmd_tx, cmd_rx) = mpsc::channel(16);
283
284 let mut worker =
285 Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
286
287 let join_handle = spawn(async move {
288 worker.run().await;
289 });
290
291 Ok(P2p {
292 cancellation_token,
293 cmd_tx,
294 join_handle,
295 peer_tracker_info_watcher,
296 local_peer_id,
297 })
298 }
299
300 #[cfg(test)]
302 pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
303 let (cmd_tx, cmd_rx) = mpsc::channel(16);
304 let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
305 let cancellation_token = CancellationToken::new();
306
307 let join_handle = spawn(async {});
309
310 let p2p = P2p {
311 cmd_tx: cmd_tx.clone(),
312 cancellation_token,
313 join_handle,
314 peer_tracker_info_watcher: peer_tracker_rx,
315 local_peer_id: PeerId::random(),
316 };
317
318 let handle = crate::test_utils::MockP2pHandle {
319 cmd_tx,
320 cmd_rx,
321 header_sub_tx: None,
322 peer_tracker_tx,
323 };
324
325 (p2p, handle)
326 }
327
328 pub fn stop(&self) {
330 self.cancellation_token.cancel();
332 }
333
334 pub async fn join(&self) {
336 self.join_handle.join().await;
337 }
338
339 pub fn local_peer_id(&self) -> &PeerId {
341 &self.local_peer_id
342 }
343
344 async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
345 self.cmd_tx
346 .send(cmd)
347 .await
348 .map_err(|_| P2pError::WorkerDied)
349 }
350
351 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
353 self.peer_tracker_info_watcher.clone()
354 }
355
356 pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
358 self.peer_tracker_info_watcher.borrow()
359 }
360
361 pub async fn init_header_sub(
363 &self,
364 head: ExtendedHeader,
365 channel: mpsc::Sender<ExtendedHeader>,
366 ) -> Result<()> {
367 self.send_command(P2pCmd::InitHeaderSub {
368 head: Box::new(head),
369 channel,
370 })
371 .await
372 }
373
374 pub async fn wait_connected(&self) -> Result<()> {
376 self.peer_tracker_info_watcher()
377 .wait_for(|info| info.num_connected_peers > 0)
378 .await
379 .map(drop)
380 .map_err(|_| P2pError::WorkerDied)
381 }
382
383 pub async fn wait_connected_trusted(&self) -> Result<()> {
385 self.peer_tracker_info_watcher()
386 .wait_for(|info| info.num_connected_trusted_peers > 0)
387 .await
388 .map(drop)
389 .map_err(|_| P2pError::WorkerDied)
390 }
391
392 pub async fn network_info(&self) -> Result<NetworkInfo> {
394 let (tx, rx) = oneshot::channel();
395
396 self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
397 .await?;
398
399 Ok(rx.await?)
400 }
401
402 pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
404 let (tx, rx) = oneshot::channel();
405
406 self.send_command(P2pCmd::HeaderExRequest {
407 request,
408 respond_to: tx,
409 })
410 .await?;
411
412 rx.await?
413 }
414
415 pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
417 self.get_header_by_height(0).await
418 }
419
420 pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
422 self.header_ex_request(HeaderRequest {
423 data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
424 amount: 1,
425 })
426 .await?
427 .into_iter()
428 .next()
429 .ok_or(HeaderExError::HeaderNotFound.into())
430 }
431
432 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
434 self.header_ex_request(HeaderRequest {
435 data: Some(header_request::Data::Origin(height)),
436 amount: 1,
437 })
438 .await?
439 .into_iter()
440 .next()
441 .ok_or(HeaderExError::HeaderNotFound.into())
442 }
443
444 pub async fn get_verified_headers_range(
449 &self,
450 from: &ExtendedHeader,
451 amount: u64,
452 ) -> Result<Vec<ExtendedHeader>> {
453 from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
455
456 let height = from.height().value() + 1;
457
458 let range = height..=height + amount - 1;
459
460 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
461 let headers = session.run().await?;
462
463 from.verify_adjacent_range(&headers)
468 .map_err(|_| HeaderExError::InvalidResponse)?;
469
470 Ok(headers)
471 }
472
473 pub(crate) async fn get_unverified_header_range(
478 &self,
479 range: BlockRange,
480 ) -> Result<Vec<ExtendedHeader>> {
481 if range.is_empty() {
482 return Err(HeaderExError::InvalidRequest.into());
483 }
484
485 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
486 let headers = session.run().await?;
487
488 let Some(head) = headers.first() else {
489 return Err(HeaderExError::InvalidResponse.into());
490 };
491
492 head.verify_adjacent_range(&headers[1..])
493 .map_err(|_| HeaderExError::InvalidResponse)?;
494
495 Ok(headers)
496 }
497
498 pub(crate) async fn get_shwap_cid(
500 &self,
501 cid: Cid,
502 timeout: Option<Duration>,
503 ) -> Result<Vec<u8>> {
504 let (tx, rx) = oneshot::channel();
505
506 self.send_command(P2pCmd::GetShwapCid {
507 cid,
508 respond_to: tx,
509 })
510 .await?;
511
512 let data = match timeout {
513 Some(dur) => time::timeout(dur, rx)
514 .await
515 .map_err(|_| P2pError::BitswapQueryTimeout)???,
516 None => rx.await??,
517 };
518
519 get_block_container(&cid, &data)
520 }
521
522 pub async fn get_row(
524 &self,
525 row_index: u16,
526 block_height: u64,
527 timeout: Option<Duration>,
528 ) -> Result<Row> {
529 let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
530 let cid = convert_cid(&id.into())?;
531
532 let data = self.get_shwap_cid(cid, timeout).await?;
533 let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
534 Ok(row)
535 }
536
537 pub async fn get_sample(
539 &self,
540 row_index: u16,
541 column_index: u16,
542 block_height: u64,
543 timeout: Option<Duration>,
544 ) -> Result<Sample> {
545 let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
546 let cid = convert_cid(&id.into())?;
547
548 let data = self.get_shwap_cid(cid, timeout).await?;
549 let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
550 Ok(sample)
551 }
552
553 pub async fn get_row_namespace_data(
555 &self,
556 namespace: Namespace,
557 row_index: u16,
558 block_height: u64,
559 timeout: Option<Duration>,
560 ) -> Result<RowNamespaceData> {
561 let id =
562 RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
563 let cid = convert_cid(&id.into())?;
564
565 let data = self.get_shwap_cid(cid, timeout).await?;
566 let row_namespace_data =
567 RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
568 Ok(row_namespace_data)
569 }
570
571 pub async fn get_all_blobs<S>(
574 &self,
575 namespace: Namespace,
576 block_height: u64,
577 timeout: Option<Duration>,
578 store: &S,
579 ) -> Result<Vec<Blob>>
580 where
581 S: Store,
582 {
583 let header = match store.get_by_height(block_height).await {
584 Ok(header) => header,
585 Err(StoreError::NotFound) => {
586 let pruned_ranges = store.get_pruned_ranges().await?;
587
588 if pruned_ranges.contains(block_height) {
589 return Err(P2pError::HeaderPruned(block_height));
590 } else {
591 return Err(P2pError::HeaderNotSynced(block_height));
592 }
593 }
594 Err(e) => return Err(e.into()),
595 };
596
597 let app_version = header.app_version()?;
598 let rows_to_fetch: Vec<_> = header
599 .dah
600 .row_roots()
601 .iter()
602 .enumerate()
603 .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
604 .map(|(n, _)| n as u16)
605 .collect();
606
607 let futs = rows_to_fetch
608 .into_iter()
609 .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, block_height, timeout))
610 .collect::<FuturesOrdered<_>>();
611
612 let rows: Vec<_> = match futs.try_collect().await {
613 Ok(rows) => rows,
614 Err(P2pError::BitswapQueryTimeout) if !store.has_at(block_height).await => {
615 return Err(P2pError::HeaderPruned(block_height));
616 }
617 Err(e) => return Err(e),
618 };
619
620 let shares = rows.iter().flat_map(|row| row.shares.iter());
621
622 Ok(Blob::reconstruct_all(shares, app_version)?)
623 }
624
625 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
627 let (tx, rx) = oneshot::channel();
628
629 self.send_command(P2pCmd::Listeners { respond_to: tx })
630 .await?;
631
632 Ok(rx.await?)
633 }
634
635 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
637 let (tx, rx) = oneshot::channel();
638
639 self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
640 .await?;
641
642 Ok(rx.await?)
643 }
644
645 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
647 self.send_command(P2pCmd::SetPeerTrust {
648 peer_id,
649 is_trusted,
650 })
651 .await
652 }
653
654 pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
659 let (tx, rx) = oneshot::channel();
660
661 self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
662 .await?;
663
664 Ok(rx.await?)
665 }
666
667 pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
669 let (tx, rx) = oneshot::channel();
670
671 self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
672 .await?;
673
674 Ok(rx.await?)
675 }
676}
677
678impl Drop for P2p {
679 fn drop(&mut self) {
680 self.stop();
681 }
682}
683
684#[derive(NetworkBehaviour)]
685struct Behaviour<B, S>
686where
687 B: Blockstore + 'static,
688 S: Store + 'static,
689{
690 bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
691 header_ex: HeaderExBehaviour<S>,
692 gossipsub: gossipsub::Behaviour,
693}
694
695struct Worker<B, S>
696where
697 B: Blockstore + 'static,
698 S: Store + 'static,
699{
700 cancellation_token: CancellationToken,
701 swarm: SwarmManager<Behaviour<B, S>>,
702 header_sub_topic_hash: TopicHash,
703 bad_encoding_fraud_sub_topic: TopicHash,
704 cmd_rx: mpsc::Receiver<P2pCmd>,
705 header_sub_state: Option<HeaderSubState>,
706 bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
707 network_compromised_token: Token,
708 store: Arc<S>,
709}
710
711struct HeaderSubState {
712 known_head: ExtendedHeader,
713 channel: mpsc::Sender<ExtendedHeader>,
714}
715
716impl<B, S> Worker<B, S>
717where
718 B: Blockstore,
719 S: Store,
720{
721 async fn new(
722 args: P2pArgs<B, S>,
723 cancellation_token: CancellationToken,
724 cmd_rx: mpsc::Receiver<P2pCmd>,
725 peer_tracker: PeerTracker,
726 ) -> Result<Self, P2pError> {
727 let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
728 let bad_encoding_fraud_sub_topic =
729 fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
730 let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
731
732 let bitswap = init_bitswap(
733 args.blockstore.clone(),
734 args.store.clone(),
735 &args.network_id,
736 )?;
737
738 let header_ex = HeaderExBehaviour::new(HeaderExConfig {
739 network_id: &args.network_id,
740 header_store: args.store.clone(),
741 });
742
743 let behaviour = Behaviour {
744 bitswap,
745 gossipsub,
746 header_ex,
747 };
748
749 let swarm = SwarmManager::new(
750 &args.network_id,
751 &args.local_keypair,
752 &args.bootnodes,
753 &args.listen_on,
754 peer_tracker,
755 args.event_pub,
756 behaviour,
757 )
758 .await?;
759
760 Ok(Worker {
761 cancellation_token,
762 swarm,
763 cmd_rx,
764 bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
765 header_sub_topic_hash: header_sub_topic.hash(),
766 header_sub_state: None,
767 bitswap_queries: HashMap::new(),
768 network_compromised_token: Token::new(),
769 store: args.store,
770 })
771 }
772
773 async fn run(&mut self) {
774 let mut report_interval = Interval::new(Duration::from_secs(60));
775
776 loop {
777 select! {
778 _ = self.cancellation_token.cancelled() => break,
779 _ = report_interval.tick() => {
780 self.report();
781 }
782 _ = poll_closed(&mut self.bitswap_queries) => {
783 self.prune_canceled_bitswap_queries();
784 }
785 res = self.swarm.poll() => {
786 match res {
787 Ok(ev) => {
788 if let Err(e) = self.on_behaviour_event(ev).await {
789 warn!("Failure while handling behaviour event: {e}");
790 }
791 }
792 Err(e) => warn!("Failure while polling SwarmManager: {e}"),
793 }
794 }
795 Some(cmd) = self.cmd_rx.recv() => {
796 if let Err(e) = self.on_cmd(cmd).await {
797 warn!("Failure while handling command. (error: {e})");
798 }
799 }
800 }
801 }
802
803 self.swarm.context().behaviour.header_ex.stop();
804 self.swarm.stop().await;
805 }
806
807 fn prune_canceled_bitswap_queries(&mut self) {
808 let mut cancelled = SmallVec::<[_; 16]>::new();
809
810 for (query_id, chan) in &self.bitswap_queries {
811 if chan.is_closed() {
812 cancelled.push(*query_id);
813 }
814 }
815
816 for query_id in cancelled {
817 self.bitswap_queries.remove(&query_id);
818 self.swarm.context().behaviour.bitswap.cancel(query_id);
819 }
820 }
821
822 async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
823 match ev {
824 BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
825 BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
826 BehaviourEvent::HeaderEx(_) => {}
827 }
828
829 Ok(())
830 }
831
832 async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
833 match cmd {
834 P2pCmd::NetworkInfo { respond_to } => {
835 respond_to.maybe_send(self.swarm.network_info());
836 }
837 P2pCmd::HeaderExRequest {
838 request,
839 respond_to,
840 } => {
841 let ctx = self.swarm.context();
842 ctx.behaviour
843 .header_ex
844 .send_request(request, respond_to, ctx.peer_tracker);
845 }
846 P2pCmd::Listeners { respond_to } => {
847 respond_to.maybe_send(self.swarm.listeners());
848 }
849 P2pCmd::ConnectedPeers { respond_to } => {
850 let peers = self
851 .swarm
852 .context()
853 .peer_tracker
854 .peers()
855 .filter_map(|peer| {
856 if peer.is_connected() {
857 Some(*peer.id())
858 } else {
859 None
860 }
861 })
862 .collect();
863 respond_to.maybe_send(peers);
864 }
865 P2pCmd::InitHeaderSub { head, channel } => {
866 self.on_init_header_sub(*head, channel);
867 }
868 P2pCmd::SetPeerTrust {
869 peer_id,
870 is_trusted,
871 } => {
872 self.swarm.set_peer_trust(&peer_id, is_trusted);
873 }
874 P2pCmd::GetShwapCid { cid, respond_to } => {
875 self.on_get_shwap_cid(cid, respond_to);
876 }
877 P2pCmd::GetNetworkCompromisedToken { respond_to } => {
878 respond_to.maybe_send(self.network_compromised_token.clone())
879 }
880 P2pCmd::GetNetworkHead { respond_to } => {
881 let head = self
882 .header_sub_state
883 .as_ref()
884 .map(|state| state.known_head.clone());
885 respond_to.maybe_send(head);
886 }
887 }
888
889 Ok(())
890 }
891
892 #[instrument(skip_all)]
893 fn report(&mut self) {
894 let tracker_info = self.swarm.context().peer_tracker.info();
895
896 info!(
897 "peers: {}, trusted peers: {}",
898 tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
899 );
900 }
901
902 #[instrument(level = "trace", skip(self))]
903 async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
904 match ev {
905 gossipsub::Event::Message {
906 message,
907 message_id,
908 ..
909 } => {
910 let Some(peer) = message.source else {
911 return;
913 };
914
915 let acceptance = if message.topic == self.header_sub_topic_hash {
916 self.on_header_sub_message(&message.data[..])
917 } else if message.topic == self.bad_encoding_fraud_sub_topic {
918 self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
919 .await
920 } else {
921 trace!("Unhandled gossipsub message");
922 gossipsub::MessageAcceptance::Ignore
923 };
924
925 if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
926 self.swarm.peer_maybe_discovered(&peer);
928 }
929
930 let _ = self
931 .swarm
932 .context()
933 .behaviour
934 .gossipsub
935 .report_message_validation_result(&message_id, &peer, acceptance);
936 }
937 _ => trace!("Unhandled gossipsub event"),
938 }
939 }
940
941 #[instrument(level = "trace", skip_all)]
942 fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
943 trace!("Requesting CID {cid} from bitswap");
944 let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
945 self.bitswap_queries.insert(query_id, respond_to);
946 }
947
948 #[instrument(level = "trace", skip(self))]
949 async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
950 match ev {
951 beetswap::Event::GetQueryResponse { query_id, data } => {
952 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
953 respond_to.maybe_send_ok(data);
954 }
955 }
956 beetswap::Event::GetQueryError { query_id, error } => {
957 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
958 let error: P2pError = error.into();
959 respond_to.maybe_send_err(error);
960 }
961 }
962 }
963 }
964
965 #[instrument(skip_all, fields(header = %head))]
966 fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
967 self.header_sub_state = Some(HeaderSubState {
968 known_head: head,
969 channel,
970 });
971 trace!("HeaderSub initialized");
972 }
973
974 #[instrument(skip_all)]
975 fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
976 let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
977 trace!("Malformed or invalid header from header-sub");
978 return gossipsub::MessageAcceptance::Reject;
979 };
980
981 trace!("Received header from header-sub ({header})");
982
983 let Some(ref mut state) = self.header_sub_state else {
984 debug!("header-sub not initialized yet");
985 return gossipsub::MessageAcceptance::Ignore;
986 };
987
988 if state.known_head.verify(&header).is_err() {
989 trace!("Failed to verify HeaderSub header. Ignoring {header}");
990 return gossipsub::MessageAcceptance::Ignore;
991 }
992
993 trace!("New header from header-sub ({header})");
994
995 state.known_head = header.clone();
996 let _ = state.channel.try_send(header);
999
1000 gossipsub::MessageAcceptance::Accept
1001 }
1002
1003 #[instrument(skip_all)]
1004 async fn on_bad_encoding_fraud_sub_message(
1005 &mut self,
1006 data: &[u8],
1007 peer: &PeerId,
1008 ) -> gossipsub::MessageAcceptance {
1009 let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1010 trace!("Malformed bad encoding fraud proof from {peer}");
1011 self.swarm
1012 .context()
1013 .behaviour
1014 .gossipsub
1015 .blacklist_peer(peer);
1016 return gossipsub::MessageAcceptance::Reject;
1017 };
1018
1019 let height = befp.height().value();
1020
1021 let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1022 header_sub_state.known_head.height().value()
1023 } else if let Ok(local_head) = self.store.get_head().await {
1024 local_head.height().value()
1025 } else {
1026 return gossipsub::MessageAcceptance::Ignore;
1028 };
1029
1030 if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1031 return gossipsub::MessageAcceptance::Ignore;
1034 }
1035
1036 let hash = befp.header_hash();
1037 let Ok(header) = self.store.get_by_hash(&hash).await else {
1038 return gossipsub::MessageAcceptance::Ignore;
1041 };
1042
1043 if let Err(e) = befp.validate(&header) {
1044 trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1045 self.swarm
1046 .context()
1047 .behaviour
1048 .gossipsub
1049 .blacklist_peer(peer);
1050 return gossipsub::MessageAcceptance::Reject;
1051 }
1052
1053 warn!("Received a valid bad encoding fraud proof");
1054 self.network_compromised_token.trigger();
1056
1057 gossipsub::MessageAcceptance::Accept
1058 }
1059}
1060
1061async fn poll_closed(
1063 bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1064) {
1065 poll_fn(|cx| {
1066 for chan in bitswap_queries.values_mut() {
1067 match chan.poll_closed(cx) {
1068 Poll::Pending => continue,
1069 Poll::Ready(_) => return Poll::Ready(()),
1070 }
1071 }
1072
1073 Poll::Pending
1074 })
1075 .await
1076}
1077
1078fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1079 let mut invalid_addrs = Vec::new();
1080
1081 for addr in addrs {
1082 if addr.peer_id().is_none() {
1083 invalid_addrs.push(addr.to_owned());
1084 }
1085 }
1086
1087 if invalid_addrs.is_empty() {
1088 Ok(())
1089 } else {
1090 Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1091 }
1092}
1093
1094fn init_gossipsub<'a, B, S>(
1095 args: &'a P2pArgs<B, S>,
1096 topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1097) -> Result<gossipsub::Behaviour>
1098where
1099 B: Blockstore,
1100 S: Store,
1101{
1102 let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1105
1106 let config = gossipsub::ConfigBuilder::default()
1107 .validation_mode(gossipsub::ValidationMode::Strict)
1108 .validate_messages()
1109 .build()
1110 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1111
1112 let mut gossipsub: gossipsub::Behaviour =
1114 gossipsub::Behaviour::new(message_authenticity, config)
1115 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1116
1117 for topic in topics {
1118 gossipsub
1119 .subscribe(topic)
1120 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1121 }
1122
1123 Ok(gossipsub)
1124}
1125
1126fn init_bitswap<B, S>(
1127 blockstore: Arc<B>,
1128 store: Arc<S>,
1129 network_id: &str,
1130) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1131where
1132 B: Blockstore + 'static,
1133 S: Store + 'static,
1134{
1135 let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1136
1137 Ok(beetswap::Behaviour::builder(blockstore)
1138 .protocol_prefix(protocol_prefix.as_ref())?
1139 .register_multihasher(ShwapMultihasher::new(store))
1140 .client_set_send_dont_have(false)
1141 .build())
1142}