kaspa_consensus/pipeline/virtual_processor/processor.rs

use crate::{
    consensus::{
        services::{
            ConsensusServices, DbBlockDepthManager, DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbPruningPointManager,
            DbWindowManager,
        },
        storage::ConsensusStorage,
    },
    constants::BLOCK_VERSION,
    errors::RuleError,
    model::{
        services::{
            reachability::{MTReachabilityService, ReachabilityService},
            relations::MTRelationsService,
        },
        stores::{
            acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
            block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
            daa::DbDaaStore,
            depth::{DbDepthStore, DepthStoreReader},
            ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
            headers::{DbHeadersStore, HeaderStoreReader},
            past_pruning_points::DbPastPruningPointsStore,
            pruning::{DbPruningStore, PruningStoreReader},
            pruning_utxoset::PruningUtxosetStores,
            reachability::DbReachabilityStore,
            relations::{DbRelationsStore, RelationsStoreReader},
            selected_chain::{DbSelectedChainStore, SelectedChainStore},
            statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
            tips::{DbTipsStore, TipsStoreReader},
            utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader},
            utxo_multisets::{DbUtxoMultisetsStore, UtxoMultisetsStoreReader},
            virtual_state::{LkgVirtualState, VirtualState, VirtualStateStoreReader, VirtualStores},
            DB,
        },
    },
    params::Params,
    pipeline::{
        deps_manager::VirtualStateProcessingMessage, pruning_processor::processor::PruningProcessingMessage,
        virtual_processor::utxo_validation::UtxoProcessingContext, ProcessingCounters,
    },
    processes::{
        coinbase::CoinbaseManager,
        ghostdag::ordering::SortableBlock,
        transaction_validator::{errors::TxResult, transaction_validator_populated::TxValidationFlags, TransactionValidator},
        window::WindowManager,
    },
};
use kaspa_consensus_core::{
    acceptance_data::AcceptanceData,
    api::args::{TransactionValidationArgs, TransactionValidationBatchArgs},
    block::{BlockTemplate, MutableBlock, TemplateBuildMode, TemplateTransactionSelector},
    blockstatus::BlockStatus::{StatusDisqualifiedFromChain, StatusUTXOValid},
    coinbase::MinerData,
    config::genesis::GenesisBlock,
    header::Header,
    merkle::calc_hash_merkle_root,
    pruning::PruningPointsList,
    tx::{MutableTransaction, Transaction},
    utxo::{
        utxo_diff::UtxoDiff,
        utxo_view::{UtxoView, UtxoViewComposition},
    },
    BlockHashSet, ChainPath,
};
use kaspa_consensus_notify::{
    notification::{
        NewBlockTemplateNotification, Notification, SinkBlueScoreChangedNotification, UtxosChangedNotification,
        VirtualChainChangedNotification, VirtualDaaScoreChangedNotification,
    },
    root::ConsensusNotificationRoot,
};
use kaspa_consensusmanager::SessionLock;
use kaspa_core::{debug, info, time::unix_now, trace, warn};
use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_notify::{events::EventType, notifier::Notify};

use super::errors::{PruningImportError, PruningImportResult};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use itertools::Itertools;
use kaspa_consensus_core::tx::ValidatedTransaction;
use kaspa_utils::binary_heap::BinaryHeapExtensions;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rand::{seq::SliceRandom, Rng};
use rayon::{
    prelude::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator},
    ThreadPool,
};
use rocksdb::WriteBatch;
use std::{
    cmp::min,
    collections::{BinaryHeap, HashMap, VecDeque},
    ops::Deref,
    sync::{atomic::Ordering, Arc},
};

pub struct VirtualStateProcessor {
    // Channels
    receiver: CrossbeamReceiver<VirtualStateProcessingMessage>,
    pruning_sender: CrossbeamSender<PruningProcessingMessage>,
    pruning_receiver: CrossbeamReceiver<PruningProcessingMessage>,

    // Thread pool
    pub(super) thread_pool: Arc<ThreadPool>,

    // DB
    db: Arc<DB>,

    // Config
    pub(super) genesis: GenesisBlock,
    pub(super) max_block_parents: u8,
    pub(super) mergeset_size_limit: u64,
    pub(super) pruning_depth: u64,

    // Stores
    pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
    pub(super) ghostdag_primary_store: Arc<DbGhostdagStore>,
    pub(super) headers_store: Arc<DbHeadersStore>,
    pub(super) daa_excluded_store: Arc<DbDaaStore>,
    pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
    pub(super) pruning_point_store: Arc<RwLock<DbPruningStore>>,
    pub(super) past_pruning_points_store: Arc<DbPastPruningPointsStore>,
    pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
    pub(super) depth_store: Arc<DbDepthStore>,
    pub(super) selected_chain_store: Arc<RwLock<DbSelectedChainStore>>,

    // Utxo-related stores
    pub(super) utxo_diffs_store: Arc<DbUtxoDiffsStore>,
    pub(super) utxo_multisets_store: Arc<DbUtxoMultisetsStore>,
    pub(super) acceptance_data_store: Arc<DbAcceptanceDataStore>,
    pub(super) virtual_stores: Arc<RwLock<VirtualStores>>,
    pub(super) pruning_utxoset_stores: Arc<RwLock<PruningUtxosetStores>>,
    /// The "last known good" virtual state. To be used by any logic which does not want to wait
    /// for a possible virtual state write to complete, but can rather settle for the last known state.
    pub lkg_virtual_state: LkgVirtualState,

    // Managers and services
    pub(super) ghostdag_manager: DbGhostdagManager,
    pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
    pub(super) relations_service: MTRelationsService<DbRelationsStore>,
    pub(super) dag_traversal_manager: DbDagTraversalManager,
    pub(super) window_manager: DbWindowManager,
    pub(super) coinbase_manager: CoinbaseManager,
    pub(super) transaction_validator: TransactionValidator,
    pub(super) pruning_point_manager: DbPruningPointManager,
    pub(super) parents_manager: DbParentsManager,
    pub(super) depth_manager: DbBlockDepthManager,

    // Pruning lock
    pruning_lock: SessionLock,

    // Notifier
    notification_root: Arc<ConsensusNotificationRoot>,

    // Counters
    counters: Arc<ProcessingCounters>,

    // Storage mass hardfork DAA score
    pub(crate) storage_mass_activation_daa_score: u64,
}

impl VirtualStateProcessor {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        receiver: CrossbeamReceiver<VirtualStateProcessingMessage>,
        pruning_sender: CrossbeamSender<PruningProcessingMessage>,
        pruning_receiver: CrossbeamReceiver<PruningProcessingMessage>,
        thread_pool: Arc<ThreadPool>,
        params: &Params,
        db: Arc<DB>,
        storage: &Arc<ConsensusStorage>,
        services: &Arc<ConsensusServices>,
        pruning_lock: SessionLock,
        notification_root: Arc<ConsensusNotificationRoot>,
        counters: Arc<ProcessingCounters>,
    ) -> Self {
        Self {
            receiver,
            pruning_sender,
            pruning_receiver,
            thread_pool,

            genesis: params.genesis.clone(),
            max_block_parents: params.max_block_parents,
            mergeset_size_limit: params.mergeset_size_limit,
            pruning_depth: params.pruning_depth,

            db,
            statuses_store: storage.statuses_store.clone(),
            headers_store: storage.headers_store.clone(),
            ghostdag_primary_store: storage.ghostdag_primary_store.clone(),
            daa_excluded_store: storage.daa_excluded_store.clone(),
            block_transactions_store: storage.block_transactions_store.clone(),
            pruning_point_store: storage.pruning_point_store.clone(),
            past_pruning_points_store: storage.past_pruning_points_store.clone(),
            body_tips_store: storage.body_tips_store.clone(),
            depth_store: storage.depth_store.clone(),
            selected_chain_store: storage.selected_chain_store.clone(),
            utxo_diffs_store: storage.utxo_diffs_store.clone(),
            utxo_multisets_store: storage.utxo_multisets_store.clone(),
            acceptance_data_store: storage.acceptance_data_store.clone(),
            virtual_stores: storage.virtual_stores.clone(),
            pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
            lkg_virtual_state: storage.lkg_virtual_state.clone(),

            ghostdag_manager: services.ghostdag_primary_manager.clone(),
            reachability_service: services.reachability_service.clone(),
            relations_service: services.relations_service.clone(),
            dag_traversal_manager: services.dag_traversal_manager.clone(),
            window_manager: services.window_manager.clone(),
            coinbase_manager: services.coinbase_manager.clone(),
            transaction_validator: services.transaction_validator.clone(),
            pruning_point_manager: services.pruning_point_manager.clone(),
            parents_manager: services.parents_manager.clone(),
            depth_manager: services.depth_manager.clone(),

            pruning_lock,
            notification_root,
            counters,
            storage_mass_activation_daa_score: params.storage_mass_activation_daa_score,
        }
    }

    pub fn worker(self: &Arc<Self>) {
        'outer: while let Ok(msg) = self.receiver.recv() {
            if msg.is_exit_message() {
                break;
            }

            // Once a task arrives, collect all pending tasks from the channel.
            // This is done since virtual processing is not a per-block operation,
            // so it benefits from the maximum available info

            let messages: Vec<VirtualStateProcessingMessage> = std::iter::once(msg).chain(self.receiver.try_iter()).collect();
            trace!("virtual processor received {} tasks", messages.len());

            self.resolve_virtual();

            let statuses_read = self.statuses_store.read();
            for msg in messages {
                match msg {
                    VirtualStateProcessingMessage::Exit => break 'outer,
                    VirtualStateProcessingMessage::Process(task, virtual_state_result_transmitter) => {
                        // We don't care if receivers were dropped
                        let _ = virtual_state_result_transmitter.send(Ok(statuses_read.get(task.block().hash()).unwrap()));
                    }
                };
            }
        }

        // Pass the exit signal on to the following processor
        self.pruning_sender.send(PruningProcessingMessage::Exit).unwrap();
    }

    fn resolve_virtual(self: &Arc<Self>) {
        let pruning_point = self.pruning_point_store.read().pruning_point().unwrap();
        let virtual_read = self.virtual_stores.upgradable_read();
        let prev_state = virtual_read.state.get().unwrap();
        let finality_point = self.virtual_finality_point(&prev_state.ghostdag_data, pruning_point);

        // PRUNE SAFETY: in order to avoid locking the prune lock throughout virtual resolving we make sure
        // to only process blocks in the future of the finality point (F) which are never pruned (since finality depth << pruning depth).
        // This is justified since:
        //      1. Tips which are not in the future of F definitely don't have F on their chain
        //         hence cannot become the next sink (due to finality violation).
        //      2. Such tips cannot be merged by virtual since they are violating the merge depth
        //         bound (merge depth <= finality depth).
        // (both claims are true by induction for any block in their past as well)
        let prune_guard = self.pruning_lock.blocking_read();
        let tips = self
            .body_tips_store
            .read()
            .get()
            .unwrap()
            .read()
            .iter()
            .copied()
            .filter(|&h| self.reachability_service.is_dag_ancestor_of(finality_point, h))
            .collect_vec();
        drop(prune_guard);
        let prev_sink = prev_state.ghostdag_data.selected_parent;
        let mut accumulated_diff = prev_state.utxo_diff.clone().to_reversed();

        let (new_sink, virtual_parent_candidates) =
            self.sink_search_algorithm(&virtual_read, &mut accumulated_diff, prev_sink, tips, finality_point, pruning_point);
        let (virtual_parents, virtual_ghostdag_data) = self.pick_virtual_parents(new_sink, virtual_parent_candidates, pruning_point);
        assert_eq!(virtual_ghostdag_data.selected_parent, new_sink);

        let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
        let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
        let new_virtual_state = self
            .calculate_and_commit_virtual_state(
                virtual_read,
                virtual_parents,
                virtual_ghostdag_data,
                sink_multiset,
                &mut accumulated_diff,
                &chain_path,
            )
            .expect("all possible rule errors are unexpected here");
        // Update the pruning processor about the virtual state change
        let sink_ghostdag_data = self.ghostdag_primary_store.get_compact_data(new_sink).unwrap();
        // Empty the channel before sending the new message. If the pruning processor is busy, this step makes sure
        // the internal channel does not grow needlessly (since we only care about the most recent message)
        let _consume = self.pruning_receiver.try_iter().count();
        self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();

        // Emit notifications
        let accumulated_diff = Arc::new(accumulated_diff);
        let virtual_parents = Arc::new(new_virtual_state.parents.clone());
        self.notification_root
            .notify(Notification::NewBlockTemplate(NewBlockTemplateNotification {}))
            .expect("expecting an open unbounded channel");
        self.notification_root
            .notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
            .expect("expecting an open unbounded channel");
        self.notification_root
            .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
            .expect("expecting an open unbounded channel");
        self.notification_root
            .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
            .expect("expecting an open unbounded channel");
        if self.notification_root.has_subscription(EventType::VirtualChainChanged) {
            // check for subscriptions before the heavy lifting
            let added_chain_blocks_acceptance_data =
                chain_path.added.iter().copied().map(|added| self.acceptance_data_store.get(added).unwrap()).collect_vec();
            self.notification_root
                .notify(Notification::VirtualChainChanged(VirtualChainChangedNotification::new(
                    chain_path.added.into(),
                    chain_path.removed.into(),
                    Arc::new(added_chain_blocks_acceptance_data),
                )))
                .expect("expecting an open unbounded channel");
        }
    }

    pub(crate) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
        let finality_point = self.depth_manager.calc_finality_point(virtual_ghostdag_data, pruning_point);
        if self.reachability_service.is_chain_ancestor_of(pruning_point, finality_point) {
            finality_point
        } else {
            // At the beginning of IBD, the virtual finality point might be below the pruning point
            // or disagree with the pruning point chain; in such cases we take the pruning point itself as the finality point
            pruning_point
        }
    }

    /// Calculates the UTXO state of `to` starting from the state of `from`.
    /// The provided `diff` is assumed to initially hold the UTXO diff of `from` from virtual.
    /// The function returns the topmost UTXO-valid block on `chain(to)`, which is ideally
    /// `to` itself (with the exception of returning `from` if `to` is already known to be UTXO disqualified).
    /// Upon return, it is guaranteed that `diff` holds the diff of the returned block from virtual.
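    ///
    /// Illustrative sketch of the contract (hypothetical bindings; mirrors the call made by `sink_search_algorithm`):
    ///
    /// ```ignore
    /// // `diff` initially holds diff(from, virtual)
    /// let diff_point = self.calculate_utxo_state_relatively(&stores, &mut diff, from, to);
    /// // `diff_point` is the topmost UTXO-valid block on chain(to), and `diff` now holds
    /// // diff(diff_point, virtual), so composing the virtual UTXO set with `diff` yields
    /// // the UTXO view from diff_point's perspective:
    /// let utxo_view = (&stores.utxo_set).compose(&diff);
    /// ```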
    fn calculate_utxo_state_relatively(&self, stores: &VirtualStores, diff: &mut UtxoDiff, from: Hash, to: Hash) -> Hash {
        // Avoid reorging if disqualified status is already known
        if self.statuses_store.read().get(to).unwrap() == StatusDisqualifiedFromChain {
            return from;
        }

        let mut split_point: Option<Hash> = None;

        // Walk down to the reorg split point
        for current in self.reachability_service.default_backward_chain_iterator(from) {
            if self.reachability_service.is_chain_ancestor_of(current, to) {
                split_point = Some(current);
                break;
            }

            let mergeset_diff = self.utxo_diffs_store.get(current).unwrap();
            // Apply the diff in reverse
            diff.with_diff_in_place(&mergeset_diff.as_reversed()).unwrap();
        }

        let split_point = split_point.expect("chain iterator was expected to reach the reorg split point");
        debug!("VIRTUAL PROCESSOR, found split point: {split_point}");

        // A variable holding the most recent UTXO-valid block on `chain(to)` (note that it's maintained such
        // that 'diff' is always its UTXO diff from virtual)
        let mut diff_point = split_point;

        // Walk back up to the new virtual selected parent candidate
        let mut chain_block_counter = 0;
        let mut chain_disqualified_counter = 0;
        for (selected_parent, current) in self.reachability_service.forward_chain_iterator(split_point, to, true).tuple_windows() {
            if selected_parent != diff_point {
                // This indicates that the selected parent is disqualified, propagate up and continue
                self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
                chain_disqualified_counter += 1;
                continue;
            }

            match self.utxo_diffs_store.get(current) {
                Ok(mergeset_diff) => {
                    diff.with_diff_in_place(mergeset_diff.deref()).unwrap();
                    diff_point = current;
                }
                Err(StoreError::KeyNotFound(_)) => {
                    if self.statuses_store.read().get(current).unwrap() == StatusDisqualifiedFromChain {
                        // Current block is already known to be disqualified
                        continue;
                    }

                    let header = self.headers_store.get_header(current).unwrap();
                    let mergeset_data = self.ghostdag_primary_store.get_data(current).unwrap();
                    let pov_daa_score = header.daa_score;

                    let selected_parent_multiset_hash = self.utxo_multisets_store.get(selected_parent).unwrap();
                    let selected_parent_utxo_view = (&stores.utxo_set).compose(&*diff);

                    let mut ctx = UtxoProcessingContext::new(mergeset_data.into(), selected_parent_multiset_hash);

                    self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, pov_daa_score);
                    let res = self.verify_expected_utxo_state(&mut ctx, &selected_parent_utxo_view, &header);

                    if let Err(rule_error) = res {
                        info!("Block {} is disqualified from virtual chain: {}", current, rule_error);
                        self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
                        chain_disqualified_counter += 1;
                    } else {
                        debug!("VIRTUAL PROCESSOR, UTXO validated for {current}");

                        // Accumulate the diff
                        diff.with_diff_in_place(&ctx.mergeset_diff).unwrap();
                        // Update the diff point
                        diff_point = current;
                        // Commit UTXO data for current chain block
                        self.commit_utxo_state(current, ctx.mergeset_diff, ctx.multiset_hash, ctx.mergeset_acceptance_data);
                        // Count the number of UTXO-processed chain blocks
                        chain_block_counter += 1;
                    }
                }
                Err(err) => panic!("unexpected error {err}"),
            }
        }
        // Report counters
        self.counters.chain_block_counts.fetch_add(chain_block_counter, Ordering::Relaxed);
        if chain_disqualified_counter > 0 {
            self.counters.chain_disqualified_counts.fetch_add(chain_disqualified_counter, Ordering::Relaxed);
        }

        diff_point
    }

    fn commit_utxo_state(&self, current: Hash, mergeset_diff: UtxoDiff, multiset: MuHash, acceptance_data: AcceptanceData) {
        let mut batch = WriteBatch::default();
        self.utxo_diffs_store.insert_batch(&mut batch, current, Arc::new(mergeset_diff)).unwrap();
        self.utxo_multisets_store.insert_batch(&mut batch, current, multiset).unwrap();
        self.acceptance_data_store.insert_batch(&mut batch, current, Arc::new(acceptance_data)).unwrap();
        let write_guard = self.statuses_store.set_batch(&mut batch, current, StatusUTXOValid).unwrap();
        self.db.write(batch).unwrap();
        // Drop the write guard explicitly only after the batch is written, so the statuses lock is held throughout the commit.
        drop(write_guard);
    }

    fn calculate_and_commit_virtual_state(
        &self,
        virtual_read: RwLockUpgradableReadGuard<'_, VirtualStores>,
        virtual_parents: Vec<Hash>,
        virtual_ghostdag_data: GhostdagData,
        selected_parent_multiset: MuHash,
        accumulated_diff: &mut UtxoDiff,
        chain_path: &ChainPath,
    ) -> Result<Arc<VirtualState>, RuleError> {
        let new_virtual_state = self.calculate_virtual_state(
            &virtual_read,
            virtual_parents,
            virtual_ghostdag_data,
            selected_parent_multiset,
            accumulated_diff,
        )?;
        self.commit_virtual_state(virtual_read, new_virtual_state.clone(), accumulated_diff, chain_path);
        Ok(new_virtual_state)
    }

    pub(super) fn calculate_virtual_state(
        &self,
        virtual_stores: &VirtualStores,
        virtual_parents: Vec<Hash>,
        virtual_ghostdag_data: GhostdagData,
        selected_parent_multiset: MuHash,
        accumulated_diff: &mut UtxoDiff,
    ) -> Result<Arc<VirtualState>, RuleError> {
        let selected_parent_utxo_view = (&virtual_stores.utxo_set).compose(&*accumulated_diff);
        let mut ctx = UtxoProcessingContext::new((&virtual_ghostdag_data).into(), selected_parent_multiset);

        // Calc virtual DAA score, difficulty bits and past median time
        let virtual_daa_window = self.window_manager.block_daa_window(&virtual_ghostdag_data)?;
        let virtual_bits = self.window_manager.calculate_difficulty_bits(&virtual_ghostdag_data, &virtual_daa_window);
        let virtual_past_median_time = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?.0;

        // Calc virtual UTXO state relative to selected parent
        self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, virtual_daa_window.daa_score);

        // Update the accumulated diff
        accumulated_diff.with_diff_in_place(&ctx.mergeset_diff).unwrap();

        // Build the new virtual state
        Ok(Arc::new(VirtualState::new(
            virtual_parents,
            virtual_daa_window.daa_score,
            virtual_bits,
            virtual_past_median_time,
            ctx.multiset_hash,
            ctx.mergeset_diff,
            ctx.accepted_tx_ids,
            ctx.mergeset_rewards,
            virtual_daa_window.mergeset_non_daa,
            virtual_ghostdag_data,
        )))
    }

    fn commit_virtual_state(
        &self,
        virtual_read: RwLockUpgradableReadGuard<'_, VirtualStores>,
        new_virtual_state: Arc<VirtualState>,
        accumulated_diff: &UtxoDiff,
        chain_path: &ChainPath,
    ) {
        let mut batch = WriteBatch::default();
        let mut virtual_write = RwLockUpgradableReadGuard::upgrade(virtual_read);
        let mut selected_chain_write = self.selected_chain_store.write();

        // Apply the accumulated diff to the virtual UTXO set
        virtual_write.utxo_set.write_diff_batch(&mut batch, accumulated_diff).unwrap();

        // Update virtual state
        virtual_write.state.set_batch(&mut batch, new_virtual_state).unwrap();

        // Update the virtual selected chain
        selected_chain_write.apply_changes(&mut batch, chain_path).unwrap();

        // Flush the batch changes
        self.db.write(batch).unwrap();

        // Drop the guards explicitly only after the batch is written, so the locks are held throughout the commit.
        drop(virtual_write);
        drop(selected_chain_write);
    }

    /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
    ///
    /// Guaranteed to be `>= self.max_block_parents`
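    ///
    /// For example, with a hypothetical `max_block_parents = 10`, up to 30 tips
    /// would be considered per resolve operation.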
    fn max_virtual_parent_candidates(&self) -> usize {
        // Limit to max_block_parents x 3 candidates. This way we avoid going over thousands of tips when the network isn't healthy.
        // There's no specific reason for a factor of 3, and it's not a consensus rule, just an estimation for reducing the number
        // of candidates considered.
        self.max_block_parents as usize * 3
    }

    /// Searches for the next valid sink block (sink = virtual selected parent). The search is performed
    /// in the inclusive past of `tips`.
    /// The provided `diff` is assumed to initially hold the UTXO diff of `prev_sink` from virtual.
    /// The function returns with `diff` holding the diff of the new sink from the previous virtual.
    /// In addition to the found sink, the function also returns a queue of additional virtual
    /// parent candidates ordered in descending blue work order.
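    ///
    /// Illustrative usage (a sketch of the call made by `resolve_virtual`, with hypothetical bindings):
    ///
    /// ```ignore
    /// let mut diff = prev_state.utxo_diff.clone().to_reversed(); // diff(prev_sink, virtual)
    /// let (new_sink, candidates) =
    ///     self.sink_search_algorithm(&virtual_read, &mut diff, prev_sink, tips, finality_point, pruning_point);
    /// // `new_sink` is UTXO-valid, `diff` now equals diff(new_sink, previous virtual),
    /// // and `candidates` is an antichain ordered by descending blue work.
    /// ```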
    pub(super) fn sink_search_algorithm(
        &self,
        stores: &VirtualStores,
        diff: &mut UtxoDiff,
        prev_sink: Hash,
        tips: Vec<Hash>,
        finality_point: Hash,
        pruning_point: Hash,
    ) -> (Hash, VecDeque<Hash>) {
        // TODO (relaxed): additional tests

        let mut heap = tips
            .into_iter()
            .map(|block| SortableBlock { hash: block, blue_work: self.ghostdag_primary_store.get_blue_work(block).unwrap() })
            .collect::<BinaryHeap<_>>();

        // The initial diff point is the previous sink
        let mut diff_point = prev_sink;

        // We maintain the following invariant: `heap` is an antichain.
        // It holds at step 0 since tips are an antichain, and remains through the loop
        // since we check that every pushed block is not in the past of the current heap
        // (and it can't be in the future by induction)
        loop {
            let candidate = heap.pop().expect("valid sink must exist").hash;
            if self.reachability_service.is_chain_ancestor_of(finality_point, candidate) {
                diff_point = self.calculate_utxo_state_relatively(stores, diff, diff_point, candidate);
                if diff_point == candidate {
                    // This indicates that candidate has valid UTXO state and that `diff` represents its diff from virtual

                    // All blocks with lower blue work than filtering_root are:
                    // 1. not in its future (because blue work is monotonic), and
                    // 2. will eventually be removed by the bounded merge check.
                    // Hence, as an optimization, we prefer removing such blocks in advance to allow valid tips to be considered.
                    let filtering_root = self.depth_store.merge_depth_root(candidate).unwrap();
                    let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
                    return (
                        candidate,
                        heap.into_sorted_iter().take_while(|s| s.blue_work >= filtering_blue_work).map(|s| s.hash).collect(),
                    );
                } else {
                    debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate)
                }
            } else if finality_point != pruning_point {
                // `finality_point == pruning_point` indicates we are at IBD start hence no warning required
                warn!("Finality Violation Detected. Block {} violates finality and is ignored from Virtual chain.", candidate);
            }
            // PRUNE SAFETY: see comment within [`resolve_virtual`]
            let prune_guard = self.pruning_lock.blocking_read();
            for parent in self.relations_service.get_parents(candidate).unwrap().iter().copied() {
                if self.reachability_service.is_dag_ancestor_of(finality_point, parent)
                    && !self.reachability_service.is_dag_ancestor_of_any(parent, &mut heap.iter().map(|sb| sb.hash))
                {
                    heap.push(SortableBlock { hash: parent, blue_work: self.ghostdag_primary_store.get_blue_work(parent).unwrap() });
                }
            }
            drop(prune_guard);
        }
    }

    /// Picks the virtual parents according to virtual parent selection pruning constraints.
    /// Assumes:
    ///     1. `selected_parent` is a UTXO-valid block
    ///     2. `candidates` are an antichain ordered in descending blue work order
    ///     3. `candidates` do not contain `selected_parent` and `selected_parent.blue_work > max(candidates.blue_work)`
    pub(super) fn pick_virtual_parents(
        &self,
        selected_parent: Hash,
        mut candidates: VecDeque<Hash>,
        pruning_point: Hash,
    ) -> (Vec<Hash>, GhostdagData) {
        // TODO (relaxed): additional tests

        // Mergeset increasing might traverse DAG areas which are below the finality point and which theoretically
        // can border on pruned data, hence we acquire the prune lock to ensure data consistency. Note that
        // the final selected mergeset can never be pruned (this is the essence of the prunality proof), however
        // we might touch such data prior to validating the bounded merge rule. All in all, this function is short
        // enough that we avoid making further optimizations
        let _prune_guard = self.pruning_lock.blocking_read();
        let max_block_parents = self.max_block_parents as usize;
        let max_candidates = self.max_virtual_parent_candidates();

        // Prioritize half the blocks with the highest blue work and pick the rest randomly to ensure diversity between nodes
        if candidates.len() > max_candidates {
            // make_contiguous should be a no-op since the deque was just built
            let slice = candidates.make_contiguous();

            // Keep slice[..max_block_parents / 2] as is, and choose max_candidates - max_block_parents / 2 elements at random
            // from the remainder of the slice while swapping them into slice[max_block_parents / 2..max_candidates].
            //
            // Inspired by rand::partial_shuffle (which lacks a guarantee on the location of the chosen elements).
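            //
            // Concretely (hypothetical numbers, assuming max_block_parents = 10, max_candidates = 30,
            // and 100 candidates): slots 0..5 keep the top-blue-work candidates, each slot i in 5..30
            // is filled by swapping in a uniformly random element of slice[i..100], and the deque is
            // then truncated to 30 elements.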
            for i in max_block_parents / 2..max_candidates {
                let j = rand::thread_rng().gen_range(i..slice.len()); // i < max_candidates < slice.len()
                slice.swap(i, j);
            }

            // Truncate the unchosen elements
            candidates.truncate(max_candidates);
        } else if candidates.len() > max_block_parents / 2 {
            // Fallback to a simpler algo in this case
            candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng());
        }

        let mut virtual_parents = Vec::with_capacity(min(max_block_parents, candidates.len() + 1));
        virtual_parents.push(selected_parent);
        let mut mergeset_size = 1; // Count the selected parent

        // Try adding parents as long as mergeset size and number of parents limits are not reached
        while let Some(candidate) = candidates.pop_front() {
            if mergeset_size >= self.mergeset_size_limit || virtual_parents.len() >= max_block_parents {
                break;
            }
            match self.mergeset_increase(&virtual_parents, candidate, self.mergeset_size_limit - mergeset_size) {
                MergesetIncreaseResult::Accepted { increase_size } => {
                    mergeset_size += increase_size;
                    virtual_parents.push(candidate);
                }
                MergesetIncreaseResult::Rejected { new_candidate } => {
                    // If we already have a candidate in the past of the new candidate, skip it.
                    if self.reachability_service.is_any_dag_ancestor(&mut candidates.iter().copied(), new_candidate) {
                        continue; // TODO (optimization): this check might be redundant if the candidates' antichain invariant is kept
                    }
                    // Remove all candidates which are in the future of the new candidate
                    candidates.retain(|&h| !self.reachability_service.is_dag_ancestor_of(new_candidate, h));
                    candidates.push_back(new_candidate);
                }
            }
        }
        assert!(mergeset_size <= self.mergeset_size_limit);
        assert!(virtual_parents.len() <= max_block_parents);
        self.remove_bounded_merge_breaking_parents(virtual_parents, pruning_point)
    }

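    /// Measures the mergeset increase incurred by adding `candidate` (and its past) on top of
    /// `selected_parents`, rejecting early once `budget` is exceeded.
    ///
    /// A minimal self-contained sketch of the same BFS budget check, assuming a hypothetical
    /// `parents` lookup and `in_past_of_selected` predicate in place of the actual services:
    ///
    /// ```ignore
    /// fn mergeset_increase_sketch(
    ///     parents: &impl Fn(Hash) -> Vec<Hash>,
    ///     in_past_of_selected: &impl Fn(Hash) -> bool,
    ///     candidate: Hash,
    ///     budget: u64,
    /// ) -> Option<u64> {
    ///     let mut queue: VecDeque<Hash> = parents(candidate).into();
    ///     let mut visited: BlockHashSet = queue.iter().copied().collect();
    ///     let mut increase = 1u64; // the candidate itself
    ///     while let Some(current) = queue.pop_front() {
    ///         if in_past_of_selected(current) {
    ///             continue; // already merged via the selected parents
    ///         }
    ///         increase += 1;
    ///         if increase > budget {
    ///             return None; // rejected: budget exceeded at `current`
    ///         }
    ///         for p in parents(current) {
    ///             if visited.insert(p) {
    ///                 queue.push_back(p);
    ///             }
    ///         }
    ///     }
    ///     Some(increase)
    /// }
    /// ```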
    fn mergeset_increase(&self, selected_parents: &[Hash], candidate: Hash, budget: u64) -> MergesetIncreaseResult {
        /*
        Algo:
            Traverse past(candidate) \setminus past(selected_parents) and make
            sure the increase in mergeset size is within the available budget
        */

        let candidate_parents = self.relations_service.get_parents(candidate).unwrap();
        let mut queue: VecDeque<_> = candidate_parents.iter().copied().collect();
        let mut visited: BlockHashSet = queue.iter().copied().collect();
        let mut mergeset_increase = 1u64; // Starts with 1 to count for the candidate itself

        while let Some(current) = queue.pop_front() {
            if self.reachability_service.is_dag_ancestor_of_any(current, &mut selected_parents.iter().copied()) {
                continue;
            }
            mergeset_increase += 1;
            if mergeset_increase > budget {
                return MergesetIncreaseResult::Rejected { new_candidate: current };
            }

            let current_parents = self.relations_service.get_parents(current).unwrap();
            for &parent in current_parents.iter() {
                if visited.insert(parent) {
                    queue.push_back(parent);
                }
            }
        }
        MergesetIncreaseResult::Accepted { increase_size: mergeset_increase }
    }

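    /// Filters out any parent whose merge would bring in a red block that is below the merge
    /// depth root and is not kosherized by any blue block, recomputing the GHOSTDAG data if
    /// the parent set changed.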
    fn remove_bounded_merge_breaking_parents(
        &self,
        mut virtual_parents: Vec<Hash>,
        current_pruning_point: Hash,
    ) -> (Vec<Hash>, GhostdagData) {
        let mut ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);
        let merge_depth_root = self.depth_manager.calc_merge_depth_root(&ghostdag_data, current_pruning_point);
        let mut kosherizing_blues: Option<Vec<Hash>> = None;
        let mut bad_reds = Vec::new();

        //
        // Note that the code below optimizes for the usual case where there are no merge-bound-violating blocks.
        //

        // Find red blocks that violate the merge bound and are not kosherized by any blue block
        for red in ghostdag_data.mergeset_reds.iter().copied() {
            if self.reachability_service.is_dag_ancestor_of(merge_depth_root, red) {
                continue;
            }
            // Lazy load the kosherizing blocks since this case is extremely rare
            if kosherizing_blues.is_none() {
                kosherizing_blues = Some(self.depth_manager.kosherizing_blues(&ghostdag_data, merge_depth_root).collect());
            }
            if !self.reachability_service.is_dag_ancestor_of_any(red, &mut kosherizing_blues.as_ref().unwrap().iter().copied()) {
                bad_reds.push(red);
            }
        }

        if !bad_reds.is_empty() {
            // Remove all parents which lead to merging a bad red
            virtual_parents.retain(|&h| !self.reachability_service.is_any_dag_ancestor(&mut bad_reds.iter().copied(), h));
            // Recompute ghostdag data since parents changed
            ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);
        }

        (virtual_parents, ghostdag_data)
    }

    fn validate_mempool_transaction_impl(
        &self,
        mutable_tx: &mut MutableTransaction,
        virtual_utxo_view: &impl UtxoView,
        virtual_daa_score: u64,
        virtual_past_median_time: u64,
        args: &TransactionValidationArgs,
    ) -> TxResult<()> {
        self.transaction_validator.validate_tx_in_isolation(&mutable_tx.tx)?;
        self.transaction_validator.utxo_free_tx_validation(&mutable_tx.tx, virtual_daa_score, virtual_past_median_time)?;
        self.validate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view, virtual_daa_score, args)?;
        Ok(())
    }

    pub fn validate_mempool_transaction(&self, mutable_tx: &mut MutableTransaction, args: &TransactionValidationArgs) -> TxResult<()> {
        let virtual_read = self.virtual_stores.read();
        let virtual_state = virtual_read.state.get().unwrap();
        let virtual_utxo_view = &virtual_read.utxo_set;
        let virtual_daa_score = virtual_state.daa_score;
        let virtual_past_median_time = virtual_state.past_median_time;
        self.validate_mempool_transaction_impl(mutable_tx, virtual_utxo_view, virtual_daa_score, virtual_past_median_time, args)
    }

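    /// Validates a batch of mempool transactions against the current virtual state, fanning the
    /// work out over the processor's thread pool and returning one result per transaction, in order.
    ///
    /// Illustrative caller-side usage (hypothetical bindings; `batch_args` construction omitted):
    ///
    /// ```ignore
    /// let results = processor.validate_mempool_transactions_in_parallel(&mut txs, &batch_args);
    /// for (tx, res) in txs.iter().zip(results) {
    ///     if let Err(e) = res {
    ///         // e.g., evict `tx` from the candidate set
    ///     }
    /// }
    /// ```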
    pub fn validate_mempool_transactions_in_parallel(
        &self,
        mutable_txs: &mut [MutableTransaction],
        args: &TransactionValidationBatchArgs,
    ) -> Vec<TxResult<()>> {
        let virtual_read = self.virtual_stores.read();
        let virtual_state = virtual_read.state.get().unwrap();
        let virtual_utxo_view = &virtual_read.utxo_set;
        let virtual_daa_score = virtual_state.daa_score;
        let virtual_past_median_time = virtual_state.past_median_time;

        self.thread_pool.install(|| {
            mutable_txs
                .par_iter_mut()
                .map(|mtx| {
                    self.validate_mempool_transaction_impl(
                        mtx,
                        &virtual_utxo_view,
                        virtual_daa_score,
                        virtual_past_median_time,
                        args.get(&mtx.id()),
                    )
                })
                .collect::<Vec<TxResult<()>>>()
        })
    }

    fn populate_mempool_transaction_impl(
        &self,
        mutable_tx: &mut MutableTransaction,
        virtual_utxo_view: &impl UtxoView,
    ) -> TxResult<()> {
        self.populate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view)?;
        Ok(())
    }

    pub fn populate_mempool_transaction(&self, mutable_tx: &mut MutableTransaction) -> TxResult<()> {
        let virtual_read = self.virtual_stores.read();
        let virtual_utxo_view = &virtual_read.utxo_set;
        self.populate_mempool_transaction_impl(mutable_tx, virtual_utxo_view)
    }

    pub fn populate_mempool_transactions_in_parallel(&self, mutable_txs: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
        let virtual_read = self.virtual_stores.read();
        let virtual_utxo_view = &virtual_read.utxo_set;
        self.thread_pool.install(|| {
            mutable_txs
                .par_iter_mut()
                .map(|mtx| self.populate_mempool_transaction_impl(mtx, &virtual_utxo_view))
                .collect::<Vec<TxResult<()>>>()
        })
    }

    fn validate_block_template_transactions_in_parallel<V: UtxoView + Sync>(
        &self,
        txs: &[Transaction],
        virtual_state: &VirtualState,
        utxo_view: &V,
    ) -> Vec<TxResult<u64>> {
        self.thread_pool
            .install(|| txs.par_iter().map(|tx| self.validate_block_template_transaction(tx, virtual_state, &utxo_view)).collect())
    }

    fn validate_block_template_transaction(
        &self,
        tx: &Transaction,
        virtual_state: &VirtualState,
        utxo_view: &impl UtxoView,
    ) -> TxResult<u64> {
        // No need to validate the transaction in isolation since we rely on the mining manager to submit transactions
        // which were previously validated through `validate_mempool_transaction_and_populate`, hence we only perform
        // in-context validations
        self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, virtual_state.past_median_time)?;
        let ValidatedTransaction { calculated_fee, .. } =
            self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?;
        Ok(calculated_fee)
    }

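    /// Builds a block template on top of the current virtual state, using the given selector to
    /// draw transactions in batches until a valid set is reached.
    ///
    /// Illustrative call (hypothetical `selector` implementing `TemplateTransactionSelector`):
    ///
    /// ```ignore
    /// let template =
    ///     processor.build_block_template(miner_data, Box::new(selector), TemplateBuildMode::Standard)?;
    /// // `template.block` holds the finalized header and the coinbase-prefixed transaction list
    /// ```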
    pub fn build_block_template(
        &self,
        miner_data: MinerData,
        mut tx_selector: Box<dyn TemplateTransactionSelector>,
        build_mode: TemplateBuildMode,
    ) -> Result<BlockTemplate, RuleError> {
        //
        // TODO (relaxed): additional tests
        //

        // We call for the initial tx batch before acquiring the virtual read lock,
        // optimizing for the common case where all txs are valid. Subsequent selection calls
        // are made within the lock in order to preserve the validity of already validated txs
        let mut txs = tx_selector.select_transactions();
        let mut calculated_fees = Vec::with_capacity(txs.len());
        let virtual_read = self.virtual_stores.read();
        let virtual_state = virtual_read.state.get().unwrap();
        let virtual_utxo_view = &virtual_read.utxo_set;

        let mut invalid_transactions = HashMap::new();
        let results = self.validate_block_template_transactions_in_parallel(&txs, &virtual_state, &virtual_utxo_view);
        for (tx, res) in txs.iter().zip(results) {
            match res {
                Err(e) => {
                    invalid_transactions.insert(tx.id(), e);
                    tx_selector.reject_selection(tx.id());
                }
                Ok(fee) => {
                    calculated_fees.push(fee);
                }
            }
        }

        let mut has_rejections = !invalid_transactions.is_empty();
        if has_rejections {
            txs.retain(|tx| !invalid_transactions.contains_key(&tx.id()));
        }

        while has_rejections {
            has_rejections = false;
            let next_batch = tx_selector.select_transactions(); // Note that once next_batch is empty the loop will exit
            let next_batch_results =
                self.validate_block_template_transactions_in_parallel(&next_batch, &virtual_state, &virtual_utxo_view);
            for (tx, res) in next_batch.into_iter().zip(next_batch_results) {
                match res {
                    Err(e) => {
                        invalid_transactions.insert(tx.id(), e);
                        tx_selector.reject_selection(tx.id());
                        has_rejections = true;
                    }
                    Ok(fee) => {
                        txs.push(tx);
                        calculated_fees.push(fee);
                    }
                }
            }
        }

        // Check whether this was an overall successful selection episode. We pass this decision
        // to the selector implementation which has the broadest picture and can use mempool config
        // and context
        match (build_mode, tx_selector.is_successful()) {
            (TemplateBuildMode::Standard, false) => return Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions)),
            (TemplateBuildMode::Standard, true) | (TemplateBuildMode::Infallible, _) => {}
        }

        // At this point we can safely drop the read lock
        drop(virtual_read);

        // Build the template
        self.build_block_template_from_virtual_state(virtual_state, miner_data, txs, calculated_fees)
    }

    pub(crate) fn validate_block_template_transactions(
        &self,
        txs: &[Transaction],
        virtual_state: &VirtualState,
        utxo_view: &impl UtxoView,
    ) -> Result<(), RuleError> {
        // Search for invalid transactions
        let mut invalid_transactions = HashMap::new();
        for tx in txs.iter() {
            if let Err(e) = self.validate_block_template_transaction(tx, virtual_state, utxo_view) {
                invalid_transactions.insert(tx.id(), e);
            }
        }
        if !invalid_transactions.is_empty() {
            Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions))
        } else {
            Ok(())
        }
    }

    pub(crate) fn build_block_template_from_virtual_state(
        &self,
        virtual_state: Arc<VirtualState>,
        miner_data: MinerData,
        mut txs: Vec<Transaction>,
        calculated_fees: Vec<u64>,
    ) -> Result<BlockTemplate, RuleError> {
        // [`calc_block_parents`] can use deep blocks below the pruning point for this calculation, so we
        // need to hold the pruning lock.
        let _prune_guard = self.pruning_lock.blocking_read();
        let pruning_info = self.pruning_point_store.read().get().unwrap();
        let header_pruning_point =
            self.pruning_point_manager.expected_header_pruning_point(virtual_state.ghostdag_data.to_compact(), pruning_info);
        let coinbase = self
            .coinbase_manager
            .expected_coinbase_transaction(
                virtual_state.daa_score,
                miner_data.clone(),
                &virtual_state.ghostdag_data,
                &virtual_state.mergeset_rewards,
                &virtual_state.mergeset_non_daa,
            )
            .unwrap();
        txs.insert(0, coinbase.tx);
        let version = BLOCK_VERSION;
        let parents_by_level = self.parents_manager.calc_block_parents(pruning_info.pruning_point, &virtual_state.parents);

        // Hash according to hardfork activation
        let storage_mass_activated = virtual_state.daa_score > self.storage_mass_activation_daa_score;
        let hash_merkle_root = calc_hash_merkle_root(txs.iter(), storage_mass_activated);

        let accepted_id_merkle_root = kaspa_merkle::calc_merkle_root(virtual_state.accepted_tx_ids.iter().copied());
        let utxo_commitment = virtual_state.multiset.clone().finalize();
        // Past median time is the exclusive lower bound for valid block time, so we increase by 1 to get the valid min
        let min_block_time = virtual_state.past_median_time + 1;
        let header = Header::new_finalized(
            version,
            parents_by_level,
            hash_merkle_root,
            accepted_id_merkle_root,
            utxo_commitment,
            u64::max(min_block_time, unix_now()),
            virtual_state.bits,
            0,
            virtual_state.daa_score,
            virtual_state.ghostdag_data.blue_work,
            virtual_state.ghostdag_data.blue_score,
            header_pruning_point,
        );
        let selected_parent_hash = virtual_state.ghostdag_data.selected_parent;
        let selected_parent_timestamp = self.headers_store.get_timestamp(selected_parent_hash).unwrap();
        let selected_parent_daa_score = self.headers_store.get_daa_score(selected_parent_hash).unwrap();
        Ok(BlockTemplate::new(
            MutableBlock::new(header, txs),
            miner_data,
            coinbase.has_red_reward,
            selected_parent_timestamp,
            selected_parent_daa_score,
            selected_parent_hash,
            calculated_fees,
        ))
    }

    /// Make sure pruning point-related stores are initialized
    pub fn init(self: &Arc<Self>) {
        let pruning_point_read = self.pruning_point_store.upgradable_read();
        if pruning_point_read.pruning_point().unwrap_option().is_none() {
            let mut pruning_point_write = RwLockUpgradableReadGuard::upgrade(pruning_point_read);
            let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
            let mut batch = WriteBatch::default();
            self.past_pruning_points_store.insert_batch(&mut batch, 0, self.genesis.hash).unwrap_or_exists();
            pruning_point_write.set_batch(&mut batch, self.genesis.hash, self.genesis.hash, 0).unwrap();
            pruning_point_write.set_history_root(&mut batch, self.genesis.hash).unwrap();
            pruning_utxoset_write.set_utxoset_position(&mut batch, self.genesis.hash).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_point_write);
            drop(pruning_utxoset_write);
        }
    }

    /// Initializes UTXO state of genesis and points virtual at genesis.
    /// Note that pruning point-related stores are initialized by `init`
    pub fn process_genesis(self: &Arc<Self>) {
        // Write the UTXO state of genesis
        self.commit_utxo_state(self.genesis.hash, UtxoDiff::default(), MuHash::new(), AcceptanceData::default());

        // Init the virtual selected chain store
        let mut batch = WriteBatch::default();
        let mut selected_chain_write = self.selected_chain_store.write();
        selected_chain_write.init_with_pruning_point(&mut batch, self.genesis.hash).unwrap();
        self.db.write(batch).unwrap();
        drop(selected_chain_write);

        // Init virtual state
        self.commit_virtual_state(
            self.virtual_stores.upgradable_read(),
            Arc::new(VirtualState::from_genesis(&self.genesis, self.ghostdag_manager.ghostdag(&[self.genesis.hash]))),
            &Default::default(),
            &Default::default(),
        );
    }

    /// Finalizes the pruning point utxoset state and imports the pruning point utxoset *into* the virtual utxoset
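    ///
    /// Illustrative flow (hypothetical caller; `multiset` accumulated while importing the
    /// pruning point UTXO chunks):
    ///
    /// ```ignore
    /// processor.import_pruning_point_utxo_set(new_pruning_point, multiset)?;
    /// // On success, virtual now points at the pruning point as its sole parent
    /// ```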
    pub fn import_pruning_point_utxo_set(
        &self,
        new_pruning_point: Hash,
        mut imported_utxo_multiset: MuHash,
    ) -> PruningImportResult<()> {
        info!("Importing the UTXO set of the pruning point {}", new_pruning_point);
        let new_pruning_point_header = self.headers_store.get_header(new_pruning_point).unwrap();
        let imported_utxo_multiset_hash = imported_utxo_multiset.finalize();
        if imported_utxo_multiset_hash != new_pruning_point_header.utxo_commitment {
            return Err(PruningImportError::ImportedMultisetHashMismatch(
                new_pruning_point_header.utxo_commitment,
                imported_utxo_multiset_hash,
            ));
        }

        {
            // Set the pruning point utxoset position to the new point we just verified
            let mut batch = WriteBatch::default();
            let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
            pruning_utxoset_write.set_utxoset_position(&mut batch, new_pruning_point).unwrap();
            self.db.write(batch).unwrap();
            drop(pruning_utxoset_write);
        }

        {
            // Copy the pruning-point UTXO set into virtual's UTXO set
            let pruning_utxoset_read = self.pruning_utxoset_stores.read();
            let mut virtual_write = self.virtual_stores.write();

            virtual_write.utxo_set.clear().unwrap();
            for chunk in &pruning_utxoset_read.utxo_set.iterator().map(|iter_result| iter_result.unwrap()).chunks(1000) {
                virtual_write.utxo_set.write_from_iterator_without_cache(chunk).unwrap();
            }
        }

        let virtual_read = self.virtual_stores.upgradable_read();

        // Validate transactions of the pruning point itself
        let new_pruning_point_transactions = self.block_transactions_store.get(new_pruning_point).unwrap();
        let validated_transactions = self.validate_transactions_in_parallel(
            &new_pruning_point_transactions,
            &virtual_read.utxo_set,
            new_pruning_point_header.daa_score,
            TxValidationFlags::Full,
        );
        if validated_transactions.len() < new_pruning_point_transactions.len() - 1 {
            // Some non-coinbase transactions are invalid
            return Err(PruningImportError::NewPruningPointTxErrors);
        }

        {
            // Submit partial UTXO state for the pruning point.
            // Note we only have and need the multiset; acceptance data and utxo-diff are irrelevant.
            let mut batch = WriteBatch::default();
            self.utxo_multisets_store.set_batch(&mut batch, new_pruning_point, imported_utxo_multiset.clone()).unwrap();

            let statuses_write = self.statuses_store.set_batch(&mut batch, new_pruning_point, StatusUTXOValid).unwrap();
            self.db.write(batch).unwrap();
            drop(statuses_write);
        }

        // Calculate the virtual state, treating the pruning point as the only virtual parent
        let virtual_parents = vec![new_pruning_point];
        let virtual_ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents);

        self.calculate_and_commit_virtual_state(
            virtual_read,
            virtual_parents,
            virtual_ghostdag_data,
            imported_utxo_multiset.clone(),
            &mut UtxoDiff::default(),
            &ChainPath::default(),
        )?;

        Ok(())
    }

    pub fn are_pruning_points_violating_finality(&self, pp_list: PruningPointsList) -> bool {
        // Ideally we would want to check if the last known pruning point has the finality point
        // in its chain, but in some cases it's impossible: let `lkp` be the last known pruning
        // point from the list, and `fup` be the first unknown pruning point (the one following `lkp`).
        // fup.blue_score - lkp.blue_score ≈ finality_depth (±k), so it's possible for `lkp` not to
        // have the finality point in its past. So we have no choice but to check if `lkp`
        // has `finality_point.finality_point` in its chain (in the worst case `fup` is one block
        // above the current finality point, and in this case `lkp` will be a few blocks above
        // `finality_point.finality_point`), meaning this function can only detect finality violations
        // up to a depth of 2*finality_depth, and can give false negatives for shallower finality violations.
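        //
        // Schematic example (hypothetical numbers): with finality_depth = F, if `fup` is one block
        // above the current finality point, then `lkp` lies roughly F blue-scores below it, i.e.,
        // only slightly above `finality_point.finality_point`. This is why the check below can
        // only cover violations up to a depth of about 2*F.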
        let current_pp = self.pruning_point_store.read().pruning_point().unwrap();
        let vf = self.virtual_finality_point(&self.lkg_virtual_state.load().ghostdag_data, current_pp);
        let vff = self.depth_manager.calc_finality_point(&self.ghostdag_primary_store.get_data(vf).unwrap(), current_pp);

        let last_known_pp = pp_list.iter().rev().find(|pp| match self.statuses_store.read().get(pp.hash).unwrap_option() {
            Some(status) => status.is_valid(),
            None => false,
        });

        if let Some(last_known_pp) = last_known_pp {
            !self.reachability_service.is_chain_ancestor_of(vff, last_known_pp.hash)
        } else {
            // If no pruning point is known, there's definitely a finality violation
            // (normally at least genesis should be known).
            true
        }
    }
}

enum MergesetIncreaseResult {
    Accepted { increase_size: u64 },
    Rejected { new_candidate: Hash },
}