kaspa_consensus/pipeline/body_processor/
processor.rs1use crate::{
2 consensus::services::DbWindowManager,
3 errors::{BlockProcessResult, RuleError},
4 model::{
5 services::reachability::MTReachabilityService,
6 stores::{
7 block_transactions::DbBlockTransactionsStore,
8 ghostdag::DbGhostdagStore,
9 headers::DbHeadersStore,
10 reachability::DbReachabilityStore,
11 statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
12 tips::{DbTipsStore, TipsStore},
13 DB,
14 },
15 },
16 pipeline::{
17 deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId, VirtualStateProcessingMessage},
18 ProcessingCounters,
19 },
20 processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator},
21};
22use crossbeam_channel::{Receiver, Sender};
23use kaspa_consensus_core::{
24 block::Block,
25 blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
26 config::genesis::GenesisBlock,
27 mass::MassCalculator,
28 tx::Transaction,
29};
30use kaspa_consensus_notify::{
31 notification::{BlockAddedNotification, Notification},
32 root::ConsensusNotificationRoot,
33};
34use kaspa_consensusmanager::SessionLock;
35use kaspa_hashes::Hash;
36use kaspa_notify::notifier::Notify;
37use parking_lot::RwLock;
38use rayon::ThreadPool;
39use rocksdb::WriteBatch;
40use std::sync::{atomic::Ordering, Arc};
41
42pub struct BlockBodyProcessor {
43 receiver: Receiver<BlockProcessingMessage>,
45 sender: Sender<VirtualStateProcessingMessage>,
46
47 pub(super) thread_pool: Arc<ThreadPool>,
49
50 db: Arc<DB>,
52
53 pub(super) max_block_mass: u64,
55 pub(super) genesis: GenesisBlock,
56
57 pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
59 pub(super) ghostdag_store: Arc<DbGhostdagStore>,
60 pub(super) headers_store: Arc<DbHeadersStore>,
61 pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
62 pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
63
64 pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
66 pub(super) coinbase_manager: CoinbaseManager,
67 pub(crate) mass_calculator: MassCalculator,
68 pub(super) transaction_validator: TransactionValidator,
69 pub(super) window_manager: DbWindowManager,
70
71 pruning_lock: SessionLock,
73
74 task_manager: BlockTaskDependencyManager,
76
77 notification_root: Arc<ConsensusNotificationRoot>,
79
80 counters: Arc<ProcessingCounters>,
82
83 pub(crate) storage_mass_activation_daa_score: u64,
85}
86
87impl BlockBodyProcessor {
88 #[allow(clippy::too_many_arguments)]
89 pub fn new(
90 receiver: Receiver<BlockProcessingMessage>,
91 sender: Sender<VirtualStateProcessingMessage>,
92 thread_pool: Arc<ThreadPool>,
93
94 db: Arc<DB>,
95 statuses_store: Arc<RwLock<DbStatusesStore>>,
96 ghostdag_store: Arc<DbGhostdagStore>,
97 headers_store: Arc<DbHeadersStore>,
98 block_transactions_store: Arc<DbBlockTransactionsStore>,
99 body_tips_store: Arc<RwLock<DbTipsStore>>,
100
101 reachability_service: MTReachabilityService<DbReachabilityStore>,
102 coinbase_manager: CoinbaseManager,
103 mass_calculator: MassCalculator,
104 transaction_validator: TransactionValidator,
105 window_manager: DbWindowManager,
106 max_block_mass: u64,
107 genesis: GenesisBlock,
108 pruning_lock: SessionLock,
109 notification_root: Arc<ConsensusNotificationRoot>,
110 counters: Arc<ProcessingCounters>,
111 storage_mass_activation_daa_score: u64,
112 ) -> Self {
113 Self {
114 receiver,
115 sender,
116 thread_pool,
117 db,
118 statuses_store,
119 reachability_service,
120 ghostdag_store,
121 headers_store,
122 block_transactions_store,
123 body_tips_store,
124 coinbase_manager,
125 mass_calculator,
126 transaction_validator,
127 window_manager,
128 max_block_mass,
129 genesis,
130 pruning_lock,
131 task_manager: BlockTaskDependencyManager::new(),
132 notification_root,
133 counters,
134 storage_mass_activation_daa_score,
135 }
136 }
137
138 pub fn worker(self: &Arc<BlockBodyProcessor>) {
139 while let Ok(msg) = self.receiver.recv() {
140 match msg {
141 BlockProcessingMessage::Exit => break,
142 BlockProcessingMessage::Process(task, block_result_transmitter, virtual_result_transmitter) => {
143 if let Some(task_id) = self.task_manager.register(task, block_result_transmitter, virtual_result_transmitter) {
144 let processor = self.clone();
145 self.thread_pool.spawn(move || {
146 processor.queue_block(task_id);
147 });
148 }
149 }
150 };
151 }
152
153 self.task_manager.wait_for_idle();
155
156 self.sender.send(VirtualStateProcessingMessage::Exit).unwrap();
158 }
159
160 fn queue_block(self: &Arc<BlockBodyProcessor>, task_id: TaskId) {
161 if let Some(task) = self.task_manager.try_begin(task_id) {
162 let res = self.process_body(task.block(), task.is_trusted());
163
164 let dependent_tasks = self.task_manager.end(task, |task, block_result_transmitter, virtual_state_result_transmitter| {
165 let _ = block_result_transmitter.send(res.clone());
166 if res.is_err() || !task.requires_virtual_processing() {
167 let _ = virtual_state_result_transmitter.send(res.clone());
169 } else {
170 self.sender.send(VirtualStateProcessingMessage::Process(task, virtual_state_result_transmitter)).unwrap();
171 }
172 });
173
174 for dep in dependent_tasks {
175 let processor = self.clone();
176 self.thread_pool.spawn(move || processor.queue_block(dep));
177 }
178 }
179 }
180
181 fn process_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<BlockStatus> {
182 let _prune_guard = self.pruning_lock.blocking_read();
183 let status = self.statuses_store.read().get(block.hash()).unwrap();
184 match status {
185 StatusInvalid => return Err(RuleError::KnownInvalid),
186 StatusHeaderOnly => {} _ if status.has_block_body() => return Ok(status),
188 _ => panic!("unexpected block status {status:?}"),
189 }
190
191 let mass = match self.validate_body(block, is_trusted) {
192 Ok(mass) => mass,
193 Err(e) => {
194 if !matches!(e, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_) | RuleError::PrunedBlock) {
205 self.statuses_store.write().set(block.hash(), BlockStatus::StatusInvalid).unwrap();
206 }
207 return Err(e);
208 }
209 };
210
211 self.commit_body(block.hash(), block.header.direct_parents(), block.transactions.clone());
212
213 self.notification_root
215 .notify(Notification::BlockAdded(BlockAddedNotification::new(block.to_owned())))
216 .expect("expecting an open unbounded channel");
217
218 self.counters.body_counts.fetch_add(1, Ordering::Relaxed);
220 self.counters.txs_counts.fetch_add(block.transactions.len() as u64, Ordering::Relaxed);
221 self.counters.mass_counts.fetch_add(mass, Ordering::Relaxed);
222 Ok(BlockStatus::StatusUTXOPendingVerification)
223 }
224
225 fn validate_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<u64> {
226 let mass = self.validate_body_in_isolation(block)?;
227 if !is_trusted {
228 self.validate_body_in_context(block)?;
229 }
230 Ok(mass)
231 }
232
233 fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
234 let mut batch = WriteBatch::default();
235
236 self.block_transactions_store.insert_batch(&mut batch, hash, transactions).unwrap();
238
239 let mut body_tips_write_guard = self.body_tips_store.write();
240 body_tips_write_guard.add_tip_batch(&mut batch, hash, parents).unwrap();
241 let statuses_write_guard =
242 self.statuses_store.set_batch(&mut batch, hash, BlockStatus::StatusUTXOPendingVerification).unwrap();
243
244 self.db.write(batch).unwrap();
245
246 drop(statuses_write_guard);
248 drop(body_tips_write_guard);
249 }
250
251 pub fn process_genesis(self: &Arc<BlockBodyProcessor>) {
252 let mut batch = WriteBatch::default();
254 let mut body_tips_write_guard = self.body_tips_store.write();
255 body_tips_write_guard.init_batch(&mut batch, &[]).unwrap();
256 self.db.write(batch).unwrap();
257 drop(body_tips_write_guard);
258
259 self.commit_body(self.genesis.hash, &[], Arc::new(self.genesis.build_genesis_transactions()))
261 }
262}