// kaspa_consensus/processes/sync/mod.rs

use std::{cmp::min, ops::Deref, sync::Arc};

use itertools::Itertools;
use kaspa_consensus_core::errors::sync::{SyncManagerError, SyncManagerResult};
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_math::uint::malachite_base::num::arithmetic::traits::CeilingLogBase2;
use kaspa_utils::option::OptionExtensions;
use parking_lot::RwLock;

use crate::model::{
    services::reachability::{MTReachabilityService, ReachabilityService},
    stores::{
        ghostdag::GhostdagStoreReader, headers_selected_tip::HeadersSelectedTipStoreReader, pruning::PruningStoreReader,
        reachability::ReachabilityStoreReader, relations::RelationsStoreReader, selected_chain::SelectedChainStoreReader,
        statuses::StatusesStoreReader,
    },
};

use super::traversal_manager::DagTraversalManager;

22#[derive(Clone)]
23pub struct SyncManager<
24    S: RelationsStoreReader,
25    T: ReachabilityStoreReader,
26    U: GhostdagStoreReader,
27    V: SelectedChainStoreReader,
28    W: HeadersSelectedTipStoreReader,
29    X: PruningStoreReader,
30    Y: StatusesStoreReader,
31> {
32    mergeset_size_limit: usize,
33    reachability_service: MTReachabilityService<T>,
34    traversal_manager: DagTraversalManager<U, T, S>,
35    ghostdag_store: Arc<U>,
36    selected_chain_store: Arc<RwLock<V>>,
37    header_selected_tip_store: Arc<RwLock<W>>,
38    pruning_point_store: Arc<RwLock<X>>,
39    statuses_store: Arc<RwLock<Y>>,
40}
41
42impl<
43        S: RelationsStoreReader,
44        T: ReachabilityStoreReader,
45        U: GhostdagStoreReader,
46        V: SelectedChainStoreReader,
47        W: HeadersSelectedTipStoreReader,
48        X: PruningStoreReader,
49        Y: StatusesStoreReader,
50    > SyncManager<S, T, U, V, W, X, Y>
51{
52    pub fn new(
53        mergeset_size_limit: usize,
54        reachability_service: MTReachabilityService<T>,
55        traversal_manager: DagTraversalManager<U, T, S>,
56        ghostdag_store: Arc<U>,
57        selected_chain_store: Arc<RwLock<V>>,
58        header_selected_tip_store: Arc<RwLock<W>>,
59        pruning_point_store: Arc<RwLock<X>>,
60        statuses_store: Arc<RwLock<Y>>,
61    ) -> Self {
62        Self {
63            mergeset_size_limit,
64            reachability_service,
65            traversal_manager,
66            ghostdag_store,
67            selected_chain_store,
68            header_selected_tip_store,
69            pruning_point_store,
70            statuses_store,
71        }
72    }
73
74    /// Returns the hashes of the blocks between low's antipast and high's antipast, or up to `max_blocks`, if provided.
75    /// The result excludes low and includes high. If low == high, returns nothing. If max_blocks is some then it MUST be >= MergeSetSizeLimit
76    /// because it returns blocks with MergeSet granularity, so if MergeSet > max_blocks, the function will return nothing which is undesired behavior.
77    pub fn antipast_hashes_between(&self, low: Hash, high: Hash, max_blocks: Option<usize>) -> (Vec<Hash>, Hash) {
78        let max_blocks = max_blocks.unwrap_or(usize::MAX);
79        assert!(max_blocks >= self.mergeset_size_limit);
80
81        // If low is not in the chain of high - forward_chain_iterator will fail.
82        // Therefore, we traverse down low's chain until we reach a block that is in
83        // high's chain.
84        // We keep original_low to filter out blocks in its past later down the road
85        let original_low = low;
86        let low = self.find_highest_common_chain_block(low, high);
87
88        let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
89        let high_bs = self.ghostdag_store.get_blue_score(high).unwrap();
90        assert!(low_bs <= high_bs);
91
92        let mut highest_reached = low; // The highest chain block we reached before completing/reaching a limit
93        let mut blocks = Vec::with_capacity(min(max_blocks, (high_bs - low_bs) as usize));
94        for current in self.reachability_service.forward_chain_iterator(low, high, true).skip(1) {
95            let gd = self.ghostdag_store.get_data(current).unwrap();
96            if blocks.len() + gd.mergeset_size() > max_blocks {
97                break;
98            }
99            blocks.extend(
100                gd.consensus_ordered_mergeset(self.ghostdag_store.deref())
101                    .filter(|hash| !self.reachability_service.is_dag_ancestor_of(*hash, original_low)),
102            );
103            highest_reached = current;
104        }
105
106        // The process above doesn't return `highest_reached`, so include it explicitly unless it is `low`
107        if low != highest_reached {
108            blocks.push(highest_reached);
109        }
110
111        (blocks, highest_reached)
112    }
113
114    pub fn find_highest_common_chain_block(&self, low: Hash, high: Hash) -> Hash {
115        self.reachability_service
116            .default_backward_chain_iterator(low)
117            .find(|candidate| self.reachability_service.is_chain_ancestor_of(*candidate, high))
118            .expect("because of the pruning rules such block has to exist")
119    }
120
121    /// Returns a logarithmic amount of blocks sampled from the virtual selected chain between `low` and `high`.
122    /// Expects both blocks to be on the virtual selected chain, otherwise an error is returned
123    pub fn create_virtual_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> SyncManagerResult<Vec<Hash>> {
124        let low = low.unwrap_or_else(|| self.pruning_point_store.read().get().unwrap().pruning_point);
125        let sc_read = self.selected_chain_store.read();
126        let high = high.unwrap_or_else(|| sc_read.get_tip().unwrap().1);
127        if low == high {
128            return Ok(vec![low]);
129        }
130
131        let low_index = match sc_read.get_by_hash(low).unwrap_option() {
132            Some(index) => index,
133            None => return Err(SyncManagerError::BlockNotInSelectedParentChain(low)),
134        };
135
136        let high_index = match sc_read.get_by_hash(high).unwrap_option() {
137            Some(index) => index,
138            None => return Err(SyncManagerError::BlockNotInSelectedParentChain(high)),
139        };
140
141        if low_index > high_index {
142            return Err(SyncManagerError::LowHashHigherThanHighHash(low, high));
143        }
144
145        let mut locator = Vec::with_capacity((high_index - low_index).ceiling_log_base_2() as usize);
146        let mut step = 1;
147        let mut current_index = high_index;
148        while current_index > low_index {
149            locator.push(sc_read.get_by_index(current_index).unwrap());
150            if current_index < step {
151                break;
152            }
153
154            current_index -= step;
155            step *= 2;
156        }
157
158        locator.push(low);
159        Ok(locator)
160    }
161
162    pub fn get_missing_block_body_hashes(&self, high: Hash) -> SyncManagerResult<Vec<Hash>> {
163        let pp = self.pruning_point_store.read().pruning_point().unwrap();
164        if !self.reachability_service.is_chain_ancestor_of(pp, high) {
165            return Err(SyncManagerError::PruningPointNotInChain(pp, high));
166        }
167
168        let mut highest_with_body = None;
169        let mut forward_iterator = self.reachability_service.forward_chain_iterator(pp, high, true).tuple_windows();
170        let mut backward_iterator = self.reachability_service.backward_chain_iterator(high, pp, true);
171        loop {
172            // We loop from both directions in parallel in order to use the shorter path
173            let Some((parent, current)) = forward_iterator.next() else {
174                break;
175            };
176            let status = self.statuses_store.read().get(current).unwrap();
177            if status.is_header_only() {
178                // Going up, the first parent which has a header-only child is our target
179                highest_with_body = Some(parent);
180                break;
181            }
182
183            let Some(backward_current) = backward_iterator.next() else {
184                break;
185            };
186            let status = self.statuses_store.read().get(backward_current).unwrap();
187            if status.has_block_body() {
188                // Since this iterator is going down, current must be the highest with body
189                highest_with_body = Some(backward_current);
190                break;
191            }
192        }
193
194        if highest_with_body.is_none_or_ex(|&h| h == high) {
195            return Ok(vec![]);
196        };
197
198        let (mut hashes_between, _) = self.antipast_hashes_between(highest_with_body.unwrap(), high, None);
199        let statuses = self.statuses_store.read();
200        hashes_between.retain(|&h| statuses.get(h).unwrap().is_header_only());
201
202        Ok(hashes_between)
203    }
204
205    pub fn create_block_locator_from_pruning_point(
206        &self,
207        high: Hash,
208        low: Hash,
209        limit: Option<usize>,
210    ) -> SyncManagerResult<Vec<Hash>> {
211        if !self.reachability_service.is_chain_ancestor_of(low, high) {
212            return Err(SyncManagerError::LocatorLowHashNotInHighHashChain(low, high));
213        }
214
215        let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
216        let mut current = high;
217        let mut step = 1;
218        let mut locator = Vec::new();
219        loop {
220            locator.push(current);
221            if limit == Some(locator.len()) {
222                break;
223            }
224
225            let current_gd = self.ghostdag_store.get_compact_data(current).unwrap();
226
227            // Nothing more to add once the low node has been added.
228            if current_gd.blue_score <= low_bs {
229                break;
230            }
231
232            // Calculate blue score of previous block to include ensuring the
233            // final block is `low`.
234            let next_bs = if current_gd.blue_score < step || current_gd.blue_score - step < low_bs {
235                low_bs
236            } else {
237                current_gd.blue_score - step
238            };
239
240            // Walk down current's selected parent chain to the appropriate ancestor
241            current = self.traversal_manager.lowest_chain_block_above_or_equal_to_blue_score(current, next_bs);
242
243            // Double the distance between included hashes
244            step *= 2;
245        }
246
247        Ok(locator)
248    }
249}