kaspa_consensus/pipeline/header_processor/
processor.rs

1use crate::{
2    consensus::{
3        services::{
4            ConsensusServices, DbBlockDepthManager, DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbPruningPointManager,
5            DbWindowManager,
6        },
7        storage::ConsensusStorage,
8    },
9    errors::{BlockProcessResult, RuleError},
10    model::{
11        services::reachability::MTReachabilityService,
12        stores::{
13            block_window_cache::{BlockWindowCacheStore, BlockWindowHeap},
14            daa::DbDaaStore,
15            depth::DbDepthStore,
16            ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
17            headers::DbHeadersStore,
18            headers_selected_tip::{DbHeadersSelectedTipStore, HeadersSelectedTipStoreReader},
19            pruning::{DbPruningStore, PruningPointInfo, PruningStoreReader},
20            reachability::{DbReachabilityStore, StagingReachabilityStore},
21            relations::{DbRelationsStore, RelationsStoreReader},
22            statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
23            DB,
24        },
25    },
26    params::Params,
27    pipeline::deps_manager::{BlockProcessingMessage, BlockTask, BlockTaskDependencyManager, TaskId},
28    processes::{ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions},
29};
30use crossbeam_channel::{Receiver, Sender};
31use itertools::Itertools;
32use kaspa_consensus_core::{
33    blockhash::{BlockHashes, ORIGIN},
34    blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
35    config::genesis::GenesisBlock,
36    header::Header,
37    BlockHashSet, BlockLevel,
38};
39use kaspa_consensusmanager::SessionLock;
40use kaspa_database::prelude::{StoreResultEmptyTuple, StoreResultExtensions};
41use kaspa_hashes::Hash;
42use kaspa_utils::vec::VecExtensions;
43use parking_lot::RwLock;
44use rayon::ThreadPool;
45use rocksdb::WriteBatch;
46use std::sync::{atomic::Ordering, Arc};
47
48use super::super::ProcessingCounters;
49
50pub struct HeaderProcessingContext {
51    pub hash: Hash,
52    pub header: Arc<Header>,
53    pub pruning_info: PruningPointInfo,
54    pub block_level: BlockLevel,
55    pub known_parents: Vec<BlockHashes>,
56
57    // Staging data
58    pub ghostdag_data: Option<Vec<Arc<GhostdagData>>>,
59    pub block_window_for_difficulty: Option<Arc<BlockWindowHeap>>,
60    pub block_window_for_past_median_time: Option<Arc<BlockWindowHeap>>,
61    pub mergeset_non_daa: Option<BlockHashSet>,
62    pub merge_depth_root: Option<Hash>,
63    pub finality_point: Option<Hash>,
64}
65
66impl HeaderProcessingContext {
67    pub fn new(
68        hash: Hash,
69        header: Arc<Header>,
70        block_level: BlockLevel,
71        pruning_info: PruningPointInfo,
72        known_parents: Vec<BlockHashes>,
73    ) -> Self {
74        Self {
75            hash,
76            header,
77            block_level,
78            pruning_info,
79            known_parents,
80            ghostdag_data: None,
81            block_window_for_difficulty: None,
82            mergeset_non_daa: None,
83            block_window_for_past_median_time: None,
84            merge_depth_root: None,
85            finality_point: None,
86        }
87    }
88
89    /// Returns the direct parents of this header after removal of unknown parents
90    pub fn direct_known_parents(&self) -> &[Hash] {
91        &self.known_parents[0]
92    }
93
94    /// Returns the pruning point at the time this header began processing
95    pub fn pruning_point(&self) -> Hash {
96        self.pruning_info.pruning_point
97    }
98
99    /// Returns the primary (level 0) GHOSTDAG data of this header.
100    /// NOTE: is expected to be called only after GHOSTDAG computation was pushed into the context
101    pub fn ghostdag_data(&self) -> &Arc<GhostdagData> {
102        &self.ghostdag_data.as_ref().unwrap()[0]
103    }
104}
105
106pub struct HeaderProcessor {
107    // Channels
108    receiver: Receiver<BlockProcessingMessage>,
109    body_sender: Sender<BlockProcessingMessage>,
110
111    // Thread pool
112    pub(super) thread_pool: Arc<ThreadPool>,
113
114    // Config
115    pub(super) genesis: GenesisBlock,
116    pub(super) timestamp_deviation_tolerance: u64,
117    pub(super) target_time_per_block: u64,
118    pub(super) max_block_parents: u8,
119    pub(super) mergeset_size_limit: u64,
120    pub(super) skip_proof_of_work: bool,
121    pub(super) max_block_level: BlockLevel,
122
123    // DB
124    db: Arc<DB>,
125
126    // Stores
127    pub(super) relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>,
128    pub(super) reachability_store: Arc<RwLock<DbReachabilityStore>>,
129    pub(super) reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
130    pub(super) ghostdag_stores: Arc<Vec<Arc<DbGhostdagStore>>>,
131    pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
132    pub(super) pruning_point_store: Arc<RwLock<DbPruningStore>>,
133    pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
134    pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,
135    pub(super) daa_excluded_store: Arc<DbDaaStore>,
136    pub(super) headers_store: Arc<DbHeadersStore>,
137    pub(super) headers_selected_tip_store: Arc<RwLock<DbHeadersSelectedTipStore>>,
138    pub(super) depth_store: Arc<DbDepthStore>,
139
140    // Managers and services
141    pub(super) ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
142    pub(super) dag_traversal_manager: DbDagTraversalManager,
143    pub(super) window_manager: DbWindowManager,
144    pub(super) depth_manager: DbBlockDepthManager,
145    pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
146    pub(super) pruning_point_manager: DbPruningPointManager,
147    pub(super) parents_manager: DbParentsManager,
148
149    // Pruning lock
150    pruning_lock: SessionLock,
151
152    // Dependency manager
153    task_manager: BlockTaskDependencyManager,
154
155    // Counters
156    counters: Arc<ProcessingCounters>,
157}
158
159impl HeaderProcessor {
160    pub fn new(
161        receiver: Receiver<BlockProcessingMessage>,
162        body_sender: Sender<BlockProcessingMessage>,
163        thread_pool: Arc<ThreadPool>,
164        params: &Params,
165        db: Arc<DB>,
166        storage: &Arc<ConsensusStorage>,
167        services: &Arc<ConsensusServices>,
168        pruning_lock: SessionLock,
169        counters: Arc<ProcessingCounters>,
170    ) -> Self {
171        Self {
172            receiver,
173            body_sender,
174            thread_pool,
175            genesis: params.genesis.clone(),
176            db,
177
178            relations_stores: storage.relations_stores.clone(),
179            reachability_store: storage.reachability_store.clone(),
180            reachability_relations_store: storage.reachability_relations_store.clone(),
181            ghostdag_stores: storage.ghostdag_stores.clone(),
182            statuses_store: storage.statuses_store.clone(),
183            pruning_point_store: storage.pruning_point_store.clone(),
184            daa_excluded_store: storage.daa_excluded_store.clone(),
185            headers_store: storage.headers_store.clone(),
186            depth_store: storage.depth_store.clone(),
187            headers_selected_tip_store: storage.headers_selected_tip_store.clone(),
188            block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
189            block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),
190
191            ghostdag_managers: services.ghostdag_managers.clone(),
192            dag_traversal_manager: services.dag_traversal_manager.clone(),
193            window_manager: services.window_manager.clone(),
194            reachability_service: services.reachability_service.clone(),
195            depth_manager: services.depth_manager.clone(),
196            pruning_point_manager: services.pruning_point_manager.clone(),
197            parents_manager: services.parents_manager.clone(),
198
199            task_manager: BlockTaskDependencyManager::new(),
200            pruning_lock,
201            counters,
202            // TODO (HF): make sure to also pass `new_timestamp_deviation_tolerance` and use according to HF activation score
203            timestamp_deviation_tolerance: params.timestamp_deviation_tolerance(0),
204            target_time_per_block: params.target_time_per_block,
205            max_block_parents: params.max_block_parents,
206            mergeset_size_limit: params.mergeset_size_limit,
207            skip_proof_of_work: params.skip_proof_of_work,
208            max_block_level: params.max_block_level,
209        }
210    }
211
212    pub fn worker(self: &Arc<HeaderProcessor>) {
213        while let Ok(msg) = self.receiver.recv() {
214            match msg {
215                BlockProcessingMessage::Exit => {
216                    break;
217                }
218                BlockProcessingMessage::Process(task, block_result_transmitter, virtual_state_result_transmitter) => {
219                    if let Some(task_id) = self.task_manager.register(task, block_result_transmitter, virtual_state_result_transmitter)
220                    {
221                        let processor = self.clone();
222                        self.thread_pool.spawn(move || {
223                            processor.queue_block(task_id);
224                        });
225                    }
226                }
227            };
228        }
229
230        // Wait until all workers are idle before exiting
231        self.task_manager.wait_for_idle();
232
233        // Pass the exit signal on to the following processor
234        self.body_sender.send(BlockProcessingMessage::Exit).unwrap();
235    }
236
237    fn queue_block(self: &Arc<HeaderProcessor>, task_id: TaskId) {
238        if let Some(task) = self.task_manager.try_begin(task_id) {
239            let res = self.process_header(&task);
240
241            let dependent_tasks = self.task_manager.end(
242                task,
243                |task,
244                 block_result_transmitter: tokio::sync::oneshot::Sender<Result<BlockStatus, RuleError>>,
245                 virtual_state_result_transmitter| {
246                    if res.is_err() || task.block().is_header_only() {
247                        // We don't care if receivers were dropped
248                        let _ = block_result_transmitter.send(res.clone());
249                        let _ = virtual_state_result_transmitter.send(res.clone());
250                    } else {
251                        self.body_sender
252                            .send(BlockProcessingMessage::Process(task, block_result_transmitter, virtual_state_result_transmitter))
253                            .unwrap();
254                    }
255                },
256            );
257
258            for dep in dependent_tasks {
259                let processor = self.clone();
260                self.thread_pool.spawn(move || processor.queue_block(dep));
261            }
262        }
263    }
264
265    fn process_header(&self, task: &BlockTask) -> BlockProcessResult<BlockStatus> {
266        let _prune_guard = self.pruning_lock.blocking_read();
267        let header = &task.block().header;
268        let status_option = self.statuses_store.read().get(header.hash).unwrap_option();
269
270        match status_option {
271            Some(StatusInvalid) => return Err(RuleError::KnownInvalid),
272            Some(status) => return Ok(status),
273            None => {}
274        }
275
276        // Validate the header depending on task type
277        match task {
278            BlockTask::Ordinary { .. } => {
279                let ctx = self.validate_header(header)?;
280                self.commit_header(ctx, header);
281            }
282            BlockTask::Trusted { .. } => {
283                let ctx = self.validate_trusted_header(header)?;
284                self.commit_trusted_header(ctx, header);
285            }
286        }
287
288        // Report counters
289        self.counters.header_counts.fetch_add(1, Ordering::Relaxed);
290        self.counters.dep_counts.fetch_add(header.direct_parents().len() as u64, Ordering::Relaxed);
291
292        Ok(StatusHeaderOnly)
293    }
294
295    /// Runs full ordinary header validation
296    fn validate_header(&self, header: &Arc<Header>) -> BlockProcessResult<HeaderProcessingContext> {
297        let block_level = self.validate_header_in_isolation(header)?;
298        self.validate_parent_relations(header)?;
299        let mut ctx = self.build_processing_context(header, block_level);
300        self.ghostdag(&mut ctx);
301        self.pre_pow_validation(&mut ctx, header)?;
302        if let Err(e) = self.post_pow_validation(&mut ctx, header) {
303            self.statuses_store.write().set(ctx.hash, StatusInvalid).unwrap();
304            return Err(e);
305        }
306        Ok(ctx)
307    }
308
309    // Runs partial header validation for trusted blocks (currently validates only header-in-isolation and computes GHOSTDAG).
310    fn validate_trusted_header(&self, header: &Arc<Header>) -> BlockProcessResult<HeaderProcessingContext> {
311        let block_level = self.validate_header_in_isolation(header)?;
312        let mut ctx = self.build_processing_context(header, block_level);
313        self.ghostdag(&mut ctx);
314        Ok(ctx)
315    }
316
317    fn build_processing_context(&self, header: &Arc<Header>, block_level: u8) -> HeaderProcessingContext {
318        HeaderProcessingContext::new(
319            header.hash,
320            header.clone(),
321            block_level,
322            self.pruning_point_store.read().get().unwrap(),
323            self.collect_known_parents(header, block_level),
324        )
325    }
326
327    /// Collects the known parents for all block levels
328    fn collect_known_parents(&self, header: &Header, block_level: BlockLevel) -> Vec<Arc<Vec<Hash>>> {
329        let relations_read = self.relations_stores.read();
330        (0..=block_level)
331            .map(|level| {
332                Arc::new(
333                    self.parents_manager
334                        .parents_at_level(header, level)
335                        .iter()
336                        .copied()
337                        .filter(|parent| relations_read[level as usize].has(*parent).unwrap())
338                        .collect_vec()
339                        // This kicks-in only for trusted blocks or for level > 0. If an ordinary block is 
340                        // missing direct parents it will fail validation.
341                        .push_if_empty(ORIGIN),
342                )
343            })
344            .collect_vec()
345    }
346
347    /// Runs the GHOSTDAG algorithm for all block levels and writes the data into the context (if hasn't run already)
348    fn ghostdag(&self, ctx: &mut HeaderProcessingContext) {
349        let ghostdag_data = (0..=ctx.block_level as usize)
350            .map(|level| {
351                self.ghostdag_stores[level]
352                    .get_data(ctx.hash)
353                    .unwrap_option()
354                    .unwrap_or_else(|| Arc::new(self.ghostdag_managers[level].ghostdag(&ctx.known_parents[level])))
355            })
356            .collect_vec();
357
358        self.counters.mergeset_counts.fetch_add(ghostdag_data[0].mergeset_size() as u64, Ordering::Relaxed);
359        ctx.ghostdag_data = Some(ghostdag_data);
360    }
361
362    fn commit_header(&self, ctx: HeaderProcessingContext, header: &Header) {
363        let ghostdag_data = ctx.ghostdag_data.as_ref().unwrap();
364        let pp = ctx.pruning_point();
365
366        // Create a DB batch writer
367        let mut batch = WriteBatch::default();
368
369        //
370        // Append-only stores: these require no lock and hence done first in order to reduce locking time
371        //
372
373        for (level, datum) in ghostdag_data.iter().enumerate() {
374            self.ghostdag_stores[level].insert_batch(&mut batch, ctx.hash, datum).unwrap();
375        }
376        if let Some(window) = ctx.block_window_for_difficulty {
377            self.block_window_cache_for_difficulty.insert(ctx.hash, window);
378        }
379        if let Some(window) = ctx.block_window_for_past_median_time {
380            self.block_window_cache_for_past_median_time.insert(ctx.hash, window);
381        }
382
383        self.daa_excluded_store.insert_batch(&mut batch, ctx.hash, Arc::new(ctx.mergeset_non_daa.unwrap())).unwrap();
384        self.headers_store.insert_batch(&mut batch, ctx.hash, ctx.header, ctx.block_level).unwrap();
385        self.depth_store.insert_batch(&mut batch, ctx.hash, ctx.merge_depth_root.unwrap(), ctx.finality_point.unwrap()).unwrap();
386
387        //
388        // Reachability and header chain stores
389        //
390
391        // Create staging reachability store. We use an upgradable read here to avoid concurrent
392        // staging reachability operations. PERF: we assume that reachability processing time << header processing
393        // time, and thus serializing this part will do no harm. However this should be benchmarked. The
394        // alternative is to create a separate ReachabilityProcessor and to manage things more tightly.
395        let mut staging = StagingReachabilityStore::new(self.reachability_store.upgradable_read());
396        let selected_parent = ghostdag_data[0].selected_parent;
397        let mut reachability_mergeset = ghostdag_data[0].unordered_mergeset_without_selected_parent();
398        reachability::add_block(&mut staging, ctx.hash, selected_parent, &mut reachability_mergeset).unwrap();
399
400        // Non-append only stores need to use write locks.
401        // Note we need to keep the lock write guards until the batch is written.
402        let mut hst_write = self.headers_selected_tip_store.write();
403        let prev_hst = hst_write.get().unwrap();
404        if SortableBlock::new(ctx.hash, header.blue_work) > prev_hst
405            && reachability::is_chain_ancestor_of(&staging, pp, ctx.hash).unwrap()
406        {
407            // Hint reachability about the new tip.
408            reachability::hint_virtual_selected_parent(&mut staging, ctx.hash).unwrap();
409            hst_write.set_batch(&mut batch, SortableBlock::new(ctx.hash, header.blue_work)).unwrap();
410        }
411
412        //
413        // Relations and statuses
414        //
415
416        let reachability_parents = ctx.known_parents[0].clone();
417
418        let mut relations_write = self.relations_stores.write();
419        ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| {
420            relations_write[level].insert_batch(&mut batch, header.hash, parents_by_level).unwrap();
421        });
422
423        // Write reachability relations. These relations are only needed during header pruning
424        let mut reachability_relations_write = self.reachability_relations_store.write();
425        reachability_relations_write.insert_batch(&mut batch, ctx.hash, reachability_parents).unwrap();
426
427        let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();
428
429        // Write reachability data. Only at this brief moment the reachability store is locked for reads.
430        // We take special care for this since reachability read queries are used throughout the system frequently.
431        // Note we hold the lock until the batch is written
432        let reachability_write = staging.commit(&mut batch).unwrap();
433
434        // Flush the batch to the DB
435        self.db.write(batch).unwrap();
436
437        // Calling the drops explicitly after the batch is written in order to avoid possible errors.
438        drop(reachability_write);
439        drop(statuses_write);
440        drop(reachability_relations_write);
441        drop(relations_write);
442        drop(hst_write);
443    }
444
445    fn commit_trusted_header(&self, ctx: HeaderProcessingContext, _header: &Header) {
446        let ghostdag_data = ctx.ghostdag_data.as_ref().unwrap();
447
448        // Create a DB batch writer
449        let mut batch = WriteBatch::default();
450
451        for (level, datum) in ghostdag_data.iter().enumerate() {
452            // This data might have been already written when applying the pruning proof.
453            self.ghostdag_stores[level].insert_batch(&mut batch, ctx.hash, datum).unwrap_or_exists();
454        }
455
456        let mut relations_write = self.relations_stores.write();
457        ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| {
458            // This data might have been already written when applying the pruning proof.
459            relations_write[level].insert_batch(&mut batch, ctx.hash, parents_by_level).unwrap_or_exists();
460        });
461
462        let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();
463
464        // Flush the batch to the DB
465        self.db.write(batch).unwrap();
466
467        // Calling the drops explicitly after the batch is written in order to avoid possible errors.
468        drop(statuses_write);
469        drop(relations_write);
470    }
471
472    pub fn process_genesis(&self) {
473        // Init headers selected tip and selected chain stores
474        let mut batch = WriteBatch::default();
475        let mut hst_write = self.headers_selected_tip_store.write();
476        hst_write.set_batch(&mut batch, SortableBlock::new(self.genesis.hash, 0.into())).unwrap();
477        self.db.write(batch).unwrap();
478        drop(hst_write);
479
480        // Write the genesis header
481        let mut genesis_header: Header = (&self.genesis).into();
482        // Force the provided genesis hash. Important for some tests which manually modify the genesis hash.
483        // Note that for official nets (mainnet, testnet etc) they are guaranteed to be equal as enforced by a test in genesis.rs
484        genesis_header.hash = self.genesis.hash;
485        let genesis_header = Arc::new(genesis_header);
486
487        let mut ctx = HeaderProcessingContext::new(
488            self.genesis.hash,
489            genesis_header.clone(),
490            self.max_block_level,
491            PruningPointInfo::from_genesis(self.genesis.hash),
492            (0..=self.max_block_level).map(|_| BlockHashes::new(vec![ORIGIN])).collect(),
493        );
494        ctx.ghostdag_data =
495            Some(self.ghostdag_managers.iter().map(|manager_by_level| Arc::new(manager_by_level.genesis_ghostdag_data())).collect());
496        ctx.mergeset_non_daa = Some(Default::default());
497        ctx.merge_depth_root = Some(ORIGIN);
498        ctx.finality_point = Some(ORIGIN);
499
500        self.commit_header(ctx, &genesis_header);
501    }
502
503    pub fn init(&self) {
504        if self.relations_stores.read()[0].has(ORIGIN).unwrap() {
505            return;
506        }
507
508        let mut batch = WriteBatch::default();
509        let mut relations_write = self.relations_stores.write();
510        (0..=self.max_block_level)
511            .for_each(|level| relations_write[level as usize].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap());
512        let mut hst_write = self.headers_selected_tip_store.write();
513        hst_write.set_batch(&mut batch, SortableBlock::new(ORIGIN, 0.into())).unwrap();
514        self.db.write(batch).unwrap();
515        drop(hst_write);
516        drop(relations_write);
517    }
518}