use std::{cmp::min, ops::Deref, sync::Arc};
use itertools::Itertools;
use kaspa_consensus_core::errors::sync::{SyncManagerError, SyncManagerResult};
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_math::uint::malachite_base::num::arithmetic::traits::CeilingLogBase2;
use kaspa_utils::option::OptionExtensions;
use parking_lot::RwLock;
use crate::model::{
services::reachability::{MTReachabilityService, ReachabilityService},
stores::{
block_window_cache::BlockWindowCacheReader, ghostdag::GhostdagStoreReader,
headers_selected_tip::HeadersSelectedTipStoreReader, pruning::PruningStoreReader, reachability::ReachabilityStoreReader,
relations::RelationsStoreReader, selected_chain::SelectedChainStoreReader, statuses::StatusesStoreReader,
},
};
use super::traversal_manager::DagTraversalManager;
#[derive(Clone)]
pub struct SyncManager<
S: RelationsStoreReader,
T: ReachabilityStoreReader,
U: GhostdagStoreReader,
V: SelectedChainStoreReader,
W: HeadersSelectedTipStoreReader,
X: PruningStoreReader,
Y: StatusesStoreReader,
Z: BlockWindowCacheReader,
> {
mergeset_size_limit: usize,
reachability_service: MTReachabilityService<T>,
traversal_manager: DagTraversalManager<U, Z, T, S>,
ghostdag_store: Arc<U>,
selected_chain_store: Arc<RwLock<V>>,
header_selected_tip_store: Arc<RwLock<W>>,
pruning_store: Arc<RwLock<X>>,
statuses_store: Arc<RwLock<Y>>,
}
impl<
S: RelationsStoreReader,
T: ReachabilityStoreReader,
U: GhostdagStoreReader,
V: SelectedChainStoreReader,
W: HeadersSelectedTipStoreReader,
X: PruningStoreReader,
Y: StatusesStoreReader,
Z: BlockWindowCacheReader,
> SyncManager<S, T, U, V, W, X, Y, Z>
{
pub fn new(
mergeset_size_limit: usize,
reachability_service: MTReachabilityService<T>,
traversal_manager: DagTraversalManager<U, Z, T, S>,
ghostdag_store: Arc<U>,
selected_chain_store: Arc<RwLock<V>>,
header_selected_tip_store: Arc<RwLock<W>>,
pruning_store: Arc<RwLock<X>>,
statuses_store: Arc<RwLock<Y>>,
) -> Self {
Self {
mergeset_size_limit,
reachability_service,
traversal_manager,
ghostdag_store,
selected_chain_store,
header_selected_tip_store,
pruning_store,
statuses_store,
}
}
pub fn antipast_hashes_between(&self, low: Hash, high: Hash, max_blocks: Option<usize>) -> (Vec<Hash>, Hash) {
let max_blocks = max_blocks.unwrap_or(usize::MAX);
assert!(max_blocks >= self.mergeset_size_limit);
let original_low = low;
let low = self.find_highest_common_chain_block(low, high);
let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
let high_bs = self.ghostdag_store.get_blue_score(high).unwrap();
assert!(low_bs <= high_bs);
let mut highest_reached = low; let mut blocks = Vec::with_capacity(min(max_blocks, (high_bs - low_bs) as usize));
for current in self.reachability_service.forward_chain_iterator(low, high, true).skip(1) {
let gd = self.ghostdag_store.get_data(current).unwrap();
if blocks.len() + gd.mergeset_size() > max_blocks {
break;
}
blocks.extend(
gd.consensus_ordered_mergeset(self.ghostdag_store.deref())
.filter(|hash| !self.reachability_service.is_dag_ancestor_of(*hash, original_low)),
);
highest_reached = current;
}
if low != highest_reached {
blocks.push(highest_reached);
}
(blocks, highest_reached)
}
fn find_highest_common_chain_block(&self, low: Hash, high: Hash) -> Hash {
self.reachability_service
.default_backward_chain_iterator(low)
.find(|candidate| self.reachability_service.is_chain_ancestor_of(*candidate, high))
.expect("because of the pruning rules such block has to exist")
}
pub fn create_headers_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> SyncManagerResult<Vec<Hash>> {
let sc_read_guard = self.selected_chain_store.read();
let hst_read_guard = self.header_selected_tip_store.read();
let pp_read_guard = self.pruning_store.read();
let low = low.unwrap_or_else(|| pp_read_guard.get().unwrap().pruning_point);
let high = high.unwrap_or_else(|| hst_read_guard.get().unwrap().hash);
if low == high {
return Ok(vec![low]);
}
let low_index = match sc_read_guard.get_by_hash(low).unwrap_option() {
Some(index) => index,
None => return Err(SyncManagerError::BlockNotInSelectedParentChain(low)),
};
let high_index = match sc_read_guard.get_by_hash(high).unwrap_option() {
Some(index) => index,
None => return Err(SyncManagerError::BlockNotInSelectedParentChain(high)),
};
if low_index > high_index {
return Err(SyncManagerError::LowHashHigherThanHighHash(low, high));
}
let mut locator = Vec::with_capacity((high_index - low_index).ceiling_log_base_2() as usize);
let mut step = 1;
let mut current_index = high_index;
while current_index > low_index {
locator.push(sc_read_guard.get_by_index(current_index).unwrap());
if current_index < step {
break;
}
current_index -= step;
step *= 2;
}
locator.push(low);
Ok(locator)
}
pub fn get_missing_block_body_hashes(&self, high: Hash) -> SyncManagerResult<Vec<Hash>> {
let pp = self.pruning_store.read().pruning_point().unwrap();
if !self.reachability_service.is_chain_ancestor_of(pp, high) {
return Err(SyncManagerError::PruningPointNotInChain(pp, high));
}
let mut highest_with_body = None;
let mut forward_iterator = self.reachability_service.forward_chain_iterator(pp, high, true).tuple_windows();
let mut backward_iterator = self.reachability_service.backward_chain_iterator(high, pp, true);
loop {
let Some((parent, current)) = forward_iterator.next() else { break; };
let status = self.statuses_store.read().get(current).unwrap();
if status.is_header_only() {
highest_with_body = Some(parent);
break;
}
let Some(backward_current) = backward_iterator.next() else { break; };
let status = self.statuses_store.read().get(backward_current).unwrap();
if status.has_block_body() {
highest_with_body = Some(backward_current);
break;
}
}
if highest_with_body.is_none_or(|&h| h == high) {
return Ok(vec![]);
};
let (mut hashes_between, _) = self.antipast_hashes_between(highest_with_body.unwrap(), high, None);
let statuses = self.statuses_store.read();
hashes_between.retain(|&h| statuses.get(h).unwrap().is_header_only());
Ok(hashes_between)
}
pub fn create_block_locator_from_pruning_point(
&self,
high: Hash,
low: Hash,
limit: Option<usize>,
) -> SyncManagerResult<Vec<Hash>> {
if !self.reachability_service.is_chain_ancestor_of(low, high) {
return Err(SyncManagerError::LocatorLowHashNotInHighHashChain(low, high));
}
let low_bs = self.ghostdag_store.get_blue_score(low).unwrap();
let mut current = high;
let mut step = 1;
let mut locator = Vec::new();
loop {
locator.push(current);
if let Some(limit) = limit {
if locator.len() == limit {
break;
}
}
let current_gd = self.ghostdag_store.get_compact_data(current).unwrap();
if current_gd.blue_score <= low_bs {
break;
}
let next_bs = if current_gd.blue_score < step || current_gd.blue_score - step < low_bs {
low_bs
} else {
current_gd.blue_score - step
};
current = self.traversal_manager.lowest_chain_block_above_or_equal_to_blue_score(current, next_bs);
step *= 2;
}
Ok(locator)
}
}