1use crate::{
2 consensus::{
3 services::{
4 ConsensusServices, DbBlockDepthManager, DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbPruningPointManager,
5 DbWindowManager,
6 },
7 storage::ConsensusStorage,
8 },
9 constants::BLOCK_VERSION,
10 errors::RuleError,
11 model::{
12 services::{
13 reachability::{MTReachabilityService, ReachabilityService},
14 relations::MTRelationsService,
15 },
16 stores::{
17 acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
18 block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
19 daa::DbDaaStore,
20 depth::{DbDepthStore, DepthStoreReader},
21 ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
22 headers::{DbHeadersStore, HeaderStoreReader},
23 past_pruning_points::DbPastPruningPointsStore,
24 pruning::{DbPruningStore, PruningStoreReader},
25 pruning_utxoset::PruningUtxosetStores,
26 reachability::DbReachabilityStore,
27 relations::{DbRelationsStore, RelationsStoreReader},
28 selected_chain::{DbSelectedChainStore, SelectedChainStore},
29 statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
30 tips::{DbTipsStore, TipsStoreReader},
31 utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader},
32 utxo_multisets::{DbUtxoMultisetsStore, UtxoMultisetsStoreReader},
33 virtual_state::{LkgVirtualState, VirtualState, VirtualStateStoreReader, VirtualStores},
34 DB,
35 },
36 },
37 params::Params,
38 pipeline::{
39 deps_manager::VirtualStateProcessingMessage, pruning_processor::processor::PruningProcessingMessage,
40 virtual_processor::utxo_validation::UtxoProcessingContext, ProcessingCounters,
41 },
42 processes::{
43 coinbase::CoinbaseManager,
44 ghostdag::ordering::SortableBlock,
45 transaction_validator::{errors::TxResult, transaction_validator_populated::TxValidationFlags, TransactionValidator},
46 window::WindowManager,
47 },
48};
49use kaspa_consensus_core::{
50 acceptance_data::AcceptanceData,
51 api::args::{TransactionValidationArgs, TransactionValidationBatchArgs},
52 block::{BlockTemplate, MutableBlock, TemplateBuildMode, TemplateTransactionSelector},
53 blockstatus::BlockStatus::{StatusDisqualifiedFromChain, StatusUTXOValid},
54 coinbase::MinerData,
55 config::genesis::GenesisBlock,
56 header::Header,
57 merkle::calc_hash_merkle_root,
58 pruning::PruningPointsList,
59 tx::{MutableTransaction, Transaction},
60 utxo::{
61 utxo_diff::UtxoDiff,
62 utxo_view::{UtxoView, UtxoViewComposition},
63 },
64 BlockHashSet, ChainPath,
65};
66use kaspa_consensus_notify::{
67 notification::{
68 NewBlockTemplateNotification, Notification, SinkBlueScoreChangedNotification, UtxosChangedNotification,
69 VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
70 },
71 root::ConsensusNotificationRoot,
72};
73use kaspa_consensusmanager::SessionLock;
74use kaspa_core::{debug, info, time::unix_now, trace, warn};
75use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExtensions};
76use kaspa_hashes::Hash;
77use kaspa_muhash::MuHash;
78use kaspa_notify::{events::EventType, notifier::Notify};
79
80use super::errors::{PruningImportError, PruningImportResult};
81use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
82use itertools::Itertools;
83use kaspa_consensus_core::tx::ValidatedTransaction;
84use kaspa_utils::binary_heap::BinaryHeapExtensions;
85use parking_lot::{RwLock, RwLockUpgradableReadGuard};
86use rand::{seq::SliceRandom, Rng};
87use rayon::{
88 prelude::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator},
89 ThreadPool,
90};
91use rocksdb::WriteBatch;
92use std::{
93 cmp::min,
94 collections::{BinaryHeap, HashMap, VecDeque},
95 ops::Deref,
96 sync::{atomic::Ordering, Arc},
97};
98
99pub struct VirtualStateProcessor {
100 receiver: CrossbeamReceiver<VirtualStateProcessingMessage>,
102 pruning_sender: CrossbeamSender<PruningProcessingMessage>,
103 pruning_receiver: CrossbeamReceiver<PruningProcessingMessage>,
104
105 pub(super) thread_pool: Arc<ThreadPool>,
107
108 db: Arc<DB>,
110
111 pub(super) genesis: GenesisBlock,
113 pub(super) max_block_parents: u8,
114 pub(super) mergeset_size_limit: u64,
115 pub(super) pruning_depth: u64,
116
117 pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
119 pub(super) ghostdag_primary_store: Arc<DbGhostdagStore>,
120 pub(super) headers_store: Arc<DbHeadersStore>,
121 pub(super) daa_excluded_store: Arc<DbDaaStore>,
122 pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
123 pub(super) pruning_point_store: Arc<RwLock<DbPruningStore>>,
124 pub(super) past_pruning_points_store: Arc<DbPastPruningPointsStore>,
125 pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
126 pub(super) depth_store: Arc<DbDepthStore>,
127 pub(super) selected_chain_store: Arc<RwLock<DbSelectedChainStore>>,
128
129 pub(super) utxo_diffs_store: Arc<DbUtxoDiffsStore>,
131 pub(super) utxo_multisets_store: Arc<DbUtxoMultisetsStore>,
132 pub(super) acceptance_data_store: Arc<DbAcceptanceDataStore>,
133 pub(super) virtual_stores: Arc<RwLock<VirtualStores>>,
134 pub(super) pruning_utxoset_stores: Arc<RwLock<PruningUtxosetStores>>,
135
136 pub lkg_virtual_state: LkgVirtualState,
139
140 pub(super) ghostdag_manager: DbGhostdagManager,
142 pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
143 pub(super) relations_service: MTRelationsService<DbRelationsStore>,
144 pub(super) dag_traversal_manager: DbDagTraversalManager,
145 pub(super) window_manager: DbWindowManager,
146 pub(super) coinbase_manager: CoinbaseManager,
147 pub(super) transaction_validator: TransactionValidator,
148 pub(super) pruning_point_manager: DbPruningPointManager,
149 pub(super) parents_manager: DbParentsManager,
150 pub(super) depth_manager: DbBlockDepthManager,
151
152 pruning_lock: SessionLock,
154
155 notification_root: Arc<ConsensusNotificationRoot>,
157
158 counters: Arc<ProcessingCounters>,
160
161 pub(crate) storage_mass_activation_daa_score: u64,
163}
164
165impl VirtualStateProcessor {
166 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 receiver: CrossbeamReceiver<VirtualStateProcessingMessage>,
169 pruning_sender: CrossbeamSender<PruningProcessingMessage>,
170 pruning_receiver: CrossbeamReceiver<PruningProcessingMessage>,
171 thread_pool: Arc<ThreadPool>,
172 params: &Params,
173 db: Arc<DB>,
174 storage: &Arc<ConsensusStorage>,
175 services: &Arc<ConsensusServices>,
176 pruning_lock: SessionLock,
177 notification_root: Arc<ConsensusNotificationRoot>,
178 counters: Arc<ProcessingCounters>,
179 ) -> Self {
180 Self {
181 receiver,
182 pruning_sender,
183 pruning_receiver,
184 thread_pool,
185
186 genesis: params.genesis.clone(),
187 max_block_parents: params.max_block_parents,
188 mergeset_size_limit: params.mergeset_size_limit,
189 pruning_depth: params.pruning_depth,
190
191 db,
192 statuses_store: storage.statuses_store.clone(),
193 headers_store: storage.headers_store.clone(),
194 ghostdag_primary_store: storage.ghostdag_primary_store.clone(),
195 daa_excluded_store: storage.daa_excluded_store.clone(),
196 block_transactions_store: storage.block_transactions_store.clone(),
197 pruning_point_store: storage.pruning_point_store.clone(),
198 past_pruning_points_store: storage.past_pruning_points_store.clone(),
199 body_tips_store: storage.body_tips_store.clone(),
200 depth_store: storage.depth_store.clone(),
201 selected_chain_store: storage.selected_chain_store.clone(),
202 utxo_diffs_store: storage.utxo_diffs_store.clone(),
203 utxo_multisets_store: storage.utxo_multisets_store.clone(),
204 acceptance_data_store: storage.acceptance_data_store.clone(),
205 virtual_stores: storage.virtual_stores.clone(),
206 pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
207 lkg_virtual_state: storage.lkg_virtual_state.clone(),
208
209 ghostdag_manager: services.ghostdag_primary_manager.clone(),
210 reachability_service: services.reachability_service.clone(),
211 relations_service: services.relations_service.clone(),
212 dag_traversal_manager: services.dag_traversal_manager.clone(),
213 window_manager: services.window_manager.clone(),
214 coinbase_manager: services.coinbase_manager.clone(),
215 transaction_validator: services.transaction_validator.clone(),
216 pruning_point_manager: services.pruning_point_manager.clone(),
217 parents_manager: services.parents_manager.clone(),
218 depth_manager: services.depth_manager.clone(),
219
220 pruning_lock,
221 notification_root,
222 counters,
223 storage_mass_activation_daa_score: params.storage_mass_activation_daa_score,
224 }
225 }
226
227 pub fn worker(self: &Arc<Self>) {
228 'outer: while let Ok(msg) = self.receiver.recv() {
229 if msg.is_exit_message() {
230 break;
231 }
232
233 let messages: Vec<VirtualStateProcessingMessage> = std::iter::once(msg).chain(self.receiver.try_iter()).collect();
238 trace!("virtual processor received {} tasks", messages.len());
239
240 self.resolve_virtual();
241
242 let statuses_read = self.statuses_store.read();
243 for msg in messages {
244 match msg {
245 VirtualStateProcessingMessage::Exit => break 'outer,
246 VirtualStateProcessingMessage::Process(task, virtual_state_result_transmitter) => {
247 let _ = virtual_state_result_transmitter.send(Ok(statuses_read.get(task.block().hash()).unwrap()));
249 }
250 };
251 }
252 }
253
254 self.pruning_sender.send(PruningProcessingMessage::Exit).unwrap();
256 }
257
258 fn resolve_virtual(self: &Arc<Self>) {
259 let pruning_point = self.pruning_point_store.read().pruning_point().unwrap();
260 let virtual_read = self.virtual_stores.upgradable_read();
261 let prev_state = virtual_read.state.get().unwrap();
262 let finality_point = self.virtual_finality_point(&prev_state.ghostdag_data, pruning_point);
263
264 let prune_guard = self.pruning_lock.blocking_read();
273 let tips = self
274 .body_tips_store
275 .read()
276 .get()
277 .unwrap()
278 .read()
279 .iter()
280 .copied()
281 .filter(|&h| self.reachability_service.is_dag_ancestor_of(finality_point, h))
282 .collect_vec();
283 drop(prune_guard);
284 let prev_sink = prev_state.ghostdag_data.selected_parent;
285 let mut accumulated_diff = prev_state.utxo_diff.clone().to_reversed();
286
287 let (new_sink, virtual_parent_candidates) =
288 self.sink_search_algorithm(&virtual_read, &mut accumulated_diff, prev_sink, tips, finality_point, pruning_point);
289 let (virtual_parents, virtual_ghostdag_data) = self.pick_virtual_parents(new_sink, virtual_parent_candidates, pruning_point);
290 assert_eq!(virtual_ghostdag_data.selected_parent, new_sink);
291
292 let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
293 let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
294 let new_virtual_state = self
295 .calculate_and_commit_virtual_state(
296 virtual_read,
297 virtual_parents,
298 virtual_ghostdag_data,
299 sink_multiset,
300 &mut accumulated_diff,
301 &chain_path,
302 )
303 .expect("all possible rule errors are unexpected here");
304
305 let sink_ghostdag_data = self.ghostdag_primary_store.get_compact_data(new_sink).unwrap();
307 let _consume = self.pruning_receiver.try_iter().count();
310 self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();
311
312 let accumulated_diff = Arc::new(accumulated_diff);
314 let virtual_parents = Arc::new(new_virtual_state.parents.clone());
315 self.notification_root
316 .notify(Notification::NewBlockTemplate(NewBlockTemplateNotification {}))
317 .expect("expecting an open unbounded channel");
318 self.notification_root
319 .notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
320 .expect("expecting an open unbounded channel");
321 self.notification_root
322 .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
323 .expect("expecting an open unbounded channel");
324 self.notification_root
325 .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
326 .expect("expecting an open unbounded channel");
327 if self.notification_root.has_subscription(EventType::VirtualChainChanged) {
328 let added_chain_blocks_acceptance_data =
330 chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec();
331 self.notification_root
332 .notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new(
333 chain_path.added.into(),
334 chain_path.removed.into(),
335 Arc::new(added_chain_blocks_acceptance_data),
336 )))
337 .expect("expecting an open unbounded channel");
338 }
339 }
340
341 pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
342 let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point);
343 if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) {
344 finality_point
345 } else {
346 pruning_point
349 }
350 }
351
352 fn calculate_utxo_state_relatively(&self, stores: &VirtualStores, diff: &mut UtxoDiff, from: Hash, to: Hash) -> Hash {
358 if self.statuses_store.read().get(to).unwrap() == StatusDisqualifiedFromChain {
360 return from;
361 }
362
363 let mut split_point: Option<Hash> = None;
364
365 for current in self.reachability_service.default_backward_chain_iterator(from) {
367 if self.reachability_service.is_chain_ancestor_of(current, to) {
368 split_point = Some(current);
369 break;
370 }
371
372 let mergeset_diff = self.utxo_diffs_store.get(current).unwrap();
373 diff.with_diff_in_place(&mergeset_diff.as_reversed()).unwrap();
375 }
376
377 let split_point = split_point.expect("chain iterator was expected to reach the reorg split point");
378 debug!("VIRTUAL PROCESSOR, found split point: {split_point}");
379
380 let mut diff_point = split_point;
383
384 let mut chain_block_counter = 0;
386 let mut chain_disqualified_counter = 0;
387 for (selected_parent, current) in self.reachability_service.forward_chain_iterator(split_point, to, true).tuple_windows() {
388 if selected_parent != diff_point {
389 self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
391 chain_disqualified_counter += 1;
392 continue;
393 }
394
395 match self.utxo_diffs_store.get(current) {
396 Ok(mergeset_diff) => {
397 diff.with_diff_in_place(mergeset_diff.deref()).unwrap();
398 diff_point = current;
399 }
400 Err(StoreError::KeyNotFound(_)) => {
401 if self.statuses_store.read().get(current).unwrap() == StatusDisqualifiedFromChain {
402 continue;
404 }
405
406 let header = self.headers_store.get_header(current).unwrap();
407 let mergeset_data = self.ghostdag_primary_store.get_data(current).unwrap();
408 let pov_daa_score = header.daa_score;
409
410 let selected_parent_multiset_hash = self.utxo_multisets_store.get(selected_parent).unwrap();
411 let selected_parent_utxo_view = (&stores.utxo_set).compose(&*diff);
412
413 let mut ctx = UtxoProcessingContext::new(mergeset_data.into(), selected_parent_multiset_hash);
414
415 self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, pov_daa_score);
416 let res = self.verify_expected_utxo_state(&mut ctx, &selected_parent_utxo_view, &header);
417
418 if let Err(rule_error) = res {
419 info!("Block {} is disqualified from virtual chain: {}", current, rule_error);
420 self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
421 chain_disqualified_counter += 1;
422 } else {
423 debug!("VIRTUAL PROCESSOR, UTXO validated for {current}");
424
425 diff.with_diff_in_place(&ctx.mergeset_diff).unwrap();
427 diff_point = current;
429 self.commit_utxo_state(current, ctx.mergeset_diff, ctx.multiset_hash, ctx.mergeset_acceptance_data);
431 chain_block_counter += 1;
433 }
434 }
435 Err(err) => panic!("unexpected error {err}"),
436 }
437 }
438 self.counters.chain_block_counts.fetch_add(chain_block_counter, Ordering::Relaxed);
440 if chain_disqualified_counter > 0 {
441 self.counters.chain_disqualified_counts.fetch_add(chain_disqualified_counter, Ordering::Relaxed);
442 }
443
444 diff_point
445 }
446
447 fn commit_utxo_state(&self, current: Hash, mergeset_diff: UtxoDiff, multiset: MuHash, acceptance_data: AcceptanceData) {
448 let mut batch = WriteBatch::default();
449 self.utxo_diffs_store.insert_batch(&mut batch, current, Arc::new(mergeset_diff)).unwrap();
450 self.utxo_multisets_store.insert_batch(&mut batch, current, multiset).unwrap();
451 self.acceptance_data_store.insert_batch(&mut batch, current, Arc::new(acceptance_data)).unwrap();
452 let write_guard = self.statuses_store.set_batch(&mut batch, current, StatusUTXOValid).unwrap();
453 self.db.write(batch).unwrap();
454 drop(write_guard);
456 }
457
458 fn calculate_and_commit_virtual_state(
459 &self,
460 virtual_read: RwLockUpgradableReadGuard<'_, VirtualStores>,
461 virtual_parents: Vec<Hash>,
462 virtual_ghostdag_data: GhostdagData,
463 selected_parent_multiset: MuHash,
464 accumulated_diff: &mut UtxoDiff,
465 chain_path: &ChainPath,
466 ) -> Result<Arc<VirtualState>, RuleError> {
467 let new_virtual_state = self.calculate_virtual_state(
468 &virtual_read,
469 virtual_parents,
470 virtual_ghostdag_data,
471 selected_parent_multiset,
472 accumulated_diff,
473 )?;
474 self.commit_virtual_state(virtual_read, new_virtual_state.clone(), accumulated_diff, chain_path);
475 Ok(new_virtual_state)
476 }
477
478 pub(super) fn calculate_virtual_state(
479 &self,
480 virtual_stores: &VirtualStores,
481 virtual_parents: Vec<Hash>,
482 virtual_ghostdag_data: GhostdagData,
483 selected_parent_multiset: MuHash,
484 accumulated_diff: &mut UtxoDiff,
485 ) -> Result<Arc<VirtualState>, RuleError> {
486 let selected_parent_utxo_view = (&virtual_stores.utxo_set).compose(&*accumulated_diff);
487 let mut ctx = UtxoProcessingContext::new((&virtual_ghostdag_data).into(), selected_parent_multiset);
488
489 let virtual_daa_window = self.window_manager.block_daa_window(&virtual_ghostdag_data)?;
491 let virtual_bits = self.window_manager.calculate_difficulty_bits(&virtual_ghostdag_data, &virtual_daa_window);
492 let virtual_past_median_time = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?.0;
493
494 self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, virtual_daa_window.daa_score);
496
497 accumulated_diff.with_diff_in_place(&ctx.mergeset_diff).unwrap();
499
500 Ok(Arc::new(VirtualState::new(
502 virtual_parents,
503 virtual_daa_window.daa_score,
504 virtual_bits,
505 virtual_past_median_time,
506 ctx.multiset_hash,
507 ctx.mergeset_diff,
508 ctx.accepted_tx_ids,
509 ctx.mergeset_rewards,
510 virtual_daa_window.mergeset_non_daa,
511 virtual_ghostdag_data,
512 )))
513 }
514
515 fn commit_virtual_state(
516 &self,
517 virtual_read: RwLockUpgradableReadGuard<'_, VirtualStores>,
518 new_virtual_state: Arc<VirtualState>,
519 accumulated_diff: &UtxoDiff,
520 chain_path: &ChainPath,
521 ) {
522 let mut batch = WriteBatch::default();
523 let mut virtual_write = RwLockUpgradableReadGuard::upgrade(virtual_read);
524 let mut selected_chain_write = self.selected_chain_store.write();
525
526 virtual_write.utxo_set.write_diff_batch(&mut batch, accumulated_diff).unwrap();
528
529 virtual_write.state.set_batch(&mut batch, new_virtual_state).unwrap();
531
532 selected_chain_write.apply_changes(&mut batch, chain_path).unwrap();
534
535 self.db.write(batch).unwrap();
537
538 drop(virtual_write);
540 drop(selected_chain_write);
541 }
542
543 fn max_virtual_parent_candidates(&self) -> usize {
547 self.max_block_parents as usize * 3
551 }
552
553 pub(super) fn sink_search_algorithm(
560 &self,
561 stores: &VirtualStores,
562 diff: &mut UtxoDiff,
563 prev_sink: Hash,
564 tips: Vec<Hash>,
565 finality_point: Hash,
566 pruning_point: Hash,
567 ) -> (Hash, VecDeque<Hash>) {
568 let mut heap = tips
571 .into_iter()
572 .map(|block| SortableBlock { hash: block, blue_work: self.ghostdag_primary_store.get_blue_work(block).unwrap() })
573 .collect::<BinaryHeap<_>>();
574
575 let mut diff_point = prev_sink;
577
578 loop {
583 let candidate = heap.pop().expect("valid sink must exist").hash;
584 if self.reachability_service.is_chain_ancestor_of(finality_point, candidate) {
585 diff_point = self.calculate_utxo_state_relatively(stores, diff, diff_point, candidate);
586 if diff_point == candidate {
587 let filtering_root = self.depth_store.merge_depth_root(candidate).unwrap();
594 let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
595 return (
596 candidate,
597 heap.into_sorted_iter().take_while(|s| s.blue_work >= filtering_blue_work).map(|s| s.hash).collect(),
598 );
599 } else {
600 debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate)
601 }
602 } else if finality_point != pruning_point {
603 warn!("Finality Violation Detected. Block {} violates finality and is ignored from Virtual chain.", candidate);
605 }
606 let prune_guard = self.pruning_lock.blocking_read();
608 for parent in self.relations_service.get_parents(candidate).unwrap().iter().copied() {
609 if self.reachability_service.is_dag_ancestor_of(finality_point, parent)
610 && !self.reachability_service.is_dag_ancestor_of_any(parent, &mut heap.iter().map(|sb| sb.hash))
611 {
612 heap.push(SortableBlock { hash: parent, blue_work: self.ghostdag_primary_store.get_blue_work(parent).unwrap() });
613 }
614 }
615 drop(prune_guard);
616 }
617 }
618
619 pub(super) fn pick_virtual_parents(
625 &self,
626 selected_parent: Hash,
627 mut candidates: VecDeque<Hash>,
628 pruning_point: Hash,
629 ) -> (Vec<Hash>, GhostdagData) {
630 let _prune_guard = self.pruning_lock.blocking_read();
638 let max_block_parents = self.max_block_parents as usize;
639 let max_candidates = self.max_virtual_parent_candidates();
640
641 if candidates.len() > max_candidates {
643 let slice = candidates.make_contiguous();
645
646 for i in max_block_parents / 2..max_candidates {
651 let j = rand::thread_rng().gen_range(i..slice.len()); slice.swap(i, j);
653 }
654
655 candidates.truncate(max_candidates);
657 } else if candidates.len() > max_block_parents / 2 {
658 candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng());
660 }
661
662 let mut virtual_parents = Vec::with_capacity(min(max_block_parents, candidates.len() + 1));
663 virtual_parents.push(selected_parent);
664 let mut mergeset_size = 1; while let Some(candidate) = candidates.pop_front() {
668 if mergeset_size >= self.mergeset_size_limit || virtual_parents.len() >= max_block_parents {
669 break;
670 }
671 match self.mergeset_increase(&virtual_parents, candidate, self.mergeset_size_limit - mergeset_size) {
672 MergesetIncreaseResult::Accepted { increase_size } => {
673 mergeset_size += increase_size;
674 virtual_parents.push(candidate);
675 }
676 MergesetIncreaseResult::Rejected { new_candidate } => {
677 if self.reachability_service.is_any_dag_ancestor(&mut candidates.iter().copied(), new_candidate) {
679 continue; }
681 candidates.retain(|&h| !self.reachability_service.is_dag_ancestor_of(new_candidate, h));
683 candidates.push_back(new_candidate);
684 }
685 }
686 }
687 assert!(mergeset_size <= self.mergeset_size_limit);
688 assert!(virtual_parents.len() <= max_block_parents);
689 self.remove_bounded_merge_breaking_parents(virtual_parents, pruning_point)
690 }
691
692 fn mergeset_increase(&self, selected_parents: &[Hash], candidate: Hash, budget: u64) -> MergesetIncreaseResult {
693 let candidate_parents = self.relations_service.get_parents(candidate).unwrap();
700 let mut queue: VecDeque<_> = candidate_parents.iter().copied().collect();
701 let mut visited: BlockHashSet = queue.iter().copied().collect();
702 let mut mergeset_increase = 1u64; while let Some(current) = queue.pop_front() {
705 if self.reachability_service.is_dag_ancestor_of_any(current, &mut selected_parents.iter().copied()) {
706 continue;
707 }
708 mergeset_increase += 1;
709 if mergeset_increase > budget {
710 return MergesetIncreaseResult::Rejected { new_candidate: current };
711 }
712
713 let current_parents = self.relations_service.get_parents(current).unwrap();
714 for &parent in current_parents.iter() {
715 if visited.insert(parent) {
716 queue.push_back(parent);
717 }
718 }
719 }
720 MergesetIncreaseResult::Accepted { increase_size: mergeset_increase }
721 }
722
723 fn remove_bounded_merge_breaking_parents(
724 &self,
725 mut virtual_parents: Vec<Hash>,
726 current_pruning_point: Hash,
727 ) -> (Vec<Hash>, GhostdagData) {
728 let mut ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);
729 let merge_depth_root = self.depth_manager.calc_merge_depth_root(&ghostdag_data, current_pruning_point);
730 let mut kosherizing_blues: Option<Vec<Hash>> = None;
731 let mut bad_reds = Vec::new();
732
733 for red in ghostdag_data.mergeset_reds.iter().copied() {
739 if self.reachability_service.is_dag_ancestor_of(merge_depth_root, red) {
740 continue;
741 }
742 if kosherizing_blues.is_none() {
744 kosherizing_blues = Some(self.depth_manager.kosherizing_blues(&ghostdag_data, merge_depth_root).collect());
745 }
746 if !self.reachability_service.is_dag_ancestor_of_any(red, &mut kosherizing_blues.as_ref().unwrap().iter().copied()) {
747 bad_reds.push(red);
748 }
749 }
750
751 if !bad_reds.is_empty() {
752 virtual_parents.retain(|&h| !self.reachability_service.is_any_dag_ancestor(&mut bad_reds.iter().copied(), h));
754 ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);
756 }
757
758 (virtual_parents, ghostdag_data)
759 }
760
761 fn validate_mempool_transaction_impl(
762 &self,
763 mutable_tx: &mut MutableTransaction,
764 virtual_utxo_view: &impl UtxoView,
765 virtual_daa_score: u64,
766 virtual_past_median_time: u64,
767 args: &TransactionValidationArgs,
768 ) -> TxResult<()> {
769 self.transaction_validator.validate_tx_in_isolation(&mutable_tx.tx)?;
770 self.transaction_validator.utxo_free_tx_validation(&mutable_tx.tx, virtual_daa_score, virtual_past_median_time)?;
771 self.validate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view, virtual_daa_score, args)?;
772 Ok(())
773 }
774
775 pub fn validate_mempool_transaction(&self, mutable_tx: &mut MutableTransaction, args: &TransactionValidationArgs) -> TxResult<()> {
776 let virtual_read = self.virtual_stores.read();
777 let virtual_state = virtual_read.state.get().unwrap();
778 let virtual_utxo_view = &virtual_read.utxo_set;
779 let virtual_daa_score = virtual_state.daa_score;
780 let virtual_past_median_time = virtual_state.past_median_time;
781 self.validate_mempool_transaction_impl(mutable_tx, virtual_utxo_view, virtual_daa_score, virtual_past_median_time, args)
782 }
783
784 pub fn validate_mempool_transactions_in_parallel(
785 &self,
786 mutable_txs: &mut [MutableTransaction],
787 args: &TransactionValidationBatchArgs,
788 ) -> Vec<TxResult<()>> {
789 let virtual_read = self.virtual_stores.read();
790 let virtual_state = virtual_read.state.get().unwrap();
791 let virtual_utxo_view = &virtual_read.utxo_set;
792 let virtual_daa_score = virtual_state.daa_score;
793 let virtual_past_median_time = virtual_state.past_median_time;
794
795 self.thread_pool.install(|| {
796 mutable_txs
797 .par_iter_mut()
798 .map(|mtx| {
799 self.validate_mempool_transaction_impl(
800 mtx,
801 &virtual_utxo_view,
802 virtual_daa_score,
803 virtual_past_median_time,
804 args.get(&mtx.id()),
805 )
806 })
807 .collect::<Vec<TxResult<()>>>()
808 })
809 }
810
811 fn populate_mempool_transaction_impl(
812 &self,
813 mutable_tx: &mut MutableTransaction,
814 virtual_utxo_view: &impl UtxoView,
815 ) -> TxResult<()> {
816 self.populate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view)?;
817 Ok(())
818 }
819
820 pub fn populate_mempool_transaction(&self, mutable_tx: &mut MutableTransaction) -> TxResult<()> {
821 let virtual_read = self.virtual_stores.read();
822 let virtual_utxo_view = &virtual_read.utxo_set;
823 self.populate_mempool_transaction_impl(mutable_tx, virtual_utxo_view)
824 }
825
826 pub fn populate_mempool_transactions_in_parallel(&self, mutable_txs: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
827 let virtual_read = self.virtual_stores.read();
828 let virtual_utxo_view = &virtual_read.utxo_set;
829 self.thread_pool.install(|| {
830 mutable_txs
831 .par_iter_mut()
832 .map(|mtx| self.populate_mempool_transaction_impl(mtx, &virtual_utxo_view))
833 .collect::<Vec<TxResult<()>>>()
834 })
835 }
836
837 fn validate_block_template_transactions_in_parallel<V: UtxoView + Sync>(
838 &self,
839 txs: &[Transaction],
840 virtual_state: &VirtualState,
841 utxo_view: &V,
842 ) -> Vec<TxResult<u64>> {
843 self.thread_pool
844 .install(|| txs.par_iter().map(|tx| self.validate_block_template_transaction(tx, virtual_state, &utxo_view)).collect())
845 }
846
847 fn validate_block_template_transaction(
848 &self,
849 tx: &Transaction,
850 virtual_state: &VirtualState,
851 utxo_view: &impl UtxoView,
852 ) -> TxResult<u64> {
853 self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, virtual_state.past_median_time)?;
857 let ValidatedTransaction { calculated_fee, .. } =
858 self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?;
859 Ok(calculated_fee)
860 }
861
862 pub fn build_block_template(
863 &self,
864 miner_data: MinerData,
865 mut tx_selector: Box<dyn TemplateTransactionSelector>,
866 build_mode: TemplateBuildMode,
867 ) -> Result<BlockTemplate, RuleError> {
868 let mut txs = tx_selector.select_transactions();
876 let mut calculated_fees = Vec::with_capacity(txs.len());
877 let virtual_read = self.virtual_stores.read();
878 let virtual_state = virtual_read.state.get().unwrap();
879 let virtual_utxo_view = &virtual_read.utxo_set;
880
881 let mut invalid_transactions = HashMap::new();
882 let results = self.validate_block_template_transactions_in_parallel(&txs, &virtual_state, &virtual_utxo_view);
883 for (tx, res) in txs.iter().zip(results) {
884 match res {
885 Err(e) => {
886 invalid_transactions.insert(tx.id(), e);
887 tx_selector.reject_selection(tx.id());
888 }
889 Ok(fee) => {
890 calculated_fees.push(fee);
891 }
892 }
893 }
894
895 let mut has_rejections = !invalid_transactions.is_empty();
896 if has_rejections {
897 txs.retain(|tx| !invalid_transactions.contains_key(&tx.id()));
898 }
899
900 while has_rejections {
901 has_rejections = false;
902 let next_batch = tx_selector.select_transactions(); let next_batch_results =
904 self.validate_block_template_transactions_in_parallel(&next_batch, &virtual_state, &virtual_utxo_view);
905 for (tx, res) in next_batch.into_iter().zip(next_batch_results) {
906 match res {
907 Err(e) => {
908 invalid_transactions.insert(tx.id(), e);
909 tx_selector.reject_selection(tx.id());
910 has_rejections = true;
911 }
912 Ok(fee) => {
913 txs.push(tx);
914 calculated_fees.push(fee);
915 }
916 }
917 }
918 }
919
920 match (build_mode, tx_selector.is_successful()) {
924 (TemplateBuildMode::Standard, false) => return Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions)),
925 (TemplateBuildMode::Standard, true) | (TemplateBuildMode::Infallible, _) => {}
926 }
927
928 drop(virtual_read);
930
931 self.build_block_template_from_virtual_state(virtual_state, miner_data, txs, calculated_fees)
933 }
934
935 pub(crate) fn validate_block_template_transactions(
936 &self,
937 txs: &[Transaction],
938 virtual_state: &VirtualState,
939 utxo_view: &impl UtxoView,
940 ) -> Result<(), RuleError> {
941 let mut invalid_transactions = HashMap::new();
943 for tx in txs.iter() {
944 if let Err(e) = self.validate_block_template_transaction(tx, virtual_state, utxo_view) {
945 invalid_transactions.insert(tx.id(), e);
946 }
947 }
948 if !invalid_transactions.is_empty() {
949 Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions))
950 } else {
951 Ok(())
952 }
953 }
954
955 pub(crate) fn build_block_template_from_virtual_state(
956 &self,
957 virtual_state: Arc<VirtualState>,
958 miner_data: MinerData,
959 mut txs: Vec<Transaction>,
960 calculated_fees: Vec<u64>,
961 ) -> Result<BlockTemplate, RuleError> {
962 let _prune_guard = self.pruning_lock.blocking_read();
965 let pruning_info = self.pruning_point_store.read().get().unwrap();
966 let header_pruning_point =
967 self.pruning_point_manager.expected_header_pruning_point(virtual_state.ghostdag_data.to_compact(), pruning_info);
968 let coinbase = self
969 .coinbase_manager
970 .expected_coinbase_transaction(
971 virtual_state.daa_score,
972 miner_data.clone(),
973 &virtual_state.ghostdag_data,
974 &virtual_state.mergeset_rewards,
975 &virtual_state.mergeset_non_daa,
976 )
977 .unwrap();
978 txs.insert(0, coinbase.tx);
979 let version = BLOCK_VERSION;
980 let parents_by_level = self.parents_manager.calc_block_parents(pruning_info.pruning_point, &virtual_state.parents);
981
982 let storage_mass_activated = virtual_state.daa_score > self.storage_mass_activation_daa_score;
984 let hash_merkle_root = calc_hash_merkle_root(txs.iter(), storage_mass_activated);
985
986 let accepted_id_merkle_root = kaspa_merkle::calc_merkle_root(virtual_state.accepted_tx_ids.iter().copied());
987 let utxo_commitment = virtual_state.multiset.clone().finalize();
988 let min_block_time = virtual_state.past_median_time + 1;
990 let header = Header::new_finalized(
991 version,
992 parents_by_level,
993 hash_merkle_root,
994 accepted_id_merkle_root,
995 utxo_commitment,
996 u64::max(min_block_time, unix_now()),
997 virtual_state.bits,
998 0,
999 virtual_state.daa_score,
1000 virtual_state.ghostdag_data.blue_work,
1001 virtual_state.ghostdag_data.blue_score,
1002 header_pruning_point,
1003 );
1004 let selected_parent_hash = virtual_state.ghostdag_data.selected_parent;
1005 let selected_parent_timestamp = self.headers_store.get_timestamp(selected_parent_hash).unwrap();
1006 let selected_parent_daa_score = self.headers_store.get_daa_score(selected_parent_hash).unwrap();
1007 Ok(BlockTemplate::new(
1008 MutableBlock::new(header, txs),
1009 miner_data,
1010 coinbase.has_red_reward,
1011 selected_parent_timestamp,
1012 selected_parent_daa_score,
1013 selected_parent_hash,
1014 calculated_fees,
1015 ))
1016 }
1017
1018 pub fn init(self: &Arc<Self>) {
1020 let pruning_point_read = self.pruning_point_store.upgradable_read();
1021 if pruning_point_read.pruning_point().unwrap_option().is_none() {
1022 let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
1023 let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
1024 let mut batch = WriteBatch::default();
1025 self.past_pruning_points_store.insert_batch(&mut batch, 0, self.genesis.hash).unwrap_or_exists();
1026 pruning_point_write.set_batch(&mut batch, self.genesis.hash, self.genesis.hash, 0).unwrap();
1027 pruning_point_write.set_history_root(&mut batch, self.genesis.hash).unwrap();
1028 pruning_utxoset_write.set_utxoset_position(&mut batch, self.genesis.hash).unwrap();
1029 self.db.write(batch).unwrap();
1030 drop(pruning_point_write);
1031 drop(pruning_utxoset_write);
1032 }
1033 }
1034
1035 pub fn process_genesis(self: &Arc<Self>) {
1038 self.commit_utxo_state(self.genesis.hash, UtxoDiff::default(), MuHash::new(), AcceptanceData::default());
1040
1041 let mut batch = WriteBatch::default();
1043 let mut selected_chain_write = self.selected_chain_store.write();
1044 selected_chain_write.init_with_pruning_point(&mut batch, self.genesis.hash).unwrap();
1045 self.db.write(batch).unwrap();
1046 drop(selected_chain_write);
1047
1048 self.commit_virtual_state(
1050 self.virtual_stores.upgradable_read(),
1051 Arc::new(VirtualState::from_genesis(&self.genesis, self.ghostdag_manager.ghostdag(&[self.genesis.hash]))),
1052 &Default::default(),
1053 &Default::default(),
1054 );
1055 }
1056
1057 pub fn import_pruning_point_utxo_set(
1059 &self,
1060 new_pruning_point: Hash,
1061 mut imported_utxo_multiset: MuHash,
1062 ) -> PruningImportResult<()> {
1063 info!("Importing the UTXO set of the pruning point {}", new_pruning_point);
1064 let new_pruning_point_header = self.headers_store.get_header(new_pruning_point).unwrap();
1065 let imported_utxo_multiset_hash = imported_utxo_multiset.finalize();
1066 if imported_utxo_multiset_hash != new_pruning_point_header.utxo_commitment {
1067 return Err(PruningImportError::ImportedMultisetHashMismatch(
1068 new_pruning_point_header.utxo_commitment,
1069 imported_utxo_multiset_hash,
1070 ));
1071 }
1072
1073 {
1074 let mut batch = WriteBatch::default();
1076 let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
1077 pruning_utxoset_write.set_utxoset_position(&mut batch, new_pruning_point).unwrap();
1078 self.db.write(batch).unwrap();
1079 drop(pruning_utxoset_write);
1080 }
1081
1082 {
1083 let pruning_utxoset_read = self.pruning_utxoset_stores.read();
1085 let mut virtual_write = self.virtual_stores.write();
1086
1087 virtual_write.utxo_set.clear().unwrap();
1088 for chunk in &pruning_utxoset_read.utxo_set.iterator().map(|iter_result| iter_result.unwrap()).chunks(1000) {
1089 virtual_write.utxo_set.write_from_iterator_without_cache(chunk).unwrap();
1090 }
1091 }
1092
1093 let virtual_read = self.virtual_stores.upgradable_read();
1094
1095 let new_pruning_point_transactions = self.block_transactions_store.get(new_pruning_point).unwrap();
1097 let validated_transactions = self.validate_transactions_in_parallel(
1098 &new_pruning_point_transactions,
1099 &virtual_read.utxo_set,
1100 new_pruning_point_header.daa_score,
1101 TxValidationFlags::Full,
1102 );
1103 if validated_transactions.len() < new_pruning_point_transactions.len() - 1 {
1104 return Err(PruningImportError::NewPruningPointTxErrors);
1106 }
1107
1108 {
1109 let mut batch = WriteBatch::default();
1112 self.utxo_multisets_store.set_batch(&mut batch, new_pruning_point, imported_utxo_multiset.clone()).unwrap();
1113
1114 let statuses_write = self.statuses_store.set_batch(&mut batch, new_pruning_point, StatusUTXOValid).unwrap();
1115 self.db.write(batch).unwrap();
1116 drop(statuses_write);
1117 }
1118
1119 let virtual_parents = vec![new_pruning_point];
1121 let virtual_ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);
1122
1123 self.calculate_and_commit_virtual_state(
1124 virtual_read,
1125 virtual_parents,
1126 virtual_ghostdag_data,
1127 imported_utxo_multiset.clone(),
1128 &mut UtxoDiff::default(),
1129 &ChainPath::default(),
1130 )?;
1131
1132 Ok(())
1133 }
1134
1135 pub fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
1136 let current_pp = self.pruning_point_store.read().pruning_point().unwrap();
1146 let vf = self.virtual_finality_point(&self.lkg_virtual_state.load().ghostdag_data, current_pp);
1147 let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp);
1148
1149 let last_known_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() {
1150 Some(status) => status.is_valid(),
1151 None => false,
1152 });
1153
1154 if let Some(last_known_pp) = last_known_pp {
1155 !self.reachability_service.is_chain_ancestor_of(vff, last_known_pp.hash)
1156 } else {
1157 true
1160 }
1161 }
1162}
1163
1164enum MergesetIncreaseResult {
1165 Accepted { increase_size: u64 },
1166 Rejected { new_candidate: Hash },
1167}