use crate::PruningOptions;
use crate::block_processor::helpers::split_request_range;
use crate::block_processor::types::BlockToProcess;
use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{NyxdScraperStorage, NyxdScraperTransaction, persist_block};
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::ops::{Add, Range};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use tokio::time::{Instant, interval_at};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn};
mod helpers;
pub(crate) mod pruning;
pub(crate) mod types;
/// How often the run loop checks whether any block heights went missing.
const MISSING_BLOCKS_CHECK_INTERVAL: Duration = Duration::from_secs(30);
/// Minimum time without a successfully processed block before missing blocks get re-requested.
const MAX_MISSING_BLOCKS_DELAY: Duration = Duration::from_secs(15);
/// Maximum number of blocks requested in a single resync batch; larger ranges are split.
const MAX_RANGE_SIZE: usize = 30;
/// Tracks the state of an in-progress historical resync: heights currently
/// requested from the block requester plus ranges still waiting to be dispatched.
#[derive(Debug, Default)]
struct PendingSync {
    // heights that have been requested but not yet received (drained in `next_incoming`)
    request_in_flight: HashSet<u32>,
    // request ranges queued for dispatch once the in-flight set empties
    queued_requests: VecDeque<Range<u32>>,
}
impl PendingSync {
    /// Returns true when no resync work remains: nothing is in flight
    /// and no further request ranges are queued.
    fn is_empty(&self) -> bool {
        let nothing_in_flight = self.request_in_flight.is_empty();
        let nothing_queued = self.queued_requests.is_empty();
        nothing_in_flight && nothing_queued
    }
}
/// User-configurable behaviour of the block processor.
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
    // how (and whether) old block data is pruned from storage
    pub pruning_options: PruningOptions,
    // whether block precommits are persisted alongside the block data
    pub store_precommits: bool,
    // if set, the scraper attempts to start from this height on its very first run
    pub explicit_starting_block_height: Option<u32>,
    // presumably: when the explicit starting height is no longer available on the
    // chain, start from the earliest available height instead of erroring out.
    // NOTE(review): `startup_resync` currently errors when this flag is SET and
    // falls back when it is unset — that condition looks inverted; verify.
    pub use_best_effort_start_height: bool,
}
impl Default for BlockProcessorConfig {
    /// Default configuration: no pruning, precommits stored, no explicit
    /// starting height, best-effort start disabled.
    fn default() -> Self {
        Self::new(PruningOptions::nothing(), true, None, false)
    }
}
impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
/// Drives processing of incoming chain blocks: persists them, runs the
/// registered modules, requests missing heights and prunes old data.
pub struct BlockProcessor<S> {
    config: BlockProcessorConfig,
    // token used to signal (and react to) scraper shutdown
    cancel: CancellationToken,
    // notified whenever all pending resync work has drained
    synced: Arc<Notify>,
    // height of the most recently processed block
    last_processed_height: u32,
    // height at which the storage was last pruned
    last_pruned_height: u32,
    // instant of the last successful block processing; drives missing-block re-requests
    last_processed_at: Instant,
    // state of any in-progress historical resync
    pending_sync: PendingSync,
    // out-of-order blocks parked until the gap below them gets filled
    queued_blocks: BTreeMap<u32, BlockToProcess>,
    rpc_client: RpcClient,
    // stream of blocks coming from the subscriber / block requester
    incoming: UnboundedReceiverStream<BlockToProcess>,
    // channel for requesting specific ranges of blocks
    block_requester: Sender<BlockRequest>,
    storage: S,
    // user-registered processing hooks
    block_modules: Vec<Box<dyn BlockModule + Send>>,
    tx_modules: Vec<Box<dyn TxModule + Send>>,
    msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
config,
cancel,
synced,
last_processed_height,
last_pruned_height,
last_processed_at: Instant::now(),
pending_sync: Default::default(),
queued_blocks: Default::default(),
rpc_client,
incoming: incoming.into(),
block_requester,
storage,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
})
}
/// Builder-style override of the pruning options.
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
    self.config.pruning_options = pruning_options;
    self
}
/// Fully processes a single block: fetches its complete details from the RPC
/// node, persists it together with its transactions inside a single storage
/// transaction, runs every registered module, and finally attempts pruning.
pub(super) async fn process_block(
    &mut self,
    block: BlockToProcess,
) -> Result<(), ScraperError> {
    info!("processing block at height {}", block.height);
    // obtain the full block data (transactions, messages, ...) from the chain
    let full_info = self.rpc_client.try_get_full_details(block).await?;
    debug!(
        "this block has {} transaction(s)",
        full_info.transactions.len()
    );
    for tx in &full_info.transactions {
        debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
        for (index, msg) in tx.tx.body.messages.iter().enumerate() {
            debug!("{index}: {:?}", msg.type_url)
        }
    }
    // everything below happens inside one storage transaction, so a failure in
    // any module leaves the database untouched for this block
    let mut tx = self
        .storage
        .begin_processing_tx()
        .await
        .map_err(ScraperError::tx_begin_failure)?;
    persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
    for block_module in &mut self.block_modules {
        block_module.handle_block(&full_info, &mut tx).await?;
    }
    for block_tx in full_info.transactions {
        for tx_module in &mut self.tx_modules {
            tx_module.handle_tx(&block_tx, &mut tx).await?;
        }
        for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
            // message modules only receive messages matching their declared type url
            for msg_module in &mut self.msg_modules {
                if msg.type_url == msg_module.type_url() {
                    msg_module
                        .handle_msg(index, msg, &block_tx, &mut tx)
                        .await?
                }
            }
        }
    }
    let commit_start = Instant::now();
    tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
    crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
    // only advance our bookkeeping after a successful commit
    self.last_processed_height = full_info.block.header.height.value() as u32;
    self.last_processed_at = Instant::now();
    // pruning failures are logged but never fail block processing
    if let Err(err) = self.maybe_prune_storage().await {
        error!("failed to prune the storage: {err}");
    }
    Ok(())
}
/// Replaces the set of modules invoked once per processed block.
pub fn set_block_modules(&mut self, modules: Vec<Box<dyn BlockModule + Send>>) {
    self.block_modules = modules;
}
/// Replaces the set of modules invoked once per transaction.
pub fn set_tx_modules(&mut self, modules: Vec<Box<dyn TxModule + Send>>) {
    self.tx_modules = modules;
}
/// Replaces the set of modules invoked per message whose type url matches theirs.
pub fn set_msg_modules(&mut self, modules: Vec<Box<dyn MsgModule + Send>>) {
    self.msg_modules = modules;
}
/// Returns the height of the most recently processed block.
pub(super) fn last_process_height(&self) -> u32 {
    self.last_processed_height
}
/// Requests blocks that appear to have gone missing, but only once processing
/// has stalled for longer than `MAX_MISSING_BLOCKS_DELAY`.
async fn maybe_request_missing_blocks(&mut self) -> Result<(), ScraperError> {
    // processing is still making progress - nothing to do
    if self.last_processed_at.elapsed() < MAX_MISSING_BLOCKS_DELAY {
        debug!("no need to request missing blocks");
        return Ok(());
    }
    // if a previously queued resync batch got dispatched, don't pile more on top
    if self.try_request_pending().await {
        return Ok(());
    }
    let next_missing = self.last_processed_height + 1;
    let request_range = match self.queued_blocks.first_key_value() {
        // fill exactly the gap below the earliest queued (out-of-order) block
        Some((next_available, _)) => next_missing..*next_available,
        // no queued blocks - request everything up to the chain tip
        None => {
            let current_height = self.rpc_client.current_block_height().await? as u32;
            next_missing..current_height + 1
        }
    };
    self.request_missing_blocks(request_range).await
}
/// Dispatches a request for the given range of blocks. Ranges larger than
/// `MAX_RANGE_SIZE` are split into batches: the first batch is sent right away
/// while the rest is queued in `pending_sync` for later dispatch.
async fn request_missing_blocks(
    &mut self,
    request_range: Range<u32>,
) -> Result<(), ScraperError> {
    // small enough to send in one go
    if request_range.len() <= MAX_RANGE_SIZE {
        return self.send_blocks_request(request_range).await;
    }
    let mut split = split_request_range(request_range);
    // split ranges are never empty here since the input exceeded MAX_RANGE_SIZE
    #[allow(clippy::unwrap_used)]
    let first = split.pop_front().unwrap();
    self.pending_sync.queued_requests = split;
    self.pending_sync.request_in_flight = first.clone().collect();
    self.send_blocks_request(first).await
}
/// Forwards a range request to the block requester task over its channel.
async fn send_blocks_request(&mut self, request_range: Range<u32>) -> Result<(), ScraperError> {
    debug!("requesting missing blocks: {request_range:?}");
    self.block_requester
        .send(BlockRequest::Range(request_range))
        .await?;
    Ok(())
}
/// Prunes old block data from storage according to the configured retention
/// window (`strategy_keep_recent`), then advances the pruned-height marker.
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
    let keep_recent = self.config.pruning_options.strategy_keep_recent();
    // BUGFIX: this used plain subtraction, which underflows whenever the chain
    // is younger than the retention window (keep_recent > last_processed_height):
    // a panic in debug builds and a wrap-around in release builds (producing a
    // huge `last_to_keep` and pruning everything). Saturate to 0 instead, which
    // then yields to_prune == 0 and a clean early return below.
    let last_to_keep = self.last_processed_height.saturating_sub(keep_recent);
    info!(
        keep_recent,
        oldest_to_keep = last_to_keep,
        "pruning the storage"
    );
    let lowest: u32 = self
        .storage
        .lowest_block_height()
        .await?
        .unwrap_or_default()
        .try_into()
        .unwrap_or_default();
    let to_prune = last_to_keep.saturating_sub(lowest);
    match to_prune {
        v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
        v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
        0 => trace!("no blocks to prune"),
        v => debug!("approximately {v} blocks worth of data will be pruned"),
    }
    if to_prune == 0 {
        // nothing to remove, but still advance the pruned marker so
        // `maybe_prune_storage` doesn't re-trigger immediately
        self.last_pruned_height = self.last_processed_height;
        return Ok(());
    }
    self.storage
        .prune_storage(last_to_keep, self.last_processed_height)
        .await?;
    self.last_pruned_height = self.last_processed_height;
    Ok(())
}
/// Runs an actual prune once at least `strategy_interval` blocks have been
/// processed since the previous prune (no-op for the 'nothing' strategy).
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
    debug!("checking for storage pruning");
    if self.config.pruning_options.strategy.is_nothing() {
        trace!("the current pruning strategy is 'nothing'");
        return Ok(());
    }
    let interval = self.config.pruning_options.strategy_interval();
    // prune every `interval` processed blocks
    if self.last_pruned_height + interval <= self.last_processed_height {
        self.prune_storage().await?;
    }
    Ok(())
}
/// Handles a freshly received block. In-order blocks (and any directly
/// following queued ones) are processed immediately; out-of-order blocks are
/// parked in `queued_blocks` until the gap below them gets filled.
async fn next_incoming(&mut self, block: BlockToProcess) {
    let height = block.height;
    // if this height was explicitly requested during a resync, mark it received
    self.pending_sync.request_in_flight.remove(&height);
    if self.last_processed_height == 0 {
        debug!("setting up initial processing height");
        // first block we ever see: treat everything below it as already done
        self.last_processed_height = height - 1
    }
    if height <= self.last_processed_height {
        warn!("we have already processed block for height {height}");
        return;
    }
    if self.last_processed_height + 1 != height {
        // out of order: park it until the missing blocks below arrive
        if self.queued_blocks.insert(height, block).is_some() {
            warn!("we have already queued up block for height {height}");
        }
        return;
    }
    if let Err(err) = self.process_block(block).await {
        error!("failed to process block at height {height}: {err}");
        return;
    }
    // drain consecutive queued blocks that have now become processable
    let mut next = height + 1;
    while let Some(next_block) = self.queued_blocks.remove(&next) {
        if let Err(err) = self.process_block(next_block).await {
            error!("failed to process queued-up block at height {next}: {err}")
        }
        next += 1;
    }
    // dispatch the next queued resync batch, if any remains
    self.try_request_pending().await;
    if self.pending_sync.is_empty() {
        // nothing in flight and nothing queued: signal that we've caught up
        self.synced.notify_one();
    }
}
/// If the current in-flight request set has fully drained, dispatches the next
/// queued resync batch. Returns true when a batch was (attempted to be) sent.
async fn try_request_pending(&mut self) -> bool {
    // still waiting on previously requested blocks
    if !self.pending_sync.request_in_flight.is_empty() {
        return false;
    }
    let Some(next_sync) = self.pending_sync.queued_requests.pop_front() else {
        return false;
    };
    debug!(
        "current request range has been resolved. requesting another bunch of blocks"
    );
    match self.send_blocks_request(next_sync.clone()).await {
        // on success, track the newly requested heights as in-flight
        Ok(()) => self.pending_sync.request_in_flight = next_sync.collect(),
        // on failure, put the range back so a later attempt can retry it
        Err(err) => {
            error!("failed to request resync blocks: {err}");
            self.pending_sync.queued_requests.push_front(next_sync);
        }
    }
    true
}
/// Performs the initial catch-up on startup: resumes from the last processed
/// height when one exists, or — on a very first run — starts from the
/// configured explicit starting height, honouring `use_best_effort_start_height`
/// when that height is no longer available on the chain.
async fn startup_resync(&mut self) -> Result<(), ScraperError> {
    assert!(self.pending_sync.is_empty());
    info!("attempting to run startup resync...");
    self.maybe_prune_storage().await?;
    let latest_block = self.rpc_client.current_block_height().await? as u32;
    info!("obtained latest block height: {latest_block}");
    if latest_block > self.last_processed_height && self.last_processed_height != 0 {
        info!("we have already processed some blocks in the past - attempting to resume...");
        let keep_recent = self.config.pruning_options.strategy_keep_recent();
        // saturate: on a young chain the retention window may exceed the tip
        let last_to_keep = latest_block.saturating_sub(keep_recent);
        // don't re-request blocks that would immediately get pruned anyway
        if !self.config.pruning_options.strategy.is_nothing() {
            self.last_processed_height = max(self.last_processed_height, last_to_keep);
        }
        let request_range = self.last_processed_height + 1..latest_block + 1;
        info!(
            keep_recent = %keep_recent,
            last_to_keep = %last_to_keep,
            last_processed_height = %self.last_processed_height,
            "we need to request {request_range:?} to resync"
        );
        self.request_missing_blocks(request_range).await?;
        return Ok(());
    }
    if self.last_processed_height == 0 {
        info!("this is the first time starting up");
        let Some(starting_height) = self.config.explicit_starting_block_height else {
            info!("no starting block height set - will use the default behaviour");
            return Ok(());
        };
        info!("attempting to start the scraper from block {starting_height}");
        let earliest_available =
            self.rpc_client.earliest_available_block_height().await? as u32;
        info!("earliest available block height: {earliest_available}");
        // BUGFIX: this condition was previously `&& use_best_effort_start_height`
        // (not negated), which made best-effort startups fail hard while strict
        // startups silently fell back to a different height - the exact opposite
        // of the flag's meaning. Only error out when the user did NOT opt into
        // best-effort behaviour.
        if earliest_available > starting_height && !self.config.use_best_effort_start_height {
            error!("the earliest available block is higher than the desired starting height");
            return Err(ScraperError::BlocksUnavailable {
                height: starting_height,
            });
        }
        let starting_height = if earliest_available > starting_height {
            // best-effort fallback: start a few blocks above the earliest
            // available height to avoid racing the node's own pruning
            earliest_available + 10
        } else {
            starting_height
        };
        let request_range = starting_height..latest_block + 1;
        info!("going to start the scraper from block {starting_height}");
        info!("we need to request {request_range:?} before properly starting up");
        self.request_missing_blocks(request_range).await?;
    }
    Ok(())
}
/// Main processing loop: runs the startup resync, then reacts to incoming
/// blocks, periodic missing-block checks, and cancellation until shut down.
pub(crate) async fn run(&mut self) {
    info!("starting block processor processing loop");
    // delay the first tick by a full interval so the startup resync gets a head start
    let mut missing_check_interval = interval_at(
        Instant::now().add(MISSING_BLOCKS_CHECK_INTERVAL),
        MISSING_BLOCKS_CHECK_INTERVAL,
    );
    if let Err(err) = self.startup_resync().await {
        error!("failed to perform startup sync: {err}");
        // a failed startup sync is fatal for the whole scraper
        self.cancel.cancel();
        return;
    };
    loop {
        tokio::select! {
            _ = self.cancel.cancelled() => {
                info!("received cancellation token");
                break
            }
            _ = missing_check_interval.tick() => {
                if let Err(err) = self.maybe_request_missing_blocks().await {
                    error!("failed to request missing blocks: {err}")
                }
            }
            block = self.incoming.next() => {
                match block {
                    Some(block) => self.next_incoming(block).await,
                    None => {
                        // channel closed: the producer is gone, shut everything down
                        warn!("stopped receiving new blocks");
                        self.cancel.cancel();
                        break
                    }
                }
            }
        }
    }
}
}