use crate::block_status::BlockStatus;
use crate::orphan_block_pool::OrphanBlockPool;
use crate::utils::is_internal_db_error;
use crate::{Status, StatusCode, FAST_INDEX, LOW_INDEX, NORMAL_INDEX, TIME_TRACE_SIZE};
use ckb_app_config::SyncConfig;
use ckb_chain::chain::ChainController;
use ckb_chain_spec::consensus::Consensus;
use ckb_channel::Receiver;
use ckb_constant::sync::{
BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
MAX_UNKNOWN_TX_HASHES_SIZE, MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER, POW_INTERVAL,
RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
};
use ckb_error::Error as CKBError;
use ckb_logger::{debug, error, trace};
use ckb_metrics::metrics;
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
use ckb_shared::{shared::Shared, Snapshot};
use ckb_store::{ChainDB, ChainStore};
use ckb_traits::HeaderProvider;
use ckb_tx_pool::service::TxVerificationResult;
use ckb_types::{
core::{self, BlockNumber, EpochExt},
packed::{self, Byte32},
prelude::*,
H256, U256,
};
use ckb_util::{shrink_to_fit, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use ckb_verification_traits::Switch;
use dashmap::{self, DashMap};
use faketime::unix_time_as_millis;
use keyed_priority_queue::{self, KeyedPriorityQueue};
use lru::LruCache;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
use std::hash::Hash;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{cmp, fmt, iter};
mod header_map;
use crate::utils::send_message;
use ckb_types::core::EpochNumber;
pub use header_map::HeaderMap;
const FILTER_SIZE: usize = 20000;
const GET_HEADERS_CACHE_SIZE: usize = 10000;
const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const TX_FILTER_SIZE: usize = 50000;
const ORPHAN_BLOCK_SIZE: usize = 1024;
const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
const SHRINK_THRESHOLD: usize = 300;
#[derive(Clone, Debug, Default)]
pub struct ChainSyncState {
pub timeout: u64,
pub work_header: Option<core::HeaderView>,
pub total_difficulty: Option<U256>,
pub sent_getheaders: bool,
headers_sync_state: HeadersSyncState,
}
impl ChainSyncState {
fn can_start_sync(&self, now: u64) -> bool {
match self.headers_sync_state {
HeadersSyncState::Initialized => false,
HeadersSyncState::SyncProtocolConnected => true,
HeadersSyncState::Started => false,
HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
}
}
fn connected(&mut self) {
self.headers_sync_state = HeadersSyncState::SyncProtocolConnected;
}
fn start(&mut self) {
self.headers_sync_state = HeadersSyncState::Started
}
fn suspend(&mut self, until: u64) {
self.headers_sync_state = HeadersSyncState::Suspend(until)
}
fn tip_synced(&mut self) {
let now = unix_time_as_millis();
self.headers_sync_state = HeadersSyncState::TipSynced(now + 28000);
}
fn started(&self) -> bool {
matches!(self.headers_sync_state, HeadersSyncState::Started)
}
fn started_or_tip_synced(&self) -> bool {
matches!(
self.headers_sync_state,
HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
)
}
}
#[derive(Clone, Debug)]
enum HeadersSyncState {
Initialized,
SyncProtocolConnected,
Started,
Suspend(u64), TipSynced(u64), }
impl Default for HeadersSyncState {
fn default() -> Self {
HeadersSyncState::Initialized
}
}
#[derive(Clone, Default, Debug, Copy)]
pub struct PeerFlags {
pub is_outbound: bool,
pub is_protect: bool,
pub is_whitelist: bool,
}
#[derive(Clone, Default, Debug, Copy)]
pub struct HeadersSyncController {
pub(crate) started_ts: u64,
pub(crate) started_tip_ts: u64,
pub(crate) last_updated_ts: u64,
pub(crate) last_updated_tip_ts: u64,
pub(crate) is_close_to_the_end: bool,
}
impl HeadersSyncController {
#[cfg(test)]
pub(crate) fn new(
started_ts: u64,
started_tip_ts: u64,
last_updated_ts: u64,
last_updated_tip_ts: u64,
is_close_to_the_end: bool,
) -> Self {
Self {
started_ts,
started_tip_ts,
last_updated_ts,
last_updated_tip_ts,
is_close_to_the_end,
}
}
pub(crate) fn from_header(better_tip_header: &core::HeaderView) -> Self {
let started_ts = unix_time_as_millis();
let started_tip_ts = better_tip_header.timestamp();
Self {
started_ts,
started_tip_ts,
last_updated_ts: started_ts,
last_updated_tip_ts: started_tip_ts,
is_close_to_the_end: false,
}
}
#[allow(clippy::wrong_self_convention)]
pub(crate) fn is_timeout(&mut self, now_tip_ts: u64, now: u64) -> Option<bool> {
let inspect_window = HEADERS_DOWNLOAD_INSPECT_WINDOW;
let expected_headers_per_sec = HEADERS_DOWNLOAD_HEADERS_PER_SECOND;
let tolerable_bias = HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE;
let expected_before_finished = now.saturating_sub(now_tip_ts);
trace!("headers-sync: better tip ts {}; now {}", now_tip_ts, now);
if self.is_close_to_the_end {
let expected_in_base_time =
expected_headers_per_sec * inspect_window * POW_INTERVAL / 1000;
if expected_before_finished > expected_in_base_time {
self.started_ts = now;
self.started_tip_ts = now_tip_ts;
self.last_updated_ts = now;
self.last_updated_tip_ts = now_tip_ts;
self.is_close_to_the_end = false;
trace!("headers-sync: send GetHeaders again since we behind the tip too much");
None
} else {
Some(false)
}
} else if expected_before_finished < inspect_window {
self.is_close_to_the_end = true;
trace!("headers-sync: ignore timeout because the tip almost reach the real time");
Some(false)
} else {
let spent_since_last_updated = now.saturating_sub(self.last_updated_ts);
if spent_since_last_updated < inspect_window {
Some(false)
} else {
let synced_since_last_updated = now_tip_ts.saturating_sub(self.last_updated_tip_ts);
let expected_since_last_updated =
expected_headers_per_sec * spent_since_last_updated * POW_INTERVAL / 1000;
if synced_since_last_updated < expected_since_last_updated / tolerable_bias {
trace!("headers-sync: the instantaneous speed is too slow");
Some(true)
} else {
self.last_updated_ts = now;
self.last_updated_tip_ts = now_tip_ts;
if synced_since_last_updated > expected_since_last_updated {
trace!("headers-sync: the instantaneous speed is acceptable");
Some(false)
} else {
let spent_since_started = now.saturating_sub(self.started_ts);
let synced_since_started = now_tip_ts.saturating_sub(self.started_tip_ts);
let expected_since_started =
expected_headers_per_sec * spent_since_started * POW_INTERVAL / 1000;
if synced_since_started < expected_since_started {
trace!(
"headers-sync: both the global average speed and the instantaneous speed \
is slow than expected"
);
Some(true)
} else {
trace!("headers-sync: the global average speed is acceptable");
Some(false)
}
}
}
}
}
}
}
#[derive(Clone, Default, Debug)]
pub struct PeerState {
pub headers_sync_controller: Option<HeadersSyncController>,
pub peer_flags: PeerFlags,
pub chain_sync: ChainSyncState,
pub best_known_header: Option<HeaderView>,
pub last_common_header: Option<core::HeaderView>,
pub unknown_header_list: Vec<Byte32>,
}
impl PeerState {
pub fn new(peer_flags: PeerFlags) -> PeerState {
PeerState {
headers_sync_controller: None,
peer_flags,
chain_sync: ChainSyncState::default(),
best_known_header: None,
last_common_header: None,
unknown_header_list: Vec::new(),
}
}
pub fn can_start_sync(&self, now: u64, ibd: bool) -> bool {
((self.peer_flags.is_protect || self.peer_flags.is_whitelist) || !ibd)
&& self.chain_sync.can_start_sync(now)
}
pub fn start_sync(&mut self, headers_sync_controller: HeadersSyncController) {
self.chain_sync.start();
self.headers_sync_controller = Some(headers_sync_controller);
}
fn suspend_sync(&mut self, suspend_time: u64) {
let now = unix_time_as_millis();
self.chain_sync.suspend(now + suspend_time);
self.headers_sync_controller = None;
}
fn tip_synced(&mut self) {
self.chain_sync.tip_synced();
self.headers_sync_controller = None;
}
pub(crate) fn sync_started(&self) -> bool {
self.chain_sync.started()
}
pub(crate) fn started_or_tip_synced(&self) -> bool {
self.chain_sync.started_or_tip_synced()
}
pub(crate) fn sync_connected(&mut self) {
self.chain_sync.connected()
}
}
pub struct Filter<T: Eq + Hash> {
inner: LruCache<T, ()>,
}
impl<T: Eq + Hash> Default for Filter<T> {
fn default() -> Self {
Filter::new(FILTER_SIZE)
}
}
impl<T: Eq + Hash> Filter<T> {
pub fn new(size: usize) -> Self {
Self {
inner: LruCache::new(size),
}
}
pub fn contains(&self, item: &T) -> bool {
self.inner.contains(item)
}
pub fn insert(&mut self, item: T) -> bool {
self.inner.put(item, ()).is_none()
}
pub fn remove(&mut self, item: &T) -> bool {
self.inner.pop(item).is_some()
}
}
#[derive(Default)]
pub struct Peers {
pub state: DashMap<PeerIndex, PeerState>,
pub n_sync_started: AtomicUsize,
pub n_protected_outbound_peers: AtomicUsize,
}
#[derive(Debug, Clone)]
pub struct InflightState {
pub(crate) peer: PeerIndex,
pub(crate) timestamp: u64,
}
impl InflightState {
fn new(peer: PeerIndex) -> Self {
Self {
peer,
timestamp: unix_time_as_millis(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BlockNumberAndHash {
pub number: BlockNumber,
pub hash: Byte32,
}
impl From<(BlockNumber, Byte32)> for BlockNumberAndHash {
fn from(inner: (BlockNumber, Byte32)) -> Self {
Self {
number: inner.0,
hash: inner.1,
}
}
}
enum TimeQuantile {
MinToFast,
FastToNormal,
NormalToUpper,
UpperToMax,
}
#[derive(Clone)]
struct TimeAnalyzer {
trace: [u64; TIME_TRACE_SIZE],
index: usize,
fast_time: u64,
normal_time: u64,
low_time: u64,
}
impl fmt::Debug for TimeAnalyzer {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TimeAnalyzer")
.field("fast_time", &self.fast_time)
.field("normal_time", &self.normal_time)
.field("low_time", &self.low_time)
.finish()
}
}
impl Default for TimeAnalyzer {
fn default() -> Self {
Self {
trace: [0; TIME_TRACE_SIZE],
index: 0,
fast_time: 1000,
normal_time: 1250,
low_time: 1500,
}
}
}
impl TimeAnalyzer {
fn push_time(&mut self, time: u64) -> TimeQuantile {
if self.index < TIME_TRACE_SIZE {
self.trace[self.index] = time;
self.index += 1;
} else {
self.trace.sort_unstable();
self.fast_time = (self.fast_time.saturating_add(self.trace[FAST_INDEX])) >> 1;
self.normal_time = (self.normal_time.saturating_add(self.trace[NORMAL_INDEX])) >> 1;
self.low_time = (self.low_time.saturating_add(self.trace[LOW_INDEX])) >> 1;
self.trace[0] = time;
self.index = 1;
}
if time <= self.fast_time {
TimeQuantile::MinToFast
} else if time <= self.normal_time {
TimeQuantile::FastToNormal
} else if time > self.low_time {
TimeQuantile::UpperToMax
} else {
TimeQuantile::NormalToUpper
}
}
}
#[derive(Debug, Clone)]
pub struct DownloadScheduler {
task_count: usize,
timeout_count: usize,
hashes: HashSet<BlockNumberAndHash>,
}
impl Default for DownloadScheduler {
fn default() -> Self {
Self {
hashes: HashSet::default(),
task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
timeout_count: 0,
}
}
}
impl DownloadScheduler {
fn inflight_count(&self) -> usize {
self.hashes.len()
}
fn can_fetch(&self) -> usize {
self.task_count.saturating_sub(self.hashes.len())
}
pub(crate) const fn task_count(&self) -> usize {
self.task_count
}
fn increase(&mut self, num: usize) {
if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
self.task_count = ::std::cmp::min(
self.task_count.saturating_add(num),
MAX_BLOCKS_IN_TRANSIT_PER_PEER,
)
}
}
fn decrease(&mut self, num: usize) {
self.timeout_count = self.task_count.saturating_add(num);
if self.timeout_count > 2 {
self.task_count = self.task_count.saturating_sub(1);
self.timeout_count = 0;
}
}
fn punish(&mut self, exp: usize) {
self.task_count >>= exp
}
}
#[derive(Clone)]
pub struct InflightBlocks {
pub(crate) download_schedulers: HashMap<PeerIndex, DownloadScheduler>,
inflight_states: BTreeMap<BlockNumberAndHash, InflightState>,
pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
pub(crate) restart_number: BlockNumber,
time_analyzer: TimeAnalyzer,
pub(crate) adjustment: bool,
pub(crate) protect_num: usize,
}
impl Default for InflightBlocks {
fn default() -> Self {
InflightBlocks {
download_schedulers: HashMap::default(),
inflight_states: BTreeMap::default(),
trace_number: HashMap::default(),
restart_number: 0,
time_analyzer: TimeAnalyzer::default(),
adjustment: true,
protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
}
}
}
struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);
impl<'a> fmt::Debug for DebugHashSet<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_set()
.entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
.finish()
}
}
impl fmt::Debug for InflightBlocks {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_map()
.entries(
self.download_schedulers
.iter()
.map(|(k, v)| (k, DebugHashSet(&v.hashes))),
)
.finish()?;
fmt.debug_map()
.entries(
self.inflight_states
.iter()
.map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
)
.finish()?;
self.time_analyzer.fmt(fmt)
}
}
impl InflightBlocks {
pub fn blocks_iter(&self) -> impl Iterator<Item = (&PeerIndex, &HashSet<BlockNumberAndHash>)> {
self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes))
}
pub fn total_inflight_count(&self) -> usize {
self.inflight_states.len()
}
pub fn division_point(&self) -> (u64, u64, u64) {
(
self.time_analyzer.fast_time,
self.time_analyzer.normal_time,
self.time_analyzer.low_time,
)
}
pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize {
self.download_schedulers
.get(&peer)
.map(DownloadScheduler::inflight_count)
.unwrap_or(0)
}
pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
self.download_schedulers.get(&peer).map_or(
INIT_BLOCKS_IN_TRANSIT_PER_PEER,
DownloadScheduler::can_fetch,
)
}
pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<BlockNumberAndHash>> {
self.download_schedulers.get(&peer).map(|d| &d.hashes)
}
pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> {
self.inflight_states.get(block)
}
pub fn mark_slow_block(&mut self, tip: BlockNumber) {
let now = faketime::unix_time_as_millis();
for key in self.inflight_states.keys() {
if key.number > tip + 1 {
break;
}
self.trace_number.entry(key.clone()).or_insert(now);
}
}
pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
let now = unix_time_as_millis();
let mut disconnect_list = HashSet::new();
let should_punish = self.download_schedulers.len() > self.protect_num;
let adjustment = self.adjustment;
let trace = &mut self.trace_number;
let download_schedulers = &mut self.download_schedulers;
let states = &mut self.inflight_states;
let mut remove_key = Vec::new();
let end = tip + 20;
for (key, value) in states.iter() {
if key.number > end {
break;
}
if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
if let Some(set) = download_schedulers.get_mut(&value.peer) {
set.hashes.remove(key);
if should_punish && adjustment {
set.punish(2);
}
};
if !trace.is_empty() {
trace.remove(key);
}
remove_key.push(key.clone());
}
}
for key in remove_key {
states.remove(&key);
}
download_schedulers.retain(|k, v| {
if v.task_count == 0 {
disconnect_list.insert(*k);
false
} else {
true
}
});
shrink_to_fit!(download_schedulers, SHRINK_THRESHOLD);
if self.restart_number != 0 && tip + 1 > self.restart_number {
self.restart_number = 0;
}
let timeout_limit = self.time_analyzer.low_time;
let restart_number = &mut self.restart_number;
trace.retain(|key, time| {
if now > timeout_limit + *time {
if let Some(state) = states.remove(key) {
if let Some(d) = download_schedulers.get_mut(&state.peer) {
if should_punish && adjustment {
d.punish(1);
}
d.hashes.remove(key);
};
}
if key.number > *restart_number {
*restart_number = key.number;
}
return false;
}
true
});
shrink_to_fit!(trace, SHRINK_THRESHOLD);
disconnect_list
}
pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
let state = self.inflight_states.entry(block.clone());
match state {
Entry::Occupied(_entry) => return false,
Entry::Vacant(entry) => entry.insert(InflightState::new(peer)),
};
if self.restart_number >= block.number {
self.trace_number
.insert(block.clone(), unix_time_as_millis());
}
let download_scheduler = self
.download_schedulers
.entry(peer)
.or_insert_with(DownloadScheduler::default);
download_scheduler.hashes.insert(block)
}
pub fn remove_by_peer(&mut self, peer: PeerIndex) -> bool {
let trace = &mut self.trace_number;
let state = &mut self.inflight_states;
self.download_schedulers
.remove(&peer)
.map(|blocks| {
for block in blocks.hashes {
state.remove(&block);
if !trace.is_empty() {
trace.remove(&block);
}
}
})
.is_some()
}
pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
let should_punish = self.download_schedulers.len() > self.protect_num;
let download_schedulers = &mut self.download_schedulers;
let trace = &mut self.trace_number;
let time_analyzer = &mut self.time_analyzer;
let adjustment = self.adjustment;
self.inflight_states
.remove(&block)
.map(|state| {
let elapsed = unix_time_as_millis().saturating_sub(state.timestamp);
if let Some(set) = download_schedulers.get_mut(&state.peer) {
set.hashes.remove(&block);
if adjustment {
match time_analyzer.push_time(elapsed) {
TimeQuantile::MinToFast => set.increase(2),
TimeQuantile::FastToNormal => set.increase(1),
TimeQuantile::NormalToUpper => {
if should_punish {
set.decrease(1)
}
}
TimeQuantile::UpperToMax => {
if should_punish {
set.decrease(2)
}
}
}
}
if !trace.is_empty() {
trace.remove(&block);
}
};
})
.is_some()
}
}
impl Peers {
pub fn sync_connected(&self, peer: PeerIndex, is_outbound: bool, is_whitelist: bool) {
let protect_outbound = is_outbound
&& self
.n_protected_outbound_peers
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
if x < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
Some(x + 1)
} else {
None
}
})
.is_ok();
let peer_flags = PeerFlags {
is_outbound,
is_whitelist,
is_protect: protect_outbound,
};
self.state
.entry(peer)
.and_modify(|state| {
state.peer_flags = peer_flags;
state.sync_connected();
})
.or_insert_with(|| {
let mut state = PeerState::new(peer_flags);
state.sync_connected();
state
});
}
pub fn relay_connected(&self, peer: PeerIndex) {
self.state
.entry(peer)
.or_insert_with(|| PeerState::new(PeerFlags::default()));
}
pub fn get_best_known_header(&self, pi: PeerIndex) -> Option<HeaderView> {
self.state
.get(&pi)
.and_then(|peer_state| peer_state.best_known_header.clone())
}
pub fn may_set_best_known_header(&self, peer: PeerIndex, header_view: HeaderView) {
if let Some(mut peer_state) = self.state.get_mut(&peer) {
if let Some(ref hv) = peer_state.best_known_header {
if header_view.is_better_than(hv.total_difficulty()) {
peer_state.best_known_header = Some(header_view);
}
} else {
peer_state.best_known_header = Some(header_view);
}
}
}
pub fn get_last_common_header(&self, pi: PeerIndex) -> Option<core::HeaderView> {
self.state
.get(&pi)
.and_then(|peer_state| peer_state.last_common_header.clone())
}
pub fn set_last_common_header(&self, pi: PeerIndex, header: core::HeaderView) {
self.state
.entry(pi)
.and_modify(|peer_state| peer_state.last_common_header = Some(header));
}
pub fn getheaders_received(&self, _peer: PeerIndex) {
}
pub fn disconnected(&self, peer: PeerIndex) {
if let Some(peer_state) = self.state.remove(&peer).map(|(_, peer_state)| peer_state) {
if peer_state.sync_started() {
assert_ne!(
self.n_sync_started.fetch_sub(1, Ordering::AcqRel),
0,
"n_sync_started overflow when disconnects"
);
}
if peer_state.peer_flags.is_protect {
assert_ne!(
self.n_protected_outbound_peers
.fetch_sub(1, Ordering::AcqRel),
0,
"n_protected_outbound_peers overflow when disconnects"
);
}
}
}
pub fn insert_unknown_header_hash(&self, peer: PeerIndex, hash: Byte32) {
self.state
.entry(peer)
.and_modify(|state| state.unknown_header_list.push(hash));
}
pub fn unknown_header_list_is_empty(&self, peer: PeerIndex) -> bool {
self.state
.get(&peer)
.map(|state| state.unknown_header_list.is_empty())
.unwrap_or(true)
}
pub fn clear_unknown_list(&self) {
self.state.iter_mut().for_each(|mut state| {
if !state.unknown_header_list.is_empty() {
state.unknown_header_list.clear()
}
})
}
pub fn get_best_known_less_than_tip_and_unknown_empty(
&self,
tip: BlockNumber,
) -> Vec<PeerIndex> {
self.state
.iter()
.filter_map(|kv_pair| {
let (peer_index, state) = kv_pair.pair();
if !state.unknown_header_list.is_empty() {
return None;
}
match state.best_known_header {
Some(ref header) if header.number() < tip => Some(*peer_index),
_ => None,
}
})
.collect()
}
pub fn take_unknown_last(&self, peer: PeerIndex) -> Option<Byte32> {
self.state
.get_mut(&peer)
.and_then(|mut state| state.unknown_header_list.pop())
}
pub fn get_flag(&self, peer: PeerIndex) -> Option<PeerFlags> {
self.state.get(&peer).map(|state| state.peer_flags)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderView {
inner: core::HeaderView,
total_difficulty: U256,
pub(crate) skip_hash: Option<Byte32>,
}
impl HeaderView {
pub fn new(inner: core::HeaderView, total_difficulty: U256) -> Self {
HeaderView {
inner,
total_difficulty,
skip_hash: None,
}
}
pub fn number(&self) -> BlockNumber {
self.inner.number()
}
pub fn hash(&self) -> Byte32 {
self.inner.hash()
}
pub fn parent_hash(&self) -> Byte32 {
self.inner.data().raw().parent_hash()
}
pub fn timestamp(&self) -> u64 {
self.inner.timestamp()
}
pub fn total_difficulty(&self) -> &U256 {
&self.total_difficulty
}
pub fn inner(&self) -> &core::HeaderView {
&self.inner
}
pub fn into_inner(self) -> core::HeaderView {
self.inner
}
pub fn build_skip<F, G>(&mut self, tip_number: BlockNumber, get_header_view: F, fast_scanner: G)
where
F: FnMut(&Byte32, Option<bool>) -> Option<HeaderView>,
G: Fn(BlockNumber, &HeaderView) -> Option<HeaderView>,
{
if self.inner.is_genesis() {
return;
}
self.skip_hash = self
.clone()
.get_ancestor(
tip_number,
get_skip_height(self.number()),
get_header_view,
fast_scanner,
)
.map(|header| header.hash());
}
pub fn get_ancestor<F, G>(
self,
tip_number: BlockNumber,
number: BlockNumber,
mut get_header_view: F,
fast_scanner: G,
) -> Option<core::HeaderView>
where
F: FnMut(&Byte32, Option<bool>) -> Option<HeaderView>,
G: Fn(BlockNumber, &HeaderView) -> Option<HeaderView>,
{
let mut current = self;
if number > current.number() {
return None;
}
let mut number_walk = current.number();
while number_walk > number {
let number_skip = get_skip_height(number_walk);
let number_skip_prev = get_skip_height(number_walk - 1);
let store_first = current.number() <= tip_number;
match current.skip_hash {
Some(ref hash)
if number_skip == number
|| (number_skip > number
&& !(number_skip_prev + 2 < number_skip
&& number_skip_prev >= number)) =>
{
current = get_header_view(hash, Some(store_first))?;
number_walk = number_skip;
}
_ => {
current = get_header_view(¤t.parent_hash(), Some(store_first))?;
number_walk -= 1;
}
}
if let Some(target) = fast_scanner(number, ¤t) {
current = target;
break;
}
}
Some(current).map(HeaderView::into_inner)
}
pub fn is_better_than(&self, total_difficulty: &U256) -> bool {
self.total_difficulty() > total_difficulty
}
fn from_slice_should_be_ok(slice: &[u8]) -> Self {
let len_size = packed::Uint32Reader::TOTAL_SIZE;
if slice.len() < len_size {
panic!("failed to unpack item in header map: header part is broken");
}
let mut idx = 0;
let inner_len = {
let reader = packed::Uint32Reader::from_slice_should_be_ok(&slice[idx..idx + len_size]);
Unpack::<u32>::unpack(&reader) as usize
};
idx += len_size;
let total_difficulty_len = packed::Uint256Reader::TOTAL_SIZE;
if slice.len() < len_size + inner_len + total_difficulty_len {
panic!("failed to unpack item in header map: body part is broken");
}
let inner = {
let reader =
packed::HeaderViewReader::from_slice_should_be_ok(&slice[idx..idx + inner_len]);
Unpack::<core::HeaderView>::unpack(&reader)
};
idx += inner_len;
let total_difficulty = {
let reader = packed::Uint256Reader::from_slice_should_be_ok(
&slice[idx..idx + total_difficulty_len],
);
Unpack::<U256>::unpack(&reader)
};
idx += total_difficulty_len;
let skip_hash = {
packed::Byte32OptReader::from_slice_should_be_ok(&slice[idx..])
.to_entity()
.to_opt()
};
Self {
inner,
total_difficulty,
skip_hash,
}
}
fn to_vec(&self) -> Vec<u8> {
let mut v = Vec::new();
let inner: packed::HeaderView = self.inner.pack();
let total_difficulty: packed::Uint256 = self.total_difficulty.pack();
let skip_hash: packed::Byte32Opt = Pack::pack(&self.skip_hash);
let inner_len: packed::Uint32 = (inner.as_slice().len() as u32).pack();
v.extend_from_slice(inner_len.as_slice());
v.extend_from_slice(inner.as_slice());
v.extend_from_slice(total_difficulty.as_slice());
v.extend_from_slice(skip_hash.as_slice());
v
}
}
fn get_skip_height(height: BlockNumber) -> BlockNumber {
fn invert_lowest_one(n: i64) -> i64 {
n & (n - 1)
}
if height < 2 {
return 0;
}
if (height & 1) > 0 {
invert_lowest_one(invert_lowest_one(height as i64 - 1)) as u64 + 1
} else {
invert_lowest_one(height as i64) as u64
}
}
pub(crate) type PendingCompactBlockMap = HashMap<
Byte32,
(
packed::CompactBlock,
HashMap<PeerIndex, (Vec<u32>, Vec<u32>)>,
u64,
),
>;
#[derive(Clone)]
pub struct SyncShared {
shared: Shared,
state: Arc<SyncState>,
}
impl SyncShared {
pub fn new(
shared: Shared,
sync_config: SyncConfig,
tx_relay_receiver: Receiver<TxVerificationResult>,
) -> SyncShared {
Self::with_tmpdir::<PathBuf>(shared, sync_config, None, tx_relay_receiver)
}
pub fn with_tmpdir<P>(
shared: Shared,
sync_config: SyncConfig,
tmpdir: Option<P>,
tx_relay_receiver: Receiver<TxVerificationResult>,
) -> SyncShared
where
P: AsRef<Path>,
{
let (total_difficulty, header) = {
let snapshot = shared.snapshot();
(
snapshot.total_difficulty().to_owned(),
snapshot.tip_header().to_owned(),
)
};
let shared_best_header = RwLock::new(HeaderView::new(header, total_difficulty));
ckb_logger::info!(
"header_map.memory_limit {}",
sync_config.header_map.memory_limit
);
let header_map = HeaderMap::new(
tmpdir,
sync_config.header_map.memory_limit.as_u64() as usize,
shared.async_handle(),
);
let state = SyncState {
shared_best_header,
header_map,
block_status_map: DashMap::new(),
tx_filter: Mutex::new(Filter::new(TX_FILTER_SIZE)),
unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
peers: Peers::default(),
pending_get_block_proposals: DashMap::new(),
pending_compact_blocks: Mutex::new(HashMap::default()),
orphan_block_pool: OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE),
inflight_proposals: DashMap::new(),
inflight_blocks: RwLock::new(InflightBlocks::default()),
pending_get_headers: RwLock::new(LruCache::new(GET_HEADERS_CACHE_SIZE)),
tx_relay_receiver,
assume_valid_target: Mutex::new(sync_config.assume_valid_target),
min_chain_work: sync_config.min_chain_work,
};
SyncShared {
shared,
state: Arc::new(state),
}
}
pub fn shared(&self) -> &Shared {
&self.shared
}
pub fn active_chain(&self) -> ActiveChain {
ActiveChain {
shared: self.clone(),
snapshot: Arc::clone(&self.shared.snapshot()),
state: Arc::clone(&self.state),
}
}
pub fn store(&self) -> &ChainDB {
self.shared.store()
}
pub fn state(&self) -> &SyncState {
&self.state
}
pub fn consensus(&self) -> &Consensus {
self.shared.consensus()
}
pub fn insert_new_block(
&self,
chain: &ChainController,
block: Arc<core::BlockView>,
) -> Result<bool, CKBError> {
if !self.is_stored(&block.parent_hash()) {
debug!(
"insert new orphan block {} {}",
block.header().number(),
block.header().hash()
);
self.state.insert_orphan_block((*block).clone());
return Ok(false);
}
let ret = self.accept_block(chain, Arc::clone(&block));
if ret.is_err() {
debug!("accept block {:?} {:?}", block, ret);
return ret;
}
self.try_search_orphan_pool(chain);
ret
}
pub fn try_search_orphan_pool(&self, chain: &ChainController) {
let leaders = self.state.orphan_pool().clone_leaders();
debug!("orphan pool leader parents hash len: {}", leaders.len());
for hash in leaders {
if self.state.orphan_pool().is_empty() {
break;
}
if self.is_stored(&hash) {
let descendants = self.state.remove_orphan_by_parent(&hash);
debug!(
"try accepting {} descendant orphan blocks by exist parents hash",
descendants.len()
);
for block in descendants {
if !self.is_stored(&block.parent_hash()) {
debug!(
"parent-unknown orphan block, block: {}, {}, parent: {}",
block.header().number(),
block.header().hash(),
block.header().parent_hash(),
);
continue;
}
let block = Arc::new(block);
if let Err(err) = self.accept_block(chain, Arc::clone(&block)) {
debug!(
"accept descendant orphan block {} error {:?}",
block.header().hash(),
err
);
}
}
}
}
}
pub(crate) fn periodic_clean_orphan_pool(&self) {
let hashes = self
.state
.clean_expired_blocks(self.active_chain().epoch_ext().number());
for hash in hashes {
self.state.remove_header_view(&hash);
}
}
pub(crate) fn accept_block(
&self,
chain: &ChainController,
block: Arc<core::BlockView>,
) -> Result<bool, CKBError> {
let ret = {
let mut assume_valid_target = self.state.assume_valid_target();
if let Some(ref target) = *assume_valid_target {
let switch = if target == &Unpack::<H256>::unpack(&core::BlockView::hash(&block)) {
assume_valid_target.take();
Switch::NONE
} else {
Switch::DISABLE_SCRIPT
};
chain.internal_process_block(Arc::clone(&block), switch)
} else {
chain.process_block(Arc::clone(&block))
}
};
if let Err(ref error) = ret {
if !is_internal_db_error(error) {
error!("accept block {:?} {}", block, error);
self.state
.insert_block_status(block.header().hash(), BlockStatus::BLOCK_INVALID);
}
} else {
self.state.remove_block_status(&block.as_ref().hash());
self.state.remove_header_view(&block.as_ref().hash());
}
ret
}
pub fn insert_valid_header(&self, peer: PeerIndex, header: &core::HeaderView) {
let tip_number = self.active_chain().tip_number();
let store_first = tip_number >= header.number();
let parent_view = self
.get_header_view(&header.data().raw().parent_hash(), Some(store_first))
.expect("parent should be verified");
let mut header_view = {
let total_difficulty = parent_view.total_difficulty() + header.difficulty();
HeaderView::new(header.clone(), total_difficulty)
};
let snapshot = Arc::clone(&self.shared.snapshot());
header_view.build_skip(
tip_number,
|hash, store_first_opt| self.get_header_view(hash, store_first_opt),
|number, current| {
if current.number() <= snapshot.tip_number()
&& snapshot.is_main_chain(¤t.hash())
{
snapshot
.get_block_hash(number)
.and_then(|hash| self.get_header_view(&hash, Some(true)))
} else {
None
}
},
);
self.state.header_map.insert(header_view.clone());
self.state
.peers()
.may_set_best_known_header(peer, header_view.clone());
self.state.may_set_shared_best_header(header_view);
}
pub fn get_header_view(
&self,
hash: &Byte32,
store_first_opt: Option<bool>,
) -> Option<HeaderView> {
let store = self.store();
if store_first_opt.unwrap_or(false) {
store
.get_block_header(hash)
.and_then(|header| {
store
.get_block_ext(hash)
.map(|block_ext| HeaderView::new(header, block_ext.total_difficulty))
})
.or_else(|| self.state.header_map.get(hash))
} else {
self.state.header_map.get(hash).or_else(|| {
store.get_block_header(hash).and_then(|header| {
store
.get_block_ext(hash)
.map(|block_ext| HeaderView::new(header, block_ext.total_difficulty))
})
})
}
}
pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
let status = self.active_chain().get_block_status(hash);
status.contains(BlockStatus::BLOCK_STORED)
}
pub fn get_epoch_ext(&self, hash: &Byte32) -> Option<EpochExt> {
self.store().get_block_epoch(hash)
}
}
impl HeaderProvider for SyncShared {
fn get_header(&self, hash: &Byte32) -> Option<core::HeaderView> {
self.state
.header_map
.get(hash)
.map(HeaderView::into_inner)
.or_else(|| self.store().get_block_header(hash))
}
}
#[derive(Eq, PartialEq, Clone)]
pub struct UnknownTxHashPriority {
request_time: Instant,
peers: Vec<PeerIndex>,
requested: bool,
}
impl UnknownTxHashPriority {
pub fn should_request(&self, now: Instant) -> bool {
self.next_request_at() < now
}
pub fn next_request_at(&self) -> Instant {
if self.requested {
self.request_time + RETRY_ASK_TX_TIMEOUT_INCREASE
} else {
self.request_time
}
}
pub fn next_request_peer(&mut self) -> Option<PeerIndex> {
if self.requested {
if self.peers.len() > 1 {
self.request_time = Instant::now();
self.peers.swap_remove(0);
self.peers.get(0).cloned()
} else {
None
}
} else {
self.requested = true;
self.peers.get(0).cloned()
}
}
pub fn push_peer(&mut self, peer_index: PeerIndex) {
self.peers.push(peer_index);
}
pub fn requesting_peer(&self) -> Option<PeerIndex> {
if self.requested {
self.peers.get(0).cloned()
} else {
None
}
}
}
impl Ord for UnknownTxHashPriority {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.next_request_at()
.cmp(&other.next_request_at())
.reverse()
}
}
impl PartialOrd for UnknownTxHashPriority {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
pub struct SyncState {
shared_best_header: RwLock<HeaderView>,
header_map: HeaderMap,
block_status_map: DashMap<Byte32, BlockStatus>,
tx_filter: Mutex<Filter<Byte32>>,
unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,
peers: Peers,
pending_get_block_proposals: DashMap<packed::ProposalShortId, HashSet<PeerIndex>>,
pending_get_headers: RwLock<LruCache<(PeerIndex, Byte32), Instant>>,
pending_compact_blocks: Mutex<PendingCompactBlockMap>,
orphan_block_pool: OrphanBlockPool,
inflight_proposals: DashMap<packed::ProposalShortId, BlockNumber>,
inflight_blocks: RwLock<InflightBlocks>,
tx_relay_receiver: Receiver<TxVerificationResult>,
assume_valid_target: Mutex<Option<H256>>,
min_chain_work: U256,
}
impl SyncState {
pub fn assume_valid_target(&self) -> MutexGuard<Option<H256>> {
self.assume_valid_target.lock()
}
pub fn min_chain_work(&self) -> &U256 {
&self.min_chain_work
}
pub fn min_chain_work_ready(&self) -> bool {
self.shared_best_header
.read()
.is_better_than(&self.min_chain_work)
}
pub fn n_sync_started(&self) -> &AtomicUsize {
&self.peers.n_sync_started
}
pub fn peers(&self) -> &Peers {
&self.peers
}
pub fn compare_with_pending_compact(&self, hash: &Byte32, now: u64) -> bool {
let pending = self.pending_compact_blocks.lock();
pending.is_empty()
|| pending
.get(hash)
.map(|(_, _, time)| now > time + 2000)
.unwrap_or(true)
}
pub fn pending_compact_blocks(&self) -> MutexGuard<PendingCompactBlockMap> {
self.pending_compact_blocks.lock()
}
pub fn read_inflight_blocks(&self) -> RwLockReadGuard<InflightBlocks> {
self.inflight_blocks.read()
}
pub fn write_inflight_blocks(&self) -> RwLockWriteGuard<InflightBlocks> {
self.inflight_blocks.write()
}
pub fn take_relay_tx_verify_results(&self, limit: usize) -> Vec<TxVerificationResult> {
self.tx_relay_receiver.try_iter().take(limit).collect()
}
pub fn shared_best_header(&self) -> HeaderView {
self.shared_best_header.read().to_owned()
}
pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderView> {
self.shared_best_header.read()
}
pub fn header_map(&self) -> &HeaderMap {
&self.header_map
}
pub fn may_set_shared_best_header(&self, header: HeaderView) {
if !header.is_better_than(self.shared_best_header.read().total_difficulty()) {
return;
}
metrics!(gauge, "ckb.shared_best_number", header.number() as i64);
*self.shared_best_header.write() = header;
}
pub fn remove_header_view(&self, hash: &Byte32) {
self.header_map.remove(hash);
}
pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
if peer_state.sync_started() {
assert_ne!(
self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
0,
"n_sync_started overflow when suspend_sync"
);
}
peer_state.suspend_sync(SUSPEND_SYNC_TIME);
}
pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
if peer_state.sync_started() {
assert_ne!(
self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
0,
"n_sync_started overflow when tip_synced"
);
}
peer_state.tip_synced();
}
pub fn mark_as_known_tx(&self, hash: Byte32) {
self.mark_as_known_txs(iter::once(hash));
}
pub fn remove_from_known_txs(&self, hash: &Byte32) {
self.tx_filter.lock().remove(hash);
}
pub fn mark_as_known_txs(&self, hashes: impl Iterator<Item = Byte32> + std::clone::Clone) {
let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
let mut tx_filter = self.tx_filter.lock();
for hash in hashes {
unknown_tx_hashes.remove(&hash);
tx_filter.insert(hash);
}
}
pub fn pop_ask_for_txs(&self) -> HashMap<PeerIndex, Vec<Byte32>> {
let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
let mut result: HashMap<PeerIndex, Vec<Byte32>> = HashMap::new();
let now = Instant::now();
if !unknown_tx_hashes
.peek()
.map(|(_tx_hash, priority)| priority.should_request(now))
.unwrap_or_default()
{
return result;
}
while let Some((tx_hash, mut priority)) = unknown_tx_hashes.pop() {
if priority.should_request(now) {
if let Some(peer_index) = priority.next_request_peer() {
result
.entry(peer_index)
.and_modify(|hashes| hashes.push(tx_hash.clone()))
.or_insert_with(|| vec![tx_hash.clone()]);
unknown_tx_hashes.push(tx_hash, priority);
}
} else {
unknown_tx_hashes.push(tx_hash, priority);
break;
}
}
result
}
pub fn add_ask_for_txs(&self, peer_index: PeerIndex, tx_hashes: Vec<Byte32>) -> Status {
let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
for tx_hash in tx_hashes
.into_iter()
.take(MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER)
{
match unknown_tx_hashes.entry(tx_hash) {
keyed_priority_queue::Entry::Occupied(entry) => {
let mut priority = entry.get_priority().clone();
priority.push_peer(peer_index);
entry.set_priority(priority);
}
keyed_priority_queue::Entry::Vacant(entry) => {
entry.set_priority(UnknownTxHashPriority {
request_time: Instant::now(),
peers: vec![peer_index],
requested: false,
})
}
}
}
if unknown_tx_hashes.len() >= MAX_UNKNOWN_TX_HASHES_SIZE
|| unknown_tx_hashes.len()
>= self.peers.state.len() * MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER
{
ckb_logger::warn!(
"unknown_tx_hashes is too long, len: {}",
unknown_tx_hashes.len()
);
let mut peer_unknown_counter = 0;
for (_hash, priority) in unknown_tx_hashes.iter() {
for peer in priority.peers.iter() {
if *peer == peer_index {
peer_unknown_counter += 1;
}
}
}
if peer_unknown_counter >= MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER {
return StatusCode::TooManyUnknownTransactions.into();
}
return Status::ignored();
}
Status::ok()
}
pub fn already_known_tx(&self, hash: &Byte32) -> bool {
self.tx_filter.lock().contains(hash)
}
pub fn tx_filter(&self) -> MutexGuard<Filter<Byte32>> {
self.tx_filter.lock()
}
pub fn unknown_tx_hashes(
&self,
) -> MutexGuard<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>> {
self.unknown_tx_hashes.lock()
}
pub fn new_block_received(&self, block: &core::BlockView) -> bool {
if self
.write_inflight_blocks()
.remove_by_block((block.number(), block.hash()).into())
{
self.insert_block_status(block.hash(), BlockStatus::BLOCK_RECEIVED);
true
} else {
false
}
}
pub fn insert_inflight_proposals(
&self,
ids: Vec<packed::ProposalShortId>,
block_number: BlockNumber,
) -> Vec<bool> {
ids.into_iter()
.map(|id| match self.inflight_proposals.entry(id) {
dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
if *occupied.get() < block_number {
occupied.insert(block_number);
true
} else {
false
}
}
dashmap::mapref::entry::Entry::Vacant(vacant) => {
vacant.insert(block_number);
true
}
})
.collect()
}
pub fn remove_inflight_proposals(&self, ids: &[packed::ProposalShortId]) -> Vec<bool> {
ids.iter()
.map(|id| self.inflight_proposals.remove(id).is_some())
.collect()
}
pub fn clear_expired_inflight_proposals(&self, keep_min_block_number: BlockNumber) {
self.inflight_proposals
.retain(|_, block_number| *block_number >= keep_min_block_number);
}
pub fn contains_inflight_proposal(&self, proposal_id: &packed::ProposalShortId) -> bool {
self.inflight_proposals.contains_key(proposal_id)
}
pub fn insert_orphan_block(&self, block: core::BlockView) {
self.insert_block_status(block.hash(), BlockStatus::BLOCK_RECEIVED);
self.orphan_block_pool.insert(block);
}
pub fn remove_orphan_by_parent(&self, parent_hash: &Byte32) -> Vec<core::BlockView> {
let blocks = self.orphan_block_pool.remove_blocks_by_parent(parent_hash);
blocks.iter().for_each(|block| {
self.block_status_map.remove(&block.hash());
});
shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
blocks
}
pub fn orphan_pool(&self) -> &OrphanBlockPool {
&self.orphan_block_pool
}
pub fn insert_block_status(&self, block_hash: Byte32, status: BlockStatus) {
self.block_status_map.insert(block_hash, status);
}
pub fn remove_block_status(&self, block_hash: &Byte32) {
self.block_status_map.remove(block_hash);
shrink_to_fit!(self.block_status_map, SHRINK_THRESHOLD);
}
pub fn drain_get_block_proposals(
&self,
) -> DashMap<packed::ProposalShortId, HashSet<PeerIndex>> {
let ret = self.pending_get_block_proposals.clone();
self.pending_get_block_proposals.clear();
ret
}
pub fn insert_get_block_proposals(&self, pi: PeerIndex, ids: Vec<packed::ProposalShortId>) {
for id in ids.into_iter() {
self.pending_get_block_proposals
.entry(id)
.or_default()
.insert(pi);
}
}
pub fn disconnected(&self, pi: PeerIndex) {
self.write_inflight_blocks().remove_by_peer(pi);
self.peers().disconnected(pi);
}
pub fn get_orphan_block(&self, block_hash: &Byte32) -> Option<core::BlockView> {
self.orphan_block_pool.get_block(block_hash)
}
pub fn clean_expired_blocks(&self, epoch: EpochNumber) -> Vec<packed::Byte32> {
self.orphan_block_pool.clean_expired_blocks(epoch)
}
pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec<Byte32>) {
if self.peers.unknown_header_list_is_empty(pi) {
for hash in header_list {
if let Some(header) = self.header_map.get(&hash) {
self.peers.may_set_best_known_header(pi, header);
break;
} else {
self.peers.insert_unknown_header_hash(pi, hash)
}
}
}
}
}
#[derive(Clone)]
pub struct ActiveChain {
shared: SyncShared,
snapshot: Arc<Snapshot>,
state: Arc<SyncState>,
}
#[doc(hidden)]
impl ActiveChain {
fn store(&self) -> &ChainDB {
self.shared.store()
}
fn snapshot(&self) -> &Snapshot {
&self.snapshot
}
pub fn get_block_hash(&self, number: BlockNumber) -> Option<packed::Byte32> {
self.snapshot().get_block_hash(number)
}
pub fn get_block(&self, h: &packed::Byte32) -> Option<core::BlockView> {
self.store().get_block(h)
}
pub fn get_block_header(&self, h: &packed::Byte32) -> Option<core::HeaderView> {
self.store().get_block_header(h)
}
pub fn get_block_ext(&self, h: &packed::Byte32) -> Option<core::BlockExt> {
self.snapshot().get_block_ext(h)
}
pub fn get_block_filter(&self, hash: &packed::Byte32) -> Option<packed::Bytes> {
self.store().get_block_filter(hash)
}
pub fn shared(&self) -> &SyncShared {
&self.shared
}
pub fn total_difficulty(&self) -> &U256 {
self.snapshot.total_difficulty()
}
pub fn tip_header(&self) -> core::HeaderView {
self.snapshot.tip_header().clone()
}
pub fn tip_hash(&self) -> Byte32 {
self.snapshot.tip_hash()
}
pub fn tip_number(&self) -> BlockNumber {
self.snapshot.tip_number()
}
pub fn epoch_ext(&self) -> core::EpochExt {
self.snapshot.epoch_ext().clone()
}
pub fn is_main_chain(&self, hash: &packed::Byte32) -> bool {
self.snapshot.is_main_chain(hash)
}
pub fn is_initial_block_download(&self) -> bool {
self.shared.shared().is_initial_block_download()
}
pub fn get_ancestor(&self, base: &Byte32, number: BlockNumber) -> Option<core::HeaderView> {
let tip_number = self.tip_number();
self.shared.get_header_view(base, None)?.get_ancestor(
tip_number,
number,
|hash, store_first_opt| self.shared.get_header_view(hash, store_first_opt),
|number, current| {
if current.number() <= tip_number && self.snapshot().is_main_chain(¤t.hash())
{
self.get_block_hash(number)
.and_then(|hash| self.shared.get_header_view(&hash, Some(true)))
} else {
None
}
},
)
}
pub fn get_locator(&self, start: &core::HeaderView) -> Vec<Byte32> {
let mut step = 1;
let mut locator = Vec::with_capacity(32);
let mut index = start.number();
let mut base = start.hash();
loop {
let header_hash = self
.get_ancestor(&base, index)
.unwrap_or_else(|| {
panic!(
"index calculated in get_locator: \
start: {}, base: {}, step: {}, locators({}): {:?}.",
start,
base,
step,
locator.len(),
locator,
)
})
.hash();
locator.push(header_hash.clone());
if locator.len() >= 10 {
step <<= 1;
}
if index < step * 2 {
if locator.len() < 52 && index > ONE_DAY_BLOCK_NUMBER {
index >>= 1;
base = header_hash;
continue;
}
if index != 0 {
locator.push(self.shared.consensus().genesis_hash());
}
break;
}
index -= step;
base = header_hash;
}
locator
}
pub fn last_common_ancestor(
&self,
pa: &core::HeaderView,
pb: &core::HeaderView,
) -> Option<core::HeaderView> {
let (mut m_left, mut m_right) = if pa.number() > pb.number() {
(pb.clone(), pa.clone())
} else {
(pa.clone(), pb.clone())
};
m_right = self.get_ancestor(&m_right.hash(), m_left.number())?;
if m_left == m_right {
return Some(m_left);
}
debug_assert!(m_left.number() == m_right.number());
while m_left != m_right {
m_left = self.get_ancestor(&m_left.hash(), m_left.number() - 1)?;
m_right = self.get_ancestor(&m_right.hash(), m_right.number() - 1)?;
}
Some(m_left)
}
pub fn locate_latest_common_block(
&self,
_hash_stop: &Byte32,
locator: &[Byte32],
) -> Option<BlockNumber> {
if locator.is_empty() {
return None;
}
let locator_hash = locator.last().expect("empty checked");
if locator_hash != &self.shared.consensus().genesis_hash() {
return None;
}
let (index, latest_common) = locator
.iter()
.enumerate()
.map(|(index, hash)| (index, self.snapshot.get_block_number(hash)))
.find(|(_index, number)| number.is_some())
.expect("locator last checked");
if index == 0 || latest_common == Some(0) {
return latest_common;
}
if let Some(header) = locator
.get(index - 1)
.and_then(|hash| self.shared.store().get_block_header(hash))
{
let mut block_hash = header.data().raw().parent_hash();
loop {
let block_header = match self.shared.store().get_block_header(&block_hash) {
None => break latest_common,
Some(block_header) => block_header,
};
if let Some(block_number) = self.snapshot.get_block_number(&block_hash) {
return Some(block_number);
}
block_hash = block_header.data().raw().parent_hash();
}
} else {
latest_common
}
}
pub fn get_locator_response(
&self,
block_number: BlockNumber,
hash_stop: &Byte32,
) -> Vec<core::HeaderView> {
let tip_number = self.tip_header().number();
let max_height = cmp::min(
block_number + 1 + MAX_HEADERS_LEN as BlockNumber,
tip_number + 1,
);
(block_number + 1..max_height)
.filter_map(|block_number| self.snapshot.get_block_hash(block_number))
.take_while(|block_hash| block_hash != hash_stop)
.filter_map(|block_hash| self.shared.store().get_block_header(&block_hash))
.collect()
}
pub fn send_getheaders_to_peer(
&self,
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
header: &core::HeaderView,
) {
if let Some(last_time) = self
.state
.pending_get_headers
.write()
.get(&(peer, header.hash()))
{
if Instant::now() < *last_time + GET_HEADERS_TIMEOUT {
debug!(
"last send get headers from {} less than {:?} ago, ignore it",
peer, GET_HEADERS_TIMEOUT,
);
return;
} else {
debug!(
"Can not get headers from {} in {:?}, retry",
peer, GET_HEADERS_TIMEOUT,
);
}
}
self.state
.pending_get_headers
.write()
.put((peer, header.hash()), Instant::now());
debug!(
"send_getheaders_to_peer peer={}, hash={}",
peer,
header.hash()
);
let locator_hash = self.get_locator(header);
let content = packed::GetHeaders::new_builder()
.block_locator_hashes(locator_hash.pack())
.hash_stop(packed::Byte32::zero())
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
}
pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
match self.state.block_status_map.get(block_hash) {
Some(status_ref) => *status_ref.value(),
None => {
if self.state.header_map.contains_key(block_hash) {
BlockStatus::HEADER_VALID
} else {
let verified = self
.snapshot
.get_block_ext(block_hash)
.map(|block_ext| block_ext.verified);
match verified {
None => BlockStatus::UNKNOWN,
Some(None) => BlockStatus::BLOCK_STORED,
Some(Some(true)) => BlockStatus::BLOCK_VALID,
Some(Some(false)) => BlockStatus::BLOCK_INVALID,
}
}
}
}
}
pub fn contains_block_status(&self, block_hash: &Byte32, status: BlockStatus) -> bool {
self.get_block_status(block_hash).contains(status)
}
}
#[derive(Clone, Copy, Debug)]
pub enum IBDState {
In,
Out,
}
impl From<bool> for IBDState {
fn from(src: bool) -> Self {
if src {
IBDState::In
} else {
IBDState::Out
}
}
}
impl From<IBDState> for bool {
fn from(s: IBDState) -> bool {
match s {
IBDState::In => true,
IBDState::Out => false,
}
}
}