// nyxd_scraper_shared/scraper/mod.rs

1// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::PruningOptions;
5use crate::block_processor::types::BlockToProcess;
6use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
7use crate::block_requester::{BlockRequest, BlockRequester};
8use crate::error::ScraperError;
9use crate::modules::{BlockModule, MsgModule, TxModule};
10use crate::rpc_client::RpcClient;
11use crate::scraper::subscriber::ChainSubscriber;
12use crate::storage::NyxdScraperStorage;
13use futures::future::join_all;
14use std::marker::PhantomData;
15use std::sync::Arc;
16use tokio::sync::Notify;
17use tokio::sync::mpsc::{
18    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
19};
20use tokio_util::sync::CancellationToken;
21use tokio_util::task::TaskTracker;
22use tracing::{error, info};
23use url::Url;
24
25mod subscriber;
26
/// Options controlling which block height the scraper starts processing from.
/// Both fields are forwarded into the block processor configuration.
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
    /// Explicit height to begin processing from; when `None`, the block
    /// processor chooses the starting height itself.
    pub start_block_height: Option<u32>,

    /// If the scraper fails to start from the desired height, rather than failing,
    /// attempt to use the next available height
    pub use_best_effort_start_height: bool,
}
35
/// Configuration required to construct a [`NyxdScraper`].
pub struct Config {
    /// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
    pub websocket_url: Url,

    /// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
    pub rpc_url: Url,

    /// Points to either underlying file (sqlite) or connection string (postgres)
    pub database_storage: String,

    /// Pruning configuration; validated during [`NyxdScraper::new`].
    pub pruning_options: PruningOptions,

    /// Forwarded to the block processor configuration; controls whether
    /// precommit data is stored alongside processed blocks.
    pub store_precommits: bool,

    /// Options controlling the height the scraper begins processing from.
    pub start_block: StartingBlockOpts,
}
52
/// Builder for [`NyxdScraper`] that additionally collects the block/tx/msg
/// modules to be attached to the block processor before the tasks start.
pub struct NyxdScraperBuilder<S> {
    // marker tying the builder to the storage backend it will initialise
    _storage: PhantomData<S>,
    config: Config,

    // modules registered via the `with_*` methods; handed to the block
    // processor inside `build_and_start`
    block_modules: Vec<Box<dyn BlockModule + Send>>,
    tx_modules: Vec<Box<dyn TxModule + Send>>,
    msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
61
62impl<S> NyxdScraperBuilder<S>
63where
64    S: NyxdScraperStorage + Send + Sync + 'static,
65    S::StorageTransaction: Send + Sync + 'static,
66{
67    pub async fn build_and_start(self) -> Result<NyxdScraper<S>, ScraperError> {
68        let scraper = NyxdScraper::<S>::new(self.config).await?;
69
70        let (processing_tx, processing_rx) = unbounded_channel();
71        let (req_tx, req_rx) = channel(5);
72
73        let rpc_client = RpcClient::new(&scraper.config.rpc_url)?;
74
75        // create the tasks
76        let block_requester = BlockRequester::new(
77            scraper.cancel_token.clone(),
78            rpc_client.clone(),
79            req_rx,
80            processing_tx.clone(),
81        );
82
83        let block_processor_config = BlockProcessorConfig::new(
84            scraper.config.pruning_options,
85            scraper.config.store_precommits,
86            scraper.config.start_block.start_block_height,
87            scraper.config.start_block.use_best_effort_start_height,
88        );
89
90        let mut block_processor = BlockProcessor::new(
91            block_processor_config,
92            scraper.cancel_token.clone(),
93            scraper.startup_sync.clone(),
94            processing_rx,
95            req_tx,
96            scraper.storage.clone(),
97            rpc_client,
98        )
99        .await?;
100        block_processor.set_block_modules(self.block_modules);
101        block_processor.set_tx_modules(self.tx_modules);
102        block_processor.set_msg_modules(self.msg_modules);
103
104        let chain_subscriber = ChainSubscriber::new(
105            &scraper.config.websocket_url,
106            scraper.cancel_token.clone(),
107            scraper.task_tracker.clone(),
108            processing_tx,
109        )
110        .await?;
111
112        scraper.start_tasks(block_requester, block_processor, chain_subscriber);
113
114        Ok(scraper)
115    }
116
117    pub fn new(config: Config) -> Self {
118        NyxdScraperBuilder {
119            _storage: PhantomData,
120            config,
121            block_modules: vec![],
122            tx_modules: vec![],
123            msg_modules: vec![],
124        }
125    }
126
127    pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
128        self.block_modules.push(Box::new(module));
129        self
130    }
131
132    pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
133        self.tx_modules.push(Box::new(module));
134        self
135    }
136
137    pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
138        self.msg_modules.push(Box::new(module));
139        self
140    }
141}
142
/// The scraper itself: owns the configuration, the storage backend, the rpc
/// client and the coordination primitives shared by its spawned tasks.
pub struct NyxdScraper<S> {
    config: Config,

    // tracks the spawned requester/processor/subscriber tasks so `stop`
    // can await their completion
    task_tracker: TaskTracker,
    // shared token used to signal all tasks to shut down
    cancel_token: CancellationToken,
    // notified by the block processor once the initial chain sync completes;
    // awaited by `wait_for_startup_sync`
    startup_sync: Arc<Notify>,
    storage: S,
    rpc_client: RpcClient,
}
152
impl<S> NyxdScraper<S>
where
    S: NyxdScraperStorage + Send + Sync + 'static,
    S::StorageTransaction: Send + Sync + 'static,
{
    /// Convenience shorthand for [`NyxdScraperBuilder::new`].
    pub fn builder(config: Config) -> NyxdScraperBuilder<S> {
        NyxdScraperBuilder::new(config)
    }

    /// Creates a new scraper: validates the pruning options, initialises the
    /// storage backend and constructs the rpc client. No tasks are spawned
    /// yet — call [`Self::start`] (or go through the builder) to begin.
    ///
    /// # Errors
    /// Fails if the pruning options are invalid, storage initialisation fails
    /// or the rpc client cannot be built from the configured url.
    pub async fn new(config: Config) -> Result<Self, ScraperError> {
        config.pruning_options.validate()?;
        let storage = S::initialise(&config.database_storage).await?;
        let rpc_client = RpcClient::new(&config.rpc_url)?;

        Ok(NyxdScraper {
            config,
            task_tracker: TaskTracker::new(),
            cancel_token: CancellationToken::new(),
            startup_sync: Arc::new(Default::default()),
            storage,
            rpc_client,
        })
    }

    /// Returns a reference to the underlying storage backend.
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Spawns the three internal tasks on the task tracker and closes the
    /// tracker so that [`Self::stop`] can wait for their completion.
    fn start_tasks(
        &self,
        mut block_requester: BlockRequester,
        mut block_processor: BlockProcessor<S>,
        mut chain_subscriber: ChainSubscriber,
    ) {
        self.task_tracker
            .spawn(async move { block_requester.run().await });
        self.task_tracker
            .spawn(async move { block_processor.run().await });
        self.task_tracker
            .spawn(async move { chain_subscriber.run().await });

        // no further tasks will be spawned; lets `task_tracker.wait()` resolve
        self.task_tracker.close();
    }

    // DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
    // AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
    // YOU WILL BE FIRED IF YOU USE IT : )
    /// Fetches and processes the single block at `height` with a throw-away
    /// block processor (pruning disabled, no modules attached).
    ///
    /// # Errors
    /// Fails if the scraper is already running, the processor cannot be
    /// built, or the block cannot be retrieved/processed.
    pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
        info!(height = height, "attempting to process a single block");
        // a non-empty task tracker means `start_tasks` has already spawned tasks
        if !self.task_tracker.is_empty() {
            return Err(ScraperError::ScraperAlreadyRunning);
        }

        // the unused channel ends are dropped immediately: this processor is
        // only driven manually via `process_block` below, never via `run`
        let (_, processing_rx) = unbounded_channel();
        let (req_tx, _) = channel(5);

        let mut block_processor = self
            .new_block_processor(req_tx.clone(), processing_rx)
            .await?
            .with_pruning(PruningOptions::nothing());

        let block = self.rpc_client.get_basic_block_details(height).await?;

        block_processor.process_block(block.into()).await
    }

    // DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
    // AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
    // YOU WILL BE FIRED IF YOU USE IT : )
    /// Processes a range of blocks with a throw-away block processor
    /// (pruning disabled, no modules attached).
    ///
    /// If `starting_height` is `None`, it resumes from the last processed
    /// height stored by the processor (or the current chain height if nothing
    /// was processed before). If `end_height` is `None`, the method keeps
    /// looping until it has caught up with the live chain height.
    ///
    /// # Errors
    /// Fails if the scraper is already running, or any block retrieval or
    /// processing step fails (the first failure aborts the whole range).
    pub async fn unsafe_process_block_range(
        &self,
        starting_height: Option<u32>,
        end_height: Option<u32>,
    ) -> Result<(), ScraperError> {
        // a non-empty task tracker means `start_tasks` has already spawned tasks
        if !self.task_tracker.is_empty() {
            return Err(ScraperError::ScraperAlreadyRunning);
        }

        // the unused channel ends are dropped immediately: this processor is
        // only driven manually via `process_block`, never via `run`
        let (_, processing_rx) = unbounded_channel();
        let (req_tx, _) = channel(5);

        let mut block_processor = self
            .new_block_processor(req_tx.clone(), processing_rx)
            .await?
            .with_pruning(PruningOptions::nothing());

        let mut current_height = self.rpc_client.current_block_height().await? as u32;
        let last_processed = block_processor.last_process_height();

        let mut starting_height = match starting_height {
            // always attempt to use whatever the user has provided
            Some(explicit) => explicit,
            None => {
                // otherwise, attempt to resume where we last stopped
                // and if we haven't processed anything, start from the current height
                if last_processed != 0 {
                    last_processed
                } else {
                    current_height
                }
            }
        };

        // an explicit end height means a one-shot range; otherwise keep
        // looping below until we reach the live chain tip
        let must_catch_up = end_height.is_none();
        let mut end_height = match end_height {
            // always attempt to use whatever the user has provided
            Some(explicit) => explicit,
            None => {
                // otherwise, attempt to either go from the start height to the height right
                // before the final processed block held in the storage (in case there are gaps)
                // or finally, just go to the current block height
                if last_processed > starting_height {
                    last_processed - 1
                } else {
                    current_height
                }
            }
        };

        // shadows the storage-derived value above: from here on this tracks
        // the progress of THIS run
        let mut last_processed = starting_height;

        // NOTE(review): if `starting_height >= current_height` this loop body
        // never executes, even for an explicit range — confirm that silently
        // doing nothing is the intended behaviour in that case
        while last_processed < current_height {
            info!(
                starting_height = starting_height,
                end_height = end_height,
                "attempting to process block range"
            );

            let range = (starting_height..=end_height).collect::<Vec<_>>();

            // the most likely bottleneck here are going to be the chain queries,
            // so batch multiple requests
            for batch in range.chunks(4) {
                let batch_result = join_all(
                    batch
                        .iter()
                        .map(|height| self.rpc_client.get_basic_block_details(*height)),
                )
                .await;
                for result in batch_result {
                    match result {
                        Ok(block) => block_processor.process_block(block.into()).await?,
                        Err(err) => {
                            error!("failed to retrieve the block: {err}. stopping...");
                            return Err(err);
                        }
                    }
                }
            }

            // if we don't need to catch up, return early
            if !must_catch_up {
                return Ok(());
            }

            // check if we have caught up to the current block height
            last_processed = end_height;
            current_height = self.rpc_client.current_block_height().await? as u32;

            info!(
                last_processed = last_processed,
                current_height = current_height,
                "🏃 still need to catch up..."
            );

            // next iteration resumes right after what we just processed,
            // up to the freshly queried chain tip
            starting_height = last_processed + 1;
            end_height = current_height;
        }

        if must_catch_up {
            info!(
                last_processed = last_processed,
                current_height = current_height,
                "✅ block processing has caught up!"
            );
        }

        Ok(())
    }

    /// Constructs a block requester wired to this scraper's cancel token and
    /// rpc client.
    fn new_block_requester(
        &self,
        req_rx: Receiver<BlockRequest>,
        processing_tx: UnboundedSender<BlockToProcess>,
    ) -> BlockRequester {
        BlockRequester::new(
            self.cancel_token.clone(),
            self.rpc_client.clone(),
            req_rx,
            processing_tx.clone(),
        )
    }

    /// Constructs a block processor from this scraper's config, storage and
    /// shared coordination primitives.
    async fn new_block_processor(
        &self,
        req_tx: Sender<BlockRequest>,
        processing_rx: UnboundedReceiver<BlockToProcess>,
    ) -> Result<BlockProcessor<S>, ScraperError> {
        let block_processor_config = BlockProcessorConfig::new(
            self.config.pruning_options,
            self.config.store_precommits,
            self.config.start_block.start_block_height,
            self.config.start_block.use_best_effort_start_height,
        );

        BlockProcessor::<S>::new(
            block_processor_config,
            self.cancel_token.clone(),
            self.startup_sync.clone(),
            processing_rx,
            req_tx,
            self.storage.clone(),
            self.rpc_client.clone(),
        )
        .await
    }

    /// Constructs a chain subscriber attached to the configured websocket
    /// endpoint.
    async fn new_chain_subscriber(
        &self,
        processing_tx: UnboundedSender<BlockToProcess>,
    ) -> Result<ChainSubscriber, ScraperError> {
        ChainSubscriber::new(
            &self.config.websocket_url,
            self.cancel_token.clone(),
            self.task_tracker.clone(),
            processing_tx,
        )
        .await
    }

    /// Creates and spawns the scraper's tasks (requester, processor,
    /// subscriber) without any modules attached.
    ///
    /// # Errors
    /// Fails if the block processor or chain subscriber cannot be constructed.
    pub async fn start(&self) -> Result<(), ScraperError> {
        let (processing_tx, processing_rx) = unbounded_channel();
        let (req_tx, req_rx) = channel(5);

        // create the tasks
        let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
        let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
        let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;

        // spawn them
        self.start_tasks(block_requester, block_processor, chain_subscriber);

        Ok(())
    }

    /// Waits until the startup sync [`Notify`] is triggered.
    pub async fn wait_for_startup_sync(&self) {
        info!("awaiting startup chain sync");
        self.startup_sync.notified().await
    }

    /// Signals all spawned tasks to shut down and waits for them to finish.
    ///
    /// # Panics
    /// Panics if the scraper was never started: the task tracker is only
    /// closed inside `start_tasks`, so the assert below fails otherwise.
    pub async fn stop(self) {
        info!("stopping the chain scraper");
        assert!(self.task_tracker.is_closed());

        self.cancel_token.cancel();
        self.task_tracker.wait().await
    }

    /// Returns a clone of the scraper's cancellation token.
    pub fn cancel_token(&self) -> CancellationToken {
        self.cancel_token.clone()
    }

    /// Returns whether shutdown has been requested on the cancellation token.
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token.is_cancelled()
    }
}