use crate::{
errors::{BlockProcessResult, RuleError},
model::{
services::{reachability::MTReachabilityService, relations::MTRelationsService},
stores::{
block_transactions::DbBlockTransactionsStore,
block_window_cache::BlockWindowCacheStore,
ghostdag::DbGhostdagStore,
headers::DbHeadersStore,
reachability::DbReachabilityStore,
relations::DbRelationsStore,
statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
tips::DbTipsStore,
DB,
},
},
pipeline::{
deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId},
ProcessingCounters,
},
processes::{
coinbase::CoinbaseManager, mass::MassCalculator, past_median_time::PastMedianTimeManager,
transaction_validator::TransactionValidator,
},
};
use crossbeam_channel::{Receiver, Sender};
use kaspa_consensus_core::{
block::Block,
blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
config::genesis::GenesisBlock,
tx::Transaction,
};
use kaspa_consensus_notify::{
notification::{BlockAddedNotification, Notification},
root::ConsensusNotificationRoot,
};
use kaspa_hashes::Hash;
use kaspa_notify::notifier::Notify;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
use std::sync::{atomic::Ordering, Arc};
pub struct BlockBodyProcessor {
receiver: Receiver<BlockProcessingMessage>,
sender: Sender<BlockProcessingMessage>,
pub(super) thread_pool: Arc<ThreadPool>,
db: Arc<DB>,
pub(super) max_block_mass: u64,
pub(super) genesis: GenesisBlock,
pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
pub(super) ghostdag_store: Arc<DbGhostdagStore>,
pub(super) headers_store: Arc<DbHeadersStore>,
pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
pub(super) coinbase_manager: CoinbaseManager,
pub(crate) mass_calculator: MassCalculator,
pub(super) transaction_validator: TransactionValidator,
pub(super) past_median_time_manager: PastMedianTimeManager<
DbHeadersStore,
DbGhostdagStore,
BlockWindowCacheStore,
DbReachabilityStore,
MTRelationsService<DbRelationsStore>,
>,
task_manager: BlockTaskDependencyManager,
pub(crate) notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
}
impl BlockBodyProcessor {
#[allow(clippy::too_many_arguments)]
pub fn new(
receiver: Receiver<BlockProcessingMessage>,
sender: Sender<BlockProcessingMessage>,
thread_pool: Arc<ThreadPool>,
db: Arc<DB>,
statuses_store: Arc<RwLock<DbStatusesStore>>,
ghostdag_store: Arc<DbGhostdagStore>,
headers_store: Arc<DbHeadersStore>,
block_transactions_store: Arc<DbBlockTransactionsStore>,
body_tips_store: Arc<RwLock<DbTipsStore>>,
reachability_service: MTReachabilityService<DbReachabilityStore>,
coinbase_manager: CoinbaseManager,
mass_calculator: MassCalculator,
transaction_validator: TransactionValidator,
past_median_time_manager: PastMedianTimeManager<
DbHeadersStore,
DbGhostdagStore,
BlockWindowCacheStore,
DbReachabilityStore,
MTRelationsService<DbRelationsStore>,
>,
max_block_mass: u64,
genesis: GenesisBlock,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
) -> Self {
Self {
receiver,
sender,
thread_pool,
db,
statuses_store,
reachability_service,
ghostdag_store,
headers_store,
block_transactions_store,
body_tips_store,
coinbase_manager,
mass_calculator,
transaction_validator,
past_median_time_manager,
max_block_mass,
genesis,
task_manager: BlockTaskDependencyManager::new(),
notification_root,
counters,
}
}
pub fn worker(self: &Arc<BlockBodyProcessor>) {
while let Ok(msg) = self.receiver.recv() {
match msg {
BlockProcessingMessage::Exit => break,
BlockProcessingMessage::Process(task, result_transmitter) => {
if let Some(task_id) = self.task_manager.register(task, result_transmitter) {
let processor = self.clone();
self.thread_pool.spawn(move || {
processor.queue_block(task_id);
});
}
}
};
}
self.task_manager.wait_for_idle();
self.sender.send(BlockProcessingMessage::Exit).unwrap();
}
fn queue_block(self: &Arc<BlockBodyProcessor>, task_id: TaskId) {
if let Some(task) = self.task_manager.try_begin(task_id) {
let res = self.process_block_body(&task.block, task.trusted_ghostdag_data.is_some());
let dependent_tasks = self.task_manager.end(task, |task, result_transmitter| {
if res.is_err() {
let _ = result_transmitter.send(res.clone());
} else {
self.sender.send(BlockProcessingMessage::Process(task, result_transmitter)).unwrap();
}
});
for dep in dependent_tasks {
let processor = self.clone();
self.thread_pool.spawn(move || processor.queue_block(dep));
}
}
}
fn process_block_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<BlockStatus> {
let status = self.statuses_store.read().get(block.hash()).unwrap();
match status {
StatusInvalid => return Err(RuleError::KnownInvalid),
StatusHeaderOnly => {} _ if status.has_block_body() => return Ok(status),
_ => panic!("unexpected block status {status:?}"),
}
if let Err(e) = self.validate_body(block, is_trusted) {
if !matches!(e, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_)) {
self.statuses_store.write().set(block.hash(), BlockStatus::StatusInvalid).unwrap();
}
return Err(e);
}
self.commit_body(block.hash(), block.header.direct_parents(), block.transactions.clone());
let _ = self.notification_root.notify(Notification::BlockAdded(BlockAddedNotification::new(block.to_owned())));
self.counters.body_counts.fetch_add(1, Ordering::Relaxed);
self.counters.txs_counts.fetch_add(block.transactions.len() as u64, Ordering::Relaxed);
Ok(BlockStatus::StatusUTXOPendingVerification)
}
fn validate_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<()> {
self.validate_body_in_isolation(block)?;
if !is_trusted {
return self.validate_body_in_context(block);
}
Ok(())
}
fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
let mut batch = WriteBatch::default();
self.block_transactions_store.insert_batch(&mut batch, hash, transactions).unwrap();
let mut body_tips_write_guard = self.body_tips_store.write();
body_tips_write_guard.add_tip_batch(&mut batch, hash, parents).unwrap();
let statuses_write_guard =
self.statuses_store.set_batch(&mut batch, hash, BlockStatus::StatusUTXOPendingVerification).unwrap();
self.db.write(batch).unwrap();
drop(statuses_write_guard);
drop(body_tips_write_guard);
}
pub fn process_genesis(self: &Arc<BlockBodyProcessor>) {
let mut batch = WriteBatch::default();
let mut body_tips_write_guard = self.body_tips_store.write();
body_tips_write_guard.init_batch(&mut batch, &[]).unwrap();
self.db.write(batch).unwrap();
drop(body_tips_write_guard);
self.commit_body(self.genesis.hash, &[], Arc::new(self.genesis.build_genesis_transactions()))
}
}