nyxd_scraper_shared/scraper/
mod.rs1use crate::PruningOptions;
5use crate::block_processor::types::BlockToProcess;
6use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
7use crate::block_requester::{BlockRequest, BlockRequester};
8use crate::error::ScraperError;
9use crate::modules::{BlockModule, MsgModule, TxModule};
10use crate::rpc_client::RpcClient;
11use crate::scraper::subscriber::ChainSubscriber;
12use crate::storage::NyxdScraperStorage;
13use futures::future::join_all;
14use std::marker::PhantomData;
15use std::sync::Arc;
16use tokio::sync::Notify;
17use tokio::sync::mpsc::{
18 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
19};
20use tokio_util::sync::CancellationToken;
21use tokio_util::task::TaskTracker;
22use tracing::{error, info};
23use url::Url;
24
25mod subscriber;
26
/// Options controlling which block height the scraper begins processing from.
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
    /// Explicit height to start processing at; `None` leaves the decision to the block processor.
    pub start_block_height: Option<u32>,

    /// When set, allow the block processor to fall back to the best available start
    /// height rather than failing if the requested one cannot be used.
    // NOTE(review): exact fallback semantics live in `BlockProcessorConfig` — confirm there.
    pub use_best_effort_start_height: bool,
}
35
/// Configuration required to construct a [`NyxdScraper`].
pub struct Config {
    /// Websocket endpoint used by the chain subscriber to receive new blocks.
    pub websocket_url: Url,

    /// RPC endpoint used for explicit block/height queries.
    pub rpc_url: Url,

    /// Storage location passed verbatim to `S::initialise`
    /// (e.g. a database connection string or file path — depends on the backend).
    pub database_storage: String,

    /// Controls pruning of processed block data; validated in [`NyxdScraper::new`].
    pub pruning_options: PruningOptions,

    /// Whether block precommits should be persisted by the block processor.
    pub store_precommits: bool,

    /// Options determining the first block to process.
    pub start_block: StartingBlockOpts,
}
52
/// Builder for [`NyxdScraper`] that allows registering processing modules
/// before the scraper tasks are started.
pub struct NyxdScraperBuilder<S> {
    // marker tying the builder to the storage backend it will construct
    _storage: PhantomData<S>,
    config: Config,

    // modules invoked by the block processor for each block / tx / message
    block_modules: Vec<Box<dyn BlockModule + Send>>,
    tx_modules: Vec<Box<dyn TxModule + Send>>,
    msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
61
62impl<S> NyxdScraperBuilder<S>
63where
64 S: NyxdScraperStorage + Send + Sync + 'static,
65 S::StorageTransaction: Send + Sync + 'static,
66{
67 pub async fn build_and_start(self) -> Result<NyxdScraper<S>, ScraperError> {
68 let scraper = NyxdScraper::<S>::new(self.config).await?;
69
70 let (processing_tx, processing_rx) = unbounded_channel();
71 let (req_tx, req_rx) = channel(5);
72
73 let rpc_client = RpcClient::new(&scraper.config.rpc_url)?;
74
75 let block_requester = BlockRequester::new(
77 scraper.cancel_token.clone(),
78 rpc_client.clone(),
79 req_rx,
80 processing_tx.clone(),
81 );
82
83 let block_processor_config = BlockProcessorConfig::new(
84 scraper.config.pruning_options,
85 scraper.config.store_precommits,
86 scraper.config.start_block.start_block_height,
87 scraper.config.start_block.use_best_effort_start_height,
88 );
89
90 let mut block_processor = BlockProcessor::new(
91 block_processor_config,
92 scraper.cancel_token.clone(),
93 scraper.startup_sync.clone(),
94 processing_rx,
95 req_tx,
96 scraper.storage.clone(),
97 rpc_client,
98 )
99 .await?;
100 block_processor.set_block_modules(self.block_modules);
101 block_processor.set_tx_modules(self.tx_modules);
102 block_processor.set_msg_modules(self.msg_modules);
103
104 let chain_subscriber = ChainSubscriber::new(
105 &scraper.config.websocket_url,
106 scraper.cancel_token.clone(),
107 scraper.task_tracker.clone(),
108 processing_tx,
109 )
110 .await?;
111
112 scraper.start_tasks(block_requester, block_processor, chain_subscriber);
113
114 Ok(scraper)
115 }
116
117 pub fn new(config: Config) -> Self {
118 NyxdScraperBuilder {
119 _storage: PhantomData,
120 config,
121 block_modules: vec![],
122 tx_modules: vec![],
123 msg_modules: vec![],
124 }
125 }
126
127 pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
128 self.block_modules.push(Box::new(module));
129 self
130 }
131
132 pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
133 self.tx_modules.push(Box::new(module));
134 self
135 }
136
137 pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
138 self.msg_modules.push(Box::new(module));
139 self
140 }
141}
142
/// Chain scraper that subscribes to new blocks over websocket, fetches blocks
/// over rpc on demand, and feeds them through a block processor into storage `S`.
pub struct NyxdScraper<S> {
    config: Config,

    // tracks every spawned background task so `stop` can await their completion
    task_tracker: TaskTracker,
    // cooperative shutdown signal shared with all spawned tasks
    cancel_token: CancellationToken,
    // awaited in `wait_for_startup_sync`; presumably notified by the block
    // processor once the initial chain sync completes — confirm in BlockProcessor
    startup_sync: Arc<Notify>,
    storage: S,
    rpc_client: RpcClient,
}
152
impl<S> NyxdScraper<S>
where
    S: NyxdScraperStorage + Send + Sync + 'static,
    S::StorageTransaction: Send + Sync + 'static,
{
    /// Convenience constructor for a [`NyxdScraperBuilder`] with this config.
    pub fn builder(config: Config) -> NyxdScraperBuilder<S> {
        NyxdScraperBuilder::new(config)
    }

    /// Validates the pruning options and initialises the storage backend and
    /// rpc client. Does not spawn any tasks — call [`Self::start`] for that.
    ///
    /// # Errors
    /// Fails if pruning validation, storage initialisation or rpc client
    /// construction fails.
    pub async fn new(config: Config) -> Result<Self, ScraperError> {
        config.pruning_options.validate()?;
        let storage = S::initialise(&config.database_storage).await?;
        let rpc_client = RpcClient::new(&config.rpc_url)?;

        Ok(NyxdScraper {
            config,
            task_tracker: TaskTracker::new(),
            cancel_token: CancellationToken::new(),
            startup_sync: Arc::new(Default::default()),
            storage,
            rpc_client,
        })
    }

    /// Returns a reference to the underlying storage.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Spawns the three long-running tasks on the shared tracker and closes
    /// the tracker so `stop` can wait for all of them to finish.
    fn start_tasks(
        &self,
        mut block_requester: BlockRequester,
        mut block_processor: BlockProcessor<S>,
        mut chain_subscriber: ChainSubscriber,
    ) {
        self.task_tracker
            .spawn(async move { block_requester.run().await });
        self.task_tracker
            .spawn(async move { block_processor.run().await });
        self.task_tracker
            .spawn(async move { chain_subscriber.run().await });

        // no more tasks will be spawned; allows `task_tracker.wait()` to resolve
        self.task_tracker.close();
    }

    /// Processes a single block at `height` outside the normal subscription
    /// flow, with pruning disabled for this run.
    ///
    /// # Errors
    /// Returns [`ScraperError::ScraperAlreadyRunning`] if tasks have already
    /// been spawned, or any rpc/processing error encountered.
    pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
        info!(height = height, "attempting to process a single block");
        if !self.task_tracker.is_empty() {
            return Err(ScraperError::ScraperAlreadyRunning);
        }

        // dummy channels: the processor is driven directly below rather than
        // by the requester/subscriber tasks
        let (_, processing_rx) = unbounded_channel();
        let (req_tx, _) = channel(5);

        // disable pruning so this out-of-band run cannot delete stored data
        let mut block_processor = self
            .new_block_processor(req_tx.clone(), processing_rx)
            .await?
            .with_pruning(PruningOptions::nothing());

        let block = self.rpc_client.get_basic_block_details(height).await?;

        block_processor.process_block(block.into()).await
    }

    /// Processes an explicit range of blocks outside the normal subscription
    /// flow (pruning disabled). If `end_height` is `None`, keeps re-querying
    /// the chain tip and processing further ranges until caught up.
    ///
    /// Defaults when bounds are omitted: `starting_height` falls back to the
    /// last processed height (or the current tip if nothing was processed);
    /// `end_height` falls back to the current tip.
    ///
    /// # Errors
    /// Returns [`ScraperError::ScraperAlreadyRunning`] if tasks have already
    /// been spawned, or the first rpc/processing error encountered.
    pub async fn unsafe_process_block_range(
        &self,
        starting_height: Option<u32>,
        end_height: Option<u32>,
    ) -> Result<(), ScraperError> {
        if !self.task_tracker.is_empty() {
            return Err(ScraperError::ScraperAlreadyRunning);
        }

        // dummy channels — the processor is driven directly in the loop below
        let (_, processing_rx) = unbounded_channel();
        let (req_tx, _) = channel(5);

        let mut block_processor = self
            .new_block_processor(req_tx.clone(), processing_rx)
            .await?
            .with_pruning(PruningOptions::nothing());

        // NOTE(review): `current_block_height` is narrowed with `as` — heights
        // beyond u32::MAX would silently truncate; confirm acceptable.
        let mut current_height = self.rpc_client.current_block_height().await? as u32;
        let last_processed = block_processor.last_process_height();

        let mut starting_height = match starting_height {
            Some(explicit) => explicit,
            None => {
                // resume from the last processed height, or the tip if fresh
                if last_processed != 0 {
                    last_processed
                } else {
                    current_height
                }
            }
        };

        // no explicit end => keep processing until we reach the chain tip
        let must_catch_up = end_height.is_none();
        let mut end_height = match end_height {
            Some(explicit) => explicit,
            None => {
                // NOTE(review): when `last_processed > starting_height`, this
                // ends the range just before the previously processed block —
                // confirm this gap-filling overlap is the intended semantics.
                if last_processed > starting_height {
                    last_processed - 1
                } else {
                    current_height
                }
            }
        };

        // shadowed: from here on, tracks our own progress through the ranges
        let mut last_processed = starting_height;

        while last_processed < current_height {
            info!(
                starting_height = starting_height,
                end_height = end_height,
                "attempting to process block range"
            );

            let range = (starting_height..=end_height).collect::<Vec<_>>();

            // fetch blocks concurrently in batches of 4, but process them
            // sequentially and in order
            for batch in range.chunks(4) {
                let batch_result = join_all(
                    batch
                        .iter()
                        .map(|height| self.rpc_client.get_basic_block_details(*height)),
                )
                .await;
                for result in batch_result {
                    match result {
                        Ok(block) => block_processor.process_block(block.into()).await?,
                        Err(err) => {
                            error!("failed to retrieve the block: {err}. stopping...");
                            return Err(err);
                        }
                    }
                }
            }

            // explicit end bound: one pass is all that was requested
            if !must_catch_up {
                return Ok(());
            }

            // the tip may have advanced while we were processing — re-query
            // and loop again over the newly produced blocks
            last_processed = end_height;
            current_height = self.rpc_client.current_block_height().await? as u32;

            info!(
                last_processed = last_processed,
                current_height = current_height,
                "🏃 still need to catch up..."
            );

            starting_height = last_processed + 1;
            end_height = current_height;
        }

        if must_catch_up {
            info!(
                last_processed = last_processed,
                current_height = current_height,
                "✅ block processing has caught up!"
            );
        }

        Ok(())
    }

    /// Creates a block requester wired to this scraper's cancel token and rpc client.
    fn new_block_requester(
        &self,
        req_rx: Receiver<BlockRequest>,
        processing_tx: UnboundedSender<BlockToProcess>,
    ) -> BlockRequester {
        BlockRequester::new(
            self.cancel_token.clone(),
            self.rpc_client.clone(),
            req_rx,
            processing_tx.clone(),
        )
    }

    /// Creates a block processor configured from this scraper's config,
    /// sharing its cancel token, startup-sync notifier, storage and rpc client.
    async fn new_block_processor(
        &self,
        req_tx: Sender<BlockRequest>,
        processing_rx: UnboundedReceiver<BlockToProcess>,
    ) -> Result<BlockProcessor<S>, ScraperError> {
        let block_processor_config = BlockProcessorConfig::new(
            self.config.pruning_options,
            self.config.store_precommits,
            self.config.start_block.start_block_height,
            self.config.start_block.use_best_effort_start_height,
        );

        BlockProcessor::<S>::new(
            block_processor_config,
            self.cancel_token.clone(),
            self.startup_sync.clone(),
            processing_rx,
            req_tx,
            self.storage.clone(),
            self.rpc_client.clone(),
        )
        .await
    }

    /// Creates a chain subscriber connected to the configured websocket endpoint.
    async fn new_chain_subscriber(
        &self,
        processing_tx: UnboundedSender<BlockToProcess>,
    ) -> Result<ChainSubscriber, ScraperError> {
        ChainSubscriber::new(
            &self.config.websocket_url,
            self.cancel_token.clone(),
            self.task_tracker.clone(),
            processing_tx,
        )
        .await
    }

    /// Wires up and spawns the requester, processor (without any modules) and
    /// subscriber tasks. Use [`NyxdScraperBuilder::build_and_start`] to start
    /// with modules registered.
    pub async fn start(&self) -> Result<(), ScraperError> {
        let (processing_tx, processing_rx) = unbounded_channel();
        let (req_tx, req_rx) = channel(5);

        let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
        let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
        let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;

        self.start_tasks(block_requester, block_processor, chain_subscriber);

        Ok(())
    }

    /// Waits until the startup chain sync notification fires.
    pub async fn wait_for_startup_sync(&self) {
        info!("awaiting startup chain sync");
        self.startup_sync.notified().await
    }

    /// Signals cancellation to all tasks and waits for them to finish.
    ///
    /// # Panics
    /// Asserts the task tracker is closed, i.e. the scraper was actually
    /// started — calling `stop` on a never-started scraper panics.
    // NOTE(review): consider returning an error instead of asserting.
    pub async fn stop(self) {
        info!("stopping the chain scraper");
        assert!(self.task_tracker.is_closed());

        self.cancel_token.cancel();
        self.task_tracker.wait().await
    }

    /// Returns a clone of the shared cancellation token.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel_token.clone()
    }

    /// Whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.is_cancelled()
    }
}