1use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use blockstore::block::CidError;
24use celestia_proto::p2p::pb::{HeaderRequest, header_request};
25use celestia_types::fraud_proof::BadEncodingFraudProof;
26use celestia_types::hash::Hash;
27use celestia_types::namespace_data::NamespaceData;
28use celestia_types::nmt::Namespace;
29use celestia_types::row::{Row, RowId};
30use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
31use celestia_types::sample::{Sample, SampleId};
32use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, FraudProof};
33use cid::Cid;
34use libp2p::gossipsub::TopicHash;
35use libp2p::identity::Keypair;
36use libp2p::swarm::{NetworkBehaviour, NetworkInfo};
37use libp2p::{Multiaddr, PeerId, gossipsub};
38use lumina_utils::executor::{JoinHandle, spawn};
39use lumina_utils::time::{self, Interval};
40use lumina_utils::token::Token;
41use smallvec::SmallVec;
42use tendermint_proto::Protobuf;
43use tokio::select;
44use tokio::sync::{mpsc, oneshot, watch};
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, instrument, trace, warn};
47
48mod connection_control;
49mod header_ex;
50pub(crate) mod header_session;
51mod shrex;
52pub(crate) mod shwap;
53mod swarm;
54mod swarm_manager;
55mod utils;
56
57use crate::block_ranges::BlockRange;
58use crate::events::EventPublisher;
59use crate::p2p::header_session::HeaderSession;
60use crate::p2p::shwap::{ShwapMultihasher, convert_cid, get_block_container};
61use crate::p2p::swarm_manager::SwarmManager;
62use crate::peer_tracker::PeerTracker;
63use crate::peer_tracker::PeerTrackerInfo;
64use crate::store::{Store, StoreError};
65use crate::utils::{
66 MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
67 celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic,
68};
69
70pub use crate::p2p::header_ex::HeaderExError;
71pub use crate::p2p::shrex::ShrExError;
72
73pub(crate) const MAX_MH_SIZE: usize = 64;
75
76const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
79
80pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
81
82#[derive(Debug, thiserror::Error)]
84pub enum P2pError {
85 #[error("Failed to initialize gossipsub behaviour: {0}")]
87 GossipsubInit(String),
88
89 #[error("Failed to initialize TLS: {0}")]
91 TlsInit(String),
92
93 #[error("Failed to initialize noise: {0}")]
95 NoiseInit(String),
96
97 #[error("Worker died")]
99 WorkerDied,
100
101 #[error("Channel closed unexpectedly")]
103 ChannelClosedUnexpectedly,
104
105 #[error("HeaderEx: {0}")]
107 HeaderEx(#[from] HeaderExError),
108
109 #[error("ShrEx: {0}")]
111 ShrEx(#[from] ShrExError),
112
113 #[error("Bootnode multiaddrs without peer ID: {0:?}")]
115 BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
116
117 #[error("Bitswap: {0}")]
119 Bitswap(#[from] beetswap::Error),
120
121 #[error("ProtoBuf decoding error: {0}")]
123 ProtoDecodeFailed(#[from] tendermint_proto::Error),
124
125 #[error("CID error: {0}")]
127 Cid(CidError),
128
129 #[error("Request timed out")]
131 RequestTimedOut,
132
133 #[error("Shwap: {0}")]
135 Shwap(String),
136
137 #[error(transparent)]
139 CelestiaTypes(#[from] celestia_types::Error),
140
141 #[error("Store error: {0}")]
143 Store(#[from] StoreError),
144
145 #[error("Header of {0} block was pruned because it is outside of retention period")]
147 HeaderPruned(u64),
148
149 #[error("Header of {0} block is not synced yet")]
151 HeaderNotSynced(u64),
152}
153
154impl P2pError {
155 pub(crate) fn is_fatal(&self) -> bool {
159 match self {
160 P2pError::GossipsubInit(_)
161 | P2pError::NoiseInit(_)
162 | P2pError::TlsInit(_)
163 | P2pError::WorkerDied
164 | P2pError::ChannelClosedUnexpectedly
165 | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
166 P2pError::HeaderEx(_)
167 | P2pError::ShrEx(_)
168 | P2pError::Bitswap(_)
169 | P2pError::ProtoDecodeFailed(_)
170 | P2pError::Cid(_)
171 | P2pError::RequestTimedOut
172 | P2pError::Shwap(_)
173 | P2pError::CelestiaTypes(_)
174 | P2pError::HeaderPruned(_)
175 | P2pError::HeaderNotSynced(_) => false,
176 P2pError::Store(e) => e.is_fatal(),
177 }
178 }
179}
180
181impl From<oneshot::error::RecvError> for P2pError {
182 fn from(_value: oneshot::error::RecvError) -> Self {
183 P2pError::ChannelClosedUnexpectedly
184 }
185}
186
187impl From<prost::DecodeError> for P2pError {
188 fn from(value: prost::DecodeError) -> Self {
189 P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
190 }
191}
192
193impl From<cid::Error> for P2pError {
194 fn from(value: cid::Error) -> Self {
195 P2pError::Cid(CidError::InvalidCid(value.to_string()))
196 }
197}
198
199#[derive(Debug)]
201pub(crate) struct P2p {
202 cancellation_token: CancellationToken,
203 cmd_tx: mpsc::Sender<P2pCmd>,
204 join_handle: JoinHandle,
205 peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
206 local_peer_id: PeerId,
207}
208
209pub struct P2pArgs<B, S>
211where
212 B: Blockstore,
213 S: Store,
214{
215 pub network_id: String,
217 pub local_keypair: Keypair,
219 pub bootnodes: Vec<Multiaddr>,
221 pub listen_on: Vec<Multiaddr>,
223 pub blockstore: Arc<B>,
225 pub store: Arc<S>,
227 pub event_pub: EventPublisher,
229}
230
231#[derive(Debug)]
232pub(crate) enum P2pCmd {
233 NetworkInfo {
234 respond_to: oneshot::Sender<NetworkInfo>,
235 },
236 HeaderExRequest {
237 request: HeaderRequest,
238 respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
239 },
240 Listeners {
241 respond_to: oneshot::Sender<Vec<Multiaddr>>,
242 },
243 ConnectedPeers {
244 respond_to: oneshot::Sender<Vec<PeerId>>,
245 },
246 InitHeaderSub {
247 head: Box<ExtendedHeader>,
248 channel: mpsc::Sender<ExtendedHeader>,
250 },
251 SetPeerTrust {
252 peer_id: PeerId,
253 is_trusted: bool,
254 },
255 #[cfg(any(test, feature = "test-utils"))]
256 MarkAsArchival {
257 peer_id: PeerId,
258 },
259 GetShwapCid {
260 cid: Cid,
261 respond_to: OneshotResultSender<Vec<u8>, P2pError>,
262 },
263 GetNetworkCompromisedToken {
264 respond_to: oneshot::Sender<Token>,
265 },
266 GetNetworkHead {
267 respond_to: oneshot::Sender<Option<ExtendedHeader>>,
268 },
269 #[allow(dead_code)]
272 GetRow {
273 row_index: u16,
274 block_height: u64,
275 respond_to: OneshotResultSender<Row, P2pError>,
276 },
277 #[allow(dead_code)]
280 GetSample {
281 row_index: u16,
282 column_index: u16,
283 block_height: u64,
284 respond_to: OneshotResultSender<Sample, P2pError>,
285 },
286 GetNamespaceData {
287 namespace: Namespace,
288 block_height: u64,
289 respond_to: OneshotResultSender<NamespaceData, P2pError>,
290 },
291 GetEds {
292 block_height: u64,
293 respond_to: OneshotResultSender<ExtendedDataSquare, P2pError>,
294 },
295}
296
297impl P2p {
298 pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
300 where
301 B: Blockstore + 'static,
302 S: Store + 'static,
303 {
304 validate_bootnode_addrs(&args.bootnodes)?;
305
306 let local_peer_id = PeerId::from(args.local_keypair.public());
307
308 let peer_tracker = PeerTracker::new(args.event_pub.clone());
309 let peer_tracker_info_watcher = peer_tracker.info_watcher();
310
311 let cancellation_token = CancellationToken::new();
312 let (cmd_tx, cmd_rx) = mpsc::channel(16);
313
314 let mut worker =
315 Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
316
317 let join_handle = spawn(async move {
318 worker.run().await;
319 });
320
321 Ok(P2p {
322 cancellation_token,
323 cmd_tx,
324 join_handle,
325 peer_tracker_info_watcher,
326 local_peer_id,
327 })
328 }
329
330 #[cfg(test)]
332 pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
333 let (cmd_tx, cmd_rx) = mpsc::channel(16);
334 let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
335 let cancellation_token = CancellationToken::new();
336
337 let join_handle = spawn(async {});
339
340 let p2p = P2p {
341 cmd_tx: cmd_tx.clone(),
342 cancellation_token,
343 join_handle,
344 peer_tracker_info_watcher: peer_tracker_rx,
345 local_peer_id: PeerId::random(),
346 };
347
348 let handle = crate::test_utils::MockP2pHandle {
349 cmd_tx,
350 cmd_rx,
351 header_sub_tx: None,
352 peer_tracker_tx,
353 };
354
355 (p2p, handle)
356 }
357
358 pub fn stop(&self) {
360 self.cancellation_token.cancel();
362 }
363
364 pub async fn join(&self) {
366 self.join_handle.join().await;
367 }
368
369 pub fn local_peer_id(&self) -> &PeerId {
371 &self.local_peer_id
372 }
373
374 async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
375 self.cmd_tx
376 .send(cmd)
377 .await
378 .map_err(|_| P2pError::WorkerDied)
379 }
380
381 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
383 self.peer_tracker_info_watcher.clone()
384 }
385
386 pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
388 self.peer_tracker_info_watcher.borrow()
389 }
390
391 pub async fn init_header_sub(
393 &self,
394 head: ExtendedHeader,
395 channel: mpsc::Sender<ExtendedHeader>,
396 ) -> Result<()> {
397 self.send_command(P2pCmd::InitHeaderSub {
398 head: Box::new(head),
399 channel,
400 })
401 .await
402 }
403
404 pub async fn wait_connected(&self) -> Result<()> {
406 self.peer_tracker_info_watcher()
407 .wait_for(|info| info.num_connected_peers > 0)
408 .await
409 .map(drop)
410 .map_err(|_| P2pError::WorkerDied)
411 }
412
413 pub async fn wait_connected_trusted(&self) -> Result<()> {
415 self.peer_tracker_info_watcher()
416 .wait_for(|info| info.num_connected_trusted_peers > 0)
417 .await
418 .map(drop)
419 .map_err(|_| P2pError::WorkerDied)
420 }
421
422 pub async fn network_info(&self) -> Result<NetworkInfo> {
424 let (tx, rx) = oneshot::channel();
425
426 self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
427 .await?;
428
429 Ok(rx.await?)
430 }
431
432 pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
434 let (tx, rx) = oneshot::channel();
435
436 self.send_command(P2pCmd::HeaderExRequest {
437 request,
438 respond_to: tx,
439 })
440 .await?;
441
442 rx.await?
443 }
444
445 pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
447 self.get_header_by_height(0).await
448 }
449
450 pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
452 self.header_ex_request(HeaderRequest {
453 data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
454 amount: 1,
455 })
456 .await?
457 .into_iter()
458 .next()
459 .ok_or(HeaderExError::HeaderNotFound.into())
460 }
461
462 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
464 self.header_ex_request(HeaderRequest {
465 data: Some(header_request::Data::Origin(height)),
466 amount: 1,
467 })
468 .await?
469 .into_iter()
470 .next()
471 .ok_or(HeaderExError::HeaderNotFound.into())
472 }
473
474 pub async fn get_verified_headers_range(
479 &self,
480 from: &ExtendedHeader,
481 amount: u64,
482 ) -> Result<Vec<ExtendedHeader>> {
483 from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
485
486 let height = from.height() + 1;
487
488 let range = height..=height + amount - 1;
489
490 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
491 let headers = session.run().await?;
492
493 from.verify_adjacent_range(&headers)
498 .map_err(|_| HeaderExError::InvalidResponse)?;
499
500 Ok(headers)
501 }
502
503 pub(crate) async fn get_unverified_header_range(
508 &self,
509 range: BlockRange,
510 ) -> Result<Vec<ExtendedHeader>> {
511 if range.is_empty() {
512 return Err(HeaderExError::InvalidRequest.into());
513 }
514
515 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
516 let headers = session.run().await?;
517
518 let Some(head) = headers.first() else {
519 return Err(HeaderExError::InvalidResponse.into());
520 };
521
522 head.verify_adjacent_range(&headers[1..])
523 .map_err(|_| HeaderExError::InvalidResponse)?;
524
525 Ok(headers)
526 }
527
528 pub(crate) async fn get_shwap_cid(
530 &self,
531 cid: Cid,
532 timeout: Option<Duration>,
533 ) -> Result<Vec<u8>> {
534 let (tx, rx) = oneshot::channel();
535
536 self.send_command(P2pCmd::GetShwapCid {
537 cid,
538 respond_to: tx,
539 })
540 .await?;
541
542 let data = match timeout {
543 Some(dur) => time::timeout(dur, rx)
544 .await
545 .map_err(|_| P2pError::RequestTimedOut)???,
546 None => rx.await??,
547 };
548
549 get_block_container(&cid, &data)
550 }
551
552 pub async fn get_row(
554 &self,
555 row_index: u16,
556 block_height: u64,
557 timeout: Option<Duration>,
558 ) -> Result<Row> {
559 let id = RowId::new(row_index, block_height)?;
560 let cid = convert_cid(&id.into())?;
561
562 let data = self.get_shwap_cid(cid, timeout).await?;
563 let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
564 Ok(row)
565 }
566
567 pub async fn get_sample(
569 &self,
570 row_index: u16,
571 column_index: u16,
572 block_height: u64,
573 timeout: Option<Duration>,
574 ) -> Result<Sample> {
575 let id = SampleId::new(row_index, column_index, block_height)?;
576 let cid = convert_cid(&id.into())?;
577
578 let data = self.get_shwap_cid(cid, timeout).await?;
579 let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
580 Ok(sample)
581 }
582
583 pub async fn get_eds(
584 &self,
585 block_height: u64,
586 timeout: Option<Duration>,
587 ) -> Result<ExtendedDataSquare> {
588 let (tx, rx) = oneshot::channel();
589
590 self.send_command(P2pCmd::GetEds {
591 block_height,
592 respond_to: tx,
593 })
594 .await?;
595
596 match timeout {
597 Some(dur) => time::timeout(dur, rx)
598 .await
599 .map_err(|_| P2pError::RequestTimedOut)??,
600 None => rx.await?,
601 }
602 }
603
604 pub async fn get_row_namespace_data(
606 &self,
607 namespace: Namespace,
608 row_index: u16,
609 block_height: u64,
610 timeout: Option<Duration>,
611 ) -> Result<RowNamespaceData> {
612 let id = RowNamespaceDataId::new(namespace, row_index, block_height)?;
613 let cid = convert_cid(&id.into())?;
614
615 let data = self.get_shwap_cid(cid, timeout).await?;
616 let row_namespace_data =
617 RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
618 Ok(row_namespace_data)
619 }
620
621 pub async fn get_namespace_data(
622 &self,
623 namespace: Namespace,
624 block_height: u64,
625 timeout: Option<Duration>,
626 ) -> Result<NamespaceData> {
627 let (tx, rx) = oneshot::channel();
628
629 self.send_command(P2pCmd::GetNamespaceData {
630 namespace,
631 block_height,
632 respond_to: tx,
633 })
634 .await?;
635
636 match timeout {
637 Some(dur) => time::timeout(dur, rx)
638 .await
639 .map_err(|_| P2pError::RequestTimedOut)??,
640 None => rx.await?,
641 }
642 }
643
644 pub async fn get_all_blobs(
647 &self,
648 namespace: Namespace,
649 block_height: u64,
650 timeout: Option<Duration>,
651 ) -> Result<Vec<Blob>> {
652 let namespace_data = self
653 .get_namespace_data(namespace, block_height, timeout)
654 .await?;
655
656 let shares = namespace_data
657 .rows()
658 .iter()
659 .flat_map(|row| row.shares.iter());
660
661 Ok(Blob::reconstruct_all(shares)?)
662 }
663
664 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
666 let (tx, rx) = oneshot::channel();
667
668 self.send_command(P2pCmd::Listeners { respond_to: tx })
669 .await?;
670
671 Ok(rx.await?)
672 }
673
674 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
676 let (tx, rx) = oneshot::channel();
677
678 self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
679 .await?;
680
681 Ok(rx.await?)
682 }
683
684 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
686 self.send_command(P2pCmd::SetPeerTrust {
687 peer_id,
688 is_trusted,
689 })
690 .await
691 }
692
693 #[cfg(any(test, feature = "test-utils"))]
694 pub(crate) async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
695 self.send_command(P2pCmd::MarkAsArchival { peer_id }).await
696 }
697
698 pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
703 let (tx, rx) = oneshot::channel();
704
705 self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
706 .await?;
707
708 Ok(rx.await?)
709 }
710
711 pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
713 let (tx, rx) = oneshot::channel();
714
715 self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
716 .await?;
717
718 Ok(rx.await?)
719 }
720}
721
722impl Drop for P2p {
723 fn drop(&mut self) {
724 self.stop();
725 }
726}
727
728#[derive(NetworkBehaviour)]
729struct Behaviour<B, S>
730where
731 B: Blockstore + 'static,
732 S: Store + 'static,
733{
734 bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
735 header_ex: header_ex::Behaviour<S>,
736 shr_ex: shrex::Behaviour<S>,
737 gossipsub: gossipsub::Behaviour,
738}
739
740struct Worker<B, S>
741where
742 B: Blockstore + 'static,
743 S: Store + 'static,
744{
745 cancellation_token: CancellationToken,
746 swarm: SwarmManager<Behaviour<B, S>>,
747 header_sub_topic_hash: TopicHash,
748 bad_encoding_fraud_sub_topic: TopicHash,
749 cmd_rx: mpsc::Receiver<P2pCmd>,
750 header_sub_state: Option<HeaderSubState>,
751 bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
752 network_compromised_token: Token,
753 store: Arc<S>,
754}
755
756struct HeaderSubState {
757 known_head: ExtendedHeader,
758 channel: mpsc::Sender<ExtendedHeader>,
759}
760
761impl<B, S> Worker<B, S>
762where
763 B: Blockstore,
764 S: Store,
765{
766 async fn new(
767 args: P2pArgs<B, S>,
768 cancellation_token: CancellationToken,
769 cmd_rx: mpsc::Receiver<P2pCmd>,
770 peer_tracker: PeerTracker,
771 ) -> Result<Self, P2pError> {
772 let mut swarm = SwarmManager::new(
773 &args.network_id,
774 &args.local_keypair,
775 &args.bootnodes,
776 &args.listen_on,
777 peer_tracker,
778 args.event_pub.clone(),
779 )
780 .await?;
781
782 let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
783 let bad_encoding_fraud_sub_topic =
784 fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
785 let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
786
787 let bitswap = init_bitswap(
788 args.blockstore.clone(),
789 args.store.clone(),
790 &args.network_id,
791 )?;
792
793 let header_ex = header_ex::Behaviour::new(header_ex::Config {
794 network_id: &args.network_id,
795 header_store: args.store.clone(),
796 });
797
798 let shr_ex = shrex::Behaviour::new(shrex::Config {
799 network_id: &args.network_id,
800 local_keypair: &args.local_keypair,
801 header_store: args.store.clone(),
802 stream_ctrl: swarm.stream_control(),
803 })?;
804
805 swarm.attach_behaviour(Behaviour {
806 bitswap,
807 gossipsub,
808 header_ex,
809 shr_ex,
810 });
811
812 Ok(Worker {
813 cancellation_token,
814 swarm,
815 cmd_rx,
816 bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
817 header_sub_topic_hash: header_sub_topic.hash(),
818 header_sub_state: None,
819 bitswap_queries: HashMap::new(),
820 network_compromised_token: Token::new(),
821 store: args.store,
822 })
823 }
824
825 async fn run(&mut self) {
826 let mut report_interval = Interval::new(Duration::from_secs(60));
827
828 loop {
829 select! {
830 _ = self.cancellation_token.cancelled() => break,
831 _ = report_interval.tick() => {
832 self.report();
833 }
834 _ = poll_closed(&mut self.bitswap_queries) => {
835 self.prune_canceled_bitswap_queries();
836 }
837 res = self.swarm.poll() => {
838 match res {
839 Ok(ev) => {
840 if let Err(e) = self.on_behaviour_event(ev).await {
841 warn!("Failure while handling behaviour event: {e}");
842 }
843 }
844 Err(e) => warn!("Failure while polling SwarmManager: {e}"),
845 }
846 }
847 Some(cmd) = self.cmd_rx.recv() => {
848 if let Err(e) = self.on_cmd(cmd).await {
849 warn!("Failure while handling command. (error: {e})");
850 }
851 }
852 }
853 }
854
855 self.swarm.context().behaviour.header_ex.stop();
856 self.swarm.context().behaviour.shr_ex.stop();
857 self.swarm.stop().await;
858 }
859
860 fn prune_canceled_bitswap_queries(&mut self) {
861 let mut cancelled = SmallVec::<[_; 16]>::new();
862
863 for (query_id, chan) in &self.bitswap_queries {
864 if chan.is_closed() {
865 cancelled.push(*query_id);
866 }
867 }
868
869 for query_id in cancelled {
870 self.bitswap_queries.remove(&query_id);
871 self.swarm.context().behaviour.bitswap.cancel(query_id);
872 }
873 }
874
875 async fn on_behaviour_event(&mut self, ev: BehaviourEvent<B, S>) -> Result<()> {
876 match ev {
877 BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
878 BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
879 BehaviourEvent::HeaderEx(ev) => self.on_header_ex_event(ev).await,
880 BehaviourEvent::ShrEx(ev) => self.on_shrex_event(ev).await,
881 }
882
883 Ok(())
884 }
885
886 async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
887 match cmd {
888 P2pCmd::NetworkInfo { respond_to } => {
889 respond_to.maybe_send(self.swarm.network_info());
890 }
891 P2pCmd::HeaderExRequest {
892 request,
893 respond_to,
894 } => {
895 self.swarm
896 .context()
897 .behaviour
898 .header_ex
899 .send_request(request, respond_to);
900 }
901 P2pCmd::Listeners { respond_to } => {
902 respond_to.maybe_send(self.swarm.listeners());
903 }
904 P2pCmd::ConnectedPeers { respond_to } => {
905 let peers = self
906 .swarm
907 .context()
908 .peer_tracker
909 .peers()
910 .filter_map(|peer| {
911 if peer.is_connected() {
912 Some(*peer.id())
913 } else {
914 None
915 }
916 })
917 .collect();
918 respond_to.maybe_send(peers);
919 }
920 P2pCmd::InitHeaderSub { head, channel } => {
921 self.on_init_header_sub(*head, channel);
922 }
923 P2pCmd::SetPeerTrust {
924 peer_id,
925 is_trusted,
926 } => {
927 self.swarm.set_peer_trust(&peer_id, is_trusted);
928 }
929 #[cfg(any(test, feature = "test-utils"))]
930 P2pCmd::MarkAsArchival { peer_id } => {
931 self.swarm.mark_as_archival(&peer_id);
932 }
933 P2pCmd::GetShwapCid { cid, respond_to } => {
934 self.on_get_shwap_cid(cid, respond_to);
935 }
936 P2pCmd::GetNetworkCompromisedToken { respond_to } => {
937 respond_to.maybe_send(self.network_compromised_token.clone())
938 }
939 P2pCmd::GetNetworkHead { respond_to } => {
940 let head = self
941 .header_sub_state
942 .as_ref()
943 .map(|state| state.known_head.clone());
944 respond_to.maybe_send(head);
945 }
946 P2pCmd::GetRow {
947 row_index,
948 block_height,
949 respond_to,
950 } => {
951 self.swarm
952 .context()
953 .behaviour
954 .shr_ex
955 .get_row(block_height, row_index, respond_to)
956 .await
957 }
958 P2pCmd::GetSample {
959 row_index,
960 column_index,
961 block_height,
962 respond_to,
963 } => {
964 self.swarm
965 .context()
966 .behaviour
967 .shr_ex
968 .get_sample(block_height, row_index, column_index, respond_to)
969 .await
970 }
971 P2pCmd::GetNamespaceData {
972 namespace,
973 block_height,
974 respond_to,
975 } => {
976 self.swarm
977 .context()
978 .behaviour
979 .shr_ex
980 .get_namespace_data(block_height, namespace, respond_to)
981 .await
982 }
983 P2pCmd::GetEds {
984 block_height,
985 respond_to,
986 } => {
987 self.swarm
988 .context()
989 .behaviour
990 .shr_ex
991 .get_eds(block_height, respond_to)
992 .await
993 }
994 }
995
996 Ok(())
997 }
998
999 #[instrument(skip_all)]
1000 fn report(&mut self) {
1001 let tracker_info = self.swarm.context().peer_tracker.info();
1002
1003 info!(
1004 "peers: {}, trusted peers: {}",
1005 tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1006 );
1007 }
1008
1009 #[instrument(level = "trace", skip(self))]
1010 async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1011 match ev {
1012 gossipsub::Event::Message {
1013 message,
1014 message_id,
1015 ..
1016 } => {
1017 let Some(peer) = message.source else {
1018 return;
1020 };
1021
1022 let acceptance = if message.topic == self.header_sub_topic_hash {
1023 self.on_header_sub_message(&message.data[..])
1024 } else if message.topic == self.bad_encoding_fraud_sub_topic {
1025 self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1026 .await
1027 } else {
1028 trace!("Unhandled gossipsub message");
1029 gossipsub::MessageAcceptance::Ignore
1030 };
1031
1032 if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1033 self.swarm.peer_maybe_discovered(&peer);
1035 }
1036
1037 let _ = self
1038 .swarm
1039 .context()
1040 .behaviour
1041 .gossipsub
1042 .report_message_validation_result(&message_id, &peer, acceptance);
1043 }
1044 _ => trace!("Unhandled gossipsub event"),
1045 }
1046 }
1047
1048 #[instrument(level = "trace", skip_all)]
1049 fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1050 trace!("Requesting CID {cid} from bitswap");
1051 let query_id = self.swarm.context().behaviour.bitswap.get(&cid);
1052 self.bitswap_queries.insert(query_id, respond_to);
1053 }
1054
1055 #[instrument(level = "trace", skip(self))]
1056 async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1057 match ev {
1058 beetswap::Event::GetQueryResponse { query_id, data } => {
1059 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1060 respond_to.maybe_send_ok(data);
1061 }
1062 }
1063 beetswap::Event::GetQueryError { query_id, error } => {
1064 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1065 let error: P2pError = error.into();
1066 respond_to.maybe_send_err(error);
1067 }
1068 }
1069 }
1070 }
1071
1072 #[instrument(level = "trace", skip(self))]
1073 async fn on_header_ex_event(&mut self, ev: header_ex::Event) {
1074 match ev {
1075 header_ex::Event::SchedulePendingRequests => {
1076 let ctx = self.swarm.context();
1077
1078 ctx.behaviour
1079 .header_ex
1080 .schedule_pending_requests(ctx.peer_tracker);
1081 }
1082 header_ex::Event::NeedTrustedPeers => {
1083 self.swarm.connect_to_bootnodes();
1084 }
1085 header_ex::Event::NeedArchivalPeers => {
1086 self.swarm.start_archival_node_kad_query();
1087 }
1088 }
1089 }
1090
1091 #[instrument(level = "trace", skip(self))]
1092 async fn on_shrex_event(&mut self, ev: shrex::Event) {
1093 match ev {
1094 shrex::Event::SchedulePendingRequests => {
1095 let ctx = self.swarm.context();
1096
1097 ctx.behaviour
1098 .shr_ex
1099 .schedule_pending_requests(ctx.peer_tracker);
1100 }
1101
1102 shrex::Event::AddPeers(peers) => {
1103 let added = peers
1104 .iter()
1105 .filter(|peer| self.swarm.peer_maybe_discovered(peer))
1106 .count();
1107
1108 debug!("Added {added} peers discovered through shrex");
1109 }
1110
1111 shrex::Event::BlockPeers(peers) => {
1112 let blocked = peers
1113 .into_iter()
1114 .filter(|peer| self.swarm.blacklist_peer(peer))
1115 .count();
1116
1117 debug!("Blocked {blocked} peers for shrex missbehaviour");
1118 }
1119 }
1120 }
1121
1122 #[instrument(skip_all, fields(header = %head))]
1123 fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1124 self.header_sub_state = Some(HeaderSubState {
1125 known_head: head,
1126 channel,
1127 });
1128 trace!("HeaderSub initialized");
1129 }
1130
1131 #[instrument(skip_all)]
1132 fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1133 let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1134 trace!("Malformed or invalid header from header-sub");
1135 return gossipsub::MessageAcceptance::Reject;
1136 };
1137
1138 trace!("Received header from header-sub ({header})");
1139
1140 let Some(ref mut state) = self.header_sub_state else {
1141 debug!("header-sub not initialized yet");
1142 return gossipsub::MessageAcceptance::Ignore;
1143 };
1144
1145 if state.known_head.verify(&header).is_err() {
1146 trace!("Failed to verify HeaderSub header. Ignoring {header}");
1147 return gossipsub::MessageAcceptance::Ignore;
1148 }
1149
1150 trace!("New header from header-sub ({header})");
1151
1152 state.known_head = header.clone();
1153 let _ = state.channel.try_send(header);
1156
1157 gossipsub::MessageAcceptance::Accept
1158 }
1159
1160 #[instrument(skip_all)]
1161 async fn on_bad_encoding_fraud_sub_message(
1162 &mut self,
1163 data: &[u8],
1164 peer: &PeerId,
1165 ) -> gossipsub::MessageAcceptance {
1166 let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1167 trace!("Malformed bad encoding fraud proof from {peer}");
1168 self.swarm
1169 .context()
1170 .behaviour
1171 .gossipsub
1172 .blacklist_peer(peer);
1173 return gossipsub::MessageAcceptance::Reject;
1174 };
1175
1176 let height = befp.height();
1177
1178 let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1179 header_sub_state.known_head.height()
1180 } else if let Ok(local_head) = self.store.get_head().await {
1181 local_head.height()
1182 } else {
1183 return gossipsub::MessageAcceptance::Ignore;
1185 };
1186
1187 if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1188 return gossipsub::MessageAcceptance::Ignore;
1191 }
1192
1193 let hash = befp.header_hash();
1194 let Ok(header) = self.store.get_by_hash(&hash).await else {
1195 return gossipsub::MessageAcceptance::Ignore;
1198 };
1199
1200 if let Err(e) = befp.validate(&header) {
1201 trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1202 self.swarm
1203 .context()
1204 .behaviour
1205 .gossipsub
1206 .blacklist_peer(peer);
1207 return gossipsub::MessageAcceptance::Reject;
1208 }
1209
1210 warn!("Received a valid bad encoding fraud proof");
1211 self.network_compromised_token.trigger();
1213
1214 gossipsub::MessageAcceptance::Accept
1215 }
1216}
1217
1218async fn poll_closed(
1220 bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1221) {
1222 poll_fn(|cx| {
1223 for chan in bitswap_queries.values_mut() {
1224 match chan.poll_closed(cx) {
1225 Poll::Pending => continue,
1226 Poll::Ready(_) => return Poll::Ready(()),
1227 }
1228 }
1229
1230 Poll::Pending
1231 })
1232 .await
1233}
1234
1235fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1236 let mut invalid_addrs = Vec::new();
1237
1238 for addr in addrs {
1239 if addr.peer_id().is_none() {
1240 invalid_addrs.push(addr.to_owned());
1241 }
1242 }
1243
1244 if invalid_addrs.is_empty() {
1245 Ok(())
1246 } else {
1247 Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1248 }
1249}
1250
1251fn init_gossipsub<'a, B, S>(
1252 args: &'a P2pArgs<B, S>,
1253 topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1254) -> Result<gossipsub::Behaviour>
1255where
1256 B: Blockstore,
1257 S: Store,
1258{
1259 let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1262
1263 let config = gossipsub::ConfigBuilder::default()
1264 .validation_mode(gossipsub::ValidationMode::Strict)
1265 .validate_messages()
1266 .build()
1267 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1268
1269 let mut gossipsub: gossipsub::Behaviour =
1271 gossipsub::Behaviour::new(message_authenticity, config)
1272 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1273
1274 for topic in topics {
1275 gossipsub
1276 .subscribe(topic)
1277 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1278 }
1279
1280 Ok(gossipsub)
1281}
1282
1283fn init_bitswap<B, S>(
1284 blockstore: Arc<B>,
1285 store: Arc<S>,
1286 network_id: &str,
1287) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1288where
1289 B: Blockstore + 'static,
1290 S: Store + 'static,
1291{
1292 let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1293
1294 Ok(beetswap::Behaviour::builder(blockstore)
1295 .protocol_prefix(protocol_prefix.as_ref())?
1296 .register_multihasher(ShwapMultihasher::new(store))
1297 .client_set_send_dont_have(false)
1298 .build())
1299}