kaspa_consensus/processes/sync/
mod.rs1use std::{cmp::min, ops::Deref, sync::Arc};
2
3use itertools::Itertools;
4use kaspa_consensus_core::errors::sync::{SyncManagerError, SyncManagerResult};
5use kaspa_database::prelude::StoreResultExtensions;
6use kaspa_hashes::Hash;
7use kaspa_math::uint::malachite_base::num::arithmetic::traits::CeilingLogBase2;
8use kaspa_utils::option::OptionExtensions;
9use parking_lot::RwLock;
10
11use crate::model::{
12 services::reachability::{MTReachabilityService, ReachabilityService},
13 stores::{
14 ghostdag::GhostdagStoreReader, headers_selected_tip::HeadersSelectedTipStoreReader, pruning::PruningStoreReader,
15 reachability::ReachabilityStoreReader, relations::RelationsStoreReader, selected_chain::SelectedChainStoreReader,
16 statuses::StatusesStoreReader,
17 },
18};
19
20use super::traversal_manager::DagTraversalManager;
21
22#[derive(Clone)]
23pub struct SyncManager<
24 S: RelationsStoreReader,
25 T: ReachabilityStoreReader,
26 U: GhostdagStoreReader,
27 V: SelectedChainStoreReader,
28 W: HeadersSelectedTipStoreReader,
29 X: PruningStoreReader,
30 Y: StatusesStoreReader,
31> {
32 mergeset_size_limit: usize,
33 reachability_service: MTReachabilityService<T>,
34 traversal_manager: DagTraversalManager<U, T, S>,
35 ghostdag_store: Arc<U>,
36 selected_chain_store: Arc<RwLock<V>>,
37 header_selected_tip_store: Arc<RwLock<W>>,
38 pruning_point_store: Arc<RwLock<X>>,
39 statuses_store: Arc<RwLock<Y>>,
40}
41
42impl<
43 S: RelationsStoreReader,
44 T: ReachabilityStoreReader,
45 U: GhostdagStoreReader,
46 V: SelectedChainStoreReader,
47 W: HeadersSelectedTipStoreReader,
48 X: PruningStoreReader,
49 Y: StatusesStoreReader,
50 > SyncManager<S, T, U, V, W, X, Y>
51{
52 pub fn new(
53 mergeset_size_limit: usize,
54 reachability_service: MTReachabilityService<T>,
55 traversal_manager: DagTraversalManager<U, T, S>,
56 ghostdag_store: Arc<U>,
57 selected_chain_store: Arc<RwLock<V>>,
58 header_selected_tip_store: Arc<RwLock<W>>,
59 pruning_point_store: Arc<RwLock<X>>,
60 statuses_store: Arc<RwLock<Y>>,
61 ) -> Self {
62 Self {
63 mergeset_size_limit,
64 reachability_service,
65 traversal_manager,
66 ghostdag_store,
67 selected_chain_store,
68 header_selected_tip_store,
69 pruning_point_store,
70 statuses_store,
71 }
72 }
73
74 pub fn antipast_hashes_between(&self, low: Hash, high: Hash, max_blocks: Option<usize>) -> (Vec<Hash>, Hash) {
78 let max_blocks = max_blocks.unwrap_or(usize::MAX);
79 assert!(max_blocks >= self.mergeset_size_limit);
80
81 let original_low = low;
86 let low = self.find_highest_common_chain_block(low, high);
87
88 let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
89 let high_bs = self.ghostdag_store.get_blue_score(high).unwrap();
90 assert!(low_bs <= high_bs);
91
92 let mut highest_reached = low; let mut blocks = Vec::with_capacity(min(max_blocks, (high_bs - low_bs) as usize));
94 for current in self.reachability_service.forward_chain_iterator(low, high, true).skip(1) {
95 let gd = self.ghostdag_store.get_data(current).unwrap();
96 if blocks.len() + gd.mergeset_size() > max_blocks {
97 break;
98 }
99 blocks.extend(
100 gd.consensus_ordered_mergeset(self.ghostdag_store.deref())
101 .filter(|hash| !self.reachability_service.is_dag_ancestor_of(*hash, original_low)),
102 );
103 highest_reached = current;
104 }
105
106 if low != highest_reached {
108 blocks.push(highest_reached);
109 }
110
111 (blocks, highest_reached)
112 }
113
114 pub fn find_highest_common_chain_block(&self, low: Hash, high: Hash) -> Hash {
115 self.reachability_service
116 .default_backward_chain_iterator(low)
117 .find(|candidate| self.reachability_service.is_chain_ancestor_of(*candidate, high))
118 .expect("because of the pruning rules such block has to exist")
119 }
120
121 pub fn create_virtual_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> SyncManagerResult<Vec<Hash>> {
124 let low = low.unwrap_or_else(|| self.pruning_point_store.read().get().unwrap().pruning_point);
125 let sc_read = self.selected_chain_store.read();
126 let high = high.unwrap_or_else(|| sc_read.get_tip().unwrap().1);
127 if low == high {
128 return Ok(vec![low]);
129 }
130
131 let low_index = match sc_read.get_by_hash(low).unwrap_option() {
132 Some(index) => index,
133 None => return Err(SyncManagerError::BlockNotInSelectedParentChain(low)),
134 };
135
136 let high_index = match sc_read.get_by_hash(high).unwrap_option() {
137 Some(index) => index,
138 None => return Err(SyncManagerError::BlockNotInSelectedParentChain(high)),
139 };
140
141 if low_index > high_index {
142 return Err(SyncManagerError::LowHashHigherThanHighHash(low, high));
143 }
144
145 let mut locator = Vec::with_capacity((high_index - low_index).ceiling_log_base_2() as usize);
146 let mut step = 1;
147 let mut current_index = high_index;
148 while current_index > low_index {
149 locator.push(sc_read.get_by_index(current_index).unwrap());
150 if current_index < step {
151 break;
152 }
153
154 current_index -= step;
155 step *= 2;
156 }
157
158 locator.push(low);
159 Ok(locator)
160 }
161
162 pub fn get_missing_block_body_hashes(&self, high: Hash) -> SyncManagerResult<Vec<Hash>> {
163 let pp = self.pruning_point_store.read().pruning_point().unwrap();
164 if !self.reachability_service.is_chain_ancestor_of(pp, high) {
165 return Err(SyncManagerError::PruningPointNotInChain(pp, high));
166 }
167
168 let mut highest_with_body = None;
169 let mut forward_iterator = self.reachability_service.forward_chain_iterator(pp, high, true).tuple_windows();
170 let mut backward_iterator = self.reachability_service.backward_chain_iterator(high, pp, true);
171 loop {
172 let Some((parent, current)) = forward_iterator.next() else {
174 break;
175 };
176 let status = self.statuses_store.read().get(current).unwrap();
177 if status.is_header_only() {
178 highest_with_body = Some(parent);
180 break;
181 }
182
183 let Some(backward_current) = backward_iterator.next() else {
184 break;
185 };
186 let status = self.statuses_store.read().get(backward_current).unwrap();
187 if status.has_block_body() {
188 highest_with_body = Some(backward_current);
190 break;
191 }
192 }
193
194 if highest_with_body.is_none_or_ex(|&h| h == high) {
195 return Ok(vec![]);
196 };
197
198 let (mut hashes_between, _) = self.antipast_hashes_between(highest_with_body.unwrap(), high, None);
199 let statuses = self.statuses_store.read();
200 hashes_between.retain(|&h| statuses.get(h).unwrap().is_header_only());
201
202 Ok(hashes_between)
203 }
204
205 pub fn create_block_locator_from_pruning_point(
206 &self,
207 high: Hash,
208 low: Hash,
209 limit: Option<usize>,
210 ) -> SyncManagerResult<Vec<Hash>> {
211 if !self.reachability_service.is_chain_ancestor_of(low, high) {
212 return Err(SyncManagerError::LocatorLowHashNotInHighHashChain(low, high));
213 }
214
215 let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
216 let mut current = high;
217 let mut step = 1;
218 let mut locator = Vec::new();
219 loop {
220 locator.push(current);
221 if limit == Some(locator.len()) {
222 break;
223 }
224
225 let current_gd = self.ghostdag_store.get_compact_data(current).unwrap();
226
227 if current_gd.blue_score <= low_bs {
229 break;
230 }
231
232 let next_bs = if current_gd.blue_score < step || current_gd.blue_score - step < low_bs {
235 low_bs
236 } else {
237 current_gd.blue_score - step
238 };
239
240 current = self.traversal_manager.lowest_chain_block_above_or_equal_to_blue_score(current, next_bs);
242
243 step *= 2;
245 }
246
247 Ok(locator)
248 }
249}