1use std::ops::RangeBounds;
7use std::sync::Arc;
8use std::time::Duration;
9
10use blockstore::Blockstore;
11use celestia_types::hash::Hash;
12use celestia_types::nmt::Namespace;
13use celestia_types::row::Row;
14use celestia_types::row_namespace_data::RowNamespaceData;
15use celestia_types::sample::Sample;
16use celestia_types::{Blob, ExtendedHeader};
17use libp2p::identity::Keypair;
18use libp2p::swarm::NetworkInfo;
19use libp2p::{Multiaddr, PeerId};
20use lumina_utils::executor::{spawn_cancellable, JoinHandle};
21use tokio::sync::watch;
22use tokio_util::sync::CancellationToken;
23use tracing::warn;
24
25use crate::blockstore::{InMemoryBlockstore, SampleBlockstore};
26use crate::daser::{
27 Daser, DaserArgs, DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY, DEFAULT_CONCURENCY_LIMIT,
28};
29use crate::events::{EventChannel, EventSubscriber, NodeEvent};
30use crate::p2p::shwap::sample_cid;
31use crate::p2p::{P2p, P2pArgs};
32use crate::pruner::{Pruner, PrunerArgs};
33use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
34use crate::syncer::{Syncer, SyncerArgs};
35
36mod builder;
37
38pub use self::builder::{
39 NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_WINDOW, DEFAULT_PRUNING_WINDOW_IN_MEMORY,
40 SAMPLING_WINDOW,
41};
42pub use crate::daser::DaserError;
43pub use crate::p2p::{HeaderExError, P2pError};
44pub use crate::peer_tracker::PeerTrackerInfo;
45pub use crate::syncer::{SyncerError, SyncingInfo};
46
/// Block time handed to the pruner as `block_time` when the node is started.
const DEFAULT_BLOCK_TIME: Duration = Duration::from_secs(6);
48
/// Alias for [`std::result::Result`] whose error type defaults to [`NodeError`].
pub type Result<T, E = NodeError> = std::result::Result<T, E>;
53
/// Errors that [`Node`] operations can return.
///
/// Each variant wraps the error type of one component and can be created from it
/// through `From` (and therefore through the `?` operator).
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// Wraps a [`NodeBuilderError`].
    #[error("NodeBuilder: {0}")]
    NodeBuilder(#[from] NodeBuilderError),

    /// Wraps a [`P2pError`].
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// Wraps a [`SyncerError`].
    #[error("Syncer: {0}")]
    Syncer(#[from] SyncerError),

    /// Wraps a [`StoreError`].
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// Wraps a [`DaserError`].
    #[error("Daser: {0}")]
    Daser(#[from] DaserError),
}
77
/// Settings consumed by `Node::start` and distributed to the components it starts.
struct NodeConfig<B, S>
where
    B: Blockstore,
    S: Store,
{
    /// Blockstore that gets wrapped in a `SampleBlockstore` and shared with P2p and the pruner.
    pub(crate) blockstore: B,
    /// Store shared with P2p, the syncer, the daser and the pruner.
    pub(crate) store: S,
    /// Passed to P2p as `network_id`.
    pub(crate) network_id: String,
    /// Passed to P2p as `local_keypair`.
    pub(crate) p2p_local_keypair: Keypair,
    /// Passed to P2p as `bootnodes`.
    pub(crate) p2p_bootnodes: Vec<Multiaddr>,
    /// Passed to P2p as `listen_on`.
    pub(crate) p2p_listen_on: Vec<Multiaddr>,
    /// Passed to the syncer as `batch_size`.
    pub(crate) sync_batch_size: u64,
    /// Passed to the syncer, the daser and the pruner as `sampling_window`.
    pub(crate) sampling_window: Duration,
    /// Passed to the syncer and the pruner as `pruning_window`.
    pub(crate) pruning_window: Duration,
}
93
/// A started node: owns the handles of every component together with the event channel
/// and the background task that reacts to a compromised network.
///
/// The component handles are wrapped in `Option` so that `stop` and `Drop` can take
/// them out of the struct.
pub struct Node<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// Channel used to create event publishers and subscribers.
    event_channel: EventChannel,
    /// P2p handle; taken by `stop` and by `Drop`.
    p2p: Option<Arc<P2p>>,
    /// Blockstore shared with P2p and the pruner; taken and closed by `stop`.
    blockstore: Option<Arc<SampleBlockstore<B>>>,
    /// Store shared with the components; taken and closed by `stop`.
    store: Option<Arc<S>>,
    /// Syncer handle; taken by `stop` and by `Drop`.
    syncer: Option<Arc<Syncer<S>>>,
    /// Daser handle; taken by `stop` and by `Drop`.
    daser: Option<Arc<Daser>>,
    /// Pruner handle; taken by `stop` and by `Drop`.
    pruner: Option<Arc<Pruner>>,
    /// Parent token of the background task; cancelled by `stop` and by `Drop`.
    tasks_cancellation_token: CancellationToken,
    /// Handle of the task that stops the workers when the network is reported compromised.
    network_compromised_task: JoinHandle,
}
110
111impl Node<InMemoryBlockstore, InMemoryStore> {
112 pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
132 NodeBuilder::new()
133 }
134}
135
136impl<B, S> Node<B, S>
137where
138 B: Blockstore,
139 S: Store,
140{
    /// Starts every component from `config` and returns the node together with an
    /// [`EventSubscriber`] created before any component was started.
    ///
    /// Components are started in this order: P2p, syncer, daser, pruner, and finally the
    /// background task that waits on the network-compromised token.
    ///
    /// # Errors
    ///
    /// Returns an error if P2p, the syncer or the daser fails to start, or if the
    /// network-compromised token cannot be obtained from P2p.
    async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
        // The subscriber is created before any component receives a publisher.
        let event_channel = EventChannel::new();
        let event_sub = event_channel.subscribe();
        let store = Arc::new(config.store);
        let blockstore = Arc::new(SampleBlockstore::new(config.blockstore));

        let p2p = Arc::new(
            P2p::start(P2pArgs {
                network_id: config.network_id,
                local_keypair: config.p2p_local_keypair,
                bootnodes: config.p2p_bootnodes,
                listen_on: config.p2p_listen_on,
                blockstore: blockstore.clone(),
                store: store.clone(),
                event_pub: event_channel.publisher(),
            })
            .await?,
        );

        let syncer = Arc::new(Syncer::start(SyncerArgs {
            store: store.clone(),
            p2p: p2p.clone(),
            event_pub: event_channel.publisher(),
            batch_size: config.sync_batch_size,
            sampling_window: config.sampling_window,
            pruning_window: config.pruning_window,
        })?);

        let daser = Arc::new(Daser::start(DaserArgs {
            p2p: p2p.clone(),
            store: store.clone(),
            event_pub: event_channel.publisher(),
            sampling_window: config.sampling_window,
            concurrency_limit: DEFAULT_CONCURENCY_LIMIT,
            additional_headersub_concurrency: DEFAULT_ADDITIONAL_HEADER_SUB_CONCURENCY,
        })?);

        let pruner = Arc::new(Pruner::start(PrunerArgs {
            daser: daser.clone(),
            store: store.clone(),
            blockstore: blockstore.clone(),
            event_pub: event_channel.publisher(),
            block_time: DEFAULT_BLOCK_TIME,
            sampling_window: config.sampling_window,
            pruning_window: config.pruning_window,
        }));

        // Parent token: cancelling it cancels the child token given to the task below.
        let tasks_cancellation_token = CancellationToken::new();

        let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
            // This block runs before the task is spawned: a failure here aborts `start`.
            let network_compromised_token = p2p.get_network_compromised_token().await?;
            let syncer = syncer.clone();
            let daser = daser.clone();
            let pruner = pruner.clone();
            let event_pub = event_channel.publisher();

            async move {
                network_compromised_token.triggered().await;

                // The token was triggered: stop the workers.
                syncer.stop();
                daser.stop();
                pruner.stop();

                // The event is both published and logged.
                event_pub.send(NodeEvent::NetworkCompromised);
                warn!("{}", NodeEvent::NetworkCompromised);
            }
        });

        let node = Node {
            event_channel,
            p2p: Some(p2p),
            blockstore: Some(blockstore),
            store: Some(store),
            syncer: Some(syncer),
            daser: Some(daser),
            pruner: Some(pruner),
            tasks_cancellation_token,
            network_compromised_task,
        };

        Ok((node, event_sub))
    }
228
    /// Stops the node, consuming it.
    ///
    /// Cancels and joins the background task, stops and joins the daser, syncer and pruner,
    /// stops and joins P2p, closes the blockstore and the store, and finally publishes
    /// [`NodeEvent::NodeStopped`]. Failures to close the blockstore or the store are only
    /// logged as warnings.
    ///
    /// # Panics
    ///
    /// Panics if a component handle is missing, or if another `Arc` reference to the
    /// blockstore or the store is still alive once the components have been joined.
    pub async fn stop(mut self) {
        // The handles taken in this inner scope are dropped at its closing brace, which
        // happens before the `Arc::into_inner` calls below.
        {
            let daser = self.daser.take().expect("Daser not initialized");
            let syncer = self.syncer.take().expect("Syncer not initialized");
            let pruner = self.pruner.take().expect("Pruner not initialized");
            let p2p = self.p2p.take().expect("P2p not initialized");

            // Cancel the background task and wait for it to finish.
            self.tasks_cancellation_token.cancel();
            self.network_compromised_task.join().await;

            // Signal every worker first, then wait for each of them.
            daser.stop();
            syncer.stop();
            pruner.stop();

            daser.join().await;
            syncer.join().await;
            pruner.join().await;

            // P2p is stopped only after the workers have been joined.
            p2p.stop();
            p2p.join().await;
        }

        let blockstore = self.blockstore.take().expect("Blockstore not initialized");
        // `Arc::into_inner` yields the value only when this is the last reference.
        let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
        if let Err(e) = blockstore.close().await {
            warn!("Blockstore failed to close: {e}");
        }

        let store = self.store.take().expect("Store not initialized");
        // Same requirement as for the blockstore above.
        let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
        if let Err(e) = store.close().await {
            warn!("Store failed to close: {e}");
        }

        self.event_channel.publisher().send(NodeEvent::NodeStopped);
    }
271
272 fn syncer(&self) -> &Syncer<S> {
273 self.syncer.as_ref().expect("Syncer not initialized")
274 }
275
276 fn p2p(&self) -> &P2p {
277 self.p2p.as_ref().expect("P2p not initialized")
278 }
279
280 fn store(&self) -> &S {
281 self.store.as_ref().expect("Store not initialized")
282 }
283
284 pub fn event_subscriber(&self) -> EventSubscriber {
286 self.event_channel.subscribe()
287 }
288
289 pub fn local_peer_id(&self) -> &PeerId {
291 self.p2p().local_peer_id()
292 }
293
294 pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
296 self.p2p().peer_tracker_info().clone()
297 }
298
299 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
301 self.p2p().peer_tracker_info_watcher()
302 }
303
304 pub async fn wait_connected(&self) -> Result<()> {
306 Ok(self.p2p().wait_connected().await?)
307 }
308
309 pub async fn wait_connected_trusted(&self) -> Result<()> {
311 Ok(self.p2p().wait_connected_trusted().await?)
312 }
313
314 pub async fn network_info(&self) -> Result<NetworkInfo> {
316 Ok(self.p2p().network_info().await?)
317 }
318
319 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
321 Ok(self.p2p().listeners().await?)
322 }
323
324 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
326 Ok(self.p2p().connected_peers().await?)
327 }
328
329 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
331 Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
332 }
333
334 pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
336 Ok(self.p2p().get_head_header().await?)
337 }
338
339 pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
341 Ok(self.p2p().get_header(*hash).await?)
342 }
343
344 pub async fn request_header_by_height(&self, hash: u64) -> Result<ExtendedHeader> {
346 Ok(self.p2p().get_header_by_height(hash).await?)
347 }
348
349 pub async fn request_verified_headers(
353 &self,
354 from: &ExtendedHeader,
355 amount: u64,
356 ) -> Result<Vec<ExtendedHeader>> {
357 Ok(self.p2p().get_verified_headers_range(from, amount).await?)
358 }
359
360 pub async fn request_row(
367 &self,
368 row_index: u16,
369 block_height: u64,
370 timeout: Option<Duration>,
371 ) -> Result<Row> {
372 Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
373 }
374
375 pub async fn request_sample(
382 &self,
383 row_index: u16,
384 column_index: u16,
385 block_height: u64,
386 timeout: Option<Duration>,
387 ) -> Result<Sample> {
388 let sample = self
389 .p2p()
390 .get_sample(row_index, column_index, block_height, timeout)
391 .await?;
392
393 if let Some(metadata) = self.get_sampling_metadata(block_height).await? {
403 let cid = sample_cid(row_index, column_index, block_height)?;
404 if !metadata.cids.contains(&cid) {
405 let blockstore = self
406 .blockstore
407 .as_ref()
408 .expect("Blockstore not initialized");
409 let _ = blockstore.remove(&cid).await;
410 }
411 }
412
413 Ok(sample)
414 }
415
416 pub async fn request_row_namespace_data(
423 &self,
424 namespace: Namespace,
425 row_index: u16,
426 block_height: u64,
427 timeout: Option<Duration>,
428 ) -> Result<RowNamespaceData> {
429 Ok(self
430 .p2p()
431 .get_row_namespace_data(namespace, row_index, block_height, timeout)
432 .await?)
433 }
434
435 pub async fn request_all_blobs(
438 &self,
439 namespace: Namespace,
440 block_height: u64,
441 timeout: Option<Duration>,
442 ) -> Result<Vec<Blob>> {
443 Ok(self
444 .p2p()
445 .get_all_blobs(namespace, block_height, timeout, self.store())
446 .await?)
447 }
448
449 pub async fn syncer_info(&self) -> Result<SyncingInfo> {
451 Ok(self.syncer().info().await?)
452 }
453
454 pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
456 Ok(self.p2p().get_network_head().await?)
457 }
458
459 pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
461 Ok(self.store().get_head().await?)
462 }
463
464 pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
466 Ok(self.store().get_by_hash(hash).await?)
467 }
468
469 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
471 Ok(self.store().get_by_height(height).await?)
472 }
473
474 pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
485 where
486 R: RangeBounds<u64> + Send,
487 {
488 Ok(self.store().get_range(range).await?)
489 }
490
491 pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
495 match self.store().get_sampling_metadata(height).await {
496 Ok(val) => Ok(val),
497 Err(StoreError::NotFound) => Ok(None),
498 Err(e) => Err(e.into()),
499 }
500 }
501}
502
/// Signals every component that is still held by the node to stop.
///
/// Nothing is joined or closed here. After `Node::stop` has taken the handles, only the
/// token cancellation below still runs.
impl<B, S> Drop for Node<B, S>
where
    B: Blockstore,
    S: Store,
{
    fn drop(&mut self) {
        // Cancel the background task spawned with a child of this token.
        self.tasks_cancellation_token.cancel();

        // Each handle is taken out of the node, told to stop, and released at the end of
        // its own `if let` block.
        if let Some(daser) = self.daser.take() {
            daser.stop();
        }

        if let Some(syncer) = self.syncer.take() {
            syncer.stop();
        }

        if let Some(pruner) = self.pruner.take() {
            pruner.stop();
        }

        if let Some(p2p) = self.p2p.take() {
            p2p.stop();
        }
    }
}