// kaspa-consensus 0.0.2
//
// Kaspa consensus library
use crate::{
    errors::{BlockProcessResult, RuleError},
    model::{
        services::{reachability::MTReachabilityService, relations::MTRelationsService},
        stores::{
            block_transactions::DbBlockTransactionsStore,
            block_window_cache::BlockWindowCacheStore,
            ghostdag::DbGhostdagStore,
            headers::DbHeadersStore,
            reachability::DbReachabilityStore,
            relations::DbRelationsStore,
            statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
            tips::DbTipsStore,
            DB,
        },
    },
    pipeline::{
        deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId},
        ProcessingCounters,
    },
    processes::{
        coinbase::CoinbaseManager, mass::MassCalculator, past_median_time::PastMedianTimeManager,
        transaction_validator::TransactionValidator,
    },
};
use crossbeam_channel::{Receiver, Sender};
use kaspa_consensus_core::{
    block::Block,
    blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
    config::genesis::GenesisBlock,
    tx::Transaction,
};
use kaspa_consensus_notify::{
    notification::{BlockAddedNotification, Notification},
    root::ConsensusNotificationRoot,
};
use kaspa_hashes::Hash;
use kaspa_notify::notifier::Notify;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
use std::sync::{atomic::Ordering, Arc};

/// Pipeline stage that validates block bodies and commits them to the database.
///
/// Messages arrive on `receiver`; blocks whose body was processed successfully are
/// forwarded on `sender` to the following processor, while failures are reported back
/// through the result transmitter attached to each task.
pub struct BlockBodyProcessor {
    // Channels
    /// Incoming messages for this stage (`Process` tasks and the `Exit` signal).
    receiver: Receiver<BlockProcessingMessage>,
    /// Outgoing channel to the following processor; also used to pass `Exit` along.
    sender: Sender<BlockProcessingMessage>,

    // Thread pool
    /// Pool on which the per-block processing closures are spawned.
    pub(super) thread_pool: Arc<ThreadPool>,

    // DB
    /// Database handle that the write batches built by this processor are written to.
    db: Arc<DB>,

    // Config
    // NOTE(review): `max_block_mass` is not read in this file; presumably it is consumed
    // by the body validation code in a sibling module -- confirm.
    pub(super) max_block_mass: u64,
    /// Genesis block description; supplies the hash and transactions written by `process_genesis`.
    pub(super) genesis: GenesisBlock,

    // Stores
    /// Block statuses; read before processing and updated on invalidation and on commit.
    pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
    pub(super) ghostdag_store: Arc<DbGhostdagStore>,
    pub(super) headers_store: Arc<DbHeadersStore>,
    /// Block transactions; written in `commit_body` without taking a lock.
    pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
    /// Body tips; initialized in `process_genesis` and extended in `commit_body`.
    pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,

    // Managers and services
    // NOTE(review): none of the members in this group are used directly in this file;
    // their `pub(super)`/`pub(crate)` visibility suggests use by the validation code
    // in sibling modules -- confirm.
    pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
    pub(super) coinbase_manager: CoinbaseManager,
    pub(crate) mass_calculator: MassCalculator,
    pub(super) transaction_validator: TransactionValidator,
    pub(super) past_median_time_manager: PastMedianTimeManager<
        DbHeadersStore,
        DbGhostdagStore,
        BlockWindowCacheStore,
        DbReachabilityStore,
        MTRelationsService<DbRelationsStore>,
    >,

    // Dependency manager
    /// Tracks registered tasks and hands back the dependent tasks to run once a task ends.
    task_manager: BlockTaskDependencyManager,

    /// Notification root that receives a `BlockAdded` notification for each committed body.
    pub(crate) notification_root: Arc<ConsensusNotificationRoot>,

    // Counters
    /// Shared counters; the body and transaction counts are incremented after each commit.
    counters: Arc<ProcessingCounters>,
}

impl BlockBodyProcessor {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        receiver: Receiver<BlockProcessingMessage>,
        sender: Sender<BlockProcessingMessage>,
        thread_pool: Arc<ThreadPool>,
        db: Arc<DB>,
        statuses_store: Arc<RwLock<DbStatusesStore>>,
        ghostdag_store: Arc<DbGhostdagStore>,
        headers_store: Arc<DbHeadersStore>,
        block_transactions_store: Arc<DbBlockTransactionsStore>,
        body_tips_store: Arc<RwLock<DbTipsStore>>,
        reachability_service: MTReachabilityService<DbReachabilityStore>,
        coinbase_manager: CoinbaseManager,
        mass_calculator: MassCalculator,
        transaction_validator: TransactionValidator,
        past_median_time_manager: PastMedianTimeManager<
            DbHeadersStore,
            DbGhostdagStore,
            BlockWindowCacheStore,
            DbReachabilityStore,
            MTRelationsService<DbRelationsStore>,
        >,
        max_block_mass: u64,
        genesis: GenesisBlock,
        notification_root: Arc<ConsensusNotificationRoot>,
        counters: Arc<ProcessingCounters>,
    ) -> Self {
        Self {
            receiver,
            sender,
            thread_pool,
            db,
            statuses_store,
            reachability_service,
            ghostdag_store,
            headers_store,
            block_transactions_store,
            body_tips_store,
            coinbase_manager,
            mass_calculator,
            transaction_validator,
            past_median_time_manager,
            max_block_mass,
            genesis,
            task_manager: BlockTaskDependencyManager::new(),
            notification_root,
            counters,
        }
    }

    /// Main loop of the body-processing stage.
    ///
    /// Blocks on the incoming channel. Each `Process` message is registered with the
    /// dependency manager and, when registration yields a task id, scheduled on the
    /// thread pool. The loop ends on an `Exit` message or as soon as receiving fails.
    /// Before returning, the worker waits for the task manager to become idle and then
    /// forwards `Exit` to the following processor.
    ///
    /// # Panics
    ///
    /// Panics if the `Exit` message cannot be sent on the outgoing channel.
    pub fn worker(self: &Arc<BlockBodyProcessor>) {
        loop {
            match self.receiver.recv() {
                Ok(BlockProcessingMessage::Process(task, result_transmitter)) => {
                    // Registration may yield no task id, in which case nothing is scheduled
                    let registered = self.task_manager.register(task, result_transmitter);
                    if let Some(task_id) = registered {
                        let processor = Arc::clone(self);
                        self.thread_pool.spawn(move || processor.queue_block(task_id));
                    }
                }
                // An explicit exit request and a failed receive both end the loop
                Ok(BlockProcessingMessage::Exit) | Err(_) => break,
            }
        }

        // Let all in-flight work finish before shutting down
        self.task_manager.wait_for_idle();

        // Propagate the exit signal to the next stage of the pipeline
        self.sender.send(BlockProcessingMessage::Exit).unwrap();
    }

    /// Runs body processing for the task identified by `task_id`, if the task manager
    /// allows it to begin, and then schedules every dependent task it reports.
    ///
    /// When the task ends, each (task, result transmitter) pair handed to the callback
    /// is either answered with the error (on failure) or forwarded to the following
    /// processor (on success).
    ///
    /// # Panics
    ///
    /// Panics if forwarding a successfully processed task on the outgoing channel fails.
    fn queue_block(self: &Arc<BlockBodyProcessor>, task_id: TaskId) {
        // Nothing to do when the task manager does not hand out the task
        let task = match self.task_manager.try_begin(task_id) {
            Some(task) => task,
            None => return,
        };

        let res = self.process_block_body(&task.block, task.trusted_ghostdag_data.is_some());

        let dependent_tasks = self.task_manager.end(task, |pending_task, result_transmitter| match &res {
            Err(_) => {
                // We don't care if receivers were dropped
                let _ = result_transmitter.send(res.clone());
            }
            Ok(_) => {
                self.sender.send(BlockProcessingMessage::Process(pending_task, result_transmitter)).unwrap();
            }
        });

        // Each dependent task is processed on the pool with its own handle to the processor
        for dependent_id in dependent_tasks {
            let processor = Arc::clone(self);
            self.thread_pool.spawn(move || processor.queue_block(dependent_id));
        }
    }

    /// Validates the body of `block` and, on success, commits it and announces it.
    ///
    /// # Returns
    ///
    /// * `Ok(status)` with the stored status when the block already has a body.
    /// * `Ok(BlockStatus::StatusUTXOPendingVerification)` after a fresh body was committed.
    ///
    /// # Errors
    ///
    /// * `RuleError::KnownInvalid` when the stored status is `StatusInvalid`.
    /// * The validation error otherwise. Unless that error is `BadMerkleRoot` or
    ///   `MissingParents`, the block is also marked `StatusInvalid` in the statuses store.
    ///
    /// # Panics
    ///
    /// Panics on a stored status that is neither invalid, header-only, nor one with a
    /// block body, and on any store failure.
    fn process_block_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<BlockStatus> {
        let status = self.statuses_store.read().get(block.hash()).unwrap();
        match status {
            // Only header-only blocks continue to body processing
            StatusHeaderOnly => {}
            StatusInvalid => return Err(RuleError::KnownInvalid),
            _ if status.has_block_body() => return Ok(status),
            _ => panic!("unexpected block status {status:?}"),
        }

        if let Err(rule_error) = self.validate_body(block, is_trusted) {
            // A failed validation normally marks the block as StatusInvalid. The exceptions are:
            //   * MissingParents - the block may become acceptable later, once its parents
            //     are present.
            //   * BadMerkleRoot - the same header may later arrive with transactions that
            //     do fit the merkle root.
            //   * PrunedBlock - this error rejects the body rather than the block as a whole,
            //     so the block shouldn't be marked invalid.
            // TODO: implement the PrunedBlock exemption.
            let keeps_status = matches!(rule_error, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_));
            if !keeps_status {
                self.statuses_store.write().set(block.hash(), BlockStatus::StatusInvalid).unwrap();
            }
            return Err(rule_error);
        }

        self.commit_body(block.hash(), block.header.direct_parents(), block.transactions.clone());

        // Announce the newly added block
        // TODO: handle notify errors
        let block_added = Notification::BlockAdded(BlockAddedNotification::new(block.to_owned()));
        let _ = self.notification_root.notify(block_added);

        // Update the processing counters
        let tx_count = block.transactions.len() as u64;
        self.counters.body_counts.fetch_add(1, Ordering::Relaxed);
        self.counters.txs_counts.fetch_add(tx_count, Ordering::Relaxed);

        Ok(BlockStatus::StatusUTXOPendingVerification)
    }

    /// Runs the body validation steps for `block`.
    ///
    /// The in-isolation checks always run. The in-context checks run only when the
    /// block is not trusted. The first failing check is returned as the error.
    fn validate_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<()> {
        self.validate_body_in_isolation(block)?;

        if is_trusted {
            // TODO: Check that it's safe to skip the in-context check if the block is trusted.
            Ok(())
        } else {
            self.validate_body_in_context(block)
        }
    }

    /// Persists a validated block body in a single database write batch.
    ///
    /// The batch collects three updates before it is written: the block's transactions,
    /// the body tips update for `hash` with its `parents`, and the block status set to
    /// `StatusUTXOPendingVerification`.
    ///
    /// The body tips write guard and the guard returned by the statuses `set_batch`
    /// call are both held until after the batch has been written to the database.
    ///
    /// # Panics
    ///
    /// Panics if any store operation or the database write fails.
    fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
        let mut batch = WriteBatch::default();

        // This is an append only store so it requires no lock.
        self.block_transactions_store.insert_batch(&mut batch, hash, transactions).unwrap();

        // Take the tips write lock before staging the tips update; it stays held across the DB write below.
        let mut body_tips_write_guard = self.body_tips_store.write();
        body_tips_write_guard.add_tip_batch(&mut batch, hash, parents).unwrap();
        // NOTE(review): `set_batch` is defined outside this file; judging by its use here it
        // presumably acquires the statuses store write lock and returns its guard -- confirm.
        let statuses_write_guard =
            self.statuses_store.set_batch(&mut batch, hash, BlockStatus::StatusUTXOPendingVerification).unwrap();

        // All staged updates are written together.
        self.db.write(batch).unwrap();

        // Calling the drops explicitly after the batch is written in order to avoid possible errors.
        drop(statuses_write_guard);
        drop(body_tips_write_guard);
    }

    /// Bootstraps the body stores with the genesis block.
    ///
    /// First initializes the body tips store with an empty tip set, then commits the
    /// genesis body (no parents, transactions built from the genesis configuration)
    /// through `commit_body`.
    ///
    /// # Panics
    ///
    /// Panics if a store operation or a database write fails.
    pub fn process_genesis(self: &Arc<BlockBodyProcessor>) {
        // Init tips store. The write guard is released at the end of this scope,
        // i.e. only after the batch has been written.
        {
            let mut init_batch = WriteBatch::default();
            let mut tips_guard = self.body_tips_store.write();
            tips_guard.init_batch(&mut init_batch, &[]).unwrap();
            self.db.write(init_batch).unwrap();
        }

        // Write the genesis body
        let genesis_transactions = Arc::new(self.genesis.build_genesis_transactions());
        self.commit_body(self.genesis.hash, &[], genesis_transactions);
    }
}