use crate::{
    consensus::{
        services::{
            ConsensusServices, DbBlockDepthManager, DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbPruningPointManager,
            DbWindowManager,
        },
        storage::ConsensusStorage,
    },
    errors::{BlockProcessResult, RuleError},
    model::{
        services::reachability::MTReachabilityService,
        stores::{
            block_window_cache::{BlockWindowCacheStore, BlockWindowHeap},
            daa::DbDaaStore,
            depth::DbDepthStore,
            ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
            headers::DbHeadersStore,
            headers_selected_tip::{DbHeadersSelectedTipStore, HeadersSelectedTipStoreReader},
            pruning::{DbPruningStore, PruningPointInfo, PruningStoreReader},
            reachability::{DbReachabilityStore, StagingReachabilityStore},
            relations::{DbRelationsStore, RelationsStoreReader},
            statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
            DB,
        },
    },
    params::Params,
    pipeline::deps_manager::{BlockProcessingMessage, BlockTask, BlockTaskDependencyManager, TaskId},
    processes::{ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions},
};
use crossbeam_channel::{Receiver, Sender};
use itertools::Itertools;
use kaspa_consensus_core::{
    blockhash::{BlockHashes, ORIGIN},
    blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
    config::genesis::GenesisBlock,
    header::Header,
    BlockHashSet, BlockLevel,
};
use kaspa_consensusmanager::SessionLock;
use kaspa_database::prelude::{StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_utils::vec::VecExtensions;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
use std::sync::{atomic::Ordering, Arc};

use super::super::ProcessingCounters;
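
/// Carries the intermediate data gathered while validating a single header,
/// so that all of it can be committed to the consensus stores in one batch.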
pub struct HeaderProcessingContext {
    pub hash: Hash,
    pub header: Arc<Header>,
    pub pruning_info: PruningPointInfo,
    pub block_level: BlockLevel,
    pub known_parents: Vec<BlockHashes>,

    // Staging data -- filled in by the various validation passes
    pub ghostdag_data: Option<Vec<Arc<GhostdagData>>>,
    pub block_window_for_difficulty: Option<Arc<BlockWindowHeap>>,
    pub block_window_for_past_median_time: Option<Arc<BlockWindowHeap>>,
    pub mergeset_non_daa: Option<BlockHashSet>,
    pub merge_depth_root: Option<Hash>,
    pub finality_point: Option<Hash>,
}

impl HeaderProcessingContext {
    pub fn new(
        hash: Hash,
        header: Arc<Header>,
        block_level: BlockLevel,
        pruning_info: PruningPointInfo,
        known_parents: Vec<BlockHashes>,
    ) -> Self {
        Self {
            hash,
            header,
            block_level,
            pruning_info,
            known_parents,
            ghostdag_data: None,
            block_window_for_difficulty: None,
            mergeset_non_daa: None,
            block_window_for_past_median_time: None,
            merge_depth_root: None,
            finality_point: None,
        }
    }
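
    /// Returns the direct (level-0) parents of this header, restricted to parents
    /// which are known locally.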
    pub fn direct_known_parents(&self) -> &[Hash] {
        &self.known_parents[0]
    }

    pub fn pruning_point(&self) -> Hash {
        self.pruning_info.pruning_point
    }
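
    /// Returns the primary (level-0) GHOSTDAG data of this header.
    /// Panics if called before GHOSTDAG was computed for this context.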
    pub fn ghostdag_data(&self) -> &Arc<GhostdagData> {
        &self.ghostdag_data.as_ref().unwrap()[0]
    }
}
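
/// The pipeline processor in charge of validating block headers and committing the derived
/// data (GHOSTDAG, relations, reachability, windows, statuses) to the consensus stores.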
pub struct HeaderProcessor {
    // Channels
    receiver: Receiver<BlockProcessingMessage>,
    body_sender: Sender<BlockProcessingMessage>,

    // Thread pool
    pub(super) thread_pool: Arc<ThreadPool>,

    // Config
    pub(super) genesis: GenesisBlock,
    pub(super) timestamp_deviation_tolerance: u64,
    pub(super) target_time_per_block: u64,
    pub(super) max_block_parents: u8,
    pub(super) mergeset_size_limit: u64,
    pub(super) skip_proof_of_work: bool,
    pub(super) max_block_level: BlockLevel,

    // DB
    db: Arc<DB>,

    // Stores
    pub(super) relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>,
    pub(super) reachability_store: Arc<RwLock<DbReachabilityStore>>,
    pub(super) reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
    pub(super) ghostdag_stores: Arc<Vec<Arc<DbGhostdagStore>>>,
    pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
    pub(super) pruning_point_store: Arc<RwLock<DbPruningStore>>,
    pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
    pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,
    pub(super) daa_excluded_store: Arc<DbDaaStore>,
    pub(super) headers_store: Arc<DbHeadersStore>,
    pub(super) headers_selected_tip_store: Arc<RwLock<DbHeadersSelectedTipStore>>,
    pub(super) depth_store: Arc<DbDepthStore>,

    // Managers and services
    pub(super) ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
    pub(super) dag_traversal_manager: DbDagTraversalManager,
    pub(super) window_manager: DbWindowManager,
    pub(super) depth_manager: DbBlockDepthManager,
    pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
    pub(super) pruning_point_manager: DbPruningPointManager,
    pub(super) parents_manager: DbParentsManager,

    // Pruning lock (held for reading while a header is processed)
    pruning_lock: SessionLock,

    // Manages tasks which depend on each other, allowing concurrent header processing
    task_manager: BlockTaskDependencyManager,

    // Processing counters
    counters: Arc<ProcessingCounters>,
}

impl HeaderProcessor {
    pub fn new(
        receiver: Receiver<BlockProcessingMessage>,
        body_sender: Sender<BlockProcessingMessage>,
        thread_pool: Arc<ThreadPool>,
        params: &Params,
        db: Arc<DB>,
        storage: &Arc<ConsensusStorage>,
        services: &Arc<ConsensusServices>,
        pruning_lock: SessionLock,
        counters: Arc<ProcessingCounters>,
    ) -> Self {
        Self {
            receiver,
            body_sender,
            thread_pool,
            genesis: params.genesis.clone(),
            db,

            relations_stores: storage.relations_stores.clone(),
            reachability_store: storage.reachability_store.clone(),
            reachability_relations_store: storage.reachability_relations_store.clone(),
            ghostdag_stores: storage.ghostdag_stores.clone(),
            statuses_store: storage.statuses_store.clone(),
            pruning_point_store: storage.pruning_point_store.clone(),
            daa_excluded_store: storage.daa_excluded_store.clone(),
            headers_store: storage.headers_store.clone(),
            depth_store: storage.depth_store.clone(),
            headers_selected_tip_store: storage.headers_selected_tip_store.clone(),
            block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
            block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

            ghostdag_managers: services.ghostdag_managers.clone(),
            dag_traversal_manager: services.dag_traversal_manager.clone(),
            window_manager: services.window_manager.clone(),
            reachability_service: services.reachability_service.clone(),
            depth_manager: services.depth_manager.clone(),
            pruning_point_manager: services.pruning_point_manager.clone(),
            parents_manager: services.parents_manager.clone(),

            task_manager: BlockTaskDependencyManager::new(),
            pruning_lock,
            counters,
            timestamp_deviation_tolerance: params.timestamp_deviation_tolerance(0),
            target_time_per_block: params.target_time_per_block,
            max_block_parents: params.max_block_parents,
            mergeset_size_limit: params.mergeset_size_limit,
            skip_proof_of_work: params.skip_proof_of_work,
            max_block_level: params.max_block_level,
        }
    }
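
    /// The main worker loop: receives block tasks, registers them with the dependency manager,
    /// and spawns header processing jobs on the shared thread pool. On exit, waits for
    /// in-flight tasks to complete and forwards the exit signal to the body processor.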
    pub fn worker(self: &Arc<HeaderProcessor>) {
        while let Ok(msg) = self.receiver.recv() {
            match msg {
                BlockProcessingMessage::Exit => {
                    break;
                }
                BlockProcessingMessage::Process(task, block_result_transmitter, virtual_state_result_transmitter) => {
                    if let Some(task_id) = self.task_manager.register(task, block_result_transmitter, virtual_state_result_transmitter)
                    {
                        let processor = self.clone();
                        self.thread_pool.spawn(move || {
                            processor.queue_block(task_id);
                        });
                    }
                }
            };
        }

        // Wait until all spawned tasks have completed before exiting
        self.task_manager.wait_for_idle();

        // Pass the exit signal on to the following processor
        self.body_sender.send(BlockProcessingMessage::Exit).unwrap();
    }
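
    /// Attempts to begin the queued task via the dependency manager, processes the header, and
    /// either resolves the result transmitters or forwards the task to the body processor.
    /// Tasks which were waiting on this one are then re-queued on the thread pool.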
    fn queue_block(self: &Arc<HeaderProcessor>, task_id: TaskId) {
        if let Some(task) = self.task_manager.try_begin(task_id) {
            let res = self.process_header(&task);

            let dependent_tasks = self.task_manager.end(
                task,
                |task,
                 block_result_transmitter: tokio::sync::oneshot::Sender<Result<BlockStatus, RuleError>>,
                 virtual_state_result_transmitter| {
                    if res.is_err() || task.block().is_header_only() {
                        // We don't care if the receivers were dropped
                        let _ = block_result_transmitter.send(res.clone());
                        let _ = virtual_state_result_transmitter.send(res.clone());
                    } else {
                        self.body_sender
                            .send(BlockProcessingMessage::Process(task, block_result_transmitter, virtual_state_result_transmitter))
                            .unwrap();
                    }
                },
            );

            for dep in dependent_tasks {
                let processor = self.clone();
                self.thread_pool.spawn(move || processor.queue_block(dep));
            }
        }
    }
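
    /// Validates and commits a single header. Returns early with the existing status if the
    /// header was already processed, and returns `StatusHeaderOnly` on success.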
    fn process_header(&self, task: &BlockTask) -> BlockProcessResult<BlockStatus> {
        let _prune_guard = self.pruning_lock.blocking_read();
        let header = &task.block().header;
        let status_option = self.statuses_store.read().get(header.hash).unwrap_option();

        match status_option {
            Some(StatusInvalid) => return Err(RuleError::KnownInvalid),
            Some(status) => return Ok(status),
            None => {}
        }

        match task {
            BlockTask::Ordinary { .. } => {
                let ctx = self.validate_header(header)?;
                self.commit_header(ctx, header);
            }
            BlockTask::Trusted { .. } => {
                let ctx = self.validate_trusted_header(header)?;
                self.commit_trusted_header(ctx, header);
            }
        }

        // Report counters
        self.counters.header_counts.fetch_add(1, Ordering::Relaxed);
        self.counters.dep_counts.fetch_add(header.direct_parents().len() as u64, Ordering::Relaxed);

        Ok(StatusHeaderOnly)
    }
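
    /// Runs the full validation pipeline for an ordinary header: isolated checks, parent
    /// relations, GHOSTDAG, and pre/post-PoW contextual validation. A post-PoW failure also
    /// marks the header as invalid in the statuses store.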
    fn validate_header(&self, header: &Arc<Header>) -> BlockProcessResult<HeaderProcessingContext> {
        let block_level = self.validate_header_in_isolation(header)?;
        self.validate_parent_relations(header)?;
        let mut ctx = self.build_processing_context(header, block_level);
        self.ghostdag(&mut ctx);
        self.pre_pow_validation(&mut ctx, header)?;
        if let Err(e) = self.post_pow_validation(&mut ctx, header) {
            self.statuses_store.write().set(ctx.hash, StatusInvalid).unwrap();
            return Err(e);
        }
        Ok(ctx)
    }
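
    /// Validates a trusted header: compared to `validate_header`, only isolated validation and
    /// GHOSTDAG are performed, while parent-relation and PoW-contextual checks are skipped.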
    fn validate_trusted_header(&self, header: &Arc<Header>) -> BlockProcessResult<HeaderProcessingContext> {
        let block_level = self.validate_header_in_isolation(header)?;
        let mut ctx = self.build_processing_context(header, block_level);
        self.ghostdag(&mut ctx);
        Ok(ctx)
    }
    fn build_processing_context(&self, header: &Arc<Header>, block_level: u8) -> HeaderProcessingContext {
        HeaderProcessingContext::new(
            header.hash,
            header.clone(),
            block_level,
            self.pruning_point_store.read().get().unwrap(),
            self.collect_known_parents(header, block_level),
        )
    }
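
    /// Collects the known parents for all block levels: for each level in `0..=block_level`,
    /// the header's parents at that level are filtered down to those present in the local
    /// relations store, falling back to `ORIGIN` when none are known.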
    fn collect_known_parents(&self, header: &Header, block_level: BlockLevel) -> Vec<Arc<Vec<Hash>>> {
        let relations_read = self.relations_stores.read();
        (0..=block_level)
            .map(|level| {
                Arc::new(
                    self.parents_manager
                        .parents_at_level(header, level)
                        .iter()
                        .copied()
                        .filter(|parent| relations_read[level as usize].has(*parent).unwrap())
                        .collect_vec()
                        .push_if_empty(ORIGIN),
                )
            })
            .collect_vec()
    }
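
    /// Runs the GHOSTDAG protocol for all block levels `0..=ctx.block_level`, reusing data
    /// already present in the stores when available, and stages the results in the context.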
    fn ghostdag(&self, ctx: &mut HeaderProcessingContext) {
        let ghostdag_data = (0..=ctx.block_level as usize)
            .map(|level| {
                self.ghostdag_stores[level]
                    .get_data(ctx.hash)
                    .unwrap_option()
                    .unwrap_or_else(|| Arc::new(self.ghostdag_managers[level].ghostdag(&ctx.known_parents[level])))
            })
            .collect_vec();

        self.counters.mergeset_counts.fetch_add(ghostdag_data[0].mergeset_size() as u64, Ordering::Relaxed);
        ctx.ghostdag_data = Some(ghostdag_data);
    }
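
    /// Commits all staged data for a validated header: GHOSTDAG data, DAA and depth data,
    /// reachability, relations, the headers selected tip and the block status are written in
    /// a single DB batch, while block-window data is staged to the in-memory caches.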
    fn commit_header(&self, ctx: HeaderProcessingContext, header: &Header) {
        let ghostdag_data = ctx.ghostdag_data.as_ref().unwrap();
        let pp = ctx.pruning_point();

        // Create a single DB batch for all store writes
        let mut batch = WriteBatch::default();

        for (level, datum) in ghostdag_data.iter().enumerate() {
            self.ghostdag_stores[level].insert_batch(&mut batch, ctx.hash, datum).unwrap();
        }
        if let Some(window) = ctx.block_window_for_difficulty {
            self.block_window_cache_for_difficulty.insert(ctx.hash, window);
        }
        if let Some(window) = ctx.block_window_for_past_median_time {
            self.block_window_cache_for_past_median_time.insert(ctx.hash, window);
        }

        self.daa_excluded_store.insert_batch(&mut batch, ctx.hash, Arc::new(ctx.mergeset_non_daa.unwrap())).unwrap();
        self.headers_store.insert_batch(&mut batch, ctx.hash, ctx.header, ctx.block_level).unwrap();
        self.depth_store.insert_batch(&mut batch, ctx.hash, ctx.merge_depth_root.unwrap(), ctx.finality_point.unwrap()).unwrap();

        // Add the block to the reachability tree via a staging store which is committed to the batch below
        let mut staging = StagingReachabilityStore::new(self.reachability_store.upgradable_read());
        let selected_parent = ghostdag_data[0].selected_parent;
        let mut reachability_mergeset = ghostdag_data[0].unordered_mergeset_without_selected_parent();
        reachability::add_block(&mut staging, ctx.hash, selected_parent, &mut reachability_mergeset).unwrap();

        // Update the headers selected tip if this header has higher blue work and the pruning point is on its selected chain
        let mut hst_write = self.headers_selected_tip_store.write();
        let prev_hst = hst_write.get().unwrap();
        if SortableBlock::new(ctx.hash, header.blue_work) > prev_hst
            && reachability::is_chain_ancestor_of(&staging, pp, ctx.hash).unwrap()
        {
            reachability::hint_virtual_selected_parent(&mut staging, ctx.hash).unwrap();
            hst_write.set_batch(&mut batch, SortableBlock::new(ctx.hash, header.blue_work)).unwrap();
        }

        let reachability_parents = ctx.known_parents[0].clone();

        let mut relations_write = self.relations_stores.write();
        ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| {
            relations_write[level].insert_batch(&mut batch, header.hash, parents_by_level).unwrap();
        });

        let mut reachability_relations_write = self.reachability_relations_store.write();
        reachability_relations_write.insert_batch(&mut batch, ctx.hash, reachability_parents).unwrap();

        let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();

        let reachability_write = staging.commit(&mut batch).unwrap();

        // Flush the batch to the DB
        self.db.write(batch).unwrap();

        // Drop the write guards only after the batch has been written
        drop(reachability_write);
        drop(statuses_write);
        drop(reachability_relations_write);
        drop(relations_write);
        drop(hst_write);
    }
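
    /// Commits a trusted header: only GHOSTDAG data, parent relations and the block status are
    /// written, tolerating data which may already exist in the stores.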
    fn commit_trusted_header(&self, ctx: HeaderProcessingContext, _header: &Header) {
        let ghostdag_data = ctx.ghostdag_data.as_ref().unwrap();

        let mut batch = WriteBatch::default();

        for (level, datum) in ghostdag_data.iter().enumerate() {
            // The data might have already been written; tolerate the "already exists" case
            self.ghostdag_stores[level].insert_batch(&mut batch, ctx.hash, datum).unwrap_or_exists();
        }

        let mut relations_write = self.relations_stores.write();
        ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| {
            relations_write[level].insert_batch(&mut batch, ctx.hash, parents_by_level).unwrap_or_exists();
        });

        let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();

        self.db.write(batch).unwrap();

        // Drop the write guards only after the batch has been written
        drop(statuses_write);
        drop(relations_write);
    }
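
    /// Processes the genesis header: initializes the headers selected tip to genesis and then
    /// commits the genesis header using a manually constructed processing context.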
    pub fn process_genesis(&self) {
        // Init the headers selected tip to genesis
        let mut batch = WriteBatch::default();
        let mut hst_write = self.headers_selected_tip_store.write();
        hst_write.set_batch(&mut batch, SortableBlock::new(self.genesis.hash, 0.into())).unwrap();
        self.db.write(batch).unwrap();
        drop(hst_write);

        // Build the genesis header and force the configured genesis hash
        let mut genesis_header: Header = (&self.genesis).into();
        genesis_header.hash = self.genesis.hash;
        let genesis_header = Arc::new(genesis_header);

        let mut ctx = HeaderProcessingContext::new(
            self.genesis.hash,
            genesis_header.clone(),
            self.max_block_level,
            PruningPointInfo::from_genesis(self.genesis.hash),
            (0..=self.max_block_level).map(|_| BlockHashes::new(vec![ORIGIN])).collect(),
        );
        ctx.ghostdag_data =
            Some(self.ghostdag_managers.iter().map(|manager_by_level| Arc::new(manager_by_level.genesis_ghostdag_data())).collect());
        ctx.mergeset_non_daa = Some(Default::default());
        ctx.merge_depth_root = Some(ORIGIN);
        ctx.finality_point = Some(ORIGIN);

        self.commit_header(ctx, &genesis_header);
    }
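
    /// One-time initialization: seeds the relations stores with `ORIGIN` and sets the initial
    /// headers selected tip. Does nothing if the stores are already initialized.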
    pub fn init(&self) {
        if self.relations_stores.read()[0].has(ORIGIN).unwrap() {
            return;
        }

        let mut batch = WriteBatch::default();
        let mut relations_write = self.relations_stores.write();
        (0..=self.max_block_level)
            .for_each(|level| relations_write[level as usize].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap());
        let mut hst_write = self.headers_selected_tip_store.write();
        hst_write.set_batch(&mut batch, SortableBlock::new(ORIGIN, 0.into())).unwrap();
        self.db.write(batch).unwrap();
        drop(hst_write);
        drop(relations_write);
    }
}