kaspa_consensus/pipeline/pruning_processor/processor.rs

//! TODO: module comment about locking safety and consistency of various pruning stores

use crate::{
    consensus::{
        services::{ConsensusServices, DbGhostdagManager, DbParentsManager, DbPruningPointManager},
        storage::ConsensusStorage,
    },
    model::{
        services::reachability::{MTReachabilityService, ReachabilityService},
        stores::{
            ghostdag::{CompactGhostdagData, GhostdagStoreReader},
            headers::HeaderStoreReader,
            past_pruning_points::PastPruningPointsStoreReader,
            pruning::{PruningStore, PruningStoreReader},
            reachability::{DbReachabilityStore, ReachabilityStoreReader, StagingReachabilityStore},
            relations::StagingRelationsStore,
            selected_chain::SelectedChainStore,
            statuses::StatusesStoreReader,
            tips::{TipsStore, TipsStoreReader},
            utxo_diffs::UtxoDiffsStoreReader,
        },
    },
    processes::{pruning_proof::PruningProofManager, reachability::inquirer as reachability, relations},
};
use crossbeam_channel::Receiver as CrossbeamReceiver;
use itertools::Itertools;
use kaspa_consensus_core::{
    blockhash::ORIGIN,
    blockstatus::BlockStatus::StatusHeaderOnly,
    config::Config,
    muhash::MuHashExtensions,
    pruning::{PruningPointProof, PruningPointTrustedData},
    trusted::ExternalGhostdagData,
    BlockHashMap, BlockHashSet, BlockLevel,
};
use kaspa_consensusmanager::SessionLock;
use kaspa_core::{debug, info, warn};
use kaspa_database::prelude::{BatchDbWriter, MemoryWriter, StoreResultExtensions, DB};
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_utils::iter::IterExtensions;
use parking_lot::RwLockUpgradableReadGuard;
use rocksdb::WriteBatch;
use std::{
    collections::{hash_map::Entry::Vacant, VecDeque},
    ops::Deref,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

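/// Messages handled by the pruning processor worker: either a request to evaluate pruning
/// point advancement given the current sink's GHOSTDAG data, or an exit signal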
pub enum PruningProcessingMessage {
    Exit,
    Process { sink_ghostdag_data: CompactGhostdagData },
}

/// A processor dedicated for moving the pruning point and pruning any possible data in its past
pub struct PruningProcessor {
    // Channels
    receiver: CrossbeamReceiver<PruningProcessingMessage>,

    // DB
    db: Arc<DB>,

    // Storage
    storage: Arc<ConsensusStorage>,

    // Managers and Services
    reachability_service: MTReachabilityService<DbReachabilityStore>,
    ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
    pruning_point_manager: DbPruningPointManager,
    pruning_proof_manager: Arc<PruningProofManager>,
    parents_manager: DbParentsManager,

    // Pruning lock
    pruning_lock: SessionLock,

    // Config
    config: Arc<Config>,

    // Signals
    is_consensus_exiting: Arc<AtomicBool>,
}

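// Deref into the underlying consensus storage so that individual stores
// (e.g. `self.headers_store`, `self.utxo_diffs_store`) can be accessed directly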
impl Deref for PruningProcessor {
    type Target = ConsensusStorage;

    fn deref(&self) -> &Self::Target {
        &self.storage
    }
}

impl PruningProcessor {
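    /// Constructs a new pruning processor over the shared consensus storage, services and config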
    pub fn new(
        receiver: CrossbeamReceiver<PruningProcessingMessage>,
        db: Arc<DB>,
        storage: &Arc<ConsensusStorage>,
        services: &Arc<ConsensusServices>,
        pruning_lock: SessionLock,
        config: Arc<Config>,
        is_consensus_exiting: Arc<AtomicBool>,
    ) -> Self {
        Self {
            receiver,
            db,
            storage: storage.clone(),
            reachability_service: services.reachability_service.clone(),
            ghostdag_managers: services.ghostdag_managers.clone(),
            pruning_point_manager: services.pruning_point_manager.clone(),
            pruning_proof_manager: services.pruning_proof_manager.clone(),
            parents_manager: services.parents_manager.clone(),
            pruning_lock,
            config,
            is_consensus_exiting,
        }
    }

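    /// The worker event loop: waits for the first processing message (ensuring the node is already
    /// connected and receiving blocks), recovers any interrupted pruning workflows, and then attempts
    /// pruning point advancement for every subsequent message until an exit signal or channel close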
    pub fn worker(self: &Arc<Self>) {
        let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() else {
            return;
        };

        // On start-up, check if any pruning workflows require recovery. We wait for the first processing message to arrive
        // in order to make sure the node is already connected and receiving blocks before we start background recovery operations
        self.recover_pruning_workflows_if_needed();
        self.advance_pruning_point_and_candidate_if_possible(sink_ghostdag_data);

        while let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() {
            self.advance_pruning_point_and_candidate_if_possible(sink_ghostdag_data);
        }
    }

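    /// Checks whether a previous pruning point move or data prune was interrupted (indicated by a
    /// pruning utxoset position or history root lagging behind the current pruning point) and
    /// completes the interrupted workflow if so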
    fn recover_pruning_workflows_if_needed(&self) {
        let pruning_point_read = self.pruning_point_store.read();
        let pruning_point = pruning_point_read.pruning_point().unwrap();
        let history_root = pruning_point_read.history_root().unwrap_option();
        let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap_option();
        drop(pruning_point_read);

        debug!(
            "[PRUNING PROCESSOR] recovery check: current pruning point: {}, history root: {:?}, pruning utxoset position: {:?}",
            pruning_point, history_root, pruning_utxoset_position
        );

        if let Some(pruning_utxoset_position) = pruning_utxoset_position {
            // This indicates the node crashed during a former pruning point move and we need to recover
            if pruning_utxoset_position != pruning_point {
                info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
                if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
                    info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
                    return;
                }
            }
        }

        if let Some(history_root) = history_root {
            // This indicates the node crashed or was forced to stop during a former data prune operation, hence
            // we need to complete it
            if history_root != pruning_point {
                self.prune(pruning_point);
            }
        }

        // TODO: both `pruning_utxoset_position` and `history_root` are new DB keys so for now we assume correct state if the keys are missing
    }

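    /// Queries the pruning point manager with the new sink GHOSTDAG data. If the pruning point should
    /// advance, persists the new pruning point(s), advances the pruning utxoset accordingly and prunes
    /// data in the new pruning point's past; otherwise only updates the candidate if it changed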
    fn advance_pruning_point_and_candidate_if_possible(&self, sink_ghostdag_data: CompactGhostdagData) {
        let pruning_point_read = self.pruning_point_store.upgradable_read();
        let current_pruning_info = pruning_point_read.get().unwrap();
        let (new_pruning_points, new_candidate) = self.pruning_point_manager.next_pruning_points_and_candidate_by_ghostdag_data(
            sink_ghostdag_data,
            None,
            current_pruning_info.candidate,
            current_pruning_info.pruning_point,
        );

        if !new_pruning_points.is_empty() {
            // Update past pruning points and pruning point stores
            let mut batch = WriteBatch::default();
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            for (i, past_pp) in new_pruning_points.iter().copied().enumerate() {
                self.past_pruning_points_store.insert_batch(&mut batch, current_pruning_info.index + i as u64 + 1, past_pp).unwrap();
            }
            let new_pp_index = current_pruning_info.index + new_pruning_points.len() as u64;
            let new_pruning_point = *new_pruning_points.last().unwrap();
            pruning_point_write.set_batch(&mut batch, new_pruning_point, new_candidate, new_pp_index).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);

            // Inform the user
            info!("Periodic pruning point movement: advancing from {} to {}", current_pruning_info.pruning_point, new_pruning_point);

            // Advance the pruning point utxoset to the state of the new pruning point using chain-block UTXO diffs
            if !self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point) {
                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
                return;
            }
            info!("Updated the pruning point UTXO set");

            // Finally, prune data in the new pruning point past
            self.prune(new_pruning_point);
        } else if new_candidate != current_pruning_info.candidate {
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            pruning_point_write.set(current_pruning_info.pruning_point, new_candidate, current_pruning_info.index).unwrap();
        }
    }

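    /// Advances the pruning utxoset from `utxoset_position` to `new_pruning_point` by iteratively
    /// applying chain-block UTXO diffs. Returns `false` if interrupted by an exit signal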
    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) -> bool {
        let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
        for chain_block in self.reachability_service.forward_chain_iterator(utxoset_position, new_pruning_point, true).skip(1) {
            if self.is_consensus_exiting.load(Ordering::Relaxed) {
                return false;
            }
            let utxo_diff = self.utxo_diffs_store.get(chain_block).expect("chain blocks have utxo state");
            let mut batch = WriteBatch::default();
            pruning_utxoset_write.utxo_set.write_diff_batch(&mut batch, utxo_diff.as_ref()).unwrap();
            pruning_utxoset_write.set_utxoset_position(&mut batch, chain_block).unwrap();
            self.db.write(batch).unwrap();
        }
        drop(pruning_utxoset_write);

        if self.config.enable_sanity_checks {
            info!("Performing a sanity check that the new UTXO set has the expected UTXO commitment");
            self.assert_utxo_commitment(new_pruning_point);
        }
        true
    }

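    /// Sanity check: asserts that the current pruning utxoset hashes to the UTXO commitment
    /// declared in the pruning point header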
    fn assert_utxo_commitment(&self, pruning_point: Hash) {
        info!("Verifying the new pruning point UTXO commitment (sanity test)");
        let commitment = self.headers_store.get_header(pruning_point).unwrap().utxo_commitment;
        let mut multiset = MuHash::new();
        let pruning_utxoset_read = self.pruning_utxoset_stores.read();
        for (outpoint, entry) in pruning_utxoset_read.utxo_set.iterator().map(|r| r.unwrap()) {
            multiset.add_utxo(&outpoint, &entry);
        }
        assert_eq!(multiset.finalize(), commitment, "Updated pruning point utxo set does not match the header utxo commitment");
        info!("Pruning point UTXO commitment was verified correctly (sanity test)");
    }

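    /// Prunes data in the past of `new_pruning_point`: keeps full data for the pruning point and its
    /// anticone, relations and GHOSTDAG data required by the pruning proof and DAA/GHOSTDAG windows,
    /// and headers only for past pruning points. Does nothing if the node is archival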
    fn prune(&self, new_pruning_point: Hash) {
        if self.config.is_archival {
            warn!("The node is configured as an archival node -- avoiding data pruning. Note this might lead to heavy disk usage.");
            return;
        }

        info!("Header and Block pruning: preparing proof and anticone data...");

        let proof = self.pruning_proof_manager.get_pruning_point_proof();
        let data = self
            .pruning_proof_manager
            .get_pruning_point_anticone_and_trusted_data()
            .expect("insufficient depth error is unexpected here");

        let genesis = self.past_pruning_points_store.get(0).unwrap();

        assert_eq!(new_pruning_point, proof[0].last().unwrap().hash);
        assert_eq!(new_pruning_point, data.anticone[0]);
        assert_eq!(genesis, self.config.genesis.hash);
        assert_eq!(genesis, proof.last().unwrap().last().unwrap().hash);

        // We keep full data for pruning point and its anticone, relations for DAA/GD
        // windows and pruning proof, and only headers for past pruning points
        let keep_blocks: BlockHashSet = data.anticone.iter().copied().collect();
        let mut keep_relations: BlockHashMap<BlockLevel> = std::iter::empty()
            .chain(data.anticone.iter().copied())
            .chain(data.daa_window_blocks.iter().map(|th| th.header.hash))
            .chain(data.ghostdag_blocks.iter().map(|gd| gd.hash))
            .chain(proof[0].iter().map(|h| h.hash))
            .map(|h| (h, 0)) // Mark block level 0 for all the above. Note that below we add the remaining levels
            .collect();
        let keep_headers: BlockHashSet = self.past_pruning_points();

        info!("Header and Block pruning: waiting for consensus write permissions...");

        let mut prune_guard = self.pruning_lock.blocking_write();

        info!("Starting Header and Block pruning...");

        {
            let mut counter = 0;
            let mut batch = WriteBatch::default();
            // At this point keep_relations only holds level-0 relations which is the correct filtering criteria for primary GHOSTDAG
            for kept in keep_relations.keys().copied() {
                let Some(ghostdag) = self.ghostdag_primary_store.get_data(kept).unwrap_option() else {
                    continue;
                };
                if ghostdag.unordered_mergeset().any(|h| !keep_relations.contains_key(&h)) {
                    let mut mutable_ghostdag: ExternalGhostdagData = ghostdag.as_ref().into();
                    mutable_ghostdag.mergeset_blues.retain(|h| keep_relations.contains_key(h));
                    mutable_ghostdag.mergeset_reds.retain(|h| keep_relations.contains_key(h));
                    mutable_ghostdag.blues_anticone_sizes.retain(|k, _| keep_relations.contains_key(k));
                    if !keep_relations.contains_key(&mutable_ghostdag.selected_parent) {
                        mutable_ghostdag.selected_parent = ORIGIN;
                    }
                    counter += 1;
                    self.ghostdag_primary_store.update_batch(&mut batch, kept, &Arc::new(mutable_ghostdag.into())).unwrap();
                }
            }
            self.db.write(batch).unwrap();
            info!("Header and Block pruning: updated ghostdag data for {} blocks", counter);
        }

        // No need to hold the prune guard while we continue populating keep_relations
        drop(prune_guard);

        // Add additional levels only after filtering GHOSTDAG data via level 0
        for (level, level_proof) in proof.iter().enumerate().skip(1) {
            let level = level as BlockLevel;
            // We obtain the headers of the pruning point anticone (including the pruning point)
            // in order to mark all parents of anticone roots at level as not-to-be-deleted.
            // This optimizes multi-level parent validation (see ParentsManager)
            // by avoiding the deletion of high-level parents which might still be needed for future
            // header validation (avoiding the need for reference blocks; see therein).
            //
            // Notes:
            //
            // 1. Normally, such blocks would be part of the proof for this level, but here we address the rare case
            //    where there are a few such parallel blocks (since the proof only contains the past of the pruning point's
            //    selected-tip-at-level)
            // 2. We refer to the pp anticone as roots even though technically it might contain blocks which are not a pure
            //    antichain (i.e., some of them are in the past of others). These blocks only add redundant info which would
            //    be included anyway.
            let roots_parents_at_level = data
                .anticone
                .iter()
                .copied()
                .map(|hash| self.headers_store.get_header_with_block_level(hash).expect("pruning point anticone is not pruned"))
                .filter(|root| level > root.block_level) // If the root itself is at level, there's no need for its level-parents
                .flat_map(|root| self.parents_manager.parents_at_level(&root.header, level).iter().copied().collect_vec());
            for hash in level_proof.iter().map(|header| header.hash).chain(roots_parents_at_level) {
                if let Vacant(e) = keep_relations.entry(hash) {
                    // This hash was not added by any lower level -- mark it as affiliated with proof level `level`
                    e.insert(level);
                }
            }
        }

        prune_guard = self.pruning_lock.blocking_write();
        let mut lock_acquire_time = Instant::now();
        let mut reachability_read = self.reachability_store.upgradable_read();

        {
            // Start with a batch for pruning body tips and selected chain stores
            let mut batch = WriteBatch::default();

            // Prune tips which can no longer be merged by virtual.
            // By the prunality proof, any tip which isn't in future(pruning_point) will never be merged
            // by virtual and hence can be safely deleted
            let mut tips_write = self.body_tips_store.write();
            let pruned_tips = tips_write
                .get()
                .unwrap()
                .read()
                .iter()
                .copied()
                .filter(|&h| !reachability_read.is_dag_ancestor_of_result(new_pruning_point, h).unwrap())
                .collect_vec();
            tips_write.prune_tips_with_writer(BatchDbWriter::new(&mut batch), &pruned_tips).unwrap();
            if !pruned_tips.is_empty() {
                info!(
                    "Header and Block pruning: pruned {} tips: {}...{}",
                    pruned_tips.len(),
                    pruned_tips.iter().take(5.min((pruned_tips.len() + 1) / 2)).reusable_format(", "),
                    pruned_tips.iter().rev().take(5.min(pruned_tips.len() / 2)).reusable_format(", ")
                )
            }

            // Prune the selected chain index below the pruning point
            let mut selected_chain_write = self.selected_chain_store.write();
            selected_chain_write.prune_below_pruning_point(BatchDbWriter::new(&mut batch), new_pruning_point).unwrap();

            // Flush the batch to the DB
            self.db.write(batch).unwrap();

            // Calling the drops explicitly after the batch is written in order to avoid possible errors.
            drop(selected_chain_write);
            drop(tips_write);
        }

        // Now we traverse the anti-future of the new pruning point starting from origin and going up.
        // The most efficient way to traverse the entire DAG from the bottom-up is via the reachability tree
        let mut queue = VecDeque::<Hash>::from_iter(reachability_read.get_children(ORIGIN).unwrap().iter().copied());
        let (mut counter, mut traversed) = (0, 0);
        info!("Header and Block pruning: starting traversal from: {} (genesis: {})", queue.iter().reusable_format(", "), genesis);
        while let Some(current) = queue.pop_front() {
            if reachability_read.is_dag_ancestor_of_result(new_pruning_point, current).unwrap() {
                continue;
            }
            traversed += 1;
            // Obtain the tree children of `current` and push them to the queue before possibly being deleted below
            queue.extend(reachability_read.get_children(current).unwrap().iter());

            // If we have held the lock for more than a few milliseconds, release and recapture it to allow consensus progress during pruning
            if lock_acquire_time.elapsed() > Duration::from_millis(5) {
                drop(reachability_read);
                // If an exit signal was received, exit from this long-running process
                if self.is_consensus_exiting.load(Ordering::Relaxed) {
                    drop(prune_guard);
                    info!("Header and Block pruning interrupted: Process is exiting");
                    return;
                }
                prune_guard.blocking_yield();
                lock_acquire_time = Instant::now();
                reachability_read = self.reachability_store.upgradable_read();
            }

            if traversed % 1000 == 0 {
                info!("Header and Block pruning: traversed: {}, pruned {}...", traversed, counter);
            }

            // Remove window cache entries
            self.block_window_cache_for_difficulty.remove(&current);
            self.block_window_cache_for_past_median_time.remove(&current);

            if !keep_blocks.contains(&current) {
                let mut batch = WriteBatch::default();
                let mut level_relations_write = self.relations_stores.write();
                let mut reachability_relations_write = self.reachability_relations_store.write();
                let mut staging_relations = StagingRelationsStore::new(&mut reachability_relations_write);
                let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
                let mut statuses_write = self.statuses_store.write();

                // Prune data related to block bodies and UTXO state
                self.utxo_multisets_store.delete_batch(&mut batch, current).unwrap();
                self.utxo_diffs_store.delete_batch(&mut batch, current).unwrap();
                self.acceptance_data_store.delete_batch(&mut batch, current).unwrap();
                self.block_transactions_store.delete_batch(&mut batch, current).unwrap();

                if let Some(&affiliated_proof_level) = keep_relations.get(&current) {
                    if statuses_write.get(current).unwrap_option().is_some_and(|s| s.is_valid()) {
                        // We set the status to header-only only if it was previously set to a valid
                        // status. This is important since some proof headers might not have their status set
                        // and we would like to preserve this semantic (having a valid status implies that
                        // other parts of the code assume the existence of GD data etc.)
                        statuses_write.set_batch(&mut batch, current, StatusHeaderOnly).unwrap();
                    }

                    // Delete level-x relations for blocks which only belong to higher-than-x proof levels.
                    // This preserves the semantic that for each level, relations represent a contiguous DAG area in that level
                    for lower_level in 0..affiliated_proof_level as usize {
                        let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[lower_level]);
                        relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option();
                        staging_level_relations.commit(&mut batch).unwrap();
                        self.ghostdag_stores[lower_level].delete_batch(&mut batch, current).unwrap_option();
                    }
                } else {
                    // Count only blocks which get fully pruned including DAG relations
                    counter += 1;
                    // Prune data related to headers: relations, reachability, ghostdag
                    let mergeset = relations::delete_reachability_relations(
                        MemoryWriter, // Both stores are staging so we just pass a dummy writer
                        &mut staging_relations,
                        &staging_reachability,
                        current,
                    );
                    reachability::delete_block(&mut staging_reachability, current, &mut mergeset.iter().copied()).unwrap();
                    // TODO: consider adding block level to compact header data
                    let block_level = self.headers_store.get_header_with_block_level(current).unwrap().block_level;
                    (0..=block_level as usize).for_each(|level| {
                        let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[level]);
                        relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option();
                        staging_level_relations.commit(&mut batch).unwrap();
                        self.ghostdag_stores[level].delete_batch(&mut batch, current).unwrap_option();
                    });

                    // Remove additional header related data
                    self.daa_excluded_store.delete_batch(&mut batch, current).unwrap();
                    self.depth_store.delete_batch(&mut batch, current).unwrap();
                    // Remove status completely
                    statuses_write.delete_batch(&mut batch, current).unwrap();

                    if !keep_headers.contains(&current) {
                        // Prune the actual headers
                        self.headers_store.delete_batch(&mut batch, current).unwrap();
                    }
                }

                let reachability_write = staging_reachability.commit(&mut batch).unwrap();
                staging_relations.commit(&mut batch).unwrap();

                // Flush the batch to the DB
                self.db.write(batch).unwrap();

                // Calling the drops explicitly after the batch is written in order to avoid possible errors.
                drop(reachability_write);
                drop(statuses_write);
                drop(reachability_relations_write);
                drop(level_relations_write);

                reachability_read = self.reachability_store.upgradable_read();
            }
        }

        drop(reachability_read);
        drop(prune_guard);

        info!("Header and Block pruning completed: traversed: {}, pruned {}", traversed, counter);
        info!(
            "Header and Block pruning stats: proof size: {}, pruning point and anticone: {}, unique headers in proof and windows: {}, pruning points in history: {}",
            proof.iter().map(|l| l.len()).sum::<usize>(),
            keep_blocks.len(),
            keep_relations.len(),
            keep_headers.len()
        );

        if self.config.enable_sanity_checks {
            self.assert_proof_rebuilding(proof, new_pruning_point);
            self.assert_data_rebuilding(data, new_pruning_point);
        }

        {
            // Set the history root to the new pruning point only after we successfully pruned its past
            let mut pruning_point_write = self.pruning_point_store.write();
            let mut batch = WriteBatch::default();
            pruning_point_write.set_history_root(&mut batch, new_pruning_point).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);
        }
    }

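    /// Returns the set of all past pruning points below the current pruning point index
    /// (i.e., excluding the current pruning point itself)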
    fn past_pruning_points(&self) -> BlockHashSet {
        (0..self.pruning_point_store.read().get().unwrap().index)
            .map(|index| self.past_pruning_points_store.get(index).unwrap())
            .collect()
    }

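    /// Sanity check: rebuilds the pruning point proof from the remaining data and asserts that it
    /// matches the reference proof captured before pruning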
    fn assert_proof_rebuilding(&self, ref_proof: Arc<PruningPointProof>, new_pruning_point: Hash) {
        info!("Rebuilding the pruning proof after pruning data (sanity test)");
        let proof_hashes = ref_proof.iter().flatten().map(|h| h.hash).collect::<Vec<_>>();
        let built_proof = self.pruning_proof_manager.build_pruning_point_proof(new_pruning_point);
        let built_proof_hashes = built_proof.iter().flatten().map(|h| h.hash).collect::<Vec<_>>();
        assert_eq!(proof_hashes.len(), built_proof_hashes.len(), "Rebuilt proof does not match the expected reference");
        for (i, (a, b)) in proof_hashes.into_iter().zip(built_proof_hashes).enumerate() {
            if a != b {
                panic!("Proof built following pruning does not match the previous proof: built[{}]={}, prev[{}]={}", i, b, i, a);
            }
        }
        info!("Proof was rebuilt successfully following pruning");
    }

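    /// Sanity check: recalculates the pruning point anticone and trusted data from the remaining
    /// data and asserts that it matches the reference data captured before pruning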
    fn assert_data_rebuilding(&self, ref_data: Arc<PruningPointTrustedData>, new_pruning_point: Hash) {
        info!("Rebuilding pruning point trusted data (sanity test)");
        let virtual_state = self.lkg_virtual_state.load();
        let built_data = self
            .pruning_proof_manager
            .calculate_pruning_point_anticone_and_trusted_data(new_pruning_point, virtual_state.parents.iter().copied());
        assert_eq!(
            ref_data.anticone.iter().copied().collect::<BlockHashSet>(),
            built_data.anticone.iter().copied().collect::<BlockHashSet>()
        );
        assert_eq!(
            ref_data.daa_window_blocks.iter().map(|th| th.header.hash).collect::<BlockHashSet>(),
            built_data.daa_window_blocks.iter().map(|th| th.header.hash).collect::<BlockHashSet>()
        );
        assert_eq!(
            ref_data.ghostdag_blocks.iter().map(|gd| gd.hash).collect::<BlockHashSet>(),
            built_data.ghostdag_blocks.iter().map(|gd| gd.hash).collect::<BlockHashSet>()
        );
        info!("Trusted data was rebuilt successfully following pruning");
    }
}