use crate::{
    consensus::{
        services::{ConsensusServices, DbGhostdagManager, DbParentsManager, DbPruningPointManager},
        storage::ConsensusStorage,
    },
    model::{
        services::reachability::{MTReachabilityService, ReachabilityService},
        stores::{
            ghostdag::{CompactGhostdagData, GhostdagStoreReader},
            headers::HeaderStoreReader,
            past_pruning_points::PastPruningPointsStoreReader,
            pruning::{PruningStore, PruningStoreReader},
            reachability::{DbReachabilityStore, ReachabilityStoreReader, StagingReachabilityStore},
            relations::StagingRelationsStore,
            selected_chain::SelectedChainStore,
            statuses::StatusesStoreReader,
            tips::{TipsStore, TipsStoreReader},
            utxo_diffs::UtxoDiffsStoreReader,
        },
    },
    processes::{pruning_proof::PruningProofManager, reachability::inquirer as reachability, relations},
};
use crossbeam_channel::Receiver as CrossbeamReceiver;
use itertools::Itertools;
use kaspa_consensus_core::{
    blockhash::ORIGIN,
    blockstatus::BlockStatus::StatusHeaderOnly,
    config::Config,
    muhash::MuHashExtensions,
    pruning::{PruningPointProof, PruningPointTrustedData},
    trusted::ExternalGhostdagData,
    BlockHashMap, BlockHashSet, BlockLevel,
};
use kaspa_consensusmanager::SessionLock;
use kaspa_core::{debug, info, warn};
use kaspa_database::prelude::{BatchDbWriter, MemoryWriter, StoreResultExtensions, DB};
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_utils::iter::IterExtensions;
use parking_lot::RwLockUpgradableReadGuard;
use rocksdb::WriteBatch;
use std::{
    collections::{hash_map::Entry::Vacant, VecDeque},
    ops::Deref,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

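/// Messages consumed by the pruning processor worker: `Exit` signals shutdown, while `Process`
/// carries the compact ghostdag data of the current sink.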
pub enum PruningProcessingMessage {
    Exit,
    Process { sink_ghostdag_data: CompactGhostdagData },
}

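/// Background processor responsible for advancing the pruning point and its UTXO set, and for
/// pruning block and header data which is no longer needed below the pruning point.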
pub struct PruningProcessor {
    receiver: CrossbeamReceiver<PruningProcessingMessage>,

    db: Arc<DB>,

    storage: Arc<ConsensusStorage>,

    reachability_service: MTReachabilityService<DbReachabilityStore>,
    ghostdag_managers: Arc<Vec<DbGhostdagManager>>,
    pruning_point_manager: DbPruningPointManager,
    pruning_proof_manager: Arc<PruningProofManager>,
    parents_manager: DbParentsManager,

    pruning_lock: SessionLock,

    config: Arc<Config>,

    is_consensus_exiting: Arc<AtomicBool>,
}

impl Deref for PruningProcessor {
    type Target = ConsensusStorage;

    fn deref(&self) -> &Self::Target {
        &self.storage
    }
}

impl PruningProcessor {
    pub fn new(
        receiver: CrossbeamReceiver<PruningProcessingMessage>,
        db: Arc<DB>,
        storage: &Arc<ConsensusStorage>,
        services: &Arc<ConsensusServices>,
        pruning_lock: SessionLock,
        config: Arc<Config>,
        is_consensus_exiting: Arc<AtomicBool>,
    ) -> Self {
        Self {
            receiver,
            db,
            storage: storage.clone(),
            reachability_service: services.reachability_service.clone(),
            ghostdag_managers: services.ghostdag_managers.clone(),
            pruning_point_manager: services.pruning_point_manager.clone(),
            pruning_proof_manager: services.pruning_proof_manager.clone(),
            parents_manager: services.parents_manager.clone(),
            pruning_lock,
            config,
            is_consensus_exiting,
        }
    }

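    /// Worker loop: processes `Process` messages until the channel is closed or any other message
    /// (e.g. `Exit`) is received. Recovery of partially-completed pruning workflows is performed
    /// once, after the first message arrives.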
    pub fn worker(self: &Arc<Self>) {
        let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() else {
            return;
        };

        self.recover_pruning_workflows_if_needed();
        self.advance_pruning_point_and_candidate_if_possible(sink_ghostdag_data);

        while let Ok(PruningProcessingMessage::Process { sink_ghostdag_data }) = self.receiver.recv() {
            self.advance_pruning_point_and_candidate_if_possible(sink_ghostdag_data);
        }
    }

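    /// Checks whether a previous pruning workflow was interrupted (e.g. by an unclean shutdown) and
    /// completes it: first advancing the pruning UTXO set up to the current pruning point, then
    /// re-running data pruning if the history root still lags behind the pruning point.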
    fn recover_pruning_workflows_if_needed(&self) {
        let pruning_point_read = self.pruning_point_store.read();
        let pruning_point = pruning_point_read.pruning_point().unwrap();
        let history_root = pruning_point_read.history_root().unwrap_option();
        let pruning_utxoset_position = self.pruning_utxoset_stores.read().utxoset_position().unwrap_option();
        drop(pruning_point_read);

        debug!(
            "[PRUNING PROCESSOR] recovery check: current pruning point: {}, history root: {:?}, pruning utxoset position: {:?}",
            pruning_point, history_root, pruning_utxoset_position
        );

        if let Some(pruning_utxoset_position) = pruning_utxoset_position {
            if pruning_utxoset_position != pruning_point {
                info!("Recovering pruning utxo-set from {} to the pruning point {}", pruning_utxoset_position, pruning_point);
                if !self.advance_pruning_utxoset(pruning_utxoset_position, pruning_point) {
                    info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
                    return;
                }
            }
        }

        if let Some(history_root) = history_root {
            if history_root != pruning_point {
                self.prune(pruning_point);
            }
        }
    }

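    /// Asks the pruning point manager for the next pruning point(s) and candidate given the sink's
    /// ghostdag data. If the pruning point advances, the new pruning point(s) are persisted and the
    /// UTXO set advancement and data pruning workflows are executed; otherwise only a changed
    /// candidate is stored.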
    fn advance_pruning_point_and_candidate_if_possible(&self, sink_ghostdag_data: CompactGhostdagData) {
        let pruning_point_read = self.pruning_point_store.upgradable_read();
        let current_pruning_info = pruning_point_read.get().unwrap();
        let (new_pruning_points, new_candidate) = self.pruning_point_manager.next_pruning_points_and_candidate_by_ghostdag_data(
            sink_ghostdag_data,
            None,
            current_pruning_info.candidate,
            current_pruning_info.pruning_point,
        );

        if !new_pruning_points.is_empty() {
            let mut batch = WriteBatch::default();
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            for (i, past_pp) in new_pruning_points.iter().copied().enumerate() {
                self.past_pruning_points_store.insert_batch(&mut batch, current_pruning_info.index + i as u64 + 1, past_pp).unwrap();
            }
            let new_pp_index = current_pruning_info.index + new_pruning_points.len() as u64;
            let new_pruning_point = *new_pruning_points.last().unwrap();
            pruning_point_write.set_batch(&mut batch, new_pruning_point, new_candidate, new_pp_index).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);

            info!("Periodic pruning point movement: advancing from {} to {}", current_pruning_info.pruning_point, new_pruning_point);

            if !self.advance_pruning_utxoset(current_pruning_info.pruning_point, new_pruning_point) {
                info!("Interrupted while advancing the pruning point UTXO set: Process is exiting");
                return;
            }
            info!("Updated the pruning point UTXO set");

            self.prune(new_pruning_point);
        } else if new_candidate != current_pruning_info.candidate {
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            pruning_point_write.set(current_pruning_info.pruning_point, new_candidate, current_pruning_info.index).unwrap();
        }
    }

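    /// Advances the pruning-point UTXO set from `utxoset_position` to `new_pruning_point` by applying
    /// the UTXO diff of each chain block along the way. Every step commits the diff together with the
    /// updated utxoset position, so an interrupted advancement can be resumed by the recovery flow.
    /// Returns false if interrupted because the process is exiting.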
    fn advance_pruning_utxoset(&self, utxoset_position: Hash, new_pruning_point: Hash) -> bool {
        let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
        for chain_block in self.reachability_service.forward_chain_iterator(utxoset_position, new_pruning_point, true).skip(1) {
            if self.is_consensus_exiting.load(Ordering::Relaxed) {
                return false;
            }
            let utxo_diff = self.utxo_diffs_store.get(chain_block).expect("chain blocks have utxo state");
            let mut batch = WriteBatch::default();
            pruning_utxoset_write.utxo_set.write_diff_batch(&mut batch, utxo_diff.as_ref()).unwrap();
            pruning_utxoset_write.set_utxoset_position(&mut batch, chain_block).unwrap();
            self.db.write(batch).unwrap();
        }
        drop(pruning_utxoset_write);

        if self.config.enable_sanity_checks {
            info!("Performing a sanity check that the new UTXO set has the expected UTXO commitment");
            self.assert_utxo_commitment(new_pruning_point);
        }
        true
    }

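    /// Sanity check: recomputes the MuHash commitment over the full pruning UTXO set and asserts it
    /// matches the UTXO commitment declared in the pruning point header.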
    fn assert_utxo_commitment(&self, pruning_point: Hash) {
        info!("Verifying the new pruning point UTXO commitment (sanity test)");
        let commitment = self.headers_store.get_header(pruning_point).unwrap().utxo_commitment;
        let mut multiset = MuHash::new();
        let pruning_utxoset_read = self.pruning_utxoset_stores.read();
        for (outpoint, entry) in pruning_utxoset_read.utxo_set.iterator().map(|r| r.unwrap()) {
            multiset.add_utxo(&outpoint, &entry);
        }
        assert_eq!(multiset.finalize(), commitment, "Updated pruning point utxo set does not match the header utxo commitment");
        info!("Pruning point UTXO commitment was verified correctly (sanity test)");
    }

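    /// Prunes headers and block data below `new_pruning_point`, retaining only what is still required:
    /// the pruning point anticone, data referenced by the pruning proof, and past pruning point
    /// headers. Archival nodes skip pruning entirely.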
    fn prune(&self, new_pruning_point: Hash) {
        if self.config.is_archival {
            warn!("The node is configured as an archival node -- avoiding data pruning. Note this might lead to heavy disk usage.");
            return;
        }

        info!("Header and Block pruning: preparing proof and anticone data...");

        let proof = self.pruning_proof_manager.get_pruning_point_proof();
        let data = self
            .pruning_proof_manager
            .get_pruning_point_anticone_and_trusted_data()
            .expect("insufficient depth error is unexpected here");

        let genesis = self.past_pruning_points_store.get(0).unwrap();

        assert_eq!(new_pruning_point, proof[0].last().unwrap().hash);
        assert_eq!(new_pruning_point, data.anticone[0]);
        assert_eq!(genesis, self.config.genesis.hash);
        assert_eq!(genesis, proof.last().unwrap().last().unwrap().hash);

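        // Gather the data that must survive pruning: blocks in the pruning point anticone keep full
        // block data (`keep_blocks`); blocks referenced by the proof, the DAA window or the ghostdag
        // data keep their relations (`keep_relations`, level 0 here, higher proof levels are added
        // further below); past pruning points keep their headers (`keep_headers`).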
        let keep_blocks: BlockHashSet = data.anticone.iter().copied().collect();
        let mut keep_relations: BlockHashMap<BlockLevel> = std::iter::empty()
            .chain(data.anticone.iter().copied())
            .chain(data.daa_window_blocks.iter().map(|th| th.header.hash))
            .chain(data.ghostdag_blocks.iter().map(|gd| gd.hash))
            .chain(proof[0].iter().map(|h| h.hash))
            .map(|h| (h, 0))
            .collect();
        let keep_headers: BlockHashSet = self.past_pruning_points();

        info!("Header and Block pruning: waiting for consensus write permissions...");

        let mut prune_guard = self.pruning_lock.blocking_write();

        info!("Starting Header and Block pruning...");

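        // Adjust ghostdag data of retained blocks so it no longer references soon-to-be-pruned blocks:
        // filter the mergesets and blues-anticone-sizes, and point the selected parent at ORIGIN if it
        // is not retained.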
        {
            let mut counter = 0;
            let mut batch = WriteBatch::default();
            for kept in keep_relations.keys().copied() {
                let Some(ghostdag) = self.ghostdag_primary_store.get_data(kept).unwrap_option() else {
                    continue;
                };
                if ghostdag.unordered_mergeset().any(|h| !keep_relations.contains_key(&h)) {
                    let mut mutable_ghostdag: ExternalGhostdagData = ghostdag.as_ref().into();
                    mutable_ghostdag.mergeset_blues.retain(|h| keep_relations.contains_key(h));
                    mutable_ghostdag.mergeset_reds.retain(|h| keep_relations.contains_key(h));
                    mutable_ghostdag.blues_anticone_sizes.retain(|k, _| keep_relations.contains_key(k));
                    if !keep_relations.contains_key(&mutable_ghostdag.selected_parent) {
                        mutable_ghostdag.selected_parent = ORIGIN;
                    }
                    counter += 1;
                    self.ghostdag_primary_store.update_batch(&mut batch, kept, &Arc::new(mutable_ghostdag.into())).unwrap();
                }
            }
            self.db.write(batch).unwrap();
            info!("Header and Block pruning: updated ghostdag data for {} blocks", counter);
        }

        drop(prune_guard);

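        // For each higher proof level, also retain the relations of the proof headers themselves and of
        // the level parents of anticone blocks whose own block level is below that proof level.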
        for (level, level_proof) in proof.iter().enumerate().skip(1) {
            let level = level as BlockLevel;
            let roots_parents_at_level = data
                .anticone
                .iter()
                .copied()
                .map(|hash| self.headers_store.get_header_with_block_level(hash).expect("pruning point anticone is not pruned"))
                .filter(|root| level > root.block_level)
                .flat_map(|root| self.parents_manager.parents_at_level(&root.header, level).iter().copied().collect_vec());
            for hash in level_proof.iter().map(|header| header.hash).chain(roots_parents_at_level) {
                if let Vacant(e) = keep_relations.entry(hash) {
                    e.insert(level);
                }
            }
        }

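        // Reacquire the prune lock, then prune DAG tips which are not in the future of the new pruning
        // point and drop selected-chain entries below it, all within a single write batch.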
        prune_guard = self.pruning_lock.blocking_write();
        let mut lock_acquire_time = Instant::now();
        let mut reachability_read = self.reachability_store.upgradable_read();

        {
            let mut batch = WriteBatch::default();

            let mut tips_write = self.body_tips_store.write();
            let pruned_tips = tips_write
                .get()
                .unwrap()
                .read()
                .iter()
                .copied()
                .filter(|&h| !reachability_read.is_dag_ancestor_of_result(new_pruning_point, h).unwrap())
                .collect_vec();
            tips_write.prune_tips_with_writer(BatchDbWriter::new(&mut batch), &pruned_tips).unwrap();
            if !pruned_tips.is_empty() {
                info!(
                    "Header and Block pruning: pruned {} tips: {}...{}",
                    pruned_tips.len(),
                    pruned_tips.iter().take(5.min((pruned_tips.len() + 1) / 2)).reusable_format(", "),
                    pruned_tips.iter().rev().take(5.min(pruned_tips.len() / 2)).reusable_format(", ")
                )
            }

            let mut selected_chain_write = self.selected_chain_store.write();
            selected_chain_write.prune_below_pruning_point(BatchDbWriter::new(&mut batch), new_pruning_point).unwrap();

            self.db.write(batch).unwrap();

            drop(selected_chain_write);
            drop(tips_write);
        }

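        // Traverse the DAG from origin, visiting every block which is not in the future of the new
        // pruning point and deleting its data (subject to the keep-sets computed above). The prune and
        // reachability locks are released roughly every 5ms of hold time so that consensus processing
        // can continue during this potentially long scan.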
        let mut queue = VecDeque::<Hash>::from_iter(reachability_read.get_children(ORIGIN).unwrap().iter().copied());
        let (mut counter, mut traversed) = (0, 0);
        info!("Header and Block pruning: starting traversal from: {} (genesis: {})", queue.iter().reusable_format(", "), genesis);
        while let Some(current) = queue.pop_front() {
            if reachability_read.is_dag_ancestor_of_result(new_pruning_point, current).unwrap() {
                continue;
            }
            traversed += 1;
            queue.extend(reachability_read.get_children(current).unwrap().iter());

            if lock_acquire_time.elapsed() > Duration::from_millis(5) {
                drop(reachability_read);
                if self.is_consensus_exiting.load(Ordering::Relaxed) {
                    drop(prune_guard);
                    info!("Header and Block pruning interrupted: Process is exiting");
                    return;
                }
                prune_guard.blocking_yield();
                lock_acquire_time = Instant::now();
                reachability_read = self.reachability_store.upgradable_read();
            }

            if traversed % 1000 == 0 {
                info!("Header and Block pruning: traversed: {}, pruned {}...", traversed, counter);
            }

            self.block_window_cache_for_difficulty.remove(&current);
            self.block_window_cache_for_past_median_time.remove(&current);

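            // Block data (UTXO diffs and multisets, acceptance data, transactions) is deleted for every
            // traversed block outside the pruning point anticone (`keep_blocks`).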
            if !keep_blocks.contains(&current) {
                let mut batch = WriteBatch::default();
                let mut level_relations_write = self.relations_stores.write();
                let mut reachability_relations_write = self.reachability_relations_store.write();
                let mut staging_relations = StagingRelationsStore::new(&mut reachability_relations_write);
                let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
                let mut statuses_write = self.statuses_store.write();

                self.utxo_multisets_store.delete_batch(&mut batch, current).unwrap();
                self.utxo_diffs_store.delete_batch(&mut batch, current).unwrap();
                self.acceptance_data_store.delete_batch(&mut batch, current).unwrap();
                self.block_transactions_store.delete_batch(&mut batch, current).unwrap();

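                // Blocks affiliated with the pruning proof keep their header and their relations and
                // ghostdag data from the affiliated level upwards (lower levels are deleted and a valid
                // status is demoted to header-only). All other blocks are deleted entirely, including
                // reachability entries, relations, ghostdag data, statuses and, unless the block is a
                // past pruning point, the header itself.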
                if let Some(&affiliated_proof_level) = keep_relations.get(&current) {
                    if statuses_write.get(current).unwrap_option().is_some_and(|s| s.is_valid()) {
                        statuses_write.set_batch(&mut batch, current, StatusHeaderOnly).unwrap();
                    }

                    for lower_level in 0..affiliated_proof_level as usize {
                        let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[lower_level]);
                        relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option();
                        staging_level_relations.commit(&mut batch).unwrap();
                        self.ghostdag_stores[lower_level].delete_batch(&mut batch, current).unwrap_option();
                    }
                } else {
                    counter += 1;
                    let mergeset = relations::delete_reachability_relations(
                        MemoryWriter,
                        &mut staging_relations,
                        &staging_reachability,
                        current,
                    );
                    reachability::delete_block(&mut staging_reachability, current, &mut mergeset.iter().copied()).unwrap();
                    let block_level = self.headers_store.get_header_with_block_level(current).unwrap().block_level;
                    (0..=block_level as usize).for_each(|level| {
                        let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[level]);
                        relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option();
                        staging_level_relations.commit(&mut batch).unwrap();
                        self.ghostdag_stores[level].delete_batch(&mut batch, current).unwrap_option();
                    });

                    self.daa_excluded_store.delete_batch(&mut batch, current).unwrap();
                    self.depth_store.delete_batch(&mut batch, current).unwrap();
                    statuses_write.delete_batch(&mut batch, current).unwrap();

                    if !keep_headers.contains(&current) {
                        self.headers_store.delete_batch(&mut batch, current).unwrap();
                    }
                }

                let reachability_write = staging_reachability.commit(&mut batch).unwrap();
                staging_relations.commit(&mut batch).unwrap();

                self.db.write(batch).unwrap();

                drop(reachability_write);
                drop(statuses_write);
                drop(reachability_relations_write);
                drop(level_relations_write);

                reachability_read = self.reachability_store.upgradable_read();
            }
        }

        drop(reachability_read);
        drop(prune_guard);

        info!("Header and Block pruning completed: traversed: {}, pruned {}", traversed, counter);
        info!(
            "Header and Block pruning stats: proof size: {}, pruning point and anticone: {}, unique headers in proof and windows: {}, pruning points in history: {}",
            proof.iter().map(|l| l.len()).sum::<usize>(),
            keep_blocks.len(),
            keep_relations.len(),
            keep_headers.len()
        );

        if self.config.enable_sanity_checks {
            self.assert_proof_rebuilding(proof, new_pruning_point);
            self.assert_data_rebuilding(data, new_pruning_point);
        }

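        // Finally, persist the new history root: the recovery check above treats a history root equal to
        // the pruning point as an indication that this prune operation completed successfully.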
        {
            let mut pruning_point_write = self.pruning_point_store.write();
            let mut batch = WriteBatch::default();
            pruning_point_write.set_history_root(&mut batch, new_pruning_point).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);
        }
    }

    fn past_pruning_points(&self) -> BlockHashSet {
        (0..self.pruning_point_store.read().get().unwrap().index)
            .map(|index| self.past_pruning_points_store.get(index).unwrap())
            .collect()
    }

    fn assert_proof_rebuilding(&self, ref_proof: Arc<PruningPointProof>, new_pruning_point: Hash) {
        info!("Rebuilding the pruning proof after pruning data (sanity test)");
        let proof_hashes = ref_proof.iter().flatten().map(|h| h.hash).collect::<Vec<_>>();
        let built_proof = self.pruning_proof_manager.build_pruning_point_proof(new_pruning_point);
        let built_proof_hashes = built_proof.iter().flatten().map(|h| h.hash).collect::<Vec<_>>();
        assert_eq!(proof_hashes.len(), built_proof_hashes.len(), "Rebuilt proof does not match the expected reference");
        for (i, (a, b)) in proof_hashes.into_iter().zip(built_proof_hashes).enumerate() {
            if a != b {
                panic!("Proof built following pruning does not match the previous proof: built[{}]={}, prev[{}]={}", i, b, i, a);
            }
        }
        info!("Proof was rebuilt successfully following pruning");
    }

    fn assert_data_rebuilding(&self, ref_data: Arc<PruningPointTrustedData>, new_pruning_point: Hash) {
        info!("Rebuilding pruning point trusted data (sanity test)");
        let virtual_state = self.lkg_virtual_state.load();
        let built_data = self
            .pruning_proof_manager
            .calculate_pruning_point_anticone_and_trusted_data(new_pruning_point, virtual_state.parents.iter().copied());
        assert_eq!(
            ref_data.anticone.iter().copied().collect::<BlockHashSet>(),
            built_data.anticone.iter().copied().collect::<BlockHashSet>()
        );
        assert_eq!(
            ref_data.daa_window_blocks.iter().map(|th| th.header.hash).collect::<BlockHashSet>(),
            built_data.daa_window_blocks.iter().map(|th| th.header.hash).collect::<BlockHashSet>()
        );
        assert_eq!(
            ref_data.ghostdag_blocks.iter().map(|gd| gd.hash).collect::<BlockHashSet>(),
            built_data.ghostdag_blocks.iter().map(|gd| gd.hash).collect::<BlockHashSet>()
        );
        info!("Trusted data was rebuilt successfully following pruning");
    }
}