1use std::ops::RangeBounds;
7use std::sync::Arc;
8use std::time::Duration;
9
10use libp2p::identity::Keypair;
11use libp2p::swarm::NetworkInfo;
12use libp2p::{Multiaddr, PeerId};
13use tokio::sync::{broadcast, mpsc, watch};
14use tokio_stream::wrappers::ReceiverStream;
15use tokio_util::sync::CancellationToken;
16use tracing::warn;
17
18use blockstore::Blockstore;
19use celestia_types::blob::BlobsAtHeight;
20use celestia_types::hash::Hash;
21use celestia_types::namespace_data::NamespaceData;
22use celestia_types::nmt::Namespace;
23use celestia_types::row::Row;
24use celestia_types::row_namespace_data::RowNamespaceData;
25use celestia_types::sample::Sample;
26use celestia_types::{Blob, ExtendedDataSquare, ExtendedHeader, SharesAtHeight};
27use lumina_utils::executor::{JoinHandle, spawn, spawn_cancellable};
28
29use crate::blockstore::{InMemoryBlockstore, SampleBlockstore};
30use crate::daser::{
31 DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY, DEFAULT_CONCURENCY_LIMIT, Daser, DaserArgs,
32};
33use crate::events::{EventChannel, EventSubscriber, NodeEvent};
34use crate::node::subscriptions::{SubscriptionError, forward_new_blobs, forward_new_shares};
35use crate::p2p::shwap::sample_cid;
36use crate::p2p::{P2p, P2pArgs};
37use crate::pruner::{Pruner, PrunerArgs};
38use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
39use crate::syncer::{Syncer, SyncerArgs};
40
41mod builder;
42pub mod subscriptions;
43
44pub use self::builder::{
45 DEFAULT_PRUNING_WINDOW, DEFAULT_PRUNING_WINDOW_IN_MEMORY, NodeBuilder, NodeBuilderError,
46 SAMPLING_WINDOW,
47};
48pub use crate::daser::DaserError;
49pub use crate::p2p::{HeaderExError, P2pError};
50pub use crate::peer_tracker::PeerTrackerInfo;
51pub use crate::syncer::{SyncerError, SyncingInfo};
52
53const DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(6);
54
55pub type Result<T, E = NodeError> = std::result::Result<T, E>;
59
60#[derive(Debug, thiserror::Error)]
62pub enum NodeError {
63 #[error("NodeBuilder: {0}")]
65 NodeBuilder(#[from] NodeBuilderError),
66
67 #[error("P2p: {0}")]
69 P2p(#[from] P2pError),
70
71 #[error("Syncer: {0}")]
73 Syncer(#[from] SyncerError),
74
75 #[error("Store: {0}")]
77 Store(#[from] StoreError),
78
79 #[error("Daser: {0}")]
81 Daser(#[from] DaserError),
82}
83
84struct NodeConfig<B, S>
85where
86 B: Blockstore,
87 S: Store,
88{
89 pub(crate) blockstore: B,
90 pub(crate) store: S,
91 pub(crate) network_id: String,
92 pub(crate) p2p_local_keypair: Keypair,
93 pub(crate) p2p_bootnodes: Vec<Multiaddr>,
94 pub(crate) p2p_listen_on: Vec<Multiaddr>,
95 pub(crate) sync_batch_size: u64,
96 pub(crate) sampling_window: Duration,
97 pub(crate) pruning_window: Duration,
98}
99
100pub struct Node<B, S>
102where
103 B: Blockstore + 'static,
104 S: Store + 'static,
105{
106 event_channel: EventChannel,
107 p2p: Option<Arc<P2p>>,
108 blockstore: Option<Arc<SampleBlockstore<B>>>,
109 store: Option<Arc<S>>,
110 syncer: Option<Arc<Syncer<S>>>,
111 daser: Option<Arc<Daser>>,
112 pruner: Option<Arc<Pruner>>,
113 tasks_cancellation_token: CancellationToken,
114 network_compromised_task: JoinHandle,
115}
116
117impl Node<InMemoryBlockstore, InMemoryStore> {
118 pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
138 NodeBuilder::new()
139 }
140}
141
142impl<B, S> Node<B, S>
143where
144 B: Blockstore,
145 S: Store,
146{
147 async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
149 let event_channel = EventChannel::new();
150 let event_sub = event_channel.subscribe();
151 let store = Arc::new(config.store);
152 let blockstore = Arc::new(SampleBlockstore::new(config.blockstore));
153
154 let p2p = Arc::new(
155 P2p::start(P2pArgs {
156 network_id: config.network_id,
157 local_keypair: config.p2p_local_keypair,
158 bootnodes: config.p2p_bootnodes,
159 listen_on: config.p2p_listen_on,
160 blockstore: blockstore.clone(),
161 store: store.clone(),
162 event_pub: event_channel.publisher(),
163 })
164 .await?,
165 );
166
167 let syncer = Arc::new(Syncer::start(SyncerArgs {
168 store: store.clone(),
169 p2p: p2p.clone(),
170 event_pub: event_channel.publisher(),
171 batch_size: config.sync_batch_size,
172 sampling_window: config.sampling_window,
173 pruning_window: config.pruning_window,
174 })?);
175
176 let daser = Arc::new(Daser::start(DaserArgs {
177 p2p: p2p.clone(),
178 store: store.clone(),
179 event_pub: event_channel.publisher(),
180 sampling_window: config.sampling_window,
181 concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
182 additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
183 })?);
184
185 let pruner = Arc::new(Pruner::start(PrunerArgs {
186 daser: daser.clone(),
187 store: store.clone(),
188 blockstore: blockstore.clone(),
189 event_pub: event_channel.publisher(),
190 block_time: DEFAULT_BLOCK_TIME,
191 sampling_window: config.sampling_window,
192 pruning_window: config.pruning_window,
193 }));
194
195 let tasks_cancellation_token = CancellationToken::new();
196
197 let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
199 let network_compromised_token = p2p.get_network_compromised_token().await?;
200 let syncer = syncer.clone();
201 let daser = daser.clone();
202 let pruner = pruner.clone();
203 let event_pub = event_channel.publisher();
204
205 async move {
206 network_compromised_token.triggered().await;
207
208 syncer.stop();
210 daser.stop();
211 pruner.stop();
212
213 event_pub.send(NodeEvent::NetworkCompromised);
214 warn!("{}", NodeEvent::NetworkCompromised);
217 }
218 });
219
220 let node = Node {
221 event_channel,
222 p2p: Some(p2p),
223 blockstore: Some(blockstore),
224 store: Some(store),
225 syncer: Some(syncer),
226 daser: Some(daser),
227 pruner: Some(pruner),
228 tasks_cancellation_token,
229 network_compromised_task,
230 };
231
232 Ok((node, event_sub))
233 }
234
235 pub async fn stop(mut self) {
237 {
238 let daser = self.daser.take().expect("Daser not initialized");
239 let syncer = self.syncer.take().expect("Syncer not initialized");
240 let pruner = self.pruner.take().expect("Pruner not initialized");
241 let p2p = self.p2p.take().expect("P2p not initialized");
242
243 self.tasks_cancellation_token.cancel();
245 self.network_compromised_task.join().await;
246
247 daser.stop();
249 syncer.stop();
250 pruner.stop();
251
252 daser.join().await;
253 syncer.join().await;
254 pruner.join().await;
255
256 p2p.stop();
258 p2p.join().await;
259 }
260
261 let blockstore = self.blockstore.take().expect("Blockstore not initialized");
263 let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
264 if let Err(e) = blockstore.close().await {
265 warn!("Blockstore failed to close: {e}");
266 }
267
268 let store = self.store.take().expect("Store not initialized");
270 let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
271 if let Err(e) = store.close().await {
272 warn!("Store failed to close: {e}");
273 }
274
275 self.event_channel.publisher().send(NodeEvent::NodeStopped);
276 }
277
278 fn syncer(&self) -> &Syncer<S> {
279 self.syncer.as_ref().expect("Syncer not initialized")
280 }
281
282 fn p2p(&self) -> &P2p {
283 self.p2p.as_ref().expect("P2p not initialized")
284 }
285
286 fn store(&self) -> &S {
287 self.store.as_ref().expect("Store not initialized")
288 }
289
290 pub fn event_subscriber(&self) -> EventSubscriber {
292 self.event_channel.subscribe()
293 }
294
295 pub fn local_peer_id(&self) -> &PeerId {
297 self.p2p().local_peer_id()
298 }
299
300 pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
302 self.p2p().peer_tracker_info().clone()
303 }
304
305 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
307 self.p2p().peer_tracker_info_watcher()
308 }
309
310 pub async fn wait_connected(&self) -> Result<()> {
312 Ok(self.p2p().wait_connected().await?)
313 }
314
315 pub async fn wait_connected_trusted(&self) -> Result<()> {
317 Ok(self.p2p().wait_connected_trusted().await?)
318 }
319
320 pub async fn network_info(&self) -> Result<NetworkInfo> {
322 Ok(self.p2p().network_info().await?)
323 }
324
325 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
327 Ok(self.p2p().listeners().await?)
328 }
329
330 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
332 Ok(self.p2p().connected_peers().await?)
333 }
334
335 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
337 Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
338 }
339
340 #[cfg(any(test, feature = "test-utils"))]
342 pub async fn mark_as_archival(&self, peer_id: PeerId) -> Result<()> {
343 Ok(self.p2p().mark_as_archival(peer_id).await?)
344 }
345
346 pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
348 Ok(self.p2p().get_head_header().await?)
349 }
350
351 pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
353 Ok(self.p2p().get_header(*hash).await?)
354 }
355
356 pub async fn request_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
358 Ok(self.p2p().get_header_by_height(height).await?)
359 }
360
361 pub async fn request_verified_headers(
365 &self,
366 from: &ExtendedHeader,
367 amount: u64,
368 ) -> Result<Vec<ExtendedHeader>> {
369 Ok(self.p2p().get_verified_headers_range(from, amount).await?)
370 }
371
372 pub async fn request_row(
379 &self,
380 row_index: u16,
381 block_height: u64,
382 timeout: Option<Duration>,
383 ) -> Result<Row> {
384 Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
385 }
386
387 pub async fn request_sample(
394 &self,
395 row_index: u16,
396 column_index: u16,
397 block_height: u64,
398 timeout: Option<Duration>,
399 ) -> Result<Sample> {
400 let sample = self
401 .p2p()
402 .get_sample(row_index, column_index, block_height, timeout)
403 .await?;
404
405 if let Some(metadata) = self.get_sampling_metadata(block_height).await? {
415 let cid = sample_cid(row_index, column_index, block_height)?;
416 if !metadata.cids.contains(&cid) {
417 let blockstore = self
418 .blockstore
419 .as_ref()
420 .expect("Blockstore not initialized");
421 let _ = blockstore.remove(&cid).await;
422 }
423 }
424
425 Ok(sample)
426 }
427
428 pub async fn request_extended_data_square(
435 &self,
436 block_height: u64,
437 timeout: Option<Duration>,
438 ) -> Result<ExtendedDataSquare> {
439 Ok(self.p2p().get_eds(block_height, timeout).await?)
440 }
441
442 pub async fn request_row_namespace_data(
449 &self,
450 namespace: Namespace,
451 row_index: u16,
452 block_height: u64,
453 timeout: Option<Duration>,
454 ) -> Result<RowNamespaceData> {
455 Ok(self
456 .p2p()
457 .get_row_namespace_data(namespace, row_index, block_height, timeout)
458 .await?)
459 }
460
461 pub async fn request_namespace_data(
468 &self,
469 namespace: Namespace,
470 block_height: u64,
471 timeout: Option<Duration>,
472 ) -> Result<NamespaceData> {
473 Ok(self
474 .p2p()
475 .get_namespace_data(namespace, block_height, timeout)
476 .await?)
477 }
478
479 pub async fn request_all_blobs(
482 &self,
483 namespace: Namespace,
484 block_height: u64,
485 timeout: Option<Duration>,
486 ) -> Result<Vec<Blob>> {
487 Ok(self
488 .p2p()
489 .get_all_blobs(namespace, block_height, timeout, self.store())
490 .await?)
491 }
492
493 pub async fn syncer_info(&self) -> Result<SyncingInfo> {
495 Ok(self.syncer().info().await?)
496 }
497
498 pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
500 Ok(self.p2p().get_network_head().await?)
501 }
502
503 pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
505 Ok(self.store().get_head().await?)
506 }
507
508 pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
510 Ok(self.store().get_by_hash(hash).await?)
511 }
512
513 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
515 Ok(self.store().get_by_height(height).await?)
516 }
517
518 pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
529 where
530 R: RangeBounds<u64> + Send,
531 {
532 Ok(self.store().get_range(range).await?)
533 }
534
535 pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
539 match self.store().get_sampling_metadata(height).await {
540 Ok(val) => Ok(val),
541 Err(StoreError::NotFound) => Ok(None),
542 Err(e) => Err(e.into()),
543 }
544 }
545
546 pub async fn header_subscribe(&self) -> Result<broadcast::Receiver<ExtendedHeader>> {
551 Ok(self.syncer().subscribe_headers().await?)
552 }
553
554 pub async fn blob_subscribe(
559 &self,
560 namespace: Namespace,
561 ) -> Result<ReceiverStream<Result<BlobsAtHeight, SubscriptionError>>> {
562 let header_receiver = self.header_subscribe().await?;
563 let p2p = self.p2p.as_ref().cloned().expect("p2p should be present");
564
565 let (tx, rx) = mpsc::channel(16);
569
570 spawn(async move { forward_new_blobs(namespace, tx, header_receiver, p2p).await });
571
572 Ok(ReceiverStream::new(rx))
573 }
574
575 pub async fn namespace_subscribe(
580 &self,
581 namespace: Namespace,
582 ) -> Result<ReceiverStream<Result<SharesAtHeight, SubscriptionError>>> {
583 let header_receiver = self.header_subscribe().await?;
584 let p2p = self.p2p.as_ref().cloned().expect("p2p should be present");
585
586 let (tx, rx) = mpsc::channel(16);
590
591 spawn(async move { forward_new_shares(namespace, tx, header_receiver, p2p).await });
592
593 Ok(ReceiverStream::new(rx))
594 }
595}
596
597impl<B, S> Drop for Node<B, S>
598where
599 B: Blockstore,
600 S: Store,
601{
602 fn drop(&mut self) {
603 self.tasks_cancellation_token.cancel();
605
606 if let Some(daser) = self.daser.take() {
607 daser.stop();
608 }
609
610 if let Some(syncer) = self.syncer.take() {
611 syncer.stop();
612 }
613
614 if let Some(pruner) = self.pruner.take() {
615 pruner.stop();
616 }
617
618 if let Some(p2p) = self.p2p.take() {
619 p2p.stop();
620 }
621 }
622}