lumina_node/
node.rs

1//! Node that connects to Celestia's P2P network.
2//!
3//! Upon creation, `Node` will try to connect to Celestia's P2P network
4//! and then proceed with synchronization and data sampling of the blocks.
5
6use std::ops::RangeBounds;
7use std::sync::Arc;
8use std::time::Duration;
9
10use blockstore::Blockstore;
11use celestia_types::hash::Hash;
12use celestia_types::nmt::Namespace;
13use celestia_types::row::Row;
14use celestia_types::row_namespace_data::RowNamespaceData;
15use celestia_types::sample::Sample;
16use celestia_types::{Blob, ExtendedHeader};
17use libp2p::identity::Keypair;
18use libp2p::swarm::NetworkInfo;
19use libp2p::{Multiaddr, PeerId};
20use lumina_utils::executor::{spawn_cancellable, JoinHandle};
21use tokio::sync::watch;
22use tokio_util::sync::CancellationToken;
23use tracing::warn;
24
25use crate::blockstore::{InMemoryBlockstore, SampleBlockstore};
26use crate::daser::{
27    Daser, DaserArgs, DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY, DEFAULT_CONCURENCY_LIMIT,
28};
29use crate::events::{EventChannel, EventSubscriber, NodeEvent};
30use crate::p2p::shwap::sample_cid;
31use crate::p2p::{P2p, P2pArgs};
32use crate::pruner::{Pruner, PrunerArgs};
33use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
34use crate::syncer::{Syncer, SyncerArgs};
35
36mod builder;
37
38pub use self::builder::{
39    NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_WINDOW, DEFAULT_PRUNING_WINDOW_IN_MEMORY,
40    SAMPLING_WINDOW,
41};
42pub use crate::daser::DaserError;
43pub use crate::p2p::{HeaderExError, P2pError};
44pub use crate::peer_tracker::PeerTrackerInfo;
45pub use crate::syncer::{SyncerError, SyncingInfo};
46
/// Expected interval between consecutive blocks, handed to the pruner as its `block_time`.
const DEFAULT_BLOCK_TIME: Duration = Duration::from_millis(6_000);
48
49/// Alias of [`Result`] with [`NodeError`] error type
50///
51/// [`Result`]: std::result::Result
52pub type Result<T, E = NodeError> = std::result::Result<T, E>;
53
54/// Representation of all the errors that can occur when interacting with the [`Node`].
55#[derive(Debug, thiserror::Error)]
56pub enum NodeError {
57    /// An error propagated from the [`NodeBuilder`] component.
58    #[error("NodeBuilder: {0}")]
59    NodeBuilder(#[from] NodeBuilderError),
60
61    /// An error propagated from the `P2p` component.
62    #[error("P2p: {0}")]
63    P2p(#[from] P2pError),
64
65    /// An error propagated from the `Syncer` component.
66    #[error("Syncer: {0}")]
67    Syncer(#[from] SyncerError),
68
69    /// An error propagated from the [`Store`] component.
70    #[error("Store: {0}")]
71    Store(#[from] StoreError),
72
73    /// An error propagated from the `Daser` component.
74    #[error("Daser: {0}")]
75    Daser(#[from] DaserError),
76}
77
78struct NodeConfig<B, S>
79where
80    B: Blockstore,
81    S: Store,
82{
83    pub(crate) blockstore: B,
84    pub(crate) store: S,
85    pub(crate) network_id: String,
86    pub(crate) p2p_local_keypair: Keypair,
87    pub(crate) p2p_bootnodes: Vec<Multiaddr>,
88    pub(crate) p2p_listen_on: Vec<Multiaddr>,
89    pub(crate) sync_batch_size: u64,
90    pub(crate) sampling_window: Duration,
91    pub(crate) pruning_window: Duration,
92}
93
94/// Celestia node.
95pub struct Node<B, S>
96where
97    B: Blockstore + 'static,
98    S: Store + 'static,
99{
100    event_channel: EventChannel,
101    p2p: Option<Arc<P2p>>,
102    blockstore: Option<Arc<SampleBlockstore<B>>>,
103    store: Option<Arc<S>>,
104    syncer: Option<Arc<Syncer<S>>>,
105    daser: Option<Arc<Daser>>,
106    pruner: Option<Arc<Pruner>>,
107    tasks_cancellation_token: CancellationToken,
108    network_compromised_task: JoinHandle,
109}
110
111impl Node<InMemoryBlockstore, InMemoryStore> {
112    /// Creates a new [`NodeBuilder`] that is using in-memory stores.
113    ///
114    /// After the creation you can use [`NodeBuilder::blockstore`]
115    /// and [`NodeBuilder::store`] to set other stores.
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// # use lumina_node::network::Network;
121    /// # use lumina_node::Node;
122    /// #
123    /// # async fn example() {
124    /// let node = Node::builder()
125    ///     .network(Network::Mainnet)
126    ///     .start()
127    ///     .await
128    ///     .unwrap();
129    /// # }
130    /// ```
131    pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
132        NodeBuilder::new()
133    }
134}
135
136impl<B, S> Node<B, S>
137where
138    B: Blockstore,
139    S: Store,
140{
141    /// Creates and starts a new celestia node with a given config.
142    async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
143        let event_channel = EventChannel::new();
144        let event_sub = event_channel.subscribe();
145        let store = Arc::new(config.store);
146        let blockstore = Arc::new(SampleBlockstore::new(config.blockstore));
147
148        let p2p = Arc::new(
149            P2p::start(P2pArgs {
150                network_id: config.network_id,
151                local_keypair: config.p2p_local_keypair,
152                bootnodes: config.p2p_bootnodes,
153                listen_on: config.p2p_listen_on,
154                blockstore: blockstore.clone(),
155                store: store.clone(),
156                event_pub: event_channel.publisher(),
157            })
158            .await?,
159        );
160
161        let syncer = Arc::new(Syncer::start(SyncerArgs {
162            store: store.clone(),
163            p2p: p2p.clone(),
164            event_pub: event_channel.publisher(),
165            batch_size: config.sync_batch_size,
166            sampling_window: config.sampling_window,
167            pruning_window: config.pruning_window,
168        })?);
169
170        let daser = Arc::new(Daser::start(DaserArgs {
171            p2p: p2p.clone(),
172            store: store.clone(),
173            event_pub: event_channel.publisher(),
174            sampling_window: config.sampling_window,
175            concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
176            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
177        })?);
178
179        let pruner = Arc::new(Pruner::start(PrunerArgs {
180            daser: daser.clone(),
181            store: store.clone(),
182            blockstore: blockstore.clone(),
183            event_pub: event_channel.publisher(),
184            block_time: DEFAULT_BLOCK_TIME,
185            sampling_window: config.sampling_window,
186            pruning_window: config.pruning_window,
187        }));
188
189        let tasks_cancellation_token = CancellationToken::new();
190
191        // spawn the task that will stop the services when the fraud is detected
192        let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
193            let network_compromised_token = p2p.get_network_compromised_token().await?;
194            let syncer = syncer.clone();
195            let daser = daser.clone();
196            let pruner = pruner.clone();
197            let event_pub = event_channel.publisher();
198
199            async move {
200                network_compromised_token.triggered().await;
201
202                // Network compromised! Stop workers.
203                syncer.stop();
204                daser.stop();
205                pruner.stop();
206
207                event_pub.send(NodeEvent::NetworkCompromised);
208                // This is a very important message and we want to log it even
209                // if user consumes our events.
210                warn!("{}", NodeEvent::NetworkCompromised);
211            }
212        });
213
214        let node = Node {
215            event_channel,
216            p2p: Some(p2p),
217            blockstore: Some(blockstore),
218            store: Some(store),
219            syncer: Some(syncer),
220            daser: Some(daser),
221            pruner: Some(pruner),
222            tasks_cancellation_token,
223            network_compromised_task,
224        };
225
226        Ok((node, event_sub))
227    }
228
229    /// Stop the node.
230    pub async fn stop(mut self) {
231        {
232            let daser = self.daser.take().expect("Daser not initialized");
233            let syncer = self.syncer.take().expect("Syncer not initialized");
234            let pruner = self.pruner.take().expect("Pruner not initialized");
235            let p2p = self.p2p.take().expect("P2p not initialized");
236
237            // Cancel Node's tasks
238            self.tasks_cancellation_token.cancel();
239            self.network_compromised_task.join().await;
240
241            // Stop all components that use P2p.
242            daser.stop();
243            syncer.stop();
244            pruner.stop();
245
246            daser.join().await;
247            syncer.join().await;
248            pruner.join().await;
249
250            // Now stop P2p component.
251            p2p.stop();
252            p2p.join().await;
253        }
254
255        // Everything that was holding Blockstore is now dropped, so we can close it.
256        let blockstore = self.blockstore.take().expect("Blockstore not initialized");
257        let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
258        if let Err(e) = blockstore.close().await {
259            warn!("Blockstore failed to close: {e}");
260        }
261
262        // Everything that was holding Store is now dropped, so we can close it.
263        let store = self.store.take().expect("Store not initialized");
264        let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
265        if let Err(e) = store.close().await {
266            warn!("Store failed to close: {e}");
267        }
268
269        self.event_channel.publisher().send(NodeEvent::NodeStopped);
270    }
271
272    fn syncer(&self) -> &Syncer<S> {
273        self.syncer.as_ref().expect("Syncer not initialized")
274    }
275
276    fn p2p(&self) -> &P2p {
277        self.p2p.as_ref().expect("P2p not initialized")
278    }
279
280    fn store(&self) -> &S {
281        self.store.as_ref().expect("Store not initialized")
282    }
283
284    /// Returns a new `EventSubscriber`.
285    pub fn event_subscriber(&self) -> EventSubscriber {
286        self.event_channel.subscribe()
287    }
288
289    /// Get node's local peer ID.
290    pub fn local_peer_id(&self) -> &PeerId {
291        self.p2p().local_peer_id()
292    }
293
294    /// Get current [`PeerTrackerInfo`].
295    pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
296        self.p2p().peer_tracker_info().clone()
297    }
298
299    /// Get [`PeerTrackerInfo`] watcher.
300    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
301        self.p2p().peer_tracker_info_watcher()
302    }
303
304    /// Wait until the node is connected to at least 1 peer.
305    pub async fn wait_connected(&self) -> Result<()> {
306        Ok(self.p2p().wait_connected().await?)
307    }
308
309    /// Wait until the node is connected to at least 1 trusted peer.
310    pub async fn wait_connected_trusted(&self) -> Result<()> {
311        Ok(self.p2p().wait_connected_trusted().await?)
312    }
313
314    /// Get current network info.
315    pub async fn network_info(&self) -> Result<NetworkInfo> {
316        Ok(self.p2p().network_info().await?)
317    }
318
319    /// Get all the multiaddresses on which the node listens.
320    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
321        Ok(self.p2p().listeners().await?)
322    }
323
324    /// Get all the peers that node is connected to.
325    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
326        Ok(self.p2p().connected_peers().await?)
327    }
328
329    /// Trust or untrust the peer with a given ID.
330    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
331        Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
332    }
333
334    /// Request the head header from the network.
335    pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
336        Ok(self.p2p().get_head_header().await?)
337    }
338
339    /// Request a header for the block with a given hash from the network.
340    pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
341        Ok(self.p2p().get_header(*hash).await?)
342    }
343
344    /// Request a header for the block with a given height from the network.
345    pub async fn request_header_by_height(&self, hash: u64) -> Result<ExtendedHeader> {
346        Ok(self.p2p().get_header_by_height(hash).await?)
347    }
348
349    /// Request headers in range (from, from + amount] from the network.
350    ///
351    /// The headers will be verified with the `from` header.
352    pub async fn request_verified_headers(
353        &self,
354        from: &ExtendedHeader,
355        amount: u64,
356    ) -> Result<Vec<ExtendedHeader>> {
357        Ok(self.p2p().get_verified_headers_range(from, amount).await?)
358    }
359
360    /// Request a verified [`Row`] from the network.
361    ///
362    /// # Errors
363    ///
364    /// On failure to receive a verified [`Row`] within a certain time, the
365    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
366    pub async fn request_row(
367        &self,
368        row_index: u16,
369        block_height: u64,
370        timeout: Option<Duration>,
371    ) -> Result<Row> {
372        Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
373    }
374
375    /// Request a verified [`Sample`] from the network.
376    ///
377    /// # Errors
378    ///
379    /// On failure to receive a verified [`Sample`] within a certain time, the
380    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
381    pub async fn request_sample(
382        &self,
383        row_index: u16,
384        column_index: u16,
385        block_height: u64,
386        timeout: Option<Duration>,
387    ) -> Result<Sample> {
388        let sample = self
389            .p2p()
390            .get_sample(row_index, column_index, block_height, timeout)
391            .await?;
392
393        // We want to immediately remove the sample from blockstore
394        // but **only if** it wasn't chosen for DASing. Otherwise we could
395        // accidentally remove samples needed for the block reconstruction.
396        //
397        // There's a small possibility of permanently storing this sample if
398        // persistent blockstore is used and user closes tab / kills process
399        // before the remove is called, but it is acceptable tradeoff to avoid complexity.
400        //
401        // TODO: It should be properly solved when we switch from bitswap to shrex.
402        if let Some(metadata) = self.get_sampling_metadata(block_height).await? {
403            let cid = sample_cid(row_index, column_index, block_height)?;
404            if !metadata.cids.contains(&cid) {
405                let blockstore = self
406                    .blockstore
407                    .as_ref()
408                    .expect("Blockstore not initialized");
409                let _ = blockstore.remove(&cid).await;
410            }
411        }
412
413        Ok(sample)
414    }
415
416    /// Request a verified [`RowNamespaceData`] from the network.
417    ///
418    /// # Errors
419    ///
420    /// On failure to receive a verified [`RowNamespaceData`] within a certain time, the
421    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
422    pub async fn request_row_namespace_data(
423        &self,
424        namespace: Namespace,
425        row_index: u16,
426        block_height: u64,
427        timeout: Option<Duration>,
428    ) -> Result<RowNamespaceData> {
429        Ok(self
430            .p2p()
431            .get_row_namespace_data(namespace, row_index, block_height, timeout)
432            .await?)
433    }
434
435    /// Request all blobs with provided namespace in the block corresponding to this header
436    /// using bitswap protocol.
437    pub async fn request_all_blobs(
438        &self,
439        namespace: Namespace,
440        block_height: u64,
441        timeout: Option<Duration>,
442    ) -> Result<Vec<Blob>> {
443        Ok(self
444            .p2p()
445            .get_all_blobs(namespace, block_height, timeout, self.store())
446            .await?)
447    }
448
449    /// Get current header syncing info.
450    pub async fn syncer_info(&self) -> Result<SyncingInfo> {
451        Ok(self.syncer().info().await?)
452    }
453
454    /// Get the latest header announced in the network.
455    pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
456        Ok(self.p2p().get_network_head().await?)
457    }
458
459    /// Get the latest locally synced header.
460    pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
461        Ok(self.store().get_head().await?)
462    }
463
464    /// Get a synced header for the block with a given hash.
465    pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
466        Ok(self.store().get_by_hash(hash).await?)
467    }
468
469    /// Get a synced header for the block with a given height.
470    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
471        Ok(self.store().get_by_height(height).await?)
472    }
473
474    /// Get synced headers from the given heights range.
475    ///
476    /// If start of the range is unbounded, the first returned header will be of height 1.
477    /// If end of the range is unbounded, the last returned header will be the last header in the
478    /// store.
479    ///
480    /// # Errors
481    ///
482    /// If range contains a height of a header that is not found in the store or [`RangeBounds`]
483    /// cannot be converted to a valid range.
484    pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
485    where
486        R: RangeBounds<u64> + Send,
487    {
488        Ok(self.store().get_range(range).await?)
489    }
490
491    /// Get data sampling metadata of an already sampled height.
492    ///
493    /// Returns `Ok(None)` if metadata for the given height does not exists.
494    pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
495        match self.store().get_sampling_metadata(height).await {
496            Ok(val) => Ok(val),
497            Err(StoreError::NotFound) => Ok(None),
498            Err(e) => Err(e.into()),
499        }
500    }
501}
502
503impl<B, S> Drop for Node<B, S>
504where
505    B: Blockstore,
506    S: Store,
507{
508    fn drop(&mut self) {
509        // Stop everything, but don't join them.
510        self.tasks_cancellation_token.cancel();
511
512        if let Some(daser) = self.daser.take() {
513            daser.stop();
514        }
515
516        if let Some(syncer) = self.syncer.take() {
517            syncer.stop();
518        }
519
520        if let Some(pruner) = self.pruner.take() {
521            pruner.stop();
522        }
523
524        if let Some(p2p) = self.p2p.take() {
525            p2p.stop();
526        }
527    }
528}