ckb_sync/types/
mod.rs

1use crate::{FAST_INDEX, LOW_INDEX, NORMAL_INDEX, Status, StatusCode, TIME_TRACE_SIZE};
2use ckb_app_config::SyncConfig;
3#[cfg(test)]
4use ckb_chain::VerifyResult;
5use ckb_chain::{ChainController, RemoteBlock};
6use ckb_chain_spec::consensus::{Consensus, MAX_BLOCK_INTERVAL, MIN_BLOCK_INTERVAL};
7use ckb_channel::Receiver;
8use ckb_constant::sync::{
9    BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
10    HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
11    MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
12    MAX_UNKNOWN_TX_HASHES_SIZE, MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER, POW_INTERVAL,
13    RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
14};
15use ckb_logger::{debug, error, info, trace, warn};
16use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
17use ckb_shared::{
18    Snapshot,
19    block_status::BlockStatus,
20    shared::Shared,
21    types::{HeaderIndex, HeaderIndexView, SHRINK_THRESHOLD},
22};
23use ckb_store::{ChainDB, ChainStore};
24use ckb_systemtime::unix_time_as_millis;
25use ckb_traits::{HeaderFields, HeaderFieldsProvider};
26use ckb_tx_pool::service::TxVerificationResult;
27use ckb_types::BlockNumberAndHash;
28use ckb_types::{
29    U256,
30    core::{self, BlockNumber, EpochExt},
31    packed::{self, Byte32},
32    prelude::*,
33};
34use ckb_util::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, shrink_to_fit};
35use dashmap::{self, DashMap};
36use keyed_priority_queue::{self, KeyedPriorityQueue};
37use lru::LruCache;
38use std::collections::{BTreeMap, HashMap, HashSet, btree_map::Entry};
39use std::hash::Hash;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::time::{Duration, Instant};
43use std::{cmp, fmt, iter};
44
45use crate::utils::send_message;
46
47const GET_HEADERS_CACHE_SIZE: usize = 10000;
48// TODO: Need discussed
49const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
50const FILTER_SIZE: usize = 50000;
51// 2 ** 13 < 6 * 1800 < 2 ** 14
52const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
53pub(crate) const FILTER_TTL: u64 = 4 * 60 * 60;
54
55// State used to enforce CHAIN_SYNC_TIMEOUT
56// Only in effect for connections that are outbound, non-manual,
57// non-protected and non-whitelist.
58// Algorithm: if a peer's best known block has less work than our tip,
59// set a timeout CHAIN_SYNC_TIMEOUT seconds in the future:
60//   - If at timeout their best known block now has more work than our tip
61//     when the timeout was set, then either reset the timeout or clear it
62//     (after comparing against our current tip's work)
63//   - If at timeout their best known block still has less work than our
64//     tip did when the timeout was set, then send a getheaders message,
65//     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future.
66//     If their best known block is still behind when that new timeout is
67//     reached, disconnect.
68
69#[derive(Clone, Debug, Default)]
70pub struct ChainSyncState {
71    pub timeout: u64,
72    pub work_header: Option<core::HeaderView>,
73    pub total_difficulty: Option<U256>,
74    pub sent_getheaders: bool,
75    headers_sync_state: HeadersSyncState,
76}
77
78impl ChainSyncState {
79    fn can_start_sync(&self, now: u64) -> bool {
80        match self.headers_sync_state {
81            HeadersSyncState::Initialized => false,
82            HeadersSyncState::SyncProtocolConnected => true,
83            HeadersSyncState::Started => false,
84            HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
85        }
86    }
87
88    fn connected(&mut self) {
89        self.headers_sync_state = HeadersSyncState::SyncProtocolConnected;
90    }
91
92    fn start(&mut self) {
93        self.headers_sync_state = HeadersSyncState::Started
94    }
95
96    fn suspend(&mut self, until: u64) {
97        self.headers_sync_state = HeadersSyncState::Suspend(until)
98    }
99
100    fn tip_synced(&mut self) {
101        let now = unix_time_as_millis();
102        let avg_interval = (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2;
103        self.headers_sync_state = HeadersSyncState::TipSynced(now + avg_interval * 1000);
104    }
105
106    fn started(&self) -> bool {
107        matches!(self.headers_sync_state, HeadersSyncState::Started)
108    }
109
110    fn started_or_tip_synced(&self) -> bool {
111        matches!(
112            self.headers_sync_state,
113            HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
114        )
115    }
116}
117
118#[derive(Default, Clone, Debug)]
119enum HeadersSyncState {
120    #[default]
121    Initialized,
122    SyncProtocolConnected,
123    Started,
124    Suspend(u64), // suspend headers sync until this timestamp (milliseconds since unix epoch)
125    TipSynced(u64), // already synced to the end, not as the sync target for the time being, until the pause time is exceeded
126}
127
128#[derive(Clone, Default, Debug, Copy)]
129pub struct PeerFlags {
130    pub is_outbound: bool,
131    pub is_protect: bool,
132    pub is_whitelist: bool,
133    pub is_2023edition: bool,
134}
135
136#[derive(Clone, Default, Debug, Copy)]
137pub struct HeadersSyncController {
138    // The timestamp when sync started
139    pub(crate) started_ts: u64,
140    // The timestamp of better tip header when sync started
141    pub(crate) started_tip_ts: u64,
142
143    // The timestamp when the process last updated
144    pub(crate) last_updated_ts: u64,
145    // The timestamp of better tip header when the process last updated
146    pub(crate) last_updated_tip_ts: u64,
147
148    pub(crate) is_close_to_the_end: bool,
149}
150
151impl HeadersSyncController {
152    #[cfg(test)]
153    pub(crate) fn new(
154        started_ts: u64,
155        started_tip_ts: u64,
156        last_updated_ts: u64,
157        last_updated_tip_ts: u64,
158        is_close_to_the_end: bool,
159    ) -> Self {
160        Self {
161            started_ts,
162            started_tip_ts,
163            last_updated_ts,
164            last_updated_tip_ts,
165            is_close_to_the_end,
166        }
167    }
168
169    pub(crate) fn from_header(better_tip_header: &HeaderIndexView) -> Self {
170        let started_ts = unix_time_as_millis();
171        let started_tip_ts = better_tip_header.timestamp();
172        Self {
173            started_ts,
174            started_tip_ts,
175            last_updated_ts: started_ts,
176            last_updated_tip_ts: started_tip_ts,
177            is_close_to_the_end: false,
178        }
179    }
180
181    // https://github.com/rust-lang/rust-clippy/pull/8738
182    // wrong_self_convention allows is_* to take &mut self
183    #[allow(clippy::wrong_self_convention)]
184    pub(crate) fn is_timeout(&mut self, now_tip_ts: u64, now: u64) -> Option<bool> {
185        let inspect_window = HEADERS_DOWNLOAD_INSPECT_WINDOW;
186        let expected_headers_per_sec = HEADERS_DOWNLOAD_HEADERS_PER_SECOND;
187        let tolerable_bias = HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE;
188
189        let expected_before_finished = now.saturating_sub(now_tip_ts);
190
191        trace!("headers-sync: better tip ts {}; now {}", now_tip_ts, now);
192
193        if self.is_close_to_the_end {
194            let expected_in_base_time =
195                expected_headers_per_sec * inspect_window * POW_INTERVAL / 1000;
196            if expected_before_finished > expected_in_base_time {
197                self.started_ts = now;
198                self.started_tip_ts = now_tip_ts;
199                self.last_updated_ts = now;
200                self.last_updated_tip_ts = now_tip_ts;
201                self.is_close_to_the_end = false;
202                // if the node is behind the estimated tip header too much, sync again;
203                trace!(
204                    "headers-sync: send GetHeaders again since we are significantly behind the tip"
205                );
206                None
207            } else {
208                // ignore timeout because the tip already almost reach the real time;
209                // we can sync to the estimated tip in 1 inspect window by the slowest speed that we can accept.
210                Some(false)
211            }
212        } else if expected_before_finished < inspect_window {
213            self.is_close_to_the_end = true;
214            trace!("headers-sync: ignore timeout because the tip almost reaches the real time");
215            Some(false)
216        } else {
217            let spent_since_last_updated = now.saturating_sub(self.last_updated_ts);
218
219            if spent_since_last_updated < inspect_window {
220                // ignore timeout because the time spent since last updated is not enough as a sample
221                Some(false)
222            } else {
223                let synced_since_last_updated = now_tip_ts.saturating_sub(self.last_updated_tip_ts);
224                let expected_since_last_updated =
225                    expected_headers_per_sec * spent_since_last_updated * POW_INTERVAL / 1000;
226
227                if synced_since_last_updated < expected_since_last_updated / tolerable_bias {
228                    // if instantaneous speed is too slow, we don't care the global average speed
229                    trace!("headers-sync: the instantaneous speed is too slow");
230                    Some(true)
231                } else {
232                    self.last_updated_ts = now;
233                    self.last_updated_tip_ts = now_tip_ts;
234
235                    if synced_since_last_updated > expected_since_last_updated {
236                        trace!("headers-sync: the instantaneous speed is acceptable");
237                        Some(false)
238                    } else {
239                        // tolerate more bias for instantaneous speed, we will check the global average speed
240                        let spent_since_started = now.saturating_sub(self.started_ts);
241                        let synced_since_started = now_tip_ts.saturating_sub(self.started_tip_ts);
242
243                        let expected_since_started =
244                            expected_headers_per_sec * spent_since_started * POW_INTERVAL / 1000;
245
246                        if synced_since_started < expected_since_started {
247                            // the global average speed is too slow
248                            trace!(
249                                "headers-sync: both the global average speed and the instantaneous speed \
250                                are slower than expected"
251                            );
252                            Some(true)
253                        } else {
254                            trace!("headers-sync: the global average speed is acceptable");
255                            Some(false)
256                        }
257                    }
258                }
259            }
260        }
261    }
262}
263
264#[derive(Clone, Default, Debug)]
265pub struct PeerState {
266    pub headers_sync_controller: Option<HeadersSyncController>,
267    pub peer_flags: PeerFlags,
268    pub chain_sync: ChainSyncState,
269    // The best known block we know this peer has announced
270    pub best_known_header: Option<HeaderIndex>,
271    // The last block we both stored
272    pub last_common_header: Option<BlockNumberAndHash>,
273    // use on ibd concurrent block download
274    // save `get_headers` locator hashes here
275    pub unknown_header_list: Vec<Byte32>,
276}
277
278impl PeerState {
279    pub fn new(peer_flags: PeerFlags) -> PeerState {
280        PeerState {
281            headers_sync_controller: None,
282            peer_flags,
283            chain_sync: ChainSyncState::default(),
284            best_known_header: None,
285            last_common_header: None,
286            unknown_header_list: Vec::new(),
287        }
288    }
289
290    pub fn can_start_sync(&self, now: u64, ibd: bool) -> bool {
291        // only sync with protect/whitelist peer in IBD
292        ((self.peer_flags.is_protect || self.peer_flags.is_whitelist) || !ibd)
293            && self.chain_sync.can_start_sync(now)
294    }
295
296    pub fn start_sync(&mut self, headers_sync_controller: HeadersSyncController) {
297        self.chain_sync.start();
298        self.headers_sync_controller = Some(headers_sync_controller);
299    }
300
301    fn suspend_sync(&mut self, suspend_time: u64) {
302        let now = unix_time_as_millis();
303        self.chain_sync.suspend(now + suspend_time);
304        self.headers_sync_controller = None;
305    }
306
307    fn tip_synced(&mut self) {
308        self.chain_sync.tip_synced();
309        self.headers_sync_controller = None;
310    }
311
312    pub(crate) fn sync_started(&self) -> bool {
313        self.chain_sync.started()
314    }
315
316    pub(crate) fn started_or_tip_synced(&self) -> bool {
317        self.chain_sync.started_or_tip_synced()
318    }
319
320    pub(crate) fn sync_connected(&mut self) {
321        self.chain_sync.connected()
322    }
323}
324
325pub struct TtlFilter<T> {
326    inner: LruCache<T, u64>,
327    ttl: u64,
328}
329
330impl<T: Eq + Hash + Clone> Default for TtlFilter<T> {
331    fn default() -> Self {
332        TtlFilter::new(FILTER_SIZE, FILTER_TTL)
333    }
334}
335
336impl<T: Eq + Hash + Clone> TtlFilter<T> {
337    pub fn new(size: usize, ttl: u64) -> Self {
338        Self {
339            inner: LruCache::new(size),
340            ttl,
341        }
342    }
343
344    pub fn contains(&self, item: &T) -> bool {
345        self.inner.contains(item)
346    }
347
348    pub fn insert(&mut self, item: T) -> bool {
349        let now = ckb_systemtime::unix_time().as_secs();
350        self.inner.put(item, now).is_none()
351    }
352
353    pub fn remove(&mut self, item: &T) -> bool {
354        self.inner.pop(item).is_some()
355    }
356
357    /// Removes expired items.
358    pub fn remove_expired(&mut self) {
359        let now = ckb_systemtime::unix_time().as_secs();
360        let expired_keys: Vec<T> = self
361            .inner
362            .iter()
363            .filter_map(|(key, time)| {
364                if *time + self.ttl < now {
365                    Some(key)
366                } else {
367                    None
368                }
369            })
370            .cloned()
371            .collect();
372
373        for k in expired_keys {
374            self.remove(&k);
375        }
376    }
377}
378
379#[derive(Default)]
380pub struct Peers {
381    pub state: DashMap<PeerIndex, PeerState>,
382    pub n_sync_started: AtomicUsize,
383    pub n_protected_outbound_peers: AtomicUsize,
384}
385
386#[derive(Debug, Clone)]
387pub struct InflightState {
388    pub(crate) peer: PeerIndex,
389    pub(crate) timestamp: u64,
390}
391
392impl InflightState {
393    fn new(peer: PeerIndex) -> Self {
394        Self {
395            peer,
396            timestamp: unix_time_as_millis(),
397        }
398    }
399}
400
401enum TimeQuantile {
402    MinToFast,
403    FastToNormal,
404    NormalToUpper,
405    UpperToMax,
406}
407
408/// Using 512 blocks as a period, dynamically adjust the scheduler's time standard
409/// Divided into three time periods, including:
410///
411/// | fast | normal | penalty | double penalty |
412///
413/// The dividing line is, 1/3 position, 4/5 position, 1/10 position.
414///
415/// There is 14/30 normal area, 1/10 penalty area, 1/10 double penalty area, 1/3 accelerated reward area.
416///
417/// Most of the nodes that fall in the normal and accelerated reward area will be retained,
418/// while most of the nodes that fall in the normal and penalty zones will be slowly eliminated
419///
420/// The purpose of dynamic tuning is to reduce the consumption problem of sync networks
421/// by retaining the vast majority of nodes with stable communications and
422/// cleaning up nodes with significantly lower response times than a certain level
423#[derive(Clone)]
424struct TimeAnalyzer {
425    trace: [u64; TIME_TRACE_SIZE],
426    index: usize,
427    fast_time: u64,
428    normal_time: u64,
429    low_time: u64,
430}
431
432impl fmt::Debug for TimeAnalyzer {
433    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
434        fmt.debug_struct("TimeAnalyzer")
435            .field("fast_time", &self.fast_time)
436            .field("normal_time", &self.normal_time)
437            .field("low_time", &self.low_time)
438            .finish()
439    }
440}
441
442impl Default for TimeAnalyzer {
443    fn default() -> Self {
444        // Block max size about 700k, Under 10m/s bandwidth it may cost 1s to response
445        Self {
446            trace: [0; TIME_TRACE_SIZE],
447            index: 0,
448            fast_time: 1000,
449            normal_time: 1250,
450            low_time: 1500,
451        }
452    }
453}
454
455impl TimeAnalyzer {
456    fn push_time(&mut self, time: u64) -> TimeQuantile {
457        if self.index < TIME_TRACE_SIZE {
458            self.trace[self.index] = time;
459            self.index += 1;
460        } else {
461            self.trace.sort_unstable();
462            self.fast_time = (self.fast_time.saturating_add(self.trace[FAST_INDEX])) >> 1;
463            self.normal_time = (self.normal_time.saturating_add(self.trace[NORMAL_INDEX])) >> 1;
464            self.low_time = (self.low_time.saturating_add(self.trace[LOW_INDEX])) >> 1;
465            self.trace[0] = time;
466            self.index = 1;
467        }
468
469        if time <= self.fast_time {
470            TimeQuantile::MinToFast
471        } else if time <= self.normal_time {
472            TimeQuantile::FastToNormal
473        } else if time > self.low_time {
474            TimeQuantile::UpperToMax
475        } else {
476            TimeQuantile::NormalToUpper
477        }
478    }
479}
480
481#[derive(Debug, Clone)]
482pub struct DownloadScheduler {
483    task_count: usize,
484    timeout_count: usize,
485    hashes: HashSet<BlockNumberAndHash>,
486}
487
488impl Default for DownloadScheduler {
489    fn default() -> Self {
490        Self {
491            hashes: HashSet::default(),
492            task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
493            timeout_count: 0,
494        }
495    }
496}
497
498impl DownloadScheduler {
499    fn inflight_count(&self) -> usize {
500        self.hashes.len()
501    }
502
503    fn can_fetch(&self) -> usize {
504        self.task_count.saturating_sub(self.hashes.len())
505    }
506
507    pub(crate) const fn task_count(&self) -> usize {
508        self.task_count
509    }
510
511    fn increase(&mut self, num: usize) {
512        if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
513            self.task_count = ::std::cmp::min(
514                self.task_count.saturating_add(num),
515                MAX_BLOCKS_IN_TRANSIT_PER_PEER,
516            )
517        }
518    }
519
520    fn decrease(&mut self, num: usize) {
521        self.timeout_count = self.task_count.saturating_add(num);
522        if self.timeout_count > 2 {
523            self.task_count = self.task_count.saturating_sub(1);
524            self.timeout_count = 0;
525        }
526    }
527
528    fn punish(&mut self, exp: usize) {
529        self.task_count >>= exp
530    }
531}
532
533#[derive(Clone)]
534pub struct InflightBlocks {
535    pub(crate) download_schedulers: HashMap<PeerIndex, DownloadScheduler>,
536    inflight_states: BTreeMap<BlockNumberAndHash, InflightState>,
537    pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
538    pub(crate) restart_number: BlockNumber,
539    time_analyzer: TimeAnalyzer,
540    pub(crate) adjustment: bool,
541    pub(crate) protect_num: usize,
542}
543
544impl Default for InflightBlocks {
545    fn default() -> Self {
546        InflightBlocks {
547            download_schedulers: HashMap::default(),
548            inflight_states: BTreeMap::default(),
549            trace_number: HashMap::default(),
550            restart_number: 0,
551            time_analyzer: TimeAnalyzer::default(),
552            adjustment: true,
553            protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
554        }
555    }
556}
557
558struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);
559
560impl<'a> fmt::Debug for DebugHashSet<'a> {
561    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
562        fmt.debug_set()
563            .entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
564            .finish()
565    }
566}
567
568impl fmt::Debug for InflightBlocks {
569    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
570        fmt.debug_map()
571            .entries(
572                self.download_schedulers
573                    .iter()
574                    .map(|(k, v)| (k, DebugHashSet(&v.hashes))),
575            )
576            .finish()?;
577        fmt.debug_map()
578            .entries(
579                self.inflight_states
580                    .iter()
581                    .map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
582            )
583            .finish()?;
584        self.time_analyzer.fmt(fmt)
585    }
586}
587
588impl InflightBlocks {
589    pub fn blocks_iter(&self) -> impl Iterator<Item = (&PeerIndex, &HashSet<BlockNumberAndHash>)> {
590        self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes))
591    }
592
593    pub fn total_inflight_count(&self) -> usize {
594        self.inflight_states.len()
595    }
596
597    pub fn division_point(&self) -> (u64, u64, u64) {
598        (
599            self.time_analyzer.fast_time,
600            self.time_analyzer.normal_time,
601            self.time_analyzer.low_time,
602        )
603    }
604
605    pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize {
606        self.download_schedulers
607            .get(&peer)
608            .map(DownloadScheduler::inflight_count)
609            .unwrap_or(0)
610    }
611
612    pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
613        self.download_schedulers.get(&peer).map_or(
614            INIT_BLOCKS_IN_TRANSIT_PER_PEER,
615            DownloadScheduler::can_fetch,
616        )
617    }
618
619    pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<BlockNumberAndHash>> {
620        self.download_schedulers.get(&peer).map(|d| &d.hashes)
621    }
622
623    pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> {
624        self.inflight_states.get(block)
625    }
626
627    pub fn mark_slow_block(&mut self, tip: BlockNumber) {
628        let now = ckb_systemtime::unix_time_as_millis();
629        for key in self.inflight_states.keys() {
630            if key.number > tip + 1 {
631                break;
632            }
633            self.trace_number.entry(key.clone()).or_insert(now);
634        }
635    }
636
637    pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
638        let now = unix_time_as_millis();
639        let mut disconnect_list = HashSet::new();
640        // Since statistics are currently disturbed by the processing block time, when the number
641        // of transactions increases, the node will be accidentally evicted.
642        //
643        // Especially on machines with poor CPU performance, the node connection will be frequently
644        // disconnected due to statistics.
645        //
646        // In order to protect the decentralization of the network and ensure the survival of low-performance
647        // nodes, the penalty mechanism will be closed when the number of download nodes is less than the number of protected nodes
648        let should_punish = self.download_schedulers.len() > self.protect_num;
649        let adjustment = self.adjustment;
650
651        let trace = &mut self.trace_number;
652        let download_schedulers = &mut self.download_schedulers;
653        let states = &mut self.inflight_states;
654
655        let mut remove_key = Vec::new();
656        // Since this is a btreemap, with the data already sorted,
657        // we don't have to worry about missing points, and we don't need to
658        // iterate through all the data each time, just check within tip + 20,
659        // with the checkpoint marking possible blocking points, it's enough
660        let end = tip + 20;
661        for (key, value) in states.iter() {
662            if key.number > end {
663                break;
664            }
665            if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
666                if let Some(set) = download_schedulers.get_mut(&value.peer) {
667                    set.hashes.remove(key);
668                    if should_punish && adjustment {
669                        set.punish(2);
670                    }
671                };
672                if !trace.is_empty() {
673                    trace.remove(key);
674                }
675                remove_key.push(key.clone());
676                debug!(
677                    "prune: remove InflightState: remove {}-{} from {}",
678                    key.number, key.hash, value.peer
679                );
680
681                if let Some(metrics) = ckb_metrics::handle() {
682                    metrics.ckb_inflight_timeout_count.inc();
683                }
684            }
685        }
686
687        for key in remove_key {
688            states.remove(&key);
689        }
690
691        download_schedulers.retain(|k, v| {
692            // task number zero means this peer's response is very slow
693            if v.task_count == 0 {
694                disconnect_list.insert(*k);
695                false
696            } else {
697                true
698            }
699        });
700        shrink_to_fit!(download_schedulers, SHRINK_THRESHOLD);
701
702        if self.restart_number != 0 && tip + 1 > self.restart_number {
703            self.restart_number = 0;
704        }
705
706        // Since each environment is different, the policy here must also be dynamically adjusted
707        // according to the current environment, and a low-level limit is given here, since frequent
708        // restarting of a task consumes more than a low-level limit
709        let timeout_limit = self.time_analyzer.low_time;
710
711        let restart_number = &mut self.restart_number;
712        trace.retain(|key, time| {
713            // In the normal state, trace will always empty
714            //
715            // When the inflight request reaches the checkpoint(inflight > tip + 512),
716            // it means that there is an anomaly in the sync less than tip + 1, i.e. some nodes are stuck,
717            // at which point it will be recorded as the timestamp at that time.
718            //
719            // If the time exceeds low time limit, delete the task and halve the number of
720            // executable tasks for the corresponding node
721            if now > timeout_limit + *time {
722                if let Some(state) = states.remove(key) {
723                    if let Some(d) = download_schedulers.get_mut(&state.peer) {
724                        if should_punish && adjustment {
725                            d.punish(1);
726                        }
727                        d.hashes.remove(key);
728                        debug!(
729                            "prune: remove download_schedulers: remove {}-{} from {}",
730                            key.number, key.hash, state.peer
731                        );
732                    };
733                }
734
735                if key.number > *restart_number {
736                    *restart_number = key.number;
737                }
738                return false;
739            }
740            true
741        });
742        shrink_to_fit!(trace, SHRINK_THRESHOLD);
743
744        disconnect_list
745    }
746
747    pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
748        let state = self.inflight_states.entry(block.clone());
749        match state {
750            Entry::Occupied(_entry) => return false,
751            Entry::Vacant(entry) => entry.insert(InflightState::new(peer)),
752        };
753
754        if self.restart_number >= block.number {
755            // All new requests smaller than restart_number mean that they are cleaned up and
756            // cannot be immediately marked as cleaned up again.
757            self.trace_number
758                .insert(block.clone(), unix_time_as_millis());
759        }
760
761        let download_scheduler = self.download_schedulers.entry(peer).or_default();
762        download_scheduler.hashes.insert(block)
763    }
764
765    pub fn remove_by_peer(&mut self, peer: PeerIndex) -> usize {
766        let trace = &mut self.trace_number;
767        let state = &mut self.inflight_states;
768
769        self.download_schedulers
770            .remove(&peer)
771            .map(|blocks| {
772                let blocks_count = blocks.hashes.iter().len();
773                for block in blocks.hashes {
774                    state.remove(&block);
775                    if !trace.is_empty() {
776                        trace.remove(&block);
777                    }
778                }
779                blocks_count
780            })
781            .unwrap_or_default()
782    }
783
784    pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
785        let should_punish = self.download_schedulers.len() > self.protect_num;
786        let download_schedulers = &mut self.download_schedulers;
787        let trace = &mut self.trace_number;
788        let time_analyzer = &mut self.time_analyzer;
789        let adjustment = self.adjustment;
790        self.inflight_states
791            .remove(&block)
792            .map(|state| {
793                let elapsed = unix_time_as_millis().saturating_sub(state.timestamp);
794                if let Some(set) = download_schedulers.get_mut(&state.peer) {
795                    set.hashes.remove(&block);
796                    if adjustment {
797                        match time_analyzer.push_time(elapsed) {
798                            TimeQuantile::MinToFast => set.increase(2),
799                            TimeQuantile::FastToNormal => set.increase(1),
800                            TimeQuantile::NormalToUpper => {
801                                if should_punish {
802                                    set.decrease(1)
803                                }
804                            }
805                            TimeQuantile::UpperToMax => {
806                                if should_punish {
807                                    set.decrease(2)
808                                }
809                            }
810                        }
811                    }
812                    if !trace.is_empty() {
813                        trace.remove(&block);
814                    }
815                };
816            })
817            .is_some()
818    }
819}
820
821impl Peers {
822    pub fn sync_connected(
823        &self,
824        peer: PeerIndex,
825        is_outbound: bool,
826        is_whitelist: bool,
827        is_2023edition: bool,
828    ) {
829        let protect_outbound = is_outbound
830            && self
831                .n_protected_outbound_peers
832                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
833                    if x < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
834                        Some(x + 1)
835                    } else {
836                        None
837                    }
838                })
839                .is_ok();
840
841        let peer_flags = PeerFlags {
842            is_outbound,
843            is_whitelist,
844            is_2023edition,
845            is_protect: protect_outbound,
846        };
847        self.state
848            .entry(peer)
849            .and_modify(|state| {
850                state.peer_flags = peer_flags;
851                state.sync_connected();
852            })
853            .or_insert_with(|| {
854                let mut state = PeerState::new(peer_flags);
855                state.sync_connected();
856                state
857            });
858    }
859
860    pub fn relay_connected(&self, peer: PeerIndex) {
861        self.state
862            .entry(peer)
863            .or_insert_with(|| PeerState::new(PeerFlags::default()));
864    }
865
866    pub fn get_best_known_header(&self, pi: PeerIndex) -> Option<HeaderIndex> {
867        self.state
868            .get(&pi)
869            .and_then(|peer_state| peer_state.best_known_header.clone())
870    }
871
872    pub fn may_set_best_known_header(&self, peer: PeerIndex, header_index: HeaderIndex) {
873        if let Some(mut peer_state) = self.state.get_mut(&peer) {
874            if let Some(ref known) = peer_state.best_known_header {
875                if header_index.is_better_chain(known) {
876                    peer_state.best_known_header = Some(header_index);
877                }
878            } else {
879                peer_state.best_known_header = Some(header_index);
880            }
881        }
882    }
883
884    pub fn get_last_common_header(&self, pi: PeerIndex) -> Option<BlockNumberAndHash> {
885        self.state
886            .get(&pi)
887            .and_then(|peer_state| peer_state.last_common_header.clone())
888    }
889
890    pub fn set_last_common_header(&self, pi: PeerIndex, header: BlockNumberAndHash) {
891        self.state
892            .entry(pi)
893            .and_modify(|peer_state| peer_state.last_common_header = Some(header));
894    }
895
896    pub fn getheaders_received(&self, _peer: PeerIndex) {
897        // TODO:
898    }
899
900    pub fn disconnected(&self, peer: PeerIndex) {
901        if let Some(peer_state) = self.state.remove(&peer).map(|(_, peer_state)| peer_state) {
902            if peer_state.sync_started() {
903                // It shouldn't happen
904                // fetch_sub wraps around on overflow, we still check manually
905                // panic here to prevent some bug be hidden silently.
906                assert_ne!(
907                    self.n_sync_started.fetch_sub(1, Ordering::AcqRel),
908                    0,
909                    "n_sync_started overflow when disconnects"
910                );
911            }
912
913            // Protection node disconnected
914            if peer_state.peer_flags.is_protect {
915                assert_ne!(
916                    self.n_protected_outbound_peers
917                        .fetch_sub(1, Ordering::AcqRel),
918                    0,
919                    "n_protected_outbound_peers overflow when disconnects"
920                );
921            }
922        }
923    }
924
925    pub fn insert_unknown_header_hash(&self, peer: PeerIndex, hash: Byte32) {
926        self.state
927            .entry(peer)
928            .and_modify(|state| state.unknown_header_list.push(hash));
929    }
930
931    pub fn unknown_header_list_is_empty(&self, peer: PeerIndex) -> bool {
932        self.state
933            .get(&peer)
934            .map(|state| state.unknown_header_list.is_empty())
935            .unwrap_or(true)
936    }
937
938    pub fn clear_unknown_list(&self) {
939        self.state.iter_mut().for_each(|mut state| {
940            if !state.unknown_header_list.is_empty() {
941                state.unknown_header_list.clear()
942            }
943        })
944    }
945
946    pub fn get_best_known_less_than_tip_and_unknown_empty(
947        &self,
948        tip: BlockNumber,
949    ) -> Vec<PeerIndex> {
950        self.state
951            .iter()
952            .filter_map(|kv_pair| {
953                let (peer_index, state) = kv_pair.pair();
954                if !state.unknown_header_list.is_empty() {
955                    return None;
956                }
957                if let Some(ref header) = state.best_known_header {
958                    if header.number() < tip {
959                        return Some(*peer_index);
960                    }
961                }
962                None
963            })
964            .collect()
965    }
966
967    pub fn take_unknown_last(&self, peer: PeerIndex) -> Option<Byte32> {
968        self.state
969            .get_mut(&peer)
970            .and_then(|mut state| state.unknown_header_list.pop())
971    }
972
973    pub fn get_flag(&self, peer: PeerIndex) -> Option<PeerFlags> {
974        self.state.get(&peer).map(|state| state.peer_flags)
975    }
976}
977
978// <CompactBlockHash, (CompactBlock, <PeerIndex, (Vec<TransactionsIndex>, Vec<UnclesIndex>)>, timestamp)>
979pub(crate) type PendingCompactBlockMap = HashMap<
980    Byte32,
981    (
982        packed::CompactBlock,
983        HashMap<PeerIndex, (Vec<u32>, Vec<u32>)>,
984        u64,
985    ),
986>;
987
988/// Sync state shared between sync and relayer protocol
989#[derive(Clone)]
990pub struct SyncShared {
991    shared: Shared,
992    state: Arc<SyncState>,
993}
994
995impl SyncShared {
996    /// Create a SyncShared
997    pub fn new(
998        shared: Shared,
999        sync_config: SyncConfig,
1000        tx_relay_receiver: Receiver<TxVerificationResult>,
1001    ) -> SyncShared {
1002        let (total_difficulty, header) = {
1003            let snapshot = shared.snapshot();
1004            (
1005                snapshot.total_difficulty().to_owned(),
1006                snapshot.tip_header().to_owned(),
1007            )
1008        };
1009        let shared_best_header = RwLock::new((header, total_difficulty).into());
1010        info!(
1011            "header_map.memory_limit {}",
1012            sync_config.header_map.memory_limit
1013        );
1014
1015        let state = SyncState {
1016            shared_best_header,
1017            tx_filter: Mutex::new(TtlFilter::default()),
1018            unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
1019            peers: Peers::default(),
1020            pending_get_block_proposals: DashMap::new(),
1021            pending_compact_blocks: Mutex::new(HashMap::default()),
1022            inflight_proposals: DashMap::new(),
1023            inflight_blocks: RwLock::new(InflightBlocks::default()),
1024            pending_get_headers: RwLock::new(LruCache::new(GET_HEADERS_CACHE_SIZE)),
1025            tx_relay_receiver,
1026            min_chain_work: sync_config.min_chain_work,
1027        };
1028
1029        SyncShared {
1030            shared,
1031            state: Arc::new(state),
1032        }
1033    }
1034
1035    /// Shared chain db/config
1036    pub fn shared(&self) -> &Shared {
1037        &self.shared
1038    }
1039
1040    /// Get snapshot with current chain
1041    pub fn active_chain(&self) -> ActiveChain {
1042        ActiveChain {
1043            sync_shared: self.clone(),
1044            snapshot: Arc::clone(&self.shared.snapshot()),
1045        }
1046    }
1047
1048    /// Get chain store
1049    pub fn store(&self) -> &ChainDB {
1050        self.shared.store()
1051    }
1052
1053    /// Get sync state
1054    pub fn state(&self) -> &SyncState {
1055        &self.state
1056    }
1057
1058    /// Get consensus config
1059    pub fn consensus(&self) -> &Consensus {
1060        self.shared.consensus()
1061    }
1062
1063    // Only used by unit test
1064    // Blocking insert a new block, return the verify result
1065    #[cfg(test)]
1066    pub(crate) fn blocking_insert_new_block(
1067        &self,
1068        chain: &ChainController,
1069        block: Arc<core::BlockView>,
1070    ) -> VerifyResult {
1071        chain.blocking_process_block(block)
1072    }
1073
1074    pub(crate) fn accept_remote_block(&self, chain: &ChainController, remote_block: RemoteBlock) {
1075        {
1076            let entry = self
1077                .shared()
1078                .block_status_map()
1079                .entry(remote_block.block.header().hash());
1080            if let dashmap::mapref::entry::Entry::Vacant(entry) = entry {
1081                entry.insert(BlockStatus::BLOCK_RECEIVED);
1082            }
1083        }
1084
1085        chain.asynchronous_process_remote_block(remote_block)
1086    }
1087
1088    /// Sync a new valid header, try insert to sync state
1089    // Update the header_map
1090    // Update the block_status_map
1091    // Update the shared_best_header if need
1092    // Update the peer's best_known_header
1093    pub fn insert_valid_header(&self, peer: PeerIndex, header: &core::HeaderView) {
1094        let tip_number = self.active_chain().tip_number();
1095        let store_first = tip_number >= header.number();
1096        // We don't use header#parent_hash clone here because it will hold the arc counter of the SendHeaders message
1097        // which will cause the 2000 headers to be held in memory for a long time
1098        let parent_hash = Byte32::from_slice(header.data().raw().parent_hash().as_slice())
1099            .expect("checked slice length");
1100        let parent_header_index = self
1101            .get_header_index_view(&parent_hash, store_first)
1102            .expect("parent should be verified");
1103        let mut header_view = HeaderIndexView::new(
1104            header.hash(),
1105            header.number(),
1106            header.epoch(),
1107            header.timestamp(),
1108            parent_hash,
1109            parent_header_index.total_difficulty() + header.difficulty(),
1110        );
1111
1112        let snapshot = Arc::clone(&self.shared.snapshot());
1113        header_view.build_skip(
1114            tip_number,
1115            |hash, store_first| self.get_header_index_view(hash, store_first),
1116            |number, current| {
1117                // shortcut to return an ancestor block
1118                if current.number <= snapshot.tip_number() && snapshot.is_main_chain(&current.hash)
1119                {
1120                    snapshot
1121                        .get_block_hash(number)
1122                        .and_then(|hash| self.get_header_index_view(&hash, true))
1123                } else {
1124                    None
1125                }
1126            },
1127        );
1128        self.shared.header_map().insert(header_view.clone());
1129        self.state
1130            .peers()
1131            .may_set_best_known_header(peer, header_view.as_header_index());
1132        self.state.may_set_shared_best_header(header_view);
1133    }
1134
1135    pub(crate) fn get_header_index_view(
1136        &self,
1137        hash: &Byte32,
1138        store_first: bool,
1139    ) -> Option<HeaderIndexView> {
1140        let store = self.store();
1141        if store_first {
1142            store
1143                .get_block_header(hash)
1144                .and_then(|header| {
1145                    store
1146                        .get_block_ext(hash)
1147                        .map(|block_ext| (header, block_ext.total_difficulty).into())
1148                })
1149                .or_else(|| self.shared.header_map().get(hash))
1150        } else {
1151            self.shared.header_map().get(hash).or_else(|| {
1152                store.get_block_header(hash).and_then(|header| {
1153                    store
1154                        .get_block_ext(hash)
1155                        .map(|block_ext| (header, block_ext.total_difficulty).into())
1156                })
1157            })
1158        }
1159    }
1160
1161    /// Check whether block has been inserted to chain store
1162    pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
1163        let status = self.active_chain().get_block_status(hash);
1164        status.contains(BlockStatus::BLOCK_STORED)
1165    }
1166
1167    /// Get epoch ext by block hash
1168    pub fn get_epoch_ext(&self, hash: &Byte32) -> Option<EpochExt> {
1169        self.store().get_block_epoch(hash)
1170    }
1171
1172    /// Insert peer's unknown_header_list
1173    pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec<Byte32>) {
1174        // update peer's unknown_header_list only once
1175        if self.state().peers.unknown_header_list_is_empty(pi) {
1176            // header list is an ordered list, sorted from highest to lowest,
1177            // so here you discard and exit early
1178            for hash in header_list {
1179                if let Some(header) = self.shared().header_map().get(&hash) {
1180                    self.state()
1181                        .peers
1182                        .may_set_best_known_header(pi, header.as_header_index());
1183                    break;
1184                } else {
1185                    self.state().peers.insert_unknown_header_hash(pi, hash)
1186                }
1187            }
1188        }
1189    }
1190
1191    /// Return true when the block is that we have requested and received first time.
1192    pub fn new_block_received(&self, block: &core::BlockView) -> bool {
1193        if !self
1194            .state()
1195            .write_inflight_blocks()
1196            .remove_by_block((block.number(), block.hash()).into())
1197        {
1198            return false;
1199        }
1200
1201        let status = self.active_chain().get_block_status(&block.hash());
1202        debug!(
1203            "new_block_received {}-{}, status: {:?}",
1204            block.number(),
1205            block.hash(),
1206            status
1207        );
1208        if !BlockStatus::HEADER_VALID.eq(&status) {
1209            return false;
1210        }
1211
1212        if let dashmap::mapref::entry::Entry::Vacant(status) =
1213            self.shared().block_status_map().entry(block.hash())
1214        {
1215            status.insert(BlockStatus::BLOCK_RECEIVED);
1216            return true;
1217        }
1218        false
1219    }
1220}
1221
1222impl HeaderFieldsProvider for SyncShared {
1223    fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
1224        self.shared
1225            .header_map()
1226            .get(hash)
1227            .map(|header| HeaderFields {
1228                hash: header.hash(),
1229                number: header.number(),
1230                epoch: header.epoch(),
1231                timestamp: header.timestamp(),
1232                parent_hash: header.parent_hash(),
1233            })
1234            .or_else(|| {
1235                self.store()
1236                    .get_block_header(hash)
1237                    .map(|header| HeaderFields {
1238                        hash: header.hash(),
1239                        number: header.number(),
1240                        epoch: header.epoch(),
1241                        timestamp: header.timestamp(),
1242                        parent_hash: header.parent_hash(),
1243                    })
1244            })
1245    }
1246}
1247
1248#[derive(Eq, PartialEq, Clone)]
1249pub struct UnknownTxHashPriority {
1250    request_time: Instant,
1251    peers: Vec<PeerIndex>,
1252    requested: bool,
1253}
1254
1255impl UnknownTxHashPriority {
1256    pub fn should_request(&self, now: Instant) -> bool {
1257        self.next_request_at() < now
1258    }
1259
1260    pub fn next_request_at(&self) -> Instant {
1261        if self.requested {
1262            self.request_time + RETRY_ASK_TX_TIMEOUT_INCREASE
1263        } else {
1264            self.request_time
1265        }
1266    }
1267
1268    pub fn next_request_peer(&mut self) -> Option<PeerIndex> {
1269        if self.requested {
1270            if self.peers.len() > 1 {
1271                self.request_time = Instant::now();
1272                self.peers.swap_remove(0);
1273                self.peers.first().cloned()
1274            } else {
1275                None
1276            }
1277        } else {
1278            self.requested = true;
1279            self.peers.first().cloned()
1280        }
1281    }
1282
1283    pub fn push_peer(&mut self, peer_index: PeerIndex) {
1284        self.peers.push(peer_index);
1285    }
1286
1287    pub fn requesting_peer(&self) -> Option<PeerIndex> {
1288        if self.requested {
1289            self.peers.first().cloned()
1290        } else {
1291            None
1292        }
1293    }
1294}
1295
1296impl Ord for UnknownTxHashPriority {
1297    fn cmp(&self, other: &Self) -> cmp::Ordering {
1298        self.next_request_at()
1299            .cmp(&other.next_request_at())
1300            .reverse()
1301    }
1302}
1303
1304impl PartialOrd for UnknownTxHashPriority {
1305    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1306        Some(self.cmp(other))
1307    }
1308}
1309
1310pub struct SyncState {
1311    /* Status irrelevant to peers */
1312    shared_best_header: RwLock<HeaderIndexView>,
1313    tx_filter: Mutex<TtlFilter<Byte32>>,
1314
1315    // The priority is ordering by timestamp (reversed), means do not ask the tx before this timestamp (timeout).
1316    unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,
1317
1318    /* Status relevant to peers */
1319    peers: Peers,
1320
1321    /* Cached items which we had received but not completely process */
1322    pending_get_block_proposals: DashMap<packed::ProposalShortId, HashSet<PeerIndex>>,
1323    pending_get_headers: RwLock<LruCache<(PeerIndex, Byte32), Instant>>,
1324    pending_compact_blocks: Mutex<PendingCompactBlockMap>,
1325
1326    /* In-flight items for which we request to peers, but not got the responses yet */
1327    inflight_proposals: DashMap<packed::ProposalShortId, BlockNumber>,
1328    inflight_blocks: RwLock<InflightBlocks>,
1329
1330    /* cached for sending bulk */
1331    tx_relay_receiver: Receiver<TxVerificationResult>,
1332    min_chain_work: U256,
1333}
1334
1335impl SyncState {
1336    pub fn min_chain_work(&self) -> &U256 {
1337        &self.min_chain_work
1338    }
1339
1340    pub fn min_chain_work_ready(&self) -> bool {
1341        self.shared_best_header
1342            .read()
1343            .is_better_than(&self.min_chain_work)
1344    }
1345
1346    pub fn n_sync_started(&self) -> &AtomicUsize {
1347        &self.peers.n_sync_started
1348    }
1349
1350    pub fn peers(&self) -> &Peers {
1351        &self.peers
1352    }
1353
1354    pub fn compare_with_pending_compact(&self, hash: &Byte32, now: u64) -> bool {
1355        let pending = self.pending_compact_blocks.lock();
1356        // After compact block request 2s or pending is empty, sync can create tasks
1357        pending.is_empty()
1358            || pending
1359                .get(hash)
1360                .map(|(_, _, time)| now > time + 2000)
1361                .unwrap_or(true)
1362    }
1363
1364    pub fn pending_compact_blocks(&self) -> MutexGuard<PendingCompactBlockMap> {
1365        self.pending_compact_blocks.lock()
1366    }
1367
1368    pub fn read_inflight_blocks(&self) -> RwLockReadGuard<InflightBlocks> {
1369        self.inflight_blocks.read()
1370    }
1371
1372    pub fn write_inflight_blocks(&self) -> RwLockWriteGuard<InflightBlocks> {
1373        self.inflight_blocks.write()
1374    }
1375
1376    pub fn take_relay_tx_verify_results(&self, limit: usize) -> Vec<TxVerificationResult> {
1377        self.tx_relay_receiver.try_iter().take(limit).collect()
1378    }
1379
1380    pub fn shared_best_header(&self) -> HeaderIndexView {
1381        self.shared_best_header.read().to_owned()
1382    }
1383
1384    pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
1385        self.shared_best_header.read()
1386    }
1387
1388    pub fn may_set_shared_best_header(&self, header: HeaderIndexView) {
1389        let mut shared_best_header = self.shared_best_header.write();
1390        if !header.is_better_than(shared_best_header.total_difficulty()) {
1391            return;
1392        }
1393
1394        if let Some(metrics) = ckb_metrics::handle() {
1395            metrics.ckb_shared_best_number.set(header.number() as i64);
1396        }
1397        *shared_best_header = header;
1398    }
1399
1400    pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
1401        if peer_state.sync_started() {
1402            assert_ne!(
1403                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
1404                0,
1405                "n_sync_started overflow when suspend_sync"
1406            );
1407        }
1408        peer_state.suspend_sync(SUSPEND_SYNC_TIME);
1409    }
1410
1411    pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
1412        if peer_state.sync_started() {
1413            assert_ne!(
1414                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
1415                0,
1416                "n_sync_started overflow when tip_synced"
1417            );
1418        }
1419        peer_state.tip_synced();
1420    }
1421
1422    pub fn mark_as_known_tx(&self, hash: Byte32) {
1423        self.mark_as_known_txs(iter::once(hash));
1424    }
1425
1426    pub fn remove_from_known_txs(&self, hash: &Byte32) {
1427        self.tx_filter.lock().remove(hash);
1428    }
1429
1430    // maybe someday we can use
1431    // where T: Iterator<Item=Byte32>,
1432    // for<'a> &'a T: Iterator<Item=&'a Byte32>,
1433    pub fn mark_as_known_txs(&self, hashes: impl Iterator<Item = Byte32> + std::clone::Clone) {
1434        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1435        let mut tx_filter = self.tx_filter.lock();
1436
1437        for hash in hashes {
1438            unknown_tx_hashes.remove(&hash);
1439            tx_filter.insert(hash);
1440        }
1441    }
1442
1443    pub fn pop_ask_for_txs(&self) -> HashMap<PeerIndex, Vec<Byte32>> {
1444        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1445        let mut result: HashMap<PeerIndex, Vec<Byte32>> = HashMap::new();
1446        let now = Instant::now();
1447
1448        if !unknown_tx_hashes
1449            .peek()
1450            .map(|(_tx_hash, priority)| priority.should_request(now))
1451            .unwrap_or_default()
1452        {
1453            return result;
1454        }
1455
1456        while let Some((tx_hash, mut priority)) = unknown_tx_hashes.pop() {
1457            if priority.should_request(now) {
1458                if let Some(peer_index) = priority.next_request_peer() {
1459                    result
1460                        .entry(peer_index)
1461                        .and_modify(|hashes| hashes.push(tx_hash.clone()))
1462                        .or_insert_with(|| vec![tx_hash.clone()]);
1463                    unknown_tx_hashes.push(tx_hash, priority);
1464                }
1465            } else {
1466                unknown_tx_hashes.push(tx_hash, priority);
1467                break;
1468            }
1469        }
1470        result
1471    }
1472
1473    pub fn add_ask_for_txs(&self, peer_index: PeerIndex, tx_hashes: Vec<Byte32>) -> Status {
1474        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1475
1476        for tx_hash in tx_hashes
1477            .into_iter()
1478            .take(MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER)
1479        {
1480            match unknown_tx_hashes.entry(tx_hash) {
1481                keyed_priority_queue::Entry::Occupied(entry) => {
1482                    let mut priority = entry.get_priority().clone();
1483                    priority.push_peer(peer_index);
1484                    entry.set_priority(priority);
1485                }
1486                keyed_priority_queue::Entry::Vacant(entry) => {
1487                    entry.set_priority(UnknownTxHashPriority {
1488                        request_time: Instant::now(),
1489                        peers: vec![peer_index],
1490                        requested: false,
1491                    })
1492                }
1493            }
1494        }
1495
1496        // Check `unknown_tx_hashes`'s length after inserting the arrival `tx_hashes`
1497        if unknown_tx_hashes.len() >= MAX_UNKNOWN_TX_HASHES_SIZE
1498            || unknown_tx_hashes.len()
1499                >= self.peers.state.len() * MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER
1500        {
1501            warn!(
1502                "unknown_tx_hashes is too long, len: {}",
1503                unknown_tx_hashes.len()
1504            );
1505
1506            let mut peer_unknown_counter = 0;
1507            for (_hash, priority) in unknown_tx_hashes.iter() {
1508                for peer in priority.peers.iter() {
1509                    if *peer == peer_index {
1510                        peer_unknown_counter += 1;
1511                    }
1512                }
1513            }
1514            if peer_unknown_counter >= MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER {
1515                return StatusCode::TooManyUnknownTransactions.into();
1516            }
1517
1518            return Status::ignored();
1519        }
1520
1521        Status::ok()
1522    }
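    // Worked example (assumption: the per-peer limit is shown with an illustrative
    // value only): if MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER were 64, a peer announcing
    // 200 unknown hashes in one message would only get its first 64 queued, and once
    // that peer already owns 64 queued entries while the global queue is saturated,
    // further announcements are answered with TooManyUnknownTransactions.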
1523
1524    pub fn already_known_tx(&self, hash: &Byte32) -> bool {
1525        self.tx_filter.lock().contains(hash)
1526    }
1527
1528    pub fn tx_filter(&self) -> MutexGuard<TtlFilter<Byte32>> {
1529        self.tx_filter.lock()
1530    }
1531
1532    pub fn unknown_tx_hashes(
1533        &self,
1534    ) -> MutexGuard<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>> {
1535        self.unknown_tx_hashes.lock()
1536    }
1537
1538    pub fn insert_inflight_proposals(
1539        &self,
1540        ids: Vec<packed::ProposalShortId>,
1541        block_number: BlockNumber,
1542    ) -> Vec<bool> {
1543        ids.into_iter()
1544            .map(|id| match self.inflight_proposals.entry(id) {
1545                dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
1546                    if *occupied.get() < block_number {
1547                        occupied.insert(block_number);
1548                        true
1549                    } else {
1550                        false
1551                    }
1552                }
1553                dashmap::mapref::entry::Entry::Vacant(vacant) => {
1554                    vacant.insert(block_number);
1555                    true
1556                }
1557            })
1558            .collect()
1559    }
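    // Illustrative sketch (assumption): the returned flags are in the same order as
    // the input ids, so a caller can keep only the proposals that still need to be
    // fetched for this block.
    //
    //     let flags = state.insert_inflight_proposals(ids.clone(), block_number);
    //     let to_request: Vec<_> = ids
    //         .into_iter()
    //         .zip(flags)
    //         .filter_map(|(id, fresh)| fresh.then_some(id))
    //         .collect();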
1560
1561    pub fn remove_inflight_proposals(&self, ids: &[packed::ProposalShortId]) -> Vec<bool> {
1562        ids.iter()
1563            .map(|id| self.inflight_proposals.remove(id).is_some())
1564            .collect()
1565    }
1566
1567    pub fn clear_expired_inflight_proposals(&self, keep_min_block_number: BlockNumber) {
1568        self.inflight_proposals
1569            .retain(|_, block_number| *block_number >= keep_min_block_number);
1570    }
1571
1572    pub fn contains_inflight_proposal(&self, proposal_id: &packed::ProposalShortId) -> bool {
1573        self.inflight_proposals.contains_key(proposal_id)
1574    }
1575
1576    pub fn drain_get_block_proposals(
1577        &self,
1578    ) -> DashMap<packed::ProposalShortId, HashSet<PeerIndex>> {
1579        let ret = self.pending_get_block_proposals.clone();
1580        self.pending_get_block_proposals.clear();
1581        ret
1582    }
1583
1584    pub fn insert_get_block_proposals(&self, pi: PeerIndex, ids: Vec<packed::ProposalShortId>) {
1585        for id in ids.into_iter() {
1586            self.pending_get_block_proposals
1587                .entry(id)
1588                .or_default()
1589                .insert(pi);
1590        }
1591    }
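    // Illustrative sketch (assumption): ids queued here are drained in one batch on a
    // later tick and turned into per-peer GetBlockProposal requests.
    //
    //     state.insert_get_block_proposals(peer, missing_proposal_ids);
    //     // ... on a later tick:
    //     for (proposal_id, peers) in state.drain_get_block_proposals() {
    //         // group `proposal_id` by peer and build the request messages
    //     }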
1592
1593    // Handle a peer disconnect: remove the in-flight blocks that were requested from this peer
1594    //
1595    // TODO: record the peer's connection duration (disconnect time - connection established time)
1596    // and report it to ckb_metrics
1597    pub fn disconnected(&self, pi: PeerIndex) {
1598        let removed_inflight_blocks_count = self.write_inflight_blocks().remove_by_peer(pi);
1599        if removed_inflight_blocks_count > 0 {
1600            debug!(
1601                "disconnected {}, remove {} inflight blocks",
1602                pi, removed_inflight_blocks_count
1603            )
1604        }
1605        self.peers().disconnected(pi);
1606    }
1607}
1608
1609/** ActiveChain captures a point-in-time view of the indexed chain of blocks. */
1610#[derive(Clone)]
1611pub struct ActiveChain {
1612    sync_shared: SyncShared,
1613    snapshot: Arc<Snapshot>,
1614}
1615
1616#[doc(hidden)]
1617impl ActiveChain {
1618    pub(crate) fn sync_shared(&self) -> &SyncShared {
1619        &self.sync_shared
1620    }
1621
1622    pub fn shared(&self) -> &Shared {
1623        self.sync_shared.shared()
1624    }
1625
1626    fn store(&self) -> &ChainDB {
1627        self.sync_shared.store()
1628    }
1629
1630    pub fn state(&self) -> &SyncState {
1631        self.sync_shared.state()
1632    }
1633
1634    fn snapshot(&self) -> &Snapshot {
1635        &self.snapshot
1636    }
1637
1638    pub fn get_block_hash(&self, number: BlockNumber) -> Option<packed::Byte32> {
1639        self.snapshot().get_block_hash(number)
1640    }
1641
1642    pub fn get_block(&self, h: &packed::Byte32) -> Option<core::BlockView> {
1643        self.store().get_block(h)
1644    }
1645
1646    pub fn get_block_header(&self, h: &packed::Byte32) -> Option<core::HeaderView> {
1647        self.store().get_block_header(h)
1648    }
1649
1650    pub fn get_block_ext(&self, h: &packed::Byte32) -> Option<core::BlockExt> {
1651        self.snapshot().get_block_ext(h)
1652    }
1653
1654    pub fn get_block_filter(&self, hash: &packed::Byte32) -> Option<packed::Bytes> {
1655        self.store().get_block_filter(hash)
1656    }
1657
1658    pub fn get_block_filter_hash(&self, hash: &packed::Byte32) -> Option<packed::Byte32> {
1659        self.store().get_block_filter_hash(hash)
1660    }
1661
1662    pub fn get_latest_built_filter_block_number(&self) -> BlockNumber {
1663        self.snapshot
1664            .get_latest_built_filter_data_block_hash()
1665            .and_then(|hash| self.snapshot.get_block_number(&hash))
1666            .unwrap_or_default()
1667    }
1668
1669    pub fn total_difficulty(&self) -> &U256 {
1670        self.snapshot.total_difficulty()
1671    }
1672
1673    pub fn tip_header(&self) -> core::HeaderView {
1674        self.snapshot.tip_header().clone()
1675    }
1676
1677    pub fn tip_hash(&self) -> Byte32 {
1678        self.snapshot.tip_hash()
1679    }
1680
1681    pub fn tip_number(&self) -> BlockNumber {
1682        self.snapshot.tip_number()
1683    }
1684
1685    pub fn epoch_ext(&self) -> core::EpochExt {
1686        self.snapshot.epoch_ext().clone()
1687    }
1688
1689    pub fn is_main_chain(&self, hash: &packed::Byte32) -> bool {
1690        self.snapshot.is_main_chain(hash)
1691    }
1692    pub fn is_unverified_chain(&self, hash: &packed::Byte32) -> bool {
1693        self.store().get_block_epoch_index(hash).is_some()
1694    }
1695
1696    pub fn is_initial_block_download(&self) -> bool {
1697        self.shared().is_initial_block_download()
1698    }
1699    pub fn unverified_tip_header(&self) -> HeaderIndex {
1700        self.shared().get_unverified_tip()
1701    }
1702
1703    pub fn unverified_tip_hash(&self) -> Byte32 {
1704        self.unverified_tip_header().hash()
1705    }
1706
1707    pub fn unverified_tip_number(&self) -> BlockNumber {
1708        self.unverified_tip_header().number()
1709    }
1710
1711    pub fn get_ancestor(&self, base: &Byte32, number: BlockNumber) -> Option<HeaderIndexView> {
1712        self.get_ancestor_internal(base, number, false)
1713    }
1714
1715    pub fn get_ancestor_with_unverified(
1716        &self,
1717        base: &Byte32,
1718        number: BlockNumber,
1719    ) -> Option<HeaderIndexView> {
1720        self.get_ancestor_internal(base, number, true)
1721    }
1722
1723    fn get_ancestor_internal(
1724        &self,
1725        base: &Byte32,
1726        number: BlockNumber,
1727        with_unverified: bool,
1728    ) -> Option<HeaderIndexView> {
1729        let tip_number = {
1730            if with_unverified {
1731                self.unverified_tip_number()
1732            } else {
1733                self.tip_number()
1734            }
1735        };
1736
1737        let block_is_on_chain_fn = |hash: &Byte32| {
1738            if with_unverified {
1739                self.is_unverified_chain(hash)
1740            } else {
1741                self.is_main_chain(hash)
1742            }
1743        };
1744
1745        let get_header_view_fn = |hash: &Byte32, store_first: bool| {
1746            self.sync_shared.get_header_index_view(hash, store_first)
1747        };
1748
1749        let fast_scanner_fn = |number: BlockNumber, current: BlockNumberAndHash| {
1750            // Shortcut: when `current` is on the chosen chain at or below the tip, the ancestor at `number` is too, so read its hash straight from the store
1751            if current.number <= tip_number && block_is_on_chain_fn(&current.hash) {
1752                self.get_block_hash(number)
1753                    .and_then(|hash| self.sync_shared.get_header_index_view(&hash, true))
1754            } else {
1755                None
1756            }
1757        };
1758
1759        self.sync_shared
1760            .get_header_index_view(base, false)?
1761            .get_ancestor(tip_number, number, get_header_view_fn, fast_scanner_fn)
1762    }
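    // Illustrative example (assumption: the verified tip is above height 1_000): when
    // the queried segment lies on the chosen chain, the fast scanner resolves the
    // ancestor with a single block-hash lookup instead of walking parent hashes one
    // by one.
    //
    //     let ancestor = active_chain.get_ancestor(&active_chain.tip_hash(), 1_000);
    //     assert_eq!(ancestor.map(|h| h.number()), Some(1_000));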
1763
1764    pub fn get_locator(&self, start: BlockNumberAndHash) -> Vec<Byte32> {
1765        let mut step = 1;
1766        let mut locator = Vec::with_capacity(32);
1767        let mut index = start.number();
1768        let mut base = start.hash();
1769
1770        loop {
1771            let header_hash = self
1772                .get_ancestor(&base, index)
1773                .unwrap_or_else(|| {
1774                    panic!(
1775                        "index calculated in get_locator: \
1776                         start: {:?}, base: {}, step: {}, locators({}): {:?}.",
1777                        start,
1778                        base,
1779                        step,
1780                        locator.len(),
1781                        locator,
1782                    )
1783                })
1784                .hash();
1785            locator.push(header_hash.clone());
1786
1787            if locator.len() >= 10 {
1788                step <<= 1;
1789            }
1790
1791            if index < step * 2 {
1792                // Insert some low-height blocks into the locator so that
1793                // parallel IBD block downloads can start quickly, without
1794                // adding too many extra entries.
1795                //
1796                // 100 * 365 * 86400 / 8 = 394200000 (blocks in 100 years at one block per 8 seconds)
1797                // 2 ** 29 = 536870912
1798                // 2 ** 13 = 8192 (ONE_DAY_BLOCK_NUMBER)
1799                // 52 = 10 + 29 + 13 (dense entries + doubling steps + halving steps)
1800                if locator.len() < 52 && index > ONE_DAY_BLOCK_NUMBER {
1801                    index >>= 1;
1802                    base = header_hash;
1803                    continue;
1804                }
1805                // always include genesis hash
1806                if index != 0 {
1807                    locator.push(self.sync_shared.consensus().genesis_hash());
1808                }
1809                break;
1810            }
1811            index -= step;
1812            base = header_hash;
1813        }
1814        locator
1815    }
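    // Worked example (assumption: start height 1_000_000): the locator heights look
    // roughly like
    //     1_000_000, 999_999, ..., 999_991         // 10 dense entries while step == 1
    //     999_989, 999_985, 999_977, 999_961, ...  // step doubles after each entry
    // and once index < 2 * step, index is halved while it stays above
    // ONE_DAY_BLOCK_NUMBER (8192); the genesis hash is always appended last.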
1816
1817    pub fn last_common_ancestor(
1818        &self,
1819        pa: &BlockNumberAndHash,
1820        pb: &BlockNumberAndHash,
1821    ) -> Option<BlockNumberAndHash> {
1822        let (mut m_left, mut m_right) = if pa.number() > pb.number() {
1823            (pb.clone(), pa.clone())
1824        } else {
1825            (pa.clone(), pb.clone())
1826        };
1827
1828        m_right = self
1829            .get_ancestor(&m_right.hash(), m_left.number())?
1830            .number_and_hash();
1831        if m_left == m_right {
1832            return Some(m_left);
1833        }
1834        debug_assert!(m_left.number() == m_right.number());
1835
1836        while m_left != m_right {
1837            m_left = self
1838                .get_ancestor(&m_left.hash(), m_left.number() - 1)?
1839                .number_and_hash();
1840            m_right = self
1841                .get_ancestor(&m_right.hash(), m_right.number() - 1)?
1842                .number_and_hash();
1843        }
1844        Some(m_left)
1845    }
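    // Illustrative sketch (assumption): both cursors are first aligned to the lower
    // height, then walked back one parent at a time until their hashes match, so for
    // two branches that diverged at height N the result is the block at height N.
    //
    //     let fork_point = active_chain.last_common_ancestor(
    //         &tip_a.number_and_hash(),
    //         &tip_b.number_and_hash(),
    //     );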
1846
1847    pub fn locate_latest_common_block(
1848        &self,
1849        _hash_stop: &Byte32,
1850        locator: &[Byte32],
1851    ) -> Option<BlockNumber> {
1852        if locator.is_empty() {
1853            return None;
1854        }
1855
1856        let locator_hash = locator.last().expect("empty checked");
1857        if locator_hash != &self.sync_shared.consensus().genesis_hash() {
1858            return None;
1859        }
1860
1861        // iterators are lazy; this stops at the first locator hash found on our chain
1862        let (index, latest_common) = locator
1863            .iter()
1864            .enumerate()
1865            .map(|(index, hash)| (index, self.snapshot.get_block_number(hash)))
1866            .find(|(_index, number)| number.is_some())
1867            .expect("locator last checked");
1868
1869        if index == 0 || latest_common == Some(0) {
1870            return latest_common;
1871        }
1872
1873        if let Some(header) = locator
1874            .get(index - 1)
1875            .and_then(|hash| self.sync_shared.store().get_block_header(hash))
1876        {
1877            let mut block_hash = header.data().raw().parent_hash();
1878            loop {
1879                let block_header = match self.sync_shared.store().get_block_header(&block_hash) {
1880                    None => break latest_common,
1881                    Some(block_header) => block_header,
1882                };
1883
1884                if let Some(block_number) = self.snapshot.get_block_number(&block_hash) {
1885                    return Some(block_number);
1886                }
1887
1888                block_hash = block_header.data().raw().parent_hash();
1889            }
1890        } else {
1891            latest_common
1892        }
1893    }
1894
1895    pub fn get_locator_response(
1896        &self,
1897        block_number: BlockNumber,
1898        hash_stop: &Byte32,
1899    ) -> Vec<core::HeaderView> {
1900        let tip_number = self.tip_header().number();
1901        let max_height = cmp::min(
1902            block_number + 1 + MAX_HEADERS_LEN as BlockNumber,
1903            tip_number + 1,
1904        );
1905        (block_number + 1..max_height)
1906            .filter_map(|block_number| self.snapshot.get_block_hash(block_number))
1907            .take_while(|block_hash| block_hash != hash_stop)
1908            .filter_map(|block_hash| self.sync_shared.store().get_block_header(&block_hash))
1909            .collect()
1910    }
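    // Illustrative sketch (assumption): these two methods form the responder side of a
    // GetHeaders exchange; the highest locator entry found on our main chain decides
    // where the served headers start.
    //
    //     if let Some(common) = active_chain.locate_latest_common_block(&hash_stop, &locator) {
    //         let headers = active_chain.get_locator_response(common, &hash_stop);
    //         // ... wrap `headers` in a SendHeaders message and send it to the peer
    //     }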
1911
1912    pub fn send_getheaders_to_peer(
1913        &self,
1914        nc: &dyn CKBProtocolContext,
1915        peer: PeerIndex,
1916        block_number_and_hash: BlockNumberAndHash,
1917    ) {
1918        if let Some(last_time) = self
1919            .state()
1920            .pending_get_headers
1921            .write()
1922            .get(&(peer, block_number_and_hash.hash()))
1923        {
1924            if Instant::now() < *last_time + GET_HEADERS_TIMEOUT {
1925                debug!(
1926                    "Last get_headers request to peer {} is less than {:?}; Ignore it.",
1927                    peer, GET_HEADERS_TIMEOUT,
1928                );
1929                return;
1930            } else {
1931                debug!(
1932                    "Can not get headers from {} in {:?}, retry",
1933                    peer, GET_HEADERS_TIMEOUT,
1934                );
1935            }
1936        }
1937        self.state()
1938            .pending_get_headers
1939            .write()
1940            .put((peer, block_number_and_hash.hash()), Instant::now());
1941
1942        debug!(
1943            "send_getheaders_to_peer peer={}, hash={}",
1944            peer,
1945            block_number_and_hash.hash()
1946        );
1947        let locator_hash = self.get_locator(block_number_and_hash);
1948        let content = packed::GetHeaders::new_builder()
1949            .block_locator_hashes(locator_hash)
1950            .hash_stop(packed::Byte32::zero())
1951            .build();
1952        let message = packed::SyncMessage::new_builder().set(content).build();
1953        let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
1954    }
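    // Illustrative note (assumption): repeated calls for the same (peer, hash) pair
    // within GET_HEADERS_TIMEOUT (15 s) are dropped, so header sync may call this on
    // every tick without flooding the peer.
    //
    //     active_chain.send_getheaders_to_peer(nc, peer, start.clone());
    //     // an immediate second call with the same `start` is a no-op
    //     active_chain.send_getheaders_to_peer(nc, peer, start);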
1955
1956    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
1957        self.shared().get_block_status(block_hash)
1958    }
1959
1960    pub fn contains_block_status(&self, block_hash: &Byte32, status: BlockStatus) -> bool {
1961        self.get_block_status(block_hash).contains(status)
1962    }
1963}
1964
1965/// The `IBDState` enum represents whether the node is currently in the IBD process (`In`) or has
1966/// completed it (`Out`).
1967#[derive(Clone, Copy, Debug)]
1968pub enum IBDState {
1969    In,
1970    Out,
1971}
1972
1973impl From<bool> for IBDState {
1974    fn from(src: bool) -> Self {
1975        if src { IBDState::In } else { IBDState::Out }
1976    }
1977}
1978
1979impl From<IBDState> for bool {
1980    fn from(s: IBDState) -> bool {
1981        match s {
1982            IBDState::In => true,
1983            IBDState::Out => false,
1984        }
1985    }
1986}
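// Illustrative usage (assumption): the IBD flag from the shared state converts
// directly into the enum and back.
//
//     let ibd: IBDState = shared.is_initial_block_download().into();
//     let still_in_ibd: bool = ibd.into();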
1987
1988pub(crate) fn post_sync_process(
1989    nc: &dyn CKBProtocolContext,
1990    peer: PeerIndex,
1991    item_name: &str,
1992    status: Status,
1993) {
1994    if let Some(ban_time) = status.should_ban() {
1995        error!(
1996            "Receive {} from {}. Ban {:?} for {}",
1997            item_name, peer, ban_time, status
1998        );
1999        nc.ban_peer(peer, ban_time, status.to_string());
2000    } else if status.should_warn() {
2001        warn!("Receive {} from {}, {}", item_name, peer, status);
2002    } else if !status.is_ok() {
2003        debug!("Receive {} from {}, {}", item_name, peer, status);
2004    }
2005}
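// Illustrative usage (assumption): protocol handlers funnel the Status of every
// processed message through this helper so banning, warning and debug logging stay
// uniform; `process_send_headers` is a hypothetical handler name.
//
//     let status = process_send_headers(nc, peer, &message);
//     post_sync_process(nc, peer, "SendHeaders", status);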