1use std::collections::HashMap;
17use std::future::poll_fn;
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use blockstore::Blockstore;
23use celestia_proto::p2p::pb::{header_request, HeaderRequest};
24use celestia_types::fraud_proof::BadEncodingFraudProof;
25use celestia_types::hash::Hash;
26use celestia_types::nmt::{Namespace, NamespacedSha2Hasher};
27use celestia_types::row::{Row, RowId};
28use celestia_types::row_namespace_data::{RowNamespaceData, RowNamespaceDataId};
29use celestia_types::sample::{Sample, SampleId};
30use celestia_types::{Blob, ExtendedHeader, FraudProof};
31use cid::Cid;
32use futures::stream::FuturesOrdered;
33use futures::{StreamExt, TryStreamExt};
34use libp2p::core::transport::ListenerId;
35use libp2p::{
36 autonat,
37 core::{ConnectedPoint, Endpoint},
38 gossipsub::{self, TopicHash},
39 identify,
40 identity::Keypair,
41 kad,
42 multiaddr::Protocol,
43 ping,
44 swarm::{
45 dial_opts::{DialOpts, PeerCondition},
46 ConnectionId, DialError, NetworkBehaviour, NetworkInfo, Swarm, SwarmEvent,
47 },
48 Multiaddr, PeerId,
49};
50use lumina_utils::executor::{spawn, JoinHandle};
51use lumina_utils::time::{self, Interval};
52use lumina_utils::token::Token;
53use smallvec::SmallVec;
54use tendermint_proto::Protobuf;
55use tokio::select;
56use tokio::sync::{mpsc, oneshot, watch};
57use tokio_util::sync::CancellationToken;
58use tracing::{debug, error, info, instrument, trace, warn};
59
60mod connection_control;
61mod header_ex;
62pub(crate) mod header_session;
63pub(crate) mod shwap;
64mod swarm;
65
66use crate::block_ranges::BlockRange;
67use crate::events::{EventPublisher, NodeEvent};
68use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
69use crate::p2p::header_session::HeaderSession;
70use crate::p2p::shwap::{convert_cid, get_block_container, ShwapMultihasher};
71use crate::p2p::swarm::new_swarm;
72use crate::peer_tracker::PeerTracker;
73use crate::peer_tracker::PeerTrackerInfo;
74use crate::store::{Store, StoreError};
75use crate::utils::{
76 celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
77 OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt,
78};
79
80pub use crate::p2p::header_ex::HeaderExError;
81
/// Connected-peer floor: on every Kademlia interval tick the worker re-runs
/// bootstrap while fewer than this many peers are connected.
const MIN_CONNECTED_PEERS: u64 = 4;

/// Const-generic size argument handed to `beetswap::Behaviour`.
///
/// NOTE(review): presumably the maximum multihash size in bytes — confirm
/// against the beetswap documentation.
pub(crate) const MAX_MH_SIZE: usize = 64;

/// Bad-encoding fraud proofs targeting a height more than this many blocks
/// above the currently known head are ignored.
const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;
93
94pub(crate) type Result<T, E = P2pError> = std::result::Result<T, E>;
95
96#[derive(Debug, thiserror::Error)]
98pub enum P2pError {
99 #[error("Failed to initialize gossipsub behaviour: {0}")]
101 GossipsubInit(String),
102
103 #[error("Failed to initialize TLS: {0}")]
105 TlsInit(String),
106
107 #[error("Failed to initialize noise: {0}")]
109 NoiseInit(String),
110
111 #[error("Worker died")]
113 WorkerDied,
114
115 #[error("Channel closed unexpectedly")]
117 ChannelClosedUnexpectedly,
118
119 #[error("Not connected to any peers")]
121 NoConnectedPeers,
122
123 #[error("HeaderEx: {0}")]
125 HeaderEx(#[from] HeaderExError),
126
127 #[error("Bootnode multiaddrs without peer ID: {0:?}")]
129 BootnodeAddrsWithoutPeerId(Vec<Multiaddr>),
130
131 #[error("Bitswap: {0}")]
133 Bitswap(#[from] beetswap::Error),
134
135 #[error("ProtoBuf decoding error: {0}")]
137 ProtoDecodeFailed(#[from] tendermint_proto::Error),
138
139 #[error("CID error: {0}")]
141 Cid(celestia_types::Error),
142
143 #[error("Bitswap query timed out")]
145 BitswapQueryTimeout,
146
147 #[error("Shwap: {0}")]
149 Shwap(String),
150
151 #[error(transparent)]
153 CelestiaTypes(#[from] celestia_types::Error),
154
155 #[error("Store error: {0}")]
157 Store(#[from] StoreError),
158
159 #[error("Header of {0} block was pruned because it is outside of retention period")]
161 HeaderPruned(u64),
162
163 #[error("Header of {0} block is not synced yet")]
165 HeaderNotSynced(u64),
166}
167
168impl P2pError {
169 pub(crate) fn is_fatal(&self) -> bool {
173 match self {
174 P2pError::GossipsubInit(_)
175 | P2pError::NoiseInit(_)
176 | P2pError::TlsInit(_)
177 | P2pError::WorkerDied
178 | P2pError::ChannelClosedUnexpectedly
179 | P2pError::BootnodeAddrsWithoutPeerId(_) => true,
180 P2pError::NoConnectedPeers
181 | P2pError::HeaderEx(_)
182 | P2pError::Bitswap(_)
183 | P2pError::ProtoDecodeFailed(_)
184 | P2pError::Cid(_)
185 | P2pError::BitswapQueryTimeout
186 | P2pError::Shwap(_)
187 | P2pError::CelestiaTypes(_)
188 | P2pError::HeaderPruned(_)
189 | P2pError::HeaderNotSynced(_) => false,
190 P2pError::Store(e) => e.is_fatal(),
191 }
192 }
193}
194
195impl From<oneshot::error::RecvError> for P2pError {
196 fn from(_value: oneshot::error::RecvError) -> Self {
197 P2pError::ChannelClosedUnexpectedly
198 }
199}
200
201impl From<prost::DecodeError> for P2pError {
202 fn from(value: prost::DecodeError) -> Self {
203 P2pError::ProtoDecodeFailed(tendermint_proto::Error::decode_message(value))
204 }
205}
206
207impl From<cid::Error> for P2pError {
208 fn from(value: cid::Error) -> Self {
209 P2pError::Cid(celestia_types::Error::CidError(
210 blockstore::block::CidError::InvalidCid(value.to_string()),
211 ))
212 }
213}
214
215#[derive(Debug)]
217pub(crate) struct P2p {
218 cancellation_token: CancellationToken,
219 cmd_tx: mpsc::Sender<P2pCmd>,
220 join_handle: JoinHandle,
221 peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
222 local_peer_id: PeerId,
223}
224
225pub struct P2pArgs<B, S>
227where
228 B: Blockstore,
229 S: Store,
230{
231 pub network_id: String,
233 pub local_keypair: Keypair,
235 pub bootnodes: Vec<Multiaddr>,
237 pub listen_on: Vec<Multiaddr>,
239 pub blockstore: Arc<B>,
241 pub store: Arc<S>,
243 pub event_pub: EventPublisher,
245}
246
247#[derive(Debug)]
248pub(crate) enum P2pCmd {
249 NetworkInfo {
250 respond_to: oneshot::Sender<NetworkInfo>,
251 },
252 HeaderExRequest {
253 request: HeaderRequest,
254 respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
255 },
256 Listeners {
257 respond_to: oneshot::Sender<Vec<Multiaddr>>,
258 },
259 ConnectedPeers {
260 respond_to: oneshot::Sender<Vec<PeerId>>,
261 },
262 InitHeaderSub {
263 head: Box<ExtendedHeader>,
264 channel: mpsc::Sender<ExtendedHeader>,
266 },
267 SetPeerTrust {
268 peer_id: PeerId,
269 is_trusted: bool,
270 },
271 GetShwapCid {
272 cid: Cid,
273 respond_to: OneshotResultSender<Vec<u8>, P2pError>,
274 },
275 GetNetworkCompromisedToken {
276 respond_to: oneshot::Sender<Token>,
277 },
278 GetNetworkHead {
279 respond_to: oneshot::Sender<Option<ExtendedHeader>>,
280 },
281}
282
283impl P2p {
284 pub async fn start<B, S>(args: P2pArgs<B, S>) -> Result<Self>
286 where
287 B: Blockstore + 'static,
288 S: Store + 'static,
289 {
290 validate_bootnode_addrs(&args.bootnodes)?;
291
292 let local_peer_id = PeerId::from(args.local_keypair.public());
293
294 let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
295 let peer_tracker_info_watcher = peer_tracker.info_watcher();
296
297 let cancellation_token = CancellationToken::new();
298 let (cmd_tx, cmd_rx) = mpsc::channel(16);
299
300 let mut worker =
301 Worker::new(args, cancellation_token.child_token(), cmd_rx, peer_tracker).await?;
302
303 let join_handle = spawn(async move {
304 worker.run().await;
305 });
306
307 Ok(P2p {
308 cancellation_token,
309 cmd_tx,
310 join_handle,
311 peer_tracker_info_watcher,
312 local_peer_id,
313 })
314 }
315
/// Test-only constructor: returns a `P2p` without a real worker, together
/// with a handle that owns the receiving side of the command channel.
#[cfg(test)]
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
    let (cmd_tx, cmd_rx) = mpsc::channel(16);
    let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());

    let handle = crate::test_utils::MockP2pHandle {
        cmd_tx: cmd_tx.clone(),
        cmd_rx,
        header_sub_tx: None,
        peer_tracker_tx,
    };

    let p2p = Self {
        cancellation_token: CancellationToken::new(),
        cmd_tx,
        // A task that finishes immediately stands in for the worker.
        join_handle: spawn(async {}),
        peer_tracker_info_watcher: peer_tracker_rx,
        local_peer_id: PeerId::random(),
    };

    (p2p, handle)
}
343
344 pub fn stop(&self) {
346 self.cancellation_token.cancel();
348 }
349
350 pub async fn join(&self) {
352 self.join_handle.join().await;
353 }
354
355 pub fn local_peer_id(&self) -> &PeerId {
357 &self.local_peer_id
358 }
359
360 async fn send_command(&self, cmd: P2pCmd) -> Result<()> {
361 self.cmd_tx
362 .send(cmd)
363 .await
364 .map_err(|_| P2pError::WorkerDied)
365 }
366
367 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
369 self.peer_tracker_info_watcher.clone()
370 }
371
372 pub fn peer_tracker_info(&self) -> watch::Ref<'_, PeerTrackerInfo> {
374 self.peer_tracker_info_watcher.borrow()
375 }
376
377 pub async fn init_header_sub(
379 &self,
380 head: ExtendedHeader,
381 channel: mpsc::Sender<ExtendedHeader>,
382 ) -> Result<()> {
383 self.send_command(P2pCmd::InitHeaderSub {
384 head: Box::new(head),
385 channel,
386 })
387 .await
388 }
389
390 pub async fn wait_connected(&self) -> Result<()> {
392 self.peer_tracker_info_watcher()
393 .wait_for(|info| info.num_connected_peers > 0)
394 .await
395 .map(drop)
396 .map_err(|_| P2pError::WorkerDied)
397 }
398
399 pub async fn wait_connected_trusted(&self) -> Result<()> {
401 self.peer_tracker_info_watcher()
402 .wait_for(|info| info.num_connected_trusted_peers > 0)
403 .await
404 .map(drop)
405 .map_err(|_| P2pError::WorkerDied)
406 }
407
408 pub async fn network_info(&self) -> Result<NetworkInfo> {
410 let (tx, rx) = oneshot::channel();
411
412 self.send_command(P2pCmd::NetworkInfo { respond_to: tx })
413 .await?;
414
415 Ok(rx.await?)
416 }
417
418 pub async fn header_ex_request(&self, request: HeaderRequest) -> Result<Vec<ExtendedHeader>> {
420 let (tx, rx) = oneshot::channel();
421
422 self.send_command(P2pCmd::HeaderExRequest {
423 request,
424 respond_to: tx,
425 })
426 .await?;
427
428 rx.await?
429 }
430
431 pub async fn get_head_header(&self) -> Result<ExtendedHeader> {
433 self.get_header_by_height(0).await
434 }
435
436 pub async fn get_header(&self, hash: Hash) -> Result<ExtendedHeader> {
438 self.header_ex_request(HeaderRequest {
439 data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())),
440 amount: 1,
441 })
442 .await?
443 .into_iter()
444 .next()
445 .ok_or(HeaderExError::HeaderNotFound.into())
446 }
447
448 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
450 self.header_ex_request(HeaderRequest {
451 data: Some(header_request::Data::Origin(height)),
452 amount: 1,
453 })
454 .await?
455 .into_iter()
456 .next()
457 .ok_or(HeaderExError::HeaderNotFound.into())
458 }
459
460 pub async fn get_verified_headers_range(
465 &self,
466 from: &ExtendedHeader,
467 amount: u64,
468 ) -> Result<Vec<ExtendedHeader>> {
469 from.validate().map_err(|_| HeaderExError::InvalidRequest)?;
471
472 let height = from.height().value() + 1;
473
474 let range = height..=height + amount - 1;
475
476 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
477 let headers = session.run().await?;
478
479 from.verify_adjacent_range(&headers)
484 .map_err(|_| HeaderExError::InvalidResponse)?;
485
486 Ok(headers)
487 }
488
489 pub(crate) async fn get_unverified_header_range(
494 &self,
495 range: BlockRange,
496 ) -> Result<Vec<ExtendedHeader>> {
497 if range.is_empty() {
498 return Err(HeaderExError::InvalidRequest.into());
499 }
500
501 let mut session = HeaderSession::new(range, self.cmd_tx.clone());
502 let headers = session.run().await?;
503
504 let Some(head) = headers.first() else {
505 return Err(HeaderExError::InvalidResponse.into());
506 };
507
508 head.verify_adjacent_range(&headers[1..])
509 .map_err(|_| HeaderExError::InvalidResponse)?;
510
511 Ok(headers)
512 }
513
514 pub(crate) async fn get_shwap_cid(
516 &self,
517 cid: Cid,
518 timeout: Option<Duration>,
519 ) -> Result<Vec<u8>> {
520 let (tx, rx) = oneshot::channel();
521
522 self.send_command(P2pCmd::GetShwapCid {
523 cid,
524 respond_to: tx,
525 })
526 .await?;
527
528 let data = match timeout {
529 Some(dur) => time::timeout(dur, rx)
530 .await
531 .map_err(|_| P2pError::BitswapQueryTimeout)???,
532 None => rx.await??,
533 };
534
535 get_block_container(&cid, &data)
536 }
537
538 pub async fn get_row(
540 &self,
541 row_index: u16,
542 block_height: u64,
543 timeout: Option<Duration>,
544 ) -> Result<Row> {
545 let id = RowId::new(row_index, block_height).map_err(P2pError::Cid)?;
546 let cid = convert_cid(&id.into())?;
547
548 let data = self.get_shwap_cid(cid, timeout).await?;
549 let row = Row::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
550 Ok(row)
551 }
552
553 pub async fn get_sample(
555 &self,
556 row_index: u16,
557 column_index: u16,
558 block_height: u64,
559 timeout: Option<Duration>,
560 ) -> Result<Sample> {
561 let id = SampleId::new(row_index, column_index, block_height).map_err(P2pError::Cid)?;
562 let cid = convert_cid(&id.into())?;
563
564 let data = self.get_shwap_cid(cid, timeout).await?;
565 let sample = Sample::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
566 Ok(sample)
567 }
568
569 pub async fn get_row_namespace_data(
571 &self,
572 namespace: Namespace,
573 row_index: u16,
574 block_height: u64,
575 timeout: Option<Duration>,
576 ) -> Result<RowNamespaceData> {
577 let id =
578 RowNamespaceDataId::new(namespace, row_index, block_height).map_err(P2pError::Cid)?;
579 let cid = convert_cid(&id.into())?;
580
581 let data = self.get_shwap_cid(cid, timeout).await?;
582 let row_namespace_data =
583 RowNamespaceData::decode(id, &data[..]).map_err(|e| P2pError::Shwap(e.to_string()))?;
584 Ok(row_namespace_data)
585 }
586
587 pub async fn get_all_blobs<S>(
590 &self,
591 namespace: Namespace,
592 block_height: u64,
593 timeout: Option<Duration>,
594 store: &S,
595 ) -> Result<Vec<Blob>>
596 where
597 S: Store,
598 {
599 let header = match store.get_by_height(block_height).await {
600 Ok(header) => header,
601 Err(StoreError::NotFound) => {
602 let pruned_ranges = store.get_pruned_ranges().await?;
603
604 if pruned_ranges.contains(block_height) {
605 return Err(P2pError::HeaderPruned(block_height));
606 } else {
607 return Err(P2pError::HeaderNotSynced(block_height));
608 }
609 }
610 Err(e) => return Err(e.into()),
611 };
612
613 let app_version = header.app_version()?;
614 let rows_to_fetch: Vec<_> = header
615 .dah
616 .row_roots()
617 .iter()
618 .enumerate()
619 .filter(|(_, row)| row.contains::<NamespacedSha2Hasher>(*namespace))
620 .map(|(n, _)| n as u16)
621 .collect();
622
623 let futs = rows_to_fetch
624 .into_iter()
625 .map(|row_idx| self.get_row_namespace_data(namespace, row_idx, block_height, timeout))
626 .collect::<FuturesOrdered<_>>();
627
628 let rows: Vec<_> = match futs.try_collect().await {
629 Ok(rows) => rows,
630 Err(P2pError::BitswapQueryTimeout) if !store.has_at(block_height).await => {
631 return Err(P2pError::HeaderPruned(block_height));
632 }
633 Err(e) => return Err(e),
634 };
635
636 let shares = rows.iter().flat_map(|row| row.shares.iter());
637
638 Ok(Blob::reconstruct_all(shares, app_version)?)
639 }
640
641 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
643 let (tx, rx) = oneshot::channel();
644
645 self.send_command(P2pCmd::Listeners { respond_to: tx })
646 .await?;
647
648 Ok(rx.await?)
649 }
650
651 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
653 let (tx, rx) = oneshot::channel();
654
655 self.send_command(P2pCmd::ConnectedPeers { respond_to: tx })
656 .await?;
657
658 Ok(rx.await?)
659 }
660
661 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
663 self.send_command(P2pCmd::SetPeerTrust {
664 peer_id,
665 is_trusted,
666 })
667 .await
668 }
669
670 pub(crate) async fn get_network_compromised_token(&self) -> Result<Token> {
675 let (tx, rx) = oneshot::channel();
676
677 self.send_command(P2pCmd::GetNetworkCompromisedToken { respond_to: tx })
678 .await?;
679
680 Ok(rx.await?)
681 }
682
683 pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
685 let (tx, rx) = oneshot::channel();
686
687 self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
688 .await?;
689
690 Ok(rx.await?)
691 }
692}
693
694impl Drop for P2p {
695 fn drop(&mut self) {
696 self.stop();
697 }
698}
699
700#[derive(NetworkBehaviour)]
702struct Behaviour<B, S>
703where
704 B: Blockstore + 'static,
705 S: Store + 'static,
706{
707 connection_control: connection_control::Behaviour,
708 autonat: autonat::Behaviour,
709 bitswap: beetswap::Behaviour<MAX_MH_SIZE, B>,
710 ping: ping::Behaviour,
711 identify: identify::Behaviour,
712 header_ex: HeaderExBehaviour<S>,
713 gossipsub: gossipsub::Behaviour,
714 kademlia: kad::Behaviour<kad::store::MemoryStore>,
715}
716
717struct Worker<B, S>
718where
719 B: Blockstore + 'static,
720 S: Store + 'static,
721{
722 cancellation_token: CancellationToken,
723 swarm: Swarm<Behaviour<B, S>>,
724 listeners: SmallVec<[ListenerId; 1]>,
725 header_sub_topic_hash: TopicHash,
726 bad_encoding_fraud_sub_topic: TopicHash,
727 cmd_rx: mpsc::Receiver<P2pCmd>,
728 peer_tracker: Arc<PeerTracker>,
729 header_sub_state: Option<HeaderSubState>,
730 bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
731 network_compromised_token: Token,
732 store: Arc<S>,
733 event_pub: EventPublisher,
734 bootnodes: HashMap<PeerId, Vec<Multiaddr>>,
735}
736
737struct HeaderSubState {
738 known_head: ExtendedHeader,
739 channel: mpsc::Sender<ExtendedHeader>,
740}
741
742impl<B, S> Worker<B, S>
743where
744 B: Blockstore,
745 S: Store,
746{
747 async fn new(
748 args: P2pArgs<B, S>,
749 cancellation_token: CancellationToken,
750 cmd_rx: mpsc::Receiver<P2pCmd>,
751 peer_tracker: Arc<PeerTracker>,
752 ) -> Result<Self, P2pError> {
753 let local_peer_id = PeerId::from(args.local_keypair.public());
754
755 let connection_control = connection_control::Behaviour::new();
756 let autonat = autonat::Behaviour::new(local_peer_id, autonat::Config::default());
757 let ping = ping::Behaviour::new(ping::Config::default());
758
759 let agent_version = format!("lumina/{}/{}", args.network_id, env!("CARGO_PKG_VERSION"));
760 let identify = identify::Behaviour::new(
761 identify::Config::new(String::new(), args.local_keypair.public())
762 .with_agent_version(agent_version),
763 );
764
765 let header_sub_topic = gossipsub_ident_topic(&args.network_id, "/header-sub/v0.0.1");
766 let bad_encoding_fraud_sub_topic =
767 fraudsub_ident_topic(BadEncodingFraudProof::TYPE, &args.network_id);
768 let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;
769
770 let kademlia = init_kademlia(&args)?;
771 let bitswap = init_bitswap(
772 args.blockstore.clone(),
773 args.store.clone(),
774 &args.network_id,
775 )?;
776
777 let header_ex = HeaderExBehaviour::new(HeaderExConfig {
778 network_id: &args.network_id,
779 peer_tracker: peer_tracker.clone(),
780 header_store: args.store.clone(),
781 });
782
783 let behaviour = Behaviour {
784 connection_control,
785 autonat,
786 bitswap,
787 ping,
788 identify,
789 gossipsub,
790 header_ex,
791 kademlia,
792 };
793
794 let mut swarm = new_swarm(args.local_keypair, behaviour).await?;
795 let mut listeners = SmallVec::new();
796
797 for addr in args.listen_on {
798 match swarm.listen_on(addr.clone()) {
799 Ok(id) => listeners.push(id),
800 Err(e) => error!("Failed to listen on {addr}: {e}"),
801 }
802 }
803
804 let mut bootnodes = HashMap::<_, Vec<_>>::new();
805
806 for addr in args.bootnodes {
807 let peer_id = addr.peer_id().expect("multiaddr already validated");
808 bootnodes.entry(peer_id).or_default().push(addr);
809 }
810
811 for (peer_id, addrs) in bootnodes.iter_mut() {
812 addrs.sort();
813 addrs.dedup();
814 addrs.shrink_to_fit();
815
816 peer_tracker.set_trusted(*peer_id, true);
818 }
819
820 Ok(Worker {
821 cancellation_token,
822 cmd_rx,
823 swarm,
824 listeners,
825 bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
826 header_sub_topic_hash: header_sub_topic.hash(),
827 peer_tracker,
828 header_sub_state: None,
829 bitswap_queries: HashMap::new(),
830 network_compromised_token: Token::new(),
831 store: args.store,
832 event_pub: args.event_pub,
833 bootnodes,
834 })
835 }
836
837 async fn run(&mut self) {
838 let mut report_interval = Interval::new(Duration::from_secs(60)).await;
839 let mut kademlia_interval = Interval::new(Duration::from_secs(30)).await;
840 let mut peer_tracker_info_watcher = self.peer_tracker.info_watcher();
841
842 self.bootstrap();
844
845 loop {
846 select! {
847 _ = self.cancellation_token.cancelled() => break,
848 _ = peer_tracker_info_watcher.changed() => {
849 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
850 warn!("All peers disconnected");
851 self.bootstrap();
852 }
853 }
854 _ = report_interval.tick() => {
855 self.report();
856 }
857 _ = kademlia_interval.tick() => {
858 if self.peer_tracker.info().num_connected_peers < MIN_CONNECTED_PEERS
859 {
860 self.bootstrap();
861 }
862 }
863 _ = poll_closed(&mut self.bitswap_queries) => {
864 self.prune_canceled_bitswap_queries();
865 }
866 ev = self.swarm.select_next_some() => {
867 if let Err(e) = self.on_swarm_event(ev).await {
868 warn!("Failure while handling swarm event: {e}");
869 }
870 },
871 Some(cmd) = self.cmd_rx.recv() => {
872 if let Err(e) = self.on_cmd(cmd).await {
873 warn!("Failure while handling command. (error: {e})");
874 }
875 }
876 }
877 }
878
879 self.on_stop().await;
880 }
881
882 fn bootstrap(&mut self) {
883 self.event_pub.send(NodeEvent::ConnectingToBootnodes);
884
885 for (peer_id, addrs) in &self.bootnodes {
886 let dial_opts = DialOpts::peer_id(*peer_id)
887 .addresses(addrs.clone())
888 .condition(PeerCondition::DisconnectedAndNotDialing)
891 .build();
892
893 if let Err(e) = self.swarm.dial(dial_opts) {
894 if !matches!(e, DialError::DialPeerConditionFalse(_)) {
895 warn!("Failed to dial on {addrs:?}: {e}");
896 }
897 }
898 }
899
900 if self.swarm.behaviour_mut().kademlia.bootstrap().is_err() {
902 warn!("Can't run kademlia bootstrap, no known peers");
903 }
904 }
905
906 fn prune_canceled_bitswap_queries(&mut self) {
907 let mut cancelled = SmallVec::<[_; 16]>::new();
908
909 for (query_id, chan) in &self.bitswap_queries {
910 if chan.is_closed() {
911 cancelled.push(*query_id);
912 }
913 }
914
915 for query_id in cancelled {
916 self.bitswap_queries.remove(&query_id);
917 self.swarm.behaviour_mut().bitswap.cancel(query_id);
918 }
919 }
920
921 async fn on_stop(&mut self) {
922 self.swarm
923 .behaviour_mut()
924 .connection_control
925 .set_stopping(true);
926 self.swarm.behaviour_mut().header_ex.stop();
927
928 for listener in self.listeners.drain(..) {
929 self.swarm.remove_listener(listener);
930 }
931
932 for (_, ids) in self.peer_tracker.connections() {
933 for id in ids {
934 self.swarm.close_connection(id);
935 }
936 }
937
938 while self
940 .swarm
941 .network_info()
942 .connection_counters()
943 .num_established()
944 > 0
945 {
946 match self.swarm.select_next_some().await {
947 SwarmEvent::ConnectionEstablished { connection_id, .. } => {
949 self.swarm.close_connection(connection_id);
951 }
952 SwarmEvent::ConnectionClosed {
953 peer_id,
954 connection_id,
955 ..
956 } => {
957 self.on_peer_disconnected(peer_id, connection_id);
959 }
960 _ => {}
961 }
962 }
963 }
964
965 async fn on_swarm_event(&mut self, ev: SwarmEvent<BehaviourEvent<B, S>>) -> Result<()> {
966 match ev {
967 SwarmEvent::Behaviour(ev) => match ev {
968 BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
969 BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
970 BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
971 BehaviourEvent::Bitswap(ev) => self.on_bitswap_event(ev).await,
972 BehaviourEvent::Ping(ev) => self.on_ping_event(ev).await,
973 BehaviourEvent::Autonat(_)
974 | BehaviourEvent::ConnectionControl(_)
975 | BehaviourEvent::HeaderEx(_) => {}
976 },
977 SwarmEvent::ConnectionEstablished {
978 peer_id,
979 connection_id,
980 endpoint,
981 ..
982 } => {
983 self.on_peer_connected(peer_id, connection_id, endpoint);
984 }
985 SwarmEvent::ConnectionClosed {
986 peer_id,
987 connection_id,
988 ..
989 } => {
990 self.on_peer_disconnected(peer_id, connection_id);
991 }
992 _ => {}
993 }
994
995 Ok(())
996 }
997
998 async fn on_cmd(&mut self, cmd: P2pCmd) -> Result<()> {
999 match cmd {
1000 P2pCmd::NetworkInfo { respond_to } => {
1001 respond_to.maybe_send(self.swarm.network_info());
1002 }
1003 P2pCmd::HeaderExRequest {
1004 request,
1005 respond_to,
1006 } => {
1007 self.swarm
1008 .behaviour_mut()
1009 .header_ex
1010 .send_request(request, respond_to);
1011 }
1012 P2pCmd::Listeners { respond_to } => {
1013 let local_peer_id = self.swarm.local_peer_id().to_owned();
1014 let listeners = self
1015 .swarm
1016 .listeners()
1017 .cloned()
1018 .map(|mut ma| {
1019 if !ma.protocol_stack().any(|protocol| protocol == "p2p") {
1020 ma.push(Protocol::P2p(local_peer_id))
1021 }
1022 ma
1023 })
1024 .collect();
1025
1026 respond_to.maybe_send(listeners);
1027 }
1028 P2pCmd::ConnectedPeers { respond_to } => {
1029 respond_to.maybe_send(self.peer_tracker.connected_peers());
1030 }
1031 P2pCmd::InitHeaderSub { head, channel } => {
1032 self.on_init_header_sub(*head, channel);
1033 }
1034 P2pCmd::SetPeerTrust {
1035 peer_id,
1036 is_trusted,
1037 } => {
1038 if *self.swarm.local_peer_id() != peer_id {
1039 self.peer_tracker.set_trusted(peer_id, is_trusted);
1040 }
1041 }
1042 P2pCmd::GetShwapCid { cid, respond_to } => {
1043 self.on_get_shwap_cid(cid, respond_to);
1044 }
1045 P2pCmd::GetNetworkCompromisedToken { respond_to } => {
1046 respond_to.maybe_send(self.network_compromised_token.clone())
1047 }
1048 P2pCmd::GetNetworkHead { respond_to } => {
1049 let head = self
1050 .header_sub_state
1051 .as_ref()
1052 .map(|state| state.known_head.clone());
1053 respond_to.maybe_send(head);
1054 }
1055 }
1056
1057 Ok(())
1058 }
1059
1060 #[instrument(skip_all)]
1061 fn report(&mut self) {
1062 let tracker_info = self.peer_tracker.info();
1063
1064 info!(
1065 "peers: {}, trusted peers: {}",
1066 tracker_info.num_connected_peers, tracker_info.num_connected_trusted_peers,
1067 );
1068 }
1069
1070 #[instrument(level = "trace", skip(self))]
1071 async fn on_identify_event(&mut self, ev: identify::Event) -> Result<()> {
1072 match ev {
1073 identify::Event::Received { peer_id, info, .. } => {
1074 for addr in info.listen_addrs {
1077 self.swarm
1078 .behaviour_mut()
1079 .kademlia
1080 .add_address(&peer_id, addr);
1081 }
1082 }
1083 _ => trace!("Unhandled identify event"),
1084 }
1085
1086 Ok(())
1087 }
1088
1089 #[instrument(level = "trace", skip(self))]
1090 async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
1091 match ev {
1092 gossipsub::Event::Message {
1093 message,
1094 message_id,
1095 ..
1096 } => {
1097 let Some(peer) = message.source else {
1098 return;
1100 };
1101
1102 let acceptance = if message.topic == self.header_sub_topic_hash {
1103 self.on_header_sub_message(&message.data[..])
1104 } else if message.topic == self.bad_encoding_fraud_sub_topic {
1105 self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
1106 .await
1107 } else {
1108 trace!("Unhandled gossipsub message");
1109 gossipsub::MessageAcceptance::Ignore
1110 };
1111
1112 if !matches!(acceptance, gossipsub::MessageAcceptance::Reject) {
1113 self.peer_maybe_discovered(peer);
1115 }
1116
1117 let _ = self
1118 .swarm
1119 .behaviour_mut()
1120 .gossipsub
1121 .report_message_validation_result(&message_id, &peer, acceptance);
1122 }
1123 _ => trace!("Unhandled gossipsub event"),
1124 }
1125 }
1126
1127 #[instrument(level = "trace", skip(self))]
1128 async fn on_kademlia_event(&mut self, ev: kad::Event) -> Result<()> {
1129 match ev {
1130 kad::Event::RoutingUpdated {
1131 peer, addresses, ..
1132 } => {
1133 self.peer_tracker.add_addresses(peer, addresses.iter());
1134 }
1135 _ => trace!("Unhandled Kademlia event"),
1136 }
1137
1138 Ok(())
1139 }
1140
1141 #[instrument(level = "trace", skip_all)]
1142 fn on_get_shwap_cid(&mut self, cid: Cid, respond_to: OneshotResultSender<Vec<u8>, P2pError>) {
1143 trace!("Requesting CID {cid} from bitswap");
1144 let query_id = self.swarm.behaviour_mut().bitswap.get(&cid);
1145 self.bitswap_queries.insert(query_id, respond_to);
1146 }
1147
1148 #[instrument(level = "trace", skip(self))]
1149 async fn on_bitswap_event(&mut self, ev: beetswap::Event) {
1150 match ev {
1151 beetswap::Event::GetQueryResponse { query_id, data } => {
1152 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1153 respond_to.maybe_send_ok(data);
1154 }
1155 }
1156 beetswap::Event::GetQueryError { query_id, error } => {
1157 if let Some(respond_to) = self.bitswap_queries.remove(&query_id) {
1158 let error: P2pError = error.into();
1159 respond_to.maybe_send_err(error);
1160 }
1161 }
1162 }
1163 }
1164
1165 #[instrument(level = "debug", skip_all)]
1166 async fn on_ping_event(&mut self, ev: ping::Event) {
1167 match ev.result {
1168 Ok(dur) => debug!(
1169 "Ping success: peer: {}, connection_id: {}, time: {:?}",
1170 ev.peer, ev.connection, dur
1171 ),
1172 Err(e) => {
1173 debug!(
1174 "Ping failure: peer: {}, connection_id: {}, error: {}",
1175 &ev.peer, &ev.connection, e
1176 );
1177 self.swarm.close_connection(ev.connection);
1178 }
1179 }
1180 }
1181
1182 #[instrument(skip_all, fields(peer_id = %peer_id))]
1183 fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
1184 if !self.peer_tracker.set_maybe_discovered(peer_id) {
1185 return;
1186 }
1187
1188 debug!("Peer discovered");
1189 }
1190
1191 #[instrument(skip_all, fields(peer_id = %peer_id))]
1192 fn on_peer_connected(
1193 &mut self,
1194 peer_id: PeerId,
1195 connection_id: ConnectionId,
1196 endpoint: ConnectedPoint,
1197 ) {
1198 debug!("Peer connected");
1199
1200 let dialed_addr = match endpoint {
1206 ConnectedPoint::Dialer {
1207 address,
1208 role_override: Endpoint::Dialer,
1209 ..
1210 } => Some(address),
1211 _ => None,
1212 };
1213
1214 self.peer_tracker
1215 .set_connected(peer_id, connection_id, dialed_addr);
1216 }
1217
1218 #[instrument(skip_all, fields(peer_id = %peer_id))]
1219 fn on_peer_disconnected(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
1220 if self
1221 .peer_tracker
1222 .set_maybe_disconnected(peer_id, connection_id)
1223 {
1224 debug!("Peer disconnected");
1225 }
1226 }
1227
1228 #[instrument(skip_all, fields(header = %head))]
1229 fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
1230 self.header_sub_state = Some(HeaderSubState {
1231 known_head: head,
1232 channel,
1233 });
1234 trace!("HeaderSub initialized");
1235 }
1236
1237 #[instrument(skip_all)]
1238 fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
1239 let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
1240 trace!("Malformed or invalid header from header-sub");
1241 return gossipsub::MessageAcceptance::Reject;
1242 };
1243
1244 trace!("Received header from header-sub ({header})");
1245
1246 let Some(ref mut state) = self.header_sub_state else {
1247 debug!("header-sub not initialized yet");
1248 return gossipsub::MessageAcceptance::Ignore;
1249 };
1250
1251 if state.known_head.verify(&header).is_err() {
1252 trace!("Failed to verify HeaderSub header. Ignoring {header}");
1253 return gossipsub::MessageAcceptance::Ignore;
1254 }
1255
1256 trace!("New header from header-sub ({header})");
1257
1258 state.known_head = header.clone();
1259 let _ = state.channel.try_send(header);
1262
1263 gossipsub::MessageAcceptance::Accept
1264 }
1265
1266 #[instrument(skip_all)]
1267 async fn on_bad_encoding_fraud_sub_message(
1268 &mut self,
1269 data: &[u8],
1270 peer: &PeerId,
1271 ) -> gossipsub::MessageAcceptance {
1272 let Ok(befp) = BadEncodingFraudProof::decode(data) else {
1273 trace!("Malformed bad encoding fraud proof from {peer}");
1274 self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1275 return gossipsub::MessageAcceptance::Reject;
1276 };
1277
1278 let height = befp.height().value();
1279
1280 let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1281 header_sub_state.known_head.height().value()
1282 } else if let Ok(local_head) = self.store.get_head().await {
1283 local_head.height().value()
1284 } else {
1285 return gossipsub::MessageAcceptance::Ignore;
1287 };
1288
1289 if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
1290 return gossipsub::MessageAcceptance::Ignore;
1293 }
1294
1295 let hash = befp.header_hash();
1296 let Ok(header) = self.store.get_by_hash(&hash).await else {
1297 return gossipsub::MessageAcceptance::Ignore;
1300 };
1301
1302 if let Err(e) = befp.validate(&header) {
1303 trace!("Received invalid bad encoding fraud proof from {peer}: {e}");
1304 self.swarm.behaviour_mut().gossipsub.blacklist_peer(peer);
1305 return gossipsub::MessageAcceptance::Reject;
1306 }
1307
1308 warn!("Received a valid bad encoding fraud proof");
1309 self.network_compromised_token.trigger();
1311
1312 gossipsub::MessageAcceptance::Accept
1313 }
1314}
1315
1316async fn poll_closed(
1318 bitswap_queries: &mut HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
1319) {
1320 poll_fn(|cx| {
1321 for chan in bitswap_queries.values_mut() {
1322 match chan.poll_closed(cx) {
1323 Poll::Pending => continue,
1324 Poll::Ready(_) => return Poll::Ready(()),
1325 }
1326 }
1327
1328 Poll::Pending
1329 })
1330 .await
1331}
1332
1333fn validate_bootnode_addrs(addrs: &[Multiaddr]) -> Result<(), P2pError> {
1334 let mut invalid_addrs = Vec::new();
1335
1336 for addr in addrs {
1337 if addr.peer_id().is_none() {
1338 invalid_addrs.push(addr.to_owned());
1339 }
1340 }
1341
1342 if invalid_addrs.is_empty() {
1343 Ok(())
1344 } else {
1345 Err(P2pError::BootnodeAddrsWithoutPeerId(invalid_addrs))
1346 }
1347}
1348
1349fn init_gossipsub<'a, B, S>(
1350 args: &'a P2pArgs<B, S>,
1351 topics: impl IntoIterator<Item = &'a gossipsub::IdentTopic>,
1352) -> Result<gossipsub::Behaviour>
1353where
1354 B: Blockstore,
1355 S: Store,
1356{
1357 let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());
1360
1361 let config = gossipsub::ConfigBuilder::default()
1362 .validation_mode(gossipsub::ValidationMode::Strict)
1363 .validate_messages()
1364 .build()
1365 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1366
1367 let mut gossipsub: gossipsub::Behaviour =
1369 gossipsub::Behaviour::new(message_authenticity, config)
1370 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1371
1372 for topic in topics {
1373 gossipsub
1374 .subscribe(topic)
1375 .map_err(|e| P2pError::GossipsubInit(e.to_string()))?;
1376 }
1377
1378 Ok(gossipsub)
1379}
1380
1381fn init_kademlia<B, S>(args: &P2pArgs<B, S>) -> Result<kad::Behaviour<kad::store::MemoryStore>>
1382where
1383 B: Blockstore,
1384 S: Store,
1385{
1386 let local_peer_id = PeerId::from(args.local_keypair.public());
1387 let store = kad::store::MemoryStore::new(local_peer_id);
1388
1389 let protocol_id = celestia_protocol_id(&args.network_id, "/kad/1.0.0");
1390 let config = kad::Config::new(protocol_id);
1391
1392 let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, config);
1393
1394 for addr in &args.bootnodes {
1395 if let Some(peer_id) = addr.peer_id() {
1396 kademlia.add_address(&peer_id, addr.to_owned());
1397 }
1398 }
1399
1400 if !args.listen_on.is_empty() {
1401 kademlia.set_mode(Some(kad::Mode::Server));
1402 }
1403
1404 Ok(kademlia)
1405}
1406
1407fn init_bitswap<B, S>(
1408 blockstore: Arc<B>,
1409 store: Arc<S>,
1410 network_id: &str,
1411) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
1412where
1413 B: Blockstore + 'static,
1414 S: Store + 'static,
1415{
1416 let protocol_prefix = celestia_protocol_id(network_id, "shwap");
1417
1418 Ok(beetswap::Behaviour::builder(blockstore)
1419 .protocol_prefix(protocol_prefix.as_ref())?
1420 .register_multihasher(ShwapMultihasher::new(store))
1421 .client_set_send_dont_have(false)
1422 .build())
1423}