lumina_node/
node.rs

1//! Node that connects to Celestia's P2P network.
2//!
3//! Upon creation, `Node` will try to connect to Celestia's P2P network
4//! and then proceed with synchronization and data sampling of the blocks.
5
6use std::ops::RangeBounds;
7use std::sync::Arc;
8use std::time::Duration;
9
10use blockstore::Blockstore;
11use celestia_types::hash::Hash;
12use celestia_types::nmt::Namespace;
13use celestia_types::row::Row;
14use celestia_types::row_namespace_data::RowNamespaceData;
15use celestia_types::sample::Sample;
16use celestia_types::{Blob, ExtendedHeader};
17use libp2p::identity::Keypair;
18use libp2p::swarm::NetworkInfo;
19use libp2p::{Multiaddr, PeerId};
20use lumina_utils::executor::{spawn_cancellable, JoinHandle};
21use tokio::sync::watch;
22use tokio_util::sync::CancellationToken;
23use tracing::warn;
24
25use crate::blockstore::InMemoryBlockstore;
26use crate::daser::{Daser, DaserArgs};
27use crate::events::{EventChannel, EventSubscriber, NodeEvent};
28use crate::p2p::{P2p, P2pArgs};
29use crate::pruner::{Pruner, PrunerArgs, DEFAULT_PRUNING_INTERVAL};
30use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
31use crate::syncer::{Syncer, SyncerArgs};
32
33mod builder;
34
35pub use self::builder::{
36    NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW,
37    MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW,
38};
39pub use crate::daser::DaserError;
40pub use crate::p2p::{HeaderExError, P2pError};
41pub use crate::peer_tracker::PeerTrackerInfo;
42pub use crate::syncer::{SyncerError, SyncingInfo};
43
/// Alias of [`Result`] with [`NodeError`] error type
///
/// The error type defaults to [`NodeError`] but can be overridden with the
/// second type parameter, so component-specific results can reuse this alias.
///
/// [`Result`]: std::result::Result
pub type Result<T, E = NodeError> = std::result::Result<T, E>;
48
/// Representation of all the errors that can occur when interacting with the [`Node`].
///
/// Each variant wraps the error type of one node component; the `#[from]`
/// attributes let component errors be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// An error propagated from the [`NodeBuilder`] component.
    #[error("NodeBuilder: {0}")]
    NodeBuilder(#[from] NodeBuilderError),

    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the `Syncer` component.
    #[error("Syncer: {0}")]
    Syncer(#[from] SyncerError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// An error propagated from the `Daser` component.
    #[error("Daser: {0}")]
    Daser(#[from] DaserError),
}
72
73struct NodeConfig<B, S>
74where
75    B: Blockstore,
76    S: Store,
77{
78    pub(crate) blockstore: B,
79    pub(crate) store: S,
80    pub(crate) network_id: String,
81    pub(crate) p2p_local_keypair: Keypair,
82    pub(crate) p2p_bootnodes: Vec<Multiaddr>,
83    pub(crate) p2p_listen_on: Vec<Multiaddr>,
84    pub(crate) sync_batch_size: u64,
85    pub(crate) sampling_window: Duration,
86    pub(crate) pruning_window: Duration,
87}
88
/// Celestia node.
pub struct Node<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    // Broadcast channel for `NodeEvent`s; `event_subscriber()` hands out
    // new subscriptions from it.
    event_channel: EventChannel,
    // Components are wrapped in `Option` so that `stop()` can `take()`
    // ownership of them and shut them down in a controlled order.
    // They are `None` only after `stop()`/`drop` has taken them.
    p2p: Option<Arc<P2p>>,
    blockstore: Option<Arc<B>>,
    store: Option<Arc<S>>,
    syncer: Option<Arc<Syncer<S>>>,
    daser: Option<Arc<Daser>>,
    pruner: Option<Arc<Pruner>>,
    // Cancels the node's auxiliary tasks (the network-compromised watcher).
    tasks_cancellation_token: CancellationToken,
    // Task that stops the workers when network fraud is detected.
    network_compromised_task: JoinHandle,
}
105
106impl Node<InMemoryBlockstore, InMemoryStore> {
107    /// Creates a new [`NodeBuilder`] that is using in-memory stores.
108    ///
109    /// After the creation you can use [`NodeBuilder::blockstore`]
110    /// and [`NodeBuilder::store`] to set other stores.
111    ///
112    /// # Example
113    ///
114    /// ```no_run
115    /// # use lumina_node::network::Network;
116    /// # use lumina_node::Node;
117    /// #
118    /// # async fn example() {
119    /// let node = Node::builder()
120    ///     .network(Network::Mainnet)
121    ///     .start()
122    ///     .await
123    ///     .unwrap();
124    /// # }
125    /// ```
126    pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
127        NodeBuilder::new()
128    }
129}
130
131impl<B, S> Node<B, S>
132where
133    B: Blockstore,
134    S: Store,
135{
136    /// Creates and starts a new celestia node with a given config.
137    async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
138        let event_channel = EventChannel::new();
139        let event_sub = event_channel.subscribe();
140        let store = Arc::new(config.store);
141        let blockstore = Arc::new(config.blockstore);
142
143        let p2p = Arc::new(
144            P2p::start(P2pArgs {
145                network_id: config.network_id,
146                local_keypair: config.p2p_local_keypair,
147                bootnodes: config.p2p_bootnodes,
148                listen_on: config.p2p_listen_on,
149                blockstore: blockstore.clone(),
150                store: store.clone(),
151                event_pub: event_channel.publisher(),
152            })
153            .await?,
154        );
155
156        let syncer = Arc::new(Syncer::start(SyncerArgs {
157            store: store.clone(),
158            p2p: p2p.clone(),
159            event_pub: event_channel.publisher(),
160            batch_size: config.sync_batch_size,
161            // We sync only what we need to sample. So syncing_window is
162            // the same as sampling_window.
163            syncing_window: config.sampling_window,
164        })?);
165
166        let daser = Arc::new(Daser::start(DaserArgs {
167            p2p: p2p.clone(),
168            store: store.clone(),
169            event_pub: event_channel.publisher(),
170            sampling_window: config.sampling_window,
171        })?);
172
173        let pruner = Arc::new(Pruner::start(PrunerArgs {
174            store: store.clone(),
175            blockstore: blockstore.clone(),
176            event_pub: event_channel.publisher(),
177            pruning_interval: DEFAULT_PRUNING_INTERVAL,
178            pruning_window: config.pruning_window,
179        }));
180
181        let tasks_cancellation_token = CancellationToken::new();
182
183        // spawn the task that will stop the services when the fraud is detected
184        let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
185            let network_compromised_token = p2p.get_network_compromised_token().await?;
186            let syncer = syncer.clone();
187            let daser = daser.clone();
188            let pruner = pruner.clone();
189            let event_pub = event_channel.publisher();
190
191            async move {
192                network_compromised_token.triggered().await;
193
194                // Network compromised! Stop workers.
195                syncer.stop();
196                daser.stop();
197                pruner.stop();
198
199                event_pub.send(NodeEvent::NetworkCompromised);
200                // This is a very important message and we want to log it even
201                // if user consumes our events.
202                warn!("{}", NodeEvent::NetworkCompromised);
203            }
204        });
205
206        let node = Node {
207            event_channel,
208            p2p: Some(p2p),
209            blockstore: Some(blockstore),
210            store: Some(store),
211            syncer: Some(syncer),
212            daser: Some(daser),
213            pruner: Some(pruner),
214            tasks_cancellation_token,
215            network_compromised_task,
216        };
217
218        Ok((node, event_sub))
219    }
220
221    /// Stop the node.
222    pub async fn stop(mut self) {
223        {
224            let daser = self.daser.take().expect("Daser not initialized");
225            let syncer = self.syncer.take().expect("Syncer not initialized");
226            let pruner = self.pruner.take().expect("Pruner not initialized");
227            let p2p = self.p2p.take().expect("P2p not initialized");
228
229            // Cancel Node's tasks
230            self.tasks_cancellation_token.cancel();
231            self.network_compromised_task.join().await;
232
233            // Stop all components that use P2p.
234            daser.stop();
235            syncer.stop();
236            pruner.stop();
237
238            daser.join().await;
239            syncer.join().await;
240            pruner.join().await;
241
242            // Now stop P2p component.
243            p2p.stop();
244            p2p.join().await;
245        }
246
247        // Everything that was holding Blockstore is now dropped, so we can close it.
248        let blockstore = self.blockstore.take().expect("Blockstore not initialized");
249        let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
250        if let Err(e) = blockstore.close().await {
251            warn!("Blockstore failed to close: {e}");
252        }
253
254        // Everything that was holding Store is now dropped, so we can close it.
255        let store = self.store.take().expect("Store not initialized");
256        let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
257        if let Err(e) = store.close().await {
258            warn!("Store failed to close: {e}");
259        }
260
261        self.event_channel.publisher().send(NodeEvent::NodeStopped);
262    }
263
264    fn syncer(&self) -> &Syncer<S> {
265        self.syncer.as_ref().expect("Syncer not initialized")
266    }
267
268    fn p2p(&self) -> &P2p {
269        self.p2p.as_ref().expect("P2p not initialized")
270    }
271
272    fn store(&self) -> &S {
273        self.store.as_ref().expect("Store not initialized")
274    }
275
276    /// Returns a new `EventSubscriber`.
277    pub fn event_subscriber(&self) -> EventSubscriber {
278        self.event_channel.subscribe()
279    }
280
281    /// Get node's local peer ID.
282    pub fn local_peer_id(&self) -> &PeerId {
283        self.p2p().local_peer_id()
284    }
285
286    /// Get current [`PeerTrackerInfo`].
287    pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
288        self.p2p().peer_tracker_info().clone()
289    }
290
291    /// Get [`PeerTrackerInfo`] watcher.
292    pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
293        self.p2p().peer_tracker_info_watcher()
294    }
295
296    /// Wait until the node is connected to at least 1 peer.
297    pub async fn wait_connected(&self) -> Result<()> {
298        Ok(self.p2p().wait_connected().await?)
299    }
300
301    /// Wait until the node is connected to at least 1 trusted peer.
302    pub async fn wait_connected_trusted(&self) -> Result<()> {
303        Ok(self.p2p().wait_connected_trusted().await?)
304    }
305
306    /// Get current network info.
307    pub async fn network_info(&self) -> Result<NetworkInfo> {
308        Ok(self.p2p().network_info().await?)
309    }
310
311    /// Get all the multiaddresses on which the node listens.
312    pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
313        Ok(self.p2p().listeners().await?)
314    }
315
316    /// Get all the peers that node is connected to.
317    pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
318        Ok(self.p2p().connected_peers().await?)
319    }
320
321    /// Trust or untrust the peer with a given ID.
322    pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
323        Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
324    }
325
326    /// Request the head header from the network.
327    pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
328        Ok(self.p2p().get_head_header().await?)
329    }
330
331    /// Request a header for the block with a given hash from the network.
332    pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
333        Ok(self.p2p().get_header(*hash).await?)
334    }
335
336    /// Request a header for the block with a given height from the network.
337    pub async fn request_header_by_height(&self, hash: u64) -> Result<ExtendedHeader> {
338        Ok(self.p2p().get_header_by_height(hash).await?)
339    }
340
341    /// Request headers in range (from, from + amount] from the network.
342    ///
343    /// The headers will be verified with the `from` header.
344    pub async fn request_verified_headers(
345        &self,
346        from: &ExtendedHeader,
347        amount: u64,
348    ) -> Result<Vec<ExtendedHeader>> {
349        Ok(self.p2p().get_verified_headers_range(from, amount).await?)
350    }
351
352    /// Request a verified [`Row`] from the network.
353    ///
354    /// # Errors
355    ///
356    /// On failure to receive a verified [`Row`] within a certain time, the
357    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
358    pub async fn request_row(
359        &self,
360        row_index: u16,
361        block_height: u64,
362        timeout: Option<Duration>,
363    ) -> Result<Row> {
364        Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
365    }
366
367    /// Request a verified [`Sample`] from the network.
368    ///
369    /// # Errors
370    ///
371    /// On failure to receive a verified [`Sample`] within a certain time, the
372    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
373    pub async fn request_sample(
374        &self,
375        row_index: u16,
376        column_index: u16,
377        block_height: u64,
378        timeout: Option<Duration>,
379    ) -> Result<Sample> {
380        Ok(self
381            .p2p()
382            .get_sample(row_index, column_index, block_height, timeout)
383            .await?)
384    }
385
386    /// Request a verified [`RowNamespaceData`] from the network.
387    ///
388    /// # Errors
389    ///
390    /// On failure to receive a verified [`RowNamespaceData`] within a certain time, the
391    /// `NodeError::P2p(P2pError::BitswapQueryTimeout)` error will be returned.
392    pub async fn request_row_namespace_data(
393        &self,
394        namespace: Namespace,
395        row_index: u16,
396        block_height: u64,
397        timeout: Option<Duration>,
398    ) -> Result<RowNamespaceData> {
399        Ok(self
400            .p2p()
401            .get_row_namespace_data(namespace, row_index, block_height, timeout)
402            .await?)
403    }
404
405    /// Request all blobs with provided namespace in the block corresponding to this header
406    /// using bitswap protocol.
407    pub async fn request_all_blobs(
408        &self,
409        header: &ExtendedHeader,
410        namespace: Namespace,
411        timeout: Option<Duration>,
412    ) -> Result<Vec<Blob>> {
413        Ok(self.p2p().get_all_blobs(header, namespace, timeout).await?)
414    }
415
416    /// Get current header syncing info.
417    pub async fn syncer_info(&self) -> Result<SyncingInfo> {
418        Ok(self.syncer().info().await?)
419    }
420
421    /// Get the latest header announced in the network.
422    pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
423        Ok(self.p2p().get_network_head().await?)
424    }
425
426    /// Get the latest locally synced header.
427    pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
428        Ok(self.store().get_head().await?)
429    }
430
431    /// Get a synced header for the block with a given hash.
432    pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
433        Ok(self.store().get_by_hash(hash).await?)
434    }
435
436    /// Get a synced header for the block with a given height.
437    pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
438        Ok(self.store().get_by_height(height).await?)
439    }
440
441    /// Get synced headers from the given heights range.
442    ///
443    /// If start of the range is unbounded, the first returned header will be of height 1.
444    /// If end of the range is unbounded, the last returned header will be the last header in the
445    /// store.
446    ///
447    /// # Errors
448    ///
449    /// If range contains a height of a header that is not found in the store or [`RangeBounds`]
450    /// cannot be converted to a valid range.
451    pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
452    where
453        R: RangeBounds<u64> + Send,
454    {
455        Ok(self.store().get_range(range).await?)
456    }
457
458    /// Get data sampling metadata of an already sampled height.
459    ///
460    /// Returns `Ok(None)` if metadata for the given height does not exists.
461    pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
462        match self.store().get_sampling_metadata(height).await {
463            Ok(val) => Ok(val),
464            Err(StoreError::NotFound) => Ok(None),
465            Err(e) => Err(e.into()),
466        }
467    }
468}
469
470impl<B, S> Drop for Node<B, S>
471where
472    B: Blockstore,
473    S: Store,
474{
475    fn drop(&mut self) {
476        // Stop everything, but don't join them.
477        self.tasks_cancellation_token.cancel();
478
479        if let Some(daser) = self.daser.take() {
480            daser.stop();
481        }
482
483        if let Some(syncer) = self.syncer.take() {
484            syncer.stop();
485        }
486
487        if let Some(pruner) = self.pruner.take() {
488            pruner.stop();
489        }
490
491        if let Some(p2p) = self.p2p.take() {
492            p2p.stop();
493        }
494    }
495}