1use std::ops::RangeBounds;
7use std::sync::Arc;
8use std::time::Duration;
9
10use blockstore::Blockstore;
11use celestia_types::hash::Hash;
12use celestia_types::nmt::Namespace;
13use celestia_types::row::Row;
14use celestia_types::row_namespace_data::RowNamespaceData;
15use celestia_types::sample::Sample;
16use celestia_types::{Blob, ExtendedHeader};
17use libp2p::identity::Keypair;
18use libp2p::swarm::NetworkInfo;
19use libp2p::{Multiaddr, PeerId};
20use lumina_utils::executor::{spawn_cancellable, JoinHandle};
21use tokio::sync::watch;
22use tokio_util::sync::CancellationToken;
23use tracing::warn;
24
25use crate::blockstore::InMemoryBlockstore;
26use crate::daser::{Daser, DaserArgs};
27use crate::events::{EventChannel, EventSubscriber, NodeEvent};
28use crate::p2p::{P2p, P2pArgs};
29use crate::pruner::{Pruner, PrunerArgs, DEFAULT_PRUNING_INTERVAL};
30use crate::store::{InMemoryStore, SamplingMetadata, Store, StoreError};
31use crate::syncer::{Syncer, SyncerArgs};
32
33mod builder;
34
35pub use self::builder::{
36 NodeBuilder, NodeBuilderError, DEFAULT_PRUNING_DELAY, DEFAULT_SAMPLING_WINDOW,
37 MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW,
38};
39pub use crate::daser::DaserError;
40pub use crate::p2p::{HeaderExError, P2pError};
41pub use crate::peer_tracker::PeerTrackerInfo;
42pub use crate::syncer::{SyncerError, SyncingInfo};
43
/// Alias of [`std::result::Result`] with [`NodeError`] as the default error type.
pub type Result<T, E = NodeError> = std::result::Result<T, E>;
48
/// Representation of all the errors that can occur when interacting with [`Node`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// An error propagated from the [`NodeBuilder`] while constructing the node.
    #[error("NodeBuilder: {0}")]
    NodeBuilder(#[from] NodeBuilderError),

    /// An error propagated from the p2p component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the header syncer component.
    #[error("Syncer: {0}")]
    Syncer(#[from] SyncerError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// An error propagated from the data-availability sampler component.
    #[error("Daser: {0}")]
    Daser(#[from] DaserError),
}
72
/// Internal configuration consumed by [`Node::start`]; assembled by the [`NodeBuilder`].
struct NodeConfig<B, S>
where
    B: Blockstore,
    S: Store,
{
    /// Blockstore that will hold the downloaded block shares.
    pub(crate) blockstore: B,
    /// Store that will hold the synchronized headers and sampling metadata.
    pub(crate) store: S,
    /// Identifier of the network the node connects to.
    pub(crate) network_id: String,
    /// Keypair identifying this node on the p2p network.
    pub(crate) p2p_local_keypair: Keypair,
    /// Addresses of the bootstrap peers to dial on startup.
    pub(crate) p2p_bootnodes: Vec<Multiaddr>,
    /// Addresses this node listens on for incoming p2p connections.
    pub(crate) p2p_listen_on: Vec<Multiaddr>,
    /// How many headers the syncer requests per batch.
    pub(crate) sync_batch_size: u64,
    /// How far back in time headers are considered for sampling/syncing.
    pub(crate) sampling_window: Duration,
    /// How far back in time data is kept before the pruner removes it.
    pub(crate) pruning_window: Duration,
}
88
/// Celestia node that ties together the p2p, syncer, daser and pruner services.
///
/// Services are held in `Option`s so that [`Node::stop`] can take ownership of
/// them for an orderly, awaited shutdown, while [`Drop`] can still perform a
/// best-effort shutdown if `stop` was never called.
pub struct Node<B, S>
where
    B: Blockstore + 'static,
    S: Store + 'static,
{
    /// Channel used to publish [`NodeEvent`]s to subscribers.
    event_channel: EventChannel,
    /// Handle to the p2p service; taken out on shutdown.
    p2p: Option<Arc<P2p>>,
    /// Shared blockstore; unwrapped and closed on shutdown.
    blockstore: Option<Arc<B>>,
    /// Shared header store; unwrapped and closed on shutdown.
    store: Option<Arc<S>>,
    /// Handle to the header syncer service.
    syncer: Option<Arc<Syncer<S>>>,
    /// Handle to the data-availability sampler service.
    daser: Option<Arc<Daser>>,
    /// Handle to the pruner service.
    pruner: Option<Arc<Pruner>>,
    /// Token cancelling the auxiliary tasks spawned by the node.
    tasks_cancellation_token: CancellationToken,
    /// Task watching for the "network compromised" signal from p2p.
    network_compromised_task: JoinHandle,
}
105
impl Node<InMemoryBlockstore, InMemoryStore> {
    /// Creates a new [`NodeBuilder`] that is using in-memory stores by default.
    pub fn builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
        NodeBuilder::new()
    }
}
130
131impl<B, S> Node<B, S>
132where
133 B: Blockstore,
134 S: Store,
135{
136 async fn start(config: NodeConfig<B, S>) -> Result<(Self, EventSubscriber)> {
138 let event_channel = EventChannel::new();
139 let event_sub = event_channel.subscribe();
140 let store = Arc::new(config.store);
141 let blockstore = Arc::new(config.blockstore);
142
143 let p2p = Arc::new(
144 P2p::start(P2pArgs {
145 network_id: config.network_id,
146 local_keypair: config.p2p_local_keypair,
147 bootnodes: config.p2p_bootnodes,
148 listen_on: config.p2p_listen_on,
149 blockstore: blockstore.clone(),
150 store: store.clone(),
151 event_pub: event_channel.publisher(),
152 })
153 .await?,
154 );
155
156 let syncer = Arc::new(Syncer::start(SyncerArgs {
157 store: store.clone(),
158 p2p: p2p.clone(),
159 event_pub: event_channel.publisher(),
160 batch_size: config.sync_batch_size,
161 syncing_window: config.sampling_window,
164 })?);
165
166 let daser = Arc::new(Daser::start(DaserArgs {
167 p2p: p2p.clone(),
168 store: store.clone(),
169 event_pub: event_channel.publisher(),
170 sampling_window: config.sampling_window,
171 })?);
172
173 let pruner = Arc::new(Pruner::start(PrunerArgs {
174 store: store.clone(),
175 blockstore: blockstore.clone(),
176 event_pub: event_channel.publisher(),
177 pruning_interval: DEFAULT_PRUNING_INTERVAL,
178 pruning_window: config.pruning_window,
179 }));
180
181 let tasks_cancellation_token = CancellationToken::new();
182
183 let network_compromised_task = spawn_cancellable(tasks_cancellation_token.child_token(), {
185 let network_compromised_token = p2p.get_network_compromised_token().await?;
186 let syncer = syncer.clone();
187 let daser = daser.clone();
188 let pruner = pruner.clone();
189 let event_pub = event_channel.publisher();
190
191 async move {
192 network_compromised_token.triggered().await;
193
194 syncer.stop();
196 daser.stop();
197 pruner.stop();
198
199 event_pub.send(NodeEvent::NetworkCompromised);
200 warn!("{}", NodeEvent::NetworkCompromised);
203 }
204 });
205
206 let node = Node {
207 event_channel,
208 p2p: Some(p2p),
209 blockstore: Some(blockstore),
210 store: Some(store),
211 syncer: Some(syncer),
212 daser: Some(daser),
213 pruner: Some(pruner),
214 tasks_cancellation_token,
215 network_compromised_task,
216 };
217
218 Ok((node, event_sub))
219 }
220
221 pub async fn stop(mut self) {
223 {
224 let daser = self.daser.take().expect("Daser not initialized");
225 let syncer = self.syncer.take().expect("Syncer not initialized");
226 let pruner = self.pruner.take().expect("Pruner not initialized");
227 let p2p = self.p2p.take().expect("P2p not initialized");
228
229 self.tasks_cancellation_token.cancel();
231 self.network_compromised_task.join().await;
232
233 daser.stop();
235 syncer.stop();
236 pruner.stop();
237
238 daser.join().await;
239 syncer.join().await;
240 pruner.join().await;
241
242 p2p.stop();
244 p2p.join().await;
245 }
246
247 let blockstore = self.blockstore.take().expect("Blockstore not initialized");
249 let blockstore = Arc::into_inner(blockstore).expect("Not all Arc<Blockstore> were dropped");
250 if let Err(e) = blockstore.close().await {
251 warn!("Blockstore failed to close: {e}");
252 }
253
254 let store = self.store.take().expect("Store not initialized");
256 let store = Arc::into_inner(store).expect("Not all Arc<Store> were dropped");
257 if let Err(e) = store.close().await {
258 warn!("Store failed to close: {e}");
259 }
260
261 self.event_channel.publisher().send(NodeEvent::NodeStopped);
262 }
263
264 fn syncer(&self) -> &Syncer<S> {
265 self.syncer.as_ref().expect("Syncer not initialized")
266 }
267
268 fn p2p(&self) -> &P2p {
269 self.p2p.as_ref().expect("P2p not initialized")
270 }
271
272 fn store(&self) -> &S {
273 self.store.as_ref().expect("Store not initialized")
274 }
275
276 pub fn event_subscriber(&self) -> EventSubscriber {
278 self.event_channel.subscribe()
279 }
280
281 pub fn local_peer_id(&self) -> &PeerId {
283 self.p2p().local_peer_id()
284 }
285
286 pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
288 self.p2p().peer_tracker_info().clone()
289 }
290
291 pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
293 self.p2p().peer_tracker_info_watcher()
294 }
295
296 pub async fn wait_connected(&self) -> Result<()> {
298 Ok(self.p2p().wait_connected().await?)
299 }
300
301 pub async fn wait_connected_trusted(&self) -> Result<()> {
303 Ok(self.p2p().wait_connected_trusted().await?)
304 }
305
306 pub async fn network_info(&self) -> Result<NetworkInfo> {
308 Ok(self.p2p().network_info().await?)
309 }
310
311 pub async fn listeners(&self) -> Result<Vec<Multiaddr>> {
313 Ok(self.p2p().listeners().await?)
314 }
315
316 pub async fn connected_peers(&self) -> Result<Vec<PeerId>> {
318 Ok(self.p2p().connected_peers().await?)
319 }
320
321 pub async fn set_peer_trust(&self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
323 Ok(self.p2p().set_peer_trust(peer_id, is_trusted).await?)
324 }
325
326 pub async fn request_head_header(&self) -> Result<ExtendedHeader> {
328 Ok(self.p2p().get_head_header().await?)
329 }
330
331 pub async fn request_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
333 Ok(self.p2p().get_header(*hash).await?)
334 }
335
336 pub async fn request_header_by_height(&self, hash: u64) -> Result<ExtendedHeader> {
338 Ok(self.p2p().get_header_by_height(hash).await?)
339 }
340
341 pub async fn request_verified_headers(
345 &self,
346 from: &ExtendedHeader,
347 amount: u64,
348 ) -> Result<Vec<ExtendedHeader>> {
349 Ok(self.p2p().get_verified_headers_range(from, amount).await?)
350 }
351
352 pub async fn request_row(
359 &self,
360 row_index: u16,
361 block_height: u64,
362 timeout: Option<Duration>,
363 ) -> Result<Row> {
364 Ok(self.p2p().get_row(row_index, block_height, timeout).await?)
365 }
366
367 pub async fn request_sample(
374 &self,
375 row_index: u16,
376 column_index: u16,
377 block_height: u64,
378 timeout: Option<Duration>,
379 ) -> Result<Sample> {
380 Ok(self
381 .p2p()
382 .get_sample(row_index, column_index, block_height, timeout)
383 .await?)
384 }
385
386 pub async fn request_row_namespace_data(
393 &self,
394 namespace: Namespace,
395 row_index: u16,
396 block_height: u64,
397 timeout: Option<Duration>,
398 ) -> Result<RowNamespaceData> {
399 Ok(self
400 .p2p()
401 .get_row_namespace_data(namespace, row_index, block_height, timeout)
402 .await?)
403 }
404
405 pub async fn request_all_blobs(
408 &self,
409 header: &ExtendedHeader,
410 namespace: Namespace,
411 timeout: Option<Duration>,
412 ) -> Result<Vec<Blob>> {
413 Ok(self.p2p().get_all_blobs(header, namespace, timeout).await?)
414 }
415
416 pub async fn syncer_info(&self) -> Result<SyncingInfo> {
418 Ok(self.syncer().info().await?)
419 }
420
421 pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
423 Ok(self.p2p().get_network_head().await?)
424 }
425
426 pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
428 Ok(self.store().get_head().await?)
429 }
430
431 pub async fn get_header_by_hash(&self, hash: &Hash) -> Result<ExtendedHeader> {
433 Ok(self.store().get_by_hash(hash).await?)
434 }
435
436 pub async fn get_header_by_height(&self, height: u64) -> Result<ExtendedHeader> {
438 Ok(self.store().get_by_height(height).await?)
439 }
440
441 pub async fn get_headers<R>(&self, range: R) -> Result<Vec<ExtendedHeader>>
452 where
453 R: RangeBounds<u64> + Send,
454 {
455 Ok(self.store().get_range(range).await?)
456 }
457
458 pub async fn get_sampling_metadata(&self, height: u64) -> Result<Option<SamplingMetadata>> {
462 match self.store().get_sampling_metadata(height).await {
463 Ok(val) => Ok(val),
464 Err(StoreError::NotFound) => Ok(None),
465 Err(e) => Err(e.into()),
466 }
467 }
468}
469
470impl<B, S> Drop for Node<B, S>
471where
472 B: Blockstore,
473 S: Store,
474{
475 fn drop(&mut self) {
476 self.tasks_cancellation_token.cancel();
478
479 if let Some(daser) = self.daser.take() {
480 daser.stop();
481 }
482
483 if let Some(syncer) = self.syncer.take() {
484 syncer.stop();
485 }
486
487 if let Some(pruner) = self.pruner.take() {
488 pruner.stop();
489 }
490
491 if let Some(p2p) = self.p2p.take() {
492 p2p.stop();
493 }
494 }
495}