lumina_node/node.rs

//! Node that connects to Celestia's P2P network.
//!
//! Upon creation, `Node` will try to connect to Celestia's P2P network
//! and then proceed with synchronization and data sampling of the blocks.

use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;

use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use blockstore::Blockstore;
use celestia_types::blob::BlobsAtHeight;
use celestia_types::hash::Hash;
use celestia_types::namespace_data::NamespaceData;
use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::row_namespace_data::RowNamespaceData;
use celestia_types::sample::Sample;
use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, SharesAtHeight};
use lumina_utils::executor::{JoinHandle, spawn, spawn_cancellable};

use crate::blockstore::{InMemoryBlockstore, SampleBlockstore};
use crate::daser::{
    DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY, DEFAULT_CONCURENCY_LIMIT, Daser, DaserArgs,
};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::node::subscriptions::{SubscriptionError, forward_new_blobs, forward_new_shares};
use crate::p2p::shwap::sample_cid;
use crate::p2p::{P2p, P2pArgs};
use crate::pruner::{Pruner, PrunerArgs};
use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs};

mod builder;
pub mod subscriptions;

pub use self::builder::{
    DEFAULT_PRUNING_WINDOW, DEFAULT_PRUNING_WINDOW_IN_MEMORY, NodeBuilder, NodeBuilderError,
    SAMPLING_WINDOW,
};
pub use crate::daser::DaserError;
pub use crate::p2p::{HeaderExError, P2pError};
pub use crate::peer_tracker::PeerTrackerInfo;
pub use crate::syncer::{SyncerError, SyncingInfo};

const DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(6);

/// Alias of [`Result`] with [`NodeError`] error type
///
/// [`Result`]: std::result::Result
pub type Result<T, E = NodeError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with the [`Node`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// An error propagated from the [`NodeBuilder`] component.
    #[error("NodeBuilder: {0}")]
    NodeBuilder(#[from] NodeBuilderError),

    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the `Syncer` component.
    #[error("Syncer: {0}")]
    Syncer(#[from] SyncerError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// An error propagated from the `Daser` component.
    #[error("Daser: {0}")]
    Daser(#[from] DaserError),
}

struct NodeConfig<B, S>
where
    B: Blockstore,
    S: Store,
{
    pub(crate) blockstore: B,
    pub(crate) store: S,
    pub(crate) network_id: String,
    pub(crate) p2p_local_keypair: Keypair,
    pub(crate) p2p_bootnodes: Vec<Multiaddr>,
    pub(crate) p2p_listen_on: Vec<Multiaddr>,
    pub(crate) sync_batch_size: u64,
    pub(crate) sampling_window: Duration,
    pub(crate) pruning_window: Duration,
}

/// Celestia node.
pub struct Node<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    event_channel: EventChannel,
    p2p: Option<Arc<P2p>>,
    blockstore: Option<Arc<SampleBlockstore<B>>>,
    store: Option<Arc<S>>,
    syncer: Option<Arc<Syncer<S>>>,
    daser: Option<Arc<Daser>>,
    pruner: Option<Arc<Pruner>>,
    tasks_cancellation_token: CancellationToken,
    network_compromised_task: JoinHandle,
}

impl Node<InMemoryBlockstore, InMemoryStore> {
    /// Creates a new [`NodeBuilder`] that uses in-memory stores.
    ///
    /// After creation, you can use [`NodeBuilder::blockstore`]
    /// and [`NodeBuilder::store`] to set other stores.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// let node = Node::builder()
    ///     .network(Network::Mainnet)
    ///     .start()
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```
    pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
        NodeBuilder::new()
    }
}

impl<B, S> Node<B, S>
where
    B: Blockstore,
    S: Store,
{
    /// Creates and starts a new Celestia node with a given config.
    async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
        let event_channel = EventChannel::new();
        let event_sub = event_channel.subscribe();
        let store = Arc::new(config.store);
        let blockstore = Arc::new(SampleBlockstore::new(config.blockstore));

        let p2p = Arc::new(
            P2p::start(P2pArgs {
                network_id: config.network_id,
                local_keypair: config.p2p_local_keypair,
                bootnodes: config.p2p_bootnodes,
                listen_on: config.p2p_listen_on,
                blockstore: blockstore.clone(),
                store: store.clone(),
                event_pub: event_channel.publisher(),
            })
            .await?,
        );

        let syncer = Arc::new(Syncer::start(SyncerArgs {
            store: store.clone(),
            p2p: p2p.clone(),
            event_pub: event_channel.publisher(),
            batch_size: config.sync_batch_size,
            sampling_window: config.sampling_window,
            pruning_window: config.pruning_window,
        })?);

        let daser = Arc::new(Daser::start(DaserArgs {
            p2p: p2p.clone(),
            store: store.clone(),
            event_pub: event_channel.publisher(),
            sampling_window: config.sampling_window,
            concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })?);

        let pruner = Arc::new(Pruner::start(PrunerArgs {
            daser: daser.clone(),
            store: store.clone(),
            blockstore: blockstore.clone(),
            event_pub: event_channel.publisher(),
            block_time: DEFAULT_BLOCK_TIME,
            sampling_window: config.sampling_window,
            pruning_window: config.pruning_window,
        }));

        let tasks_cancellation_token = CancellationToken::new();

        // Spawn the task that will stop the services when fraud is detected.
        let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
            let network_compromised_token = p2p.get_network_compromised_token().await?;
            let syncer = syncer.clone();
            let daser = daser.clone();
            let pruner = pruner.clone();
            let event_pub = event_channel.publisher();

            async move {
                network_compromised_token.triggered().await;

                // Network compromised! Stop workers.
                syncer.stop();
                daser.stop();
                pruner.stop();

                event_pub.send(NodeEvent::NetworkCompromised);
                // This is a very important message and we want to log it
                // even if the user is not consuming our events.
                warn!("{}", NodeEvent::NetworkCompromised);
            }
        });

        let node = Node {
            event_channel,
            p2p: Some(p2p),
            blockstore: Some(blockstore),
            store: Some(store),
            syncer: Some(syncer),
            daser: Some(daser),
            pruner: Some(pruner),
            tasks_cancellation_token,
            network_compromised_task,
        };

        Ok((node, event_sub))
    }

    /// Stop the node.
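    ///
    /// # Example
    ///
    /// A minimal sketch of a full node lifecycle (the network choice is illustrative):
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// let node = Node::builder()
    ///     .network(Network::Mainnet)
    ///     .start()
    ///     .await
    ///     .unwrap();
    ///
    /// // ... use the node ...
    ///
    /// // Consumes the node, stops its components, and closes the stores.
    /// node.stop().await;
    /// # }
    /// ```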
    pub async fn stop(mut self) {
        {
            let daser = self.daser.take().expect("Daser not initialized");
            let syncer = self.syncer.take().expect("Syncer not initialized");
            let pruner = self.pruner.take().expect("Pruner not initialized");
            let p2p = self.p2p.take().expect("P2p not initialized");

            // Cancel Node's tasks
            self.tasks_cancellation_token.cancel();
            self.network_compromised_task.join().await;

            // Stop all components that use P2p.
            daser.stop();
            syncer.stop();
            pruner.stop();

            daser.join().await;
            syncer.join().await;
            pruner.join().await;

            // Now stop P2p component.
            p2p.stop();
            p2p.join().await;
        }

        // Everything that was holding Blockstore is now dropped, so we can close it.
        let blockstore = self.blockstore.take().expect("Blockstore not initialized");
        let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
        if let Err(e) = blockstore.close().await {
            warn!("Blockstore failed to close: {e}");
        }

        // Everything that was holding Store is now dropped, so we can close it.
        let store = self.store.take().expect("Store not initialized");
        let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
        if let Err(e) = store.close().await {
            warn!("Store failed to close: {e}");
        }

        self.event_channel.publisher().send(NodeEvent::NodeStopped);
    }

    fn syncer(&self) -> &Syncer<S> {
        self.syncer.as_ref().expect("Syncer not initialized")
    }

    fn p2p(&self) -> &P2p {
        self.p2p.as_ref().expect("P2p not initialized")
    }

    fn store(&self) -> &S {
        self.store.as_ref().expect("Store not initialized")
    }

    /// Returns a new `EventSubscriber`.
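    ///
    /// # Example
    ///
    /// A minimal sketch of consuming node events; it assumes that
    /// `EventSubscriber::recv` yields events as they are published
    /// (see [`crate::events`] for the exact API):
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let mut events = node.event_subscriber();
    /// while let Ok(event) = events.recv().await {
    ///     // React to the event here, e.g. log it or update UI state.
    /// }
    /// # }
    /// ```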
    pub fn event_subscriber(&self) -> EventSubscriber {
        self.event_channel.subscribe()
    }

    /// Get node's local peer ID.
    pub fn local_peer_id(&self) -> &PeerId {
        self.p2p().local_peer_id()
    }

    /// Get current [`PeerTrackerInfo`].
    pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
        self.p2p().peer_tracker_info().clone()
    }

    /// Get [`PeerTrackerInfo`] watcher.
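    ///
    /// # Example
    ///
    /// A minimal sketch that waits for the tracker info to change and then
    /// reads the current value (only `tokio::sync::watch` semantics are used):
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let mut watcher = node.peer_tracker_info_watcher();
    /// // Block until the peer tracker publishes a new value.
    /// watcher.changed().await.unwrap();
    /// let info = watcher.borrow().clone();
    /// // `info` now holds the latest `PeerTrackerInfo`.
    /// # }
    /// ```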
    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
        self.p2p().peer_tracker_info_watcher()
    }

    /// Wait until the node is connected to at least 1 peer.
    pub async fn wait_connected(&self) -> Result<()> {
        Ok(self.p2p().wait_connected().await?)
    }

    /// Wait until the node is connected to at least 1 trusted peer.
    pub async fn wait_connected_trusted(&self) -> Result<()> {
        Ok(self.p2p().wait_connected_trusted().await?)
    }

    /// Get current network info.
    pub async fn network_info(&self) -> Result<NetworkInfo> {
        Ok(self.p2p().network_info().await?)
    }

    /// Get all the multiaddresses on which the node listens.
    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
        Ok(self.p2p().listeners().await?)
    }

    /// Get all the peers that the node is connected to.
    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
        Ok(self.p2p().connected_peers().await?)
    }

    /// Trust or untrust the peer with a given ID.
    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
        Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
    }

    /// Manually mark the peer as archival.
    #[cfg(any(test, feature = "test-utils"))]
    pub async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
        Ok(self.p2p().mark_as_archival(peer_id).await?)
    }

    /// Request the head header from the network.
    pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
        Ok(self.p2p().get_head_header().await?)
    }

    /// Request a header for the block with a given hash from the network.
    pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
        Ok(self.p2p().get_header(*hash).await?)
    }

    /// Request a header for the block with a given height from the network.
    pub async fn request_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
        Ok(self.p2p().get_header_by_height(height).await?)
    }

    /// Request headers in range (from, from + amount] from the network.
    ///
    /// The headers will be verified with the `from` header.
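    ///
    /// # Example
    ///
    /// A minimal sketch: fetch the current network head, then request the
    /// headers that follow it (the amount is illustrative):
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let head = node.request_head_header().await.unwrap();
    /// // Request the 10 headers following `head`, verified against it.
    /// let headers = node.request_verified_headers(&head, 10).await.unwrap();
    /// # }
    /// ```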
    pub async fn request_verified_headers(
        &self,
        from: &ExtendedHeader,
        amount: u64,
    ) -> Result<Vec<ExtendedHeader>> {
        Ok(self.p2p().get_verified_headers_range(from, amount).await?)
    }

    /// Request a verified [`Row`] from the network.
    ///
    /// # Errors
    ///
    /// On failure to receive a verified [`Row`] within a certain time, the
    /// `NodeError::P2p(P2pError::RequestTimedOut)` error will be returned.
    pub async fn request_row(
        &self,
        row_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<Row> {
        Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
    }

    /// Request a verified [`Sample`] from the network.
    ///
    /// # Errors
    ///
    /// On failure to receive a verified [`Sample`] within a certain time, the
    /// `NodeError::P2p(P2pError::RequestTimedOut)` error will be returned.
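    ///
    /// # Example
    ///
    /// A minimal sketch requesting the share at row 0, column 0 of a block
    /// (the height and timeout are illustrative):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let sample = node
    ///     .request_sample(0, 0, 123, Some(Duration::from_secs(10)))
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```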
    pub async fn request_sample(
        &self,
        row_index: u16,
        column_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<Sample> {
        let sample = self
            .p2p()
            .get_sample(row_index, column_index, block_height, timeout)
            .await?;

        // We want to immediately remove the sample from the blockstore,
        // but **only if** it wasn't chosen for DASing. Otherwise, we could
        // accidentally remove samples needed for block reconstruction.
        //
        // There's a small possibility of permanently storing this sample if a
        // persistent blockstore is used and the user closes the tab / kills the
        // process before the removal is called, but that is an acceptable
        // tradeoff to avoid complexity.
        //
        // TODO: It should be properly solved when we switch from bitswap to shrex.
        if let Some(metadata) = self.get_sampling_metadata(block_height).await? {
            let cid = sample_cid(row_index, column_index, block_height)?;
            if !metadata.cids.contains(&cid) {
                let blockstore = self
                    .blockstore
                    .as_ref()
                    .expect("Blockstore not initialized");
                let _ = blockstore.remove(&cid).await;
            }
        }

        Ok(sample)
    }

    /// Request a verified [`ExtendedDataSquare`] from the network.
    ///
    /// # Errors
    ///
    /// On failure to receive a verified [`ExtendedDataSquare`] within a certain time, the
    /// `NodeError::P2p(P2pError::RequestTimedOut)` error will be returned.
    pub async fn request_extended_data_square(
        &self,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<ExtendedDataSquare> {
        Ok(self.p2p().get_eds(block_height, timeout).await?)
    }

    /// Request a verified [`RowNamespaceData`] from the network.
    ///
    /// # Errors
    ///
    /// On failure to receive a verified [`RowNamespaceData`] within a certain time, the
    /// `NodeError::P2p(P2pError::RequestTimedOut)` error will be returned.
    pub async fn request_row_namespace_data(
        &self,
        namespace: Namespace,
        row_index: u16,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<RowNamespaceData> {
        Ok(self
            .p2p()
            .get_row_namespace_data(namespace, row_index, block_height, timeout)
            .await?)
    }

    /// Request a verified [`NamespaceData`] from the network.
    ///
    /// # Errors
    ///
    /// On failure to receive a verified [`NamespaceData`] within a certain time, the
    /// `NodeError::P2p(P2pError::RequestTimedOut)` error will be returned.
    pub async fn request_namespace_data(
        &self,
        namespace: Namespace,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<NamespaceData> {
        Ok(self
            .p2p()
            .get_namespace_data(namespace, block_height, timeout)
            .await?)
    }

    /// Request all blobs with the given namespace in the block at the given
    /// height, using the bitswap protocol.
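    ///
    /// # Example
    ///
    /// A minimal sketch fetching every blob of a namespace at a given height
    /// (the namespace bytes and height are illustrative):
    ///
    /// ```no_run
    /// # use celestia_types::nmt::Namespace;
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let namespace = Namespace::new_v0(&[0xDE, 0xAD, 0xBE, 0xEF]).unwrap();
    /// let blobs = node.request_all_blobs(namespace, 123, None).await.unwrap();
    /// # }
    /// ```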
    pub async fn request_all_blobs(
        &self,
        namespace: Namespace,
        block_height: u64,
        timeout: Option<Duration>,
    ) -> Result<Vec<Blob>> {
        Ok(self
            .p2p()
            .get_all_blobs(namespace, block_height, timeout, self.store())
            .await?)
    }

    /// Get current header syncing info.
    pub async fn syncer_info(&self) -> Result<SyncingInfo> {
        Ok(self.syncer().info().await?)
    }

    /// Get the latest header announced in the network.
    pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
        Ok(self.p2p().get_network_head().await?)
    }

    /// Get the latest locally synced header.
    pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
        Ok(self.store().get_head().await?)
    }

    /// Get a synced header for the block with a given hash.
    pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
        Ok(self.store().get_by_hash(hash).await?)
    }

    /// Get a synced header for the block with a given height.
    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
        Ok(self.store().get_by_height(height).await?)
    }

    /// Get synced headers from the given heights range.
    ///
    /// If the start of the range is unbounded, the first returned header will be of
    /// height 1. If the end of the range is unbounded, the last returned header will
    /// be the last header in the store.
    ///
    /// # Errors
    ///
    /// If the range contains a height of a header that is not found in the store, or
    /// if the [`RangeBounds`] cannot be converted to a valid range.
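    ///
    /// # Example
    ///
    /// A minimal sketch (the heights are illustrative):
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// // Headers of blocks 10 through 20, inclusive.
    /// let headers = node.get_headers(10..=20).await.unwrap();
    /// // All the headers currently in the store.
    /// let all_headers = node.get_headers(..).await.unwrap();
    /// # }
    /// ```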
    pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
    where
        R: RangeBounds<u64> + Send,
    {
        Ok(self.store().get_range(range).await?)
    }

    /// Get data sampling metadata of an already sampled height.
    ///
    /// Returns `Ok(None)` if metadata for the given height does not exist.
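    ///
    /// # Example
    ///
    /// A minimal sketch (the height is illustrative); `cids` lists the blocks
    /// that were sampled at that height:
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// if let Some(metadata) = node.get_sampling_metadata(123).await.unwrap() {
    ///     // Inspect which CIDs were sampled for this height.
    ///     println!("sampled {} cids", metadata.cids.len());
    /// }
    /// # }
    /// ```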
    pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
        match self.store().get_sampling_metadata(height).await {
            Ok(val) => Ok(val),
            Err(StoreError::NotFound) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Subscribe to new headers received by the node from the network.
    ///
    /// Returns a receiver that yields all the headers as they are being received by
    /// the node, starting from the first header received after the call.
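    ///
    /// # Example
    ///
    /// A minimal sketch consuming new headers as they arrive:
    ///
    /// ```no_run
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let mut headers = node.header_subscribe().await.unwrap();
    /// while let Ok(header) = headers.recv().await {
    ///     println!("new header at height {}", header.height());
    /// }
    /// # }
    /// ```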
    pub async fn header_subscribe(&self) -> Result<broadcast::Receiver<ExtendedHeader>> {
        Ok(self.syncer().subscribe_headers().await?)
    }

    /// Subscribe to the blobs from the namespace, as new headers are received by the node.
    ///
    /// Returns a stream that yields all the blobs from the namespace, as the new headers
    /// are being received by the node, starting from the first header received after the call.
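    ///
    /// # Example
    ///
    /// A minimal sketch draining the stream with [`tokio_stream::StreamExt`]
    /// (the namespace bytes are illustrative):
    ///
    /// ```no_run
    /// # use celestia_types::nmt::Namespace;
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// # use tokio_stream::StreamExt;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let namespace = Namespace::new_v0(&[1, 2, 3]).unwrap();
    /// let mut blobs = node.blob_subscribe(namespace).await.unwrap();
    /// while let Some(item) = blobs.next().await {
    ///     let blobs_at_height = item.unwrap();
    ///     // Process the blobs published at this height.
    /// }
    /// # }
    /// ```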
    pub async fn blob_subscribe(
        &self,
        namespace: Namespace,
    ) -> Result<ReceiverStream<Result<BlobsAtHeight, SubscriptionError>>> {
        let header_receiver = self.header_subscribe().await?;
        let p2p = self.p2p.as_ref().cloned().expect("p2p should be present");

        // We keep a small buffer of blobs, so that new headers are processed eagerly
        // as they come in, giving the consumer more time before the blobs are no
        // longer available. This is mostly relevant when the pruning window is set to zero.
        let (tx, rx) = mpsc::channel(16);

        spawn(async move { forward_new_blobs(namespace, tx, header_receiver, p2p).await });

        Ok(ReceiverStream::new(rx))
    }

    /// Subscribe to the shares from the namespace, as new headers are received by the node.
    ///
    /// Returns a stream that yields all the shares from the namespace, as the new headers
    /// are being received by the node, starting from the first header received after the call.
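    ///
    /// # Example
    ///
    /// Consumed the same way as [`Node::blob_subscribe`], but yielding the raw
    /// namespace shares per height (the namespace bytes are illustrative):
    ///
    /// ```no_run
    /// # use celestia_types::nmt::Namespace;
    /// # use lumina_node::network::Network;
    /// # use lumina_node::Node;
    /// # use tokio_stream::StreamExt;
    /// #
    /// # async fn example() {
    /// # let node = Node::builder().network(Network::Mainnet).start().await.unwrap();
    /// let namespace = Namespace::new_v0(&[1, 2, 3]).unwrap();
    /// let mut shares = node.namespace_subscribe(namespace).await.unwrap();
    /// while let Some(item) = shares.next().await {
    ///     let shares_at_height = item.unwrap();
    ///     // Process the namespace shares for this height.
    /// }
    /// # }
    /// ```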
    pub async fn namespace_subscribe(
        &self,
        namespace: Namespace,
    ) -> Result<ReceiverStream<Result<SharesAtHeight, SubscriptionError>>> {
        let header_receiver = self.header_subscribe().await?;
        let p2p = self.p2p.as_ref().cloned().expect("p2p should be present");

        // We keep a small buffer of shares, so that new headers are processed eagerly
        // as they come in, giving the consumer more time before the shares are no
        // longer available. This is mostly relevant when the pruning window is set to zero.
        let (tx, rx) = mpsc::channel(16);

        spawn(async move { forward_new_shares(namespace, tx, header_receiver, p2p).await });

        Ok(ReceiverStream::new(rx))
    }
}

impl<B, S> Drop for Node<B, S>
where
    B: Blockstore,
    S: Store,
{
    fn drop(&mut self) {
        // Stop everything, but don't join them.
        self.tasks_cancellation_token.cancel();

        if let Some(daser) = self.daser.take() {
            daser.stop();
        }

        if let Some(syncer) = self.syncer.take() {
            syncer.stop();
        }

        if let Some(pruner) = self.pruner.take() {
            pruner.stop();
        }

        if let Some(p2p) = self.p2p.take() {
            p2p.stop();
        }
    }
}