1use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use celestia_proto::p2p::pb::{header_request, HeaderRequest};
24use celestia_types::fraud_proof::BadEncodingFraudProof;
25use celestia_types::hash::Hash;
26use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
27use celestia_types::row::{Row, RowId};
28use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
29use celestia_types::sample::{Sample, SampleId};
30use celestia_types::{Blob, ExtendedHeader, FraudProof};
31use cid::Cid;
32use futures::stream::FuturesOrdered;
33use futures::{StreamExt, TryStreamExt};
34use libp2p::core::transport::ListenerId;
35use libp2p::{
36 autonat,
37 core::{ConnectedPoint, Endpoint},
38 gossipsub::{self, TopicHash},
39 identify,
40 identity::Keypair,
41 kad,
42 multiaddr::Protocol,
43 ping,
44 swarm::{
45 dial_opts::{DialOpts, PeerCondition},
46 ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent,
47 },
48 Multiaddr, PeerId,
49};
50use lumina_utils::executor::{spawn, JoinHandle};
51use lumina_utils::time::{self, Interval};
52use lumina_utils::token::Token;
53use smallvec::SmallVec;
54use tendermint_proto::Protobuf;
55use tokio::select;
56use tokio::sync::{mpsc, oneshot, watch};
57use tokio_util::sync::CancellationToken;
58use tracing::{debug, error, info, instrument, trace, warn};
59
60mod connection_control;
61mod header_ex;
62pub(crate) mod header_session;
63pub(crate) mod shwap;
64mod swarm;
65
66use crate::block_ranges::BlockRange;
67use crate::events::{EventPublisher, NodeEvent};
68use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
69use crate::p2p::header_session::HeaderSession;
70use crate::p2p::shwap::{convert_cid, get_block_container, ShwapMultihasher};
71use crate::p2p::swarm::new_swarm;
72use crate::peer_tracker::PeerTracker;
73use crate::peer_tracker::PeerTrackerInfo;
74use crate::store::Store;
75use crate::utils::{
76 celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
77 OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
78};
79
80pub use crate::p2p::header_ex::HeaderExError;
81
82const MIN_CONNECTED_PEERS: u64 = 4;
86
87pub(crate) const MAX_MH_SIZE: usize = 64;
89
90const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
93
94pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
95
96#[derive(Debug, thiserror::Error)]
98pub enum P2pError {
99 #[error("Failed to initialize gossipsub behaviour: {0}")]
101 GossipsubInit(String),
102
103 #[error("Failed to initialize TLS: {0}")]
105 TlsInit(String),
106
107 #[error("Failed to initialize noise: {0}")]
109 NoiseInit(String),
110
111 #[error("Worker died")]
113 WorkerDied,
114
115 #[error("Channel closed unexpectedly")]
117 ChannelClosedUnexpectedly,
118
119 #[error("Not connected to any peers")]
121 NoConnectedPeers,
122
123 #[error("HeaderEx: {0}")]
125 HeaderEx(#[from] HeaderExError),
126
127 #[error("Bootnode multiaddrs without peer ID: {0:?}")]
129 BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
130
131 #[error("Bitswap: {0}")]
133 Bitswap(#[from] beetswap::Error),
134
135 #[error("ProtoBuf decoding error: {0}")]
137 ProtoDecodeFailed(#[from] tendermint_proto::Error),
138
139 #[error("CID error: {0}")]
141 Cid(celestia_types::Error),
142
143 #[error("Bitswap query timed out")]
145 BitswapQueryTimeout,
146
147 #[error("Shwap: {0}")]
149 Shwap(String),
150
151 #[error(transparent)]
153 CelestiaTypes(#[from] celestia_types::Error),
154}
155
156impl P2pError {
157 pub(crate) fn is_fatal(&self) -> bool {
161 match self {
162 P2pError::GossipsubInit(_)
163 | P2pError::NoiseInit(_)
164 | P2pError::TlsInit(_)
165 | P2pError::WorkerDied
166 | P2pError::ChannelClosedUnexpectedly
167 | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
168 P2pError::NoConnectedPeers
169 | P2pError::HeaderEx(_)
170 | P2pError::Bitswap(_)
171 | P2pError::ProtoDecodeFailed(_)
172 | P2pError::Cid(_)
173 | P2pError::BitswapQueryTimeout
174 | P2pError::Shwap(_)
175 | P2pError::CelestiaTypes(_) => false,
176 }
177 }
178}
179
180impl From<oneshot::error::RecvError> for P2pError {
181 fn from(_value: oneshot::error::RecvError) -> Self {
182 P2pError::ChannelClosedUnexpectedly
183 }
184}
185
186impl From<prost::DecodeError> for P2pError {
187 fn from(value: prost::DecodeError) -> Self {
188 P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
189 }
190}
191
192impl From<cid::Error> for P2pError {
193 fn from(value: cid::Error) -> Self {
194 P2pError::Cid(celestia_types::Error::CidError(
195 blockstore::block::CidError::InvalidCid(value.to_string()),
196 ))
197 }
198}
199
200#[derive(Debug)]
202pub(crate) struct P2p {
203 cancellation_token: CancellationToken,
204 cmd_tx: mpsc::Sender<P2pCmd>,
205 join_handle: JoinHandle,
206 peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
207 local_peer_id: PeerId,
208}
209
210pub struct P2pArgs<B, S>
212where
213 B: Blockstore,
214 S: Store,
215{
216 pub network_id: String,
218 pub local_keypair: Keypair,
220 pub bootnodes: Vec<Multiaddr>,
222 pub listen_on: Vec<Multiaddr>,
224 pub blockstore: Arc<B>,
226 pub store: Arc<S>,
228 pub event_pub: EventPublisher,
230}
231
232#[derive(Debug)]
233pub(crate) enum P2pCmd {
234 NetworkInfo {
235 respond_to: oneshot::Sender<NetworkInfo>,
236 },
237 HeaderExRequest {
238 request: HeaderRequest,
239 respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
240 },
241 Listeners {
242 respond_to: oneshot::Sender<Vec<Multiaddr>>,
243 },
244 ConnectedPeers {
245 respond_to: oneshot::Sender<Vec<PeerId>>,
246 },
247 InitHeaderSub {
248 head: Box<ExtendedHeader>,
249 channel: mpsc::Sender<ExtendedHeader>,
251 },
252 SetPeerTrust {
253 peer_id: PeerId,
254 is_trusted: bool,
255 },
256 GetShwapCid {
257 cid: Cid,
258 respond_to: OneshotResultSender<Vec<u8>, P2pError>,
259 },
260 GetNetworkCompromisedToken {
261 respond_to: oneshot::Sender<Token>,
262 },
263 GetNetworkHead {
264 respond_to: oneshot::Sender<Option<ExtendedHeader>>,
265 },
266}
267
268impl P2p {
269 pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
271 where
272 B: Blockstore + 'static,
273 S: Store + 'static,
274 {
275 validate_bootnode_addrs(&args.bootnodes)?;
276
277 let local_peer_id = PeerId::from(args.local_keypair.public());
278
279 let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
280 let peer_tracker_info_watcher = peer_tracker.info_watcher();
281
282 let cancellation_token = CancellationToken::new();
283 let (cmd_tx, cmd_rx) = mpsc::channel(16);
284
285 let mut worker =
286 Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
287
288 let join_handle = spawn(async move {
289 worker.run().await;
290 });
291
292 Ok(P2p {
293 cancellation_token,
294 cmd_tx,
295 join_handle,
296 peer_tracker_info_watcher,
297 local_peer_id,
298 })
299 }
300
301 #[cfg(test)]
303 pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
304 let (cmd_tx, cmd_rx) = mpsc::channel(16);
305 let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
306 let cancellation_token = CancellationToken::new();
307
308 let join_handle = spawn(async {});
310
311 let p2p = P2p {
312 cmd_tx: cmd_tx.clone(),
313 cancellation_token,
314 join_handle,
315 peer_tracker_info_watcher: peer_tracker_rx,
316 local_peer_id: PeerId::random(),
317 };
318
319 let handle = crate::test_utils::MockP2pHandle {
320 cmd_tx,
321 cmd_rx,
322 header_sub_tx: None,
323 peer_tracker_tx,
324 };
325
326 (p2p, handle)
327 }
328
329 pub fn stop(&self) {
331 self.cancellation_token.cancel();
333 }
334
335 pub async fn join(&self) {
337 self.join_handle.join().await;
338 }
339
340 pub fn local_peer_id(&self) -> &PeerId {
342 &self.local_peer_id
343 }
344
345 async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
346 self.cmd_tx
347 .send(cmd)
348 .await
349 .map_err(|_| P2pError::WorkerDied)
350 }
351
352 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
354 self.peer_tracker_info_watcher.clone()
355 }
356
357 pub fn peer_tracker_info(&self) -> watch::Ref<PeerTrackerInfo> {
359 self.peer_tracker_info_watcher.borrow()
360 }
361
362 pub async fn init_header_sub(
364 &self,
365 head: ExtendedHeader,
366 channel: mpsc::Sender<ExtendedHeader>,
367 ) -> Result<()> {
368 self.send_command(P2pCmd::InitHeaderSub {
369 head: Box::new(head),
370 channel,
371 })
372 .await
373 }
374
375 pub async fn wait_connected(&self) -> Result<()> {
377 self.peer_tracker_info_watcher()
378 .wait_for(|info| info.num_connected_peers > 0)
379 .await
380 .map(drop)
381 .map_err(|_| P2pError::WorkerDied)
382 }
383
384 pub async fn wait_connected_trusted(&self) -> Result<()> {
386 self.peer_tracker_info_watcher()
387 .wait_for(|info| info.num_connected_trusted_peers > 0)
388 .await
389 .map(drop)
390 .map_err(|_| P2pError::WorkerDied)
391 }
392
393 pub async fn network_info(&self) -> Result<NetworkInfo> {
395 let (tx, rx) = oneshot::channel();
396
397 self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
398 .await?;
399
400 Ok(rx.await?)
401 }
402
403 pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
405 let (tx, rx) = oneshot::channel();
406
407 self.send_command(P2pCmd::HeaderExRequest {
408 request,
409 respond_to: tx,
410 })
411 .await?;
412
413 rx.await?
414 }
415
416 pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
418 self.get_header_by_height(0).await
419 }
420
421 pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
423 self.header_ex_request(HeaderRequest {
424 data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
425 amount: 1,
426 })
427 .await?
428 .into_iter()
429 .next()
430 .ok_or(HeaderExError::HeaderNotFound.into())
431 }
432
433 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
435 self.header_ex_request(HeaderRequest {
436 data: Some(header_request::Data::Origin(height)),
437 amount: 1,
438 })
439 .await?
440 .into_iter()
441 .next()
442 .ok_or(HeaderExError::HeaderNotFound.into())
443 }
444
445 pub async fn get_verified_headers_range(
450 &self,
451 from: &ExtendedHeader,
452 amount: u64,
453 ) -> Result<Vec<ExtendedHeader>> {
454 from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
456
457 let height = from.height().value() + 1;
458
459 let range = height..=height + amount - 1;
460
461 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
462 let headers = session.run().await?;
463
464 from.verify_adjacent_range(&headers)
469 .map_err(|_| HeaderExError::InvalidResponse)?;
470
471 Ok(headers)
472 }
473
474 pub(crate) async fn get_unverified_header_range(
479 &self,
480 range: BlockRange,
481 ) -> Result<Vec<ExtendedHeader>> {
482 if range.is_empty() {
483 return Err(HeaderExError::InvalidRequest.into());
484 }
485
486 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
487 let headers = session.run().await?;
488
489 let Some(head) = headers.first() else {
490 return Err(HeaderExError::InvalidResponse.into());
491 };
492
493 head.verify_adjacent_range(&headers[1..])
494 .map_err(|_| HeaderExError::InvalidResponse)?;
495
496 Ok(headers)
497 }
498
499 pub(crate) async fn get_shwap_cid(
501 &self,
502 cid: Cid,
503 timeout: Option<Duration>,
504 ) -> Result<Vec<u8>> {
505 let (tx, rx) = oneshot::channel();
506
507 self.send_command(P2pCmd::GetShwapCid {
508 cid,
509 respond_to: tx,
510 })
511 .await?;
512
513 let data = match timeout {
514 Some(dur) => time::timeout(dur, rx)
515 .await
516 .map_err(|_| P2pError::BitswapQueryTimeout)???,
517 None => rx.await??,
518 };
519
520 get_block_container(&cid, &data)
521 }
522
523 pub async fn get_row(
525 &self,
526 row_index: u16,
527 block_height: u64,
528 timeout: Option<Duration>,
529 ) -> Result<Row> {
530 let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
531 let cid = convert_cid(&id.into())?;
532
533 let data = self.get_shwap_cid(cid, timeout).await?;
534 let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
535 Ok(row)
536 }
537
538 pub async fn get_sample(
540 &self,
541 row_index: u16,
542 column_index: u16,
543 block_height: u64,
544 timeout: Option<Duration>,
545 ) -> Result<Sample> {
546 let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
547 let cid = convert_cid(&id.into())?;
548
549 let data = self.get_shwap_cid(cid, timeout).await?;
550 let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
551 Ok(sample)
552 }
553
554 pub async fn get_row_namespace_data(
556 &self,
557 namespace: Namespace,
558 row_index: u16,
559 block_height: u64,
560 timeout: Option<Duration>,
561 ) -> Result<RowNamespaceData> {
562 let id =
563 RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
564 let cid = convert_cid(&id.into())?;
565
566 let data = self.get_shwap_cid(cid, timeout).await?;
567 let row_namespace_data =
568 RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
569 Ok(row_namespace_data)
570 }
571
572 pub async fn get_all_blobs(
575 &self,
576 header: &ExtendedHeader,
577 namespace: Namespace,
578 timeout: Option<Duration>,
579 ) -> Result<Vec<Blob>> {
580 let height = header.height().value();
581 let app_version = header.app_version()?;
582 let rows_to_fetch: Vec<_> = header
583 .dah
584 .row_roots()
585 .iter()
586 .enumerate()
587 .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
588 .map(|(n, _)| n as u16)
589 .collect();
590
591 let futs = rows_to_fetch
592 .into_iter()
593 .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, height, timeout))
594 .collect::<FuturesOrdered<_>>();
595 let rows: Vec<_> = futs.try_collect().await?;
596 let shares = rows.iter().flat_map(|row| row.shares.iter());
597
598 Ok(Blob::reconstruct_all(shares, app_version)?)
599 }
600
601 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
603 let (tx, rx) = oneshot::channel();
604
605 self.send_command(P2pCmd::Listeners { respond_to: tx })
606 .await?;
607
608 Ok(rx.await?)
609 }
610
611 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
613 let (tx, rx) = oneshot::channel();
614
615 self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
616 .await?;
617
618 Ok(rx.await?)
619 }
620
621 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
623 self.send_command(P2pCmd::SetPeerTrust {
624 peer_id,
625 is_trusted,
626 })
627 .await
628 }
629
630 pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
635 let (tx, rx) = oneshot::channel();
636
637 self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
638 .await?;
639
640 Ok(rx.await?)
641 }
642
643 pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
645 let (tx, rx) = oneshot::channel();
646
647 self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
648 .await?;
649
650 Ok(rx.await?)
651 }
652}
653
654impl Drop for P2p {
655 fn drop(&mut self) {
656 self.stop();
657 }
658}
659
660#[derive(NetworkBehaviour)]
662struct Behaviour<B, S>
663where
664 B: Blockstore + 'static,
665 S: Store + 'static,
666{
667 connection_control: connection_control::Behaviour,
668 autonat: autonat::Behaviour,
669 bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
670 ping: ping::Behaviour,
671 identify: identify::Behaviour,
672 header_ex: HeaderExBehaviour<S>,
673 gossipsub: gossipsub::Behaviour,
674 kademlia: kad::Behaviour<kad::store::MemoryStore>,
675}
676
677struct Worker<B, S>
678where
679 B: Blockstore + 'static,
680 S: Store + 'static,
681{
682 cancellation_token: CancellationToken,
683 swarm: Swarm<Behaviour<B, S>>,
684 listeners: SmallVec<[ListenerId; 1]>,
685 header_sub_topic_hash: TopicHash,
686 bad_encoding_fraud_sub_topic: TopicHash,
687 cmd_rx: mpsc::Receiver<P2pCmd>,
688 peer_tracker: Arc<PeerTracker>,
689 header_sub_state: Option<HeaderSubState>,
690 bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
691 network_compromised_token: Token,
692 store: Arc<S>,
693 event_pub: EventPublisher,
694 bootnodes: HashMap<PeerId, Vec<Multiaddr>>,
695}
696
697struct HeaderSubState {
698 known_head: ExtendedHeader,
699 channel: mpsc::Sender<ExtendedHeader>,
700}
701
702impl<B, S> Worker<B, S>
703where
704 B: Blockstore,
705 S: Store,
706{
707 async fn new(
708 args: P2pArgs<B, S>,
709 cancellation_token: CancellationToken,
710 cmd_rx: mpsc::Receiver<P2pCmd>,
711 peer_tracker: Arc<PeerTracker>,
712 ) -> Result<Self, P2pError> {
713 let local_peer_id = PeerId::from(args.local_keypair.public());
714
715 let connection_control = connection_control::Behaviour::new();
716 let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default());
717 let ping = ping::Behaviour::new(ping::Config::default());
718
719 let agent_version = format!("lumina/{}/{}", args.network_id, env!("CARGO_PKG_VERSION"));
720 let identify = identify::Behaviour::new(
721 identify::Config::new(String::new(), args.local_keypair.public())
722 .with_agent_version(agent_version),
723 );
724
725 let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
726 let bad_encoding_fraud_sub_topic =
727 fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
728 let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
729
730 let kademlia = init_kademlia(&args)?;
731 let bitswap = init_bitswap(
732 args.blockstore.clone(),
733 args.store.clone(),
734 &args.network_id,
735 )?;
736
737 let header_ex = HeaderExBehaviour::new(HeaderExConfig {
738 network_id: &args.network_id,
739 peer_tracker: peer_tracker.clone(),
740 header_store: args.store.clone(),
741 });
742
743 let behaviour = Behaviour {
744 connection_control,
745 autonat,
746 bitswap,
747 ping,
748 identify,
749 gossipsub,
750 header_ex,
751 kademlia,
752 };
753
754 let mut swarm = new_swarm(args.local_keypair, behaviour).await?;
755 let mut listeners = SmallVec::new();
756
757 for addr in args.listen_on {
758 match swarm.listen_on(addr.clone()) {
759 Ok(id) => listeners.push(id),
760 Err(e) => error!("Failed to listen on {addr}: {e}"),
761 }
762 }
763
764 let mut bootnodes = HashMap::<_, Vec<_>>::new();
765
766 for addr in args.bootnodes {
767 let peer_id = addr.peer_id().expect("multiaddr already validated");
768 bootnodes.entry(peer_id).or_default().push(addr);
769 }
770
771 for (peer_id, addrs) in bootnodes.iter_mut() {
772 addrs.sort();
773 addrs.dedup();
774 addrs.shrink_to_fit();
775
776 peer_tracker.set_trusted(*peer_id, true);
778 }
779
780 Ok(Worker {
781 cancellation_token,
782 cmd_rx,
783 swarm,
784 listeners,
785 bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
786 header_sub_topic_hash: header_sub_topic.hash(),
787 peer_tracker,
788 header_sub_state: None,
789 bitswap_queries: HashMap::new(),
790 network_compromised_token: Token::new(),
791 store: args.store,
792 event_pub: args.event_pub,
793 bootnodes,
794 })
795 }
796
797 async fn run(&mut self) {
798 let mut report_interval = Interval::new(Duration::from_secs(60)).await;
799 let mut kademlia_interval = Interval::new(Duration::from_secs(30)).await;
800 let mut peer_tracker_info_watcher = self.peer_tracker.info_watcher();
801
802 self.bootstrap();
804
805 loop {
806 select! {
807 _ = self.cancellation_token.cancelled() => break,
808 _ = peer_tracker_info_watcher.changed() => {
809 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
810 warn!("All peers disconnected");
811 self.bootstrap();
812 }
813 }
814 _ = report_interval.tick() => {
815 self.report();
816 }
817 _ = kademlia_interval.tick() => {
818 if self.peer_tracker.info().num_connected_peers < MIN_CONNECTED_PEERS
819 {
820 self.bootstrap();
821 }
822 }
823 _ = poll_closed(&mut self.bitswap_queries) => {
824 self.prune_canceled_bitswap_queries();
825 }
826 ev = self.swarm.select_next_some() => {
827 if let Err(e) = self.on_swarm_event(ev).await {
828 warn!("Failure while handling swarm event: {e}");
829 }
830 },
831 Some(cmd) = self.cmd_rx.recv() => {
832 if let Err(e) = self.on_cmd(cmd).await {
833 warn!("Failure while handling command. (error: {e})");
834 }
835 }
836 }
837 }
838
839 self.on_stop().await;
840 }
841
842 fn bootstrap(&mut self) {
843 self.event_pub.send(NodeEvent::ConnectingToBootnodes);
844
845 for (peer_id, addrs) in &self.bootnodes {
846 let dial_opts = DialOpts::peer_id(*peer_id)
847 .addresses(addrs.clone())
848 .condition(PeerCondition::DisconnectedAndNotDialing)
851 .build();
852
853 if let Err(e) = self.swarm.dial(dial_opts) {
854 if !matches!(e, DialError::DialPeerConditionFalse(_)) {
855 warn!("Failed to dial on {addrs:?}: {e}");
856 }
857 }
858 }
859
860 if self.swarm.behaviour_mut().kademlia.bootstrap().is_err() {
862 warn!("Can't run kademlia bootstrap, no known peers");
863 }
864 }
865
866 fn prune_canceled_bitswap_queries(&mut self) {
867 let mut cancelled = SmallVec::<[_; 16]>::new();
868
869 for (query_id, chan) in &self.bitswap_queries {
870 if chan.is_closed() {
871 cancelled.push(*query_id);
872 }
873 }
874
875 for query_id in cancelled {
876 self.bitswap_queries.remove(&query_id);
877 self.swarm.behaviour_mut().bitswap.cancel(query_id);
878 }
879 }
880
881 async fn on_stop(&mut self) {
882 self.swarm
883 .behaviour_mut()
884 .connection_control
885 .set_stopping(true);
886 self.swarm.behaviour_mut().header_ex.stop();
887
888 for listener in self.listeners.drain(..) {
889 self.swarm.remove_listener(listener);
890 }
891
892 for (_, ids) in self.peer_tracker.connections() {
893 for id in ids {
894 self.swarm.close_connection(id);
895 }
896 }
897
898 while self
900 .swarm
901 .network_info()
902 .connection_counters()
903 .num_established()
904 > 0
905 {
906 match self.swarm.select_next_some().await {
907 SwarmEvent::ConnectionEstablished { connection_id, .. } => {
909 self.swarm.close_connection(connection_id);
911 }
912 SwarmEvent::ConnectionClosed {
913 peer_id,
914 connection_id,
915 ..
916 } => {
917 self.on_peer_disconnected(peer_id, connection_id);
919 }
920 _ => {}
921 }
922 }
923 }
924
925 async fn on_swarm_event(&mut self, ev: SwarmEvent<BehaviourEvent<B, S>>) -> Result<()> {
926 match ev {
927 SwarmEvent::Behaviour(ev) => match ev {
928 BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
929 BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
930 BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
931 BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
932 BehaviourEvent::Ping(ev) => self.on_ping_event(ev).await,
933 BehaviourEvent::Autonat(_)
934 | BehaviourEvent::ConnectionControl(_)
935 | BehaviourEvent::HeaderEx(_) => {}
936 },
937 SwarmEvent::ConnectionEstablished {
938 peer_id,
939 connection_id,
940 endpoint,
941 ..
942 } => {
943 self.on_peer_connected(peer_id, connection_id, endpoint);
944 }
945 SwarmEvent::ConnectionClosed {
946 peer_id,
947 connection_id,
948 ..
949 } => {
950 self.on_peer_disconnected(peer_id, connection_id);
951 }
952 _ => {}
953 }
954
955 Ok(())
956 }
957
958 async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
959 match cmd {
960 P2pCmd::NetworkInfo { respond_to } => {
961 respond_to.maybe_send(self.swarm.network_info());
962 }
963 P2pCmd::HeaderExRequest {
964 request,
965 respond_to,
966 } => {
967 self.swarm
968 .behaviour_mut()
969 .header_ex
970 .send_request(request, respond_to);
971 }
972 P2pCmd::Listeners { respond_to } => {
973 let local_peer_id = self.swarm.local_peer_id().to_owned();
974 let listeners = self
975 .swarm
976 .listeners()
977 .cloned()
978 .map(|mut ma| {
979 if !ma.protocol_stack().any(|protocol| protocol == "p2p") {
980 ma.push(Protocol::P2p(local_peer_id))
981 }
982 ma
983 })
984 .collect();
985
986 respond_to.maybe_send(listeners);
987 }
988 P2pCmd::ConnectedPeers { respond_to } => {
989 respond_to.maybe_send(self.peer_tracker.connected_peers());
990 }
991 P2pCmd::InitHeaderSub { head, channel } => {
992 self.on_init_header_sub(*head, channel);
993 }
994 P2pCmd::SetPeerTrust {
995 peer_id,
996 is_trusted,
997 } => {
998 if *self.swarm.local_peer_id() != peer_id {
999 self.peer_tracker.set_trusted(peer_id, is_trusted);
1000 }
1001 }
1002 P2pCmd::GetShwapCid { cid, respond_to } => {
1003 self.on_get_shwap_cid(cid, respond_to);
1004 }
1005 P2pCmd::GetNetworkCompromisedToken { respond_to } => {
1006 respond_to.maybe_send(self.network_compromised_token.clone())
1007 }
1008 P2pCmd::GetNetworkHead { respond_to } => {
1009 let head = self
1010 .header_sub_state
1011 .as_ref()
1012 .map(|state| state.known_head.clone());
1013 respond_to.maybe_send(head);
1014 }
1015 }
1016
1017 Ok(())
1018 }
1019
1020 #[instrument(skip_all)]
1021 fn report(&mut self) {
1022 let tracker_info = self.peer_tracker.info();
1023
1024 info!(
1025 "peers: {}, trusted peers: {}",
1026 tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1027 );
1028 }
1029
1030 #[instrument(level = "trace", skip(self))]
1031 async fn on_identify_event(&mut self, ev: identify::Event) -> Result<()> {
1032 match ev {
1033 identify::Event::Received { peer_id, info, .. } => {
1034 for addr in info.listen_addrs {
1037 self.swarm
1038 .behaviour_mut()
1039 .kademlia
1040 .add_address(&peer_id, addr);
1041 }
1042 }
1043 _ => trace!("Unhandled identify event"),
1044 }
1045
1046 Ok(())
1047 }
1048
1049 #[instrument(level = "trace", skip(self))]
1050 async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1051 match ev {
1052 gossipsub::Event::Message {
1053 message,
1054 message_id,
1055 ..
1056 } => {
1057 let Some(peer) = message.source else {
1058 return;
1060 };
1061
1062 let acceptance = if message.topic == self.header_sub_topic_hash {
1063 self.on_header_sub_message(&message.data[..])
1064 } else if message.topic == self.bad_encoding_fraud_sub_topic {
1065 self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1066 .await
1067 } else {
1068 trace!("Unhandled gossipsub message");
1069 gossipsub::MessageAcceptance::Ignore
1070 };
1071
1072 if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1073 self.peer_maybe_discovered(peer);
1075 }
1076
1077 let _ = self
1078 .swarm
1079 .behaviour_mut()
1080 .gossipsub
1081 .report_message_validation_result(&message_id, &peer, acceptance);
1082 }
1083 _ => trace!("Unhandled gossipsub event"),
1084 }
1085 }
1086
1087 #[instrument(level = "trace", skip(self))]
1088 async fn on_kademlia_event(&mut self, ev: kad::Event) -> Result<()> {
1089 match ev {
1090 kad::Event::RoutingUpdated {
1091 peer, addresses, ..
1092 } => {
1093 self.peer_tracker.add_addresses(peer, addresses.iter());
1094 }
1095 _ => trace!("Unhandled Kademlia event"),
1096 }
1097
1098 Ok(())
1099 }
1100
1101 #[instrument(level = "trace", skip_all)]
1102 fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1103 trace!("Requesting CID {cid} from bitswap");
1104 let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
1105 self.bitswap_queries.insert(query_id, respond_to);
1106 }
1107
1108 #[instrument(level = "trace", skip(self))]
1109 async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1110 match ev {
1111 beetswap::Event::GetQueryResponse { query_id, data } => {
1112 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1113 respond_to.maybe_send_ok(data);
1114 }
1115 }
1116 beetswap::Event::GetQueryError { query_id, error } => {
1117 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1118 let error: P2pError = error.into();
1119 respond_to.maybe_send_err(error);
1120 }
1121 }
1122 }
1123 }
1124
1125 #[instrument(level = "debug", skip_all)]
1126 async fn on_ping_event(&mut self, ev: ping::Event) {
1127 match ev.result {
1128 Ok(dur) => debug!(
1129 "Ping success: peer: {}, connection_id: {}, time: {:?}",
1130 ev.peer, ev.connection, dur
1131 ),
1132 Err(e) => {
1133 debug!(
1134 "Ping failure: peer: {}, connection_id: {}, error: {}",
1135 &ev.peer, &ev.connection, e
1136 );
1137 self.swarm.close_connection(ev.connection);
1138 }
1139 }
1140 }
1141
1142 #[instrument(skip_all, fields(peer_id = %peer_id))]
1143 fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
1144 if !self.peer_tracker.set_maybe_discovered(peer_id) {
1145 return;
1146 }
1147
1148 debug!("Peer discovered");
1149 }
1150
1151 #[instrument(skip_all, fields(peer_id = %peer_id))]
1152 fn on_peer_connected(
1153 &mut self,
1154 peer_id: PeerId,
1155 connection_id: ConnectionId,
1156 endpoint: ConnectedPoint,
1157 ) {
1158 debug!("Peer connected");
1159
1160 let dialed_addr = match endpoint {
1166 ConnectedPoint::Dialer {
1167 address,
1168 role_override: Endpoint::Dialer,
1169 ..
1170 } => Some(address),
1171 _ => None,
1172 };
1173
1174 self.peer_tracker
1175 .set_connected(peer_id, connection_id, dialed_addr);
1176 }
1177
1178 #[instrument(skip_all, fields(peer_id = %peer_id))]
1179 fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
1180 if self
1181 .peer_tracker
1182 .set_maybe_disconnected(peer_id, connection_id)
1183 {
1184 debug!("Peer disconnected");
1185 }
1186 }
1187
1188 #[instrument(skip_all, fields(header = %head))]
1189 fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1190 self.header_sub_state = Some(HeaderSubState {
1191 known_head: head,
1192 channel,
1193 });
1194 trace!("HeaderSub initialized");
1195 }
1196
1197 #[instrument(skip_all)]
1198 fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1199 let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1200 trace!("Malformed or invalid header from header-sub");
1201 return gossipsub::MessageAcceptance::Reject;
1202 };
1203
1204 trace!("Received header from header-sub ({header})");
1205
1206 let Some(ref mut state) = self.header_sub_state else {
1207 debug!("header-sub not initialized yet");
1208 return gossipsub::MessageAcceptance::Ignore;
1209 };
1210
1211 if state.known_head.verify(&header).is_err() {
1212 trace!("Failed to verify HeaderSub header. Ignoring {header}");
1213 return gossipsub::MessageAcceptance::Ignore;
1214 }
1215
1216 trace!("New header from header-sub ({header})");
1217
1218 state.known_head = header.clone();
1219 let _ = state.channel.try_send(header);
1222
1223 gossipsub::MessageAcceptance::Accept
1224 }
1225
1226 #[instrument(skip_all)]
1227 async fn on_bad_encoding_fraud_sub_message(
1228 &mut self,
1229 data: &[u8],
1230 peer: &PeerId,
1231 ) -> gossipsub::MessageAcceptance {
1232 let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1233 trace!("Malformed bad encoding fraud proof from {peer}");
1234 self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1235 return gossipsub::MessageAcceptance::Reject;
1236 };
1237
1238 let height = befp.height().value();
1239
1240 let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1241 header_sub_state.known_head.height().value()
1242 } else if let Ok(local_head) = self.store.get_head().await {
1243 local_head.height().value()
1244 } else {
1245 return gossipsub::MessageAcceptance::Ignore;
1247 };
1248
1249 if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1250 return gossipsub::MessageAcceptance::Ignore;
1253 }
1254
1255 let hash = befp.header_hash();
1256 let Ok(header) = self.store.get_by_hash(&hash).await else {
1257 return gossipsub::MessageAcceptance::Ignore;
1260 };
1261
1262 if let Err(e) = befp.validate(&header) {
1263 trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1264 self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1265 return gossipsub::MessageAcceptance::Reject;
1266 }
1267
1268 warn!("Received a valid bad encoding fraud proof");
1269 self.network_compromised_token.trigger();
1271
1272 gossipsub::MessageAcceptance::Accept
1273 }
1274}
1275
1276async fn poll_closed(
1278 bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1279) {
1280 poll_fn(|cx| {
1281 for chan in bitswap_queries.values_mut() {
1282 match chan.poll_closed(cx) {
1283 Poll::Pending => continue,
1284 Poll::Ready(_) => return Poll::Ready(()),
1285 }
1286 }
1287
1288 Poll::Pending
1289 })
1290 .await
1291}
1292
1293fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1294 let mut invalid_addrs = Vec::new();
1295
1296 for addr in addrs {
1297 if addr.peer_id().is_none() {
1298 invalid_addrs.push(addr.to_owned());
1299 }
1300 }
1301
1302 if invalid_addrs.is_empty() {
1303 Ok(())
1304 } else {
1305 Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1306 }
1307}
1308
1309fn init_gossipsub<'a, B, S>(
1310 args: &'a P2pArgs<B, S>,
1311 topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1312) -> Result<gossipsub::Behaviour>
1313where
1314 B: Blockstore,
1315 S: Store,
1316{
1317 let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1320
1321 let config = gossipsub::ConfigBuilder::default()
1322 .validation_mode(gossipsub::ValidationMode::Strict)
1323 .validate_messages()
1324 .build()
1325 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1326
1327 let mut gossipsub: gossipsub::Behaviour =
1329 gossipsub::Behaviour::new(message_authenticity, config)
1330 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1331
1332 for topic in topics {
1333 gossipsub
1334 .subscribe(topic)
1335 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1336 }
1337
1338 Ok(gossipsub)
1339}
1340
1341fn init_kademlia<B, S>(args: &P2pArgs<B, S>) -> Result<kad::Behaviour<kad::store::MemoryStore>>
1342where
1343 B: Blockstore,
1344 S: Store,
1345{
1346 let local_peer_id = PeerId::from(args.local_keypair.public());
1347 let store = kad::store::MemoryStore::new(local_peer_id);
1348
1349 let protocol_id = celestia_protocol_id(&args.network_id, "/kad/1.0.0");
1350 let config = kad::Config::new(protocol_id);
1351
1352 let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, config);
1353
1354 for addr in &args.bootnodes {
1355 if let Some(peer_id) = addr.peer_id() {
1356 kademlia.add_address(&peer_id, addr.to_owned());
1357 }
1358 }
1359
1360 if !args.listen_on.is_empty() {
1361 kademlia.set_mode(Some(kad::Mode::Server));
1362 }
1363
1364 Ok(kademlia)
1365}
1366
1367fn init_bitswap<B, S>(
1368 blockstore: Arc<B>,
1369 store: Arc<S>,
1370 network_id: &str,
1371) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1372where
1373 B: Blockstore + 'static,
1374 S: Store + 'static,
1375{
1376 let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1377
1378 Ok(beetswap::Behaviour::builder(blockstore)
1379 .protocol_prefix(protocol_prefix.as_ref())?
1380 .register_multihasher(ShwapMultihasher::new(store))
1381 .client_set_send_dont_have(false)
1382 .build())
1383}