// kaspa_consensus/processes/pruning_proof/mod.rs

1use std::{
2    cmp::{max, Reverse},
3    collections::{hash_map::Entry, BinaryHeap},
4    collections::{hash_map::Entry::Vacant, VecDeque},
5    ops::{Deref, DerefMut},
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Arc,
9    },
10};
11
12use itertools::Itertools;
13use kaspa_math::int::SignedInteger;
14use parking_lot::{Mutex, RwLock};
15use rocksdb::WriteBatch;
16
17use kaspa_consensus_core::{
18    blockhash::{self, BlockHashExtensions, BlockHashes, ORIGIN},
19    errors::{
20        consensus::{ConsensusError, ConsensusResult},
21        pruning::{PruningImportError, PruningImportResult},
22    },
23    header::Header,
24    pruning::{PruningPointProof, PruningPointTrustedData},
25    trusted::{TrustedBlock, TrustedGhostdagData, TrustedHeader},
26    BlockHashMap, BlockHashSet, BlockLevel, HashMapCustomHasher, KType,
27};
28use kaspa_core::{debug, info, trace};
29use kaspa_database::prelude::{CachePolicy, ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions};
30use kaspa_hashes::Hash;
31use kaspa_pow::calc_block_level;
32use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};
33use thiserror::Error;
34
35use crate::{
36    consensus::{
37        services::{DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbWindowManager},
38        storage::ConsensusStorage,
39    },
40    model::{
41        services::reachability::{MTReachabilityService, ReachabilityService},
42        stores::{
43            depth::DbDepthStore,
44            ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStore, GhostdagStoreReader},
45            headers::{DbHeadersStore, HeaderStore, HeaderStoreReader},
46            headers_selected_tip::DbHeadersSelectedTipStore,
47            past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore},
48            pruning::{DbPruningStore, PruningStoreReader},
49            reachability::{DbReachabilityStore, ReachabilityStoreReader, StagingReachabilityStore},
50            relations::{DbRelationsStore, RelationsStoreReader, StagingRelationsStore},
51            selected_chain::{DbSelectedChainStore, SelectedChainStore},
52            tips::DbTipsStore,
53            virtual_state::{VirtualState, VirtualStateStore, VirtualStateStoreReader, VirtualStores},
54            DB,
55        },
56    },
57    processes::{
58        ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions,
59        window::WindowType,
60    },
61};
62
63use super::{
64    ghostdag::{mergeset::unordered_mergeset_without_selected_parent, protocol::GhostdagManager},
65    window::WindowManager,
66};
67
/// Internal error type for pruning-proof helper routines (depth/ancestor searches).
/// Private to this module; public APIs surface `PruningImportError`/`ConsensusError` instead.
#[derive(Error, Debug)]
enum PruningProofManagerInternalError {
    /// Failed to locate a block at the requested depth below a tip.
    #[error("block at depth error: {0}")]
    BlockAtDepth(String),

    /// Generic failure while searching for a common ancestor.
    #[error("find common ancestor error: {0}")]
    FindCommonAncestor(String),

    /// The search finished without any common ancestor being found.
    #[error("cannot find a common ancestor: {0}")]
    NoCommonAncestor(String),
}
79
/// A cache entry binding derived data `T` to the pruning point it was computed for.
/// Consumers compare `pruning_point` against the current one to decide cache validity.
struct CachedPruningPointData<T: ?Sized> {
    // The pruning point this data was derived from
    pruning_point: Hash,
    // The cached payload, shared via `Arc` so cloning the entry is cheap
    data: Arc<T>,
}
84
85impl<T> Clone for CachedPruningPointData<T> {
86    fn clone(&self) -> Self {
87        Self { pruning_point: self.pruning_point, data: self.data.clone() }
88    }
89}
90
/// Builds, validates and applies pruning point proofs, and serves cached
/// proof/anticone data for the current pruning point.
pub struct PruningProofManager {
    db: Arc<DB>,

    // Store handles shared with the rest of consensus
    headers_store: Arc<DbHeadersStore>,
    reachability_store: Arc<RwLock<DbReachabilityStore>>,
    reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
    reachability_service: MTReachabilityService<DbReachabilityStore>,
    // Per-level stores (indexed by block level)
    ghostdag_stores: Arc<Vec<Arc<DbGhostdagStore>>>,
    relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>,
    pruning_point_store: Arc<RwLock<DbPruningStore>>,
    past_pruning_points_store: Arc<DbPastPruningPointsStore>,
    virtual_stores: Arc<RwLock<VirtualStores>>,
    body_tips_store: Arc<RwLock<DbTipsStore>>,
    headers_selected_tip_store: Arc<RwLock<DbHeadersSelectedTipStore>>,
    depth_store: Arc<DbDepthStore>,
    selected_chain_store: Arc<RwLock<DbSelectedChainStore>>,

    // Managers/services (ghostdag managers are per-level)
    ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
    traversal_manager: DbDagTraversalManager,
    window_manager: DbWindowManager,
    parents_manager: DbParentsManager,

    // Cached results, keyed by the pruning point they were computed for
    cached_proof: Mutex<Option<CachedPruningPointData<PruningPointProof>>>,
    cached_anticone: Mutex<Option<CachedPruningPointData<PruningPointTrustedData>>>,

    // Consensus parameters
    max_block_level: BlockLevel,
    genesis_hash: Hash,
    pruning_proof_m: u64,
    anticone_finalization_depth: u64,
    ghostdag_k: KType,

    // Set when consensus is shutting down; allows long validations to exit early
    is_consensus_exiting: Arc<AtomicBool>,
}
124
125impl PruningProofManager {
    /// Constructs a new pruning proof manager.
    ///
    /// Store handles are cloned out of the shared consensus `storage`; the
    /// managers/services and consensus parameters are passed explicitly.
    /// Cached proof/anticone slots start empty.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        db: Arc<DB>,
        storage: &Arc<ConsensusStorage>,
        parents_manager: DbParentsManager,
        reachability_service: MTReachabilityService<DbReachabilityStore>,
        ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
        traversal_manager: DbDagTraversalManager,
        window_manager: DbWindowManager,
        max_block_level: BlockLevel,
        genesis_hash: Hash,
        pruning_proof_m: u64,
        anticone_finalization_depth: u64,
        ghostdag_k: KType,
        is_consensus_exiting: Arc<AtomicBool>,
    ) -> Self {
        Self {
            db,
            headers_store: storage.headers_store.clone(),
            reachability_store: storage.reachability_store.clone(),
            reachability_relations_store: storage.reachability_relations_store.clone(),
            reachability_service,
            ghostdag_stores: storage.ghostdag_stores.clone(),
            relations_stores: storage.relations_stores.clone(),
            pruning_point_store: storage.pruning_point_store.clone(),
            past_pruning_points_store: storage.past_pruning_points_store.clone(),
            virtual_stores: storage.virtual_stores.clone(),
            body_tips_store: storage.body_tips_store.clone(),
            headers_selected_tip_store: storage.headers_selected_tip_store.clone(),
            selected_chain_store: storage.selected_chain_store.clone(),
            depth_store: storage.depth_store.clone(),

            ghostdag_managers,
            traversal_manager,
            window_manager,
            parents_manager,

            // Caches start cold; they are filled lazily per pruning point
            cached_proof: Mutex::new(None),
            cached_anticone: Mutex::new(None),

            max_block_level,
            genesis_hash,
            pruning_proof_m,
            anticone_finalization_depth,
            ghostdag_k,

            is_consensus_exiting,
        }
    }
175
    /// Imports the chain of past pruning points (ordered oldest to newest):
    /// records each by index, inserts any missing headers (deriving their block
    /// level from PoW), then stages the last one as the new pruning point and
    /// history root in a single atomic batch.
    pub fn import_pruning_points(&self, pruning_points: &[Arc<Header>]) {
        for (i, header) in pruning_points.iter().enumerate() {
            // Record the i-th historical pruning point by index
            self.past_pruning_points_store.set(i as u64, header.hash).unwrap();

            if self.headers_store.has(header.hash).unwrap() {
                continue;
            }

            // Derive the block level from the header's PoW.
            // NOTE(review): the pow pass/fail flag is discarded here — presumably these
            // headers were validated upstream; confirm against the import pipeline.
            let state = kaspa_pow::State::new(header);
            let (_, pow) = state.check_pow(header.nonce);
            let signed_block_level = self.max_block_level as i64 - pow.bits() as i64;
            let block_level = max(signed_block_level, 0) as BlockLevel;
            self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();
        }

        let new_pruning_point = pruning_points.last().unwrap().hash;
        info!("Setting {new_pruning_point} as the staging pruning point");

        // Persist the pruning point (index = len - 1) and history root atomically
        let mut pruning_point_write = self.pruning_point_store.write();
        let mut batch = WriteBatch::default();
        pruning_point_write.set_batch(&mut batch, new_pruning_point, new_pruning_point, (pruning_points.len() - 1) as u64).unwrap();
        pruning_point_write.set_history_root(&mut batch, new_pruning_point).unwrap();
        self.db.write(batch).unwrap();
        // Release the lock only after the batch is committed
        drop(pruning_point_write);
    }
201
    /// Applies a (pre-validated) pruning point proof together with the trusted
    /// block set to the live consensus stores: populates headers/reachability,
    /// fills per-level relations and ghostdag data, and initializes virtual
    /// state, tips, selected chain and depth info around the new pruning point.
    pub fn apply_proof(&self, mut proof: PruningPointProof, trusted_set: &[TrustedBlock]) -> PruningImportResult<()> {
        // The last level-0 header is by convention the pruning point itself
        let pruning_point_header = proof[0].last().unwrap().clone();
        let pruning_point = pruning_point_header.hash;

        // Extend level 0 with trusted blocks not already present, keeping their
        // externally provided ghostdag data for use below
        let proof_zero_set = BlockHashSet::from_iter(proof[0].iter().map(|header| header.hash));
        let mut trusted_gd_map: BlockHashMap<GhostdagData> = BlockHashMap::new();
        for tb in trusted_set.iter() {
            trusted_gd_map.insert(tb.block.hash(), tb.ghostdag.clone().into());
            if proof_zero_set.contains(&tb.block.hash()) {
                continue;
            }

            proof[0].push(tb.block.header.clone());
        }

        // Restore blue-work ascending order after the insertions above
        proof[0].sort_by(|a, b| a.blue_work.cmp(&b.blue_work));
        self.populate_reachability_and_headers(&proof);

        {
            let reachability_read = self.reachability_store.read();
            for tb in trusted_set.iter() {
                // Header-only trusted blocks are expected to be in pruning point past
                if tb.block.is_header_only() && !reachability_read.is_dag_ancestor_of(tb.block.hash(), pruning_point) {
                    return Err(PruningImportError::PruningPointPastMissingReachability(tb.block.hash()));
                }
            }
        }

        for (level, headers) in proof.iter().enumerate() {
            trace!("Applying level {} from the pruning point proof", level);
            self.ghostdag_stores[level].insert(ORIGIN, self.ghostdag_managers[level].origin_ghostdag_data()).unwrap();
            for header in headers.iter() {
                // Keep only level-parents that exist in this proof level; fall back to ORIGIN
                let parents = Arc::new(
                    self.parents_manager
                        .parents_at_level(header, level as BlockLevel)
                        .iter()
                        .copied()
                        .filter(|parent| self.ghostdag_stores[level].has(*parent).unwrap())
                        .collect_vec()
                        .push_if_empty(ORIGIN),
                );

                self.relations_stores.write()[level].insert(header.hash, parents.clone()).unwrap();
                let gd = if header.hash == self.genesis_hash {
                    self.ghostdag_managers[level].genesis_ghostdag_data()
                } else if level == 0 {
                    // Prefer externally provided (trusted) ghostdag data at level 0
                    if let Some(gd) = trusted_gd_map.get(&header.hash) {
                        gd.clone()
                    } else {
                        let calculated_gd = self.ghostdag_managers[level].ghostdag(&parents);
                        // Override the ghostdag data with the real blue score and blue work
                        GhostdagData {
                            blue_score: header.blue_score,
                            blue_work: header.blue_work,
                            selected_parent: calculated_gd.selected_parent,
                            mergeset_blues: calculated_gd.mergeset_blues.clone(),
                            mergeset_reds: calculated_gd.mergeset_reds.clone(),
                            blues_anticone_sizes: calculated_gd.blues_anticone_sizes.clone(),
                        }
                    }
                } else {
                    self.ghostdag_managers[level].ghostdag(&parents)
                };
                self.ghostdag_stores[level].insert(header.hash, Arc::new(gd)).unwrap();
            }
        }

        // Initialize virtual state with the pruning point as the single parent
        let virtual_parents = vec![pruning_point];
        let virtual_state = Arc::new(VirtualState {
            parents: virtual_parents.clone(),
            ghostdag_data: self.ghostdag_managers[0].ghostdag(&virtual_parents),
            ..VirtualState::default()
        });
        self.virtual_stores.write().state.set(virtual_state).unwrap();

        // Atomically initialize body tips, headers selected tip, selected chain and depth
        let mut batch = WriteBatch::default();
        self.body_tips_store.write().init_batch(&mut batch, &virtual_parents).unwrap();
        self.headers_selected_tip_store
            .write()
            .set_batch(&mut batch, SortableBlock { hash: pruning_point, blue_work: pruning_point_header.blue_work })
            .unwrap();
        self.selected_chain_store.write().init_with_pruning_point(&mut batch, pruning_point).unwrap();
        self.depth_store.insert_batch(&mut batch, pruning_point, ORIGIN, ORIGIN).unwrap();
        self.db.write(batch).unwrap();

        Ok(())
    }
289
290    fn estimate_proof_unique_size(&self, proof: &PruningPointProof) -> usize {
291        let approx_history_size = proof[0][0].daa_score;
292        let approx_unique_full_levels = f64::log2(approx_history_size as f64 / self.pruning_proof_m as f64).max(0f64) as usize;
293        proof.iter().map(|l| l.len()).sum::<usize>().min((approx_unique_full_levels + 1) * self.pruning_proof_m as usize)
294    }
295
    /// Inserts all proof headers into the headers store and builds a single
    /// reachability oracle over the union DAG of all proof levels, processing
    /// blocks in ascending blue-work order so parents are registered before
    /// their children.
    pub fn populate_reachability_and_headers(&self, proof: &PruningPointProof) {
        let capacity_estimate = self.estimate_proof_unique_size(proof);
        let mut dag = BlockHashMap::with_capacity(capacity_estimate);
        let mut up_heap = BinaryHeap::with_capacity(capacity_estimate);
        for header in proof.iter().flatten().cloned() {
            // Each unique header is inserted exactly once (levels may repeat headers)
            if let Vacant(e) = dag.entry(header.hash) {
                // Derive the block level from the header's PoW
                let state = kaspa_pow::State::new(&header);
                let (_, pow) = state.check_pow(header.nonce); // TODO: Check if pow passes
                let signed_block_level = self.max_block_level as i64 - pow.bits() as i64;
                let block_level = max(signed_block_level, 0) as BlockLevel;
                self.headers_store.insert(header.hash, header.clone(), block_level).unwrap();

                let mut parents = BlockHashSet::with_capacity(header.direct_parents().len() * 2);
                // We collect all available parent relations in order to maximize reachability information.
                // By taking into account parents from all levels we ensure that the induced DAG has valid
                // reachability information for each level-specific sub-DAG -- hence a single reachability
                // oracle can serve them all
                for level in 0..=self.max_block_level {
                    for parent in self.parents_manager.parents_at_level(&header, level) {
                        parents.insert(*parent);
                    }
                }

                struct DagEntry {
                    header: Arc<Header>,
                    parents: Arc<BlockHashSet>,
                }

                // Min-heap via `Reverse`: popped in ascending blue-work order below
                up_heap.push(Reverse(SortableBlock { hash: header.hash, blue_work: header.blue_work }));
                e.insert(DagEntry { header, parents: Arc::new(parents) });
            }
        }

        debug!("Estimated proof size: {}, actual size: {}", capacity_estimate, dag.len());

        for reverse_sortable_block in up_heap.into_sorted_iter() {
            // TODO: Convert to into_iter_sorted once it gets stable
            let hash = reverse_sortable_block.0.hash;
            let dag_entry = dag.get(&hash).unwrap();

            // Filter only existing parents
            let parents_in_dag = BinaryHeap::from_iter(
                dag_entry
                    .parents
                    .iter()
                    .cloned()
                    .filter(|parent| dag.contains_key(parent))
                    .map(|parent| SortableBlock { hash: parent, blue_work: dag.get(&parent).unwrap().header.blue_work }),
            );

            // Upgradable read: staged reachability below upgrades to write at commit time
            let reachability_read = self.reachability_store.upgradable_read();

            // Find the maximal parent antichain from the possibly redundant set of existing parents
            let mut reachability_parents: Vec<SortableBlock> = Vec::new();
            for parent in parents_in_dag.into_sorted_iter() {
                if reachability_read.is_dag_ancestor_of_any(parent.hash, &mut reachability_parents.iter().map(|parent| parent.hash)) {
                    continue;
                }

                reachability_parents.push(parent);
            }
            let reachability_parents_hashes =
                BlockHashes::new(reachability_parents.iter().map(|parent| parent.hash).collect_vec().push_if_empty(ORIGIN));
            // Selected parent = max blue-work parent (ORIGIN when there are none)
            let selected_parent = reachability_parents.iter().max().map(|parent| parent.hash).unwrap_or(ORIGIN);

            // Prepare batch
            let mut batch = WriteBatch::default();
            let mut reachability_relations_write = self.reachability_relations_store.write();
            let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
            let mut staging_reachability_relations = StagingRelationsStore::new(&mut reachability_relations_write);

            // Stage
            staging_reachability_relations.insert(hash, reachability_parents_hashes.clone()).unwrap();
            let mergeset = unordered_mergeset_without_selected_parent(
                &staging_reachability_relations,
                &staging_reachability,
                selected_parent,
                &reachability_parents_hashes,
            );
            reachability::add_block(&mut staging_reachability, hash, selected_parent, &mut mergeset.iter().copied()).unwrap();

            // Commit
            let reachability_write = staging_reachability.commit(&mut batch).unwrap();
            staging_reachability_relations.commit(&mut batch).unwrap();

            // Write
            self.db.write(batch).unwrap();

            // Drop
            drop(reachability_write);
            drop(reachability_relations_write);
        }
    }
389
    /// Validates a pruning point proof and decides whether the state it implies
    /// is better than the current consensus state.
    ///
    /// The proof is replayed into a temporary database (headers, relations,
    /// ghostdag and reachability per level), validated level by level from the
    /// highest level down, and then compared — by blue work accumulated since a
    /// common ancestor — against the current pruning point's data.
    ///
    /// Returns `Ok(())` when the proof is valid and better; otherwise a
    /// `PruningImportError` describing the failure.
    pub fn validate_pruning_point_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> {
        // A proof must carry exactly one header list per block level
        if proof.len() != self.max_block_level as usize + 1 {
            return Err(PruningImportError::ProofNotEnoughLevels(self.max_block_level as usize + 1));
        }
        if proof[0].is_empty() {
            return Err(PruningImportError::PruningProofNotEnoughHeaders);
        }

        let headers_estimate = self.estimate_proof_unique_size(proof);
        // The last level-0 header is by convention the proof's pruning point
        let proof_pp_header = proof[0].last().expect("checked if empty");
        let proof_pp = proof_pp_header.hash;
        let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
        // Stage everything into a temp DB so validation never touches live stores
        let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
        let cache_policy = CachePolicy::Count(2 * self.pruning_proof_m as usize);
        let headers_store =
            Arc::new(DbHeadersStore::new(db.clone(), CachePolicy::Count(headers_estimate), CachePolicy::Count(headers_estimate)));
        let ghostdag_stores = (0..=self.max_block_level)
            .map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, cache_policy, cache_policy)))
            .collect_vec();
        let mut relations_stores =
            (0..=self.max_block_level).map(|level| DbRelationsStore::new(db.clone(), level, cache_policy, cache_policy)).collect_vec();
        let reachability_stores = (0..=self.max_block_level)
            .map(|level| Arc::new(RwLock::new(DbReachabilityStore::with_block_level(db.clone(), cache_policy, cache_policy, level))))
            .collect_vec();

        let reachability_services = (0..=self.max_block_level)
            .map(|level| MTReachabilityService::new(reachability_stores[level as usize].clone()))
            .collect_vec();

        let ghostdag_managers = ghostdag_stores
            .iter()
            .cloned()
            .enumerate()
            .map(|(level, ghostdag_store)| {
                GhostdagManager::new(
                    self.genesis_hash,
                    self.ghostdag_k,
                    ghostdag_store,
                    relations_stores[level].clone(),
                    headers_store.clone(),
                    reachability_services[level].clone(),
                )
            })
            .collect_vec();

        {
            // Initialize every temp level store with the virtual genesis (ORIGIN)
            let mut batch = WriteBatch::default();
            for level in 0..=self.max_block_level {
                let level = level as usize;
                reachability::init(reachability_stores[level].write().deref_mut()).unwrap();
                relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap();
                // NOTE(review): uses the consensus-level `self.ghostdag_managers` rather than the
                // temp `ghostdag_managers` built above — presumably `origin_ghostdag_data` is
                // store-independent so either works; confirm
                ghostdag_stores[level].insert(ORIGIN, self.ghostdag_managers[level].origin_ghostdag_data()).unwrap();
            }

            db.write(batch).unwrap();
        }

        let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
        for level in (0..=self.max_block_level).rev() {
            // Before processing this level, check if the process is exiting so we can end early
            if self.is_consensus_exiting.load(Ordering::Relaxed) {
                return Err(PruningImportError::PruningValidationInterrupted);
            }

            info!("Validating level {level} from the pruning point proof ({} headers)", proof[level as usize].len());
            let level_idx = level as usize;
            let mut selected_tip = None;
            for (i, header) in proof[level as usize].iter().enumerate() {
                // Each header must prove enough work to belong to this level
                let header_level = calc_block_level(header, self.max_block_level);
                if header_level < level {
                    return Err(PruningImportError::PruningProofWrongBlockLevel(header.hash, header_level, level));
                }

                headers_store.insert(header.hash, header.clone(), header_level).unwrap_or_exists();

                // Keep only level-parents that were already processed for this level
                let parents = self
                    .parents_manager
                    .parents_at_level(header, level)
                    .iter()
                    .copied()
                    .filter(|parent| ghostdag_stores[level_idx].has(*parent).unwrap())
                    .collect_vec();

                // Only the first block at each level is allowed to have no known parents
                if parents.is_empty() && i != 0 {
                    return Err(PruningImportError::PruningProofHeaderWithNoKnownParents(header.hash, level));
                }

                let parents: BlockHashes = parents.push_if_empty(ORIGIN).into();

                if relations_stores[level_idx].has(header.hash).unwrap() {
                    return Err(PruningImportError::PruningProofDuplicateHeaderAtLevel(header.hash, level));
                }

                relations_stores[level_idx].insert(header.hash, parents.clone()).unwrap();
                let ghostdag_data = Arc::new(ghostdag_managers[level_idx].ghostdag(&parents));
                ghostdag_stores[level_idx].insert(header.hash, ghostdag_data.clone()).unwrap();
                // Track the running selected tip of this level
                selected_tip = Some(match selected_tip {
                    Some(tip) => ghostdag_managers[level_idx].find_selected_parent([tip, header.hash]),
                    None => header.hash,
                });

                let mut reachability_mergeset = {
                    let reachability_read = reachability_stores[level_idx].read();
                    ghostdag_data
                        .unordered_mergeset_without_selected_parent()
                        .filter(|hash| reachability_read.has(*hash).unwrap())
                        .collect_vec() // We collect to vector so reachability_read can be released and let `reachability::add_block` use a write lock.
                        .into_iter()
                };
                reachability::add_block(
                    reachability_stores[level_idx].write().deref_mut(),
                    header.hash,
                    ghostdag_data.selected_parent,
                    &mut reachability_mergeset,
                )
                .unwrap();

                if selected_tip.unwrap() == header.hash {
                    reachability::hint_virtual_selected_parent(reachability_stores[level_idx].write().deref_mut(), header.hash)
                        .unwrap();
                }
            }

            // This level must contain the next (higher) level's block at depth m
            if level < self.max_block_level {
                let block_at_depth_m_at_next_level = self
                    .block_at_depth(
                        &*ghostdag_stores[level_idx + 1],
                        selected_tip_by_level[level_idx + 1].unwrap(),
                        self.pruning_proof_m,
                    )
                    .unwrap();
                if !relations_stores[level_idx].has(block_at_depth_m_at_next_level).unwrap() {
                    return Err(PruningImportError::PruningProofMissingBlockAtDepthMFromNextLevel(level, level + 1));
                }
            }

            // The selected tip must be the proof pruning point or one of its level-parents
            if selected_tip.unwrap() != proof_pp
                && !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip.unwrap())
            {
                return Err(PruningImportError::PruningProofMissesBlocksBelowPruningPoint(selected_tip.unwrap(), level));
            }

            selected_tip_by_level[level_idx] = selected_tip;
        }

        let pruning_read = self.pruning_point_store.read();
        let relations_read = self.relations_stores.read();
        let current_pp = pruning_read.get().unwrap().pruning_point;
        let current_pp_header = self.headers_store.get_header(current_pp).unwrap();

        for (level_idx, selected_tip) in selected_tip_by_level.into_iter().enumerate() {
            let level = level_idx as BlockLevel;
            let selected_tip = selected_tip.unwrap();
            if level <= proof_pp_level {
                // At/below the pruning point's own level, the tip must be the pruning point itself
                if selected_tip != proof_pp {
                    return Err(PruningImportError::PruningProofSelectedTipIsNotThePruningPoint(selected_tip, level));
                }
            } else if !self.parents_manager.parents_at_level(proof_pp_header, level).contains(&selected_tip) {
                return Err(PruningImportError::PruningProofSelectedTipNotParentOfPruningPoint(selected_tip, level));
            }

            let proof_selected_tip_gd = ghostdag_stores[level_idx].get_compact_data(selected_tip).unwrap();
            // Levels without enough accumulated blue score can't be used for comparison
            if proof_selected_tip_gd.blue_score < 2 * self.pruning_proof_m {
                continue;
            }

            // Walk down the proof's selected chain until reaching a block known to the
            // current consensus (the common ancestor), if one exists
            let mut proof_current = selected_tip;
            let mut proof_current_gd = proof_selected_tip_gd;
            let common_ancestor_data = loop {
                match self.ghostdag_stores[level_idx].get_compact_data(proof_current).unwrap_option() {
                    Some(current_gd) => {
                        break Some((proof_current_gd, current_gd));
                    }
                    None => {
                        proof_current = proof_current_gd.selected_parent;
                        if proof_current.is_origin() {
                            break None;
                        }
                        proof_current_gd = ghostdag_stores[level_idx].get_compact_data(proof_current).unwrap();
                    }
                };
            };

            if let Some((proof_common_ancestor_gd, common_ancestor_gd)) = common_ancestor_data {
                // Compare blue work accumulated since the common ancestor: the proof's tip
                // must strictly exceed every level-parent of the current pruning point
                let selected_tip_blue_work_diff =
                    SignedInteger::from(proof_selected_tip_gd.blue_work) - SignedInteger::from(proof_common_ancestor_gd.blue_work);
                for parent in self.parents_manager.parents_at_level(&current_pp_header, level).iter().copied() {
                    let parent_blue_work = self.ghostdag_stores[level_idx].get_blue_work(parent).unwrap();
                    let parent_blue_work_diff =
                        SignedInteger::from(parent_blue_work) - SignedInteger::from(common_ancestor_gd.blue_work);
                    if parent_blue_work_diff >= selected_tip_blue_work_diff {
                        return Err(PruningImportError::PruningProofInsufficientBlueWork);
                    }
                }

                return Ok(());
            }
        }

        if current_pp == self.genesis_hash {
            // If the proof has better tips and the current pruning point is still
            // genesis, we consider the proof state to be better.
            return Ok(());
        }

        // No common ancestor at any level: accept the proof only if the current state
        // is itself shallow at some level
        for level in (0..=self.max_block_level).rev() {
            let level_idx = level as usize;
            match relations_read[level_idx].get_parents(current_pp).unwrap_option() {
                Some(parents) => {
                    if parents
                        .iter()
                        .copied()
                        .any(|parent| self.ghostdag_stores[level_idx].get_blue_score(parent).unwrap() < 2 * self.pruning_proof_m)
                    {
                        return Ok(());
                    }
                }
                None => {
                    // If the current pruning point doesn't have a parent at this level, we consider the proof state to be better.
                    return Ok(());
                }
            }
        }

        drop(pruning_read);
        drop(relations_read);
        drop(db_lifetime);

        Err(PruningImportError::PruningProofNotEnoughHeaders)
    }
621
    /// Builds the pruning point proof for pruning point `pp`: one vector of headers per
    /// block level (0..=max_block_level). Each level's headers form the sub-DAG between a
    /// chosen `root` and that level's selected tip, covering at least 2M blue-score depth.
    ///
    /// Returns an empty proof for genesis (no proof is needed).
    ///
    /// Panics (via `unwrap`) if required store data is missing — callers are expected to
    /// invoke this only when the relevant level data is fully available.
    pub(crate) fn build_pruning_point_proof(&self, pp: Hash) -> PruningPointProof {
        // Genesis requires no proof
        if pp == self.genesis_hash {
            return vec![];
        }

        let pp_header = self.headers_store.get_header_with_block_level(pp).unwrap();
        // Per level: the selected tip is the pruning point itself for levels it belongs to;
        // for higher levels, it is the GHOSTDAG selected parent among pp's known (stored)
        // parents at that level
        let selected_tip_by_level = (0..=self.max_block_level)
            .map(|level| {
                if level <= pp_header.block_level {
                    pp
                } else {
                    self.ghostdag_managers[level as usize].find_selected_parent(
                        self.parents_manager
                            .parents_at_level(&pp_header.header, level)
                            .iter()
                            // Skip parents with no GHOSTDAG data at this level
                            .filter(|parent| self.ghostdag_stores[level as usize].has(**parent).unwrap())
                            .cloned(),
                    )
                }
            })
            .collect_vec();

        (0..=self.max_block_level)
            .map(|level| {
                let level = level as usize;
                let selected_tip = selected_tip_by_level[level];
                // Deepest chain block this level's proof must reach (2M blue-score depth)
                let block_at_depth_2m = self
                    .block_at_depth(&*self.ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
                    .map_err(|err| format!("level: {}, err: {}", level, err))
                    .unwrap();

                // Choose the traversal root so that this level's proof also contains the
                // next level's M-depth block, making consecutive level proofs overlap.
                // Prefer whichever of the two candidates is an ancestor of the other;
                // otherwise fall back to a common ancestor on the next-level chain.
                let root = if level != self.max_block_level as usize {
                    let block_at_depth_m_at_next_level = self
                        .block_at_depth(&*self.ghostdag_stores[level + 1], selected_tip_by_level[level + 1], self.pruning_proof_m)
                        .map_err(|err| format!("level + 1: {}, err: {}", level + 1, err))
                        .unwrap();
                    if self.reachability_service.is_dag_ancestor_of(block_at_depth_m_at_next_level, block_at_depth_2m) {
                        block_at_depth_m_at_next_level
                    } else if self.reachability_service.is_dag_ancestor_of(block_at_depth_2m, block_at_depth_m_at_next_level) {
                        block_at_depth_2m
                    } else {
                        self.find_common_ancestor_in_chain_of_a(
                            &*self.ghostdag_stores[level],
                            block_at_depth_m_at_next_level,
                            block_at_depth_2m,
                        )
                        .map_err(|err| format!("level: {}, err: {}", level, err))
                        .unwrap()
                    }
                } else {
                    // Top level has no "next level" to overlap with
                    block_at_depth_2m
                };

                // Collect all headers in the future of `root` (at this level) which are in
                // the past of `selected_tip`, in ascending blue-work order (min-heap via
                // `Reverse`), so the resulting level proof is topologically consumable
                let mut headers = Vec::with_capacity(2 * self.pruning_proof_m as usize);
                let mut queue = BinaryHeap::<Reverse<SortableBlock>>::new();
                let mut visited = BlockHashSet::new();
                queue.push(Reverse(SortableBlock::new(root, self.ghostdag_stores[level].get_blue_work(root).unwrap())));
                while let Some(current) = queue.pop() {
                    let current = current.0.hash;
                    if !visited.insert(current) {
                        continue;
                    }

                    // Only keep blocks in the past of the selected tip
                    if !self.reachability_service.is_dag_ancestor_of(current, selected_tip) {
                        continue;
                    }

                    headers.push(self.headers_store.get_header(current).unwrap());
                    for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
                        queue.push(Reverse(SortableBlock::new(child, self.ghostdag_stores[level].get_blue_work(child).unwrap())));
                    }
                }

                // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof
                let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash));
                let chain_2m = self
                    .chain_up_to_depth(&*self.ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m)
                    .map_err(|err| {
                        dbg!(level, selected_tip, block_at_depth_2m, root);
                        format!("Assert 2M chain -- level: {}, err: {}", level, err)
                    })
                    .unwrap();
                let chain_2m_len = chain_2m.len();
                for (i, chain_hash) in chain_2m.into_iter().enumerate() {
                    if !set.contains(&chain_hash) {
                        // Dump diagnostic context before panicking on the broken invariant
                        let next_level_tip = selected_tip_by_level[level + 1];
                        let next_level_chain_m =
                            self.chain_up_to_depth(&*self.ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap();
                        let next_level_block_m = next_level_chain_m.last().copied().unwrap();
                        dbg!(next_level_chain_m.len());
                        dbg!(self.ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score);
                        dbg!(self.ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score);
                        dbg!(self.ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score);
                        dbg!(self.ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score);
                        dbg!(level, selected_tip, block_at_depth_2m, root);
                        panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len);
                    }
                }

                headers
            })
            .collect_vec()
    }
725
726    /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes.
727    fn chain_up_to_depth(
728        &self,
729        ghostdag_store: &impl GhostdagStoreReader,
730        high: Hash,
731        depth: u64,
732    ) -> Result<Vec<Hash>, PruningProofManagerInternalError> {
733        let high_gd = ghostdag_store
734            .get_compact_data(high)
735            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?;
736        let mut current_gd = high_gd;
737        let mut current = high;
738        let mut res = vec![current];
739        while current_gd.blue_score + depth >= high_gd.blue_score {
740            if current_gd.selected_parent.is_origin() {
741                break;
742            }
743            let prev = current;
744            current = current_gd.selected_parent;
745            res.push(current);
746            current_gd = ghostdag_store.get_compact_data(current).map_err(|err| {
747                PruningProofManagerInternalError::BlockAtDepth(format!(
748                    "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}",
749                    high, depth, prev, high_gd.blue_score, current_gd.blue_score, err
750                ))
751            })?;
752        }
753        Ok(res)
754    }
755
756    fn block_at_depth(
757        &self,
758        ghostdag_store: &impl GhostdagStoreReader,
759        high: Hash,
760        depth: u64,
761    ) -> Result<Hash, PruningProofManagerInternalError> {
762        let high_gd = ghostdag_store
763            .get_compact_data(high)
764            .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?;
765        let mut current_gd = high_gd;
766        let mut current = high;
767        while current_gd.blue_score + depth >= high_gd.blue_score {
768            if current_gd.selected_parent.is_origin() {
769                break;
770            }
771            let prev = current;
772            current = current_gd.selected_parent;
773            current_gd = ghostdag_store.get_compact_data(current).map_err(|err| {
774                PruningProofManagerInternalError::BlockAtDepth(format!(
775                    "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}",
776                    high, depth, prev, high_gd.blue_score, current_gd.blue_score, err
777                ))
778            })?;
779        }
780        Ok(current)
781    }
782
783    fn find_common_ancestor_in_chain_of_a(
784        &self,
785        ghostdag_store: &impl GhostdagStoreReader,
786        a: Hash,
787        b: Hash,
788    ) -> Result<Hash, PruningProofManagerInternalError> {
789        let a_gd = ghostdag_store
790            .get_compact_data(a)
791            .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
792        let mut current_gd = a_gd;
793        let mut current;
794        let mut loop_counter = 0;
795        loop {
796            current = current_gd.selected_parent;
797            loop_counter += 1;
798            if current.is_origin() {
799                break Err(PruningProofManagerInternalError::NoCommonAncestor(format!("a: {a}, b: {b} ({loop_counter} loop steps)")));
800            }
801            if self.reachability_service.is_dag_ancestor_of(current, b) {
802                break Ok(current);
803            }
804            current_gd = ghostdag_store
805                .get_compact_data(current)
806                .map_err(|err| PruningProofManagerInternalError::FindCommonAncestor(format!("a: {a}, b: {b}, {err}")))?;
807        }
808    }
809
810    /// Returns the k + 1 chain blocks below this hash (inclusive). If data is missing
811    /// the search is halted and a partial chain is returned.
812    ///
813    /// The returned hashes are guaranteed to have GHOSTDAG data
814    pub(crate) fn get_ghostdag_chain_k_depth(&self, hash: Hash) -> Vec<Hash> {
815        let mut hashes = Vec::with_capacity(self.ghostdag_k as usize + 1);
816        let mut current = hash;
817        for _ in 0..=self.ghostdag_k {
818            hashes.push(current);
819            let Some(parent) = self.ghostdag_stores[0].get_selected_parent(current).unwrap_option() else {
820                break;
821            };
822            if parent == self.genesis_hash || parent == blockhash::ORIGIN {
823                break;
824            }
825            current = parent;
826        }
827        hashes
828    }
829
    /// Computes the trusted data shared with a syncing peer for `pruning_point`:
    /// - `anticone`: the pruning point plus its anticone (w.r.t. the given virtual parents),
    ///   sorted by GHOSTDAG order with the pruning point first
    /// - `daa_window_blocks`: trusted headers forming a contiguous sub-DAG covering the
    ///   union of DAA windows of all anticone blocks
    /// - `ghostdag_blocks`: k-depth GHOSTDAG chain data (kept for kaspad-go legacy reasons)
    pub(crate) fn calculate_pruning_point_anticone_and_trusted_data(
        &self,
        pruning_point: Hash,
        virtual_parents: impl Iterator<Item = Hash>,
    ) -> PruningPointTrustedData {
        let anticone = self
            .traversal_manager
            .anticone(pruning_point, virtual_parents, None)
            .expect("no error is expected when max_traversal_allowed is None");
        let mut anticone = self.ghostdag_managers[0].sort_blocks(anticone);
        // The pruning point itself is always the first entry
        anticone.insert(0, pruning_point);

        let mut daa_window_blocks = BlockHashMap::new();
        let mut ghostdag_blocks = BlockHashMap::new();

        // PRUNE SAFETY: called either via consensus under the prune guard or by the pruning processor (hence no pruning in parallel)

        for anticone_block in anticone.iter().copied() {
            // Collect the full difficulty window of each anticone block into the trusted set
            let window = self
                .window_manager
                .block_window(&self.ghostdag_stores[0].get_data(anticone_block).unwrap(), WindowType::FullDifficultyWindow)
                .unwrap();

            for hash in window.deref().iter().map(|block| block.0.hash) {
                if let Entry::Vacant(e) = daa_window_blocks.entry(hash) {
                    e.insert(TrustedHeader {
                        header: self.headers_store.get_header(hash).unwrap(),
                        ghostdag: (&*self.ghostdag_stores[0].get_data(hash).unwrap()).into(),
                    });
                }
            }

            // Collect the k-depth selected chain below each anticone block
            let ghostdag_chain = self.get_ghostdag_chain_k_depth(anticone_block);
            for hash in ghostdag_chain {
                if let Entry::Vacant(e) = ghostdag_blocks.entry(hash) {
                    let ghostdag = self.ghostdag_stores[0].get_data(hash).unwrap();
                    e.insert((&*ghostdag).into());

                    // We fill `ghostdag_blocks` only for kaspad-go legacy reasons, but the real set we
                    // send is `daa_window_blocks` which represents the full trusted sub-DAG in the antifuture
                    // of the pruning point which kaspad-rust nodes expect to get when synced with headers proof
                    if let Entry::Vacant(e) = daa_window_blocks.entry(hash) {
                        e.insert(TrustedHeader {
                            header: self.headers_store.get_header(hash).unwrap(),
                            ghostdag: (&*self.ghostdag_stores[0].get_data(hash).unwrap()).into(),
                        });
                    }
                }
            }
        }

        // We traverse the DAG in the past of the pruning point and its anticone in order to make sure
        // that the sub-DAG we share (which contains the union of DAA windows), is contiguous and includes
        // all blocks between the pruning point and the DAA window blocks. This is crucial for the syncee
        // to be able to build full reachability data of the sub-DAG and to actually validate that only the
        // claimed anticone is indeed the pp anticone and all the rest of the blocks are in the pp past.

        // We use the min blue-work in order to identify where the traversal can halt
        let min_blue_work = daa_window_blocks.values().map(|th| th.header.blue_work).min().expect("non empty");
        // BFS over parents, starting from the anticone, bounded below by `min_blue_work`
        let mut queue = VecDeque::from_iter(anticone.iter().copied());
        let mut visited = BlockHashSet::from_iter(queue.iter().copied().chain(std::iter::once(blockhash::ORIGIN))); // Mark origin as visited to avoid processing it
        while let Some(current) = queue.pop_front() {
            if let Entry::Vacant(e) = daa_window_blocks.entry(current) {
                let header = self.headers_store.get_header(current).unwrap();
                // Below the minimum blue work no DAA-window block can exist, so halt here
                if header.blue_work < min_blue_work {
                    continue;
                }
                let ghostdag = (&*self.ghostdag_stores[0].get_data(current).unwrap()).into();
                e.insert(TrustedHeader { header, ghostdag });
            }
            let parents = self.relations_stores.read()[0].get_parents(current).unwrap();
            for parent in parents.iter().copied() {
                if visited.insert(parent) {
                    queue.push_back(parent);
                }
            }
        }

        PruningPointTrustedData {
            anticone,
            daa_window_blocks: daa_window_blocks.into_values().collect_vec(),
            ghostdag_blocks: ghostdag_blocks.into_iter().map(|(hash, ghostdag)| TrustedGhostdagData { hash, ghostdag }).collect_vec(),
        }
    }
914
915    pub fn get_pruning_point_proof(&self) -> Arc<PruningPointProof> {
916        let pp = self.pruning_point_store.read().pruning_point().unwrap();
917        let mut cache_lock = self.cached_proof.lock();
918        if let Some(cache) = cache_lock.clone() {
919            if cache.pruning_point == pp {
920                return cache.data;
921            }
922        }
923        let proof = Arc::new(self.build_pruning_point_proof(pp));
924        cache_lock.replace(CachedPruningPointData { pruning_point: pp, data: proof.clone() });
925        proof
926    }
927
928    pub fn get_pruning_point_anticone_and_trusted_data(&self) -> ConsensusResult<Arc<PruningPointTrustedData>> {
929        let pp = self.pruning_point_store.read().pruning_point().unwrap();
930        let mut cache_lock = self.cached_anticone.lock();
931        if let Some(cache) = cache_lock.clone() {
932            if cache.pruning_point == pp {
933                return Ok(cache.data);
934            }
935        }
936
937        let virtual_state = self.virtual_stores.read().state.get().unwrap();
938        let pp_bs = self.headers_store.get_blue_score(pp).unwrap();
939
940        // The anticone is considered final only if the pruning point is at sufficient depth from virtual
941        if virtual_state.ghostdag_data.blue_score >= pp_bs + self.anticone_finalization_depth {
942            let anticone = Arc::new(self.calculate_pruning_point_anticone_and_trusted_data(pp, virtual_state.parents.iter().copied()));
943            cache_lock.replace(CachedPruningPointData { pruning_point: pp, data: anticone.clone() });
944            Ok(anticone)
945        } else {
946            Err(ConsensusError::PruningPointInsufficientDepth)
947        }
948    }
949}