kaspa_consensus/pipeline/body_processor/
processor.rs

1use crate::{
2    consensus::services::DbWindowManager,
3    errors::{BlockProcessResult, RuleError},
4    model::{
5        services::reachability::MTReachabilityService,
6        stores::{
7            block_transactions::DbBlockTransactionsStore,
8            ghostdag::DbGhostdagStore,
9            headers::DbHeadersStore,
10            reachability::DbReachabilityStore,
11            statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
12            tips::{DbTipsStore, TipsStore},
13            DB,
14        },
15    },
16    pipeline::{
17        deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId, VirtualStateProcessingMessage},
18        ProcessingCounters,
19    },
20    processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator},
21};
22use crossbeam_channel::{Receiver, Sender};
23use kaspa_consensus_core::{
24    block::Block,
25    blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
26    config::genesis::GenesisBlock,
27    mass::MassCalculator,
28    tx::Transaction,
29};
30use kaspa_consensus_notify::{
31    notification::{BlockAddedNotification, Notification},
32    root::ConsensusNotificationRoot,
33};
34use kaspa_consensusmanager::SessionLock;
35use kaspa_hashes::Hash;
36use kaspa_notify::notifier::Notify;
37use parking_lot::RwLock;
38use rayon::ThreadPool;
39use rocksdb::WriteBatch;
40use std::sync::{atomic::Ordering, Arc};
41
42pub struct BlockBodyProcessor {
43    // Channels
44    receiver: Receiver<BlockProcessingMessage>,
45    sender: Sender<VirtualStateProcessingMessage>,
46
47    // Thread pool
48    pub(super) thread_pool: Arc<ThreadPool>,
49
50    // DB
51    db: Arc<DB>,
52
53    // Config
54    pub(super) max_block_mass: u64,
55    pub(super) genesis: GenesisBlock,
56
57    // Stores
58    pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
59    pub(super) ghostdag_store: Arc<DbGhostdagStore>,
60    pub(super) headers_store: Arc<DbHeadersStore>,
61    pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
62    pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
63
64    // Managers and services
65    pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
66    pub(super) coinbase_manager: CoinbaseManager,
67    pub(crate) mass_calculator: MassCalculator,
68    pub(super) transaction_validator: TransactionValidator,
69    pub(super) window_manager: DbWindowManager,
70
71    // Pruning lock
72    pruning_lock: SessionLock,
73
74    // Dependency manager
75    task_manager: BlockTaskDependencyManager,
76
77    // Notifier
78    notification_root: Arc<ConsensusNotificationRoot>,
79
80    // Counters
81    counters: Arc<ProcessingCounters>,
82
83    /// Storage mass hardfork DAA score
84    pub(crate) storage_mass_activation_daa_score: u64,
85}
86
87impl BlockBodyProcessor {
88    #[allow(clippy::too_many_arguments)]
89    pub fn new(
90        receiver: Receiver<BlockProcessingMessage>,
91        sender: Sender<VirtualStateProcessingMessage>,
92        thread_pool: Arc<ThreadPool>,
93
94        db: Arc<DB>,
95        statuses_store: Arc<RwLock<DbStatusesStore>>,
96        ghostdag_store: Arc<DbGhostdagStore>,
97        headers_store: Arc<DbHeadersStore>,
98        block_transactions_store: Arc<DbBlockTransactionsStore>,
99        body_tips_store: Arc<RwLock<DbTipsStore>>,
100
101        reachability_service: MTReachabilityService<DbReachabilityStore>,
102        coinbase_manager: CoinbaseManager,
103        mass_calculator: MassCalculator,
104        transaction_validator: TransactionValidator,
105        window_manager: DbWindowManager,
106        max_block_mass: u64,
107        genesis: GenesisBlock,
108        pruning_lock: SessionLock,
109        notification_root: Arc<ConsensusNotificationRoot>,
110        counters: Arc<ProcessingCounters>,
111        storage_mass_activation_daa_score: u64,
112    ) -> Self {
113        Self {
114            receiver,
115            sender,
116            thread_pool,
117            db,
118            statuses_store,
119            reachability_service,
120            ghostdag_store,
121            headers_store,
122            block_transactions_store,
123            body_tips_store,
124            coinbase_manager,
125            mass_calculator,
126            transaction_validator,
127            window_manager,
128            max_block_mass,
129            genesis,
130            pruning_lock,
131            task_manager: BlockTaskDependencyManager::new(),
132            notification_root,
133            counters,
134            storage_mass_activation_daa_score,
135        }
136    }
137
138    pub fn worker(self: &Arc<BlockBodyProcessor>) {
139        while let Ok(msg) = self.receiver.recv() {
140            match msg {
141                BlockProcessingMessage::Exit => break,
142                BlockProcessingMessage::Process(task, block_result_transmitter, virtual_result_transmitter) => {
143                    if let Some(task_id) = self.task_manager.register(task, block_result_transmitter, virtual_result_transmitter) {
144                        let processor = self.clone();
145                        self.thread_pool.spawn(move || {
146                            processor.queue_block(task_id);
147                        });
148                    }
149                }
150            };
151        }
152
153        // Wait until all workers are idle before exiting
154        self.task_manager.wait_for_idle();
155
156        // Pass the exit signal on to the following processor
157        self.sender.send(VirtualStateProcessingMessage::Exit).unwrap();
158    }
159
160    fn queue_block(self: &Arc<BlockBodyProcessor>, task_id: TaskId) {
161        if let Some(task) = self.task_manager.try_begin(task_id) {
162            let res = self.process_body(task.block(), task.is_trusted());
163
164            let dependent_tasks = self.task_manager.end(task, |task, block_result_transmitter, virtual_state_result_transmitter| {
165                let _ = block_result_transmitter.send(res.clone());
166                if res.is_err() || !task.requires_virtual_processing() {
167                    // We don't care if receivers were dropped
168                    let _ = virtual_state_result_transmitter.send(res.clone());
169                } else {
170                    self.sender.send(VirtualStateProcessingMessage::Process(task, virtual_state_result_transmitter)).unwrap();
171                }
172            });
173
174            for dep in dependent_tasks {
175                let processor = self.clone();
176                self.thread_pool.spawn(move || processor.queue_block(dep));
177            }
178        }
179    }
180
181    fn process_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<BlockStatus> {
182        let _prune_guard = self.pruning_lock.blocking_read();
183        let status = self.statuses_store.read().get(block.hash()).unwrap();
184        match status {
185            StatusInvalid => return Err(RuleError::KnownInvalid),
186            StatusHeaderOnly => {} // Proceed to body processing
187            _ if status.has_block_body() => return Ok(status),
188            _ => panic!("unexpected block status {status:?}"),
189        }
190
191        let mass = match self.validate_body(block, is_trusted) {
192            Ok(mass) => mass,
193            Err(e) => {
194                // We mark invalid blocks with status StatusInvalid except in the
195                // case of the following errors:
196                // MissingParents - If we got MissingParents the block shouldn't be
197                // considered as invalid because it could be added later on when its
198                // parents are present.
199                // BadMerkleRoot - if we get BadMerkleRoot we shouldn't mark the
200                // block as invalid because later on we can get the block with
201                // transactions that fits the merkle root.
202                // PrunedBlock - PrunedBlock is an error that rejects a block body and
203                // not the block as a whole, so we shouldn't mark it as invalid.
204                if !matches!(e, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_) | RuleError::PrunedBlock) {
205                    self.statuses_store.write().set(block.hash(), BlockStatus::StatusInvalid).unwrap();
206                }
207                return Err(e);
208            }
209        };
210
211        self.commit_body(block.hash(), block.header.direct_parents(), block.transactions.clone());
212
213        // Send a BlockAdded notification
214        self.notification_root
215            .notify(Notification::BlockAdded(BlockAddedNotification::new(block.to_owned())))
216            .expect("expecting an open unbounded channel");
217
218        // Report counters
219        self.counters.body_counts.fetch_add(1, Ordering::Relaxed);
220        self.counters.txs_counts.fetch_add(block.transactions.len() as u64, Ordering::Relaxed);
221        self.counters.mass_counts.fetch_add(mass, Ordering::Relaxed);
222        Ok(BlockStatus::StatusUTXOPendingVerification)
223    }
224
225    fn validate_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<u64> {
226        let mass = self.validate_body_in_isolation(block)?;
227        if !is_trusted {
228            self.validate_body_in_context(block)?;
229        }
230        Ok(mass)
231    }
232
233    fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
234        let mut batch = WriteBatch::default();
235
236        // This is an append only store so it requires no lock.
237        self.block_transactions_store.insert_batch(&mut batch, hash, transactions).unwrap();
238
239        let mut body_tips_write_guard = self.body_tips_store.write();
240        body_tips_write_guard.add_tip_batch(&mut batch, hash, parents).unwrap();
241        let statuses_write_guard =
242            self.statuses_store.set_batch(&mut batch, hash, BlockStatus::StatusUTXOPendingVerification).unwrap();
243
244        self.db.write(batch).unwrap();
245
246        // Calling the drops explicitly after the batch is written in order to avoid possible errors.
247        drop(statuses_write_guard);
248        drop(body_tips_write_guard);
249    }
250
251    pub fn process_genesis(self: &Arc<BlockBodyProcessor>) {
252        // Init tips store
253        let mut batch = WriteBatch::default();
254        let mut body_tips_write_guard = self.body_tips_store.write();
255        body_tips_write_guard.init_batch(&mut batch, &[]).unwrap();
256        self.db.write(batch).unwrap();
257        drop(body_tips_write_guard);
258
259        // Write the genesis body
260        self.commit_body(self.genesis.hash, &[], Arc::new(self.genesis.build_genesis_transactions()))
261    }
262}