1use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use blockstore::block::CidError;
24use celestia_proto::p2p::pb::{HeaderRequest, header_request};
25use celestia_types::fraud_proof::BadEncodingFraudProof;
26use celestia_types::hash::Hash;
27use celestia_types::namespace_data::NamespaceData;
28use celestia_types::nmt::Namespace;
29use celestia_types::row::{Row, RowId};
30use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
31use celestia_types::sample::{Sample, SampleId};
32use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, FraudProof};
33use cid::Cid;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51mod shrex;
52pub(crate) mod shwap;
53mod swarm;
54mod swarm_manager;
55mod utils;
56
57use crate::block_ranges::BlockRange;
58use crate::events::EventPublisher;
59use crate::p2p::header_session::HeaderSession;
60use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
61use crate::p2p::swarm_manager::SwarmManager;
62use crate::peer_tracker::PeerTracker;
63use crate::peer_tracker::PeerTrackerInfo;
64use crate::store::{Store, StoreError};
65use crate::utils::{
66 MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
67 celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
68};
69
70pub use crate::p2p::header_ex::HeaderExError;
71pub use crate::p2p::shrex::ShrExError;
72
73pub(crate) const MAX_MH_SIZE: usize = 64;
75
76const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
79
80pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
81
82#[derive(Debug, thiserror::Error)]
84pub enum P2pError {
85 #[error("Failed to initialize gossipsub behaviour: {0}")]
87 GossipsubInit(String),
88
89 #[error("Failed to initialize TLS: {0}")]
91 TlsInit(String),
92
93 #[error("Failed to initialize noise: {0}")]
95 NoiseInit(String),
96
97 #[error("Worker died")]
99 WorkerDied,
100
101 #[error("Channel closed unexpectedly")]
103 ChannelClosedUnexpectedly,
104
105 #[error("HeaderEx: {0}")]
107 HeaderEx(#[from] HeaderExError),
108
109 #[error("ShrEx: {0}")]
111 ShrEx(#[from] ShrExError),
112
113 #[error("Bootnode multiaddrs without peer ID: {0:?}")]
115 BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
116
117 #[error("Bitswap: {0}")]
119 Bitswap(#[from] beetswap::Error),
120
121 #[error("ProtoBuf decoding error: {0}")]
123 ProtoDecodeFailed(#[from] tendermint_proto::Error),
124
125 #[error("CID error: {0}")]
127 Cid(CidError),
128
129 #[error("Request timed out")]
131 RequestTimedOut,
132
133 #[error("Shwap: {0}")]
135 Shwap(String),
136
137 #[error(transparent)]
139 CelestiaTypes(#[from] celestia_types::Error),
140
141 #[error("Store error: {0}")]
143 Store(#[from] StoreError),
144
145 #[error("Header of {0} block was pruned because it is outside of retention period")]
147 HeaderPruned(u64),
148
149 #[error("Header of {0} block is not synced yet")]
151 HeaderNotSynced(u64),
152}
153
154impl P2pError {
155 pub(crate) fn is_fatal(&self) -> bool {
159 match self {
160 P2pError::GossipsubInit(_)
161 | P2pError::NoiseInit(_)
162 | P2pError::TlsInit(_)
163 | P2pError::WorkerDied
164 | P2pError::ChannelClosedUnexpectedly
165 | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
166 P2pError::HeaderEx(_)
167 | P2pError::ShrEx(_)
168 | P2pError::Bitswap(_)
169 | P2pError::ProtoDecodeFailed(_)
170 | P2pError::Cid(_)
171 | P2pError::RequestTimedOut
172 | P2pError::Shwap(_)
173 | P2pError::CelestiaTypes(_)
174 | P2pError::HeaderPruned(_)
175 | P2pError::HeaderNotSynced(_) => false,
176 P2pError::Store(e) => e.is_fatal(),
177 }
178 }
179}
180
181impl From<oneshot::error::RecvError> for P2pError {
182 fn from(_value: oneshot::error::RecvError) -> Self {
183 P2pError::ChannelClosedUnexpectedly
184 }
185}
186
187impl From<prost::DecodeError> for P2pError {
188 fn from(value: prost::DecodeError) -> Self {
189 P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
190 }
191}
192
193impl From<cid::Error> for P2pError {
194 fn from(value: cid::Error) -> Self {
195 P2pError::Cid(CidError::InvalidCid(value.to_string()))
196 }
197}
198
199#[derive(Debug)]
201pub(crate) struct P2p {
202 cancellation_token: CancellationToken,
203 cmd_tx: mpsc::Sender<P2pCmd>,
204 join_handle: JoinHandle,
205 peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
206 local_peer_id: PeerId,
207}
208
209pub struct P2pArgs<B, S>
211where
212 B: Blockstore,
213 S: Store,
214{
215 pub network_id: String,
217 pub local_keypair: Keypair,
219 pub bootnodes: Vec<Multiaddr>,
221 pub listen_on: Vec<Multiaddr>,
223 pub blockstore: Arc<B>,
225 pub store: Arc<S>,
227 pub event_pub: EventPublisher,
229}
230
231#[derive(Debug)]
232pub(crate) enum P2pCmd {
233 NetworkInfo {
234 respond_to: oneshot::Sender<NetworkInfo>,
235 },
236 HeaderExRequest {
237 request: HeaderRequest,
238 respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
239 },
240 Listeners {
241 respond_to: oneshot::Sender<Vec<Multiaddr>>,
242 },
243 ConnectedPeers {
244 respond_to: oneshot::Sender<Vec<PeerId>>,
245 },
246 InitHeaderSub {
247 head: Box<ExtendedHeader>,
248 channel: mpsc::Sender<ExtendedHeader>,
250 },
251 SetPeerTrust {
252 peer_id: PeerId,
253 is_trusted: bool,
254 },
255 #[cfg(any(test, feature = "test-utils"))]
256 MarkAsArchival {
257 peer_id: PeerId,
258 },
259 GetShwapCid {
260 cid: Cid,
261 respond_to: OneshotResultSender<Vec<u8>, P2pError>,
262 },
263 GetNetworkCompromisedToken {
264 respond_to: oneshot::Sender<Token>,
265 },
266 GetNetworkHead {
267 respond_to: oneshot::Sender<Option<ExtendedHeader>>,
268 },
269 #[allow(dead_code)]
272 GetRow {
273 row_index: u16,
274 block_height: u64,
275 respond_to: OneshotResultSender<Row, P2pError>,
276 },
277 #[allow(dead_code)]
280 GetSample {
281 row_index: u16,
282 column_index: u16,
283 block_height: u64,
284 respond_to: OneshotResultSender<Sample, P2pError>,
285 },
286 GetNamespaceData {
287 namespace: Namespace,
288 block_height: u64,
289 respond_to: OneshotResultSender<NamespaceData, P2pError>,
290 },
291 GetEds {
292 block_height: u64,
293 respond_to: OneshotResultSender<ExtendedDataSquare, P2pError>,
294 },
295}
296
297impl P2p {
298 pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
300 where
301 B: Blockstore + 'static,
302 S: Store + 'static,
303 {
304 validate_bootnode_addrs(&args.bootnodes)?;
305
306 let local_peer_id = PeerId::from(args.local_keypair.public());
307
308 let peer_tracker = PeerTracker::new(args.event_pub.clone());
309 let peer_tracker_info_watcher = peer_tracker.info_watcher();
310
311 let cancellation_token = CancellationToken::new();
312 let (cmd_tx, cmd_rx) = mpsc::channel(16);
313
314 let mut worker =
315 Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
316
317 let join_handle = spawn(async move {
318 worker.run().await;
319 });
320
321 Ok(P2p {
322 cancellation_token,
323 cmd_tx,
324 join_handle,
325 peer_tracker_info_watcher,
326 local_peer_id,
327 })
328 }
329
330 #[cfg(test)]
332 pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
333 let (cmd_tx, cmd_rx) = mpsc::channel(16);
334 let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
335 let cancellation_token = CancellationToken::new();
336
337 let join_handle = spawn(async {});
339
340 let p2p = P2p {
341 cmd_tx: cmd_tx.clone(),
342 cancellation_token,
343 join_handle,
344 peer_tracker_info_watcher: peer_tracker_rx,
345 local_peer_id: PeerId::random(),
346 };
347
348 let handle = crate::test_utils::MockP2pHandle {
349 cmd_tx,
350 cmd_rx,
351 header_sub_tx: None,
352 peer_tracker_tx,
353 };
354
355 (p2p, handle)
356 }
357
358 pub fn stop(&self) {
360 self.cancellation_token.cancel();
362 }
363
364 pub async fn join(&self) {
366 self.join_handle.join().await;
367 }
368
369 pub fn local_peer_id(&self) -> &PeerId {
371 &self.local_peer_id
372 }
373
374 async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
375 self.cmd_tx
376 .send(cmd)
377 .await
378 .map_err(|_| P2pError::WorkerDied)
379 }
380
381 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
383 self.peer_tracker_info_watcher.clone()
384 }
385
386 pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
388 self.peer_tracker_info_watcher.borrow()
389 }
390
391 pub async fn init_header_sub(
393 &self,
394 head: ExtendedHeader,
395 channel: mpsc::Sender<ExtendedHeader>,
396 ) -> Result<()> {
397 self.send_command(P2pCmd::InitHeaderSub {
398 head: Box::new(head),
399 channel,
400 })
401 .await
402 }
403
404 pub async fn wait_connected(&self) -> Result<()> {
406 self.peer_tracker_info_watcher()
407 .wait_for(|info| info.num_connected_peers > 0)
408 .await
409 .map(drop)
410 .map_err(|_| P2pError::WorkerDied)
411 }
412
413 pub async fn wait_connected_trusted(&self) -> Result<()> {
415 self.peer_tracker_info_watcher()
416 .wait_for(|info| info.num_connected_trusted_peers > 0)
417 .await
418 .map(drop)
419 .map_err(|_| P2pError::WorkerDied)
420 }
421
422 pub async fn network_info(&self) -> Result<NetworkInfo> {
424 let (tx, rx) = oneshot::channel();
425
426 self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
427 .await?;
428
429 Ok(rx.await?)
430 }
431
432 pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
434 let (tx, rx) = oneshot::channel();
435
436 self.send_command(P2pCmd::HeaderExRequest {
437 request,
438 respond_to: tx,
439 })
440 .await?;
441
442 rx.await?
443 }
444
445 pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
447 self.get_header_by_height(0).await
448 }
449
450 pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
452 self.header_ex_request(HeaderRequest {
453 data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
454 amount: 1,
455 })
456 .await?
457 .into_iter()
458 .next()
459 .ok_or(HeaderExError::HeaderNotFound.into())
460 }
461
462 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
464 self.header_ex_request(HeaderRequest {
465 data: Some(header_request::Data::Origin(height)),
466 amount: 1,
467 })
468 .await?
469 .into_iter()
470 .next()
471 .ok_or(HeaderExError::HeaderNotFound.into())
472 }
473
474 pub async fn get_verified_headers_range(
479 &self,
480 from: &ExtendedHeader,
481 amount: u64,
482 ) -> Result<Vec<ExtendedHeader>> {
483 from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
485
486 let height = from.height() + 1;
487
488 let range = height..=height + amount - 1;
489
490 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
491 let headers = session.run().await?;
492
493 from.verify_adjacent_range(&headers)
498 .map_err(|_| HeaderExError::InvalidResponse)?;
499
500 Ok(headers)
501 }
502
503 pub(crate) async fn get_unverified_header_range(
508 &self,
509 range: BlockRange,
510 ) -> Result<Vec<ExtendedHeader>> {
511 if range.is_empty() {
512 return Err(HeaderExError::InvalidRequest.into());
513 }
514
515 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
516 let headers = session.run().await?;
517
518 let Some(head) = headers.first() else {
519 return Err(HeaderExError::InvalidResponse.into());
520 };
521
522 head.verify_adjacent_range(&headers[1..])
523 .map_err(|_| HeaderExError::InvalidResponse)?;
524
525 Ok(headers)
526 }
527
528 pub(crate) async fn get_shwap_cid(
530 &self,
531 cid: Cid,
532 timeout: Option<Duration>,
533 ) -> Result<Vec<u8>> {
534 let (tx, rx) = oneshot::channel();
535
536 self.send_command(P2pCmd::GetShwapCid {
537 cid,
538 respond_to: tx,
539 })
540 .await?;
541
542 let data = match timeout {
543 Some(dur) => time::timeout(dur, rx)
544 .await
545 .map_err(|_| P2pError::RequestTimedOut)???,
546 None => rx.await??,
547 };
548
549 get_block_container(&cid, &data)
550 }
551
552 pub async fn get_row(
554 &self,
555 row_index: u16,
556 block_height: u64,
557 timeout: Option<Duration>,
558 ) -> Result<Row> {
559 let id = RowId::new(row_index, block_height)?;
560 let cid = convert_cid(&id.into())?;
561
562 let data = self.get_shwap_cid(cid, timeout).await?;
563 let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
564 Ok(row)
565 }
566
567 pub async fn get_sample(
569 &self,
570 row_index: u16,
571 column_index: u16,
572 block_height: u64,
573 timeout: Option<Duration>,
574 ) -> Result<Sample> {
575 let id = SampleId::new(row_index, column_index, block_height)?;
576 let cid = convert_cid(&id.into())?;
577
578 let data = self.get_shwap_cid(cid, timeout).await?;
579 let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
580 Ok(sample)
581 }
582
583 pub async fn get_eds(
584 &self,
585 block_height: u64,
586 timeout: Option<Duration>,
587 ) -> Result<ExtendedDataSquare> {
588 let (tx, rx) = oneshot::channel();
589
590 self.send_command(P2pCmd::GetEds {
591 block_height,
592 respond_to: tx,
593 })
594 .await?;
595
596 match timeout {
597 Some(dur) => time::timeout(dur, rx)
598 .await
599 .map_err(|_| P2pError::RequestTimedOut)??,
600 None => rx.await?,
601 }
602 }
603
604 pub async fn get_row_namespace_data(
606 &self,
607 namespace: Namespace,
608 row_index: u16,
609 block_height: u64,
610 timeout: Option<Duration>,
611 ) -> Result<RowNamespaceData> {
612 let id = RowNamespaceDataId::new(namespace, row_index, block_height)?;
613 let cid = convert_cid(&id.into())?;
614
615 let data = self.get_shwap_cid(cid, timeout).await?;
616 let row_namespace_data =
617 RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
618 Ok(row_namespace_data)
619 }
620
621 pub async fn get_namespace_data(
622 &self,
623 namespace: Namespace,
624 block_height: u64,
625 timeout: Option<Duration>,
626 ) -> Result<NamespaceData> {
627 let (tx, rx) = oneshot::channel();
628
629 self.send_command(P2pCmd::GetNamespaceData {
630 namespace,
631 block_height,
632 respond_to: tx,
633 })
634 .await?;
635
636 match timeout {
637 Some(dur) => time::timeout(dur, rx)
638 .await
639 .map_err(|_| P2pError::RequestTimedOut)??,
640 None => rx.await?,
641 }
642 }
643
644 pub async fn get_all_blobs<S>(
647 &self,
648 namespace: Namespace,
649 block_height: u64,
650 timeout: Option<Duration>,
651 store: &S,
652 ) -> Result<Vec<Blob>>
653 where
654 S: Store,
655 {
656 let app_version = match store.get_by_height(block_height).await {
659 Ok(header) => header.app_version(),
660 Err(StoreError::NotFound) => {
661 let pruned_ranges = store.get_pruned_ranges().await?;
662
663 if pruned_ranges.contains(block_height) {
664 return Err(P2pError::HeaderPruned(block_height));
665 } else {
666 return Err(P2pError::HeaderNotSynced(block_height));
667 }
668 }
669 Err(e) => return Err(e.into()),
670 };
671
672 let namespace_data = self
673 .get_namespace_data(namespace, block_height, timeout)
674 .await?;
675
676 let shares = namespace_data
677 .rows()
678 .iter()
679 .flat_map(|row| row.shares.iter());
680
681 Ok(Blob::reconstruct_all(shares, app_version)?)
682 }
683
684 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
686 let (tx, rx) = oneshot::channel();
687
688 self.send_command(P2pCmd::Listeners { respond_to: tx })
689 .await?;
690
691 Ok(rx.await?)
692 }
693
694 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
696 let (tx, rx) = oneshot::channel();
697
698 self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
699 .await?;
700
701 Ok(rx.await?)
702 }
703
704 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
706 self.send_command(P2pCmd::SetPeerTrust {
707 peer_id,
708 is_trusted,
709 })
710 .await
711 }
712
713 #[cfg(any(test, feature = "test-utils"))]
714 pub(crate) async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
715 self.send_command(P2pCmd::MarkAsArchival { peer_id }).await
716 }
717
718 pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
723 let (tx, rx) = oneshot::channel();
724
725 self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
726 .await?;
727
728 Ok(rx.await?)
729 }
730
731 pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
733 let (tx, rx) = oneshot::channel();
734
735 self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
736 .await?;
737
738 Ok(rx.await?)
739 }
740}
741
742impl Drop for P2p {
743 fn drop(&mut self) {
744 self.stop();
745 }
746}
747
748#[derive(NetworkBehaviour)]
749struct Behaviour<B, S>
750where
751 B: Blockstore + 'static,
752 S: Store + 'static,
753{
754 bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
755 header_ex: header_ex::Behaviour<S>,
756 shr_ex: shrex::Behaviour<S>,
757 gossipsub: gossipsub::Behaviour,
758}
759
760struct Worker<B, S>
761where
762 B: Blockstore + 'static,
763 S: Store + 'static,
764{
765 cancellation_token: CancellationToken,
766 swarm: SwarmManager<Behaviour<B, S>>,
767 header_sub_topic_hash: TopicHash,
768 bad_encoding_fraud_sub_topic: TopicHash,
769 cmd_rx: mpsc::Receiver<P2pCmd>,
770 header_sub_state: Option<HeaderSubState>,
771 bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
772 network_compromised_token: Token,
773 store: Arc<S>,
774}
775
776struct HeaderSubState {
777 known_head: ExtendedHeader,
778 channel: mpsc::Sender<ExtendedHeader>,
779}
780
781impl<B, S> Worker<B, S>
782where
783 B: Blockstore,
784 S: Store,
785{
786 async fn new(
787 args: P2pArgs<B, S>,
788 cancellation_token: CancellationToken,
789 cmd_rx: mpsc::Receiver<P2pCmd>,
790 peer_tracker: PeerTracker,
791 ) -> Result<Self, P2pError> {
792 let mut swarm = SwarmManager::new(
793 &args.network_id,
794 &args.local_keypair,
795 &args.bootnodes,
796 &args.listen_on,
797 peer_tracker,
798 args.event_pub.clone(),
799 )
800 .await?;
801
802 let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
803 let bad_encoding_fraud_sub_topic =
804 fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
805 let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
806
807 let bitswap = init_bitswap(
808 args.blockstore.clone(),
809 args.store.clone(),
810 &args.network_id,
811 )?;
812
813 let header_ex = header_ex::Behaviour::new(header_ex::Config {
814 network_id: &args.network_id,
815 header_store: args.store.clone(),
816 });
817
818 let shr_ex = shrex::Behaviour::new(shrex::Config {
819 network_id: &args.network_id,
820 local_keypair: &args.local_keypair,
821 header_store: args.store.clone(),
822 stream_ctrl: swarm.stream_control(),
823 })?;
824
825 swarm.attach_behaviour(Behaviour {
826 bitswap,
827 gossipsub,
828 header_ex,
829 shr_ex,
830 });
831
832 Ok(Worker {
833 cancellation_token,
834 swarm,
835 cmd_rx,
836 bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
837 header_sub_topic_hash: header_sub_topic.hash(),
838 header_sub_state: None,
839 bitswap_queries: HashMap::new(),
840 network_compromised_token: Token::new(),
841 store: args.store,
842 })
843 }
844
845 async fn run(&mut self) {
846 let mut report_interval = Interval::new(Duration::from_secs(60));
847
848 loop {
849 select! {
850 _ = self.cancellation_token.cancelled() => break,
851 _ = report_interval.tick() => {
852 self.report();
853 }
854 _ = poll_closed(&mut self.bitswap_queries) => {
855 self.prune_canceled_bitswap_queries();
856 }
857 res = self.swarm.poll() => {
858 match res {
859 Ok(ev) => {
860 if let Err(e) = self.on_behaviour_event(ev).await {
861 warn!("Failure while handling behaviour event: {e}");
862 }
863 }
864 Err(e) => warn!("Failure while polling SwarmManager: {e}"),
865 }
866 }
867 Some(cmd) = self.cmd_rx.recv() => {
868 if let Err(e) = self.on_cmd(cmd).await {
869 warn!("Failure while handling command. (error: {e})");
870 }
871 }
872 }
873 }
874
875 self.swarm.context().behaviour.header_ex.stop();
876 self.swarm.context().behaviour.shr_ex.stop();
877 self.swarm.stop().await;
878 }
879
880 fn prune_canceled_bitswap_queries(&mut self) {
881 let mut cancelled = SmallVec::<[_; 16]>::new();
882
883 for (query_id, chan) in &self.bitswap_queries {
884 if chan.is_closed() {
885 cancelled.push(*query_id);
886 }
887 }
888
889 for query_id in cancelled {
890 self.bitswap_queries.remove(&query_id);
891 self.swarm.context().behaviour.bitswap.cancel(query_id);
892 }
893 }
894
895 async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
896 match ev {
897 BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
898 BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
899 BehaviourEvent::HeaderEx(ev) => self.on_header_ex_event(ev).await,
900 BehaviourEvent::ShrEx(ev) => self.on_shrex_event(ev).await,
901 }
902
903 Ok(())
904 }
905
906 async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
907 match cmd {
908 P2pCmd::NetworkInfo { respond_to } => {
909 respond_to.maybe_send(self.swarm.network_info());
910 }
911 P2pCmd::HeaderExRequest {
912 request,
913 respond_to,
914 } => {
915 self.swarm
916 .context()
917 .behaviour
918 .header_ex
919 .send_request(request, respond_to);
920 }
921 P2pCmd::Listeners { respond_to } => {
922 respond_to.maybe_send(self.swarm.listeners());
923 }
924 P2pCmd::ConnectedPeers { respond_to } => {
925 let peers = self
926 .swarm
927 .context()
928 .peer_tracker
929 .peers()
930 .filter_map(|peer| {
931 if peer.is_connected() {
932 Some(*peer.id())
933 } else {
934 None
935 }
936 })
937 .collect();
938 respond_to.maybe_send(peers);
939 }
940 P2pCmd::InitHeaderSub { head, channel } => {
941 self.on_init_header_sub(*head, channel);
942 }
943 P2pCmd::SetPeerTrust {
944 peer_id,
945 is_trusted,
946 } => {
947 self.swarm.set_peer_trust(&peer_id, is_trusted);
948 }
949 #[cfg(any(test, feature = "test-utils"))]
950 P2pCmd::MarkAsArchival { peer_id } => {
951 self.swarm.mark_as_archival(&peer_id);
952 }
953 P2pCmd::GetShwapCid { cid, respond_to } => {
954 self.on_get_shwap_cid(cid, respond_to);
955 }
956 P2pCmd::GetNetworkCompromisedToken { respond_to } => {
957 respond_to.maybe_send(self.network_compromised_token.clone())
958 }
959 P2pCmd::GetNetworkHead { respond_to } => {
960 let head = self
961 .header_sub_state
962 .as_ref()
963 .map(|state| state.known_head.clone());
964 respond_to.maybe_send(head);
965 }
966 P2pCmd::GetRow {
967 row_index,
968 block_height,
969 respond_to,
970 } => {
971 self.swarm
972 .context()
973 .behaviour
974 .shr_ex
975 .get_row(block_height, row_index, respond_to)
976 .await
977 }
978 P2pCmd::GetSample {
979 row_index,
980 column_index,
981 block_height,
982 respond_to,
983 } => {
984 self.swarm
985 .context()
986 .behaviour
987 .shr_ex
988 .get_sample(block_height, row_index, column_index, respond_to)
989 .await
990 }
991 P2pCmd::GetNamespaceData {
992 namespace,
993 block_height,
994 respond_to,
995 } => {
996 self.swarm
997 .context()
998 .behaviour
999 .shr_ex
1000 .get_namespace_data(block_height, namespace, respond_to)
1001 .await
1002 }
1003 P2pCmd::GetEds {
1004 block_height,
1005 respond_to,
1006 } => {
1007 self.swarm
1008 .context()
1009 .behaviour
1010 .shr_ex
1011 .get_eds(block_height, respond_to)
1012 .await
1013 }
1014 }
1015
1016 Ok(())
1017 }
1018
1019 #[instrument(skip_all)]
1020 fn report(&mut self) {
1021 let tracker_info = self.swarm.context().peer_tracker.info();
1022
1023 info!(
1024 "peers: {}, trusted peers: {}",
1025 tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1026 );
1027 }
1028
1029 #[instrument(level = "trace", skip(self))]
1030 async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1031 match ev {
1032 gossipsub::Event::Message {
1033 message,
1034 message_id,
1035 ..
1036 } => {
1037 let Some(peer) = message.source else {
1038 return;
1040 };
1041
1042 let acceptance = if message.topic == self.header_sub_topic_hash {
1043 self.on_header_sub_message(&message.data[..])
1044 } else if message.topic == self.bad_encoding_fraud_sub_topic {
1045 self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1046 .await
1047 } else {
1048 trace!("Unhandled gossipsub message");
1049 gossipsub::MessageAcceptance::Ignore
1050 };
1051
1052 if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1053 self.swarm.peer_maybe_discovered(&peer);
1055 }
1056
1057 let _ = self
1058 .swarm
1059 .context()
1060 .behaviour
1061 .gossipsub
1062 .report_message_validation_result(&message_id, &peer, acceptance);
1063 }
1064 _ => trace!("Unhandled gossipsub event"),
1065 }
1066 }
1067
1068 #[instrument(level = "trace", skip_all)]
1069 fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1070 trace!("Requesting CID {cid} from bitswap");
1071 let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
1072 self.bitswap_queries.insert(query_id, respond_to);
1073 }
1074
1075 #[instrument(level = "trace", skip(self))]
1076 async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1077 match ev {
1078 beetswap::Event::GetQueryResponse { query_id, data } => {
1079 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1080 respond_to.maybe_send_ok(data);
1081 }
1082 }
1083 beetswap::Event::GetQueryError { query_id, error } => {
1084 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1085 let error: P2pError = error.into();
1086 respond_to.maybe_send_err(error);
1087 }
1088 }
1089 }
1090 }
1091
1092 #[instrument(level = "trace", skip(self))]
1093 async fn on_header_ex_event(&mut self, ev: header_ex::Event) {
1094 match ev {
1095 header_ex::Event::SchedulePendingRequests => {
1096 let ctx = self.swarm.context();
1097
1098 ctx.behaviour
1099 .header_ex
1100 .schedule_pending_requests(ctx.peer_tracker);
1101 }
1102 header_ex::Event::NeedTrustedPeers => {
1103 self.swarm.connect_to_bootnodes();
1104 }
1105 header_ex::Event::NeedArchivalPeers => {
1106 self.swarm.start_archival_node_kad_query();
1107 }
1108 }
1109 }
1110
1111 #[instrument(level = "trace", skip(self))]
1112 async fn on_shrex_event(&mut self, ev: shrex::Event) {
1113 match ev {
1114 shrex::Event::SchedulePendingRequests => {
1115 let ctx = self.swarm.context();
1116
1117 ctx.behaviour
1118 .shr_ex
1119 .schedule_pending_requests(ctx.peer_tracker);
1120 }
1121
1122 shrex::Event::AddPeers(peers) => {
1123 let added = peers
1124 .iter()
1125 .filter(|peer| self.swarm.peer_maybe_discovered(peer))
1126 .count();
1127
1128 debug!("Added {added} peers discovered through shrex");
1129 }
1130
1131 shrex::Event::BlockPeers(peers) => {
1132 let blocked = peers
1133 .into_iter()
1134 .filter(|peer| self.swarm.blacklist_peer(peer))
1135 .count();
1136
1137 debug!("Blocked {blocked} peers for shrex missbehaviour");
1138 }
1139 }
1140 }
1141
1142 #[instrument(skip_all, fields(header = %head))]
1143 fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1144 self.header_sub_state = Some(HeaderSubState {
1145 known_head: head,
1146 channel,
1147 });
1148 trace!("HeaderSub initialized");
1149 }
1150
1151 #[instrument(skip_all)]
1152 fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1153 let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1154 trace!("Malformed or invalid header from header-sub");
1155 return gossipsub::MessageAcceptance::Reject;
1156 };
1157
1158 trace!("Received header from header-sub ({header})");
1159
1160 let Some(ref mut state) = self.header_sub_state else {
1161 debug!("header-sub not initialized yet");
1162 return gossipsub::MessageAcceptance::Ignore;
1163 };
1164
1165 if state.known_head.verify(&header).is_err() {
1166 trace!("Failed to verify HeaderSub header. Ignoring {header}");
1167 return gossipsub::MessageAcceptance::Ignore;
1168 }
1169
1170 trace!("New header from header-sub ({header})");
1171
1172 state.known_head = header.clone();
1173 let _ = state.channel.try_send(header);
1176
1177 gossipsub::MessageAcceptance::Accept
1178 }
1179
1180 #[instrument(skip_all)]
1181 async fn on_bad_encoding_fraud_sub_message(
1182 &mut self,
1183 data: &[u8],
1184 peer: &PeerId,
1185 ) -> gossipsub::MessageAcceptance {
1186 let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1187 trace!("Malformed bad encoding fraud proof from {peer}");
1188 self.swarm
1189 .context()
1190 .behaviour
1191 .gossipsub
1192 .blacklist_peer(peer);
1193 return gossipsub::MessageAcceptance::Reject;
1194 };
1195
1196 let height = befp.height();
1197
1198 let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1199 header_sub_state.known_head.height()
1200 } else if let Ok(local_head) = self.store.get_head().await {
1201 local_head.height()
1202 } else {
1203 return gossipsub::MessageAcceptance::Ignore;
1205 };
1206
1207 if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1208 return gossipsub::MessageAcceptance::Ignore;
1211 }
1212
1213 let hash = befp.header_hash();
1214 let Ok(header) = self.store.get_by_hash(&hash).await else {
1215 return gossipsub::MessageAcceptance::Ignore;
1218 };
1219
1220 if let Err(e) = befp.validate(&header) {
1221 trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1222 self.swarm
1223 .context()
1224 .behaviour
1225 .gossipsub
1226 .blacklist_peer(peer);
1227 return gossipsub::MessageAcceptance::Reject;
1228 }
1229
1230 warn!("Received a valid bad encoding fraud proof");
1231 self.network_compromised_token.trigger();
1233
1234 gossipsub::MessageAcceptance::Accept
1235 }
1236}
1237
1238async fn poll_closed(
1240 bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1241) {
1242 poll_fn(|cx| {
1243 for chan in bitswap_queries.values_mut() {
1244 match chan.poll_closed(cx) {
1245 Poll::Pending => continue,
1246 Poll::Ready(_) => return Poll::Ready(()),
1247 }
1248 }
1249
1250 Poll::Pending
1251 })
1252 .await
1253}
1254
1255fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1256 let mut invalid_addrs = Vec::new();
1257
1258 for addr in addrs {
1259 if addr.peer_id().is_none() {
1260 invalid_addrs.push(addr.to_owned());
1261 }
1262 }
1263
1264 if invalid_addrs.is_empty() {
1265 Ok(())
1266 } else {
1267 Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1268 }
1269}
1270
1271fn init_gossipsub<'a, B, S>(
1272 args: &'a P2pArgs<B, S>,
1273 topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1274) -> Result<gossipsub::Behaviour>
1275where
1276 B: Blockstore,
1277 S: Store,
1278{
1279 let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1282
1283 let config = gossipsub::ConfigBuilder::default()
1284 .validation_mode(gossipsub::ValidationMode::Strict)
1285 .validate_messages()
1286 .build()
1287 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1288
1289 let mut gossipsub: gossipsub::Behaviour =
1291 gossipsub::Behaviour::new(message_authenticity, config)
1292 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1293
1294 for topic in topics {
1295 gossipsub
1296 .subscribe(topic)
1297 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1298 }
1299
1300 Ok(gossipsub)
1301}
1302
1303fn init_bitswap<B, S>(
1304 blockstore: Arc<B>,
1305 store: Arc<S>,
1306 network_id: &str,
1307) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1308where
1309 B: Blockstore + 'static,
1310 S: Store + 'static,
1311{
1312 let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1313
1314 Ok(beetswap::Behaviour::builder(blockstore)
1315 .protocol_prefix(protocol_prefix.as_ref())?
1316 .register_multihasher(ShwapMultihasher::new(store))
1317 .client_set_send_dont_have(false)
1318 .build())
1319}